
Writing Efficient MERGE INTO Queries on Iceberg with Spark

13 min read
Cazpian Engineering
Platform Engineering Team


MERGE INTO is both the most powerful and the most misused operation in Apache Iceberg. It handles upserts, conditional deletes, SCD Type-2 updates, and CDC application — all in a single atomic statement. But it is also the operation most likely to trigger a full table scan, blow up your compute costs, and produce thousands of small files if you do not write it carefully.

The difference between a well-written and a poorly-written MERGE INTO on the same table can be the difference between 30 seconds and 30 minutes — and between $2 and $200 in compute cost. This post shows you exactly how to write it right.

How MERGE INTO Works Under the Hood

Before optimizing, you need to understand what Spark actually does when you run a MERGE INTO:

[Figure: MERGE INTO execution flow — target scan, join, row classification, predicate push-down, COW vs MOR write paths, and compaction strategies.]

MERGE INTO target_table t
USING source_data s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.value = s.value
WHEN NOT MATCHED THEN INSERT *

Step 1: Scan the target table. Spark reads the target Iceberg table — ideally, only the files that could contain matching rows.

Step 2: Join target with source. Spark performs a join between the target rows and the source data using the ON condition.

Step 3: Classify rows. Each row is classified as MATCHED (exists in both target and source), NOT MATCHED (exists in source but not target), or NOT MATCHED BY SOURCE (exists in target but not source).

Step 4: Write the result. Depending on Copy-on-Write or Merge-on-Read mode, Spark either rewrites entire data files or writes delete files + new data files.

The critical performance factor is Step 1 — how much of the target table Spark actually reads. A full table scan on a 10 TB table is catastrophically expensive. The goal is to make Spark read only the files that could contain matching rows.
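The row classification in Step 3 can be sketched with plain set operations. This is an illustrative model in ordinary Python (not Spark internals), using hypothetical in-memory key sets:

```python
def classify_rows(target_keys, source_keys):
    """Classify join keys the way MERGE INTO classifies joined rows."""
    matched = target_keys & source_keys                 # WHEN MATCHED
    not_matched = source_keys - target_keys             # WHEN NOT MATCHED (insert)
    not_matched_by_source = target_keys - source_keys   # WHEN NOT MATCHED BY SOURCE
    return matched, not_matched, not_matched_by_source

# Keys 2 and 3 exist on both sides, 4 only in the source, 1 only in the target
matched, inserts, unmatched_target = classify_rows({1, 2, 3}, {2, 3, 4})
```

The third bucket is the expensive one: computing it requires seeing every target row, which is why `WHEN NOT MATCHED BY SOURCE` defeats file pruning.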

The Push-Down Predicate Strategy

The single most impactful optimization for MERGE INTO is adding a push-down predicate — a condition in the ON clause that maps to the table's partition spec, enabling Iceberg to prune irrelevant partitions and files.

The Problem: Full Table Scans

Consider this common pattern:

MERGE INTO analytics.events t
USING staging.daily_events s
ON t.event_id = s.event_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

This looks reasonable, but it forces a full table scan. Why? Because event_id is not a partition column. Iceberg cannot determine which partitions or files might contain matching event_id values without reading every file's statistics — and if event_id values are randomly distributed, min/max statistics will not help either.

If analytics.events is partitioned by day(event_time) and holds 2 years of data, Spark just read 730 daily partitions to merge one day of staging data.

The Solution: Add a Partition-Scoped Predicate

MERGE INTO analytics.events t
USING staging.daily_events s
ON t.event_id = s.event_id
AND t.event_time >= '2026-02-12'
AND t.event_time < '2026-02-13'
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

By adding t.event_time >= '2026-02-12' AND t.event_time < '2026-02-13' to the ON clause, Iceberg can now prune all partitions except February 12th. Instead of scanning 730 partitions, Spark scans exactly 1.

The improvement is proportional to the number of partitions you skip. For a table with 365 daily partitions, scoping to one day gives you a ~365x speedup on the target scan.

Rules for Effective Push-Down Predicates

Rule | Why
Include the partition column in the ON clause. | Iceberg can only prune partitions if the filter references a column used in the partition spec.
Use range filters that align with the partition transform. | day(event_time) needs a date range. bucket(user_id) needs an equality filter on user_id.
Put the predicate on the target table (not the source). | Iceberg pushes down predicates on the target. Predicates on the source only filter the source side of the join.
Avoid OR conditions that span partition boundaries. | t.event_date = '2026-02-12' OR t.user_id = 42 prevents partition pruning, because a row satisfying the OR could live in any partition.
Avoid WHEN NOT MATCHED BY SOURCE if possible. | This clause forces a full table scan because Iceberg needs to identify all target rows that have no match in the source — which requires reading every target file.

Dynamic Partition Scoping

If your staging data spans multiple days, compute the date range dynamically:

-- First, compute the date range of the incoming data
CREATE OR REPLACE TEMP VIEW source_bounds AS
SELECT MIN(event_time) AS min_time, MAX(event_time) AS max_time
FROM staging.daily_events;

-- Then, scope the merge to that range
MERGE INTO analytics.events t
USING staging.daily_events s
ON t.event_id = s.event_id
AND t.event_time >= (SELECT min_time FROM source_bounds)
AND t.event_time < (SELECT DATE_ADD(max_time, 1) FROM source_bounds)
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

Or in PySpark:

# Get the bounds from source data
bounds = spark.sql("""
SELECT
MIN(event_time) AS min_time,
MAX(event_time) AS max_time
FROM staging.daily_events
""").collect()[0]

min_time = bounds.min_time
max_time = bounds.max_time

spark.sql(f"""
MERGE INTO analytics.events t
USING staging.daily_events s
ON t.event_id = s.event_id
AND t.event_time >= '{min_time}'
AND t.event_time < DATE_ADD('{max_time}', 1)
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

Copy-on-Write vs Merge-on-Read for MERGE

The write.merge.mode table property fundamentally changes how MERGE INTO executes:

Copy-on-Write (COW) — Default

Before: [file_1.parquet: 256 MB, 1M rows]
MERGE updates 100 rows in file_1

After: [file_1_rewritten.parquet: 256 MB, 1M rows (100 updated)]
file_1.parquet is marked as deleted in the snapshot

COW rewrites every file that contains at least one matched row. If your merge touches rows spread across 500 files, all 500 files are rewritten — even if only 1 row changed in each file.

COW performance characteristics:

  • Write cost: High — rewrites entire files.
  • Read cost: Low — no delete files to reconcile at query time.
  • Best for: Batch merges where reads are frequent and writes are infrequent.

Merge-on-Read (MOR)

Before: [file_1.parquet: 256 MB, 1M rows]
MERGE updates 100 rows in file_1

After: [file_1.parquet: 256 MB, unchanged]
[delete_file_1.parquet: tiny, marks 100 rows as deleted]
[new_data_file.parquet: tiny, contains 100 updated rows]

MOR writes small delete files and new data files instead of rewriting existing files. Much faster for writes, but reads must merge data files with delete files at query time.

MOR performance characteristics:

  • Write cost: Low — only writes deltas.
  • Read cost: Higher — must reconcile delete files during reads. Degrades over time without compaction.
  • Best for: Frequent merges (CDC, streaming upserts) where write performance matters more.
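A rough cost model makes the trade-off concrete. This is an illustration with made-up numbers, not Iceberg internals: a merge that changes 500 rows spread across 500 files of 256 MB each.

```python
def cow_write_mb(touched_files, file_mb):
    """COW rewrites every touched file in full."""
    return touched_files * file_mb

def mor_write_mb(changed_rows, row_kb=1):
    """MOR writes roughly one delete entry plus one new row per change."""
    return 2 * changed_rows * row_kb / 1024

cow = cow_write_mb(touched_files=500, file_mb=256)  # 128,000 MB rewritten
mor = mor_write_mb(changed_rows=500)                # under 1 MB of deltas
```

For this workload shape (few changed rows per file), MOR writes five orders of magnitude less data — which is exactly why CDC and streaming upserts favor it.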

Choosing the Right Mode

Scenario | Recommended Mode | Why
Daily batch upsert (one merge/day) | COW | One heavy write is acceptable. Reads are fast all day.
Hourly CDC application | MOR | Multiple merges per day — cannot afford to rewrite files every hour.
Streaming micro-batch upserts | MOR | Dozens of merges per day. MOR keeps write latency low.
Small source, large target | COW | If the merge touches few files, COW overhead is minimal.
Large source, large target | MOR | Rewriting thousands of files is prohibitively expensive.

-- Set MOR for a CDC table
ALTER TABLE analytics.user_profiles
SET TBLPROPERTIES (
'write.merge.mode' = 'merge-on-read',
'write.delete.mode' = 'merge-on-read',
'write.update.mode' = 'merge-on-read'
);

Distribution Mode Impact on MERGE

The write.distribution-mode property affects how Spark shuffles data before writing the merge result.

Mode | Behavior During MERGE | Impact
none | No shuffle after the merge join. Each task writes wherever its output lands. | Fastest write, but creates many small files across partitions.
hash | Shuffle by partition key after the merge join. | Balanced. Produces well-sized files per partition.
range | Sort by partition key + sort order after the merge join. | Best file layout, but most expensive shuffle.

For MERGE operations, hash is almost always the right choice for partitioned tables. It ensures that all rows for a given partition land on the same write task, producing well-sized output files.

With none, a merge that touches 32 bucket partitions from 200 Spark tasks can produce 200 x 32 = 6,400 tiny files. With hash, it produces ~32 well-sized files (one per partition).
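The file-count arithmetic is worth writing down as a worst-case model (a sketch, not a guarantee of Spark's exact behavior):

```python
def output_files(tasks, partitions, mode):
    """Worst-case output file count for a merge write.

    'none' lets every task write its own file into every partition it sees;
    'hash' routes all rows for a partition to a single task.
    """
    if mode == "none":
        return tasks * partitions   # up to one file per task per partition
    if mode == "hash":
        return partitions           # ~one well-sized file per partition
    raise ValueError(f"unknown mode: {mode}")

assert output_files(200, 32, "none") == 6400
assert output_files(200, 32, "hash") == 32
```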

-- Ensure hash distribution for merge-heavy tables
ALTER TABLE analytics.events
SET TBLPROPERTIES (
'write.distribution-mode' = 'hash'
);

Compaction After MERGE

Every MERGE INTO — especially MOR merges — generates file artifacts that degrade read performance over time:

  • MOR merges produce delete files that accumulate and slow down reads.
  • COW merges may produce undersized files if the merge only rewrites a few rows per file.
  • Both modes may leave files with suboptimal sort order, weakening min/max statistics.

Compaction Strategy

-- After merge: compact to merge delete files and right-size data files
CALL system.rewrite_data_files(
table => 'analytics.events',
strategy => 'binpack',
options => map(
'target-file-size-bytes', '268435456',
'min-file-size-bytes', '104857600',
'max-file-size-bytes', '536870912',
'delete-file-threshold', '3'
)
);

The delete-file-threshold parameter tells Iceberg to only rewrite files that have 3 or more associated delete files. This avoids rewriting files that have been touched by only one or two merges — saving compute while still cleaning up the worst offenders.
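The selection logic behind delete-file-threshold can be mimicked in a few lines. The dict of per-file delete counts here is a hypothetical stand-in for what Iceberg reads from manifest metadata:

```python
def files_to_rewrite(delete_counts, threshold=3):
    """Pick only the data files with >= threshold associated delete files."""
    return [f for f, n in delete_counts.items() if n >= threshold]

counts = {"file_a": 5, "file_b": 1, "file_c": 3, "file_d": 0}
# Only the worst offenders are rewritten; file_b and file_d are left alone
candidates = files_to_rewrite(counts)
```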

Compaction Frequency

Merge Frequency | Compaction Schedule | Strategy
Once per day (batch) | Weekly or after N merges | binpack or sort
Hourly CDC | Every 4-6 hours | binpack
Streaming micro-batch | Every 2-4 hours | binpack with delete-file-threshold=5

Partial Compaction

For large tables, compact only the partitions that were affected by recent merges:

-- Compact only the partitions touched by today's merge
CALL system.rewrite_data_files(
table => 'analytics.events',
where => 'event_time >= TIMESTAMP ''2026-02-12 00:00:00''
AND event_time < TIMESTAMP ''2026-02-13 00:00:00'''
);

MERGE INTO Patterns for Common Use Cases

Pattern 1: Simple Upsert (Insert or Update)

The most common pattern — insert new rows, update existing ones:

MERGE INTO analytics.customers t
USING staging.customer_updates s
ON t.customer_id = s.customer_id
AND t.region = s.region -- push-down predicate (partition column)
WHEN MATCHED THEN
UPDATE SET
t.full_name = s.full_name,
t.email = s.email,
t.segment = s.segment,
t.updated_at = current_timestamp()
WHEN NOT MATCHED THEN
INSERT (customer_id, full_name, email, segment, region, created_at, updated_at)
VALUES (s.customer_id, s.full_name, s.email, s.segment, s.region, current_timestamp(), current_timestamp())

Pattern 2: Conditional Update (Only If Changed)

Avoid unnecessary file rewrites by only updating rows where values actually changed:

MERGE INTO analytics.products t
USING staging.product_feed s
ON t.product_id = s.product_id
WHEN MATCHED AND (
t.price <> s.price OR
t.stock_quantity <> s.stock_quantity OR
t.description <> s.description
) THEN
UPDATE SET
t.price = s.price,
t.stock_quantity = s.stock_quantity,
t.description = s.description,
t.updated_at = current_timestamp()
WHEN NOT MATCHED THEN
INSERT *

The WHEN MATCHED AND (...) condition prevents Iceberg from rewriting files for rows where nothing changed. This can dramatically reduce write amplification when your source feed contains both changed and unchanged records.
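The effect of the guard is easy to demonstrate in plain Python. This sketch filters source rows down to those whose tracked columns actually differ from the current target row (the column names mirror the SQL above; the data is made up):

```python
TRACKED = ("price", "stock_quantity", "description")

def changed_rows(target_by_id, source_rows):
    """Keep new rows, plus rows whose tracked columns differ from the target."""
    kept = []
    for row in source_rows:
        current = target_by_id.get(row["product_id"])
        if current is None or any(current[c] != row[c] for c in TRACKED):
            kept.append(row)
    return kept

target = {1: {"product_id": 1, "price": 10, "stock_quantity": 5, "description": "a"}}
source = [
    {"product_id": 1, "price": 10, "stock_quantity": 5, "description": "a"},  # unchanged
    {"product_id": 2, "price": 12, "stock_quantity": 3, "description": "b"},  # new
]
kept = changed_rows(target, source)
```

Only the new row survives the filter; the unchanged row would otherwise have triggered a pointless file rewrite in COW mode.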

Pattern 3: SCD Type-2 (Slowly Changing Dimension)

Close the current record and insert a new version:

MERGE INTO analytics.dim_customers t
USING (
SELECT s.*, current_timestamp() AS effective_from
FROM staging.customer_changes s
) s
ON t.customer_id = s.customer_id
AND t.is_current = true
WHEN MATCHED THEN
UPDATE SET
t.is_current = false,
t.effective_to = s.effective_from,
t.updated_at = current_timestamp()
WHEN NOT MATCHED THEN
INSERT (customer_id, full_name, email, segment,
is_current, effective_from, effective_to, created_at, updated_at)
VALUES (s.customer_id, s.full_name, s.email, s.segment,
true, s.effective_from, null, current_timestamp(), current_timestamp())

Note: After the MERGE, insert the new current records:

INSERT INTO analytics.dim_customers
SELECT
customer_id, full_name, email, segment,
true AS is_current,
current_timestamp() AS effective_from,
null AS effective_to,
current_timestamp() AS created_at,
current_timestamp() AS updated_at
FROM staging.customer_changes;
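The two-step flow above (close the current version, then append the new one) can be modeled on an in-memory list of dicts. Field names mirror the SQL; timestamps are plain strings here for simplicity:

```python
def apply_scd2(dim_rows, change, now):
    """Close the current version of a customer, then append the new version."""
    for row in dim_rows:
        if row["customer_id"] == change["customer_id"] and row["is_current"]:
            row["is_current"] = False
            row["effective_to"] = now
    dim_rows.append({**change, "is_current": True,
                     "effective_from": now, "effective_to": None})
    return dim_rows

dim = [{"customer_id": 7, "segment": "basic", "is_current": True,
        "effective_from": "2025-01-01", "effective_to": None}]
apply_scd2(dim, {"customer_id": 7, "segment": "premium"}, "2026-02-12")
# dim now holds a closed old version and one current new version
```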

Pattern 4: Delete + Upsert (CDC Apply)

Apply CDC events that include inserts, updates, and deletes:

MERGE INTO analytics.orders t
USING staging.cdc_events s
ON t.order_id = s.order_id
AND t.order_date >= s.min_event_date -- push-down
AND t.order_date <= s.max_event_date -- push-down
WHEN MATCHED AND s.op = 'D' THEN
DELETE
WHEN MATCHED AND s.op IN ('U', 'C') THEN
UPDATE SET *
WHEN NOT MATCHED AND s.op IN ('I', 'C') THEN
INSERT *
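The CDC semantics above reduce to a small state machine: 'D' deletes on match, 'U'/'C' update on match, 'I'/'C' insert on miss. A minimal in-memory model (keyed dict standing in for the target table):

```python
def apply_cdc(table, events):
    """Apply ordered CDC events to a dict keyed by order_id."""
    for e in events:
        key, op = e["order_id"], e["op"]
        if key in table and op == "D":
            del table[key]
        elif key in table and op in ("U", "C"):
            table[key] = e["payload"]
        elif key not in table and op in ("I", "C"):
            table[key] = e["payload"]
    return table

state = apply_cdc({1: "old"}, [
    {"order_id": 1, "op": "U", "payload": "new"},      # matched update
    {"order_id": 2, "op": "I", "payload": "created"},  # unmatched insert
    {"order_id": 1, "op": "D", "payload": None},       # matched delete
])
```

Note that event order matters: the update to order 1 is applied and then erased by the later delete, just as MERGE applies the clauses per matched row.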

Pattern 5: Partition-Aligned Merge (INSERT OVERWRITE Alternative)

When you know the merge replaces an entire partition, INSERT OVERWRITE is simpler and faster. But when you need to update some rows in a partition while keeping others, scope the merge:

MERGE INTO analytics.events t
USING staging.corrected_events s
ON t.event_id = s.event_id
AND t.event_time >= '2026-02-01'
AND t.event_time < '2026-03-01'
WHEN MATCHED THEN
UPDATE SET
t.event_type = s.event_type,
t.properties = s.properties,
t.corrected = true,
t.updated_at = current_timestamp()

Spark Configuration for MERGE Performance

# Optimal Spark settings for MERGE INTO workloads
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "268435456")

# Increase shuffle partitions for large merges
spark.conf.set("spark.sql.shuffle.partitions", "400")

# Enable broadcast join for small source datasets
# (Spark will auto-broadcast sources under this threshold)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "104857600") # 100 MB

Broadcast join optimization: If your source dataset is small (under 100 MB), Spark can broadcast it to all executors, avoiding the expensive shuffle join. This is common for daily batch merges where you merge a small staging table into a large target.

-- Force broadcast for a small source. Spark only recognizes /*+ ... */
-- hints inside a SELECT, so wrap the source in a subquery:
MERGE INTO analytics.events t
USING (SELECT /*+ BROADCAST(d) */ * FROM staging.daily_events d) s
ON t.event_id = s.event_id
AND t.event_time >= '2026-02-12'
AND t.event_time < '2026-02-13'
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

Performance Checklist

Before running a MERGE INTO in production, verify:

  • Partition column is in the ON clause. If not, you are doing a full table scan.
  • Source data is bounded to a known range. Compute min/max of the partition column from the source and scope the merge accordingly.
  • Distribution mode is hash. Prevents small file explosion after the merge.
  • COW vs MOR is set correctly. Use COW for infrequent batch merges, MOR for frequent/streaming merges.
  • No WHEN NOT MATCHED BY SOURCE clause. This forces a full table scan regardless of push-down predicates.
  • Conditional update (WHEN MATCHED AND ...) is used if the source may contain unchanged rows.
  • Compaction is scheduled after the merge (especially for MOR tables).
  • Broadcast hint is added if the source dataset is small (under 100 MB).

Common Mistakes

  1. No partition predicate in the ON clause. This is the number one cause of slow MERGE INTO. Adding a single date range predicate can turn a 30-minute merge into a 30-second merge.

  2. Using WHEN NOT MATCHED BY SOURCE THEN DELETE. This clause requires scanning the entire target table to find rows that have no match. Avoid it. Instead, compute the deletes separately and run a targeted DELETE FROM with partition predicates.

  3. Running MOR merges without compaction. Each MOR merge creates delete files. After 50 merges, your table may have thousands of delete files, making reads 10x slower. Schedule compaction.

  4. Merging the entire history when only today changed. If your staging table contains data for February 12th, do not merge against the entire target table. Scope it to February 12th.

  5. Ignoring the source dataset size. A 50 MB source joined against a 5 TB target should use a broadcast join. A 50 GB source should not. Check spark.sql.autoBroadcastJoinThreshold.

  6. Using UPDATE SET * when only a few columns changed. SET * updates every column, potentially triggering unnecessary file rewrites in COW mode. Use conditional updates or explicit column lists to minimize write amplification.


This post is part of our Apache Iceberg deep-dive series. For table design and property configuration, see Iceberg Table Design: Properties, Partitioning, and Commit Best Practices. For CDC pipeline architecture, see Iceberg CDC: Patterns, Best Practices, and Real-World Pipelines. Check out the full series on our blog.