Engineering
Feb 21, 2024

How we Leverage Postgres for our Search Data Processing Pipeline at GetYourGuide

Dharin Shah
Senior Software Engineer

Dharin Shah, Senior Software Engineer, shares how we ramped up search functionality by transitioning from a Kafka Streams-based system to a PostgreSQL + Kafka architecture—enhancing scalability, efficiency, and cost-effectiveness.

An introduction to Search at GetYourGuide

In the ever-evolving tech world, adaptability and innovation are paramount. At GetYourGuide, we embarked on a transformative journey, transitioning from a Kafka Streams-based system to PostgreSQL + Kafka for our data processing needs. The biggest reason for taking this journey was a scalability problem with our previous solution. As the company was growing—and still is!—we weren’t confident the old system would scale as well. 

Before tackling the core technical issues, let's first understand why this data processing is essential. The search function in GetYourGuide reflects the global state, supporting more than 22 languages worldwide, with ongoing expansion to additional languages. It offers an API with versatile options for filtering and aggregating data. The search feature generates the initial content you see on our website. However, each component of this content is sourced from services external to our search system.

A snapshot of GetYourGuide’s Search experience

This core requirement led the Search team to build data processing to combine various data sources into a view of the state of the GetYourGuide world. It's important to highlight that by having our APIs as read-only for clients and adopting a denormalized document data model in OpenSearch, we aimed to optimize for read operations. This approach allowed us to asynchronously establish relationships in our processing pipeline, enhancing read efficiency by reducing the need for complex joins or queries. As you see in the screenshot above—and you can also visit getyourguide.com to explore further—the first page shows the data pulled from the search APIs. And almost all pages in GetYourGuide show data served via Search APIs. Now, let’s dive into the technical challenge.

The Kafka Streams era

Representation of our Kafka Streams topology 

To build the data served via APIs, we must process it from various sources, combining and mapping it to the internal search world. To achieve that, we built our initial system on Kafka Streams. It was designed to process activity data, which was then denormalized and saved in OpenSearch to serve via the API. This data was a complex amalgamation of various elements like locations, categories, rankings, reviews, and more. To combine all this data, we relied heavily on Kafka Streams KTable joins.

As you can see in the image above, even a visualization does little to make the topology look simple.

As our data grew and our requirements became intricate, we encountered several challenges:

  • Complexity with Kafka Streams DSL: Adding new data sources was a monumental task. The Kafka Streams DSL, while powerful, introduced a lot of hidden complexity. For instance, consider the Kafka Streams DSL function for activity and location processing. Although the code looks very simple, each operation triggers heavy stateful processing behind the scenes. This made it unclear how the intermediate operations would change if we were to add or change a data source.


KTable groupLocationsByActivity(
    KTable catalogActivityTable,
    KTable locationTable
) {
    // Abridged: the chain reads like plain functional code, but each
    // operation triggers heavy stateful processing behind the scenes.
    return catalogActivityTable.filter().join(locationTable, keyExtractor).aggregate(..);
}

Every time we added or changed code, we had to reprocess all the data from the beginning. This caused more updates and put extra strain on Kafka brokers and OpenSearch—our search API DB. It is possible to add various data types and make the system more adaptable using Kafka Streams Processor API, which offers more control over data flow. However, this makes the code more complex and doesn’t solve the issue of scaling.

  • Scalability Concerns: Our system needed to be more scalable. With the addition of new languages and demands to support more, we had to increase the number of topics and partitions. We were nearing the limits set by our Kafka cluster.
  • Debugging Nightmares: Debugging issues with Kafka Streams was a daunting task. The complexity of the system made it hard to pinpoint and resolve issues.
  • Custom notifications: With Kafka Streams and KTable, we could not easily control the flow of “notifications” for an update. It is possible, but it again increases complexity. The new solution also addressed this problem.
  • Cost Overruns: We were spending a significant amount on storage and system resources with our streams application. The system needed to be more cost-effective.

Essentially, we had to send all our processed data back to Kafka to recover it if we restarted. This was because we didn’t have Persistent Volumes in our Kubernetes cluster.

The shift

The realization dawned on us that a paradigm shift was essential to address these challenges. We decided to transition our data processing from Kafka Streams to PostgreSQL. This decision was influenced by several factors:

  • Scaling concerns while adding new languages made it crucial for us to find a better direction for our processing pipeline.
  • In contrast to Kafka Streams DSL, SQL queries are straightforward to write, understand, and debug.
  • Flexibility: PostgreSQL offers out-of-line storage (TOAST) and powerful JSON functions, enabling us to process and merge data with finesse. A proof of concept with the JSON functions went smoothly and confirmed that debugging would be easier.
  • Infrastructure was readily available via AWS Aurora.

Transition

The transition was not without its challenges. We had to redesign our data processing flow, keeping in mind our previous system’s limitations and PostgreSQL’s capabilities.

New architecture

Representation of our new state management

Our new data processing flow was designed to be efficient, simple, scalable, and easy to manage:

  • We listened to various source data from Kafka and saved the data in individual tables/rows in PostgreSQL.
  • After the transaction committed, we pushed a notification to Kafka: an update message for the entity that had changed. We leveraged Kafka as a stateless notification system, which kept all operations real-time and fast. Micro-batching gave us the best of both worlds by keeping I/O efficient. This decoupled “updates” and helped control the data flow without adding complexity.
  • The preparation stage (the merge step described below) combines all the data we have for an entity via a set of SQL queries and prepares that data in an intermediate table. All of this is done directly on the DB to avoid unnecessary IOPS.
  • We used PostgreSQL’s JSON features and our own buffering method to combine updates for the same activity and handle the data efficiently.
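
The save-then-notify step from the list above can be sketched roughly as follows. This is a minimal illustration, not the production code: the class name `SaveThenNotify`, the `handle` method, and the `failCommit` flag are hypothetical, and in-memory collections stand in for PostgreSQL and the Kafka producer. The point it tries to show is that the notification carries only the entity id and is sent only once the write has committed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: in-memory stand-ins replace PostgreSQL and the Kafka producer.
final class SaveThenNotify {
    // Stand-in for a PostgreSQL table, keyed by "source:entityId".
    private final Map<String, String> committedRows = new HashMap<>();
    // Stand-in for a Kafka producer; it receives only the id of the changed entity.
    private final Consumer<String> notifier;

    SaveThenNotify(Consumer<String> notifier) {
        this.notifier = notifier;
    }

    // Returns true if the write committed. failCommit simulates a rollback.
    boolean handle(String source, String entityId, String json, boolean failCommit) {
        Map<String, String> pending = new HashMap<>();
        pending.put(source + ":" + entityId, json); // write inside the "transaction"
        if (failCommit) {
            return false; // rolled back: nothing stored, nothing announced
        }
        committedRows.putAll(pending); // commit
        notifier.accept(entityId);     // notify only after the commit
        return true;
    }

    int rowCount() {
        return committedRows.size();
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        SaveThenNotify pipeline = new SaveThenNotify(sent::add);
        pipeline.handle("location", "42", "{\"city\":\"Berlin\"}", false);
        pipeline.handle("review", "43", "{\"stars\":5}", true);
        System.out.println(sent); // prints [42]
    }
}
```

Because the message holds no entity state, a consumer that receives it reads the current rows from the database, which is what makes the notification channel stateless in this sketch.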

Merge Step

One of the significant positive changes was the introduction of a custom PostgreSQL function to merge JSON data. This simplified our merging strategy:


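-- The state function must exist before the aggregate that references it.
create function jsonb_concat(a jsonb, b jsonb) returns jsonb
    as 'select $1 || $2'
    language sql
    immutable
    parallel safe;

-- Folds jsonb values together with ||, starting from an empty object.
create aggregate jsonb_merge_agg(jsonb)
(
    sfunc = jsonb_concat,
    stype = jsonb,
    initcond = '{}'
);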

This simple function allowed us to merge data from various sources efficiently without the need for complex Kafka Streams operations. It also meant that the database merged data without “actually” integrating any business logic with the data.
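
To make the merge semantics concrete, here is a small sketch that mimics them outside the database. It is an illustration only: the class `JsonbMergeSketch` and its methods are hypothetical, and plain Java maps stand in for jsonb objects. It mirrors two properties of PostgreSQL's `||` operator on objects: only top-level keys are merged, and on a duplicate key the right-hand value wins.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: Java maps stand in for jsonb objects.
final class JsonbMergeSketch {
    // Mirrors `a || b` for two objects: top-level keys only, right-hand side wins.
    static Map<String, Object> concat(Map<String, Object> a, Map<String, Object> b) {
        Map<String, Object> out = new LinkedHashMap<>(a);
        out.putAll(b);
        return out;
    }

    // Mirrors the aggregate: fold concat over the fragments, starting from an empty object.
    static Map<String, Object> mergeAll(List<Map<String, Object>> fragments) {
        Map<String, Object> state = new LinkedHashMap<>();
        for (Map<String, Object> fragment : fragments) {
            state = concat(state, fragment);
        }
        return state;
    }

    public static void main(String[] args) {
        Map<String, Object> activity = Map.of("title", "Boat tour");
        Map<String, Object> reviews = Map.of("rating", 4.8);
        Map<String, Object> override = Map.of("title", "Sunset boat tour");
        System.out.println(mergeAll(List.of(activity, reviews, override)));
        // prints {title=Sunset boat tour, rating=4.8}
    }
}
```

One consequence worth keeping in mind: an SQL aggregate without an ORDER BY receives its inputs in an unspecified order, so if two fragments share a top-level key, which one wins is not guaranteed. Giving each source its own key, as the aggregation query in the next section does with `jsonb_build_object`, sidesteps that.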

An example to demonstrate our merge step

All these JSON documents are saved as individual pieces. The merge step selects them based on conditions encoded in the SQL queries and combines them using the JSON aggregate above.

Simplified representation of complex SQL for Data Aggregation:


INSERT INTO summary_table(entity_id, lang_code, aggregated_data, updated_at)
SELECT entity_id, lang_code, jsonb_merge_agg(data), NOW()
FROM (
    SELECT
        entity_id,
        lang_code,
        jsonb_build_object('data_key', jsonb_agg(data)) as data
    FROM entity_map
    JOIN language_entity ON entity_map.related_id = language_entity.id
    WHERE entity_id IN :ids
    GROUP BY entity_id, lang_code
) AS combined
WHERE entity_id IN :ids
GROUP BY entity_id, lang_code
ON CONFLICT (entity_id, lang_code) DO UPDATE
SET aggregated_data = excluded.aggregated_data, updated_at = NOW();

Benefits of the new flow

  • Ease of adding new sources: With our new system, adding a new data source was as simple as consuming from a topic and saving it to a common existing table in PostgreSQL—this also avoids running migrations every time we have a new data source.
  • Scalability: Our new system was scalable. We could easily add new languages without worrying about Kafka cluster limitations.
  • Efficient debugging: Debugging became a breeze. With simple SQL queries, we could quickly identify and resolve issues.

Another important piece was the introduction of buffering logic in our new architecture. This buffering logic was designed to handle notification updates from Kafka efficiently—check out the code here.

This buffering logic allowed us to process and merge data efficiently, ensuring our system was fast and reliable.
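
As a rough illustration of the idea, and not the code referenced above, a buffer of this kind might look like the sketch below. The class `NotificationBuffer`, its size-based flush, and the callback are all assumptions made for the example. It collapses repeated notifications for the same entity into one and hands the ids to a callback in micro-batches, so that one set of merge queries can serve many updates.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch of a deduplicating micro-batch buffer for update notifications.
final class NotificationBuffer {
    private final int maxSize;
    private final Consumer<List<String>> onFlush;
    // A set drops repeated ids; LinkedHashSet keeps first-seen order.
    private final Set<String> pending = new LinkedHashSet<>();

    NotificationBuffer(int maxSize, Consumer<List<String>> onFlush) {
        this.maxSize = maxSize;
        this.onFlush = onFlush;
    }

    // Records a notification and flushes once enough distinct ids have accumulated.
    synchronized void add(String entityId) {
        pending.add(entityId);
        if (pending.size() >= maxSize) {
            flush();
        }
    }

    // Hands the pending ids to the callback as one batch; does nothing if empty.
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        onFlush.accept(batch);
    }

    public static void main(String[] args) {
        NotificationBuffer buffer = new NotificationBuffer(3, batch -> System.out.println(batch));
        for (String id : List.of("a", "b", "a", "c")) {
            buffer.add(id); // prints [a, b, c] when the third distinct id arrives
        }
        buffer.add("d");
        buffer.flush(); // prints [d]
    }
}
```

In a real consumer, `flush()` would presumably also be called on a timer so that a quiet period does not leave updates waiting; that scheduling is left out here.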

The impact and future

The impact of our transition was profound:

  • Improved efficiency: Our data processing became faster and more efficient. We no longer faced the scalability issues that plagued our Kafka Streams-based system.
  • Improved stability: Due to the simplicity of the system and its ability to scale with ease, stability increased significantly. Eight months after the rollout, we haven’t had any changes in the pipeline. 
  • Cost savings: We saved a significant amount on storage and services, which allowed us to allocate resources to other critical areas. Storage dropped from ~20 TB to a few GB, and IOPS fell significantly.
  • Future Proofing: Our new system is designed to be scalable and flexible, ensuring it can handle our future data processing needs. We are now constantly making product improvements—instead of working on stability—and the new system allows us to add new features without breaking a sweat. 
  • Options for Scaling: Our system's scaling strategy is straightforward when dealing with increased data volumes. The bulk of our complex write operations is concentrated in a single primary step, allowing us to leverage read replicas for most aggregation tasks. To accommodate growth, we can add more read replicas and scale up Kafka consumers and partitions. Our current setup operates at only 20% of its full capacity, which leaves significant room to scale up.

The total storage used by Search Kafka Streams topics (in TBs)

This image shows how much storage we shaved off. Usage had been growing with our needs, and the metric declines as we migrated and gradually cleaned up.

Furthermore, we have extended this architecture to other entities in our search system, making our entire data-processing ecosystem more robust and efficient. As a penultimate remark, we tied most of our operations to the database compute, i.e., we let the database do all the work, and services only receive the final result of all the intermediate operations. That saved a considerable amount of network I/O and improved performance while reducing cost.

Conclusion

Our journey from Kafka Streams to stateless Kafka and PostgreSQL was filled with challenges, but it was also a journey of learning, innovation, and growth. Today, our data processing system is more efficient, scalable, and cost-effective. We are better positioned to handle our future data processing needs and continue to deliver value.

Shoutout

Thanks to Viktoriia Kucherenko for the support throughout the project and for the initiative that led to prioritizing it.
