Skip to main content
← Back to Blogs

Building a Real-time Data Sync Between Postgres and OpenSearch

Published on October 8, 2025

Introduction

This post shows how we built a real-time search service using Change Data Capture (CDC), AWS services, and OpenSearch to sync data instantly. We'll cover the architecture and how we built it.

Why?

Our CEO wanted a new search service that operates in real-time. He had past experiences with significant data lag and wanted to completely avoid it.

Data inconsistencies create frustrating user experiences. Imagine inventory updates showing items as available in the UI but not appearing in search results. Or price changes reflecting on product pages while search results still display old prices.

These issues arise when OpenSearch lags behind the primary database. Our goal was to eliminate this lag by syncing data between Postgres and OpenSearch in real-time.

Our Solution at a Glance

We built a Change Data Capture (CDC)-based Search Service powered by AWS DMS, Lambda, and OpenSearch. Here's the architecture at a high level:

How It Works

1. Change Data Capture Configuration

We configured AWS DMS to track changes on the products and variants tables. Any INSERT, UPDATE, or DELETE event automatically streams into a Kinesis stream. This way, every product change — new SKU, updated price, or modified title — gets detected in seconds.

2. Streaming Through Kinesis

All changes flow into Amazon Kinesis. Each message carries essential details like:

{
  "table": "products",
  "operation": "UPDATE",
  "id": 102939,
  "timestamp": "2025-10-05T18:30:00Z"
}

3. Processing Changes in AWS Lambda

A Lambda function is triggered for every message. Lambda fetches the latest record from our Data Service API to ensure data correctness before publishing to OpenSearch.

Example logic:

def handler(event, context):
    for record in event['Records']:
        id = record['id']
        item = fetch_from_rds(id)
        transformed = transform_item(item)
        publish_to_opensearch(transformed)

This approach ensures:

  • The latest record is indexed (avoiding out-of-order writes)
  • Failed updates can be retried easily
  • Zero long-running servers — we are not adding products daily!

4. OpenSearch

Lambda publishes transformed product data into OpenSearch, where it’s indexed with appropriate analyzers, boosts, and scoring functions.

How Is That Data Being Used?

Once the data is indexed in OpenSearch, we expose it through a Flask API that serves search queries to our frontend applications. The API provides:

  • Product Search: Full-text search across product titles, descriptions, and metadata
  • Filtering: Real-time filtering by category, price range, availability, and other attributes
  • Autocomplete: Fast suggestions as users type
  • Faceted Search: Aggregate results by categories, brands, and price ranges

The Flask API queries OpenSearch with optimized DSL queries and returns results in milliseconds, ensuring a responsive search experience for end users.

What We Haven't Solved (Properly) Yet

When a product is inserted with multiple variants, Lambda triggers separately for each entity once for the product and once per variant. For a product with 4 variants, that's 5 separate Lambda invocations. This works fine at our current scale and is a problem for future us!

Conclusion

Does it work? Yes. Is it perfect? No. But the search is fast, and nobody's complaining (at this moment)!