Iceberg CDC: Patterns, Best Practices, and Real-World Pipelines
You have an operational database — PostgreSQL, MySQL, or DynamoDB — and you need its data in your Iceberg lakehouse. Not a daily snapshot dump. Not a nightly batch export. You need changes replicated continuously so that your analytics, ML models, and dashboards reflect reality within minutes.
This is Change Data Capture (CDC) on Iceberg, and it is one of the most common — and most operationally challenging — data engineering patterns in production today. The ingestion part is straightforward. The hard parts are handling deletes efficiently, keeping read performance from degrading, managing schema changes, and operating the pipeline at scale without it falling over at 3 AM.
This guide covers the two primary CDC architectures (direct materialization and the bronze-silver pattern), table design for CDC workloads, Iceberg's built-in CDC capabilities, compaction strategies, and the operational patterns that keep CDC pipelines healthy in production.
Two CDC Architectures
There are two fundamental approaches to getting CDC data into Iceberg. Which one you choose depends on your tooling, transformation requirements, and operational maturity.
Architecture 1: Direct CDC (Flink, Debezium Sink)
Source DB → Debezium → Kafka → Flink / Kafka Connect → Iceberg Table
In this architecture, the CDC tool reads the database's transaction log (WAL for PostgreSQL, binlog for MySQL), publishes change events to Kafka, and a downstream consumer (Flink or Kafka Connect with an Iceberg sink) applies those changes directly to a "mirror" Iceberg table.
How it works:
- Debezium captures INSERT, UPDATE, and DELETE events from the source database's transaction log.
- Events are published to a Kafka topic in a CDC format (Debezium JSON, Avro, or Protobuf).
- Flink (or a Kafka Connect Iceberg sink) consumes these events and applies them to an Iceberg table — inserting new rows, updating existing ones, and deleting removed ones.
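The mapping from Debezium op codes to sink actions is the heart of this step. A minimal plain-Python sketch (the event shape is simplified; real Debezium envelopes also carry source metadata, transaction info, and schema blocks):

```python
import json

# A simplified Debezium change event (illustrative, not the full envelope)
event_json = """
{
  "op": "u",
  "ts_ms": 1739350000000,
  "before": {"order_id": 42, "status": "pending"},
  "after":  {"order_id": 42, "status": "shipped"}
}
"""

def to_sink_actions(event):
    """Map a Debezium op code to the writes an Iceberg sink must perform."""
    op = event["op"]
    if op in ("c", "r"):   # create / snapshot read: insert the after image
        return [("insert", event["after"])]
    if op == "u":          # update: delete the before image, insert the after
        return [("delete", event["before"]), ("insert", event["after"])]
    if op == "d":          # delete: only the before image exists
        return [("delete", event["before"])]
    raise ValueError(f"unknown op: {op}")

actions = to_sink_actions(json.loads(event_json))
# An UPDATE event expands into a delete of the old row plus an insert of the new one
```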
Flink example:
-- Flink SQL: Create a CDC source from Kafka
CREATE TABLE kafka_cdc_source (
order_id BIGINT,
customer_id BIGINT,
amount DECIMAL(10,2),
status STRING,
updated_at TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'dbserver1.public.orders',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'debezium-json'
);
-- Flink SQL: Write CDC events directly to Iceberg
CREATE TABLE iceberg_orders (
order_id BIGINT,
customer_id BIGINT,
amount DECIMAL(10,2),
status STRING,
updated_at TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'iceberg',
'catalog-name' = 'iceberg_catalog',
'catalog-type' = 'rest',
'uri' = 'https://polaris.your-cloud.com/api/v1',
'warehouse' = 's3://your-bucket/warehouse'
);
-- Apply CDC events
INSERT INTO iceberg_orders SELECT * FROM kafka_cdc_source;
Flink natively understands CDC semantics. When it receives a Debezium UPDATE event (which contains both the before and after images), Flink writes an equality delete for the old row and an insert for the new row. Flink is currently the only engine that supports writing equality deletes to Iceberg — Spark and Trino use position deletes, which require knowing the exact file and row position.
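To make the equality-versus-position distinction concrete, here is a conceptual sketch of the two delete-file semantics (this models the behavior, not Iceberg's on-disk format): an equality delete says "drop any row where these columns match", while a position delete says "drop row N of data file F" and therefore requires the writer to locate the row first.

```python
# A toy data file: a path plus its rows in file order
data_file = {
    "path": "s3://bucket/data/00000.parquet",
    "rows": [
        {"order_id": 1, "status": "pending"},
        {"order_id": 2, "status": "paid"},
        {"order_id": 3, "status": "pending"},
    ],
}

# Equality delete: match on key columns, no knowledge of row location needed
equality_deletes = [{"order_id": 2}]

# Position delete: must name the exact file and the row's ordinal within it
position_deletes = [{"file": "s3://bucket/data/00000.parquet", "pos": 1}]

def apply_equality(rows, deletes):
    """Drop rows whose columns match any equality-delete predicate."""
    return [r for r in rows
            if not any(all(r[k] == v for k, v in d.items()) for d in deletes)]

def apply_position(data_file, deletes):
    """Drop rows by (file, position) pairs."""
    dead = {d["pos"] for d in deletes if d["file"] == data_file["path"]}
    return [r for i, r in enumerate(data_file["rows"]) if i not in dead]

# Both formulations remove the same row here; the difference is what the
# writer must know at write time, which is why equality deletes suit CDC.
```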
Advantages:
- Near-real-time latency (seconds to minutes).
- Flink handles the merge logic — no manual MERGE INTO statements.
- Equality deletes are more efficient than position deletes for CDC workloads.
Limitations:
- Requires a Flink cluster (operational complexity).
- Limited transformation capabilities — you get a mirror of the source table.
- Schema evolution requires careful coordination between Debezium, Kafka, and Flink.
Architecture 2: Bronze-Silver Pattern (Spark MERGE INTO)
Source DB → Debezium → Kafka → Bronze Iceberg Table (raw events)
│
▼
Spark MERGE INTO
│
▼
Silver Iceberg Table (materialized)
In this architecture, raw CDC events are appended to a "bronze" Iceberg table without any merge logic. A separate Spark job periodically reads new events from the bronze table and applies them to a "silver" materialized table using MERGE INTO.
Step 1: Land raw CDC events in a bronze table.
CREATE TABLE bronze.orders_cdc (
-- CDC metadata
op STRING, -- 'c' (create), 'u' (update), 'd' (delete), 'r' (read/snapshot)
ts_ms BIGINT, -- source database timestamp
source_table STRING,
-- Payload (after image for inserts/updates, before image for deletes)
order_id BIGINT,
customer_id BIGINT,
amount DECIMAL(10,2),
status STRING,
updated_at TIMESTAMP
)
USING iceberg
PARTITIONED BY (day(updated_at))
TBLPROPERTIES (
'format-version' = '2',
'write.distribution-mode' = 'hash',
'write.target-file-size-bytes' = '268435456',
'write.parquet.compression-codec' = 'zstd'
);
The bronze table is append-only. Every CDC event is recorded as-is, preserving the full change history. This is your audit trail and your reprocessing safety net.
Step 2: Materialize into a silver table with MERGE INTO.
-- Get new CDC events since the last processed snapshot
CREATE OR REPLACE TEMP VIEW new_events AS
SELECT * FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY order_id
ORDER BY ts_ms DESC
) AS rn
FROM bronze.orders_cdc
WHERE updated_at >= '2026-02-12' -- incremental window
) WHERE rn = 1; -- latest event per key
-- Apply to silver table
MERGE INTO silver.orders t
USING new_events s
ON t.order_id = s.order_id
WHEN MATCHED AND s.op = 'd' THEN
DELETE
WHEN MATCHED AND s.op IN ('u', 'c') THEN
UPDATE SET
t.customer_id = s.customer_id,
t.amount = s.amount,
t.status = s.status,
t.updated_at = s.updated_at
WHEN NOT MATCHED AND s.op IN ('c', 'r') THEN
INSERT (order_id, customer_id, amount, status, updated_at)
VALUES (s.order_id, s.customer_id, s.amount, s.status, s.updated_at)
Advantages:
- Full control over transformation logic.
- Bronze table preserves complete change history for audit and replay.
- Uses familiar Spark SQL — no Flink cluster required.
- Can handle complex transformations (joins, aggregations, data quality checks) before materializing.
Limitations:
- Higher latency (minutes to hours, depending on MERGE frequency).
- Requires managing two tables (bronze and silver) and the MERGE job.
- MERGE INTO can be expensive without proper push-down predicates.
Choosing Between the Two
| Factor | Direct CDC (Flink) | Bronze-Silver (Spark) |
|---|---|---|
| Latency | Seconds to minutes | Minutes to hours |
| Operational complexity | Flink cluster + Kafka | Spark job (scheduled or streaming) |
| Transformation | Limited (mirror only) | Full (joins, aggregations, DQ) |
| Audit trail | No (events applied directly) | Yes (bronze table preserves history) |
| Replay capability | Limited | Full (reprocess from bronze) |
| Delete handling | Equality deletes (Flink) | Position deletes via MERGE (Spark) |
| Schema evolution | Requires coordination | Handle in MERGE logic |
| Best for | Simple replication, low latency | Complex pipelines, regulated industries |
Table Design for CDC Workloads
CDC tables have unique requirements that differ from append-only analytical tables. Here are the key design decisions.
Partition Strategy
CDC workloads are update-heavy — most operations modify existing rows rather than appending new ones. This fundamentally changes the partition design:
-- CDC mirror table: bucket by primary key
CREATE TABLE silver.orders (
order_id BIGINT,
customer_id BIGINT,
amount DECIMAL(10,2),
status STRING,
updated_at TIMESTAMP
)
USING iceberg
PARTITIONED BY (bucket(16, order_id))
TBLPROPERTIES (
'format-version' = '2',
'write.distribution-mode' = 'hash',
'write.target-file-size-bytes' = '268435456',
'write.parquet.compression-codec' = 'zstd',
'write.delete.mode' = 'merge-on-read',
'write.update.mode' = 'merge-on-read',
'write.merge.mode' = 'merge-on-read',
'write.metadata.previous-versions-max' = '15',
'write.metadata.delete-after-commit.enabled' = 'true',
'commit.retry.num-retries' = '10'
);
Why bucket(order_id) instead of day(updated_at)?
- CDC MERGE INTO joins on the primary key (order_id). Bucketing by order_id means the merge only scans 1/16th of the table for each bucket — the partition predicate naturally aligns with the join key.
- Time-based partitioning (day, month) does not help CDC merges because updated rows can belong to any time partition. A single MERGE batch might update orders from the last 6 months.
When to add time partitioning alongside buckets:
If your downstream queries always filter by time (dashboards showing "last 7 days"), add a compound partition:
PARTITIONED BY (bucket(16, order_id), day(updated_at))
But be careful — this increases the number of partitions and may create undersized files if the per-day-per-bucket volume is small.
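The undersized-file risk is easy to quantify with a back-of-the-envelope check (the volumes below are illustrative assumptions, not recommendations):

```python
def expected_file_size_mb(daily_volume_mb, buckets):
    """Approximate per-day-per-bucket file size under a compound
    (bucket, day) partition spec, assuming an even key distribution."""
    return daily_volume_mb / buckets

# 1 GB/day spread across 16 buckets yields ~64 MB files, well under a
# 256 MB target; at this volume the compound spec undersizes every file.
size = expected_file_size_mb(1024, 16)
```

As a rough rule, each day needs several gigabytes of new data before a 16-bucket compound spec produces healthy file sizes.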
Merge-on-Read Is Almost Always Right for CDC
CDC tables receive frequent updates. Copy-on-write rewrites entire files for each update — unacceptable for tables merged every hour. Merge-on-read writes small delete files instead, making each merge fast.
The trade-off is that reads slow down as delete files accumulate. This is why compaction is critical for CDC tables (covered below).
Iceberg's Built-In CDC: create_changelog_view
Iceberg itself can act as a CDC source. The create_changelog_view procedure generates a view showing row-level changes between two snapshots — including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER change types.
Basic Usage
CALL spark_catalog.system.create_changelog_view(
table => 'silver.orders',
options => map(
'start-snapshot-id', '1234567890',
'end-snapshot-id', '9876543210'
)
);
-- Query the changelog view (registered as a temp view named <table>_changes)
SELECT * FROM orders_changes;
The view includes a _change_type column with values:
- INSERT — new row added.
- DELETE — row removed.
- UPDATE_BEFORE — the row's state before an update.
- UPDATE_AFTER — the row's state after an update.
Using Identifier Columns for Update Detection
By specifying identifier_columns, Iceberg can distinguish updates from delete-insert pairs. If two rows (one deleted, one inserted) share the same identifier column values, Iceberg classifies them as UPDATE_BEFORE and UPDATE_AFTER:
CALL spark_catalog.system.create_changelog_view(
table => 'silver.orders',
options => map(
'start-snapshot-id', '1234567890',
'end-snapshot-id', '9876543210'
),
identifier_columns => array('order_id')
);
Use Cases for create_changelog_view
| Use Case | How |
|---|---|
| Forward CDC from Iceberg to another system | Read the changelog view and publish events to Kafka or another sink. |
| Audit trail | Query changes between snapshots for compliance reporting. |
| Incremental ETL | Read only the changed rows to feed downstream aggregation tables. |
| Data quality monitoring | Compare changes between snapshots to detect anomalies (sudden spikes in deletes, unexpected NULL values). |
Incremental Processing with Changelog View
Instead of processing the entire table on every ETL run, use the changelog view to process only what changed:
# Track the last processed snapshot
last_snapshot = spark.sql("""
SELECT snapshot_id FROM etl_control.checkpoints
WHERE table_name = 'silver.orders'
""").collect()[0].snapshot_id
current_snapshot = spark.sql("""
SELECT snapshot_id FROM silver.orders.snapshots
ORDER BY committed_at DESC LIMIT 1
""").collect()[0].snapshot_id
if last_snapshot != current_snapshot:
    # Create changelog view for the delta
    spark.sql(f"""
        CALL spark_catalog.system.create_changelog_view(
          table => 'silver.orders',
          options => map(
            'start-snapshot-id', '{last_snapshot}',
            'end-snapshot-id', '{current_snapshot}'
          ),
          identifier_columns => array('order_id')
        )
    """)

    # Process only changed rows (the procedure registers a temp view
    # named orders_changes by default)
    changes = spark.sql("""
        SELECT * FROM orders_changes
        WHERE _change_type IN ('INSERT', 'UPDATE_AFTER')
    """)

    # Apply to downstream table
    changes.writeTo("gold.order_summary").append()

    # Update checkpoint
    spark.sql(f"""
        UPDATE etl_control.checkpoints
        SET snapshot_id = '{current_snapshot}',
            processed_at = current_timestamp()
        WHERE table_name = 'silver.orders'
    """)
Compaction: The Make-or-Break for CDC Tables
CDC tables degrade faster than any other Iceberg table pattern. Every MERGE creates delete files (MOR) or rewrites files (COW). Without compaction, read performance degrades linearly with the number of merges.
How Delete Files Accumulate
After 1 merge: 10 data files + 10 delete files
After 10 merges: 10 data files + 100 delete files
After 50 merges: 10 data files + 500 delete files
(each read must reconcile 500 delete files)
At 500 delete files, a query that should take 3 seconds takes 30+ seconds because the reader must cross-reference every data file row against every delete file.
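The accumulation above, and why a compaction schedule caps it, can be sketched in a few lines (assuming, as in the illustration, that each merge adds roughly ten delete files and a compaction folds all pending deletes into rewritten data files):

```python
def simulate(merges, deletes_per_merge=10, compact_every=None):
    """Track the delete-file count over a series of MOR merges, with an
    optional scheduled compaction that resets the count to zero."""
    count, history = 0, []
    for i in range(1, merges + 1):
        count += deletes_per_merge
        history.append(count)
        if compact_every and i % compact_every == 0:
            count = 0  # rewrite_data_files folded the deletes into data files
    return history

# Without compaction, delete files grow linearly with merge count;
# compacting every 10 merges caps the backlog a reader must reconcile.
uncompacted = simulate(50)
compacted = simulate(50, compact_every=10)
```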
Compaction Strategy for CDC Tables
-- Aggressive compaction for CDC tables
CALL system.rewrite_data_files(
table => 'silver.orders',
strategy => 'binpack',
options => map(
'target-file-size-bytes', '268435456',
'min-file-size-bytes', '104857600',
'delete-file-threshold', '3'
)
);
-- Follow up with snapshot expiry
CALL system.expire_snapshots(
table => 'silver.orders',
older_than => TIMESTAMP '2026-02-10 00:00:00',
retain_last => 48
);
-- Remove orphan files
CALL system.remove_orphan_files(
table => 'silver.orders',
older_than => TIMESTAMP '2026-02-09 00:00:00'
);
Compaction Schedule
| MERGE Frequency | Compaction Interval | retain_last for expire_snapshots |
|---|---|---|
| Every 5 minutes (streaming) | Every 2 hours | 100 (keeps ~8 hours of snapshots) |
| Every 30 minutes | Every 4 hours | 48 (keeps ~24 hours) |
| Every hour | Every 6 hours | 24 (keeps ~24 hours) |
| Once per day (batch) | Weekly | 14 (keeps ~2 weeks) |
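The retain_last values in the table follow from simple arithmetic, assuming each merge produces one snapshot:

```python
def retain_last_for(merge_interval_min, retention_hours):
    """Snapshots to retain so expire_snapshots keeps roughly
    retention_hours of history, given one snapshot per merge."""
    return int(retention_hours * 60 / merge_interval_min)

# 5-minute merges with ~8 hours of history -> 96 snapshots
# (the table rounds up to 100 for headroom)
streaming = retain_last_for(5, 8)
```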
Monitoring Compaction Health
-- Check delete file count (should be < 100)
SELECT
COUNT(*) AS delete_file_count,
SUM(record_count) AS total_delete_records,
ROUND(SUM(file_size_in_bytes) / 1048576, 1) AS total_delete_mb
FROM silver.orders.delete_files;
-- Check data file health
SELECT
COUNT(*) AS file_count,
ROUND(AVG(file_size_in_bytes) / 1048576, 1) AS avg_file_mb,
ROUND(MIN(file_size_in_bytes) / 1048576, 1) AS min_file_mb
FROM silver.orders.files;
-- Alert if delete files exceed threshold
SELECT
CASE
WHEN COUNT(*) > 100 THEN 'CRITICAL: Compact immediately'
WHEN COUNT(*) > 50 THEN 'WARNING: Schedule compaction'
ELSE 'OK'
END AS status,
COUNT(*) AS delete_file_count
FROM silver.orders.delete_files;
Handling Schema Evolution in CDC
Source databases change their schemas — new columns are added, types are widened, columns are renamed. Your CDC pipeline must handle these changes without breaking.
Schema Changes That Iceberg Handles Automatically
| Change | Iceberg Support | Notes |
|---|---|---|
| Add column | Automatic | New column is added with NULL values for existing rows. |
| Widen type (int → long) | Automatic | Iceberg's schema evolution handles this. |
| Drop column | Manual | Iceberg does not auto-drop columns. Remove from MERGE SELECT list. |
| Rename column | Manual | Iceberg can rename, but CDC tools may send the old name. |
Safe Schema Evolution Pattern
When a new column appears in the source database:
-- Step 1: Add the column to the Iceberg table
ALTER TABLE silver.orders ADD COLUMN loyalty_points INT;
-- Step 2: Update the MERGE to include the new column
-- (The bronze table already has it if Debezium schema evolution is enabled)
MERGE INTO silver.orders t
USING new_events s
ON t.order_id = s.order_id
WHEN MATCHED THEN
UPDATE SET
t.customer_id = s.customer_id,
t.amount = s.amount,
t.status = s.status,
t.loyalty_points = s.loyalty_points, -- new column
t.updated_at = s.updated_at
WHEN NOT MATCHED THEN
INSERT *
Debezium Schema Registry Integration
For automated schema evolution, use Debezium with a Schema Registry (Confluent or Apicurio):
- Debezium detects schema changes in the source database.
- The new schema is registered in the Schema Registry.
- Your CDC consumer reads the schema from the registry and applies ALTER TABLE to the Iceberg table before processing events with the new columns.
This avoids manual intervention for the most common schema changes (adding columns, widening types).
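The consumer's schema-sync step can be sketched as a diff that emits ALTER TABLE statements for added columns. This is an illustrative helper, not a real Schema Registry client API; the schema dicts and SQL type names are assumptions you would adapt to your registry and catalog:

```python
def alter_statements(table, registry_schema, iceberg_schema):
    """Diff the latest registry schema against the current Iceberg table
    schema and emit ALTER TABLE statements for any added columns.
    (Drops and renames are deliberately left for manual review.)"""
    stmts = []
    for col, col_type in registry_schema.items():
        if col not in iceberg_schema:
            stmts.append(f"ALTER TABLE {table} ADD COLUMN {col} {col_type}")
    return stmts

stmts = alter_statements(
    "silver.orders",
    registry_schema={"order_id": "BIGINT", "amount": "DECIMAL(10,2)",
                     "loyalty_points": "INT"},
    iceberg_schema={"order_id": "BIGINT", "amount": "DECIMAL(10,2)"},
)
# Each emitted statement would be run via spark.sql() before processing
# events that carry the new column
```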
Production Operational Patterns
Pattern 1: Watermark-Based Incremental Processing
Track a high-water mark to process only new CDC events:
# Read high-water mark
hwm = spark.sql("""
SELECT COALESCE(MAX(last_ts_ms), 0) AS hwm
FROM etl_control.watermarks
WHERE pipeline = 'orders_cdc'
""").collect()[0].hwm
# Read new events from bronze
new_events = spark.sql(f"""
SELECT * FROM bronze.orders_cdc
WHERE ts_ms > {hwm}
""")
if new_events.count() > 0:
    # Deduplicate: keep latest event per key
    # (register the DataFrame as a temp view so it is queryable from SQL)
    new_events.createOrReplaceTempView("new_events")
    deduped = spark.sql("""
        SELECT * FROM (
            SELECT *, ROW_NUMBER() OVER (
                PARTITION BY order_id ORDER BY ts_ms DESC
            ) AS rn
            FROM new_events
        ) WHERE rn = 1
    """)

    # Apply MERGE
    # ... (merge logic here)

    # Update watermark
    new_hwm = new_events.agg({"ts_ms": "max"}).collect()[0][0]
    spark.sql(f"""
        UPDATE etl_control.watermarks
        SET last_ts_ms = {new_hwm}, updated_at = current_timestamp()
        WHERE pipeline = 'orders_cdc'
    """)
Pattern 2: Dead Letter Queue for Failed Events
Not all CDC events can be processed successfully. Schema mismatches, data type errors, or constraint violations can cause individual events to fail. Rather than failing the entire batch, route failures to a dead letter table:
from pyspark.sql import functions as F

try:
    # Process events
    spark.sql("MERGE INTO silver.orders ...")
except Exception as e:
    # Tag the failed events with the error message, then append them to
    # a dead letter table for later triage and replay
    failed_events \
        .withColumn("error_message", F.lit(str(e))) \
        .writeTo("bronze.orders_cdc_dlq") \
        .append()
Pattern 3: Monitoring and Alerting
Track key CDC pipeline metrics:
-- Create a monitoring view
CREATE OR REPLACE VIEW ops.cdc_health AS
SELECT
'orders' AS table_name,
(SELECT COUNT(*) FROM silver.orders.delete_files) AS delete_files,
(SELECT COUNT(*) FROM silver.orders.files) AS data_files,
(SELECT MAX(committed_at) FROM silver.orders.snapshots) AS last_commit,
TIMESTAMPDIFF(
MINUTE,
(SELECT MAX(committed_at) FROM silver.orders.snapshots),
current_timestamp()
) AS minutes_since_last_commit;
Alert thresholds:
- Delete files > 100: Compaction is overdue.
- Minutes since last commit > 30: CDC pipeline may be stalled.
- Average file size < 50 MB: Small file accumulation from frequent merges.
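The thresholds above translate directly into an alerting check. A minimal sketch, assuming the three metrics have already been pulled from the monitoring view:

```python
def cdc_alerts(delete_files, minutes_since_commit, avg_file_mb):
    """Evaluate the CDC health thresholds; returns a list of active alerts."""
    alerts = []
    if delete_files > 100:
        alerts.append("compaction overdue")
    if minutes_since_commit > 30:
        alerts.append("pipeline may be stalled")
    if avg_file_mb < 50:
        alerts.append("small file accumulation")
    return alerts

# A healthy table produces no alerts; a neglected one trips all three
healthy = cdc_alerts(delete_files=12, minutes_since_commit=4, avg_file_mb=210)
```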
Quick Reference
CDC Table Properties
| Property | Recommended Value | Why |
|---|---|---|
format-version | 2 | Required for MOR and position deletes. |
write.merge.mode | merge-on-read | Fast writes for frequent merges. |
write.delete.mode | merge-on-read | Fast deletes for CDC. |
write.update.mode | merge-on-read | Fast updates for CDC. |
write.distribution-mode | hash | Prevents small files. |
write.metadata.previous-versions-max | 15 | Streaming commits create many metadata files. |
write.metadata.delete-after-commit.enabled | true | Auto-clean metadata. |
commit.retry.num-retries | 10 | Handle concurrent write conflicts. |
CDC Pipeline Decision Matrix
| Question | If Yes | If No |
|---|---|---|
| Need sub-minute latency? | Direct CDC (Flink) | Bronze-Silver (Spark) |
| Need transformation before materialization? | Bronze-Silver | Direct CDC |
| Need audit trail of all changes? | Bronze-Silver | Either |
| Have Flink expertise? | Direct CDC | Bronze-Silver |
| Source schema changes frequently? | Bronze-Silver (more control) | Either |
| Regulated industry (compliance)? | Bronze-Silver (full history) | Either |
This post is part of our Apache Iceberg deep-dive series. For MERGE INTO optimization techniques, see Writing Efficient MERGE INTO Queries on Iceberg with Spark. For table design, see Iceberg Table Design: Properties, Partitioning, and Commit Best Practices. Check out the full series on our blog.