Storage Partitioned Joins in Apache Iceberg with Spark
Nearly every Spark join starts the same way: read both sides, shuffle the data across the network so matching keys end up on the same executor, then join. That shuffle is often the single most expensive operation in a Spark job — it moves data across the network, writes temporary files to disk, and consumes memory on every executor in the cluster.
But what if both tables are already organized by the join key on disk? If the left table's customer_id=42 rows are in bucket 42 and the right table's customer_id=42 rows are also in bucket 42, there is nothing to shuffle. Each executor can join its local partitions independently.
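The intuition can be sketched in a few lines of plain Python (a toy model, not Spark code; the modulo hash stands in for Iceberg's real 32-bit Murmur3 bucket transform):

```python
# Toy model of a storage-partitioned join: both tables are pre-bucketed
# by hash(join_key) % N, so each bucket pair can be joined independently.
NUM_BUCKETS = 4

def bucket_of(key: int) -> int:
    # Stand-in hash; Iceberg's bucket transform really uses 32-bit Murmur3.
    return key % NUM_BUCKETS

def bucketize(rows, key_fn):
    buckets = [[] for _ in range(NUM_BUCKETS)]
    for row in rows:
        buckets[bucket_of(key_fn(row))].append(row)
    return buckets

orders = [(1, 42), (2, 7), (3, 42)]          # (order_id, customer_id)
customers = [(42, "Ada"), (7, "Grace")]      # (customer_id, name)

order_buckets = bucketize(orders, lambda r: r[1])
customer_buckets = bucketize(customers, lambda r: r[0])

# Join bucket i with bucket i -- no row ever crosses buckets (no "shuffle").
joined = []
for ob, cb in zip(order_buckets, customer_buckets):
    lookup = {c[0]: c[1] for c in cb}
    joined += [(o[0], o[1], lookup[o[1]]) for o in ob if o[1] in lookup]

print(sorted(joined))  # [(1, 42, 'Ada'), (2, 7, 'Grace'), (3, 42, 'Ada')]
```

Because a key always hashes to the same bucket on both sides, correctness is guaranteed without any cross-bucket communication; that is the entire trick SPJ exploits.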
That is exactly what Storage Partitioned Join (SPJ) does. Introduced in Spark 3.3 and matured in Spark 3.4+, SPJ is the most impactful — and least understood — optimization available for Iceberg+Spark workloads. This post shows you how it works, how to set it up, how to verify it, and where it breaks.
The Shuffle Problem
Consider a common data pipeline: you join a 500-million-row orders table with a 10-million-row customers table on customer_id. Without SPJ, Spark's physical plan looks like this:
== Physical Plan ==
SortMergeJoin [customer_id], [customer_id]
├── Sort [customer_id ASC]
│ └── Exchange hashpartitioning(customer_id, 200) ← SHUFFLE 500M rows
│ └── BatchScan orders
└── Sort [customer_id ASC]
└── Exchange hashpartitioning(customer_id, 200) ← SHUFFLE 10M rows
└── BatchScan customers
Those two Exchange nodes represent shuffles. Spark reads every row from both tables, hashes customer_id, and sends each row across the network to the executor responsible for that hash partition. For the 500M-row table, this means serializing, compressing, writing to disk, transferring over the network, and deserializing — for every single row.
The shuffle is expensive because it is:
- CPU-intensive: Serialization and hashing on every row.
- Network-intensive: Moving potentially terabytes of data across the cluster.
- Disk-intensive: Writing shuffle files to local executor disks.
- Failure-prone: If an executor is lost mid-shuffle (common on spot instances), the entire shuffle stage must restart.
What is Storage Partitioned Join?
Storage Partitioned Join eliminates the shuffle by recognizing that the data is already physically co-located on disk.
How It Works
1. Iceberg reports its physical layout. When Spark reads an Iceberg table, Iceberg's Data Source V2 implementation reports the table's partitioning scheme to Spark's query planner via the `SupportsReportPartitioning` interface. For a table partitioned by `bucket(256, customer_id)`, Iceberg tells Spark: "this table's data is grouped into 256 buckets by a hash of `customer_id`."
2. Spark detects compatible partitioning. During physical planning, Spark checks whether both sides of a join have compatible partitioning on the join key. If both tables are bucketed by `customer_id` with the same bucket count, their partitions are already aligned.
3. Spark skips the Exchange operator. Instead of inserting `Exchange` (shuffle) nodes, Spark reads corresponding buckets from each table and feeds them directly into the join operator. Bucket 0 from `orders` joins with bucket 0 from `customers`, bucket 1 with bucket 1, and so on.
The resulting physical plan:
== Physical Plan ==
SortMergeJoin [customer_id], [customer_id]
├── Sort [customer_id ASC]
│ └── BatchScan orders ← NO SHUFFLE
└── Sort [customer_id ASC]
└── BatchScan customers ← NO SHUFFLE
No Exchange nodes. Zero network shuffle. Each executor processes only its assigned buckets locally.
The Internal Mechanism
Under the hood, Iceberg exposes partitioning information through KeyGroupedPartitioning. Spark's physical planner reads this from the outputPartitioning method of BatchScanExec and uses it to determine whether the join can be executed without redistribution. The key Iceberg property that makes this work is spark.sql.iceberg.planning.preserve-data-grouping, which ensures that Iceberg's partitioning metadata is preserved through Spark's query planning stages rather than being discarded during optimization.
Setting Up SPJ — Step by Step
Step 1: Create Tables with Matching Bucket Partitioning
Both tables must be Iceberg tables partitioned by the same bucket transform on the join column, with the same bucket count:
-- Fact table: 256 buckets on customer_id
CREATE TABLE analytics.orders (
order_id BIGINT,
customer_id BIGINT,
order_date DATE,
amount DECIMAL(10,2),
product_id BIGINT
)
USING iceberg
PARTITIONED BY (bucket(256, customer_id));
-- Dimension table: 256 buckets on customer_id (must match)
CREATE TABLE analytics.customers (
customer_id BIGINT,
name STRING,
email STRING,
region STRING,
signup_date DATE
)
USING iceberg
PARTITIONED BY (bucket(256, customer_id));
Critical: The bucket count must be identical on both tables. bucket(256, customer_id) on orders and bucket(128, customer_id) on customers will not trigger SPJ in Spark 3.x. Spark 4.0 relaxes this restriction (see below).
Step 2: Enable Spark Configurations
Five configurations must be set for SPJ to activate:
# Core SPJ enablement
spark.sql.sources.v2.bucketing.enabled=true
spark.sql.iceberg.planning.preserve-data-grouping=true
# Handle missing partition values on one side of the join
spark.sql.sources.v2.bucketing.pushPartValues.enabled=true
# Allow join keys to be a superset of partition keys
spark.sql.requireAllClusterKeysForCoPartition=false
# Handle data skew across buckets
spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled=true
Optional but often needed:
# Disable broadcast join so Spark doesn't bypass SPJ for smaller tables
spark.sql.autoBroadcastJoinThreshold=-1
# AQE can interfere with SPJ — disable if SPJ is not triggering
spark.sql.adaptive.enabled=false
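If you build the session programmatically instead of using spark-defaults.conf, the same settings can be applied at session construction time (a sketch; assumes a PySpark environment with the Iceberg runtime jar already on the classpath):

```python
from pyspark.sql import SparkSession

# Sketch: the SPJ-related settings from above, applied at session build time.
spark = (
    SparkSession.builder
    .appName("spj-demo")
    .config("spark.sql.sources.v2.bucketing.enabled", "true")
    .config("spark.sql.iceberg.planning.preserve-data-grouping", "true")
    .config("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
    .config("spark.sql.requireAllClusterKeysForCoPartition", "false")
    .config("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "true")
    # Optional: force SPJ over broadcast while benchmarking.
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    .getOrCreate()
)
```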
Step 3: Write a Join on the Partition Column
SELECT o.order_id, o.amount, c.name, c.region
FROM analytics.orders o
JOIN analytics.customers c
ON o.customer_id = c.customer_id
WHERE o.order_date >= '2026-01-01';
The join condition must include the partition column (customer_id). Additional conditions (like order_date) are fine — they become filters, not part of the SPJ decision.
Step 4: Verify with EXPLAIN
EXPLAIN FORMATTED
SELECT o.order_id, o.amount, c.name, c.region
FROM analytics.orders o
JOIN analytics.customers c
ON o.customer_id = c.customer_id;
SPJ is working if: There are no Exchange nodes between the BatchScan and the Join operator.
SPJ is NOT working if: You see Exchange hashpartitioning(customer_id, N) — that means Spark is shuffling.
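You can automate this check by scanning the plan text for Exchange nodes. The helper below is plain string inspection; `plan_text` would be the output of `EXPLAIN FORMATTED` (the sample plans mirror the ones shown earlier in this post):

```python
def spj_triggered(plan_text: str) -> bool:
    """Return True if the physical plan contains no shuffle (Exchange) nodes."""
    return "Exchange" not in plan_text

# Abbreviated versions of the two plans shown earlier:
shuffled_plan = """
SortMergeJoin [customer_id], [customer_id]
  Sort [customer_id ASC]
    Exchange hashpartitioning(customer_id, 200)
      BatchScan orders
"""
spj_plan = """
SortMergeJoin [customer_id], [customer_id]
  Sort [customer_id ASC]
    BatchScan orders
"""
print(spj_triggered(shuffled_plan))  # False -- shuffle present
print(spj_triggered(spj_plan))       # True  -- SPJ active
```

Wiring this into a CI smoke test catches silent regressions, since SPJ falls back to a shuffle without raising any error.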
What Each Configuration Does
| Configuration | Default | Description |
|---|---|---|
| `spark.sql.sources.v2.bucketing.enabled` | false | Master switch. Enables Spark to use V2 data source partitioning to avoid shuffles. |
| `spark.sql.iceberg.planning.preserve-data-grouping` | false | Iceberg-specific. Preserves partition grouping through query planning stages. Without this, Spark's optimizer may discard Iceberg's partitioning metadata. |
| `spark.sql.sources.v2.bucketing.pushPartValues.enabled` | false | Creates empty partitions for partition values that exist on one side but not the other. Without this, SPJ fails when tables have asymmetric partition coverage. |
| `spark.sql.requireAllClusterKeysForCoPartition` | true | When true, requires ALL partition keys to appear in the join. Set to false to allow joins on a superset of partition keys. |
| `spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled` | false | Handles skewed partitions by splitting large buckets across multiple tasks and replicating the smaller side's corresponding bucket. |
| `spark.sql.adaptive.enabled` | true | AQE can override SPJ decisions. Disable if SPJ is not triggering despite correct setup. |
| `spark.sql.autoBroadcastJoinThreshold` | 10485760 | Spark broadcasts small tables instead of using SPJ. Set to -1 to force SPJ for benchmarking or when you want consistent join behavior. |
SPJ with MERGE INTO — The Killer Use Case
MERGE INTO is internally a join between the target table and the source dataset. By default, Spark shuffles both sides to execute this join — even when both tables are bucketed identically. Enabling SPJ eliminates this shuffle, which is where the largest performance gains occur.
The Setup
-- Target table: bucketed on order_id
CREATE TABLE analytics.orders (
order_id BIGINT,
status STRING,
amount DECIMAL(10,2),
updated_at TIMESTAMP
)
USING iceberg
PARTITIONED BY (bucket(128, order_id));
-- Staging table: bucketed identically
CREATE TABLE staging.daily_orders (
order_id BIGINT,
status STRING,
amount DECIMAL(10,2),
updated_at TIMESTAMP
)
USING iceberg
PARTITIONED BY (bucket(128, order_id));
The MERGE
MERGE INTO analytics.orders t
USING staging.daily_orders s
ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
With SPJ enabled, the join phase of this MERGE executes without any shuffle. Bucket 0 of staging.daily_orders is matched against bucket 0 of analytics.orders, bucket 1 against bucket 1, and so on.
Important: The source table must be an Iceberg table with matching bucket partitioning. If your source is a temporary view, a Parquet file, or a DataFrame without Iceberg bucketing, SPJ will not activate. Load your staging data into a properly bucketed Iceberg table first.
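The reason identical bucketing pays off for MERGE can be sketched as a per-bucket upsert in plain Python (a toy model; the modulo hash stands in for Iceberg's Murmur3 bucket transform):

```python
# Toy model: MERGE INTO as a per-bucket upsert. Both sides are bucketed by
# the same function, so bucket i of the target only joins bucket i of staging.
NUM_BUCKETS = 4

def bucket_of(order_id: int) -> int:
    return order_id % NUM_BUCKETS  # stand-in for Iceberg's Murmur3 bucket hash

def bucketize(rows):
    buckets = [{} for _ in range(NUM_BUCKETS)]
    for order_id, status in rows:
        buckets[bucket_of(order_id)][order_id] = status
    return buckets

target = bucketize([(1, "pending"), (2, "pending"), (5, "shipped")])
staging = bucketize([(2, "shipped"), (9, "pending")])

# WHEN MATCHED THEN UPDATE / WHEN NOT MATCHED THEN INSERT, bucket by bucket.
for t_bucket, s_bucket in zip(target, staging):
    t_bucket.update(s_bucket)

merged = sorted((oid, st) for b in target for oid, st in b.items())
print(merged)  # [(1, 'pending'), (2, 'shipped'), (5, 'shipped'), (9, 'pending')]
```

Each target bucket is rewritten using only its matching staging bucket, which is why no executor ever needs rows from another executor's partition.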
Real-World Impact
In GitHub issue #7832, a user reported a MERGE operation joining 50 million rows against 30,000 rows that took 40 minutes with shuffle. The root cause was that Spark performed a ShuffledHashJoin scanning the entire 50M-row table. With SPJ enabled and both tables bucketed identically, the shuffle was eliminated entirely.
The performance difference depends on your data size, cluster configuration, and network bandwidth — but eliminating a shuffle on a 500M+ row table typically reduces join time by 50-80% and cuts compute costs proportionally.
Handling Edge Cases
Missing Partition Values
If the orders table has data in buckets 0-255 but the customers table only has data in buckets 0-200, Spark needs to create empty partitions for buckets 201-255 on the customers side. This is handled by:
spark.sql.sources.v2.bucketing.pushPartValues.enabled=true
Without this, SPJ will fail silently and fall back to a shuffle.
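Conceptually, what this setting does can be sketched as padding the sparse side with empty partitions so both bucket lists line up (hypothetical helper name; plain Python, not Spark internals):

```python
def align_partitions(left: dict, right: dict):
    """Pad each side with empty partitions so both cover the same bucket ids.

    left/right map bucket id -> list of rows. This mimics, conceptually, what
    spark.sql.sources.v2.bucketing.pushPartValues.enabled arranges for SPJ.
    """
    all_buckets = sorted(set(left) | set(right))
    return (
        [(b, left.get(b, [])) for b in all_buckets],
        [(b, right.get(b, [])) for b in all_buckets],
    )

orders = {0: ["o1"], 1: ["o2"], 201: ["o3"]}   # data in buckets 0, 1, 201
customers = {0: ["c1"], 1: ["c2"]}             # no data past bucket 1

left, right = align_partitions(orders, customers)
print(right)  # [(0, ['c1']), (1, ['c2']), (201, [])]  <- empty partition pushed
```

With the bucket lists aligned, bucket 201 of `orders` joins against an empty partition and simply produces no matches, instead of forcing a fallback shuffle.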
Data Skew
If bucket 42 in orders contains 10x more data than other buckets (e.g., a power customer), a single executor gets overloaded. The skew handling configuration:
spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled=true
This splits the large bucket across multiple tasks and replicates the corresponding bucket from the smaller table. Note: this does not work with FULL OUTER JOIN.
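The mechanics can be pictured in a few lines (a conceptual sketch, not Spark's actual task planner):

```python
def split_skewed_bucket(big_rows, small_rows, chunk_size):
    """Split one oversized bucket into tasks; replicate the small side per task."""
    chunks = [big_rows[i:i + chunk_size] for i in range(0, len(big_rows), chunk_size)]
    # Each task joins one chunk of the big side against ALL of the small side.
    return [(chunk, small_rows) for chunk in chunks]

big_bucket = [f"order{i}" for i in range(10)]   # skewed: 10x the typical size
small_bucket = ["cust42"]

tasks = split_skewed_bucket(big_bucket, small_bucket, chunk_size=4)
print(len(tasks))   # 3 tasks instead of 1 overloaded one
print(tasks[0][1])  # ['cust42'] -- small side replicated to each task
```

Replicating the small side is cheap; the win is that the oversized bucket's work is spread across several tasks instead of pinning one executor.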
AQE Interference
Adaptive Query Execution can override SPJ decisions — AQE may decide that a broadcast join or a different strategy is better based on runtime statistics. If SPJ is not triggering despite correct table setup:
spark.sql.adaptive.enabled=false
Trade-off: Disabling AQE globally loses its benefits (dynamic partition coalescing, skew join handling, broadcast join conversion) for all other queries in the session. Consider setting this only for the specific SPJ query:
spark.conf.set("spark.sql.adaptive.enabled", "false")
# Run SPJ query
spark.sql("SELECT ... FROM orders JOIN customers ON ...")
# Re-enable AQE for other queries
spark.conf.set("spark.sql.adaptive.enabled", "true")
Partition Evolution Breaks SPJ
If a table undergoes partition evolution (adding/removing/reordering partition fields), the internal field IDs may no longer match between tables — even if the final partition spec looks identical. This is a known issue (GitHub #13530).
Workaround: Avoid partition evolution on tables where SPJ is critical. If you must evolve, create a new table with the desired partition spec and migrate data.
3-Way Joins Fail
SPJ currently fails when joining 3+ bucketed tables in a single query (GitHub #15015). The error is typically Unequal numbers of partitions.
Workaround: Break 3-way joins into two sequential 2-way joins:
-- Instead of: SELECT * FROM a JOIN b ON ... JOIN c ON ...
-- Do:
CREATE TEMPORARY VIEW ab AS
SELECT * FROM a JOIN b ON a.id = b.id;
SELECT * FROM ab JOIN c ON ab.id = c.id;
Both Tables Must Be Iceberg
SPJ requires both sides of the join to be Iceberg tables reporting compatible partitioning. Joining an Iceberg table with a Hive table, Parquet files, or a temporary DataFrame view will not trigger SPJ. If your source data comes from an external system, load it into a bucketed Iceberg staging table first.
Spark 4.0 SPJ Enhancements
Spark 4.0 significantly expands SPJ's applicability:
Join Keys as Subset of Partition Keys
In Spark 3.x, if a table is partitioned by bucket(256, customer_id), region, you must include both customer_id and region in the join condition (unless requireAllClusterKeysForCoPartition=false). Spark 4.0 introduces:
spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled=true
This allows SPJ to work when the join key is only customer_id, even though the table is also partitioned by region. Spark groups partitions by the common column to enable the optimization.
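A toy sketch of that grouping (plain Python; the partition keys and row values are made up for illustration):

```python
from collections import defaultdict

def group_by_join_key(partitions):
    """Group (bucket_id, region) partitions by bucket_id only.

    Mimics, conceptually, what allowJoinKeysSubsetOfPartitionKeys enables:
    joining on customer_id alone although the table is partitioned by
    bucket(customer_id) plus region.
    """
    grouped = defaultdict(list)
    for (bucket_id, region), rows in partitions.items():
        grouped[bucket_id].extend(rows)
    return dict(grouped)

# Partitions keyed by (bucket(customer_id), region)
orders_parts = {
    (0, "us"): ["o1"], (0, "eu"): ["o2"], (1, "us"): ["o3"],
}
print(group_by_join_key(orders_parts))  # {0: ['o1', 'o2'], 1: ['o3']}
```

After grouping, each side exposes one partition per bucket value, so the bucket-to-bucket matching works exactly as in the simple case.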
Compatible Bucket Counts
In Spark 3.x, both tables must have identical bucket counts. Spark 4.0 can handle compatible (but different) bucket counts — for example, joining a table with bucket(256, id) against one with bucket(128, id). Spark groups multiple smaller buckets to align with the larger side.
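This grouping works because when one bucket count divides the other, the coarser bucket assignment is fully determined by the finer one: `hash % 128` equals `(hash % 256) % 128`. The arithmetic can be checked directly (plain Python; the grouping dict is an illustration, not Spark's internal structure):

```python
# When one bucket count divides the other, fine buckets fold onto coarse ones:
# a row in bucket b of the 256-bucket table lands in bucket b % 128 of the
# 128-bucket table, so buckets b and b + 128 can be grouped to match bucket b.
for h in range(100_000):         # h stands in for the bucket hash of a key
    fine = h % 256               # bucket in the 256-bucket table
    coarse = h % 128             # bucket in the 128-bucket table
    assert coarse == fine % 128  # coarse bucket is derivable from the fine one

groups = {b: sorted({b, b + 128}) for b in range(128)}
print(groups[5])  # [5, 133] -- fine buckets joined against coarse bucket 5
```

This is also why bucket counts that are powers of two are a good default: any two such counts are automatically compatible under this rule.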
These enhancements make SPJ practical for more real-world scenarios where tables are not designed from day one with identical partitioning.
When NOT to Use SPJ
SPJ is not always the right choice:
- Small dimension tables: If one side fits in memory, a broadcast join (`spark.sql.autoBroadcastJoinThreshold`) is faster than SPJ because it avoids any join-side scan overhead.
- Non-equi-joins: SPJ only works with equality join conditions (`=`). Range joins, theta joins, and cross joins cannot use SPJ.
- Tables with different access patterns: Bucketing a table solely for join performance penalizes other access patterns. If a table is primarily queried by `event_date` but only occasionally joined on `customer_id`, partitioning by `day(event_date)` may be the better overall choice.
- Very few buckets: If both tables have only 4-8 buckets, the parallelism is too low and SPJ may be slower than a shuffle with 200 partitions.
Production Checklist
Table design:
- Both tables use `bucket(N, join_column)` with the same `N`
- Bucket count is appropriate for data volume (64-512 buckets for most workloads)
- Both tables are Iceberg tables (not Hive, Parquet, or temp views)
- No partition evolution has occurred that reorders fields
Spark configuration:
- `spark.sql.sources.v2.bucketing.enabled=true`
- `spark.sql.iceberg.planning.preserve-data-grouping=true`
- `spark.sql.sources.v2.bucketing.pushPartValues.enabled=true`
- `spark.sql.requireAllClusterKeysForCoPartition=false`
- `spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled=true`
Verification:
- Run `EXPLAIN FORMATTED` — confirm no `Exchange` nodes between scan and join
- Check Spark UI DAG — confirm no shuffle stage between read and join stages
- Compare execution time with and without SPJ on a representative dataset
Maintenance:
- Run compaction regularly — fragmented buckets reduce SPJ effectiveness
- Monitor bucket data distribution — heavily skewed buckets need `partiallyClusteredDistribution`
- Keep bucket counts aligned when creating new tables that will be joined
How Cazpian Handles This
On Cazpian, SPJ configurations are enabled by default on all managed Spark clusters. When you create bucketed Iceberg tables through Cazpian's catalog, the platform automatically enables preserve-data-grouping and all required V2 bucketing settings. For MERGE INTO workloads, Cazpian's job optimizer detects compatible bucket partitioning and ensures SPJ activates — no manual configuration needed.
What's Next
This post covered how to eliminate shuffles with Storage Partitioned Joins. For related optimizations, see our other posts in this series:
- Writing Efficient MERGE INTO Queries — push-down predicates, COW vs MOR, and compaction after merges.
- Iceberg Table Design — how to choose bucket counts, partition transforms, and write properties.
- Iceberg Query Performance Tuning — partition pruning, bloom filters, and Spark read configs.
- Iceberg on AWS: S3FileIO and Glue Catalog — S3FileIO, ObjectStoreLocationProvider, and avoiding S3 throttling.