Storage Partitioned Joins in Apache Iceberg with Spark

· 13 min read
Cazpian Engineering
Platform Engineering Team

Every Spark join starts the same way: read both sides, shuffle the data across the network so matching keys end up on the same executor, then join. That shuffle is the single most expensive operation in most Spark jobs — it moves data across the network, writes temporary files to disk, and consumes memory on every executor in the cluster.

But what if both tables are already organized by the join key on disk? If the left table's customer_id=42 rows are in bucket 42 and the right table's customer_id=42 rows are also in bucket 42, there is nothing to shuffle. Each executor can join its local partitions independently.

That is exactly what Storage Partitioned Join (SPJ) does. Introduced in Spark 3.3 and matured in Spark 3.4+, SPJ is the most impactful — and least understood — optimization available for Iceberg+Spark workloads. This post shows you how it works, how to set it up, how to verify it, and where it breaks.

The Shuffle Problem

[Diagram: bucket partitions aligned across two tables to eliminate the shuffle, with physical plan comparison and the required Spark configuration]

Consider a common data pipeline: you join a 500-million-row orders table with a 10-million-row customers table on customer_id. Without SPJ, Spark's physical plan looks like this:

== Physical Plan ==
SortMergeJoin [customer_id], [customer_id]
├── Sort [customer_id ASC]
│   └── Exchange hashpartitioning(customer_id, 200)   ← SHUFFLE 500M rows
│       └── BatchScan orders
└── Sort [customer_id ASC]
    └── Exchange hashpartitioning(customer_id, 200)   ← SHUFFLE 10M rows
        └── BatchScan customers

Those two Exchange nodes represent shuffles. Spark reads every row from both tables, hashes customer_id, and sends each row across the network to the executor responsible for that hash partition. For the 500M-row table, this means serializing, compressing, writing to disk, transferring over the network, and deserializing — for every single row.

The shuffle is expensive because it is:

  • CPU-intensive: Serialization and hashing on every row.
  • Network-intensive: Moving potentially terabytes of data across the cluster.
  • Disk-intensive: Writing shuffle files to local executor disks.
  • Failure-prone: If an executor is lost mid-shuffle (common on spot instances), the entire shuffle stage must restart.

What is Storage Partitioned Join?

Storage Partitioned Join eliminates the shuffle by recognizing that the data is already physically co-located on disk.

How It Works

  1. Iceberg reports its physical layout. When Spark reads an Iceberg table, Iceberg's Data Source V2 implementation reports the table's partitioning scheme to Spark's query planner via the SupportsReportPartitioning interface. For a table partitioned by bucket(256, customer_id), Iceberg tells Spark: "this table's data is grouped into 256 buckets by a hash of customer_id."

  2. Spark detects compatible partitioning. During physical planning, Spark checks whether both sides of a join have compatible partitioning on the join key. If both tables are bucketed by customer_id with the same bucket count, their partitions are already aligned.

  3. Spark skips the Exchange operator. Instead of inserting Exchange (shuffle) nodes, Spark reads corresponding buckets from each table and feeds them directly into the join operator. Bucket 0 from orders joins with bucket 0 from customers, bucket 1 with bucket 1, and so on.
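
The three steps above can be sketched in plain Python. This toy model is illustrative only (not Spark internals): each table is a list of per-bucket row lists, and bucket i on the left is joined only against bucket i on the right, so no row ever moves between buckets.

```python
# Illustrative sketch of a bucket-aligned join (not Spark's implementation).
# Rows are (key, payload) pairs already grouped into aligned buckets.

def bucket_aligned_join(left_buckets, right_buckets):
    assert len(left_buckets) == len(right_buckets), "bucket counts must match"
    results = []
    for left, right in zip(left_buckets, right_buckets):
        # Local hash join within one bucket pair: build on the right,
        # probe with the left. Nothing crosses bucket boundaries.
        index = {}
        for key, row in right:
            index.setdefault(key, []).append(row)
        for key, row in left:
            for match in index.get(key, []):
                results.append((key, row, match))
    return results

# Two buckets; keys are placed by key % 2, mimicking aligned bucketing.
orders    = [[(0, "order-a")], [(1, "order-b"), (5, "order-c")]]
customers = [[(0, "alice")],   [(1, "bob")]]
print(bucket_aligned_join(orders, customers))
# [(0, 'order-a', 'alice'), (1, 'order-b', 'bob')]
```

Each iteration of the outer loop corresponds to one Spark task working on its locally assigned bucket pair.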

The resulting physical plan:

== Physical Plan ==
SortMergeJoin [customer_id], [customer_id]
├── Sort [customer_id ASC]
│   └── BatchScan orders      ← NO SHUFFLE
└── Sort [customer_id ASC]
    └── BatchScan customers   ← NO SHUFFLE

No Exchange nodes. Zero network shuffle. Each executor processes only its assigned buckets locally.

The Internal Mechanism

Under the hood, Iceberg exposes partitioning information through KeyGroupedPartitioning. Spark's physical planner reads this from the outputPartitioning method of BatchScanExec and uses it to determine whether the join can be executed without redistribution. The key Iceberg property that makes this work is spark.sql.iceberg.planning.preserve-data-grouping, which ensures that Iceberg's partitioning metadata is preserved through Spark's query planning stages rather than being discarded during optimization.

Setting Up SPJ — Step by Step

Step 1: Create Tables with Matching Bucket Partitioning

Both tables must be Iceberg tables partitioned by the same bucket transform on the join column, with the same bucket count:

-- Fact table: 256 buckets on customer_id
CREATE TABLE analytics.orders (
  order_id BIGINT,
  customer_id BIGINT,
  order_date DATE,
  amount DECIMAL(10,2),
  product_id BIGINT
)
USING iceberg
PARTITIONED BY (bucket(256, customer_id));

-- Dimension table: 256 buckets on customer_id (must match)
CREATE TABLE analytics.customers (
  customer_id BIGINT,
  name STRING,
  email STRING,
  region STRING,
  signup_date DATE
)
USING iceberg
PARTITIONED BY (bucket(256, customer_id));

Critical: The bucket count must be identical on both tables. bucket(256, customer_id) on orders and bucket(128, customer_id) on customers will not trigger SPJ in Spark 3.x. Spark 4.0 relaxes this restriction (see below).
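
The Iceberg spec defines bucket(N, col) as a 32-bit Murmur3 hash of the value, masked to non-negative, modulo N. The sketch below substitutes the raw key for the hash (an illustration only, not Iceberg's actual hash function) to show why mismatched bucket counts misalign:

```python
# Why bucket counts must match: Iceberg assigns
#   (murmur3_32(value) & Integer.MAX_VALUE) % N
# Here we use the raw key as a stand-in for the hash (illustration only).

def bucket(n, hashed_key):
    return hashed_key % n

key = 200  # pretend this is the hashed customer_id
print(bucket(256, key))  # 200: its bucket on the 256-bucket orders table
print(bucket(128, key))  # 72: its bucket on the 128-bucket customers table
# Same key, different bucket numbers, so the partitions no longer line up
# and Spark 3.x falls back to a shuffle.
```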

Step 2: Enable Spark Configurations

Five configurations must be set for SPJ to activate:

# Core SPJ enablement
spark.sql.sources.v2.bucketing.enabled=true
spark.sql.iceberg.planning.preserve-data-grouping=true

# Handle missing partition values on one side of the join
spark.sql.sources.v2.bucketing.pushPartValues.enabled=true

# Allow join keys to be a superset of partition keys
spark.sql.requireAllClusterKeysForCoPartition=false

# Handle data skew across buckets
spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled=true

Optional but often needed:

# Disable broadcast join so Spark doesn't bypass SPJ for smaller tables
spark.sql.autoBroadcastJoinThreshold=-1

# AQE can interfere with SPJ — disable if SPJ is not triggering
spark.sql.adaptive.enabled=false
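
If you drive Spark from PySpark, collecting these settings in one place keeps jobs consistent. A minimal sketch (the helper name and shape are mine; the keys are the ones listed above):

```python
# The SPJ-related settings from this section, collected in one dict.
SPJ_CONFS = {
    "spark.sql.sources.v2.bucketing.enabled": "true",
    "spark.sql.iceberg.planning.preserve-data-grouping": "true",
    "spark.sql.sources.v2.bucketing.pushPartValues.enabled": "true",
    "spark.sql.requireAllClusterKeysForCoPartition": "false",
    "spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled": "true",
}

def apply_spj_confs(conf, extra=None):
    """Apply each setting via conf.set(key, value).

    `conf` is typically spark.conf from an active SparkSession; `extra`
    layers on the optional settings (broadcast threshold, AQE toggle).
    """
    for key, value in {**SPJ_CONFS, **(extra or {})}.items():
        conf.set(key, value)

# With a live session (not run here):
# apply_spj_confs(spark.conf,
#                 extra={"spark.sql.autoBroadcastJoinThreshold": "-1"})
```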

Step 3: Write a Join on the Partition Column

SELECT o.order_id, o.amount, c.name, c.region
FROM analytics.orders o
JOIN analytics.customers c
ON o.customer_id = c.customer_id
WHERE o.order_date >= '2026-01-01';

The join condition must include the partition column (customer_id). Additional conditions (like order_date) are fine — they become filters, not part of the SPJ decision.

Step 4: Verify with EXPLAIN

EXPLAIN FORMATTED
SELECT o.order_id, o.amount, c.name, c.region
FROM analytics.orders o
JOIN analytics.customers c
ON o.customer_id = c.customer_id;

SPJ is working if: There are no Exchange nodes between the BatchScan and the Join operator.

SPJ is NOT working if: You see Exchange hashpartitioning(customer_id, N) — that means Spark is shuffling.
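
The Exchange check is easy to automate in regression tests. A small sketch, assuming you capture the plan as text (the `spj_active` helper is hypothetical, not a Spark API):

```python
# Hypothetical helper: scan EXPLAIN output text for Exchange nodes.

def spj_active(plan_text: str) -> bool:
    """True if no Exchange (shuffle) node appears in the physical plan."""
    return "Exchange" not in plan_text

shuffled_plan = """SortMergeJoin [customer_id], [customer_id]
  Exchange hashpartitioning(customer_id, 200)
    BatchScan orders"""

spj_plan = """SortMergeJoin [customer_id], [customer_id]
  BatchScan orders
  BatchScan customers"""

print(spj_active(shuffled_plan))  # False
print(spj_active(spj_plan))       # True

# With a live session, the plan text can be pulled from the EXPLAIN result:
# spj_active(spark.sql("EXPLAIN FORMATTED <your join>").first()[0])
```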

What Each Configuration Does

  • spark.sql.sources.v2.bucketing.enabled (default: false): Master switch. Enables Spark to use V2 data source partitioning to avoid shuffles.
  • spark.sql.iceberg.planning.preserve-data-grouping (default: false): Iceberg-specific. Preserves partition grouping through query planning stages. Without this, Spark's optimizer may discard Iceberg's partitioning metadata.
  • spark.sql.sources.v2.bucketing.pushPartValues.enabled (default: false): Creates empty partitions for partition values that exist on one side but not the other. Without this, SPJ fails when tables have asymmetric partition coverage.
  • spark.sql.requireAllClusterKeysForCoPartition (default: true): When true, requires ALL partition keys to appear in the join. Set to false to allow joins on a superset of partition keys.
  • spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled (default: false): Handles skewed partitions by splitting large buckets across multiple tasks and replicating the smaller side's corresponding bucket.
  • spark.sql.adaptive.enabled (default: true): AQE can override SPJ decisions. Disable if SPJ is not triggering despite correct setup.
  • spark.sql.autoBroadcastJoinThreshold (default: 10485760): Spark broadcasts small tables instead of using SPJ. Set to -1 to force SPJ for benchmarking or when you want consistent join behavior.

SPJ with MERGE INTO — The Killer Use Case

MERGE INTO is internally a join between the target table and the source dataset. By default, Spark shuffles both sides to execute this join — even when both tables are bucketed identically. Enabling SPJ eliminates this shuffle, which is where the largest performance gains occur.

The Setup

-- Target table: bucketed on order_id
CREATE TABLE analytics.orders (
  order_id BIGINT,
  status STRING,
  amount DECIMAL(10,2),
  updated_at TIMESTAMP
)
USING iceberg
PARTITIONED BY (bucket(128, order_id));

-- Staging table: bucketed identically
CREATE TABLE staging.daily_orders (
  order_id BIGINT,
  status STRING,
  amount DECIMAL(10,2),
  updated_at TIMESTAMP
)
USING iceberg
PARTITIONED BY (bucket(128, order_id));

The MERGE

MERGE INTO analytics.orders t
USING staging.daily_orders s
ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

With SPJ enabled, the join phase of this MERGE executes without any shuffle. Bucket 0 of staging.daily_orders is matched against bucket 0 of analytics.orders, bucket 1 against bucket 1, and so on.

Important: The source table must be an Iceberg table with matching bucket partitioning. If your source is a temporary view, a Parquet file, or a DataFrame without Iceberg bucketing, SPJ will not activate. Load your staging data into a properly bucketed Iceberg table first.

Real-World Impact

In GitHub issue #7832, a user reported a MERGE operation joining 50 million rows against 30,000 rows that took 40 minutes with shuffle. The root cause was that Spark performed a ShuffledHashJoin scanning the entire 50M-row table. With SPJ enabled and both tables bucketed identically, the shuffle was eliminated entirely.

The performance difference depends on your data size, cluster configuration, and network bandwidth — but eliminating a shuffle on a 500M+ row table typically reduces join time by 50-80% and cuts compute costs proportionally.

Handling Edge Cases

Missing Partition Values

If the orders table has data in buckets 0-255 but the customers table only has data in buckets 0-200, Spark needs to create empty partitions for buckets 201-255 on the customers side. This is handled by:

spark.sql.sources.v2.bucketing.pushPartValues.enabled=true

Without this, SPJ will fail silently and fall back to a shuffle.

Data Skew

If bucket 42 in orders contains 10x more data than other buckets (e.g., a power customer), a single executor gets overloaded. The skew handling configuration:

spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled=true

This splits the large bucket across multiple tasks and replicates the corresponding bucket from the smaller table. Note: this does not work with FULL OUTER JOIN.
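
Conceptually, the splitting looks like this. The sketch below is an illustration of the idea, not Spark's implementation: the hot bucket on the large side is cut into task-sized chunks, and the matching bucket from the small side is copied to each chunk.

```python
# Illustrative sketch of partially clustered distribution (not Spark
# internals): split one skewed bucket into chunks and replicate the
# matching bucket from the smaller side, so several tasks share the work.

def split_skewed_bucket(big_bucket_rows, small_bucket_rows, chunk_size):
    tasks = []
    for start in range(0, len(big_bucket_rows), chunk_size):
        chunk = big_bucket_rows[start:start + chunk_size]
        # Each task gets one chunk plus a full copy of the small side.
        tasks.append((chunk, list(small_bucket_rows)))
    return tasks

hot_bucket = [f"order-{i}" for i in range(10)]  # 10x more data than peers
dim_bucket = ["customer-42"]
tasks = split_skewed_bucket(hot_bucket, dim_bucket, chunk_size=4)
print(len(tasks))  # 3 tasks instead of 1 overloaded task
```

The replication on the small side is why this mode cannot support FULL OUTER JOIN: an unmatched small-side row would be emitted once per chunk.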

AQE Interference

Adaptive Query Execution can override SPJ decisions — AQE may decide that a broadcast join or a different strategy is better based on runtime statistics. If SPJ is not triggering despite correct table setup:

spark.sql.adaptive.enabled=false

Trade-off: Disabling AQE globally loses its benefits (dynamic partition coalescing, skew join handling, broadcast join conversion) for all other queries in the session. Consider setting this only for the specific SPJ query:

spark.conf.set("spark.sql.adaptive.enabled", "false")
# Run SPJ query
spark.sql("SELECT ... FROM orders JOIN customers ON ...")
# Re-enable AQE for other queries
spark.conf.set("spark.sql.adaptive.enabled", "true")

Partition Evolution Breaks SPJ

If a table undergoes partition evolution (adding/removing/reordering partition fields), the internal field IDs may no longer match between tables — even if the final partition spec looks identical. This is a known issue (GitHub #13530).

Workaround: Avoid partition evolution on tables where SPJ is critical. If you must evolve, create a new table with the desired partition spec and migrate data.

3-Way Joins Fail

SPJ currently fails when joining 3+ bucketed tables in a single query (GitHub #15015). The error is typically Unequal numbers of partitions.

Workaround: Break 3-way joins into two sequential 2-way joins:

-- Instead of: SELECT * FROM a JOIN b ON ... JOIN c ON ...
-- Do:
CREATE TEMPORARY VIEW ab AS
SELECT * FROM a JOIN b ON a.id = b.id;

SELECT * FROM ab JOIN c ON ab.id = c.id;

Both Tables Must Be Iceberg

SPJ requires both sides of the join to be Iceberg tables reporting compatible partitioning. Joining an Iceberg table with a Hive table, Parquet files, or a temporary DataFrame view will not trigger SPJ. If your source data comes from an external system, load it into a bucketed Iceberg staging table first.

Spark 4.0 SPJ Enhancements

Spark 4.0 significantly expands SPJ's applicability:

Join Keys as Subset of Partition Keys

In Spark 3.x, if a table is partitioned by bucket(256, customer_id), region, you must include both customer_id and region in the join condition (unless requireAllClusterKeysForCoPartition=false). Spark 4.0 introduces:

spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled=true

This allows SPJ to work when the join key is only customer_id, even though the table is also partitioned by region. Spark groups partitions by the common column to enable the optimization.

Compatible Bucket Counts

In Spark 3.x, both tables must have identical bucket counts. Spark 4.0 can handle compatible (but different) bucket counts — for example, joining a table with bucket(256, id) against one with bucket(128, id). Spark groups multiple smaller buckets to align with the larger side.
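
The grouping can be sketched under the assumption that both sides use the same hash function and the counts differ by an integer factor (Spark 4.0's real grouping logic lives in its planner): since bucket = hash % N, a row in bucket b of the 256-bucket table lands in bucket b % 128 of the 128-bucket table.

```python
# Sketch: pair each small-side bucket with the large-side buckets whose
# rows would fold into it, assuming bucket = hash % N on both sides.

def group_buckets(large_n, small_n):
    assert large_n % small_n == 0, "counts must be compatible"
    groups = {}
    for b in range(large_n):
        groups.setdefault(b % small_n, []).append(b)
    return groups

groups = group_buckets(256, 128)
print(groups[0])   # [0, 128]: both pair with small-side bucket 0
print(groups[42])  # [42, 170]: both pair with small-side bucket 42
```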

These enhancements make SPJ practical for more real-world scenarios where tables are not designed from day one with identical partitioning.

When NOT to Use SPJ

SPJ is not always the right choice:

  • Small dimension tables: If one side fits in memory, a broadcast join (spark.sql.autoBroadcastJoinThreshold) is usually faster than SPJ: the small table is shipped once to every executor and the join becomes a local hash lookup, with no sort required on either side.
  • Non-equi-joins: SPJ only works with equality join conditions (=). Range joins, theta joins, and cross joins cannot use SPJ.
  • Tables with different access patterns: Bucketing a table solely for join performance penalizes other access patterns. If a table is primarily queried by event_date but only occasionally joined on customer_id, partitioning by day(event_date) may be the better overall choice.
  • Very few buckets: If both tables have only 4-8 buckets, the parallelism is too low and SPJ may be slower than a shuffle with 200 partitions.

Production Checklist

Table design:

  • Both tables use bucket(N, join_column) with the same N
  • Bucket count is appropriate for data volume (64-512 buckets for most workloads)
  • Both tables are Iceberg tables (not Hive, Parquet, or temp views)
  • No partition evolution has occurred that reorders fields

Spark configuration:

  • spark.sql.sources.v2.bucketing.enabled=true
  • spark.sql.iceberg.planning.preserve-data-grouping=true
  • spark.sql.sources.v2.bucketing.pushPartValues.enabled=true
  • spark.sql.requireAllClusterKeysForCoPartition=false
  • spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled=true

Verification:

  • Run EXPLAIN FORMATTED — confirm no Exchange nodes between scan and join
  • Check Spark UI DAG — confirm no shuffle stage between read and join stages
  • Compare execution time with and without SPJ on a representative dataset

Maintenance:

  • Run compaction regularly — fragmented buckets reduce SPJ effectiveness
  • Monitor bucket data distribution — heavily skewed buckets need partiallyClusteredDistribution
  • Keep bucket counts aligned when creating new tables that will be joined

How Cazpian Handles This

On Cazpian, SPJ configurations are enabled by default on all managed Spark clusters. When you create bucketed Iceberg tables through Cazpian's catalog, the platform automatically enables preserve-data-grouping and all required V2 bucketing settings. For MERGE INTO workloads, Cazpian's job optimizer detects compatible bucket partitioning and ensures SPJ activates — no manual configuration needed.

What's Next

This post covered how to eliminate shuffles with Storage Partitioned Joins. For related optimizations, see our other posts in this series: