Spark SQL Join Strategy: The Complete Optimization Guide
Your Spark job runs for 45 minutes. You check the Spark UI and find that a single join stage consumed 38 of those minutes — shuffling 800 GB across the network because the optimizer picked SortMergeJoin for a query where one side was 40 MB after filtering. Nobody ran ANALYZE TABLE. No statistics existed. The optimizer had no idea the table was small enough to broadcast.
The join strategy is the single most impactful decision in a Spark SQL query plan. It determines whether your data shuffles across the network, whether it spills to disk, whether your driver runs out of memory, and whether your query finishes in seconds or hours. Spark offers five distinct join strategies, each with different performance characteristics, memory requirements, and failure modes. The optimizer picks one based on statistics, hints, configuration, and join type — and it often picks wrong when it lacks information.
This post covers every join strategy in Spark, how the JoinSelection decision tree works internally, how the Catalyst optimizer estimates sizes, how CBO reorders multi-table joins, every join hint and when to use it, how AQE converts strategies at runtime, the equi vs non-equi join problem, how to read physical plans, the most common anti-patterns, and a real-world decision framework you can use in production.
The Five Join Strategies
Spark implements five physical join strategies. Each has distinct tradeoffs in shuffle cost, CPU overhead, memory consumption, and failure risk. Understanding what each one does internally is the foundation for making good optimization decisions.
BroadcastHashJoin (BHJ)
BroadcastHashJoin is the fastest join strategy for queries where one side is small. It eliminates the shuffle entirely by broadcasting the small table to every executor.
How it works internally:
- Collect to driver. The small side (build side) is collected back to the driver via a `collect()` operation
- Build HashedRelation. The driver constructs an in-memory hash map. For single `Long` or `Int` join keys, Spark uses `LongHashedRelation` backed by `LongToUnsafeRowMap` (no boxing overhead). For all other key types — `String`, composite keys, etc. — it uses `UnsafeHashedRelation` backed by `BytesToBytesMap`, which leverages Tungsten off-heap memory management
- Serialize and chunk. The driver serializes the `HashedRelation` into chunks of `spark.broadcast.blockSize` (default 4 MB), compressed with LZ4 by default
- TorrentBroadcast P2P distribution. Executors fetch chunks from the driver or from other executors that already have them. This BitTorrent-like protocol creates exponential fan-out — instead of the driver sending N copies, the load distributes across the cluster
- Local hash join. Each executor iterates through its local partitions of the large table (probe side) and looks up each row's join key in the hash map. No shuffle of the large table occurs
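The build-and-probe mechanics can be sketched in plain Python. This is a conceptual model of the algorithm, not Spark's Tungsten implementation; the table names and rows are made up:

```python
def broadcast_hash_join(build_rows, probe_rows, build_key, probe_key):
    # Build phase: hash map from join key -> list of matching build rows
    hashed = {}
    for row in build_rows:
        hashed.setdefault(row[build_key], []).append(row)

    # Probe phase: stream the large side, one O(1) lookup per row, no shuffle
    for row in probe_rows:
        for match in hashed.get(row[probe_key], []):
            yield {**match, **row}

dim = [{"customer_id": 1, "country": "US"}, {"customer_id": 2, "country": "DE"}]
fact = [{"customer_id": 1, "amount": 30}, {"customer_id": 3, "amount": 5}]
result = list(broadcast_hash_join(dim, fact, "customer_id", "customer_id"))
# Only customer_id=1 matches; customer_id=3 has no build-side entry
```

The key property: the hash map is built once per executor, and the large side is never repartitioned.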
Constraints:
- Hard limit: 8 GB serialized size, 512 million rows
- The driver needs approximately 2x the broadcast table size in heap during the broadcast phase (deserialized + serialized copy)
- Broadcast timeout: `spark.sql.broadcastTimeout` defaults to 300 seconds
- Compressed on-disk size can be misleading — 100 MB of Parquet can expand to 300 MB–1 GB in memory
Best for: Small-to-large table joins where the small side fits comfortably in driver and executor memory.
SortMergeJoin (SMJ)
SortMergeJoin is Spark's default join strategy for equi-joins between two large tables. It is the safest strategy because it handles arbitrary data sizes through spilling.
How it works internally:
- Exchange (shuffle). Both sides are repartitioned by join keys using `HashPartitioning`. Each row is serialized, hashed on its join keys, written to local disk, transferred over the network, and deserialized on the destination executor. The number of partitions is controlled by `spark.sql.shuffle.partitions` (default 200)
- Sort. After shuffling, each partition on each side is sorted on the join keys using `UnsafeExternalSorter`. Sorting uses Tungsten memory management — when execution memory is exhausted, data spills to disk transparently
- Merge. Two sorted iterators are merged in a single linear pass. For each matching key, rows from both sides are combined. For keys with multiple matches on the right side, `ExternalAppendOnlyUnsafeRowArray` buffers the matching rows — and spills to an `UnsafeExternalSorter` if the buffer exceeds `spark.sql.sortMergeJoinExec.buffer.in.memory.threshold` (default `Int.MaxValue` rows)
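The merge step above can be sketched over two key-sorted lists of `(key, value)` pairs. A pure-Python illustration of the single linear pass, including the right-side duplicate buffering (Spark does this with spillable structures; here it is just an index range):

```python
def merge_join(left, right):
    # Both inputs must already be sorted by key (the Sort step)
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Buffer the run of right rows for this key, pair with each left row
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            while i < len(left) and left[i][0] == lk:
                for jj in range(j, j_end):
                    out.append((lk, left[i][1], right[jj][1]))
                i += 1
            j = j_end
    return out

left = [(1, "a"), (2, "b"), (2, "c"), (4, "d")]
right = [(2, "x"), (2, "y"), (3, "z")]
result = merge_join(left, right)
# Key 2 matches: 2 left rows x 2 right rows = 4 output rows
```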
Constraints:
- Requires two full shuffles (one per side)
- Requires sorting both sides (CPU-intensive for large datasets)
- Join keys must be orderable (sortable)
- Spill to disk handles memory pressure but degrades performance significantly
Spill optimization note: SPARK-30536 reduced the default memory read buffer in UnsafeSorterSpillReader from 1 MB to 1 KB, significantly improving spill read performance.
Best for: Large-to-large equi-joins where neither side is small enough for broadcast.
ShuffledHashJoin (SHJ)
ShuffledHashJoin builds a hash table from the smaller side's partitions after shuffle. It avoids the sort phase of SMJ — which can be significant for large datasets — but trades sort for hash table construction.
How it works internally:
- Exchange (shuffle). Both sides are repartitioned by join keys (same as SMJ)
- Build. On each executor, the smaller side's local partition is loaded into a
HashedRelation(in-memory hash table) - Probe. The larger side's local partition streams through, looking up each row in the hash table
When Spark chooses it:
- `spark.sql.join.preferSortMergeJoin` is `false` (default `true` — so SHJ is disabled by default)
- OR: A `SHUFFLE_HASH` hint is present
- OR: AQE converts SMJ to SHJ at runtime when `spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold` is set
- Additional requirements: the build side's `sizeInBytes < autoBroadcastJoinThreshold * numShufflePartitions` (per-partition data fits in memory) AND the build side is at least 3x smaller than the probe side
Constraints:
- The entire build-side partition must fit in executor memory as a hash table — there is no spill fallback for the hash build phase in older Spark versions (SPARK-32634 introduced a sort-based fallback)
- If a partition is too large, the task fails with `SparkOutOfMemoryError`
- Join keys do not need to be orderable (unlike SMJ)
Join type support has expanded across versions:
- Spark 3.0: Inner, Cross, LeftOuter, LeftSemi, LeftAnti, RightOuter, ExistenceJoin
- Spark 3.1: Added FullOuter (SPARK-32399)
- Spark 3.3: Added FullOuter codegen (SPARK-32567)
- Spark 3.5: Added left-outer-build-left and right-outer-build-right (SPARK-36612)
Best for: Large-to-large joins where one side is significantly smaller and you can guarantee per-partition data fits in memory. Faster than SMJ when sort is the bottleneck.
BroadcastNestedLoopJoin (BNLJ)
BroadcastNestedLoopJoin is the strategy Spark uses for non-equi joins (theta joins) when one side is small enough to broadcast.
How it works internally:
- Broadcast. The smaller side is broadcast to all executors (same mechanism as BHJ)
- Nested loop. For every row in the streamed (larger) side, it is compared against every row in the broadcast side using the join condition. Complexity: O(N * M)
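The O(N * M) behavior is easy to see in a plain-Python sketch (illustrative values only, not Spark internals):

```python
def nested_loop_join(streamed, broadcast_side, predicate):
    # Every streamed row is checked against every broadcast row
    comparisons = 0
    out = []
    for s in streamed:
        for b in broadcast_side:
            comparisons += 1
            if predicate(s, b):
                out.append((s, b))
    return out, comparisons

events = [5, 12, 30]
thresholds = [10, 20]
# Non-equi condition: event value greater than threshold
result, comparisons = nested_loop_join(events, thresholds, lambda s, b: s > b)
# 3 streamed rows x 2 broadcast rows = 6 comparisons regardless of matches
```

The comparison count is N * M no matter how few rows match, which is why this strategy degrades so quickly as inputs grow.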
When used:
- The join condition contains only non-equality predicates (`<`, `>`, `>=`, `<=`, `!=`, `LIKE`, `BETWEEN`)
- Cross joins with a broadcast hint
- Fallback when no other strategy applies and one side can be broadcast
Supports all join types: Inner, LeftOuter, RightOuter, FullOuter, LeftSemi, LeftAnti, Cross, ExistenceJoin.
Best for: Non-equi joins where one side is small. Avoid for large tables — O(N * M) becomes catastrophic quickly.
CartesianProduct
CartesianProduct is the last resort for non-equi inner joins where neither side is small enough to broadcast.
How it works internally:
Every row from the left side is combined with every row from the right side. For a 10,000-row table joined with a 1,000-row table, the result is 10,000,000 rows.
When used:
- Non-equi joins where neither side can be broadcast and the join type is `InnerLike`
- Explicit `CROSS JOIN` without a join condition
- Accidentally missing join conditions
Detection: If `spark.sql.crossJoin.enabled=false` (the default in Spark 2.x; `true` in Spark 3.x), Spark throws: "Detected cartesian product for INNER join between logical plans". In Spark 3.x, check `EXPLAIN` output for `CartesianProduct` nodes.
Best for: Almost never. If you see this in a plan, verify it is intentional.
How JoinSelection Picks a Strategy
Spark's JoinSelection execution planning strategy (defined in SparkStrategies.scala) follows a strict priority order. Understanding this order explains why your query uses the strategy it does.
For Equi-Joins (with equality join keys)
The decision tree, in exact priority order:
Priority 1 — Check join hints (highest priority):
- `BROADCAST` / `BROADCASTJOIN` / `MAPJOIN` hint → `BroadcastHashJoinExec`
- `SHUFFLE_MERGE` / `MERGE` / `MERGEJOIN` hint → `SortMergeJoinExec`
- `SHUFFLE_HASH` hint → `ShuffledHashJoinExec`
- `SHUFFLE_REPLICATE_NL` hint → `CartesianProductExec` (if InnerLike)
Priority 2 — BroadcastHashJoin (automatic):
If canBroadcast(left) or canBroadcast(right) — meaning the side's estimated sizeInBytes is at or below spark.sql.autoBroadcastJoinThreshold (default 10 MB) — and the join type supports broadcast on that side.
Priority 3 — SortMergeJoin (default):
If spark.sql.join.preferSortMergeJoin=true (default) and the join keys are orderable.
Priority 4 — ShuffledHashJoin:
If spark.sql.join.preferSortMergeJoin=false and canBuildLocalHashMap (per-partition data fits) and muchSmaller (build side is 3x+ smaller than probe side).
Priority 5 — SortMergeJoin (fallback): For remaining equi-joins where keys are orderable.
Priority 6 — CartesianProduct: Final fallback for InnerLike joins.
Priority 7 — BroadcastNestedLoopJoin: Final fallback for non-InnerLike joins.
For Non-Equi Joins (no equality join keys)
- Check join hints
- BroadcastNestedLoopJoin: If one side can be broadcast
- CartesianProduct: If join type is InnerLike and neither side can be broadcast
- BroadcastNestedLoopJoin: Final fallback (Spark picks a side to broadcast)
Key Helper Functions
These internal functions determine eligibility:
- `canBroadcast(plan)`: `plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= autoBroadcastJoinThreshold` and no `NO_BROADCAST_HASH` hint
- `canBuildLeft(joinType)`: Returns `true` for Cross, Inner, RightOuter
- `canBuildRight(joinType)`: Returns `true` for Cross, Inner, LeftAnti, LeftOuter, LeftSemi, ExistenceJoin
- `canBuildLocalHashMap(plan)`: `plan.stats.sizeInBytes < autoBroadcastJoinThreshold * numShufflePartitions`
- `muchSmaller(a, b)`: `a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes`
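Transliterated into plain Python, with the default thresholds quoted above (the table sizes at the bottom are illustrative, not from a real catalog):

```python
AUTO_BROADCAST_JOIN_THRESHOLD = 10 * 1024 * 1024  # 10 MB default
NUM_SHUFFLE_PARTITIONS = 200                       # spark.sql.shuffle.partitions

def can_broadcast(size_in_bytes):
    # Size must be known (>= 0) and at or below the broadcast threshold
    return 0 <= size_in_bytes <= AUTO_BROADCAST_JOIN_THRESHOLD

def can_build_local_hash_map(size_in_bytes):
    # Per-partition data must fit: total size under threshold * partitions
    return size_in_bytes < AUTO_BROADCAST_JOIN_THRESHOLD * NUM_SHUFFLE_PARTITIONS

def much_smaller(build_size, probe_size):
    # Build side must be at least 3x smaller than the probe side
    return build_size * 3 <= probe_size

small = 8 * 1024 * 1024          # 8 MB: broadcastable
medium = 1024 * 1024 * 1024      # 1 GB: too big to broadcast...
large = 50 * 1024 * 1024 * 1024  # 50 GB probe side
# ...but eligible for ShuffledHashJoin against the 50 GB side
shj_ok = can_build_local_hash_map(medium) and much_smaller(medium, large)
```

Running the predicates against these sizes shows why a 1 GB table can never be broadcast under defaults but can still qualify for SHJ.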
Join Type and Strategy Compatibility
Not every strategy supports every join type. This matrix determines what is possible:
| Join Type | BHJ | SMJ | SHJ | BNLJ | CartesianProduct |
|---|---|---|---|---|---|
| Inner | Build either side | Yes | Yes | Yes | Yes |
| Cross | Build either side | Yes | Yes | Yes | Yes |
| LeftOuter | Build right only | Yes | Yes | Yes | No |
| RightOuter | Build left only | Yes | Yes | Yes | No |
| FullOuter | No | Yes | Yes (Spark 3.1+) | Yes | No |
| LeftSemi | Build right only | Yes | Yes | Yes | No |
| LeftAnti | Build right only | Yes | Yes | Yes | No |
| ExistenceJoin | Build right only | Yes | Yes | Yes | No |
Key implications:
- FullOuter joins cannot use BroadcastHashJoin. Even with a `BROADCAST` hint, Spark falls back to SMJ. This catches many people off guard
- BHJ build side depends on join type. For LeftOuter, only the right (smaller) side can be broadcast. For RightOuter, only the left side. For Inner and Cross, either side works
- ShuffledHashJoin gained FullOuter support in Spark 3.1 — before that, the only option for FullOuter was SMJ or BNLJ
How Catalyst Estimates Sizes
The optimizer's strategy selection depends entirely on sizeInBytes estimates. When these estimates are wrong, the optimizer makes wrong decisions.
Statistics Collection with ANALYZE TABLE
-- Table-level stats (row count, size in bytes)
ANALYZE TABLE orders COMPUTE STATISTICS;
-- Column-level stats (min, max, null count, distinct count, avg length, max length)
ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS customer_id, order_date, amount;
-- All columns
ANALYZE TABLE orders COMPUTE STATISTICS FOR ALL COLUMNS;
Column-level statistics are critical for CBO. Without them, the optimizer cannot estimate join cardinality — it does not know how many distinct values exist in a join key column.
Equi-Height Histograms
spark.sql.statistics.histogram.enabled=true -- default false
spark.sql.statistics.histogram.numBins=254 -- default 254
Histograms enable accurate selectivity estimation for skewed data distributions. Without histograms, Spark assumes uniform distribution — which is almost never true in practice.
View collected stats:
DESCRIBE EXTENDED orders customer_id;
File-Level Statistics
Parquet and ORC store metadata in file footers:
- Parquet: Row group metadata with min/max values, null counts, and row counts per column
- ORC: Stripe-level and file-level statistics including counts, min/max, and sum for numeric columns
Spark reads these automatically during planning. They feed into sizeInBytes estimates and enable row group skipping.
sizeInBytes Estimation by Plan Node
- Scan nodes: Estimated from file sizes on disk, Parquet/ORC footer stats, or catalog statistics from `ANALYZE TABLE`
- Filter: Scaled by estimated selectivity (if column stats exist) or a default factor
- Project: Adjusted based on selected columns
- Join: With CBO, uses cardinality estimation from column NDV, null fractions, and min/max overlap. Without CBO, uses simple heuristics
- Aggregate: Estimated from grouping key cardinality
When Estimates Go Wrong
This is where most join strategy problems originate:
- Missing statistics. Without `ANALYZE TABLE`, Spark falls back to file-size heuristics. Parquet files are typically 2-10x smaller on disk than in memory due to compression and encoding. A 100 MB Parquet file might be 500 MB in memory — but the optimizer uses the 100 MB figure
- Stale statistics. After large data loads or deletes, statistics become inaccurate. Spark does not automatically refresh them
- Complex expressions. Spark cannot estimate selectivity for UDFs, complex expressions, or non-standard functions
- Compounding errors. Each estimation error compounds through the plan. A 2x error on a filter selectivity combined with a 3x error on a join cardinality produces a 6x error in the final estimate
The practical consequence: the optimizer picks SortMergeJoin when BroadcastHashJoin would be 20x faster, or picks BroadcastHashJoin when the table is too large and the driver OOMs.
Cost-Based Optimizer and Join Reordering
When joining more than two tables, the order in which joins execute can dramatically affect performance.
Enabling CBO
spark.sql.cbo.enabled=true -- Enable CBO (default false)
spark.sql.cbo.joinReorder.enabled=true -- Enable join reordering (default false)
spark.sql.cbo.joinReorder.dp.threshold=12 -- Max tables for DP algorithm (default 12)
spark.sql.cbo.joinReorder.card.weight=0.7 -- Cardinality weight in cost formula
How Join Reordering Works
Spark uses a dynamic programming algorithm (adapted from Selinger 1979):
- Decompose the query into individual table sources and join conditions
- Initialize: each table is a level-1 plan
- For each level m (from 2 to N tables): combine every pair of plans that sum to m, check if a valid join condition exists, compute cost, keep only the best plan
- Return the plan covering all N tables with lowest cost
Cost formula:
cost = rows * weight + size * (1 - weight)
where weight = spark.sql.cbo.joinReorder.card.weight (default 0.7).
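Applying the formula to two hypothetical candidate plans shows how the default weight of 0.7 makes cardinality dominate the decision (the row counts and sizes below are illustrative):

```python
CARD_WEIGHT = 0.7  # spark.sql.cbo.joinReorder.card.weight

def plan_cost(rows, size_bytes):
    # cost = rows * weight + size * (1 - weight)
    return rows * CARD_WEIGHT + size_bytes * (1 - CARD_WEIGHT)

# Candidate A: 1M rows but 100 MB; Candidate B: 100M rows but only 40 MB
cost_a = plan_cost(1_000_000, 100 * 1024 * 1024)
cost_b = plan_cost(100_000_000, 40 * 1024 * 1024)

# With weight 0.7, cardinality dominates: A wins despite its larger byte size
best = "A" if cost_a < cost_b else "B"
```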
Why Join Order Matters
Consider three tables: orders (100M rows), customers (50K rows), products (10K rows).
Bad order: (orders JOIN products) JOIN customers
- First join: 100M x 10K matching rows → potentially billions of intermediate rows before filtering with customers
Good order: (orders JOIN customers) JOIN products
- First join: 100M x 50K → filtered intermediate result, then join with products
CBO finds the optimal order using cardinality estimates from column-level statistics.
Star Schema Detection
spark.sql.cbo.starSchemaDetection=true -- default false
When enabled, Spark's StarSchemaDetection identifies fact tables (the largest table) and dimension tables (smaller tables with foreign key relationships). It places the fact table on the driving arm of the join tree and applies the most selective dimension filters first — reducing data flow early.
This heuristic works even without full CBO statistics and can significantly improve star-schema query performance.
Limitations
- Join reordering applies only to consecutive Inner or Cross joins — not outer joins
- Requires statistics for all tables (`ANALYZE TABLE`)
- `dp.threshold` default of 12 limits the number of tables (the algorithm has exponential complexity)
- Disabled when join hints are present (SPARK-26840)
All Join Hints
Hints override the optimizer's strategy choice. Use them when you know better than the optimizer — which is common when statistics are missing or stale.
Available Hints
| Hint | Aliases | Strategy Forced | Since |
|---|---|---|---|
| `BROADCAST` | `BROADCASTJOIN`, `MAPJOIN` | BroadcastHashJoin | Spark 2.2 |
| `MERGE` | `SHUFFLE_MERGE`, `MERGEJOIN` | SortMergeJoin | Spark 3.0 |
| `SHUFFLE_HASH` | — | ShuffledHashJoin | Spark 3.0 |
| `SHUFFLE_REPLICATE_NL` | — | CartesianProduct or BNLJ | Spark 3.0 |
Hint Priority When Both Sides Have Hints
BROADCAST > MERGE > SHUFFLE_HASH > SHUFFLE_REPLICATE_NL
If both sides have BROADCAST, the smaller side (by stats) is broadcast. If both sides have SHUFFLE_HASH, the smaller side is the build side. If a hint specifies a strategy that does not support the join type (e.g., BROADCAST on a FullOuter join), Spark ignores the hint.
SQL Syntax
-- Broadcast the dimension table
SELECT /*+ BROADCAST(dim_customers) */ *
FROM fact_orders o
INNER JOIN dim_customers c ON o.customer_id = c.customer_id;
-- Force SortMergeJoin (useful when broadcast causes driver OOM)
SELECT /*+ SHUFFLE_MERGE(fact_orders) */ *
FROM fact_orders o
INNER JOIN large_returns r ON o.order_id = r.order_id;
-- Force ShuffledHashJoin
SELECT /*+ SHUFFLE_HASH(medium_table) */ *
FROM large_table l
INNER JOIN medium_table m ON l.key = m.key;
-- Force CartesianProduct / BNLJ
SELECT /*+ SHUFFLE_REPLICATE_NL(small_table) */ *
FROM large_table l
INNER JOIN small_table s ON l.key > s.key;
DataFrame API Syntax
from pyspark.sql.functions import broadcast
# Broadcast hint (most common)
result = fact_orders.join(broadcast(dim_customers), "customer_id")
# Or using hint() method
result = fact_orders.join(dim_customers.hint("broadcast"), "customer_id")
# Force SortMergeJoin
result = df1.hint("merge").join(df2, "key")
# Force ShuffledHashJoin
result = df1.join(df2.hint("shuffle_hash"), "key")
When to Use Each Hint
- BROADCAST: When you know a table is small after filtering but the optimizer does not (missing stats, complex filter expressions, post-filter size unknown)
- MERGE: When you need to force SMJ to avoid broadcast OOM on a table the optimizer thinks is small but is actually large in memory
- SHUFFLE_HASH: When both sides are large but one side is significantly smaller and sort is the bottleneck
- SHUFFLE_REPLICATE_NL: Rare. For explicit cartesian products when you need them
AQE Runtime Join Optimization
Adaptive Query Execution (AQE), enabled by default since Spark 3.2, optimizes join strategies at runtime using actual shuffle statistics instead of compile-time estimates.
Runtime SortMerge to Broadcast Conversion
After shuffle stages complete, AQE examines the actual shuffle data sizes. If either side of a SortMergeJoin has actual runtime size below spark.sql.adaptive.autoBroadcastJoinThreshold (defaults to the same value as spark.sql.autoBroadcastJoinThreshold, 10 MB), AQE converts the join to BroadcastHashJoin.
This is less efficient than planning BHJ from the start (the shuffle already happened), but still beneficial: it avoids sorting both sides and reads shuffle files locally via a custom shuffle reader instead of over the network.
spark.sql.adaptive.autoBroadcastJoinThreshold=10485760 -- default 10 MB
Runtime SortMerge to ShuffledHash Conversion
AQE can also convert SMJ to SHJ when all post-shuffle partitions are smaller than a threshold:
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold=0 -- default 0 (disabled)
Set this to a value like 64MB to enable. AQE checks actual partition sizes and converts when the hash table will fit in memory.
DemoteBroadcastHashJoin
AQE can also remove broadcast when it detects the data has many empty partitions. If the percentage of non-empty partitions falls below spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin (default 0.2), the broadcast is demoted to SortMergeJoin.
This prevents broadcast from being used on tables with highly skewed partition distributions where most partitions are empty — which indicates the broadcast may not be beneficial.
OptimizeSkewedJoin
AQE's skew join handling splits oversized partitions at runtime:
spark.sql.adaptive.skewJoin.enabled=true -- default true
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB -- default
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 -- default
A partition is considered skewed when both conditions are true:
- `partition_size > skewedPartitionFactor * median_partition_size`
- `partition_size > skewedPartitionThresholdInBytes`
Skewed partitions are split into smaller sub-partitions, and the corresponding partition on the other side is replicated to match. The result: roughly evenly-sized tasks instead of one straggler holding up the entire stage.
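The two-condition test can be expressed as a small predicate. A sketch with made-up partition sizes, one 2 GB straggler among roughly 100 MB partitions:

```python
import statistics

SKEWED_PARTITION_FACTOR = 5                      # skewedPartitionFactor
SKEWED_PARTITION_THRESHOLD = 256 * 1024 * 1024   # 256 MB threshold

def is_skewed(partition_size, median_size):
    # Both conditions must hold for AQE to split the partition
    return (partition_size > SKEWED_PARTITION_FACTOR * median_size
            and partition_size > SKEWED_PARTITION_THRESHOLD)

# Partition sizes in bytes: one 2 GB straggler among ~100 MB partitions
sizes = [100 * 2**20, 110 * 2**20, 95 * 2**20, 2 * 2**30]
median = statistics.median(sizes)
skewed = [s for s in sizes if is_skewed(s, median)]
```

Only the 2 GB partition passes both tests; a partition that is 5x the median but still under 256 MB would be left alone.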
How AQE Changes Your Plans
Check the Spark UI SQL tab. AQE shows both the initial plan and the final (optimized) plan. Look for:
- `AdaptiveSparkPlan isFinalPlan=true` at the root
- `BroadcastHashJoin` appearing where `SortMergeJoin` was planned
- `CustomShuffleReader` nodes indicating optimized shuffle reads
- Partition coalescing reducing the number of shuffle partitions
Equi-Join vs Non-Equi Join
The distinction between equi-joins and non-equi joins is fundamental because it determines which strategies are available.
Why Equi-Joins Enable Hash and Sort Strategies
- Hash strategies (BHJ, SHJ): Equality conditions allow hash partitioning. `hash(key)` on both sides produces the same hash for matching rows, guaranteeing co-location. This makes O(N) probe possible
- Sort-merge (SMJ): Equality on sortable keys allows a single linear-time merge pass through two sorted iterators
- Non-equi (theta) joins: Conditions like `a.ts BETWEEN b.start AND b.end` or `a.price > b.threshold` cannot be hash-partitioned. There is no way to guarantee co-location without examining every combination
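The co-location guarantee is easy to demonstrate: partition both sides by `hash(key) % n` and every match is findable within a single partition pair. A pure-Python sketch with toy keys:

```python
NUM_PARTITIONS = 8

def partition_rows(keys, n):
    # hash(key) % n: same key always lands in the same partition number
    parts = {i: [] for i in range(n)}
    for k in keys:
        parts[hash(k) % n].append(k)
    return parts

left_parts = partition_rows(["a", "b", "c"], NUM_PARTITIONS)
right_parts = partition_rows(["b", "c", "d"], NUM_PARTITIONS)

# Joining partition-by-partition finds every match; no cross-partition
# comparison is ever needed
matches = sorted(
    k for i in range(NUM_PARTITIONS)
    for k in set(left_parts[i]) & set(right_parts[i])
)
```

A condition like `a.ts > b.ts` has no such key to hash, so no partitioning scheme can promise that matching rows end up together.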
Strategies Available by Join Type
| | BHJ | SMJ | SHJ | BNLJ | CartesianProduct |
|---|---|---|---|---|---|
| Equi-join | Yes | Yes | Yes | Yes | Yes |
| Non-equi join | No | No | No | Yes (broadcast) | Yes (InnerLike) |
Range Join Workarounds
Range joins (a.ts BETWEEN b.start AND b.end) are common in event correlation, session analysis, and time-series processing. The naive approach uses BNLJ or CartesianProduct — both catastrophic for large tables.
Binning strategy:
Convert the range join into an equi-join by discretizing the range into bins:
-- Instead of O(N*M) range join:
SELECT * FROM events e JOIN intervals i
ON e.ts BETWEEN i.start_ts AND i.end_ts;
-- Add bin column and do equi-join + range filter:
SELECT * FROM events e JOIN intervals_exploded i
ON e.ts_bin = i.ts_bin
AND e.ts BETWEEN i.start_ts AND i.end_ts;
The binning approach requires exploding the interval table so that each row appears in every bin it spans. This increases the interval table size but converts O(NM) to O(NK) where K is the average number of bins per interval — typically 10-100x faster.
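A toy sketch of the explode-then-equi-join rewrite. `BIN_WIDTH` is a tuning choice you make based on typical interval length, not a Spark setting; the timestamps and intervals below are made up:

```python
BIN_WIDTH = 10

def explode_interval(start, end, width=BIN_WIDTH):
    # One row per bin the interval [start, end] overlaps
    return [(b, start, end) for b in range(start // width, end // width + 1)]

events = [3, 17, 42]              # event timestamps
intervals = [(0, 8), (15, 35)]    # (start_ts, end_ts)

exploded = [row for s, e in intervals for row in explode_interval(s, e)]

# Equi-join on the bin id, then apply the residual range predicate
matches = sorted(
    (ts, s, e)
    for ts in events
    for b, s, e in exploded
    if b == ts // BIN_WIDTH and s <= ts <= e
)
```

The equality on the bin id is what lets Spark use hash partitioning; the `BETWEEN` predicate becomes a cheap post-filter within each bin.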
Rewriting OR Conditions
-- This PREVENTS hash/sort-merge join (becomes BNLJ or CartesianProduct):
SELECT * FROM t1 JOIN t2 ON t1.id = t2.id OR t1.name = t2.name;
OR in join conditions prevents hash partitioning because a row could match on id alone, name alone, or both — there is no single hash key.
Workaround — split into separate joins and UNION:
SELECT * FROM t1 JOIN t2 ON t1.id = t2.id
UNION
SELECT * FROM t1 JOIN t2 ON t1.name = t2.name;
Each individual join can now use BHJ or SMJ. The UNION deduplicates the results.
Join Conditions Deep Dive
ON vs WHERE for Outer Joins
For INNER joins, ON and WHERE are semantically equivalent — the optimizer treats them the same. For OUTER joins, they are critically different:
-- Filter in ON: non-matching rows still appear (with NULLs)
SELECT * FROM orders o
LEFT JOIN customers c ON o.customer_id = c.id AND c.country = 'US';
-- Returns ALL orders. Non-US customers appear as NULL rows.
-- Filter in WHERE: effectively converts LEFT JOIN to INNER JOIN
SELECT * FROM orders o
LEFT JOIN customers c ON o.customer_id = c.id
WHERE c.country = 'US';
-- Filters out rows where c.country IS NULL (non-matching rows).
-- LEFT JOIN becomes meaningless.
This is one of the most common bugs in SQL queries. The fix depends on intent: if you want only US-customer orders, use INNER JOIN explicitly. If you want all orders with US customer info when available, use the ON clause.
Single-Key vs Composite-Key Joins
- Single-key: Simpler hash computation. For `Long`/`Int` keys, Spark uses the optimized `LongHashedRelation` which avoids boxing overhead
- Composite-key: Uses `UnsafeHashedRelation` with `BytesToBytesMap`. Higher hash collision potential. Ensure all join columns have statistics collected
ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS customer_id, product_id;
Null-Safe Equal (<=>)
Standard equality (=) returns NULL when either operand is NULL — so NULL keys never match in joins. The null-safe equal operator (<=>) returns TRUE when both are NULL:
-- Standard: NULL = NULL → NULL (rows don't match)
SELECT * FROM t1 JOIN t2 ON t1.key = t2.key;
-- Null-safe: NULL <=> NULL → TRUE (rows match)
SELECT * FROM t1 JOIN t2 ON t1.key <=> t2.key;
# PySpark
result = df1.join(df2, df1["key"].eqNullSafe(df2["key"]))
Null-safe equal can be used as an equi-join key and supports BHJ, SHJ, and SMJ.
Filtering Before vs After Joins
Always filter as early as possible. Fewer rows entering the join means smaller shuffle, faster hash build, and less memory:
from pyspark.sql.functions import col
# Anti-pattern: filter after join
result = orders.join(customers, "customer_id").filter(col("country") == "US")
# Better: filter before join (Catalyst usually does this, but verify with EXPLAIN)
filtered_customers = customers.filter(col("country") == "US")
result = orders.join(filtered_customers, "customer_id")
Catalyst's PushDownPredicates rule usually pushes filters down through joins, but complex expressions, UDFs, and certain plan shapes can prevent pushdown. Always verify with EXPLAIN.
Reading Physical Plans
Knowing how to read a physical plan is essential for diagnosing join strategy problems.
EXPLAIN Variants
EXPLAIN select_statement; -- Basic physical plan
EXPLAIN EXTENDED select_statement; -- Parsed, Analyzed, Optimized, and Physical
EXPLAIN FORMATTED select_statement; -- User-friendly two-section format
EXPLAIN CODEGEN select_statement; -- Generated Java code (whole-stage codegen)
EXPLAIN COST select_statement; -- With cost/statistics information
Identifying Join Strategy from the Plan
== Physical Plan ==
*(5) SortMergeJoin [customer_id#1], [customer_id#2], Inner
:- *(2) Sort [customer_id#1 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(customer_id#1, 200), ENSURE_REQUIREMENTS
: +- *(1) Filter isnotnull(customer_id#1)
: +- *(1) ColumnarToRow
: +- FileScan parquet [customer_id#1, amount#2] ...
+- *(4) Sort [customer_id#2 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(customer_id#2, 200), ENSURE_REQUIREMENTS
+- *(3) Filter isnotnull(customer_id#2)
+- *(3) ColumnarToRow
+- FileScan parquet [customer_id#2, name#3] ...
What to look for:
| Pattern in Plan | Meaning |
|---|---|
| `BroadcastHashJoin` | BHJ — look for `BroadcastExchange` child |
| `SortMergeJoin` | SMJ — look for `Exchange` + `Sort` children on both sides |
| `ShuffledHashJoin` | SHJ — look for `Exchange` children but no `Sort` |
| `BroadcastNestedLoopJoin` | BNLJ — look for `BroadcastExchange` child |
| `CartesianProduct` | Full cross join — every row x every row |
| `Exchange hashpartitioning(...)` | Shuffle operation |
| `BroadcastExchange` | Broadcast distribution |
| `*(N)` prefix | Whole-stage codegen stage ID |
Reading direction: Bottom-to-top. Data flows upward from FileScan through Exchange, Sort, and into the Join node.
Key Spark UI SQL Tab Metrics
After running a query, the SQL tab shows the executed plan with actual metrics:
- number of output rows: Total rows produced by each node
- time in join: CPU time spent in the join operator
- shuffle read/write: Bytes transferred during Exchange
- broadcast time: Time to broadcast the build side (BHJ only)
- spill size (disk): Indicates memory pressure during sort or hash operations
- data size: Actual data size at each node (compare with estimated)
What to look for:
- `Exchange` nodes before joins — these are shuffles. No Exchange means a broadcast or a storage-partitioned join (SPJ)
- Estimated vs actual row counts — large discrepancies indicate bad statistics
- Spill metrics > 0 — indicates memory pressure, consider increasing executor memory or reducing partition sizes
- Single-task duration >> median task duration — indicates data skew
Runtime Bloom Filter Joins (Spark 3.3+)
Spark 3.3 introduced runtime Bloom filter optimization for shuffle joins. This is a separate optimization from the join strategy itself — it reduces the amount of data entering the join.
How It Works
When one side of a shuffle join has a selective filter, Spark can build a Bloom filter from the filtered side's join keys and push it down as a runtime filter to the other side's scan. This reduces the amount of data read and shuffled from the unfiltered side.
spark.sql.optimizer.runtime.bloomFilter.enabled=true -- default false
spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold=10GB -- max creation side
spark.sql.optimizer.runtime.bloomFilter.expectedNumItems=1000000
spark.sql.optimizer.runtime.bloomFilter.maxNumBits=67108864 -- 8 MB max
When It Helps
Bloom filters are most effective in star-schema patterns where a filtered dimension table joins with a large fact table:
-- Without Bloom filter: scan all of fact_orders, then join
-- With Bloom filter: build filter from matching customer_ids,
-- push into fact_orders scan, skip non-matching rows early
SELECT o.*, c.name
FROM fact_orders o
JOIN dim_customers c ON o.customer_id = c.customer_id
WHERE c.country = 'US';
This complements Dynamic Partition Pruning (DPP) and can achieve up to 10x speedup for star-schema queries when DPP alone is not sufficient (e.g., the fact table is not partitioned on the join key).
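A toy Bloom filter makes the mechanics concrete: build the filter from the filtered dimension side's keys, then probe fact-side keys so non-matches are dropped before the shuffle. This is an illustration only; Spark's implementation, sizing, and hash functions differ, and all the ids below are made up:

```python
NUM_BITS = 1024

def bloom_hashes(key):
    # Two hash positions derived from one hash (illustrative, not Spark's)
    h = hash(key)
    return (h % NUM_BITS, (h * 2654435761) % NUM_BITS)

def build_bloom(keys):
    bits = [False] * NUM_BITS
    for k in keys:
        for pos in bloom_hashes(k):
            bits[pos] = True
    return bits

def might_contain(bits, key):
    # False means definitely absent; True means "possibly present"
    return all(bits[pos] for pos in bloom_hashes(key))

us_customer_ids = [101, 205, 307]          # dimension side after the filter
fact_customer_ids = [101, 999, 205, 444]   # fact side before pruning

bloom = build_bloom(us_customer_ids)
survivors = [k for k in fact_customer_ids if might_contain(bloom, k)]
# All true matches survive; non-matches are dropped (modulo rare false positives)
```

False positives only cost extra rows in the join; false negatives never happen, so the filter is safe to push down.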
Dynamic Partition Pruning (DPP)
DPP (enabled by default since Spark 3.0) pushes filter results from dimension tables into fact table scans at runtime.
spark.sql.optimizer.dynamicPartitionPruning.enabled=true -- default true
How It Works
- Dimension table filter produces a set of partition values
- These values are pushed as a subquery into the fact table scan
- The fact table scan skips entire partitions (files) that do not match
- Only matching partitions are read and shuffled
Requirements
- The fact table must be partitioned on the join key column
- The dimension table must have a selective filter
- Works best with star-schema patterns
DPP + AQE Interaction
DPP creates a subquery dependency that can interact with AQE's stage scheduling. In some cases, DPP's subquery must complete before the fact table scan starts, which can reduce parallelism. Spark 3.3+ improved this interaction.
Common Anti-Patterns
1. Missing Statistics — The Silent Killer
-- Without this, every size estimate is a guess
ANALYZE TABLE orders COMPUTE STATISTICS FOR ALL COLUMNS;
ANALYZE TABLE customers COMPUTE STATISTICS FOR ALL COLUMNS;
Without ANALYZE TABLE, Spark uses file-size heuristics that can be 10x off for compressed columnar formats. A 200 MB Parquet file might contain 2 GB of data in memory — but the optimizer uses 200 MB, keeping the table under the broadcast threshold when it should not be broadcast.
2. Accidental Cartesian Products
-- Missing join condition produces N * M rows
SELECT * FROM orders, customers; -- 100M * 50K = 5 TRILLION rows
-- Spark 3.x allows this by default (spark.sql.crossJoin.enabled=true)
-- Detection: look for CartesianProduct in EXPLAIN output
Fix: Always verify join conditions exist. Set spark.sql.crossJoin.enabled=false during development so accidental cartesians fail at analysis time instead of silently running.
3. Broadcasting Too-Large Tables
A table that is 100 MB on disk as Parquet can expand to 300 MB–1 GB in memory due to decompression and columnar-to-row conversion. The optimizer compares the on-disk size (100 MB) against the threshold — comfortably under it if you raised autoBroadcastJoinThreshold past 100 MB — but the driver must hold the full in-memory representation.
Fix: When increasing autoBroadcastJoinThreshold, account for the compression ratio. Use EXPLAIN COST to check estimated sizes. Monitor driver memory usage.
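A back-of-the-envelope check makes the fix concrete (the expansion factor is a rough hypothetical heuristic, not a measured constant; measure your own data):

```python
def safe_to_broadcast(on_disk_bytes, threshold_bytes, expansion_factor=5):
    """Estimate whether a Parquet table's *in-memory* size stays under
    the broadcast threshold, assuming a decompression/row-conversion
    expansion factor (heuristic assumption, not a Spark API)."""
    estimated_in_memory = on_disk_bytes * expansion_factor
    return estimated_in_memory <= threshold_bytes

MB = 1024 * 1024
# 100 MB on disk, threshold raised to 512 MB: ~500 MB in memory, just fits
assert safe_to_broadcast(100 * MB, 512 * MB)
# Same file with a 256 MB threshold: under the threshold on disk, not in memory
assert not safe_to_broadcast(100 * MB, 256 * MB)
```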
4. SortMergeJoin on Small Tables
Without statistics, Spark defaults to SMJ even for a 1 MB table joined with a 1 TB table — shuffling both tables across the cluster when a simple broadcast would eliminate the shuffle entirely.
Fix: Run ANALYZE TABLE, or use BROADCAST hint when you know a table is small.
5. Join Explosion (1:N:M)
When join keys are not unique, rows multiply:
-- orders has 100 rows per customer
-- returns has 50 rows per customer
-- Result: 100 * 50 = 5,000 rows per customer (not 100 or 50)
SELECT * FROM orders o
JOIN returns r ON o.customer_id = r.customer_id;
Multi-table cascading joins compound this: add a third table with 200 rows per customer and you get 100 * 50 * 200 = 1,000,000 output rows per customer.
Fix: Deduplicate or aggregate before joining. Use DISTINCT or GROUP BY on join keys when you only need unique matches. Check output row counts in Spark UI.
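The multiplication is easy to reproduce with a naive nested-loop join in plain Python (a cardinality sketch, not Spark code):

```python
def inner_join(left, right, key):
    # Naive nested-loop equi-join: every matching pair produces one output row
    return [(l, r) for l in left for r in right if l[key] == r[key]]

# One customer: 100 order rows, 50 return rows
orders  = [{"customer_id": 1, "order_id": i} for i in range(100)]
returns = [{"customer_id": 1, "return_id": j} for j in range(50)]

joined = inner_join(orders, returns, "customer_id")
assert len(joined) == 100 * 50   # 5,000 rows from 150 input rows

# Deduplicating the right side on the join key first collapses the explosion
unique_returns = list({r["customer_id"]: r for r in returns}.values())
assert len(inner_join(orders, unique_returns, "customer_id")) == 100
```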
6. Joining on Skewed Keys
A single hot key (e.g., NULL, "unknown", 0, default values) causes one partition to hold 80%+ of data. One task runs for 45 minutes while 199 executors sit idle.
Fix: Enable AQE skew join (spark.sql.adaptive.skewJoin.enabled=true), or apply salting for severe skew. Filter out null keys before joining if they should not match.
7. Subqueries Instead of Joins
-- IN subquery — the optimizer rewrites this to a LEFT SEMI join
SELECT * FROM orders WHERE customer_id IN (SELECT id FROM customers WHERE country = 'US');
-- Correlated subquery with a non-equality predicate — may be planned as an inefficient BNLJ
SELECT * FROM orders o
WHERE amount > (SELECT AVG(amount) FROM orders i WHERE i.region = o.region);
Fix: Rewrite as explicit joins (or pre-aggregations) when possible. The optimizer decorrelates simple equality-correlated subqueries well, but non-equality correlations it cannot decorrelate can fall back to nested-loop plans.
8. Filtering After Join Instead of Before
-- Anti-pattern: shuffle all data, then filter
SELECT * FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.order_date >= '2024-01-01' AND c.country = 'US';
Catalyst usually pushes predicates down, but verify with EXPLAIN. Complex predicates, UDFs, or certain DataFrame operations can prevent pushdown.
9. Not Using AQE
If you are on Spark 3.2+ and have not explicitly enabled AQE, verify it is on:
spark.sql.adaptive.enabled=true -- default true since Spark 3.2
AQE's runtime broadcast conversion catches many cases where compile-time statistics are wrong. It is essentially free optimization.
Optimization Techniques
Pre-Filtering Before Joins
The single most effective optimization. Reduce row counts on both sides before the join:
from pyspark.sql.functions import broadcast, col

# Filter both sides aggressively before joining
recent_orders = orders.filter(col("order_date") >= "2024-01-01")
active_customers = customers.filter(col("active") == True)
result = recent_orders.join(broadcast(active_customers), "customer_id")
Bucketing for Shuffle-Free Joins
If two tables are bucketed on the same join key with the same number of buckets, Spark can skip the shuffle entirely.
Hive-style bucketing:
CREATE TABLE orders (...) USING parquet
CLUSTERED BY (customer_id) INTO 256 BUCKETS;
CREATE TABLE customers (...) USING parquet
CLUSTERED BY (customer_id) INTO 256 BUCKETS;
Iceberg SPJ (Storage Partitioned Joins):
CREATE TABLE orders (...) USING iceberg
PARTITIONED BY (bucket(256, customer_id));
CREATE TABLE customers (...) USING iceberg
PARTITIONED BY (bucket(256, customer_id));
Required configurations for Iceberg SPJ:
spark.sql.sources.v2.bucketing.enabled=true
spark.sql.iceberg.planning.preserve-data-grouping=true
spark.sql.sources.v2.bucketing.pushPartValues.enabled=true
spark.sql.requireAllClusterKeysForCoPartition=false
Verify with EXPLAIN — no Exchange nodes before the join confirms SPJ is working.
Salting for Skewed Joins
When a join key has extreme skew (e.g., one key has 80% of data), salting distributes the hot key across multiple partitions:
from pyspark.sql import functions as F
SALT_BUCKETS = 10
# Add random salt to the skewed (left) side
salted_left = left_df.withColumn("salt", (F.rand() * SALT_BUCKETS).cast("int"))
# Explode salt on the right side (replicate each row N times)
salted_right = right_df.crossJoin(
spark.range(SALT_BUCKETS).withColumnRenamed("id", "salt")
)
# Join on original key + salt
result = salted_left.join(salted_right, ["join_key", "salt"]).drop("salt")
Tradeoff: the right side is replicated SALT_BUCKETS times. Use only when the skewed side is significantly larger than the replicated side.
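Salting preserves join semantics because each left row carries exactly one salt value while every right row is replicated across all salt values, so each matching pair still meets exactly once. A pure-Python stand-in for the PySpark logic above (an illustrative sketch; the data is invented):

```python
import random

SALT_BUCKETS = 10
left  = [("hot_key", i) for i in range(1000)] + [("rare", 0)]   # heavily skewed
right = [("hot_key", "a"), ("rare", "b")]

# Plain join result, for comparison
plain = [(lk, lv, rv) for lk, lv in left for rk, rv in right if lk == rk]

# Salt the skewed left side; replicate the right side across all salt values
salted_left  = [(k, random.randrange(SALT_BUCKETS), v) for k, v in left]
salted_right = [(k, s, v) for k, v in right for s in range(SALT_BUCKETS)]

# Join on (key, salt): the hot key is now spread over SALT_BUCKETS partitions
salted = [(lk, lv, rv)
          for lk, ls, lv in salted_left
          for rk, rs, rv in salted_right
          if lk == rk and ls == rs]

assert sorted(plain) == sorted(salted)   # identical results, better distribution
```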
Semi Joins to Reduce Data
When you only need rows from one side that have matches in the other, use a semi join instead of a full join:
-- Instead of joining and then selecting only left columns:
SELECT o.* FROM orders o
INNER JOIN customers c ON o.customer_id = c.customer_id;
-- Use LEFT SEMI JOIN (only returns matching left rows, no duplication):
SELECT o.* FROM orders o
LEFT SEMI JOIN customers c ON o.customer_id = c.customer_id;
Semi joins avoid the row-multiplication problem of full joins and can use broadcast efficiently.
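The cardinality difference is worth seeing concretely (a plain-Python sketch of both semantics, with invented rows):

```python
orders = [(1, 101), (2, 101), (3, 999)]         # (order_id, customer_id)
customers = [(101, "US"), (101, "US-branch")]   # duplicate key on the right

# Inner join: each left row multiplies by its number of right-side matches
inner = [o for o in orders for c in customers if o[1] == c[0]]

# Left semi join: existence check only — each left row appears at most once
customer_ids = {c[0] for c in customers}
semi = [o for o in orders if o[1] in customer_ids]

assert len(inner) == 4   # orders 1 and 2 each matched twice
assert len(semi) == 2    # orders 1 and 2 exactly once, no right columns
```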
Denormalization to Avoid Joins
For frequently-joined tables, consider pre-joining at write time:
from pyspark.sql.functions import broadcast

# At ETL time, flatten dimension data into the fact table
enriched_orders = orders.join(
    broadcast(customers.select("id", "name", "country")),
    orders.customer_id == customers.id
)
enriched_orders.writeTo("catalog.db.enriched_orders").append()
Tradeoff: larger storage, stale dimension data, but eliminates runtime join entirely. Effective when dimension data changes infrequently.
Spark Version Join Improvements
Spark 3.0 (2020)
- AQE with runtime broadcast conversion
- AQE skew join handling (OptimizeSkewedJoin)
- DemoteBroadcastHashJoin
- Dynamic Partition Pruning (DPP)
- New join hints: MERGE, SHUFFLE_HASH, SHUFFLE_REPLICATE_NL
Spark 3.1 (2021)
- FullOuter join support in ShuffledHashJoin (SPARK-32399)
- ShuffledHashJoin improvements: better partitioning preservation, stream-side ordering
- Code-gen for ShuffledHashJoin (SPARK-32421)
Spark 3.2 (2022)
- AQE enabled by default
- AQE SMJ-to-SHJ conversion via spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold
- Improved partition coalescing
Spark 3.3 (2022)
- Runtime Bloom filter for joins
- FullOuter ShuffledHashJoin codegen (SPARK-32567) — 10-20% speedup
- Improved DPP + AQE interaction
Spark 3.4 (2023)
- Storage Partitioned Joins (SPJ) improvements for V2 data sources including Iceberg
- spark.sql.sources.v2.bucketing.enabled config
- Relaxed spark.sql.requireAllClusterKeysForCoPartition requirement
Spark 3.5 (2023)
- Left-outer-build-left / Right-outer-build-right in SHJ (SPARK-36612)
- Improved SPJ for more scenarios
- Better AQE partition coalescing
Spark 4.0 (2025)
- ANSI mode by default (impacts NULL handling in join conditions)
- Collation support in joins (hash joins for queries with inline COLLATE)
- Improved optimizer rules: removing unnecessary outer joins when join keys are unique
- Stream-stream join improvements
Iceberg Integration
How Iceberg Statistics Feed Catalyst
Iceberg stores per-data-file metrics in manifest files: file size, record count, column sizes, value counts, null counts, lower bounds, upper bounds. Spark reads these during planning to estimate sizeInBytes for scan nodes.
NDV (Number of Distinct Values) via Puffin files:
ANALYZE TABLE catalog.db.orders COMPUTE STATISTICS FOR COLUMNS customer_id;
This triggers ComputeTableStatsSparkAction which computes Theta sketches for NDV per column, saved in Puffin format (Iceberg's statistics file format). When spark.sql.cbo.enabled=true, NDV is retrieved from the statistics file and used for join cardinality estimation — critical for CBO join reordering and strategy selection.
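NDV feeds the classic equi-join cardinality formula used by cost-based optimizers: |L join R| ≈ (|L| × |R|) / max(ndv(L.k), ndv(R.k)). A sketch of that estimate (the formula is the standard textbook version under uniformity and containment assumptions; Spark's implementation adds refinements):

```python
def estimate_join_rows(rows_left, rows_right, ndv_left, ndv_right):
    """Textbook equi-join cardinality estimate: assumes uniform key
    distribution and that the smaller key domain is contained in the larger."""
    return rows_left * rows_right // max(ndv_left, ndv_right)

# 100M orders joined to 50K customers on customer_id (NDV = 50K both sides):
# each customer matches ~2,000 orders, so output ~= |orders|
assert estimate_join_rows(100_000_000, 50_000, 50_000, 50_000) == 100_000_000
```

Without the NDV (no Puffin statistics), CBO cannot compute this estimate and join reordering degrades to heuristics.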
Storage Partitioned Joins (SPJ)
SPJ eliminates shuffle entirely by aligning table partitions on disk. When both tables are bucketed on the same join key with the same number of buckets, Spark joins corresponding partitions directly — bucket 0 from left joins bucket 0 from right.
Setup:
CREATE TABLE fact_orders (
order_id BIGINT,
customer_id BIGINT,
amount DECIMAL(10,2)
) USING iceberg
PARTITIONED BY (bucket(128, customer_id));
CREATE TABLE dim_customers (
customer_id BIGINT,
name STRING,
country STRING
) USING iceberg
PARTITIONED BY (bucket(128, customer_id));
Required configs:
spark.sql.sources.v2.bucketing.enabled=true
spark.sql.iceberg.planning.preserve-data-grouping=true
spark.sql.sources.v2.bucketing.pushPartValues.enabled=true
spark.sql.requireAllClusterKeysForCoPartition=false
Known limitations:
- AQE can interfere with SPJ decisions — may need to disable AQE for SPJ
- 3-way joins can fail (Iceberg Issue #15015)
- Both tables must be Iceberg V2 data source
Partition Pruning Reducing Join Input
Iceberg's manifest-level min/max statistics enable file pruning beyond partition-level filtering. Combined with DPP, this drastically reduces the data entering joins — potentially making tables small enough for broadcast that would otherwise require SMJ.
Real-World Decision Framework
Decision Tree
Two tables need to join.
|
├── Is one side < 10 MB (or your tuned threshold)?
│ YES → BroadcastHashJoin (automatic)
│ NO → continue
│
├── Is one side < 1 GB and driver has 4+ GB heap?
│ YES → Consider raising autoBroadcastJoinThreshold
│ Account for compression ratio (Parquet 2-10x expansion)
│ NO → continue
│
├── Is the join equi-join?
│ NO → Can one side be broadcast?
│ │ YES → BroadcastNestedLoopJoin
│ │ NO → Rewrite: binning for range, OR → UNION
│ │ If InnerLike: CartesianProduct (last resort)
│ YES → continue
│
├── Are both tables bucketed on the join key (same N buckets)?
│ YES → Storage Partitioned Join (zero shuffle)
│ NO → continue
│
├── Is one side 3x+ smaller and fits in per-partition memory?
│ YES → Consider ShuffledHashJoin (SHUFFLE_HASH hint)
│ NO → SortMergeJoin (default, safest for large-large)
│
├── Is there key skew?
│ YES → Enable AQE skew join or apply salting
│ NO → default should work
│
└── Are runtime sizes different from compile-time estimates?
YES → Enable AQE for runtime optimization
NO → Verify with EXPLAIN and Spark UI
When to Override the Optimizer
- Statistics are wrong or missing — use BROADCAST hint when you know one side is small
- AQE overhead unacceptable — for latency-sensitive queries, use explicit hints
- SPJ not being selected — verify configs, check EXPLAIN for Exchange nodes
- Shuffle is the bottleneck — consider broadcast for slightly-larger-than-threshold tables
- Driver memory is limited — use MERGE hint to force SMJ and avoid broadcast
Cost Analysis by Strategy
| Strategy | Shuffle Cost | CPU Cost | Memory Cost | Failure Risk |
|---|---|---|---|---|
| BHJ | None (broadcast only) | O(N) probe | O(M) per executor | Driver OOM if M too large |
| SMJ | O(N+M) both sides | O(N log N + M log M) sort | Sort buffers, spill-safe | Slow for small M |
| SHJ | O(N+M) both sides | O(N+M) build + probe | O(M/P) per partition | OOM if partition large |
| BNLJ | None (broadcast only) | O(N * M) comparisons | O(M) per executor | Slow for large M |
| CartesianProduct | O(N * M) | O(N * M) | O(N * M) output | Catastrophic |
Where N = larger table, M = smaller table, P = number of partitions.
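Plugging numbers into the table makes the tradeoff concrete. A rough network-traffic-only model (shuffle bytes, ignoring CPU and memory; the executor count is an arbitrary assumption), applied to the 45-minute job from the introduction:

```python
def shuffle_bytes(strategy, n_bytes, m_bytes, executors=50):
    """Rough network-cost model per the table above (sketch, not a real planner)."""
    if strategy in ("BHJ", "BNLJ"):
        return m_bytes * executors        # small side copied to every executor
    if strategy in ("SMJ", "SHJ"):
        return n_bytes + m_bytes          # both sides shuffled once
    raise ValueError(strategy)

GB, MB = 1024**3, 1024**2
N, M = 800 * GB, 40 * MB                  # 800 GB fact table, 40 MB filtered side

# Broadcasting 40 MB to 50 executors moves ~2 GB; SMJ moves ~800 GB
assert shuffle_bytes("BHJ", N, M) < shuffle_bytes("SMJ", N, M) // 100
```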
How Cazpian Uses Join Optimization
Cazpian's compute pools run Apache Spark against Iceberg tables on S3. The join optimization strategies in this guide are directly applicable:
- Automatic statistics. Cazpian's catalog integration means Iceberg manifest-level statistics are always available to the Catalyst optimizer. This prevents the "missing statistics" anti-pattern that causes most join strategy mistakes
- SPJ-ready architecture. Cazpian tables can be bucketed with PARTITIONED BY (bucket(N, column)) to enable Storage Partitioned Joins, eliminating shuffle for frequently-joined table pairs
- AQE enabled by default. Cazpian compute pools run with AQE enabled, providing runtime broadcast conversion, skew join handling, and partition coalescing out of the box
- Compute pool sizing. Understanding join memory requirements — driver memory for broadcast, executor memory for hash tables, sort buffers for SMJ — directly informs Cazpian compute pool configuration
- Query federation. When Cazpian queries JDBC sources alongside Iceberg tables, join strategy becomes even more critical. Broadcasting a small JDBC lookup table against a large Iceberg fact table avoids shuffling data that was already expensive to read
Complete Configuration Reference
Join Strategy Selection
| Configuration | Default | Description |
|---|---|---|
| spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | Max size for automatic broadcast. Set to -1 to disable |
| spark.sql.join.preferSortMergeJoin | true | Prefer SMJ over SHJ when both are eligible |
| spark.sql.broadcastTimeout | 300 (seconds) | Timeout for broadcast operations |
| spark.broadcast.blockSize | 4m | TorrentBroadcast chunk size |
| spark.broadcast.compress | true | Compress broadcast data with LZ4 |
| spark.sql.shuffle.partitions | 200 | Number of shuffle partitions |
| spark.sql.crossJoin.enabled | true (3.x+) | Allow cartesian products |
Adaptive Query Execution
| Configuration | Default | Description |
|---|---|---|
| spark.sql.adaptive.enabled | true (3.2+) | Enable AQE |
| spark.sql.adaptive.autoBroadcastJoinThreshold | same as autoBroadcastJoinThreshold | AQE runtime broadcast threshold |
| spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold | 0 (disabled) | AQE SMJ-to-SHJ conversion threshold |
| spark.sql.adaptive.skewJoin.enabled | true | AQE skew join handling |
| spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256MB | Min size to consider skewed |
| spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5 | Factor vs median for skew detection |
| spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin | 0.2 | DemoteBroadcastHashJoin threshold |
Cost-Based Optimizer
| Configuration | Default | Description |
|---|---|---|
| spark.sql.cbo.enabled | false | Enable CBO |
| spark.sql.cbo.joinReorder.enabled | false | Enable join reordering |
| spark.sql.cbo.joinReorder.dp.threshold | 12 | Max tables for DP reorder algorithm |
| spark.sql.cbo.joinReorder.card.weight | 0.7 | Cardinality weight in cost formula |
| spark.sql.cbo.starSchemaDetection | false | Star schema heuristic |
| spark.sql.statistics.histogram.enabled | false | Column histograms for selectivity |
| spark.sql.statistics.histogram.numBins | 254 | Histogram bins |
Runtime Filters
| Configuration | Default | Description |
|---|---|---|
| spark.sql.optimizer.dynamicPartitionPruning.enabled | true | Dynamic Partition Pruning |
| spark.sql.optimizer.runtime.bloomFilter.enabled | false | Runtime Bloom filter |
| spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold | 10GB | Max creation side for Bloom filter |
| spark.sql.optimizer.runtime.bloomFilter.maxNumBits | 67108864 (8 MB) | Max Bloom filter size |
SortMergeJoin Internals
| Configuration | Default | Description |
|---|---|---|
| spark.sql.sortMergeJoinExec.buffer.in.memory.threshold | 2^30 (~1B rows) | SMJ in-memory buffer rows |
| spark.sql.sortMergeJoinExec.buffer.spill.threshold | 2^30 (~1B rows) | SMJ spill threshold rows |
Storage Partitioned Joins (Iceberg)
| Configuration | Default | Description |
|---|---|---|
| spark.sql.sources.v2.bucketing.enabled | false | Enable SPJ for V2 sources |
| spark.sql.iceberg.planning.preserve-data-grouping | false | Preserve Iceberg data grouping |
| spark.sql.sources.v2.bucketing.pushPartValues.enabled | false | Push partition values for SPJ |
| spark.sql.requireAllClusterKeysForCoPartition | true | Require all cluster keys to match |
Summary
The join strategy is the most consequential decision in a Spark SQL query plan. Getting it right means the difference between queries that finish in seconds and queries that run for hours while burning compute resources.
The critical takeaways:
- Run ANALYZE TABLE. Missing statistics is the root cause of most join strategy mistakes. The optimizer cannot choose the right strategy if it does not know the table sizes
- Understand the five strategies. BHJ eliminates shuffle for small-large joins. SMJ handles any size through spilling. SHJ avoids sort overhead. BNLJ handles non-equi joins. CartesianProduct is the last resort
- Use hints when statistics fail. BROADCAST, MERGE, and SHUFFLE_HASH hints override the optimizer's choice when you know better
- Enable AQE. Runtime optimization catches cases where compile-time statistics are wrong. It is the single most impactful "set and forget" optimization
- Read your plans. EXPLAIN and the Spark UI SQL tab reveal exactly what strategy Spark chose and why. Check for Exchange nodes, compare estimated vs actual row counts, and look for spill metrics
- Bucket frequently-joined tables. Iceberg SPJ eliminates shuffle entirely for co-bucketed tables — the biggest free performance win for repeated join patterns
- Filter early, join late. Every row filtered before a join is a row that does not shuffle, sort, hash, or compare. Pre-filtering is the simplest and most effective optimization
The next time a Spark job runs slowly, check the join strategy first. It is almost always where the time goes.