Spark SQL Join Strategy: The Complete Optimization Guide

· 36 min read
Cazpian Engineering
Platform Engineering Team

Your Spark job runs for 45 minutes. You check the Spark UI and find that a single join stage consumed 38 of those minutes — shuffling 800 GB across the network because the optimizer picked SortMergeJoin for a query where one side was 40 MB after filtering. Nobody ran ANALYZE TABLE. No statistics existed. The optimizer had no idea the table was small enough to broadcast.

The join strategy is the single most impactful decision in a Spark SQL query plan. It determines whether your data shuffles across the network, whether it spills to disk, whether your driver runs out of memory, and whether your query finishes in seconds or hours. Spark offers five distinct join strategies, each with different performance characteristics, memory requirements, and failure modes. The optimizer picks one based on statistics, hints, configuration, and join type — and it often picks wrong when it lacks information.

This post covers every join strategy in Spark, how the JoinSelection decision tree works internally, how the Catalyst optimizer estimates sizes, how CBO reorders multi-table joins, every join hint and when to use it, how AQE converts strategies at runtime, the equi vs non-equi join problem, how to read physical plans, the most common anti-patterns, and a real-world decision framework you can use in production.

The Five Join Strategies

Spark implements five physical join strategies. Each has distinct tradeoffs in shuffle cost, CPU overhead, memory consumption, and failure risk. Understanding what each one does internally is the foundation for making good optimization decisions.

[Diagram: the five join strategies (BroadcastHashJoin, SortMergeJoin, ShuffledHashJoin, BroadcastNestedLoopJoin, CartesianProduct) and the JoinSelection decision tree]

BroadcastHashJoin (BHJ)

BroadcastHashJoin is the fastest join strategy for queries where one side is small. It eliminates the shuffle entirely by broadcasting the small table to every executor.

How it works internally:

  1. Collect to driver. The small side (build side) is collected back to the driver via a collect() operation
  2. Build HashedRelation. The driver constructs an in-memory hash map. For single Long or Int join keys, Spark uses LongHashedRelation backed by LongToUnsafeRowMap (no boxing overhead). For all other key types — String, composite keys, etc — it uses UnsafeHashedRelation backed by BytesToBytesMap, which leverages Tungsten off-heap memory management
  3. Serialize and chunk. The driver serializes the HashedRelation into chunks of spark.broadcast.blockSize (default 4 MB), compressed with LZ4 by default
  4. TorrentBroadcast P2P distribution. Executors fetch chunks from the driver or from other executors that already have them. This BitTorrent-like protocol creates exponential fan-out — instead of the driver sending N copies, the load distributes across the cluster
  5. Local hash join. Each executor iterates through its local partitions of the large table (probe side) and looks up each row's join key in the hash map. No shuffle of the large table occurs
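
The build-and-probe mechanics can be sketched in a few lines of plain Python. This is a toy model, not Spark's Tungsten-backed HashedRelation, but the shape is the same: hash the small side once, then stream the large side through it.

```python
from collections import defaultdict

def broadcast_hash_join(small_rows, large_rows, key):
    """Toy sketch of BroadcastHashJoin: build a hash map from the small
    (build) side, then stream the large (probe) side through it."""
    # Step 2: build the HashedRelation (here: a plain dict of key -> rows)
    build = defaultdict(list)
    for row in small_rows:
        build[row[key]].append(row)
    # Step 5: probe locally -- the large side is never shuffled
    for row in large_rows:
        for match in build.get(row[key], []):
            yield {**row, **match}

# Hypothetical mini-tables for illustration
dims = [{"id": 1, "name": "US"}, {"id": 2, "name": "EU"}]
facts = [{"id": 1, "amt": 10}, {"id": 2, "amt": 20}, {"id": 3, "amt": 30}]
joined = list(broadcast_hash_join(dims, facts, "id"))
# Inner-join semantics: the unmatched fact row (id=3) is dropped
```

Each probe is an O(1) lookup, which is why BHJ dominates when the build side fits in memory.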

Constraints:

  • Hard limit: 8 GB serialized size, 512 million rows
  • The driver needs approximately 2x the broadcast table size in heap during the broadcast phase (deserialized + serialized copy)
  • Broadcast timeout: spark.sql.broadcastTimeout defaults to 300 seconds
  • Compressed on-disk size can be misleading — 100 MB of Parquet can expand to 300 MB–1 GB in memory

Best for: Small-to-large table joins where the small side fits comfortably in driver and executor memory.

SortMergeJoin (SMJ)

SortMergeJoin is Spark's default join strategy for equi-joins between two large tables. It is the safest strategy because it handles arbitrary data sizes through spilling.

How it works internally:

  1. Exchange (shuffle). Both sides are repartitioned by join keys using HashPartitioning. Each row is serialized, hashed on its join keys, written to local disk, transferred over the network, and deserialized on the destination executor. The number of partitions is controlled by spark.sql.shuffle.partitions (default 200)
  2. Sort. After shuffling, each partition on each side is sorted on the join keys using UnsafeExternalSorter. Sorting uses Tungsten memory management — when execution memory is exhausted, data spills to disk transparently
  3. Merge. Two sorted iterators are merged in a single linear pass. For each matching key, rows from both sides are combined. For keys with multiple matches on the right side, ExternalAppendOnlyUnsafeRowArray buffers the matching rows — and spills to an UnsafeExternalSorter if the buffer exceeds spark.sql.sortMergeJoinExec.buffer.in.memory.threshold (the default is near Int.MaxValue, roughly 2 billion rows)
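
The merge phase (step 3) reduces to the classic two-pointer walk over sorted inputs. A toy Python sketch over (key, value) tuples, standing in for sorted partitions:

```python
def sort_merge_join(left, right):
    """Toy sketch of SMJ's merge phase: both inputs are sorted by key
    (post-shuffle, post-sort); one linear pass combines matches."""
    left, right = sorted(left), sorted(right)
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Buffer all right-side rows for this key (the role played by
            # ExternalAppendOnlyUnsafeRowArray), then pair each left row
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            while i < len(left) and left[i][0] == lk:
                out.extend((lk, left[i][1], r[1]) for r in right[j:j_end])
                i += 1
            j = j_end
    return out

pairs = sort_merge_join([(1, "a"), (2, "b"), (2, "c")], [(2, "x"), (3, "y")])
# (2,"b") and (2,"c") each match (2,"x")
```

The merge itself is O(N + M); the expensive parts in Spark are the shuffles and sorts that precede it.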

Constraints:

  • Requires two full shuffles (one per side)
  • Requires sorting both sides (CPU-intensive for large datasets)
  • Join keys must be orderable (sortable)
  • Spill to disk handles memory pressure but degrades performance significantly

Spill optimization note: SPARK-30536 reduced the default memory read buffer in UnsafeSorterSpillReader from 1 MB to 1 KB, significantly improving spill read performance.

Best for: Large-to-large equi-joins where neither side is small enough for broadcast.

ShuffledHashJoin (SHJ)

ShuffledHashJoin builds a hash table from the smaller side's partitions after shuffle. It avoids the sort phase of SMJ — which can be significant for large datasets — but trades sort for hash table construction.

How it works internally:

  1. Exchange (shuffle). Both sides are repartitioned by join keys (same as SMJ)
  2. Build. On each executor, the smaller side's local partition is loaded into a HashedRelation (in-memory hash table)
  3. Probe. The larger side's local partition streams through, looking up each row in the hash table

When Spark chooses it:

  • spark.sql.join.preferSortMergeJoin is false (default true — so Spark never auto-selects SHJ unless this flag is flipped, a hint is given, or AQE converts at runtime)
  • OR: A SHUFFLE_HASH hint is present
  • OR: AQE converts SMJ to SHJ at runtime when spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold is set
  • Additional requirements: the build side's sizeInBytes < autoBroadcastJoinThreshold * numShufflePartitions (per-partition data fits in memory) AND the build side is at least 3x smaller than the probe side

Constraints:

  • The entire build-side partition must fit in executor memory as a hash table — there is no spill fallback for the hash build phase in older Spark versions (SPARK-32634 introduced a sort-based fallback)
  • If a partition is too large, the task fails with SparkOutOfMemoryError
  • Join keys do not need to be orderable (unlike SMJ)

Join type support has expanded across versions:

  • Spark 3.0: Inner, Cross, LeftOuter, LeftSemi, LeftAnti, RightOuter, ExistenceJoin
  • Spark 3.1: Added FullOuter (SPARK-32399)
  • Spark 3.3: Added FullOuter codegen (SPARK-32567)
  • Spark 3.5: Added left-outer-build-left and right-outer-build-right (SPARK-36612)

Best for: Large-to-large joins where one side is significantly smaller and you can guarantee per-partition data fits in memory. Faster than SMJ when sort is the bottleneck.

BroadcastNestedLoopJoin (BNLJ)

BroadcastNestedLoopJoin is the strategy Spark uses for non-equi joins (theta joins) when one side is small enough to broadcast.

How it works internally:

  1. Broadcast. The smaller side is broadcast to all executors (same mechanism as BHJ)
  2. Nested loop. For every row in the streamed (larger) side, it is compared against every row in the broadcast side using the join condition. Complexity: O(N * M)
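
In miniature, BNLJ is just a filtered cross product; the O(N * M) cost is visible directly in the comprehension. A toy sketch with a non-equi range condition:

```python
def broadcast_nested_loop_join(streamed, broadcast_side, condition):
    """Toy sketch of BNLJ: every streamed row is checked against every
    broadcast row with an arbitrary predicate -- O(N * M) comparisons."""
    return [(s, b) for s in streamed for b in broadcast_side if condition(s, b)]

# Non-equi condition: event timestamp falls inside an interval
events = [5, 15, 25]
intervals = [(0, 10), (20, 30)]
matches = broadcast_nested_loop_join(
    events, intervals, lambda ts, iv: iv[0] <= ts <= iv[1]
)
# 5 matches (0,10); 25 matches (20,30); 15 matches nothing
```

Because the predicate is arbitrary, no hashing or sorting shortcut exists — every pair must be examined.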

When used:

  • The join condition contains only non-equality predicates (<, >, >=, <=, !=, LIKE, BETWEEN)
  • Cross joins with a broadcast hint
  • Fallback when no other strategy applies and one side can be broadcast

Supports all join types: Inner, LeftOuter, RightOuter, FullOuter, LeftSemi, LeftAnti, Cross, ExistenceJoin.

Best for: Non-equi joins where one side is small. Avoid for large tables — O(N * M) becomes catastrophic quickly.

CartesianProduct

CartesianProduct is the last resort for non-equi inner joins where neither side is small enough to broadcast.

How it works internally:

Every row from the left side is combined with every row from the right side. For a 10,000-row table joined with a 1,000-row table, the result is 10,000,000 rows.

When used:

  • Non-equi joins where neither side can be broadcast and the join type is InnerLike
  • Explicit CROSS JOIN without a join condition
  • Accidentally missing join conditions

Detection: If spark.sql.crossJoin.enabled=false (the default in Spark 2.x, true in Spark 3.x), Spark throws: "Detected cartesian product for INNER join between logical plans". In Spark 3.x, check EXPLAIN output for CartesianProduct nodes.

Best for: Almost never. If you see this in a plan, verify it is intentional.

How JoinSelection Picks a Strategy

Spark's JoinSelection execution planning strategy (defined in SparkStrategies.scala) follows a strict priority order. Understanding this order explains why your query uses the strategy it does.

For Equi-Joins (with equality join keys)

The decision tree, in exact priority order:

Priority 1 — Check join hints (highest priority):

  • BROADCAST / BROADCASTJOIN / MAPJOIN hint → BroadcastHashJoinExec
  • SHUFFLE_MERGE / MERGE / MERGEJOIN hint → SortMergeJoinExec
  • SHUFFLE_HASH hint → ShuffledHashJoinExec
  • SHUFFLE_REPLICATE_NL hint → CartesianProductExec (if InnerLike)

Priority 2 — BroadcastHashJoin (automatic): If canBroadcast(left) or canBroadcast(right) — meaning the side's estimated sizeInBytes is at or below spark.sql.autoBroadcastJoinThreshold (default 10 MB) — and the join type supports broadcast on that side.

Priority 3 — SortMergeJoin (default): If spark.sql.join.preferSortMergeJoin=true (default) and the join keys are orderable.

Priority 4 — ShuffledHashJoin: If spark.sql.join.preferSortMergeJoin=false and canBuildLocalHashMap (per-partition data fits) and muchSmaller (build side is 3x+ smaller than probe side).

Priority 5 — SortMergeJoin (fallback): For remaining equi-joins where keys are orderable.

Priority 6 — CartesianProduct: Final fallback for InnerLike joins.

Priority 7 — BroadcastNestedLoopJoin: Final fallback for non-InnerLike joins.

For Non-Equi Joins (no equality join keys)

  1. Check join hints
  2. BroadcastNestedLoopJoin: If one side can be broadcast
  3. CartesianProduct: If join type is InnerLike and neither side can be broadcast
  4. BroadcastNestedLoopJoin: Final fallback (Spark picks a side to broadcast)

Key Helper Functions

These internal functions determine eligibility:

  • canBroadcast(plan): plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= autoBroadcastJoinThreshold and no NO_BROADCAST_HASH hint
  • canBuildLeft(joinType): Returns true for Cross, Inner, RightOuter
  • canBuildRight(joinType): Returns true for Cross, Inner, LeftAnti, LeftOuter, LeftSemi, ExistenceJoin
  • canBuildLocalHashMap(plan): plan.stats.sizeInBytes < autoBroadcastJoinThreshold * numShufflePartitions
  • muchSmaller(a, b): a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
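
These predicates and the priority order compose into a small decision function. A simplified Python mirror of JoinSelection for equi-joins (hints omitted, orderable keys assumed, sizes in bytes, thresholds hard-coded to the defaults):

```python
AUTO_BROADCAST_THRESHOLD = 10 * 1024 * 1024  # spark.sql.autoBroadcastJoinThreshold
NUM_SHUFFLE_PARTITIONS = 200                 # spark.sql.shuffle.partitions

def can_broadcast(size):
    return 0 <= size <= AUTO_BROADCAST_THRESHOLD

def can_build_local_hash_map(size):
    return size < AUTO_BROADCAST_THRESHOLD * NUM_SHUFFLE_PARTITIONS

def much_smaller(a, b):
    return a * 3 <= b

def pick_equi_join(left_size, right_size, prefer_sort_merge=True):
    """Simplified mirror of JoinSelection's priority order."""
    # Priority 2: automatic broadcast if either side is small enough
    if can_broadcast(left_size) or can_broadcast(right_size):
        return "BroadcastHashJoin"
    # Priority 4: SHJ only when SMJ is not preferred and the build side fits
    if not prefer_sort_merge:
        small, big = min(left_size, right_size), max(left_size, right_size)
        if can_build_local_hash_map(small) and much_smaller(small, big):
            return "ShuffledHashJoin"
    # Priorities 3/5: SMJ is the default and the fallback
    return "SortMergeJoin"

# The opening example: 40 MB side, but no stats meant Spark never saw it
choice = pick_equi_join(left_size=40 * 1024 * 1024, right_size=800 * 2**30)
```

With accurate statistics a 5 MB side would broadcast automatically; the 40 MB side above lands on SortMergeJoin unless a BROADCAST hint or a raised threshold intervenes.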

Join Type and Strategy Compatibility

Not every strategy supports every join type. This matrix determines what is possible:

| Join Type | BHJ | SMJ | SHJ | BNLJ | CartesianProduct |
|---|---|---|---|---|---|
| Inner | Build either side | Yes | Yes | Yes | Yes |
| Cross | Build either side | Yes | Yes | Yes | Yes |
| LeftOuter | Build right only | Yes | Yes | Yes | No |
| RightOuter | Build left only | Yes | Yes | Yes | No |
| FullOuter | No | Yes | Yes (Spark 3.1+) | Yes | No |
| LeftSemi | Build right only | Yes | Yes | Yes | No |
| LeftAnti | Build right only | Yes | Yes | Yes | No |
| ExistenceJoin | Build right only | Yes | Yes | Yes | No |

Key implications:

  • FullOuter joins cannot use BroadcastHashJoin. Even with a BROADCAST hint, Spark falls back to SMJ. This catches many people off guard
  • BHJ build side depends on join type. For LeftOuter, only the right (smaller) side can be broadcast. For RightOuter, only the left side. For Inner and Cross, either side works
  • ShuffledHashJoin gained FullOuter support in Spark 3.1 — before that, the only option for FullOuter was SMJ or BNLJ

How Catalyst Estimates Sizes

The optimizer's strategy selection depends entirely on sizeInBytes estimates. When these estimates are wrong, the optimizer makes wrong decisions.

Statistics Collection with ANALYZE TABLE

-- Table-level stats (row count, size in bytes)
ANALYZE TABLE orders COMPUTE STATISTICS;

-- Column-level stats (min, max, null count, distinct count, avg length, max length)
ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS customer_id, order_date, amount;

-- All columns
ANALYZE TABLE orders COMPUTE STATISTICS FOR ALL COLUMNS;

Column-level statistics are critical for CBO. Without them, the optimizer cannot estimate join cardinality — it does not know how many distinct values exist in a join key column.

Equi-Height Histograms

spark.sql.statistics.histogram.enabled=true    -- default false
spark.sql.statistics.histogram.numBins=254 -- default 254

Histograms enable accurate selectivity estimation for skewed data distributions. Without histograms, Spark assumes uniform distribution — which is almost never true in practice.

View collected stats:

DESCRIBE EXTENDED orders customer_id;

File-Level Statistics

Parquet and ORC store metadata in file footers:

  • Parquet: Row group metadata with min/max values, null counts, and row counts per column
  • ORC: Stripe-level and file-level statistics including counts, min/max, and sum for numeric columns

Spark reads these automatically during planning. They feed into sizeInBytes estimates and enable row group skipping.

sizeInBytes Estimation by Plan Node

  • Scan nodes: Estimated from file sizes on disk, Parquet/ORC footer stats, or catalog statistics from ANALYZE TABLE
  • Filter: Scaled by estimated selectivity (if column stats exist) or a default factor
  • Project: Adjusted based on selected columns
  • Join: With CBO, uses cardinality estimation from column NDV, null fractions, and min/max overlap. Without CBO, uses simple heuristics
  • Aggregate: Estimated from grouping key cardinality

When Estimates Go Wrong

This is where most join strategy problems originate:

  1. Missing statistics. Without ANALYZE TABLE, Spark falls back to file-size heuristics. Parquet files are typically 2-10x smaller on disk than in memory due to compression and encoding. A 100 MB Parquet file might be 500 MB in memory — but the optimizer uses the 100 MB figure
  2. Stale statistics. After large data loads or deletes, statistics become inaccurate. Spark does not automatically refresh them
  3. Complex expressions. Spark cannot estimate selectivity for UDFs, complex expressions, or non-standard functions
  4. Compounding errors. Each estimation error compounds through the plan. A 2x error on a filter selectivity combined with a 3x error on a join cardinality produces a 6x error in the final estimate

The practical consequence: the optimizer picks SortMergeJoin when BroadcastHashJoin would be 20x faster, or picks BroadcastHashJoin when the table is too large and the driver OOMs.

Cost-Based Optimizer and Join Reordering

When joining more than two tables, the order in which joins execute can dramatically affect performance.

Enabling CBO

spark.sql.cbo.enabled=true                  -- Enable CBO (default false)
spark.sql.cbo.joinReorder.enabled=true -- Enable join reordering (default false)
spark.sql.cbo.joinReorder.dp.threshold=12 -- Max tables for DP algorithm (default 12)
spark.sql.cbo.joinReorder.card.weight=0.7 -- Cardinality weight in cost formula

How Join Reordering Works

Spark uses a dynamic programming algorithm (adapted from Selinger 1979):

  1. Decompose the query into individual table sources and join conditions
  2. Initialize: each table is a level-1 plan
  3. For each level m (from 2 to N tables): combine every pair of plans that sum to m, check if a valid join condition exists, compute cost, keep only the best plan
  4. Return the plan covering all N tables with lowest cost

Cost formula:

cost = rows * weight + size * (1 - weight)

where weight = spark.sql.cbo.joinReorder.card.weight (default 0.7).
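
A worked example with illustrative (made-up) cardinalities shows how the formula ranks two candidate intermediate plans:

```python
def plan_cost(rows, size_bytes, weight=0.7):
    """CBO cost formula: cost = rows * weight + size * (1 - weight)."""
    return rows * weight + size_bytes * (1 - weight)

# Two candidate first joins over (orders, customers, products); the
# numbers are illustrative, not computed by Spark:
# joining orders->products first explodes the intermediate result
cost_bad = plan_cost(rows=1_000_000_000, size_bytes=80_000_000_000)
# joining orders->customers first keeps the intermediate result small
cost_good = plan_cost(rows=100_000_000, size_bytes=8_000_000_000)
# The DP algorithm keeps only the cheaper plan at each level
```

Because rows dominate the formula at the default weight of 0.7, plans that shrink cardinality early win consistently.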

Why Join Order Matters

Consider three tables: orders (100M rows), customers (50K rows), products (10K rows).

Bad order: (orders JOIN products) JOIN customers

  • First join: 100M x 10K matching rows → potentially billions of intermediate rows before filtering with customers

Good order: (orders JOIN customers) JOIN products

  • First join: 100M x 50K → filtered intermediate result, then join with products

CBO finds the optimal order using cardinality estimates from column-level statistics.

Star Schema Detection

spark.sql.cbo.starSchemaDetection=true  -- default false

When enabled, Spark's StarSchemaDetection identifies fact tables (the largest table) and dimension tables (smaller tables with foreign key relationships). It places the fact table on the driving arm of the join tree and applies the most selective dimension filters first — reducing data flow early.

This heuristic works even without full CBO statistics and can significantly improve star-schema query performance.

Limitations

  • Join reordering applies only to consecutive Inner or Cross joins — not outer joins
  • Requires statistics for all tables (ANALYZE TABLE)
  • dp.threshold default of 12 limits the number of tables (the algorithm has exponential complexity)
  • Disabled when join hints are present (SPARK-26840)

All Join Hints

Hints override the optimizer's strategy choice. Use them when you know better than the optimizer — which is common when statistics are missing or stale.

Available Hints

| Hint | Aliases | Strategy Forced | Since |
|---|---|---|---|
| BROADCAST | BROADCASTJOIN, MAPJOIN | BroadcastHashJoin | Spark 2.2 |
| MERGE | SHUFFLE_MERGE, MERGEJOIN | SortMergeJoin | Spark 3.0 |
| SHUFFLE_HASH | (none) | ShuffledHashJoin | Spark 3.0 |
| SHUFFLE_REPLICATE_NL | (none) | CartesianProduct or BNLJ | Spark 3.0 |

Hint Priority When Both Sides Have Hints

BROADCAST > MERGE > SHUFFLE_HASH > SHUFFLE_REPLICATE_NL

If both sides have BROADCAST, the smaller side (by stats) is broadcast. If both sides have SHUFFLE_HASH, the smaller side is the build side. If a hint specifies a strategy that does not support the join type (e.g., BROADCAST on a FullOuter join), Spark ignores the hint.

SQL Syntax

-- Broadcast the dimension table
SELECT /*+ BROADCAST(dim_customers) */ *
FROM fact_orders o
INNER JOIN dim_customers c ON o.customer_id = c.customer_id;

-- Force SortMergeJoin (useful when broadcast causes driver OOM)
SELECT /*+ SHUFFLE_MERGE(fact_orders) */ *
FROM fact_orders o
INNER JOIN large_returns r ON o.order_id = r.order_id;

-- Force ShuffledHashJoin
SELECT /*+ SHUFFLE_HASH(medium_table) */ *
FROM large_table l
INNER JOIN medium_table m ON l.key = m.key;

-- Force CartesianProduct / BNLJ
SELECT /*+ SHUFFLE_REPLICATE_NL(small_table) */ *
FROM large_table l
INNER JOIN small_table s ON l.key > s.key;

DataFrame API Syntax

from pyspark.sql.functions import broadcast

# Broadcast hint (most common)
result = fact_orders.join(broadcast(dim_customers), "customer_id")

# Or using hint() method
result = fact_orders.join(dim_customers.hint("broadcast"), "customer_id")

# Force SortMergeJoin
result = df1.hint("merge").join(df2, "key")

# Force ShuffledHashJoin
result = df1.join(df2.hint("shuffle_hash"), "key")

When to Use Each Hint

  • BROADCAST: When you know a table is small after filtering but the optimizer does not (missing stats, complex filter expressions, post-filter size unknown)
  • MERGE: When you need to force SMJ to avoid broadcast OOM on a table the optimizer thinks is small but is actually large in memory
  • SHUFFLE_HASH: When both sides are large but one side is significantly smaller and sort is the bottleneck
  • SHUFFLE_REPLICATE_NL: Rare. For explicit cartesian products when you need them

AQE Runtime Join Optimization

Adaptive Query Execution (AQE), enabled by default since Spark 3.2, optimizes join strategies at runtime using actual shuffle statistics instead of compile-time estimates.

Runtime SortMerge to Broadcast Conversion

After shuffle stages complete, AQE examines the actual shuffle data sizes. If either side of a SortMergeJoin has actual runtime size below spark.sql.adaptive.autoBroadcastJoinThreshold (defaults to the same value as spark.sql.autoBroadcastJoinThreshold, 10 MB), AQE converts the join to BroadcastHashJoin.

This is less efficient than planning BHJ from the start (the shuffle already happened), but still beneficial: it avoids sorting both sides and reads shuffle files locally via a custom shuffle reader instead of over the network.

spark.sql.adaptive.autoBroadcastJoinThreshold=10485760  -- default 10 MB

Runtime SortMerge to ShuffledHash Conversion

AQE can also convert SMJ to SHJ when all post-shuffle partitions are smaller than a threshold:

spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold=0  -- default 0 (disabled)

Set this to a value like 64MB to enable. AQE checks actual partition sizes and converts when the hash table will fit in memory.

DemoteBroadcastHashJoin

AQE can also remove broadcast when it detects the data has many empty partitions. If the percentage of non-empty partitions falls below spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin (default 0.2), the broadcast is demoted to SortMergeJoin.

This prevents broadcast from being used on tables with highly skewed partition distributions where most partitions are empty — which indicates the broadcast may not be beneficial.

OptimizeSkewedJoin

AQE's skew join handling splits oversized partitions at runtime:

spark.sql.adaptive.skewJoin.enabled=true                              -- default true
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB -- default
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 -- default

A partition is considered skewed when both conditions are true:

  1. partition_size > skewedPartitionFactor * median_partition_size
  2. partition_size > skewedPartitionThresholdInBytes

Skewed partitions are split into smaller sub-partitions, and the corresponding partition on the other side is replicated to match. The result: roughly evenly-sized tasks instead of one straggler holding up the entire stage.
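
The two-condition skew test is easy to express directly. A small Python sketch using the default factor and threshold:

```python
from statistics import median

SKEW_FACTOR = 5                     # skewedPartitionFactor default
SKEW_THRESHOLD = 256 * 1024 * 1024  # skewedPartitionThresholdInBytes default

def skewed_partitions(sizes):
    """A partition is skewed only when BOTH conditions hold: larger than
    factor * median AND larger than the absolute byte threshold."""
    med = median(sizes)
    return [i for i, s in enumerate(sizes)
            if s > SKEW_FACTOR * med and s > SKEW_THRESHOLD]

# Nine partitions around 64 MB plus one 2 GB straggler
sizes = [64 * 2**20] * 9 + [2 * 2**30]
skewed = skewed_partitions(sizes)
```

The absolute threshold matters: a cluster of uniformly tiny partitions never triggers splitting even if one is 10x the median.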

How AQE Changes Your Plans

Check the Spark UI SQL tab. AQE shows both the initial plan and the final (optimized) plan. Look for:

  • AdaptiveSparkPlan isFinalPlan=true at the root
  • BroadcastHashJoin appearing where SortMergeJoin was planned
  • CustomShuffleReader nodes indicating optimized shuffle reads
  • Partition coalescing reducing the number of shuffle partitions

Equi-Join vs Non-Equi Join

The distinction between equi-joins and non-equi joins is fundamental because it determines which strategies are available.

Why Equi-Joins Enable Hash and Sort Strategies

  • Hash strategies (BHJ, SHJ): Equality conditions allow hash partitioning. hash(key) on both sides produces the same hash for matching rows, guaranteeing co-location. This makes O(N) probe possible
  • Sort-merge (SMJ): Equality on sortable keys allows a single linear-time merge pass through two sorted iterators
  • Non-equi (theta) joins: Conditions like a.ts BETWEEN b.start AND b.end or a.price > b.threshold cannot be hash-partitioned. There is no way to guarantee co-location without examining every combination

Strategies Available by Join Type

| Join condition | BHJ | SMJ | SHJ | BNLJ | CartesianProduct |
|---|---|---|---|---|---|
| Equi-join | Yes | Yes | Yes | Yes | Yes |
| Non-equi join | No | No | No | Yes (broadcast) | Yes (InnerLike) |

Range Join Workarounds

Range joins (a.ts BETWEEN b.start AND b.end) are common in event correlation, session analysis, and time-series processing. The naive approach uses BNLJ or CartesianProduct — both catastrophic for large tables.

Binning strategy:

Convert the range join into an equi-join by discretizing the range into bins:

-- Instead of O(N*M) range join:
SELECT * FROM events e JOIN intervals i
ON e.ts BETWEEN i.start_ts AND i.end_ts;

-- Add bin column and do equi-join + range filter:
SELECT * FROM events e JOIN intervals_exploded i
ON e.ts_bin = i.ts_bin
AND e.ts BETWEEN i.start_ts AND i.end_ts;

The binning approach requires exploding the interval table so that each row appears in every bin it spans. This increases the interval table size but converts O(NM) to O(NK) where K is the average number of bins per interval — typically 10-100x faster.
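
A toy Python model of the explode-and-rejoin pattern makes the mechanics concrete (bin width and data are illustrative):

```python
BIN = 10  # bin width in the timestamp unit; tune to typical interval length

def explode_intervals(intervals):
    """Replicate each interval row into every bin it spans, so the range
    join can start as an equi-join on the bin column."""
    for start, end, payload in intervals:
        for b in range(start // BIN, end // BIN + 1):
            yield (b, start, end, payload)

def binned_range_join(events, intervals):
    exploded = {}
    for b, start, end, payload in explode_intervals(intervals):
        exploded.setdefault(b, []).append((start, end, payload))
    # Equi-join on ts_bin, then re-check the exact range predicate
    return [(ts, payload)
            for ts in events
            for start, end, payload in exploded.get(ts // BIN, [])
            if start <= ts <= end]

out = binned_range_join([5, 15, 42], [(0, 12, "a"), (40, 45, "b")])
```

Each event now probes only the intervals sharing its bin instead of all intervals, which is exactly the O(NM) to O(NK) reduction described above.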

Rewriting OR Conditions

-- This PREVENTS hash/sort-merge join (becomes BNLJ or CartesianProduct):
SELECT * FROM t1 JOIN t2 ON t1.id = t2.id OR t1.name = t2.name;

OR in join conditions prevents hash partitioning because a row could match on id alone, name alone, or both — there is no single hash key.

Workaround — split into separate joins and UNION:

SELECT * FROM t1 JOIN t2 ON t1.id = t2.id
UNION
SELECT * FROM t1 JOIN t2 ON t1.name = t2.name;

Each individual join can now use BHJ or SMJ. The UNION deduplicates the results.

Join Conditions Deep Dive

ON vs WHERE for Outer Joins

For INNER joins, ON and WHERE are semantically equivalent — the optimizer treats them the same. For OUTER joins, they are critically different:

-- Filter in ON: non-matching rows still appear (with NULLs)
SELECT * FROM orders o
LEFT JOIN customers c ON o.customer_id = c.id AND c.country = 'US';
-- Returns ALL orders. Non-US customers appear as NULL rows.

-- Filter in WHERE: effectively converts LEFT JOIN to INNER JOIN
SELECT * FROM orders o
LEFT JOIN customers c ON o.customer_id = c.id
WHERE c.country = 'US';
-- Filters out rows where c.country IS NULL (non-matching rows).
-- LEFT JOIN becomes meaningless.

This is one of the most common bugs in SQL queries. The fix depends on intent: if you want only US-customer orders, use INNER JOIN explicitly. If you want all orders with US customer info when available, use the ON clause.
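
The semantic difference can be reproduced with a toy left join in plain Python (hypothetical mini-tables, tuples standing in for rows):

```python
def left_join(left, right, on_pred):
    """Toy LEFT JOIN: unmatched left rows survive with a NULL right side."""
    out = []
    for l in left:
        matches = [r for r in right if on_pred(l, r)]
        out += [(l, r) for r in matches] or [(l, None)]
    return out

orders = [(1, 100), (2, 101)]        # (customer_id, order_id)
customers = [(1, "US"), (2, "DE")]   # (id, country)

# Filter in ON: non-US customers simply fail to match; BOTH orders survive
in_on = left_join(orders, customers,
                  lambda o, c: o[0] == c[0] and c[1] == "US")

# Filter in WHERE: applied after the join, so the NULL row is discarded
in_where = [(o, c)
            for o, c in left_join(orders, customers,
                                  lambda o, c: o[0] == c[0])
            if c is not None and c[1] == "US"]
```

The ON version keeps order 101 with a None customer; the WHERE version silently drops it, which is the INNER-join-in-disguise bug described above.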

Single-Key vs Composite-Key Joins

  • Single-key: Simpler hash computation. For Long/Int keys, Spark uses the optimized LongHashedRelation which avoids boxing overhead
  • Composite-key: Uses UnsafeHashedRelation with BytesToBytesMap. Higher hash collision potential. Ensure all join columns have statistics collected:

ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS customer_id, product_id;

Null-Safe Equal (<=>)

Standard equality (=) returns NULL when either operand is NULL — so NULL keys never match in joins. The null-safe equal operator (<=>) returns TRUE when both are NULL:

-- Standard: NULL = NULL → NULL (rows don't match)
SELECT * FROM t1 JOIN t2 ON t1.key = t2.key;

-- Null-safe: NULL <=> NULL → TRUE (rows match)
SELECT * FROM t1 JOIN t2 ON t1.key <=> t2.key;

# PySpark equivalent
result = df1.join(df2, df1["key"].eqNullSafe(df2["key"]))

Null-safe equal can be used as an equi-join key and supports BHJ, SHJ, and SMJ.

Filtering Before vs After Joins

Always filter as early as possible. Fewer rows entering the join means smaller shuffle, faster hash build, and less memory:

from pyspark.sql.functions import col

# Anti-pattern: filter after join
result = orders.join(customers, "customer_id").filter(col("country") == "US")

# Better: filter before join (Catalyst usually does this, but verify with EXPLAIN)
filtered_customers = customers.filter(col("country") == "US")
result = orders.join(filtered_customers, "customer_id")

Catalyst's PushDownPredicates rule usually pushes filters down through joins, but complex expressions, UDFs, and certain plan shapes can prevent pushdown. Always verify with EXPLAIN.

Reading Physical Plans

Knowing how to read a physical plan is essential for diagnosing join strategy problems.

EXPLAIN Variants

EXPLAIN select_statement;              -- Basic physical plan
EXPLAIN EXTENDED select_statement; -- Parsed, Analyzed, Optimized, and Physical
EXPLAIN FORMATTED select_statement; -- User-friendly two-section format
EXPLAIN CODEGEN select_statement; -- Generated Java code (whole-stage codegen)
EXPLAIN COST select_statement; -- With cost/statistics information

Identifying Join Strategy from the Plan

== Physical Plan ==
*(5) SortMergeJoin [customer_id#1], [customer_id#2], Inner
:- *(2) Sort [customer_id#1 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(customer_id#1, 200), ENSURE_REQUIREMENTS
:     +- *(1) Filter isnotnull(customer_id#1)
:        +- *(1) ColumnarToRow
:           +- FileScan parquet [customer_id#1, amount#2] ...
+- *(4) Sort [customer_id#2 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(customer_id#2, 200), ENSURE_REQUIREMENTS
      +- *(3) Filter isnotnull(customer_id#2)
         +- *(3) ColumnarToRow
            +- FileScan parquet [customer_id#2, name#3] ...

What to look for:

| Pattern in Plan | Meaning |
|---|---|
| BroadcastHashJoin | BHJ — look for BroadcastExchange child |
| SortMergeJoin | SMJ — look for Exchange + Sort children on both sides |
| ShuffledHashJoin | SHJ — look for Exchange children but no Sort |
| BroadcastNestedLoopJoin | BNLJ — look for BroadcastExchange child |
| CartesianProduct | Full cross join — every row x every row |
| Exchange hashpartitioning(...) | Shuffle operation |
| BroadcastExchange | Broadcast distribution |
| *(N) prefix | Whole-stage codegen stage ID |

Reading direction: Bottom-to-top. Data flows upward from FileScan through Exchange, Sort, and into the Join node.

Key Spark UI SQL Tab Metrics

After running a query, the SQL tab shows the executed plan with actual metrics:

  • number of output rows: Total rows produced by each node
  • time in join: CPU time spent in the join operator
  • shuffle read/write: Bytes transferred during Exchange
  • broadcast time: Time to broadcast the build side (BHJ only)
  • spill size (disk): Indicates memory pressure during sort or hash operations
  • data size: Actual data size at each node (compare with estimated)

What to look for:

  1. Exchange nodes before joins — these are shuffles. No Exchange means a broadcast join or data that is already partitioned correctly (e.g., a storage-partitioned join, SPJ)
  2. Estimated vs actual row counts — large discrepancies indicate bad statistics
  3. Spill metrics > 0 — indicates memory pressure, consider increasing executor memory or reducing partition sizes
  4. Single-task duration >> median task duration — indicates data skew

Runtime Bloom Filter Joins (Spark 3.3+)

Spark 3.3 introduced runtime Bloom filter optimization for shuffle joins. This is a separate optimization from the join strategy itself — it reduces the amount of data entering the join.

How It Works

When one side of a shuffle join has a selective filter, Spark can build a Bloom filter from the filtered side's join keys and push it down as a runtime filter to the other side's scan. This reduces the amount of data read and shuffled from the unfiltered side.

spark.sql.optimizer.runtime.bloomFilter.enabled=true               -- default false in 3.3, true since 3.4
spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold=10MB -- max creation-side size
spark.sql.optimizer.runtime.bloomFilter.expectedNumItems=1000000
spark.sql.optimizer.runtime.bloomFilter.maxNumBits=67108864        -- 8 MB max

When It Helps

Bloom filters are most effective in star-schema patterns where a filtered dimension table joins with a large fact table:

-- Without Bloom filter: scan all of fact_orders, then join
-- With Bloom filter: build filter from matching customer_ids,
-- push into fact_orders scan, skip non-matching rows early
SELECT o.*, c.name
FROM fact_orders o
JOIN dim_customers c ON o.customer_id = c.customer_id
WHERE c.country = 'US';

This complements Dynamic Partition Pruning (DPP) and can achieve up to 10x speedup for star-schema queries when DPP alone is not sufficient (e.g., the fact table is not partitioned on the join key).
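
The mechanism can be modeled with a minimal Bloom filter in plain Python. This is a sketch of the idea, not Spark's implementation (which sizes its filter from expectedNumItems and maxNumBits): build the filter from the filtered side's keys, then skip probe-side rows the filter rules out.

```python
import hashlib

class Bloom:
    """Minimal Bloom filter sketch: k hash functions over an m-slot array.
    False positives are possible; false negatives are not."""
    def __init__(self, m=1024, k=3):
        self.m, self.k, self.bits = m, k, bytearray(m)

    def _positions(self, item):
        for i in range(self.k):
            h = hashlib.sha256(f"{i}:{item}".encode()).digest()
            yield int.from_bytes(h[:8], "big") % self.m

    def add(self, item):
        for p in self._positions(item):
            self.bits[p] = 1

    def might_contain(self, item):
        return all(self.bits[p] for p in self._positions(item))

# Build from the filtered dimension side's join keys...
bf = Bloom()
for cid in [1, 7, 42]:
    bf.add(cid)

# ...then probe fact rows before the shuffle: definite misses are skipped
survivors = [cid for cid in [1, 2, 7, 99, 42] if bf.might_contain(cid)]
```

Surviving rows still go through the real join; the filter only prunes rows that provably cannot match, which is why false positives are harmless here.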

Dynamic Partition Pruning (DPP)

DPP (enabled by default since Spark 3.0) pushes filter results from dimension tables into fact table scans at runtime.

spark.sql.optimizer.dynamicPartitionPruning.enabled=true  -- default true

How It Works

  1. Dimension table filter produces a set of partition values
  2. These values are pushed as a subquery into the fact table scan
  3. The fact table scan skips entire partitions (files) that do not match
  4. Only matching partitions are read and shuffled

Requirements

  • The fact table must be partitioned on the join key column
  • The dimension table must have a selective filter
  • Works best with star-schema patterns
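The four steps above boil down to a set-membership pushdown. A minimal plain-Python sketch (table contents hypothetical):

```python
# Partition value -> rows, mirroring a fact table partitioned by country
fact_partitions = {
    "US": [("o1", "US"), ("o2", "US")],
    "DE": [("o3", "DE")],
    "FR": [("o4", "FR")],
}

# Step 1: the dimension-side filter yields the set of matching partition values
dim_customers = [("c1", "US"), ("c2", "US"), ("c3", "DE")]
matching_values = {country for _, country in dim_customers}

# Steps 2-4: partitions whose value is not in the set are never read at all
rows_read = [
    row
    for value, rows in fact_partitions.items()
    if value in matching_values  # whole partition (all its files) skipped otherwise
    for row in rows
]
```

The "FR" partition is skipped entirely, which is why DPP requires the fact table to be partitioned on the join key: pruning happens at partition granularity, not per row.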

DPP + AQE Interaction

DPP creates a subquery dependency that can interact with AQE's stage scheduling. In some cases, DPP's subquery must complete before the fact table scan starts, which can reduce parallelism. Spark 3.3+ improved this interaction.

Common Anti-Patterns

1. Missing Statistics — The Silent Killer

-- Without this, every size estimate is a guess
ANALYZE TABLE orders COMPUTE STATISTICS FOR ALL COLUMNS;
ANALYZE TABLE customers COMPUTE STATISTICS FOR ALL COLUMNS;

Without ANALYZE TABLE, Spark falls back to file-size heuristics that can be 10x off for compressed columnar formats. A 200 MB Parquet file might expand to 2 GB in memory, but the optimizer sees only 200 MB. If you have raised the broadcast threshold above that, Spark will happily broadcast a table that needs 2 GB of driver memory.

2. Accidental Cartesian Products

-- Missing join condition produces N * M rows
SELECT * FROM orders, customers; -- 100M * 50K = 5 TRILLION rows

-- Spark 3.x allows this by default (spark.sql.crossJoin.enabled=true)
-- Detection: look for CartesianProduct in EXPLAIN output

Fix: Always verify join conditions exist. Set spark.sql.crossJoin.enabled=false during development so accidental cartesians fail at analysis time instead of silently running.

3. Broadcasting Too-Large Tables

A table that is 100 MB on disk as Parquet can expand to 300 MB–1 GB in memory due to decompression and columnar-to-row conversion. The optimizer compares the on-disk size (100 MB) against the threshold, so the table qualifies for broadcast if you have raised autoBroadcastJoinThreshold above 100 MB. But the driver and every executor must hold the full in-memory representation.

Fix: When increasing autoBroadcastJoinThreshold, account for the compression ratio. Use EXPLAIN COST to check estimated sizes. Monitor driver memory usage.
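A back-of-envelope check is easy to encode. The expansion ratio and heap-fraction budget below are illustrative assumptions, not Spark constants:

```python
def broadcast_sanity_check(on_disk_bytes, threshold_bytes,
                           expansion_ratio=5.0,
                           driver_heap_bytes=1 * 1024**3,
                           max_heap_fraction=0.25):
    """Return (passes_threshold, fits_in_driver) for a candidate broadcast table.

    expansion_ratio and max_heap_fraction are rule-of-thumb assumptions.
    """
    in_memory = on_disk_bytes * expansion_ratio
    passes_threshold = on_disk_bytes <= threshold_bytes  # what the optimizer compares
    fits_in_driver = in_memory <= driver_heap_bytes * max_heap_fraction  # what matters
    return passes_threshold, fits_in_driver

# 100 MB of Parquet under a raised 200 MB threshold, but ~500 MB once decompressed:
passes, fits = broadcast_sanity_check(100 * 1024**2, 200 * 1024**2)
# passes is True (the optimizer would broadcast); fits is False (the driver may OOM)
```

The gap between the two booleans is exactly the anti-pattern: the optimizer decides on the left number while the driver dies on the right one.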

4. SortMergeJoin on Small Tables

Without statistics, Spark defaults to SMJ even for a 1 MB table joined with a 1 TB table — shuffling both tables across the cluster when a simple broadcast would eliminate the shuffle entirely.

Fix: Run ANALYZE TABLE, or use BROADCAST hint when you know a table is small.

5. Join Explosion (1:N:M)

When join keys are not unique, rows multiply:

-- orders has 100 rows per customer
-- returns has 50 rows per customer
-- Result: 100 * 50 = 5,000 rows per customer (not 100 or 50)
SELECT * FROM orders o
JOIN returns r ON o.customer_id = r.customer_id;

Multi-table cascading joins make this worse: 100 * 50 * 200 = 1,000,000 rows from 100 input rows.

Fix: Deduplicate or aggregate before joining. Use DISTINCT or GROUP BY on join keys when you only need unique matches. Check output row counts in Spark UI.
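The multiplication is easy to verify with plain Python lists standing in for the two tables:

```python
from collections import defaultdict

def hash_join(left, right, key):
    """Naive inner hash join: one output row per matching (left, right) pair."""
    index = defaultdict(list)
    for row in right:
        index[row[key]].append(row)
    return [(l, r) for l in left for r in index[l[key]]]

# 100 orders and 50 returns, all for the same customer
orders = [{"customer_id": 1, "order_id": i} for i in range(100)]
returns = [{"customer_id": 1, "return_id": i} for i in range(50)]

joined = hash_join(orders, returns, "customer_id")  # 100 * 50 = 5,000 rows

# Deduplicating the join key first collapses the explosion
distinct_returns = [{"customer_id": 1}]  # result of SELECT DISTINCT customer_id
collapsed = hash_join(orders, distinct_returns, "customer_id")  # 100 rows
```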

6. Joining on Skewed Keys

A single hot key (e.g., NULL, "unknown", 0, or other default values) can leave one partition holding 80%+ of the data. One task runs for 45 minutes while the other 199 tasks finish in seconds and their executors sit idle.

Fix: Enable AQE skew join (spark.sql.adaptive.skewJoin.enabled=true), or apply salting for severe skew. Filter out null keys before joining if they should not match.

7. Subqueries Instead of Joins

-- IN subquery: the optimizer rewrites this to a LEFT SEMI join
SELECT * FROM orders WHERE customer_id IN (SELECT id FROM customers WHERE country = 'US');

-- NOT IN over a nullable column: becomes a null-aware anti join,
-- which often falls back to BroadcastNestedLoopJoin
SELECT * FROM orders WHERE customer_id NOT IN (SELECT id FROM customers);

Fix: Rewrite as explicit joins when possible. IN and EXISTS subqueries rewrite cleanly to semi joins, but NOT IN over nullable columns forces a null-aware anti join; prefer NOT EXISTS, or filter out NULLs first to get a plain anti join.

8. Filtering After Join Instead of Before

-- Anti-pattern: shuffle all data, then filter
SELECT * FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.order_date >= '2024-01-01' AND c.country = 'US';

Catalyst usually pushes predicates down, but verify with EXPLAIN. Complex predicates, UDFs, or certain DataFrame operations can prevent pushdown.

9. Not Using AQE

If you are on Spark 3.2+ and have not explicitly enabled AQE, verify it is on:

spark.sql.adaptive.enabled=true  -- default true since Spark 3.2

AQE's runtime broadcast conversion catches many cases where compile-time statistics are wrong. It is essentially a free optimization.

Optimization Techniques

Pre-Filtering Before Joins

The single most effective optimization. Reduce row counts on both sides before the join:

from pyspark.sql.functions import broadcast, col

# Filter both sides aggressively before the join
recent_orders = orders.filter(col("order_date") >= "2024-01-01")
active_customers = customers.filter(col("active") == True)

result = recent_orders.join(broadcast(active_customers), "customer_id")

Bucketing for Shuffle-Free Joins

If two tables are bucketed on the same join key with the same number of buckets, Spark can skip the shuffle entirely.

Hive-style bucketing:

CREATE TABLE orders (...) USING parquet
CLUSTERED BY (customer_id) INTO 256 BUCKETS;

CREATE TABLE customers (...) USING parquet
CLUSTERED BY (customer_id) INTO 256 BUCKETS;

Iceberg SPJ (Storage Partitioned Joins):

CREATE TABLE orders (...) USING iceberg
PARTITIONED BY (bucket(256, customer_id));

CREATE TABLE customers (...) USING iceberg
PARTITIONED BY (bucket(256, customer_id));

Required configurations for Iceberg SPJ:

spark.sql.sources.v2.bucketing.enabled=true
spark.sql.iceberg.planning.preserve-data-grouping=true
spark.sql.sources.v2.bucketing.pushPartValues.enabled=true
spark.sql.requireAllClusterKeysForCoPartition=false

Verify with EXPLAIN — no Exchange nodes before the join confirms SPJ is working.

Salting for Skewed Joins

When a join key has extreme skew (e.g., one key has 80% of data), salting distributes the hot key across multiple partitions:

from pyspark.sql import functions as F

SALT_BUCKETS = 10

# Add a random salt column to the skewed (left) side
salted_left = left_df.withColumn("salt", (F.rand() * SALT_BUCKETS).cast("int"))

# Explode salt on the right side (replicate each row SALT_BUCKETS times)
salted_right = right_df.crossJoin(
    spark.range(SALT_BUCKETS).withColumnRenamed("id", "salt")
)

# Join on original key + salt, then drop the helper column
result = salted_left.join(salted_right, ["join_key", "salt"]).drop("salt")

Tradeoff: the right side is replicated SALT_BUCKETS times. Use only when the skewed side is significantly larger than the replicated side.
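The payoff can be sanity-checked without a cluster: with salting, the hot key's rows spread across partitions instead of piling into one. The CRC32-based partitioner below is a deterministic stand-in for Spark's hash partitioner, not the real thing:

```python
import random
import zlib

NUM_PARTITIONS = 10
SALT_BUCKETS = 10
random.seed(0)

def partition_of(key):
    """Deterministic stand-in for Spark's hash partitioner."""
    return zlib.crc32(repr(key).encode()) % NUM_PARTITIONS

hot_key = "unknown"
rows = [hot_key] * 10_000  # every row shares the one skewed key

# Without salting: all 10,000 rows hash to a single shuffle partition
unsalted_partitions = {partition_of(key) for key in rows}

# With salting: the (key, salt) pair spreads the rows out
salted_partitions = {
    partition_of((key, random.randrange(SALT_BUCKETS))) for key in rows
}
```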

Semi Joins to Reduce Data

When you only need rows from one side that have matches in the other, use a semi join instead of a full join:

-- Instead of joining and then selecting only left columns:
SELECT o.* FROM orders o
INNER JOIN customers c ON o.customer_id = c.customer_id;

-- Use LEFT SEMI JOIN (only returns matching left rows, no duplication):
SELECT o.* FROM orders o
LEFT SEMI JOIN customers c ON o.customer_id = c.customer_id;

Semi joins avoid the row-multiplication problem of full joins and can use broadcast efficiently.
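The difference shows up in a toy example: a duplicate join key on the right duplicates left rows under an inner join but not under a semi join (plain Python standing in for the two plans):

```python
def inner_join(left, right, key):
    """One output row per matching pair: duplicate right keys duplicate left rows."""
    return [l for l in left for r in right if l[key] == r[key]]

def left_semi_join(left, right, key):
    """Existence check only: each left row appears at most once."""
    right_keys = {r[key] for r in right}
    return [l for l in left if l[key] in right_keys]

orders = [{"customer_id": 1, "order_id": 10}]
customers = [{"customer_id": 1, "segment": "a"},
             {"customer_id": 1, "segment": "b"}]  # duplicate join key

inner_result = inner_join(orders, customers, "customer_id")      # 2 rows
semi_result = left_semi_join(orders, customers, "customer_id")   # 1 row
```

The semi join also only needs the right side's keys, which is why it broadcasts so cheaply.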

Denormalization to Avoid Joins

For frequently-joined tables, consider pre-joining at write time:

from pyspark.sql.functions import broadcast

# At ETL time, flatten dimension data into the fact table
dims = customers.select("id", "name", "country")
enriched_orders = orders.join(broadcast(dims), orders.customer_id == dims.id)
enriched_orders.writeTo("catalog.db.enriched_orders").append()

Tradeoff: larger storage, stale dimension data, but eliminates runtime join entirely. Effective when dimension data changes infrequently.

Spark Version Join Improvements

Spark 3.0 (2020)

  • AQE with runtime broadcast conversion
  • AQE skew join handling (OptimizeSkewedJoin)
  • DemoteBroadcastHashJoin
  • Dynamic Partition Pruning (DPP)
  • New join hints: MERGE, SHUFFLE_HASH, SHUFFLE_REPLICATE_NL

Spark 3.1 (2021)

  • FullOuter join support in ShuffledHashJoin (SPARK-32399)
  • ShuffledHashJoin improvements: better partitioning preservation, stream-side ordering
  • Code-gen for ShuffledHashJoin (SPARK-32421)

Spark 3.2 (2022)

  • AQE enabled by default
  • AQE SMJ-to-SHJ conversion via maxShuffledHashJoinLocalMapThreshold
  • Improved partition coalescing

Spark 3.3 (2022)

  • Runtime Bloom filter for joins
  • FullOuter ShuffledHashJoin codegen (SPARK-32567) — 10-20% speedup
  • Improved DPP + AQE interaction

Spark 3.4 (2023)

  • Storage Partitioned Joins (SPJ) improvements for V2 data sources including Iceberg
  • spark.sql.sources.v2.bucketing.enabled config
  • spark.sql.requireAllClusterKeysForCoPartition relaxed requirement

Spark 3.5 (2023)

  • Left-outer-build-left / Right-outer-build-right in SHJ (SPARK-36612)
  • Improved SPJ for more scenarios
  • Better AQE partition coalescing

Spark 4.0 (2025)

  • ANSI mode by default (impacts NULL handling in join conditions)
  • Collation support in joins (hash joins for queries with inline COLLATE)
  • Improved optimizer rules: removing unnecessary outer joins when join keys are unique
  • Stream-stream join improvements

Iceberg Integration

How Iceberg Statistics Feed Catalyst

Iceberg stores per-data-file metrics in manifest files: file size, record count, column sizes, value counts, null counts, lower bounds, upper bounds. Spark reads these during planning to estimate sizeInBytes for scan nodes.

NDV (Number of Distinct Values) via Puffin files:

ANALYZE TABLE catalog.db.orders COMPUTE STATISTICS FOR COLUMNS customer_id;

This triggers ComputeTableStatsSparkAction which computes Theta sketches for NDV per column, saved in Puffin format (Iceberg's statistics file format). When spark.sql.cbo.enabled=true, NDV is retrieved from the statistics file and used for join cardinality estimation — critical for CBO join reordering and strategy selection.

Storage Partitioned Joins (SPJ)

SPJ eliminates shuffle entirely by aligning table partitions on disk. When both tables are bucketed on the same join key with the same number of buckets, Spark joins corresponding partitions directly — bucket 0 from left joins bucket 0 from right.

Setup:

CREATE TABLE fact_orders (
  order_id BIGINT,
  customer_id BIGINT,
  amount DECIMAL(10,2)
) USING iceberg
PARTITIONED BY (bucket(128, customer_id));

CREATE TABLE dim_customers (
  customer_id BIGINT,
  name STRING,
  country STRING
) USING iceberg
PARTITIONED BY (bucket(128, customer_id));

Required configs:

spark.sql.sources.v2.bucketing.enabled=true
spark.sql.iceberg.planning.preserve-data-grouping=true
spark.sql.sources.v2.bucketing.pushPartValues.enabled=true
spark.sql.requireAllClusterKeysForCoPartition=false

Known limitations:

  • AQE can interfere with SPJ decisions — may need to disable AQE for SPJ
  • 3-way joins can fail (Iceberg Issue #15015)
  • Both tables must be Iceberg V2 data source

Partition Pruning Reducing Join Input

Iceberg's manifest-level min/max statistics enable file pruning beyond partition-level filtering. Combined with DPP, this drastically reduces the data entering joins — potentially making tables small enough for broadcast that would otherwise require SMJ.

Real-World Decision Framework

Decision Tree

Two tables need to join.
│
├── Is one side < 10 MB (or your tuned threshold)?
│     YES → BroadcastHashJoin (automatic)
│     NO  → continue
│
├── Is one side < 1 GB and does the driver have 4+ GB heap?
│     YES → Consider raising autoBroadcastJoinThreshold
│           (account for compression ratio: Parquet expands 2-10x in memory)
│     NO  → continue
│
├── Is the join an equi-join?
│     NO  → Can one side be broadcast?
│             YES → BroadcastNestedLoopJoin
│             NO  → Rewrite: binning for ranges, OR → UNION;
│                   if InnerLike: CartesianProduct (last resort)
│     YES → continue
│
├── Are both tables bucketed on the join key (same number of buckets)?
│     YES → Storage Partitioned Join (zero shuffle)
│     NO  → continue
│
├── Is one side 3x+ smaller and does it fit in per-partition memory?
│     YES → Consider ShuffledHashJoin (SHUFFLE_HASH hint)
│     NO  → SortMergeJoin (default, safest for large-large)
│
├── Is there key skew?
│     YES → Enable AQE skew join or apply salting
│     NO  → the default should work
│
└── Do runtime sizes differ from compile-time estimates?
      YES → Enable AQE for runtime optimization
      NO  → Verify with EXPLAIN and the Spark UI

When to Override the Optimizer

  1. Statistics are wrong or missing — use BROADCAST hint when you know one side is small
  2. AQE overhead unacceptable — for latency-sensitive queries, use explicit hints
  3. SPJ not being selected — verify configs, check EXPLAIN for Exchange nodes
  4. Shuffle is the bottleneck — consider broadcast for slightly-larger-than-threshold tables
  5. Driver memory is limited — use MERGE hint to force SMJ and avoid broadcast

Cost Analysis by Strategy

| Strategy | Shuffle Cost | CPU Cost | Memory Cost | Failure Risk |
|---|---|---|---|---|
| BHJ | None (broadcast only) | O(N) probe | O(M) per executor | Driver OOM if M too large |
| SMJ | O(N+M) both sides | O(N log N + M log M) sort | Sort buffers, spill-safe | Slow for small M |
| SHJ | O(N+M) both sides | O(N+M) build + probe | O(M/P) per partition | OOM if partition large |
| BNLJ | None (broadcast only) | O(N * M) comparisons | O(M) per executor | Slow for large M |
| CartesianProduct | O(N * M) | O(N * M) | O(N * M) output | Catastrophic |

Where N = larger table, M = smaller table, P = number of partitions.

How Cazpian Uses Join Optimization

Cazpian's compute pools run Apache Spark against Iceberg tables on S3. The join optimization strategies in this guide are directly applicable:

  • Automatic statistics. Cazpian's catalog integration means Iceberg manifest-level statistics are always available to the Catalyst optimizer. This prevents the "missing statistics" anti-pattern that causes most join strategy mistakes
  • SPJ-ready architecture. Cazpian tables can be bucketed with PARTITIONED BY (bucket(N, column)) to enable Storage Partitioned Joins, eliminating shuffle for frequently-joined table pairs
  • AQE enabled by default. Cazpian compute pools run with AQE enabled, providing runtime broadcast conversion, skew join handling, and partition coalescing out of the box
  • Compute pool sizing. Understanding join memory requirements — driver memory for broadcast, executor memory for hash tables, sort buffers for SMJ — directly informs Cazpian compute pool configuration
  • Query federation. When Cazpian queries JDBC sources alongside Iceberg tables, join strategy becomes even more critical. Broadcasting a small JDBC lookup table against a large Iceberg fact table avoids shuffling data that was already expensive to read

Complete Configuration Reference

Join Strategy Selection

| Configuration | Default | Description |
|---|---|---|
| spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | Max size for automatic broadcast. Set to -1 to disable |
| spark.sql.join.preferSortMergeJoin | true | Prefer SMJ over SHJ when both are eligible |
| spark.sql.broadcastTimeout | 300 (seconds) | Timeout for broadcast operations |
| spark.broadcast.blockSize | 4m | TorrentBroadcast chunk size |
| spark.broadcast.compress | true | Compress broadcast data with LZ4 |
| spark.sql.shuffle.partitions | 200 | Number of shuffle partitions |
| spark.sql.crossJoin.enabled | true (3.x+) | Allow cartesian products |

Adaptive Query Execution

| Configuration | Default | Description |
|---|---|---|
| spark.sql.adaptive.enabled | true (3.2+) | Enable AQE |
| spark.sql.adaptive.autoBroadcastJoinThreshold | same as autoBroadcastJoinThreshold | AQE runtime broadcast threshold |
| spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold | 0 (disabled) | AQE SMJ-to-SHJ conversion threshold |
| spark.sql.adaptive.skewJoin.enabled | true | AQE skew join handling |
| spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256MB | Min size to consider skewed |
| spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5 | Factor vs median for skew detection |
| spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin | 0.2 | DemoteBroadcastHashJoin threshold |
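The two skewJoin settings combine into a single rule: a partition is treated as skewed only if it exceeds both the factor-times-median size and the absolute byte threshold. A direct restatement in Python:

```python
import statistics

def skewed_partitions(sizes_bytes, factor=5, threshold_bytes=256 * 1024**2):
    """Indices AQE would treat as skewed: size > factor * median AND size > threshold."""
    median = statistics.median(sizes_bytes)
    return [i for i, size in enumerate(sizes_bytes)
            if size > factor * median and size > threshold_bytes]

# 199 healthy 64 MB partitions plus one 2 GB hot partition
sizes = [64 * 1024**2] * 199 + [2 * 1024**3]
hot = skewed_partitions(sizes)  # only the last partition qualifies
```

The AND condition explains why mildly uneven but small partitions are never split: they fail the absolute threshold even when they dwarf the median.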

Cost-Based Optimizer

| Configuration | Default | Description |
|---|---|---|
| spark.sql.cbo.enabled | false | Enable CBO |
| spark.sql.cbo.joinReorder.enabled | false | Enable join reordering |
| spark.sql.cbo.joinReorder.dp.threshold | 12 | Max tables for DP reorder algorithm |
| spark.sql.cbo.joinReorder.card.weight | 0.7 | Cardinality weight in cost formula |
| spark.sql.cbo.starSchemaDetection | false | Star schema heuristic |
| spark.sql.statistics.histogram.enabled | false | Column histograms for selectivity |
| spark.sql.statistics.histogram.numBins | 254 | Histogram bins |

Runtime Filters

| Configuration | Default | Description |
|---|---|---|
| spark.sql.optimizer.dynamicPartitionPruning.enabled | true | Dynamic Partition Pruning |
| spark.sql.optimizer.runtime.bloomFilter.enabled | false | Runtime Bloom filter |
| spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold | 10GB | Max creation side for Bloom filter |
| spark.sql.optimizer.runtime.bloomFilter.maxNumBits | 67108864 (8 MB) | Max Bloom filter size |

SortMergeJoin Internals

| Configuration | Default | Description |
|---|---|---|
| spark.sql.sortMergeJoinExec.buffer.in.memory.threshold | 2^30 (~1B rows) | SMJ in-memory buffer rows |
| spark.sql.sortMergeJoinExec.buffer.spill.threshold | 2^30 (~1B rows) | SMJ spill threshold rows |

Storage Partitioned Joins (Iceberg)

| Configuration | Default | Description |
|---|---|---|
| spark.sql.sources.v2.bucketing.enabled | false | Enable SPJ for V2 sources |
| spark.sql.iceberg.planning.preserve-data-grouping | false | Preserve Iceberg data grouping |
| spark.sql.sources.v2.bucketing.pushPartValues.enabled | false | Push partition values for SPJ |
| spark.sql.requireAllClusterKeysForCoPartition | true | Require all cluster keys to match |

Summary

The join strategy is the most consequential decision in a Spark SQL query plan. Getting it right means the difference between queries that finish in seconds and queries that run for hours while burning compute resources.

The critical takeaways:

  1. Run ANALYZE TABLE. Missing statistics is the root cause of most join strategy mistakes. The optimizer cannot choose the right strategy if it does not know the table sizes
  2. Understand the five strategies. BHJ eliminates shuffle for small-large joins. SMJ handles any size through spilling. SHJ avoids sort overhead. BNLJ handles non-equi joins. CartesianProduct is the last resort
  3. Use hints when statistics fail. BROADCAST, MERGE, and SHUFFLE_HASH hints override the optimizer's choice when you know better
  4. Enable AQE. Runtime optimization catches cases where compile-time statistics are wrong. It is the single most impactful "set and forget" optimization
  5. Read your plans. EXPLAIN and the Spark UI SQL tab reveal exactly what strategy Spark chose and why. Check for Exchange nodes, compare estimated vs actual row counts, and look for spill metrics
  6. Bucket for frequently-joined tables. Iceberg SPJ eliminates shuffle entirely for co-bucketed tables — the biggest free performance win for repeated join patterns
  7. Filter early, join late. Every row filtered before a join is a row that does not shuffle, sort, hash, or compare. Pre-filtering is the simplest and most effective optimization

The next time a Spark job runs slowly, check the join strategy first. It is almost always where the time goes.