Kafka Partitioning vs DB Partitions

Janice Chi 100 Reputation points
2025-06-11T12:44:08.54+00:00

We are working on a large-scale CDC ingestion pipeline, following completion of a one-time historical migration in which we imported 80 TB of data via ADF into the Bronze layer. Our setup:

  • Source: IBM DB2 (on-prem)

  • CDC Tool: IBM InfoSphere CDC publishes to Kafka (1 topic = 1 table)

  • Ingestion: Databricks reads from Kafka and writes to Bronze and Silver layers (Delta format)

  • Transformation: MERGE INTO is used for upsert logic based on offsets

  • Historical Data: Already migrated to Bronze layer using FlashCopy-based method, including all partitions

We are now implementing the CDC catch-up and streaming ingestion using Kafka. Each Kafka topic has multiple partitions, and each partition contains a sequence of offsets. We maintain a control table with metadata at topic-partition-offset granularity.
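For reference, a control table at this granularity might look like the following minimal sketch (the table and column names here are illustrative only, not the actual schema):

     # Illustrative only: a Delta control table tracking consumed offsets
     # at topic/partition granularity (names are hypothetical)
     spark.sql("""
         CREATE TABLE IF NOT EXISTS kafka_offset_control (
             kafka_topic      STRING,     -- 1 topic = 1 source table
             kafka_partition  INT,
             last_offset      BIGINT,     -- last offset merged into Silver
             target_table     STRING,
             updated_at       TIMESTAMP
         ) USING DELTA
     """)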

We have two specific questions regarding the relationship between Kafka partitioning and target Delta partitioning in our setup:


Q1. If a large table is pre-partitioned in Bronze (e.g., 10 physical partitions), and this table's changes are captured via a single Kafka topic with 10 partitions — Does the number of Kafka partitions always align with the number of DB partitions or Delta table partitions? Can we rely on this alignment, or is this mapping independent and arbitrary from Kafka producer side?


Q2. Suppose we are reading Topic 1, Partition 1, Offset range 1 to 100: We know this message range belongs to Topic 1 → Table X and Partition 1 of Kafka.

But during the MERGE INTO operation in the Silver layer:

How can we identify which specific table partition the rows in this offset range belong to?

Since partitioning in the Delta table (e.g., based on a column like claim_month) is logical and Kafka partitions are physical, what’s the best practice to correlate Kafka message offsets to Delta partitions and rows, so that MERGE operations are efficient and do not scan the entire table?


1 answer

  1. J N S S Kasyap 3,300 Reputation points Microsoft External Staff Moderator
    2025-06-11T14:06:29.4066667+00:00

    Hi @Janice Chi

    If a large table is pre-partitioned in Bronze (e.g., 10 physical partitions), and this table's changes are captured via a single Kafka topic with 10 partitions — Does the number of Kafka partitions always align with the number of DB partitions or Delta table partitions? Can we rely on this alignment, or is this mapping independent and arbitrary from Kafka producer side?

    The number of Kafka partitions does not inherently align with the number of database (e.g., IBM DB2) or Delta table partitions (e.g., in the Bronze layer). Kafka partitions are physical divisions managed by the producer’s partitioning strategy (e.g., key-based, round-robin, or custom), while Delta table partitions are logical, typically based on a column like claim_month. Similarly, DB2 partitions are defined by the database’s partitioning scheme, which may not match the Kafka producer’s logic. 

    In your case, where IBM InfoSphere CDC publishes changes to Kafka (1 topic = 1 table), the assignment of messages to Kafka partitions depends on the CDC tool’s partitioning strategy. Unless explicitly configured to use a partitioning key that mirrors the Delta table’s or DB2’s partitioning scheme (e.g., claim_month), the mapping between Kafka partitions and DB/Delta partitions is arbitrary and independent. 

    You cannot rely on an automatic alignment between Kafka partitions and DB/Delta table partitions. To achieve alignment, you must: 

    1. Confirm or configure the Kafka producer (InfoSphere CDC) to partition messages using a key that matches the Delta table’s partitioning column (e.g., claim_month). 
    2. Validate this by inspecting Kafka partition data to ensure messages consistently map to corresponding Delta partitions. 

    Without such configuration, treat Kafka partitions as independent of DB/Delta partitions and design your pipeline to handle this mismatch, such as by extracting the partitioning column from message payloads during processing. 
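    As a quick validation sketch (assuming, as in the examples further below, that the payload is JSON and carries claim_month), you can sample the topic and check how many distinct claim_month values land in each Kafka partition; more than one per partition means the producer key does not mirror the Delta partitioning:

         from pyspark.sql.functions import col, countDistinct, from_json

         # Illustrative check: does each Kafka partition map to a single claim_month?
         sample_df = spark.read \
             .format("kafka") \
             .option("kafka.bootstrap.servers", "broker:port") \
             .option("subscribe", "Topic1") \
             .option("startingOffsets", "earliest") \
             .option("endingOffsets", "latest") \
             .load()

         schema = "claim_id STRING, claim_month STRING"  # extend with the real columns
         sample_df \
             .select(col("partition").alias("kafka_partition"),
                     from_json(col("value").cast("string"), schema).alias("data")) \
             .groupBy("kafka_partition") \
             .agg(countDistinct(col("data.claim_month")).alias("distinct_claim_months")) \
             .show()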

    How can we efficiently identify and target the relevant Delta table partitions (e.g., claim_month) during a MERGE INTO operation, given that Kafka partitions and offsets (e.g., Topic 1, Partition 1, Offsets 1–100) are physical and do not directly map to the logical partitioning used in Delta Lake? 

    To correlate Kafka message offsets (e.g., Topic 1, Partition 1, Offsets 1–100) with Delta table partitions (e.g., partitioned by claim_month) in the Silver layer for efficient MERGE INTO operations, you need to map Kafka messages to Delta partitions without scanning the entire table. Here is a concise best-practice approach:

    Best Practices: Correlate Kafka Offsets to Delta Partitions

    1. Include the Partition Key in Kafka Messages: Ensure Kafka messages contain the Delta table's partition column (e.g., claim_month) in the payload or key. For example, a JSON message might include {"claim_id": "123", "claim_month": "2025-01", ...}. This allows mapping messages to Delta partitions such as claim_month=2025-01.
    2. Extract the Partition Key in Spark: For a bounded offset range such as 1–100, read it with the batch Kafka source (endingOffsets is only supported in batch reads, not in readStream; continuous streaming is covered in step 6) and extract claim_month along with the Kafka metadata columns.

         from pyspark.sql.functions import col, from_json

         # Read the bounded offset range (Topic 1, partition 1, offsets 1-100) as a batch
         kafka_df = spark.read \
             .format("kafka") \
             .option("kafka.bootstrap.servers", "broker:port") \
             .option("subscribe", "Topic1") \
             .option("startingOffsets", """{"Topic1":{"1":1}}""") \
             .option("endingOffsets", """{"Topic1":{"1":100}}""") \
             .load()

         # Parse the JSON payload and keep the Kafka metadata columns for traceability
         schema = "claim_id STRING, claim_month STRING"  # extend with the remaining columns
         parsed_df = kafka_df \
             .select(
                 from_json(col("value").cast("string"), schema).alias("data"),
                 col("offset").alias("kafka_offset"),
                 col("partition").alias("kafka_partition"),
                 col("topic").alias("kafka_topic")) \
             .select("data.*", "kafka_offset", "kafka_partition", "kafka_topic")
      
    3. Group by the Partition Key: Identify the distinct claim_month values to determine which Delta partitions are affected.

         partition_values = parsed_df.select("claim_month").distinct().collect()
         partition_values = [row["claim_month"] for row in partition_values]
      
    4. Optimize MERGE INTO with Partition Filtering: Perform a MERGE INTO per claim_month to target specific Delta partitions, avoiding full table scans.

         from delta.tables import DeltaTable

         delta_table = DeltaTable.forPath(spark, "/path/to/silver/tableX")
         for partition_value in partition_values:
             # Restrict the source to one claim_month and pin the target partition in
             # the merge condition so Delta can prune every other partition
             partition_df = parsed_df.filter(col("claim_month") == partition_value)
             delta_table.alias("target") \
                 .merge(
                     partition_df.alias("source"),
                     f"target.claim_month = '{partition_value}' AND target.claim_id = source.claim_id"
                 ) \
                 .whenMatchedUpdateAll() \
                 .whenNotMatchedInsertAll() \
                 .execute()

    

    5. Track Kafka Metadata
    Store kafka_offset, kafka_partition, and kafka_topic in the Delta table for traceability. Example schema:

         Column           Type    Description
         claim_id         String  Unique row identifier
         claim_month      String  Partition key (e.g., 2025-01)
         ...              ...     Other data fields
         kafka_offset     Long    Kafka message offset
         kafka_partition  Int     Kafka partition number
         kafka_topic      String  Kafka topic name

    6. Use Checkpointing
    For continuous streaming ingestion (as opposed to the bounded replay above), read with spark.readStream (omit endingOffsets, which the streaming source does not support), run the same per-claim_month MERGE logic inside foreachBatch, and let the checkpoint track processed offsets.

         # streaming_df is built as in step 2, but with spark.readStream and no endingOffsets
         def merge_micro_batch(batch_df, batch_id):
             # Reuse the per-claim_month MERGE loop from step 4 on each micro-batch
             run_partitioned_merge(batch_df)  # hypothetical helper wrapping the step-4 loop

         query = streaming_df.writeStream \
             .foreachBatch(merge_micro_batch) \
             .option("checkpointLocation", "/path/to/checkpoint") \
             .trigger(availableNow=True) \
             .start()
    
    

    To enhance query performance on the Delta table, apply Z-Order indexing on the MERGE key, for example with delta_table.optimize().executeZOrderBy("claim_id") in Spark. Note that Z-Ordering cannot be applied to claim_month itself, because Delta does not allow Z-Ordering on a partition column; partition pruning already covers it. Z-Ordering improves data skipping and query efficiency by clustering related data, making MERGE INTO operations faster, especially for large tables.
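    A minimal sketch, assuming Delta Lake 2.0+ (or a Databricks Runtime that ships the Python DeltaTable.optimize() API); the partition filter in .where() is optional and simply limits the OPTIMIZE to the partitions just merged:

         from delta.tables import DeltaTable

         delta_table = DeltaTable.forPath(spark, "/path/to/silver/tableX")
         # Compact files in the touched partition, then Z-Order on the merge key
         # (not on the partition column claim_month)
         delta_table.optimize() \
             .where("claim_month = '2025-01'") \
             .executeZOrderBy("claim_id")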

    Maintaining appropriate partition granularity, such as monthly partitions (e.g., claim_month=2025-01), is also critical to balance performance against metadata overhead; overly granular partitions (e.g., daily) lead to excessive metadata management and slow down operations. Together, these strategies of aligned partitioning, Z-Order indexing, and sensible partition granularity ensure efficient MERGE INTO operations by targeting specific Delta partitions, leveraging claim_month to prune irrelevant data, and preserving Kafka metadata for traceability.

    I hope this information is helpful.

