How to proactively avoid micro-batch data loss or duplication with Structured Streaming in a high-volume Kafka-to-Azure SQL pipeline?

Janice Chi 100 Reputation points
2025-06-10T08:04:08.79+00:00

We are currently implementing a near real-time streaming architecture as part of a modernization project. In our streaming phase, we are consuming data from Kafka topics (one per table, approx. 800 total) using Databricks Structured Streaming and writing to Azure SQL Hyperscale.

Our event throughput ranges between 3,000 and 25,000 events per second, with auto-scaling enabled on the source Kafka cluster. Streaming jobs ingest per micro-batch (e.g., a 10-second trigger) with checkpointing and offset tracking enabled.

We have one key concern based on our streaming tests and industry patterns:


Scenario:

We noticed that if a micro-batch job fails after consuming Kafka data but before committing the Kafka offsets, then on job restart, the same batch is reprocessed. This can result in duplicate rows in the sink if the writes are not idempotent. We understand this falls under the at-least-once processing model and is generally handled using deduplication or upserts.


Our Questions:

Is this type of scenario common in high-throughput Databricks-Kafka pipelines (especially with workloads of 3,000–25,000 events/sec and hundreds of partitions)? Are there any thresholds beyond which offset commit issues become more likely?

What are the recommended best practices from Microsoft and Databricks to:

- Proactively avoid such situations?
- Minimize the chances of duplicate data due to offset commit failures?
- Ensure correctness and reliability of the data pipeline during batch failures or restarts?

What actions should we take **if such a situation has already occurred** (e.g., duplicates present in Azure SQL due to reprocessed batches)?

Is it a recommended practice to maintain a **custom control table** in addition to Databricks checkpointing, to log:

- Batch ID
- Kafka min_offset and max_offset
- Row count
- Hash value
- Status

Would this provide better observability and fault recovery? A rough sketch of the kind of per-batch record we have in mind is shown below.
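For illustration, this is roughly the shape of the audit row we are thinking of writing from foreachBatch (the table name, connection details, and the hash choice are placeholders, and we assume the raw Kafka partition/offset columns are still on the DataFrame):

    from pyspark.sql import functions as F

    CONTROL_TABLE = "dbo.stream_batch_control"   # placeholder table name
    JDBC_URL = "jdbc:sqlserver://<server>.database.windows.net;databaseName=<db>"  # placeholder

    def log_batch_audit(batch_df, batch_id, status):
        # One audit row per micro-batch: offset range, row count, simple content hash.
        audit_row = (
            batch_df.agg(
                F.min("offset").alias("min_offset"),   # note: offsets are per partition;
                F.max("offset").alias("max_offset"),   # a real table would track topic/partition too
                F.count(F.lit(1)).alias("row_count"),
                F.sum(F.crc32(F.col("value").cast("string"))).alias("hash_value"),  # illustrative, not cryptographic
            )
            .withColumn("batch_id", F.lit(batch_id))
            .withColumn("status", F.lit(status))
        )
        (audit_row.write
            .format("jdbc")
            .option("url", JDBC_URL)
            .option("dbtable", CONTROL_TABLE)
            .option("user", "<user>")          # placeholder credentials
            .option("password", "<password>")
            .mode("append")
            .save())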
                    

We are looking to make this pipeline as resilient and audit-friendly as possible while keeping performance optimized. Any official documentation, sample architectures, or field experience guidance from Microsoft would be highly appreciated.


1 answer

  1. J N S S Kasyap 3,300 Reputation points Microsoft External Staff Moderator
    2025-06-10T10:32:49.1966667+00:00

    Hi @Janice Chi
    Your scenario is common in high-throughput, micro-batch-based streaming pipelines using Databricks Structured Streaming with Kafka. Below is a structured, experience-backed answer addressing each concern, with field-proven strategies to prevent micro-batch data loss or duplication and ensure auditability. 

    Is micro-batch reprocessing common in high-throughput pipelines? 

    Yes. This behavior is by design in Structured Streaming’s at-least-once model:

    • Offsets are committed after the micro-batch is successfully processed and checkpointed.
    • If the pipeline crashes after processing but before checkpointing, Spark will reprocess the same offsets on restart.

    The frequency of reprocessing increases with:

    • High partition counts (hundreds, as in your case).
    • High throughput (up to 25K events/sec).
    • Sink performance bottlenecks (Azure SQL latency, network issues).
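    As a quick observability check, the query's progress metrics report the offset range and row count of every completed micro-batch, which makes a replay after a restart easy to spot. A minimal sketch using the standard lastProgress API (the helper name is ours):

    import json

    def print_batch_progress(query):
        """Print the Kafka offset range and row count of the most recent micro-batch."""
        progress = query.lastProgress        # None until the first batch completes
        if progress:
            print("batchId:", progress["batchId"])
            print("rows:   ", progress["numInputRows"])
            for src in progress["sources"]:  # one entry per source (Kafka here)
                print("startOffset:", json.dumps(src["startOffset"]))
                print("endOffset:  ", json.dumps(src["endOffset"]))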

    Best practices to proactively avoid loss or duplication 

    Use idempotent writes with MERGE (upsert) logic

    For Azure SQL sinks:

    • Avoid plain append mode, which leads to duplication when a batch is replayed.
    • Use foreachBatch with a MERGE statement or a stored procedure to perform idempotent upserts, for example:
    def upsert_to_sql(df, batch_id):
        # Register the micro-batch as a temp view so it can be referenced in SQL.
        df.createOrReplaceTempView("updates")
        # Note: Spark SQL MERGE INTO requires a MERGE-capable target (e.g., a Delta
        # table). For a direct upsert into Azure SQL, run the equivalent MERGE on
        # the SQL side (e.g., via a stored procedure), as noted above.
        upsert_query = """
        MERGE INTO target_table AS t
        USING updates AS s
        ON t.primary_key = s.primary_key
        WHEN MATCHED THEN UPDATE SET ...
        WHEN NOT MATCHED THEN INSERT ...
        """
        df.sparkSession.sql(upsert_query)
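    The function above is then wired in through foreachBatch; a minimal sketch of the write side (df_stream stands for your Kafka source DataFrame, and the checkpoint path and trigger interval are placeholders):

    (df_stream.writeStream
        .foreachBatch(upsert_to_sql)                                    # runs once per micro-batch
        .option("checkpointLocation", "/mnt/checkpoints/target_table")  # offsets + commit log
        .trigger(processingTime="10 seconds")
        .start())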
    
    
    

    Deduplication in-stream with watermarking

    If each event carries a unique ID and an event timestamp:

    # Keep only the first occurrence of each event_id; deduplication state is
    # bounded by the 10-minute watermark on event_time.
    df.withWatermark("event_time", "10 minutes") \
      .dropDuplicates(["event_id"])
    
    

    Tune micro-batch size

    • Use maxOffsetsPerTrigger to control the Kafka pull rate per micro-batch.
    • Size batches so that each one completes within 10–30 seconds.
    • Note that spark.streaming.kafka.maxRatePerPartition applies only to the legacy DStream API; for Structured Streaming, rate limiting is done with maxOffsetsPerTrigger.
    • Set startingOffsets = "latest" unless recovery from a specific point is needed (once a checkpoint exists, it takes precedence over this option).
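    A minimal sketch of the source configuration illustrating these options (broker address and topic name are placeholders):

    df_stream = (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "<broker>:9092")
        .option("subscribe", "orders_topic")
        .option("startingOffsets", "latest")       # ignored once a checkpoint exists
        .option("maxOffsetsPerTrigger", 200000)    # cap records pulled per micro-batch
        .load())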

    Isolate write failures

    • Avoid .writeStream.format("jdbc"): there is no built-in JDBC streaming sink, and JDBC drivers often don't handle transactions well under streaming load; write through foreachBatch instead.
    • Prefer staging to Delta, then bulk loading into SQL using Spark or ADF if you need exactly-once semantics; a sketch of the staging step follows.
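    A minimal sketch of the staging step (table and application names are placeholders; the txnAppId/txnVersion options provide idempotent Delta writes for replayed batches in recent Delta Lake / Databricks runtimes):

    def stage_to_delta(df, batch_id):
        # Idempotent Delta append: if the same (txnAppId, txnVersion) pair is
        # replayed after a failure, Delta skips the duplicate write.
        (df.write
            .format("delta")
            .option("txnAppId", "kafka_to_sql_orders")
            .option("txnVersion", batch_id)
            .mode("append")
            .saveAsTable("staging.orders_stream"))
        # A downstream Spark or ADF job then bulk-loads / merges the staging
        # table into Azure SQL.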

    I hope this information helps. Please do let us know if you have any further queries.

    Kindly consider upvoting the comment if the information provided is helpful. This can assist other community members in resolving similar issues.

    Thank you.

