Spark Broadcast Joins: The Complete Guide for Iceberg and Cazpian

· 27 min read
Cazpian Engineering
Platform Engineering Team

Your 500-node Spark cluster is shuffling a 2 TB fact table across the network — serializing every row, hashing it, writing it to disk, sending it over the wire, and deserializing it on the other side — just to join it with a 50 MB dimension table. Every single query. Every single day.

The shuffle is the most expensive operation in distributed computing. It consumes network bandwidth, disk I/O, CPU cycles, and memory on every executor in the cluster. And for joins where one side is small, it is completely unnecessary.

Broadcast join eliminates the shuffle entirely. Instead of redistributing both tables across the cluster, Spark sends the small table to every executor, where it is stored as an in-memory hash map. Each executor then joins its local partitions of the large table against this hash map — no network shuffle, no disk spill, no cross-executor coordination. The result is typically a 5-20x speedup over Sort-Merge Join for eligible queries.

This post goes deep. We cover exactly what happens inside a broadcast join, all five ways to trigger one, how AQE converts joins at runtime, the real memory math that catches people off guard, when broadcast hurts instead of helps, and how to monitor and debug broadcast behavior in production.

What Exactly Happens Inside a Broadcast Join

Understanding the internal mechanics is critical because most broadcast join failures — OOM errors, timeouts, unexpected slowness — trace back to a misunderstanding of what the driver and executors are actually doing.

[Diagram: Spark broadcast join internals, showing the five-phase execution flow from driver collect through TorrentBroadcast distribution to the local hash join on each executor]

Step-by-Step Execution Flow

Phase 1: Collect to the Driver. The smaller dataset (the "build side") is materialized and collected back to the Spark driver. This is effectively a collect() operation — every partition of the small DataFrame is pulled from the executors to the driver process. The entire small table must fit in driver memory.

Phase 2: Build a Hash Table. The driver constructs a HashedRelation — an in-memory hash map keyed on the join columns. This is the data structure that executors will use for lookups during the join.

Phase 3: Serialize and Chunk. Spark uses TorrentBroadcast, a BitTorrent-like protocol, to distribute the data. The driver serializes the HashedRelation and divides it into chunks of spark.broadcast.blockSize (default 4 MB). These chunks are stored in the driver's BlockManager and compressed with LZ4 by default (spark.broadcast.compress=true).

Phase 4: BitTorrent-Like Distribution. Executors request the broadcast variable. Each executor checks its local BlockManager first, then fetches missing chunks from the driver or from other executors that already have them. This creates an exponential fan-out: Executor A fetches from the driver, Executor B fetches from A, Executor C fetches from B. Instead of the driver sending N copies to N executors, the load is distributed across the cluster.
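The exponential fan-out can be illustrated with a toy model, assuming each node that already holds the data serves one new peer per round (real TorrentBroadcast works per 4 MB chunk with randomized peer selection, so this only sketches the growth pattern):

```python
# Toy model of TorrentBroadcast fan-out: each round, every node that
# already holds the data can serve one more peer, so coverage doubles.
# This is an illustration, not Spark's actual scheduling logic.
def rounds_to_reach(num_executors: int) -> int:
    """Rounds of doubling until the driver plus all executors hold the data."""
    have = 1  # the driver starts with the data
    rounds = 0
    while have < num_executors + 1:
        have *= 2
        rounds += 1
    return rounds

# 100 executors are covered in ~7 doubling rounds instead of
# 100 sequential sends from the driver.
print(rounds_to_reach(100))  # -> 7
```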

Phase 5: Local Hash Join. Once all chunks arrive, each executor deserializes the HashedRelation. It then iterates through its local partitions of the large table (the "probe side") and looks up each row's join key in the hash map. No shuffle of the large table occurs — each executor processes only its own data.
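The build/probe pattern each executor runs locally can be sketched in a few lines of plain Python (an inner join over lists of dicts, standing in for the HashedRelation and the probe-side partitions):

```python
# Minimal sketch of the local hash join: build a dict from the broadcast
# (build) side, then probe it with each local row of the large (probe)
# side. No shuffle is involved -- each executor sees only its own rows.
def broadcast_hash_join(probe_rows, build_rows, key):
    hashed = {}  # stand-in for the deserialized HashedRelation
    for row in build_rows:
        hashed.setdefault(row[key], []).append(row)
    for row in probe_rows:                      # local large-table partition
        for match in hashed.get(row[key], []):  # O(1) lookup per probe row
            yield {**row, **match}

orders = [{"customer_id": 1, "amount": 30}, {"customer_id": 2, "amount": 5}]
customers = [{"customer_id": 1, "name": "Ada"}]
print(list(broadcast_hash_join(orders, customers, "customer_id")))
# -> [{'customer_id': 1, 'amount': 30, 'name': 'Ada'}]
```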

The Data Flow

Small Table (build side)
        |
        v
[Executor Partitions] --collect()--> [Driver Memory]
        |
        v
[Driver: Build HashedRelation (hash map on join keys)]
        |
        v
[Driver: Serialize + chunk into 4 MB blocks (LZ4 compressed)]
        |
        v
[Driver BlockManager]
        |
        v  (BitTorrent-like P2P distribution)
[Executor 1] <---> [Executor 2] <---> [Executor N]
     |                  |                  |
     v                  v                  v
Deserialize        Deserialize        Deserialize
HashedRelation     HashedRelation     HashedRelation
     |                  |                  |
     v                  v                  v
Hash join with     Hash join with     Hash join with
local large-table  local large-table  local large-table
partitions         partitions         partitions

Why the Driver Needs 2x Memory

During the broadcast phase, the driver holds two representations of the small table simultaneously:

  1. The deserialized HashedRelation (Java objects in heap memory)
  2. The serialized and chunked representation in the BlockManager

This means a 200 MB broadcast table requires roughly 400 MB of driver heap during the broadcast phase. The deserialized copy is released after serialization completes, but the peak memory usage includes both.

How Executors Store the Broadcast

Each executor stores the broadcast variable in its BlockManager with MEMORY_AND_DISK storage level. The individual chunks use MEMORY_AND_DISK_SER (serialized). If executor memory is tight, chunks can spill to disk — but this significantly degrades join performance since every probe-side row requires a hash lookup against the broadcast table.

All Five Ways to Trigger a Broadcast Join

Spark decides whether to use broadcast join through multiple mechanisms. Understanding all of them — and their priority order — is essential for controlling join behavior.

1. Automatic Threshold: autoBroadcastJoinThreshold

spark.sql.autoBroadcastJoinThreshold=10485760  # 10 MB (default)

Spark's JoinSelection strategy checks the estimated size of each side of a join. If either side's estimated size is at or below this threshold, Spark automatically plans a broadcast hash join.

# Check the current threshold
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

# Increase to 200 MB for larger dimension tables
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "209715200")

# Disable automatic broadcast entirely
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

Key details:

  • Set to -1 to disable automatic broadcast joins entirely.
  • The maximum effective value is constrained by the 8 GB hard limit on broadcast data.
  • The decision is based on estimated size (file size on disk, catalog statistics, or heuristics) — not the actual in-memory size. This distinction causes most broadcast OOM failures.
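Raw byte values like 209715200 are easy to mistype. A tiny convenience helper (ours, not part of Spark) makes the intent explicit when setting the threshold:

```python
# Hypothetical helper to avoid hand-computing byte values for the
# autoBroadcastJoinThreshold config. Not a Spark API.
def mib(n: int) -> str:
    """Return n MiB as the raw byte string the config expects."""
    return str(n * 1024 * 1024)

# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", mib(200))
print(mib(200))  # -> "209715200"
```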

2. SQL Broadcast Hints

Spark accepts three equivalent hint keywords:

-- All three are equivalent
SELECT /*+ BROADCAST(dim_customer) */ *
FROM fact_orders o
JOIN dim_customer c ON o.customer_id = c.customer_id;

SELECT /*+ BROADCASTJOIN(dim_customer) */ *
FROM fact_orders o
JOIN dim_customer c ON o.customer_id = c.customer_id;

SELECT /*+ MAPJOIN(dim_customer) */ *
FROM fact_orders o
JOIN dim_customer c ON o.customer_id = c.customer_id;

Hints override the autoBroadcastJoinThreshold. When you use a BROADCAST hint, Spark will broadcast the specified table even if its estimated size exceeds the threshold. However, the 8 GB hard limit still applies.

You can also broadcast multiple tables in a single query:

SELECT /*+ BROADCAST(dim_customer), BROADCAST(dim_product) */ *
FROM fact_orders o
JOIN dim_customer c ON o.customer_id = c.customer_id
JOIN dim_product p ON o.product_id = p.product_id;

3. DataFrame API: broadcast(df)

The programmatic equivalent of SQL hints:

PySpark:

from pyspark.sql.functions import broadcast

result = large_df.join(broadcast(small_df), "customer_id")

Scala:

import org.apache.spark.sql.functions.broadcast

val result = largeDF.join(broadcast(smallDF), "customer_id")

This marks the DataFrame so the Catalyst optimizer prioritizes broadcast join regardless of the threshold setting. Use this when you know at code-writing time that one side will be small.

4. Statistics-Based Decisions

Spark obtains table size statistics from multiple sources and uses them to make automatic broadcast decisions:

| Source | How Spark gets the size | Accuracy |
|---|---|---|
| Parquet/ORC files | File metadata (file sizes on disk) | Moderate — compressed size, not in-memory |
| Hive Metastore tables | ANALYZE TABLE statistics | Good if stats are fresh |
| Cached DataFrames | Exact in-memory size | Exact |
| After transformations | Selectivity estimates from filters/aggregations | Poor to moderate |
| Iceberg tables | Table metadata (snapshot summaries, file sizes) | Moderate |

The critical gap: statistics reflect compressed, columnar, on-disk size. The in-memory representation after decompression and conversion to row format can be 3-10x larger. A table that appears to be 50 MB on disk might consume 300 MB in memory.

-- Collect statistics for more accurate broadcast decisions
ANALYZE TABLE my_catalog.my_db.dim_customer COMPUTE STATISTICS;
ANALYZE TABLE my_catalog.my_db.dim_customer COMPUTE STATISTICS FOR COLUMNS customer_id, region;

5. Cost-Based Optimizer (CBO)

The CBO uses table and column statistics for join optimization:

spark.sql.cbo.enabled=true              # Default is false; enable explicitly
spark.sql.cbo.joinReorder.enabled=true  # Default is false; enables multi-way join reordering

With CBO enabled and column statistics collected, Spark can:

  • Estimate row counts after filters and joins more accurately
  • Select the correct build side for hash joins
  • Choose between broadcast and shuffle based on estimated output sizes
  • Reorder multi-way joins for optimal execution

Without column statistics, the CBO falls back to heuristic-based estimates that are often wildly inaccurate — especially after filters, aggregations, and subqueries.

Spark's JoinSelection Priority Order

When multiple mechanisms apply, Spark evaluates them in this order:

  1. User-provided hints — BROADCAST > MERGE > SHUFFLE_HASH > SHUFFLE_REPLICATE_NL
  2. Broadcast Hash Join — if either side is below autoBroadcastJoinThreshold
  3. Shuffle Hash Join — if spark.sql.join.preferSortMergeJoin=false and one side is significantly smaller
  4. Sort-Merge Join — the default fallback for equi-joins
  5. Broadcast Nested Loop Join — for non-equi joins when one side is small
  6. Cartesian Product — last resort for cross joins

Hints always win. If you specify /*+ BROADCAST(t) */, Spark will broadcast t regardless of its size (up to the 8 GB limit).

Adaptive Broadcast: AQE Runtime Join Conversion

Adaptive Query Execution (AQE) is one of the most powerful features in modern Spark. It can convert a Sort-Merge Join to a Broadcast Hash Join at runtime, after actual data sizes are known.

How It Works

  1. Spark initially plans a Sort-Merge Join because compile-time estimates suggest both sides are large.
  2. A shuffle stage completes for one or both sides of the join.
  3. AQE examines the actual shuffle output size — real bytes, not estimates.
  4. If one side's actual size is below autoBroadcastJoinThreshold, AQE re-plans the downstream join as a Broadcast Hash Join.
  5. Spark uses a local shuffle reader to read the already-shuffled data without additional network I/O.

Runtime vs Compile-Time Statistics

You can see the difference in the Spark SQL UI:

-- Compile-time estimate (often wrong):
Statistics(sizeInBytes=2.1 GiB, rowCount=45000000, isRuntime=false)

-- After stage execution (actual):
Statistics(sizeInBytes=8.2 MiB, rowCount=28100, isRuntime=true)

The isRuntime=true flag indicates Spark is using measured statistics. This is particularly valuable when:

  • Filters are highly selective (e.g., WHERE status = 'ACTIVE' on a table where only 0.1% of rows match)
  • Aggregations significantly reduce data volume
  • The CBO lacks column statistics
  • Complex subqueries make compile-time estimation unreliable

AQE Broadcast Is Less Efficient Than Planned Broadcast

There is an important caveat: AQE-converted broadcast joins are less efficient than broadcast joins planned from the start. By the time AQE intervenes, the shuffle has already happened — the data has been redistributed across the network. AQE avoids the sort step and uses local readers, but the shuffle cost is already paid.

If you know a table will always be small, use a hint or increase the threshold so Spark plans the broadcast upfront.

Key AQE Configurations

# Enable AQE (default true since Spark 3.2)
spark.sql.adaptive.enabled=true

# AQE uses the same threshold as compile-time broadcast decisions
spark.sql.autoBroadcastJoinThreshold=10485760

# Enable local shuffle reader for AQE-converted broadcasts
spark.sql.adaptive.localShuffleReader.enabled=true

Real-World AQE Scenario

Consider this query:

SELECT o.order_id, o.amount, u.name
FROM orders o
JOIN (
    SELECT user_id, name
    FROM users
    WHERE region = 'US' AND status = 'ACTIVE' AND last_login > '2026-01-01'
) u ON o.user_id = u.user_id;

At compile time, Spark estimates the users subquery will return 50 million rows (the full table size, since it cannot accurately estimate the combined filter selectivity). It plans a Sort-Merge Join.

At runtime, the subquery actually returns only 25,000 rows (2 MB). AQE detects this after the shuffle stage completes and converts the join to a Broadcast Hash Join — avoiding the sort on both sides and eliminating the large-side shuffle for the remaining stages.

The Real Cost: Driver Memory, Network, and Executor Resources

Most broadcast join failures trace back to memory. Understanding the actual resource requirements — not just the documented thresholds — prevents production incidents.

The Size Amplification Problem

This is the single most important concept in broadcast join tuning:

| Representation | Typical Size | Why |
|---|---|---|
| Parquet on disk (compressed, columnar) | 1x baseline (e.g., 100 MB) | Snappy/Zstd compression + columnar encoding |
| Serialized for broadcast (row-based, compressed) | 2-4x disk size (200-400 MB) | Converted from columnar to row format, LZ4 compressed |
| Deserialized in-memory (Java objects) | 3-10x disk size (300 MB - 1 GB) | Java object overhead, no compression, row-based |

Spark's auto-broadcast decision is based on the estimated disk/file size. But the actual in-memory representation can be an order of magnitude larger. A dimension table that is 100 MB on disk might consume 800 MB in driver memory.

This is why setting autoBroadcastJoinThreshold=8g is dangerous even though it is technically allowed. A 2 GB Parquet table could expand to 10+ GB in memory, crashing the driver.

Driver Memory Math

The driver needs memory for:

| Component | Memory Required |
|---|---|
| Collected data from executors | 1x the serialized data size |
| HashedRelation construction | 1x the deserialized data size (larger) |
| Serialized + chunked representation | 1x the serialized data size |
| Driver overhead (GC, framework) | 20-30% of heap |

Rule of thumb: the driver needs at least 3x the on-disk size of the broadcast table as available heap. For a 200 MB Parquet table, allocate at least 600 MB of free driver memory.
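The rule of thumb can be captured in a trivial helper (ours, not a Spark API) for capacity planning:

```python
# Rule-of-thumb sizing from above: reserve at least `multiplier` times
# the on-disk size of the broadcast table as free driver heap. The 3x
# default is this article's heuristic, not a Spark constant -- measure
# your own tables if compression ratios are unusual.
def min_driver_heap_mb(disk_mb: float, multiplier: float = 3.0) -> float:
    return disk_mb * multiplier

print(min_driver_heap_mb(200))  # -> 600.0 (MB of free heap for a 200 MB table)
```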

spark.driver.maxResultSize

spark.driver.maxResultSize=1g  # Default

This limits the total size of serialized results collected to the driver. It is a safety valve — if the broadcast data exceeds this, Spark fails fast instead of risking an OOM:

SparkException: Job aborted due to stage failure:
Total size of serialized results of 200 tasks is bigger than
spark.driver.maxResultSize (1g)

Increase this when broadcasting larger tables:

spark.conf.set("spark.driver.maxResultSize", "4g")

Network Cost

While broadcast avoids shuffling the large table, it still transfers the small table to every executor. For a cluster with 100 executors and a 500 MB broadcast table:

  • Without P2P: 500 MB x 100 = 50 GB from driver (driver bottleneck)
  • With TorrentBroadcast (default): Load is distributed. The driver sends each chunk once, and executors share chunks with each other. Total bytes transferred is still ~50 GB, but no single node is a bottleneck.

Executor Memory Pressure

Every executor stores a full copy of the broadcast hash table in memory. If you broadcast a 500 MB table to a cluster where each executor has 4 GB heap:

  • 500 MB / 4 GB = 12.5% of each executor's memory consumed by the broadcast alone
  • This reduces memory available for task execution, caching, shuffle buffers, and other broadcast variables
  • If the query has multiple broadcast joins, the pressure compounds

The SPARK-22170 Issue

There is a known issue (SPARK-22170) where BroadcastExchangeExec holds an extra copy of rows in driver memory during construction. Each row is copied to a new byte buffer and added to an ArrayBuffer, creating significant allocation overhead. A 16 MB Parquet table was observed using 100 MB more memory than expected due to this intermediate buffering.

Downsides and Pitfalls — When Broadcast Goes Wrong

Broadcast join is not a universal optimization. Used incorrectly, it causes failures that are worse than the shuffle it was meant to avoid.

Driver OOM — The #1 Killer

The entire broadcast table must be collected to the driver, built into a hash table, serialized, and chunked. If any of these steps exceeds available driver memory, the job crashes:

java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec

This is especially dangerous when:

  • Statistics underestimate the actual data size (stale ANALYZE TABLE results)
  • Parquet compression ratios are high (small on disk, large in memory)
  • Multiple broadcast joins execute in the same query, each consuming driver memory
  • The query has a subquery that looks small but produces large output

Broadcast Timeout

spark.sql.broadcastTimeout=300  # 5 minutes (default)

If the broadcast does not complete within this timeout:

SparkException: Could not execute broadcast in 300 secs.
You can increase the timeout for broadcasts via spark.sql.broadcastTimeout
or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1

Common causes:

  • The broadcast data is large and serialization/distribution takes time
  • The cluster is under heavy load and executors are slow to respond to chunk requests
  • A preceding stage (e.g., a shuffle that feeds the broadcast subquery) is itself slow
  • GC pressure on the driver during serialization

Fix: Increase the timeout for legitimate large broadcasts, or disable broadcast if the table is too large:

# Increase timeout to 10 minutes
spark.conf.set("spark.sql.broadcastTimeout", "600")

# Or disable broadcast for this query
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

The 8 GB Hard Limit and 512 Million Record Limit

Spark enforces absolute limits on broadcast data:

SparkException: Cannot broadcast the table that is larger than 8GB: 10 GB

Beyond the size limit, there is also a limit of approximately 512 million records in a single broadcast variable (due to Java array indexing constraints). These limits cannot be overridden — if your table exceeds them, broadcast is not an option.
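A pre-flight check against these hard limits can save a failed job. The function below is our own sketch (not a Spark API), run against your own size and row-count estimates:

```python
# Pre-flight check against Spark's hard broadcast limits: 8 GB of
# broadcast data and roughly 512 million records. The constants mirror
# the limits described above; the helper itself is illustrative.
MAX_BROADCAST_BYTES = 8 * 1024**3   # 8 GB hard limit
MAX_BROADCAST_ROWS = 512 * 10**6    # ~512 million records

def can_broadcast(size_bytes: int, row_count: int) -> bool:
    return size_bytes < MAX_BROADCAST_BYTES and row_count < MAX_BROADCAST_ROWS

print(can_broadcast(50 * 1024**2, 1_000_000))   # -> True  (50 MB, 1M rows)
print(can_broadcast(10 * 1024**3, 1_000_000))   # -> False (10 GB exceeds 8 GB)
```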

Stale or Missing Statistics

When Spark's size estimates are wrong, broadcast decisions go wrong:

  • Overestimate: Spark thinks a 5 MB table is 500 MB → uses Sort-Merge instead of broadcast → unnecessary shuffle
  • Underestimate: Spark thinks a 2 GB table is 50 MB → broadcasts it → driver OOM

This is particularly common with:

  • Tables that have never had ANALYZE TABLE run
  • Filtered subqueries where selectivity is hard to estimate
  • Tables with high compression ratios (Zstd-compressed Parquet)

Multiple Broadcasts in One Query

When a query joins a fact table with several dimension tables, all broadcast tables are held in memory simultaneously:

SELECT /*+ BROADCAST(c), BROADCAST(p), BROADCAST(d), BROADCAST(r) */
    f.*, c.name, p.product_name, d.date_name, r.region_name
FROM fact_sales f
JOIN dim_customer c ON f.customer_id = c.id
JOIN dim_product p ON f.product_id = p.id
JOIN dim_date d ON f.date_id = d.id
JOIN dim_region r ON f.region_id = r.id;

If each dimension table is 100 MB in memory, the driver and every executor must hold 400 MB of broadcast data. Plan your memory budget for the sum of all broadcasts, not individual tables.

Not Suitable for Two Large Tables

This is obvious but worth stating: broadcast join requires one side to be small. Joining two 500 GB tables via broadcast will crash the driver instantly. Use Sort-Merge Join for large-large joins.

Broadcast Join vs Sort-Merge Join vs Shuffle Hash Join

Spark has three primary join strategies. Choosing the right one depends on data sizes, memory, and query patterns.

Comparison

| Aspect | Broadcast Hash Join | Sort-Merge Join | Shuffle Hash Join |
|---|---|---|---|
| Shuffle required | None (broadcasts small side) | Both sides shuffled | Both sides shuffled |
| Sort required | None | Both sides sorted after shuffle | None |
| Size constraint | One side must be small | Both sides can be any size | One side should be moderate per partition |
| Memory model | Small side must fit in memory (driver + each executor) | Can spill to disk | Build side per partition must fit in memory |
| Equi-join only | Yes | Yes | Yes |
| Default strategy | When one side < 10 MB | When both sides are large | Must be explicitly preferred |
| Disk spill | Broadcast side: no. Probe side: yes | Yes (both sides) | Build side: no. Probe side: yes |
| Best for | Small-large joins (dimension + fact) | Large-large joins | Medium-large joins without sort |

When Each Strategy Wins

Broadcast Hash Join:

  • One side is definitively small (up to ~1-2 GB with proper memory tuning)
  • Star schema queries (fact + dimension tables)
  • Lookup/reference table enrichment
  • Post-filter subqueries that produce small results

Sort-Merge Join:

  • Both sides are large (hundreds of GB or more)
  • Memory is constrained (gracefully spills to disk)
  • The most robust and predictable strategy
  • Default when no table is small enough to broadcast

Shuffle Hash Join:

  • Both sides need shuffling, but one side is moderately small per partition
  • Data is uniformly distributed (no skew)
  • Must set spark.sql.join.preferSortMergeJoin=false
  • Avoids the sort step, saving CPU

Performance Impact

The dominant factor is shuffle elimination. Consider joining a 1 TB fact table with a 50 MB dimension table:

  • Sort-Merge Join: Shuffles all 1 TB across the network, sorts both sides, then merges. Shuffle alone can take 10-30 minutes depending on cluster size and network.
  • Broadcast Hash Join: Broadcasts 50 MB to each executor (seconds), then performs local hash lookups. No shuffle of the 1 TB table.

Real-world benchmarks consistently show 5-20x speedup for broadcast joins over Sort-Merge joins in suitable scenarios. The exact improvement depends on cluster size, network bandwidth, data distribution, and the ratio of broadcast table size to shuffle cost.

Real-World Optimization Patterns

Pattern 1: Star Schema — Broadcasting Dimension Tables

The classic and most impactful use case. In a star schema, the fact table (orders, transactions, events) is massive while dimension tables (customers, products, regions, dates) are small:

SELECT /*+ BROADCAST(c), BROADCAST(p) */
    f.order_id,
    f.amount,
    c.customer_name,
    c.segment,
    p.product_name,
    p.category
FROM fact_orders f
JOIN dim_customer c ON f.customer_id = c.customer_id
JOIN dim_product p ON f.product_id = p.product_id
WHERE f.order_date BETWEEN '2026-01-01' AND '2026-01-31';

This eliminates two shuffle stages on the fact table. If fact_orders has 2 billion rows, you just saved shuffling 2 billion rows — twice.

Spark also has built-in star schema detection:

spark.sql.starSchemaDetection=true  # Default is false; available since Spark 2.2

When enabled, Spark can automatically reorder joins in star schemas so that dimension table joins (broadcast candidates) execute first, reducing intermediate data size.

Pattern 2: Lookup and Reference Table Enrichment

Small reference tables are ideal broadcast candidates — they rarely exceed a few MB and change infrequently:

# Country code mapping — ~50 KB
country_mapping = spark.table("ref_country_codes")
enriched = events.join(broadcast(country_mapping), "country_code")

# Status code lookup — ~10 KB
status_lookup = spark.table("ref_status_codes")
enriched = enriched.join(broadcast(status_lookup), "status_code")

Pattern 3: Filtered Subquery Broadcast

A large table filtered down to a small result is an excellent broadcast candidate. This is where AQE shines, but you can also use explicit hints:

SELECT /*+ BROADCAST(active_users) */
    e.event_id,
    e.event_type,
    active_users.user_name,
    active_users.account_tier
FROM events e
JOIN (
    SELECT user_id, user_name, account_tier
    FROM users
    WHERE status = 'ACTIVE'
      AND region = 'US'
      AND last_login > '2026-01-01'
    -- Filters 1 billion rows down to 50,000
) active_users ON e.user_id = active_users.user_id;

Without the hint, Spark might estimate the subquery as 10 GB (the full table size) and choose Sort-Merge Join. With the hint — or with AQE detecting the actual 2 MB output — broadcast is used instead.

Pattern 4: Increasing the Threshold Strategically

The default 10 MB threshold is conservative. Many production workloads benefit from increasing it:

# Increase to 200 MB — suitable for most dimension tables
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "209715200")

# Also increase driver memory and max result size to match
# (set these in spark-defaults.conf or cluster config)
# spark.driver.memory=8g
# spark.driver.maxResultSize=4g

Guidelines for choosing a threshold:

| Threshold | Driver Memory Needed | Use Case |
|---|---|---|
| 10 MB (default) | 1 GB | Conservative, small lookup tables only |
| 50 MB | 2 GB | Small dimension tables |
| 200 MB | 4 GB | Medium dimension tables |
| 500 MB | 8 GB | Large dimension tables (careful tuning required) |
| 1 GB+ | 16 GB+ | Only with thorough testing and memory monitoring |

Remember the 3-10x amplification factor: a 200 MB threshold means Spark will broadcast tables up to 200 MB on disk, which could be 600 MB - 2 GB in memory.

Pattern 5: Broadcast with Pre-Aggregated Data

Aggregate before broadcasting to reduce the broadcast table size:

SELECT /*+ BROADCAST(dt) */
    o.order_id,
    o.amount,
    dt.daily_total,
    o.amount / dt.daily_total AS pct_of_daily
FROM orders o
JOIN (
    SELECT customer_id, DATE(order_date) AS order_day, SUM(amount) AS daily_total
    FROM orders
    GROUP BY customer_id, DATE(order_date)
    -- Aggregation reduces millions of rows to thousands
) dt
    ON o.customer_id = dt.customer_id
    AND DATE(o.order_date) = dt.order_day;

Advanced Topics

Broadcast Nested Loop Join (BNLJ)

When a join condition is non-equi (ranges, BETWEEN, inequality comparisons), Spark cannot use hash-based joins. It falls back to Broadcast Nested Loop Join:

SELECT *
FROM events e
JOIN time_ranges t
ON e.event_time BETWEEN t.start_time AND t.end_time;

BNLJ broadcasts one side and performs a nested loop on each executor: for every row in the local partition, it scans every row of the broadcast side to check the join condition. Time complexity is O(N x M) — it is expensive but unavoidable for non-equi joins.
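The O(N x M) probe can be sketched in plain Python (standing in for what each executor does with its local partition and the broadcast side):

```python
# Minimal sketch of a Broadcast Nested Loop Join for the BETWEEN query
# above: every local event row is checked against every broadcast
# time-range row, hence O(N x M) comparisons per partition.
def bnlj(events, ranges):
    for e in events:                  # local partition (probe side)
        for r in ranges:              # full broadcast side, scanned per row
            if r["start"] <= e["time"] <= r["end"]:
                yield (e["id"], r["label"])

events = [{"id": 1, "time": 5}, {"id": 2, "time": 42}]
ranges = [{"start": 0, "end": 10, "label": "early"},
          {"start": 40, "end": 50, "label": "late"}]
print(list(bnlj(events, ranges)))  # -> [(1, 'early'), (2, 'late')]
```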

You can control which side is broadcast:

-- Force broadcast on the smaller side
SELECT /*+ BROADCAST(t) */ *
FROM events e
JOIN time_ranges t
ON e.event_time BETWEEN t.start_time AND t.end_time;

Broadcast Exchange Reuse

When the same table appears in multiple joins within a single query, Spark can reuse the broadcast:

SELECT *
FROM orders o
JOIN products p1 ON o.product_id = p1.id
JOIN products p2 ON o.alt_product_id = p2.id;

Spark's ReuseExchange optimization (spark.sql.exchange.reuse=true, default) detects identical BroadcastExchangeExec nodes and reuses the broadcast variable rather than broadcasting the same table twice. This saves driver memory, network bandwidth, and executor storage.

Interaction with Bucketing and Partitioning

Broadcast supersedes bucketing. If a table is bucketed on the join key and small enough for broadcast, Spark may choose broadcast over the bucket join (no-shuffle join). To force bucket join behavior:

spark.sql.autoBroadcastJoinThreshold=-1

Broadcast with partitioned tables works normally. The small table is broadcast regardless of its partitioning. The large table's partitions are read in place without shuffle.

Interaction with Iceberg's Storage Partitioned Joins (SPJ)

If you have Iceberg tables set up for Storage Partitioned Joins, broadcast and SPJ are competing strategies:

  • Broadcast eliminates the shuffle by sending the small table everywhere. Requires one side to be small.
  • SPJ eliminates the shuffle by recognizing that data is already co-located on disk. Works for large-large joins if both sides are bucketed identically.

When both are possible (small table + compatible bucketing), Spark typically prefers broadcast because it is simpler and usually faster for small tables. If you want to test SPJ instead, disable broadcast:

spark.sql.autoBroadcastJoinThreshold=-1

Iceberg-Specific Broadcast Behavior

Iceberg has a specific optimization for broadcast. When a table is broadcast, Spark's SizeEstimator normally traverses the serialized object to compute its size — which can be expensive, especially with remote file systems like S3FileIO. Iceberg's SerializableTable subclass reports a fixed size of 32 KB, bypassing the expensive size estimation entirely (Iceberg PR #5225).

For Iceberg tables, be aware that:

  • Table size statistics come from Iceberg metadata (snapshot summaries, file sizes). If metadata is stale, broadcast decisions may be wrong.
  • Iceberg's Parquet files with high compression ratios can amplify significantly in memory.
  • Use explicit broadcast hints rather than relying solely on auto-broadcast when you know a table is small.

Complete Configuration Reference

Primary Broadcast Settings

| Configuration | Default | Description |
|---|---|---|
| spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | Maximum table size (bytes) for auto-broadcast. Set to -1 to disable. Max 8 GB. |
| spark.sql.broadcastTimeout | 300 (seconds) | Timeout for broadcast operations. Set to -1 for no timeout. |
| spark.broadcast.blockSize | 4m (4 MB) | Chunk size for TorrentBroadcast distribution. |
| spark.broadcast.compress | true | Compress broadcast data before sending (uses LZ4 by default). |
| spark.broadcast.checksum | true | Include checksum verification in broadcast data. |
| spark.io.compression.codec | lz4 | Compression codec used when spark.broadcast.compress=true. |

Driver and Memory Settings

| Configuration | Default | Description |
|---|---|---|
| spark.driver.memory | 1g | Driver JVM heap size. Must accommodate all broadcast data. |
| spark.driver.maxResultSize | 1g | Max total size of serialized results collected to the driver. |
| spark.driver.memoryOverhead | driverMemory * 0.10 (min 384 MB) | Off-heap memory for the driver. |

AQE Settings

| Configuration | Default | Description |
|---|---|---|
| spark.sql.adaptive.enabled | true (since Spark 3.2) | Enable Adaptive Query Execution. |
| spark.sql.adaptive.localShuffleReader.enabled | true | Use local shuffle reader for AQE-converted broadcast joins. |

Join Strategy Settings

| Configuration | Default | Description |
|---|---|---|
| spark.sql.join.preferSortMergeJoin | true | Prefer Sort-Merge over Shuffle Hash when both are viable. |
| spark.sql.exchange.reuse | true | Reuse broadcast (and shuffle) exchanges in the same query. |
| spark.sql.starSchemaDetection | false | Enable star schema join reordering. |
| spark.sql.cbo.enabled | false | Enable Cost-Based Optimizer. |
| spark.sql.cbo.joinReorder.enabled | false | Enable CBO join reordering for multi-way joins. |

Monitoring and Debugging Broadcast Joins

Reading the Query Plan

Use explain() to confirm whether Spark is using broadcast:

result = large_df.join(broadcast(small_df), "customer_id")
result.explain(True)

Broadcast join is active if you see:

== Physical Plan ==
*(2) BroadcastHashJoin [customer_id#0], [customer_id#5], Inner, BuildRight, false
:- *(2) Filter isnotnull(customer_id#0)
: +- *(2) FileScan parquet [customer_id#0, amount#1]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, int, false]), false)
+- *(1) Filter isnotnull(customer_id#5)
+- *(1) FileScan parquet [customer_id#5, name#6]

Key indicators:

  • BroadcastHashJoin — confirms broadcast join is being used
  • BroadcastExchange — shows which table is being broadcast
  • HashedRelationBroadcastMode — hash-based broadcast for equi-join
  • BuildRight / BuildLeft — indicates which side is the broadcast (build) side

Broadcast is NOT active if you see:

== Physical Plan ==
SortMergeJoin [customer_id#0], [customer_id#5], Inner
:- Sort [customer_id#0 ASC]
: +- Exchange hashpartitioning(customer_id#0, 200) ← SHUFFLE
: +- FileScan parquet
+- Sort [customer_id#5 ASC]
+- Exchange hashpartitioning(customer_id#5, 200) ← SHUFFLE
+- FileScan parquet

The Exchange hashpartitioning nodes indicate shuffles — broadcast is not being used.
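In tests or CI, this check can be automated by inspecting the plan text. The helper below is our own convenience function (not a Spark API); pass it the string you captured from explain() output:

```python
# A small guard that classifies the join strategy by inspecting the
# physical plan text, e.g. the captured output of explain(). Our helper,
# not part of Spark.
def join_strategy(plan_text: str) -> str:
    if "BroadcastHashJoin" in plan_text:
        return "broadcast"
    if "SortMergeJoin" in plan_text:
        return "sort-merge"
    return "other"

plan = "*(2) BroadcastHashJoin [customer_id#0], [customer_id#5], Inner, BuildRight"
print(join_strategy(plan))  # -> broadcast
```

A test suite can then assert `join_strategy(plan) == "broadcast"` and fail fast if a threshold or statistics change silently flips a hot query back to Sort-Merge.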

Spark UI Indicators

In the Spark Web UI:

  • SQL Tab: Click on a query to see the DAG visualization. BroadcastExchange appears as a distinct node. Hover over it to see metrics: data size, time spent, number of rows broadcast.
  • Stages Tab: Broadcast joins typically show fewer stages (no shuffle stages for the large table).
  • With AQE: The SQL tab shows both the initial plan and the final adaptive plan. If AQE converted a Sort-Merge to Broadcast, the final plan will show BroadcastHashJoin while the initial plan shows SortMergeJoin.

Common Error Messages and Fixes

| Error Message | Cause | Fix |
|---|---|---|
| Could not execute broadcast in 300 secs | Broadcast timeout | Increase spark.sql.broadcastTimeout or disable broadcast |
| Cannot broadcast the table that is larger than 8GB | Exceeds hard 8 GB limit | Cannot broadcast — use Sort-Merge Join |
| Total size of serialized results exceeds spark.driver.maxResultSize | Driver result size limit | Increase spark.driver.maxResultSize |
| java.lang.OutOfMemoryError on driver | Driver OOM during broadcast | Increase spark.driver.memory or reduce broadcast size |
| java.lang.OutOfMemoryError on executor | Executor cannot hold broadcast variable | Increase executor memory or disable broadcast |

5-Step Diagnostic Workflow

Step 1: Check the query plan — is broadcast being used?

df.explain("formatted")

Step 2: Check broadcast data size in the Spark UI SQL tab. Compare it to the expected size. If it is much larger than the Parquet file size, size amplification is the issue.

Step 3: Monitor driver memory. Check GC activity in the Spark UI Executors tab (the driver is listed as "driver"). High GC time during broadcast indicates memory pressure.

Step 4: Check for broadcast timeout. If jobs are slow but not failing, the broadcast may be close to the 300-second limit. Check stage durations.

Step 5: Compare with Sort-Merge Join. If broadcast is causing problems, disable it and compare:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
df.explain("formatted") # Verify Sort-Merge is now used
# Run and compare performance

How Cazpian Handles This

On Cazpian, broadcast join optimization is built into the compute pool configuration. Cazpian's managed Spark clusters are configured with tuned driver memory, maxResultSize, and broadcast thresholds based on the cluster size and workload profile. When you run queries against Iceberg tables, Cazpian's query optimizer evaluates table statistics from the Iceberg catalog and adjusts broadcast decisions accordingly — using broadcast for dimension table joins while falling back to SPJ or Sort-Merge for large-to-large joins. The platform also monitors broadcast-related metrics (timeout events, driver memory pressure, broadcast data sizes) and surfaces them in the cluster health dashboard, so you can identify and fix broadcast misconfigurations before they cause production failures.

What's Next

This post covered everything about broadcast joins — from internal mechanics to production tuning. For related optimizations in the Iceberg and Spark ecosystem, see our other posts: