
Iceberg CDC: Patterns, Best Practices, and Real-World Pipelines

14 min read
Cazpian Engineering
Platform Engineering Team


You have an operational database — PostgreSQL, MySQL, or DynamoDB — and you need its data in your Iceberg lakehouse. Not a daily snapshot dump. Not a nightly batch export. You need changes replicated continuously so that your analytics, ML models, and dashboards reflect reality within minutes.

This is Change Data Capture (CDC) on Iceberg, and it is one of the most common — and most operationally challenging — data engineering patterns in production today. The ingestion part is straightforward. The hard parts are handling deletes efficiently, keeping read performance from degrading, managing schema changes, and operating the pipeline at scale without it falling over at 3 AM.

This guide covers the two primary CDC architectures (direct materialization and the bronze-silver pattern), table design for CDC workloads, Iceberg's built-in CDC capabilities, compaction strategies, and the operational patterns that keep CDC pipelines healthy in production.

Two CDC Architectures

There are two fundamental approaches to getting CDC data into Iceberg. Which one you choose depends on your tooling, transformation requirements, and operational maturity.

[Diagram: Iceberg CDC pipeline architectures — direct CDC with Flink and the bronze-silver pattern with Spark MERGE INTO, including table design, compaction, and monitoring]

Architecture 1: Direct CDC Materialization (Flink)

Source DB → Debezium → Kafka → Flink / Kafka Connect → Iceberg Table

In this architecture, the CDC tool reads the database's transaction log (WAL for PostgreSQL, binlog for MySQL), publishes change events to Kafka, and a downstream consumer (Flink or Kafka Connect with an Iceberg sink) applies those changes directly to a "mirror" Iceberg table.

How it works:

  1. Debezium captures INSERT, UPDATE, and DELETE events from the source database's transaction log.
  2. Events are published to a Kafka topic in a CDC format (Debezium JSON, Avro, or Protobuf).
  3. Flink (or a Kafka Connect Iceberg sink) consumes these events and applies them to an Iceberg table — inserting new rows, updating existing ones, and deleting removed ones.
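The event-to-action mapping in step 3 can be sketched in plain Python. This is an illustrative model of how a consumer interprets a Debezium envelope, not the Flink sink's actual implementation:

```python
# Sketch: map Debezium envelope ops to table actions (illustrative only).
import json

def cdc_action(event_json: str) -> tuple:
    """Return (action, row) for a Debezium change event."""
    evt = json.loads(event_json)
    op = evt["op"]  # 'c'=create, 'u'=update, 'd'=delete, 'r'=snapshot read
    if op in ("c", "r"):
        return ("insert", evt["after"])
    if op == "u":
        # Update events carry both images; the sink removes the old
        # row and writes the new one.
        return ("upsert", evt["after"])
    if op == "d":
        return ("delete", evt["before"])
    raise ValueError(f"unknown op: {op}")

update = '{"op": "u", "before": {"order_id": 1, "status": "new"}, "after": {"order_id": 1, "status": "paid"}}'
print(cdc_action(update))  # ('upsert', {'order_id': 1, 'status': 'paid'})
```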

Flink example:

-- Flink SQL: Create a CDC source from Kafka
CREATE TABLE kafka_cdc_source (
  order_id BIGINT,
  customer_id BIGINT,
  amount DECIMAL(10,2),
  status STRING,
  updated_at TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.public.orders',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'debezium-json'
);

-- Flink SQL: Write CDC events directly to Iceberg
CREATE TABLE iceberg_orders (
  order_id BIGINT,
  customer_id BIGINT,
  amount DECIMAL(10,2),
  status STRING,
  updated_at TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = 'iceberg_catalog',
  'catalog-type' = 'rest',
  'uri' = 'https://polaris.your-cloud.com/api/v1',
  'warehouse' = 's3://your-bucket/warehouse'
);

-- Apply CDC events
INSERT INTO iceberg_orders SELECT * FROM kafka_cdc_source;

Flink natively understands CDC semantics. When it receives a Debezium UPDATE event (which contains both the before and after images), Flink writes an equality delete for the old row and an insert for the new row. Flink is currently the only engine that supports writing equality deletes to Iceberg — Spark and Trino use position deletes, which require knowing the exact file and row position.
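The difference between the two delete flavors can be modeled in a few lines of Python. This is an illustrative sketch of the Iceberg v2 read-time reconciliation, not actual reader code: an equality delete names key values to suppress, while a position delete names an exact (file, row-position) coordinate.

```python
# Illustrative model of Iceberg's two delete-file flavors (format v2).
data_file = [
    {"order_id": 1, "status": "new"},
    {"order_id": 2, "status": "paid"},
    {"order_id": 3, "status": "new"},
]

# Equality delete: suppress any row whose key matches -- the writer does
# not need to know where the old row lives (ideal for CDC streams).
equality_deletes = [{"order_id": 2}]

# Position delete: suppress row N of a specific data file -- the writer
# must first locate the row (what Spark/Trino MERGE produces).
position_deletes = [("data-file-0.parquet", 0)]

def live_rows(rows, eq_deletes, pos_deletes, file_name):
    out = []
    for pos, row in enumerate(rows):
        if any(all(row[k] == v for k, v in d.items()) for d in eq_deletes):
            continue  # removed by an equality delete
        if (file_name, pos) in pos_deletes:
            continue  # removed by a position delete
        out.append(row)
    return out

print(live_rows(data_file, equality_deletes, position_deletes, "data-file-0.parquet"))
# [{'order_id': 3, 'status': 'new'}]
```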

Advantages:

  • Near-real-time latency (seconds to minutes).
  • Flink handles the merge logic — no manual MERGE INTO statements.
  • Equality deletes are more efficient than position deletes for CDC workloads.

Limitations:

  • Requires a Flink cluster (operational complexity).
  • Limited transformation capabilities — you get a mirror of the source table.
  • Schema evolution requires careful coordination between Debezium, Kafka, and Flink.

Architecture 2: Bronze-Silver Pattern (Spark MERGE INTO)

Source DB → Debezium → Kafka → Bronze Iceberg Table (raw events)
        ↓
Spark MERGE INTO
        ↓
Silver Iceberg Table (materialized)

In this architecture, raw CDC events are appended to a "bronze" Iceberg table without any merge logic. A separate Spark job periodically reads new events from the bronze table and applies them to a "silver" materialized table using MERGE INTO.

Step 1: Land raw CDC events in a bronze table.

CREATE TABLE bronze.orders_cdc (
  -- CDC metadata
  op STRING,            -- 'c' (create), 'u' (update), 'd' (delete), 'r' (read/snapshot)
  ts_ms BIGINT,         -- source database timestamp
  source_table STRING,
  -- Payload (after image for inserts/updates, before image for deletes)
  order_id BIGINT,
  customer_id BIGINT,
  amount DECIMAL(10,2),
  status STRING,
  updated_at TIMESTAMP
)
USING iceberg
PARTITIONED BY (day(updated_at))
TBLPROPERTIES (
  'format-version' = '2',
  'write.distribution-mode' = 'hash',
  'write.target-file-size-bytes' = '268435456',
  'write.parquet.compression-codec' = 'zstd'
);

The bronze table is append-only. Every CDC event is recorded as-is, preserving the full change history. This is your audit trail and your reprocessing safety net.

Step 2: Materialize into a silver table with MERGE INTO.

-- Get new CDC events since the last processed snapshot
CREATE OR REPLACE TEMP VIEW new_events AS
SELECT * FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY order_id
      ORDER BY ts_ms DESC
    ) AS rn
  FROM bronze.orders_cdc
  WHERE updated_at >= '2026-02-12'  -- incremental window
) WHERE rn = 1;  -- latest event per key

-- Apply to silver table
MERGE INTO silver.orders t
USING new_events s
ON t.order_id = s.order_id
WHEN MATCHED AND s.op = 'd' THEN
  DELETE
WHEN MATCHED AND s.op IN ('u', 'c') THEN
  UPDATE SET
    t.customer_id = s.customer_id,
    t.amount = s.amount,
    t.status = s.status,
    t.updated_at = s.updated_at
WHEN NOT MATCHED AND s.op IN ('c', 'r') THEN
  INSERT (order_id, customer_id, amount, status, updated_at)
  VALUES (s.order_id, s.customer_id, s.amount, s.status, s.updated_at);
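The dedup-plus-merge semantics above can be simulated in plain Python. This is an illustrative model with hypothetical events, showing why keeping only the latest event per key before merging yields the correct final state:

```python
# Simulate: keep the latest event per key, then apply MERGE semantics.
events = [
    {"order_id": 1, "op": "c", "ts_ms": 100, "status": "new"},
    {"order_id": 1, "op": "u", "ts_ms": 200, "status": "paid"},
    {"order_id": 2, "op": "c", "ts_ms": 150, "status": "new"},
    {"order_id": 2, "op": "d", "ts_ms": 300, "status": "new"},
]

# Equivalent of ROW_NUMBER() ... ORDER BY ts_ms DESC, keeping rn = 1:
# iterating in ts_ms order means the last write per key wins.
latest = {}
for e in sorted(events, key=lambda e: e["ts_ms"]):
    latest[e["order_id"]] = e

# MERGE semantics: 'd' deletes the row, 'c'/'u'/'r' upsert it.
silver = {}
for key, e in latest.items():
    if e["op"] == "d":
        silver.pop(key, None)
    else:
        silver[key] = {"order_id": key, "status": e["status"]}

print(silver)  # {1: {'order_id': 1, 'status': 'paid'}}
```

Order 1's create is superseded by its update, and order 2's create is cancelled by its later delete, so only the paid order survives.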

Advantages:

  • Full control over transformation logic.
  • Bronze table preserves complete change history for audit and replay.
  • Uses familiar Spark SQL — no Flink cluster required.
  • Can handle complex transformations (joins, aggregations, data quality checks) before materializing.

Limitations:

  • Higher latency (minutes to hours, depending on MERGE frequency).
  • Requires managing two tables (bronze and silver) and the MERGE job.
  • MERGE INTO can be expensive without proper push-down predicates.

Choosing Between the Two

| Factor | Direct CDC (Flink) | Bronze-Silver (Spark) |
| --- | --- | --- |
| Latency | Seconds to minutes | Minutes to hours |
| Operational complexity | Flink cluster + Kafka | Spark job (scheduled or streaming) |
| Transformation | Limited (mirror only) | Full (joins, aggregations, DQ) |
| Audit trail | No (events applied directly) | Yes (bronze table preserves history) |
| Replay capability | Limited | Full (reprocess from bronze) |
| Delete handling | Equality deletes (Flink) | Position deletes via MERGE (Spark) |
| Schema evolution | Requires coordination | Handle in MERGE logic |
| Best for | Simple replication, low latency | Complex pipelines, regulated industries |

Table Design for CDC Workloads

CDC tables have unique requirements that differ from append-only analytical tables. Here are the key design decisions.

Partition Strategy

CDC workloads are update-heavy — most operations modify existing rows rather than appending new ones. This fundamentally changes the partition design:

-- CDC mirror table: bucket by primary key
CREATE TABLE silver.orders (
  order_id BIGINT,
  customer_id BIGINT,
  amount DECIMAL(10,2),
  status STRING,
  updated_at TIMESTAMP
)
USING iceberg
PARTITIONED BY (bucket(16, order_id))
TBLPROPERTIES (
  'format-version' = '2',
  'write.distribution-mode' = 'hash',
  'write.target-file-size-bytes' = '268435456',
  'write.parquet.compression-codec' = 'zstd',
  'write.delete.mode' = 'merge-on-read',
  'write.update.mode' = 'merge-on-read',
  'write.merge.mode' = 'merge-on-read',
  'write.metadata.previous-versions-max' = '15',
  'write.metadata.delete-after-commit.enabled' = 'true',
  'commit.retry.num-retries' = '10'
);

Why bucket(order_id) instead of day(updated_at)?

  • CDC MERGE INTO joins on the primary key (order_id). Bucketing by order_id means the merge only scans 1/16th of the table for each bucket — the partition predicate naturally aligns with the join key.
  • Time-based partitioning (day, month) does not help CDC merges because updated rows can belong to any time partition. A single MERGE batch might update orders from the last 6 months.
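The pruning effect is easy to see with a toy model. Note that Iceberg's actual bucket transform hashes with 32-bit Murmur3 before taking the modulus; Python's built-in `hash()` stands in here purely to illustrate the idea:

```python
# Illustration only: Iceberg buckets via Murmur3, not Python's hash().
NUM_BUCKETS = 16

def bucket(order_id: int) -> int:
    # Stand-in for Iceberg's bucket(16, order_id) partition transform.
    return hash(order_id) % NUM_BUCKETS

# A MERGE batch touching these keys only needs to scan the buckets
# they hash into -- not the whole table.
batch_keys = [1001, 1002, 1017]
touched = {bucket(k) for k in batch_keys}
print(f"scan {len(touched)} of {NUM_BUCKETS} buckets: {sorted(touched)}")
```

A merge batch whose keys land in two buckets scans 2/16 of the table's partitions, regardless of how old the updated rows are.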

When to add time partitioning alongside buckets:

If your downstream queries always filter by time (dashboards showing "last 7 days"), add a compound partition:

PARTITIONED BY (bucket(16, order_id), day(updated_at))

But be careful — this increases the number of partitions and may create undersized files if the per-day-per-bucket volume is small.

Merge-on-Read Is Almost Always Right for CDC

CDC tables receive frequent updates. Copy-on-write rewrites entire files for each update — unacceptable for tables merged every hour. Merge-on-read writes small delete files instead, making each merge fast.

The trade-off is that reads slow down as delete files accumulate. This is why compaction is critical for CDC tables (covered below).

Iceberg's Built-In CDC: create_changelog_view

Iceberg itself can act as a CDC source. The create_changelog_view procedure generates a view showing row-level changes between two snapshots — including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER change types.

Basic Usage

CALL spark_catalog.system.create_changelog_view(
  table => 'silver.orders',
  options => map(
    'start-snapshot-id', '1234567890',
    'end-snapshot-id', '9876543210'
  )
);

-- Query the changelog view
SELECT * FROM silver.orders_changes;

The view includes a _change_type column with values:

  • INSERT — new row added.
  • DELETE — row removed.
  • UPDATE_BEFORE — the row's state before an update.
  • UPDATE_AFTER — the row's state after an update.

Using Identifier Columns for Update Detection

By specifying identifier_columns, Iceberg can distinguish updates from delete-insert pairs. If two rows (one deleted, one inserted) share the same identifier column values, Iceberg classifies them as UPDATE_BEFORE and UPDATE_AFTER:

CALL spark_catalog.system.create_changelog_view(
  table => 'silver.orders',
  options => map(
    'start-snapshot-id', '1234567890',
    'end-snapshot-id', '9876543210'
  ),
  identifier_columns => array('order_id')
);
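The matching logic can be sketched in plain Python. This is an illustrative model of what identifier-column classification produces, not Iceberg's implementation:

```python
# Pair a deleted row with an inserted row sharing the same identifier
# columns and reclassify the pair as UPDATE_BEFORE / UPDATE_AFTER.
deleted = [{"order_id": 7, "status": "new"}]
inserted = [{"order_id": 7, "status": "paid"},
            {"order_id": 8, "status": "new"}]

def classify(deleted, inserted, id_cols):
    key = lambda r: tuple(r[c] for c in id_cols)
    deleted_by_key = {key(r): r for r in deleted}
    changes = []
    for row in inserted:
        before = deleted_by_key.pop(key(row), None)
        if before is not None:
            changes.append(("UPDATE_BEFORE", before))
            changes.append(("UPDATE_AFTER", row))
        else:
            changes.append(("INSERT", row))
    # Unmatched deletes remain plain DELETEs.
    changes.extend(("DELETE", r) for r in deleted_by_key.values())
    return changes

for change_type, row in classify(deleted, inserted, ["order_id"]):
    print(change_type, row)
```

Here order 7's delete/insert pair collapses into an update, while order 8 remains a plain insert.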
);

Use Cases for create_changelog_view

| Use Case | How |
| --- | --- |
| Forward CDC from Iceberg to another system | Read the changelog view and publish events to Kafka or another sink. |
| Audit trail | Query changes between snapshots for compliance reporting. |
| Incremental ETL | Read only the changed rows to feed downstream aggregation tables. |
| Data quality monitoring | Compare changes between snapshots to detect anomalies (sudden spikes in deletes, unexpected NULL values). |

Incremental Processing with Changelog View

Instead of processing the entire table on every ETL run, use the changelog view to process only what changed:

# Track the last processed snapshot
last_snapshot = spark.sql("""
    SELECT snapshot_id FROM etl_control.checkpoints
    WHERE table_name = 'silver.orders'
""").collect()[0].snapshot_id

current_snapshot = spark.sql("""
    SELECT snapshot_id FROM silver.orders.snapshots
    ORDER BY committed_at DESC LIMIT 1
""").collect()[0].snapshot_id

if last_snapshot != current_snapshot:
    # Create changelog view for the delta
    spark.sql(f"""
        CALL spark_catalog.system.create_changelog_view(
            table => 'silver.orders',
            options => map(
                'start-snapshot-id', '{last_snapshot}',
                'end-snapshot-id', '{current_snapshot}'
            ),
            identifier_columns => array('order_id')
        )
    """)

    # Process only changed rows
    changes = spark.sql("""
        SELECT * FROM silver.orders_changes
        WHERE _change_type IN ('INSERT', 'UPDATE_AFTER')
    """)

    # Apply to downstream table
    changes.writeTo("gold.order_summary").append()

    # Update checkpoint
    spark.sql(f"""
        UPDATE etl_control.checkpoints
        SET snapshot_id = '{current_snapshot}',
            processed_at = current_timestamp()
        WHERE table_name = 'silver.orders'
    """)

Compaction: The Make-or-Break for CDC Tables

CDC tables degrade faster than any other Iceberg table pattern. Every MERGE creates delete files (MOR) or rewrites files (COW). Without compaction, read performance degrades linearly with the number of merges.

How Delete Files Accumulate

After 1 merge:   10 data files + 10 delete files
After 10 merges: 10 data files + 100 delete files
After 50 merges: 10 data files + 500 delete files
(each read must reconcile 500 delete files)

At 500 delete files, a query that should take 3 seconds takes 30+ seconds because the reader must cross-reference every data file row against every delete file.

Compaction Strategy for CDC Tables

-- Aggressive compaction for CDC tables
CALL system.rewrite_data_files(
  table => 'silver.orders',
  strategy => 'binpack',
  options => map(
    'target-file-size-bytes', '268435456',
    'min-file-size-bytes', '104857600',
    'delete-file-threshold', '3'
  )
);

-- Follow up with snapshot expiry
CALL system.expire_snapshots(
  table => 'silver.orders',
  older_than => TIMESTAMP '2026-02-10 00:00:00',
  retain_last => 48
);

-- Remove orphan files
CALL system.remove_orphan_files(
  table => 'silver.orders',
  older_than => TIMESTAMP '2026-02-09 00:00:00'
);

Compaction Schedule

| MERGE Frequency | Compaction Interval | retain_last for expire_snapshots |
| --- | --- | --- |
| Every 5 minutes (streaming) | Every 2 hours | 100 (keeps ~8 hours of snapshots) |
| Every 30 minutes | Every 4 hours | 48 (keeps ~24 hours) |
| Every hour | Every 6 hours | 24 (keeps ~24 hours) |
| Once per day (batch) | Weekly | 14 (keeps ~2 weeks) |

Monitoring Compaction Health

-- Check delete file count (should be < 100)
SELECT
  COUNT(*) AS delete_file_count,
  SUM(record_count) AS total_delete_records,
  ROUND(SUM(file_size_in_bytes) / 1048576, 1) AS total_delete_mb
FROM silver.orders.delete_files;

-- Check data file health
SELECT
  COUNT(*) AS file_count,
  ROUND(AVG(file_size_in_bytes) / 1048576, 1) AS avg_file_mb,
  ROUND(MIN(file_size_in_bytes) / 1048576, 1) AS min_file_mb
FROM silver.orders.files;

-- Alert if delete files exceed threshold
SELECT
  CASE
    WHEN COUNT(*) > 100 THEN 'CRITICAL: Compact immediately'
    WHEN COUNT(*) > 50 THEN 'WARNING: Schedule compaction'
    ELSE 'OK'
  END AS status,
  COUNT(*) AS delete_file_count
FROM silver.orders.delete_files;

Handling Schema Evolution in CDC

Source databases change their schemas — new columns are added, types are widened, columns are renamed. Your CDC pipeline must handle these changes without breaking.

Schema Changes That Iceberg Handles Automatically

| Change | Iceberg Support | Notes |
| --- | --- | --- |
| Add column | Automatic | New column is added with NULL values for existing rows. |
| Widen type (int → long) | Automatic | Iceberg's schema evolution handles this. |
| Drop column | Manual | Iceberg does not auto-drop columns. Remove from the MERGE SELECT list. |
| Rename column | Manual | Iceberg can rename, but CDC tools may send the old name. |

Safe Schema Evolution Pattern

When a new column appears in the source database:

-- Step 1: Add the column to the Iceberg table
ALTER TABLE silver.orders ADD COLUMN loyalty_points INT;

-- Step 2: Update the MERGE to include the new column
-- (The bronze table already has it if Debezium schema evolution is enabled)
MERGE INTO silver.orders t
USING new_events s
ON t.order_id = s.order_id
WHEN MATCHED THEN
  UPDATE SET
    t.customer_id = s.customer_id,
    t.amount = s.amount,
    t.status = s.status,
    t.loyalty_points = s.loyalty_points,  -- new column
    t.updated_at = s.updated_at
WHEN NOT MATCHED THEN
  INSERT *;

Debezium Schema Registry Integration

For automated schema evolution, use Debezium with a Schema Registry (Confluent or Apicurio):

  1. Debezium detects schema changes in the source database.
  2. The new schema is registered in the Schema Registry.
  3. Your CDC consumer reads the schema from the registry and applies ALTER TABLE to the Iceberg table before processing events with the new columns.

This avoids manual intervention for the most common schema changes (adding columns, widening types).
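Step 3 of that flow is essentially a schema diff. A minimal sketch, assuming the registry schema has already been mapped to Spark SQL type names (`schema_ddl` is a hypothetical helper, not a real registry client):

```python
# Sketch: diff the registry schema against the Iceberg table schema and
# emit ALTER TABLE statements for newly added columns. Dropped or renamed
# columns still need manual handling, as noted above.
def schema_ddl(table: str, iceberg_cols: dict, registry_cols: dict) -> list:
    stmts = []
    for col, col_type in registry_cols.items():
        if col not in iceberg_cols:
            stmts.append(f"ALTER TABLE {table} ADD COLUMN {col} {col_type}")
    return stmts

current = {"order_id": "BIGINT", "amount": "DECIMAL(10,2)"}
incoming = {"order_id": "BIGINT", "amount": "DECIMAL(10,2)", "loyalty_points": "INT"}
print(schema_ddl("silver.orders", current, incoming))
# ['ALTER TABLE silver.orders ADD COLUMN loyalty_points INT']
```

Running each emitted statement via `spark.sql` before processing the new events keeps the table ahead of the incoming data.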

Production Operational Patterns

Pattern 1: Watermark-Based Incremental Processing

Track a high-water mark to process only new CDC events:

# Read the high-water mark
hwm = spark.sql("""
    SELECT COALESCE(MAX(last_ts_ms), 0) AS hwm
    FROM etl_control.watermarks
    WHERE pipeline = 'orders_cdc'
""").collect()[0].hwm

# Read new events from bronze
new_events = spark.sql(f"""
    SELECT * FROM bronze.orders_cdc
    WHERE ts_ms > {hwm}
""")

if new_events.count() > 0:
    # Deduplicate: keep the latest event per key
    new_events.createOrReplaceTempView("new_events")
    deduped = spark.sql("""
        SELECT * FROM (
            SELECT *, ROW_NUMBER() OVER (
                PARTITION BY order_id ORDER BY ts_ms DESC
            ) AS rn
            FROM new_events
        ) WHERE rn = 1
    """)

    # Apply MERGE
    # ... (merge logic here)

    # Update the watermark
    new_hwm = new_events.agg({"ts_ms": "max"}).collect()[0][0]
    spark.sql(f"""
        UPDATE etl_control.watermarks
        SET last_ts_ms = {new_hwm}, updated_at = current_timestamp()
        WHERE pipeline = 'orders_cdc'
    """)

Pattern 2: Dead Letter Queue for Failed Events

Not all CDC events can be processed successfully. Schema mismatches, data type errors, or constraint violations can cause individual events to fail. Rather than failing the entire batch, route failures to a dead letter table:

from pyspark.sql.functions import lit

try:
    # Process events
    spark.sql("MERGE INTO silver.orders ...")
except Exception as e:
    # Tag failed events with the error and append to the dead letter table
    failed_events.withColumn("error_message", lit(str(e))) \
        .writeTo("bronze.orders_cdc_dlq") \
        .append()

Pattern 3: Monitoring and Alerting

Track key CDC pipeline metrics:

-- Create a monitoring view
CREATE OR REPLACE VIEW ops.cdc_health AS
SELECT
  'orders' AS table_name,
  (SELECT COUNT(*) FROM silver.orders.delete_files) AS delete_files,
  (SELECT COUNT(*) FROM silver.orders.files) AS data_files,
  (SELECT MAX(committed_at) FROM silver.orders.snapshots) AS last_commit,
  TIMESTAMPDIFF(
    MINUTE,
    (SELECT MAX(committed_at) FROM silver.orders.snapshots),
    current_timestamp()
  ) AS minutes_since_last_commit;

Alert thresholds:

  • Delete files > 100: Compaction is overdue.
  • Minutes since last commit > 30: CDC pipeline may be stalled.
  • Average file size < 50 MB: Small file accumulation from frequent merges.
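The same thresholds expressed as a small Python check, suitable for an alerting job (metric and function names here are illustrative):

```python
# Evaluate the CDC alert thresholds above against a metrics snapshot.
def cdc_alerts(delete_files: int, minutes_since_commit: int, avg_file_mb: float) -> list:
    alerts = []
    if delete_files > 100:
        alerts.append("CRITICAL: compact immediately")
    elif delete_files > 50:
        alerts.append("WARNING: schedule compaction")
    if minutes_since_commit > 30:
        alerts.append("WARNING: pipeline may be stalled")
    if avg_file_mb < 50:
        alerts.append("WARNING: small-file accumulation")
    return alerts or ["OK"]

print(cdc_alerts(delete_files=120, minutes_since_commit=5, avg_file_mb=180))
# ['CRITICAL: compact immediately']
```

Feeding it the columns from the `ops.cdc_health` view (plus an average-file-size query) gives a single place to page on.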

Quick Reference

CDC Table Properties

| Property | Recommended Value | Why |
| --- | --- | --- |
| format-version | 2 | Required for MOR and position deletes. |
| write.merge.mode | merge-on-read | Fast writes for frequent merges. |
| write.delete.mode | merge-on-read | Fast deletes for CDC. |
| write.update.mode | merge-on-read | Fast updates for CDC. |
| write.distribution-mode | hash | Prevents small files. |
| write.metadata.previous-versions-max | 15 | Streaming commits create many metadata files. |
| write.metadata.delete-after-commit.enabled | true | Auto-clean metadata. |
| commit.retry.num-retries | 10 | Handle concurrent write conflicts. |

CDC Pipeline Decision Matrix

| Question | If Yes | If No |
| --- | --- | --- |
| Need sub-minute latency? | Direct CDC (Flink) | Bronze-Silver (Spark) |
| Need transformation before materialization? | Bronze-Silver | Direct CDC |
| Need audit trail of all changes? | Bronze-Silver | Either |
| Have Flink expertise? | Direct CDC | Bronze-Silver |
| Source schema changes frequently? | Bronze-Silver (more control) | Either |
| Regulated industry (compliance)? | Bronze-Silver (full history) | Either |

This post is part of our Apache Iceberg deep-dive series. For MERGE INTO optimization techniques, see Writing Efficient MERGE INTO Queries on Iceberg with Spark. For table design, see Iceberg Table Design: Properties, Partitioning, and Commit Best Practices. Check out the full series on our blog.