Spark Data Skew: The Complete Guide to Identification, Debugging, and Optimization
Your 200-node cluster finished 199 of 200 tasks in 30 seconds. The last task has been running for 45 minutes. Every executor except one is idle, burning compute cost while it waits for a single partition containing 80% of the data to finish processing. The stage progress bar is stuck at 99.5%. Your Spark job that should take 2 minutes is taking an hour.
This is data skew -- the single most common and most destructive performance problem in distributed data processing. It turns a perfectly parallelized cluster into an expensive single-threaded computation. It wastes money, wastes time, and breaks SLAs. And it is entirely fixable once you know how to identify it and which optimization to apply.
This post goes deep on every dimension of data skew. We start with what it is and why it kills performance, show exactly how to identify it in the Spark UI, catalog every type of skew you will encounter, cover the AQE automatic optimizations that handle skew at runtime, walk through every manual fix with code examples, address Iceberg-specific skew problems, provide a complete configuration reference, and close with the anti-patterns that cause skew in the first place.
What Data Skew Is and Why It Kills Performance
Data skew occurs when data is unevenly distributed across partitions. In a perfectly balanced cluster, every partition has roughly the same amount of data, every task does roughly the same work, and the stage completes when the last task finishes -- which is about the same time as the first task. With skew, one or a few partitions hold dramatically more data than the rest. The tasks processing those partitions become stragglers -- they run for 10x, 100x, or 1000x longer than the other tasks in the same stage.
The Straggler Problem
Spark stages are all-or-nothing: a stage does not complete until every task in that stage finishes. If 999 tasks finish in 10 seconds and one task takes 30 minutes, the stage takes 30 minutes. The downstream stages cannot start. The 999 completed executors sit idle, consuming memory and compute but producing nothing.
Stage Timeline (Skewed):
Task 1: [===] (10 seconds, 50 MB)
Task 2: [===] (12 seconds, 55 MB)
Task 3: [===] (8 seconds, 45 MB)
...
Task 199: [===] (11 seconds, 52 MB)
Task 200: [========================================] (30 minutes, 40 GB)
(The 199 fast tasks sit idle; the stage completes only when Task 200 finishes.)
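The all-or-nothing rule is just a max over task durations. A plain-Python sketch using the task times from the timeline above (illustrative numbers, not a Spark API):

```python
# A stage finishes only when its slowest task does, so one straggler
# sets the wall-clock time for the whole stage.
task_seconds = [10.0] * 199 + [30 * 60.0]  # 199 fast tasks, one 30-minute straggler

stage_seconds = max(task_seconds)                      # stage duration = slowest task
ideal_seconds = sum(task_seconds) / len(task_seconds)  # perfectly balanced case

print(stage_seconds)         # 1800.0 -- the straggler dominates
print(round(ideal_seconds))  # 19 -- what balanced partitions would give
```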
The Cost Cascade
Skew does not just slow things down. It triggers a cascade of expensive problems:
Over-provisioning. When a skewed job runs out of memory on the straggler task, the instinct is to increase executor memory or add more executors. But the problem is not insufficient resources -- it is uneven distribution. Adding memory helps the one straggler but wastes money on the 199 executors that were already fine. Teams routinely over-provision clusters by 3-5x to accommodate skew they have not diagnosed.
Memory pressure and spill. The straggler task processes orders of magnitude more data than it was sized for. It exhausts execution memory, triggering disk spill -- serializing data to local disk and reading it back. Spill is 10-100x slower than in-memory processing. A task that would take 30 seconds in memory might take 30 minutes with heavy spill.
GC pressure. The straggler task creates millions of Java objects for its oversized partition. The garbage collector must traverse all of them, causing stop-the-world pauses. In extreme cases, GC pauses block the executor's heartbeats (sent every spark.executor.heartbeatInterval, default 10 seconds) long enough that Spark assumes the executor is dead, and the task is re-scheduled on another executor -- where the same thing happens again, creating a retry loop.
OOM crashes. When the skewed partition exceeds available memory even after spill, the executor crashes with OutOfMemoryError. Spark retries the task (up to spark.task.maxFailures, default 4), but each retry hits the same skewed partition and crashes again. After max retries, the entire job fails.
Cost waste. A skewed 2-hour job on a 200-node cluster means 198 nodes are idle for most of the run. At $0.50/node/hour, that is $99/hour of wasted compute -- $198 per run. If the job runs daily, that is over $72,000/year in waste -- from a single skewed stage.
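Worked out explicitly (using the run described above; the node count and price are assumptions):

```python
# Idle-cost estimate for a skewed 2-hour job on a 200-node cluster.
nodes, node_hourly = 200, 0.50
idle_nodes = nodes - 2     # roughly: everything except the straggler's executor
run_hours = 2

waste_per_run = idle_nodes * node_hourly * run_hours
yearly_waste = waste_per_run * 365   # if the job runs daily

print(waste_per_run)  # 198.0 dollars per run
print(yearly_waste)   # 72270.0 -- roughly $72K/year
```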
When Skew Appears
Skew manifests during any operation that redistributes data by key:
- Shuffles -- JOIN, GROUP BY, DISTINCT, WINDOW functions, REPARTITION BY
- Aggregations -- COUNT, SUM, AVG, COLLECT_LIST on high-cardinality keys with hot values
- Writes -- partitioned writes where one partition value contains far more data than others
Skew does not appear during pure map operations (filter, select, withColumn) because those operate on partitions in place without redistribution.
How to Identify Skew Using the Spark UI
The Spark UI provides definitive evidence of skew. You need to know exactly where to look and what the numbers mean.
Step 1: Find the Slow Stage
Open the Spark UI at http://<driver>:4040 (or the History Server for completed jobs). Navigate to the Stages tab. Look for stages with:
- Long duration relative to other stages
- A status showing most tasks complete but a few still running
- Shuffle Read or Shuffle Write columns showing large values
Click on the slow stage to open the stage detail page.
Step 2: Summary Metrics Table
The stage detail page shows Summary Metrics for Completed Tasks. This is the most important diagnostic tool for skew. It shows percentiles (Min, 25th, Median, 75th, Max) for every task metric:
| Metric | Min | 25th Percentile | Median | 75th Percentile | Max |
|---|---|---|---|---|---|
| Duration | 2 s | 3 s | 4 s | 5 s | 28 min |
| Shuffle Read Size | 12 MB | 45 MB | 52 MB | 58 MB | 38 GB |
| Shuffle Read Records | 100K | 400K | 450K | 500K | 350M |
| Spill (Memory) | 0 B | 0 B | 0 B | 0 B | 45 GB |
| Spill (Disk) | 0 B | 0 B | 0 B | 0 B | 12 GB |
| GC Time | 50 ms | 100 ms | 150 ms | 200 ms | 8 min |
How to read this: If the Max value is dramatically larger than the Median (10x or more), you have data skew. In the example above:
- Max duration is 420x the median
- Max shuffle read is 730x the median
- Max records are 778x the median
- Only the Max task shows spill -- confirming one task is processing far more data
Rule of thumb: If Max > 3x the 75th percentile for Duration or Shuffle Read Size, investigate skew.
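The rule of thumb is easy to script against metrics scraped from the UI or its REST API. A minimal sketch in plain Python (the task values are illustrative, not a Spark API):

```python
from statistics import quantiles

def looks_skewed(task_values, factor=3.0):
    """Rule of thumb: flag skew when Max > factor x the 75th percentile."""
    p75 = quantiles(task_values, n=4)[2]   # third quartile = 75th percentile
    return max(task_values) > factor * p75

# Illustrative per-task shuffle-read sizes in MB (one 38 GB straggler)
shuffle_read_mb = [45, 50, 52, 55, 58, 48, 51, 38_912]
print(looks_skewed(shuffle_read_mb))    # True
print(looks_skewed([45, 50, 52, 55]))   # False -- healthy stage
```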
Step 3: Event Timeline
Click the Event Timeline dropdown in the stage detail page. This shows a visual timeline with each task as a colored bar:
- Colored segments within each bar show how time was spent: compute (green), shuffle read (orange), shuffle write (blue), serialization (yellow), GC (red)
- Skew pattern: Most bars are short and roughly equal length. One or a few bars extend far to the right, with large red (GC) and orange (shuffle read) segments.
The timeline makes skew visually obvious. A healthy stage shows uniform bars. A skewed stage shows one dramatically longer bar.
Step 4: Task Table
Scroll to the task table and sort by Duration (descending). The straggler tasks will be at the top. Compare their Shuffle Read Size and Records Read to the average. Note the Locality Level and Executor ID -- if the same executor hosts all stragglers, it might be a data locality issue rather than true skew.
Step 5: Spill and GC Indicators
In the Summary Metrics:
Spill (Memory) -- the amount of data that was serialized due to memory pressure. If only the Max row shows spill, it confirms a single task is under memory pressure -- classic skew.
Spill (Disk) -- the amount of data written to disk after serialization. This is the actual I/O penalty. Any non-zero value in a task indicates that task exceeded its memory budget.
GC Time -- time spent in garbage collection. If the Max GC time is a significant fraction of the Max duration (>20%), the straggler is spending more time managing memory than processing data. This is a secondary indicator that confirms the memory pressure from skew.
Step 6: SQL Tab DAG Visualization
For Spark SQL queries, click the query in the SQL tab to see the DAG visualization. Hover over exchange nodes (shuffles) to see data size metrics. If one partition in an exchange is orders of magnitude larger than others, the visualization will show it. With AQE enabled, you can compare the initial plan to the final adaptive plan to see if Spark detected and handled the skew.
Quick Diagnostic Checklist
| Indicator | What It Means | Severity |
|---|---|---|
| Max Duration > 10x Median | Severe skew | High |
| Max Duration > 3x 75th Percentile | Moderate skew | Medium |
| Max Shuffle Read > 10x Median | Partition size skew | High |
| Spill only on Max task | Straggler memory pressure | High |
| GC Time > 20% of Duration on Max | GC from oversized partition | Medium |
| Stage stuck at 99% | Waiting for one straggler | High |
Types of Data Skew
Skew comes in several distinct forms. Each has different root causes and different optimal fixes.
Join Skew
Join skew occurs when the join key has a non-uniform distribution -- one or a few key values appear in dramatically more rows than others. During a shuffle join (Sort-Merge or Shuffle Hash), all rows with the same key are sent to the same partition. If key "US" appears in 500 million rows while all other country codes appear in 1-5 million rows, the partition for "US" is 100x larger than the rest.
# Classic join skew scenario
# orders table: 1 billion rows, country_code column
# 500M rows have country_code = "US", 500M distributed across 200 other codes
orders.join(customers, "country_code")
# Result: one partition gets 500M rows, others get ~2.5M each
Join skew is amplified by cartesian explosion: if the skewed key appears 500M times in the left table and 10M times in the right table, the join produces 5 quadrillion rows for that key -- which will crash any executor.
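The explosion arithmetic is worth checking: the join output for one key is the product of that key's occurrence counts on each side.

```python
# Rows produced for one join key = (left occurrences) x (right occurrences).
left_rows_for_key = 500_000_000    # skewed key on the left table
right_rows_for_key = 10_000_000    # same key on the right table

output_rows = left_rows_for_key * right_rows_for_key
print(output_rows)  # 5000000000000000 -- 5 quadrillion rows for a single key
```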
Aggregation Skew
Aggregation skew occurs when GROUP BY or window functions group data by a key with hot values. All rows with the same group key are sent to the same reducer. If one group has 1 billion rows and the average group has 10,000 rows, the reducer for the hot group does 100,000x more work.
# Aggregation skew: one merchant has 90% of all transactions
transactions.groupBy("merchant_id").agg(
count("*").alias("tx_count"),
sum("amount").alias("total_amount"),
collect_list("item").alias("items") # collect_list makes it worse -- materializes all items
)
collect_list and collect_set are particularly dangerous because they materialize all values for a group into a single array on one executor. A group with 100 million rows produces a 100-million-element array.
Null Key Skew
Null key skew is the most common and most overlooked form of skew. When a join or group-by column contains many null values, all nulls hash to the same partition. A table with 20% null values in the join column sends 20% of all data to a single partition.
# Null key skew: 200M out of 1 billion rows have null customer_id
orders.join(customers, "customer_id")
# 200M rows with null customer_id → all sent to one partition
# For inner join: these rows are discarded after the shuffle -- wasted work
# For left join: these 200M rows produce null matches in one partition
The irony is that for inner joins, null keys never match anything -- the 200M rows are shuffled, processed, and then discarded. The shuffle work is entirely wasted. For outer joins, they must be processed but create a massive skewed partition.
Time-Based Skew
Time-based skew occurs when data partitioned or grouped by time has uneven volume across time periods. This is ubiquitous in event-driven systems:
- Black Friday: 50x normal daily volume
- Product launches: Spike in activity for a few hours
- Business hours vs off-hours: 10x more data during working hours
- Month-end processing: Financial data concentrated on the last day
# Time-based skew: partitioned by day, Black Friday has 50x data
events.write.partitionBy("event_date").parquet("s3://bucket/events")
# Partition 2026-11-27 (Black Friday): 5 TB
# All other partitions: ~100 GB each
When you later query across partitions, the Black Friday partition creates a straggler that processes 50x more data.
Write Skew
Write skew occurs during partitioned writes when the output partition key has skewed distribution. Spark writes one file per partition per task. If one partition value dominates, many tasks write large files for that partition while writing tiny files for others.
# Write skew: repartition by region before writing
events.repartition("region").write.partitionBy("region").parquet("s3://...")
# Region "US" gets one massive partition → one task writes a huge file
# Region "LI" (Liechtenstein) gets a tiny partition → tiny file
Write skew also causes the small files problem -- hundreds of tiny files for low-volume partitions alongside a few massive files for high-volume partitions. This degrades downstream read performance and creates metadata overhead.
AQE Optimizations for Skew
Adaptive Query Execution (AQE) is Spark's runtime optimization framework that detects and handles skew automatically. It is enabled by default since Spark 3.2. AQE provides two critical optimizations for skew: skew join handling and partition coalescing.
AQE Skew Join
AQE skew join dynamically splits oversized partitions during sort-merge joins. It detects skew after the shuffle completes (when actual partition sizes are known) and splits the skewed partition into smaller sub-partitions that run as separate tasks.
How It Works
1. Shuffle completes. Both sides of the sort-merge join have been shuffled and their partition sizes are known.
2. Skew detection. AQE examines every partition and flags it as skewed if both conditions are true:
   - Partition size > skewedPartitionFactor (default 5.0) x median partition size
   - Partition size > skewedPartitionThresholdInBytes (default 256 MB)
3. Partition splitting. The skewed partition is split into sub-partitions, each targeting spark.sql.adaptive.advisoryPartitionSizeInBytes (default 64 MB) in size.
4. Replication of the other side. For each sub-partition of the skewed side, the corresponding partition from the other side of the join is replicated -- each sub-task reads the full other-side partition to ensure correctness.
5. Parallel execution. The sub-tasks run in parallel, turning one 30-minute straggler into (for example) 50 tasks that each take 30 seconds.
Before AQE Skew Join:
Partition 42 (left): [========================] 40 GB ← skewed
Partition 42 (right): [===] 500 MB
After AQE Skew Join (split into ~625 sub-partitions of 64 MB each):
Sub-partition 42-0 (left): [=] 64 MB + Partition 42 (right): [===] 500 MB
Sub-partition 42-1 (left): [=] 64 MB + Partition 42 (right): [===] 500 MB (replicated)
Sub-partition 42-2 (left): [=] 64 MB + Partition 42 (right): [===] 500 MB (replicated)
...
Sub-partition 42-624 (left): [=] 64 MB + Partition 42 (right): [===] 500 MB (replicated)
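The detection-and-splitting logic can be sketched in plain Python (a simplification of AQE's behavior, not Spark's actual implementation; sizes in decimal MB):

```python
from statistics import median
import math

def split_skewed(partition_mb, factor=5.0, threshold_mb=256, advisory_mb=64):
    """Flag partitions as skewed per the two AQE conditions, then split
    each skewed one into advisory-sized sub-partitions."""
    med = median(partition_mb)
    result = []
    for size in partition_mb:
        if size > factor * med and size > threshold_mb:
            n_sub = math.ceil(size / advisory_mb)
            result.extend([size / n_sub] * n_sub)   # split into ~64 MB pieces
        else:
            result.append(size)
    return result

sizes = [50, 52, 48, 55, 40_000]   # one 40 GB partition (decimal MB)
split = split_skewed(sizes)
print(len(split))   # 629 -- 4 untouched partitions + 625 sub-partitions
print(max(split))   # 64.0 -- no more straggler-sized partitions
```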
Supported Join Types
AQE skew join optimization applies to sort-merge joins and supports the following join types:
| Join Type | Skew Handling | Notes |
|---|---|---|
| Inner Join | Both sides | Can split skew on either or both sides |
| Left Outer Join | Left side only | Splits skewed left partitions; right side is replicated |
| Right Outer Join | Right side only | Splits skewed right partitions; left side is replicated |
| Left Semi Join | Left side only | Splits skewed left partitions |
| Left Anti Join | Left side only | Splits skewed left partitions |
| Cross Join | Both sides | Can split skew on either side |
| Full Outer Join | Not supported | AQE cannot handle full outer join skew |
Full outer join is the notable gap. If you have a skewed full outer join, you must use manual techniques (salting, isolate-and-broadcast).
Skew Join Configurations
| Configuration | Default | Description |
|---|---|---|
spark.sql.adaptive.enabled | true | Master switch for AQE (since Spark 3.2) |
spark.sql.adaptive.skewJoin.enabled | true | Enable skew join optimization within AQE |
spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5.0 | Partition is skewed if size > factor x median |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256MB | Partition is skewed if size > this absolute threshold |
spark.sql.adaptive.advisoryPartitionSizeInBytes | 64MB | Target size for each sub-partition after splitting |
spark.sql.adaptive.forceOptimizeSkewedJoin | false | Force skew optimization even when it may not help (since Spark 3.3) |
Tuning Skew Join Detection
The default settings are conservative. For workloads with moderate skew:
# More aggressive skew detection -- catch moderate skew too
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "3.0")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "128MB")
# Smaller sub-partitions for finer-grained parallelism
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "32MB")
For workloads where you know skew exists and want to force handling:
# Force skew optimization
spark.conf.set("spark.sql.adaptive.forceOptimizeSkewedJoin", "true")
Limitations of AQE Skew Join
- Only sort-merge joins. AQE skew handling does not apply to broadcast joins (which do not shuffle) or shuffle hash joins.
- Replication overhead. The non-skewed side's partition is replicated for every sub-partition. If the skewed side splits into 100 sub-partitions, the other side is read 100 times. If the other side's partition is also large, this creates significant I/O.
- Post-shuffle only. AQE detects skew after the shuffle completes. The shuffle cost for the skewed partition has already been paid -- AQE optimizes the join computation, not the shuffle itself.
- Full outer joins are not supported. You must handle skew manually for full outer joins.
- Aggregation skew is not handled. AQE skew join only handles join operations. Skewed GROUP BY operations are not split.
AQE Coalesce Partitions
While skew join handles oversized partitions, coalesce partitions handles the opposite problem: too many small partitions after a shuffle. This is relevant to skew because fixing skew (via salting or other techniques) can create many small partitions.
How It Works
After a shuffle completes, AQE examines the actual sizes of all shuffle partitions. It merges contiguous small partitions into larger ones, targeting spark.sql.adaptive.advisoryPartitionSizeInBytes (default 64 MB). This reduces the number of tasks in downstream stages, cutting scheduler overhead and improving efficiency.
Before Coalescing (200 partitions, most tiny):
[=][=][=][=][=][=][=][===========][=][=][=][=]...
2 3 1 4 2 5 3 500 1 2 4 3 MB
After Coalescing (merged contiguous small partitions):
[===========][===========][===========][=====]
64 MB 64 MB 64 MB remaining
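A greedy pass over contiguous partitions captures the idea (my simplification, not Spark's implementation; sizes in MB):

```python
def coalesce_partitions(sizes_mb, advisory_mb=64):
    """Greedily merge contiguous small partitions until each bin
    reaches the advisory size -- a sketch of AQE's coalescing."""
    merged, current = [], 0
    for size in sizes_mb:
        current += size
        if current >= advisory_mb:
            merged.append(current)
            current = 0
    if current:
        merged.append(current)   # leftover tail partition
    return merged

# 30 tiny 8 MB partitions collapse into a handful of ~64 MB ones
print(coalesce_partitions([8] * 30))  # [64, 64, 64, 48]
```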
Coalesce Configurations
| Configuration | Default | Description |
|---|---|---|
spark.sql.adaptive.coalescePartitions.enabled | true | Enable partition coalescing after shuffle |
spark.sql.adaptive.coalescePartitions.parallelismFirst | true | When true, ignores target size and maximizes parallelism |
spark.sql.adaptive.coalescePartitions.minPartitionSize | 1MB | Minimum partition size after coalescing |
spark.sql.adaptive.coalescePartitions.initialPartitionNum | (none) | Initial shuffle partition count before coalescing. Defaults to spark.sql.shuffle.partitions |
spark.sql.adaptive.advisoryPartitionSizeInBytes | 64MB | Target partition size for coalescing |
Optimizing Coalesce Behavior
For skew workloads, the interaction between skew splitting and coalescing is important. Skew splitting creates many small sub-partitions for the skewed key, while coalescing merges small partitions from non-skewed keys:
# Set high initial partition count (AQE will coalesce down)
spark.conf.set("spark.sql.shuffle.partitions", "2000")
# Target 64 MB partitions after coalescing
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "67108864")
# Allow small partitions to be merged
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "1048576")
# Set parallelismFirst to false for better partition sizing
spark.conf.set("spark.sql.adaptive.coalescePartitions.parallelismFirst", "false")
Setting parallelismFirst=false tells AQE to respect the advisory partition size rather than maximizing parallelism. This produces more uniformly-sized partitions, which reduces skew in downstream stages.
AQE Rebalance Partition Skew (Write Skew)
Since Spark 3.2, AQE can also optimize skewed partitions during rebalance operations (triggered by REBALANCE hint or partitioned writes):
# AQE optimizes skewed partitions during writes
spark.conf.set("spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled", "true")
| Configuration | Default | Description |
|---|---|---|
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled | true | Split skewed partitions during rebalance |
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor | 0.2 | Merge partitions smaller than this factor x advisory size |
Manual Fixes for Data Skew
AQE handles many skew scenarios automatically, but there are cases where manual intervention is required: aggregation skew, full outer join skew, or situations where you need more control than AQE provides.
Salting Technique
Salting is the most versatile manual skew fix. It works by appending a random number to the skewed key, which distributes data that would go to one partition across multiple partitions.
Salting for Joins
The concept: add a random salt (0 to N-1) to the skewed side, and explode the non-skewed side to match all possible salt values. This distributes the skewed key's data across N partitions while ensuring every row still finds its match.
from pyspark.sql.functions import (
col, lit, rand, floor, concat_ws, explode, array
)
SALT_BUCKETS = 20 # Number of salt buckets -- tune based on skew severity
# === Left side (skewed): add random salt ===
orders_salted = orders.withColumn(
"salt", floor(rand() * SALT_BUCKETS).cast("int")
).withColumn(
"salted_key", concat_ws("_", col("customer_id").cast("string"), col("salt").cast("string"))
)
# === Right side (smaller): replicate for every salt value ===
salt_range = spark.range(SALT_BUCKETS).withColumnRenamed("id", "salt")
customers_salted = customers.crossJoin(salt_range).withColumn(
"salted_key", concat_ws("_", col("customer_id").cast("string"), col("salt").cast("string"))
)
# === Join on salted key ===
# Drop the right side's customer_id and salt before joining so the
# result has no duplicate column names
result = orders_salted.join(
    customers_salted.drop("customer_id", "salt"),
    "salted_key",
    "inner"
).drop("salt", "salted_key")
-- SQL equivalent (compute the salt once in a subquery; calling
-- RAND() twice would assign inconsistent salt values to the same row)
WITH salted_orders AS (
  SELECT *,
         CONCAT(CAST(customer_id AS STRING), '_', CAST(salt AS STRING)) AS salted_key
  FROM (
    SELECT *, CAST(FLOOR(RAND() * 20) AS INT) AS salt
    FROM orders
  )
),
salt_range AS (
  SELECT id AS salt FROM RANGE(20)
),
salted_customers AS (
  SELECT c.*,
         s.salt,
         CONCAT(CAST(c.customer_id AS STRING), '_', CAST(s.salt AS STRING)) AS salted_key
  FROM customers c
  CROSS JOIN salt_range s
)
SELECT so.order_id, so.amount, sc.customer_name
FROM salted_orders so
JOIN salted_customers sc ON so.salted_key = sc.salted_key;
How to choose SALT_BUCKETS: Estimate the skew ratio. If the largest partition is 100x the median, start with 50-100 buckets. If it is 10x the median, 10-20 buckets is sufficient. More buckets = better distribution but more replication of the non-skewed side.
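You can sanity-check a bucket count before touching the cluster. A pure-Python sketch (illustrative, not PySpark) of how salting spreads one hot key's rows:

```python
import random

SALT_BUCKETS = 20
HOT_KEY_ROWS = 100_000   # rows carried by the single hot key

random.seed(7)           # deterministic for the sketch
counts = [0] * SALT_BUCKETS
for _ in range(HOT_KEY_ROWS):
    salt = random.randrange(SALT_BUCKETS)   # same idea as floor(rand() * N)
    counts[salt] += 1

# One 100K-row partition becomes 20 partitions of ~5K rows each
print(min(counts), max(counts))
```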
Cost of salting: The right side (customers) is replicated N times, so its shuffle size increases by Nx. For a 100 MB dimension table with 20 salt buckets, the right side becomes 2 GB after salting. This is the trade-off: you distribute the skewed side's 40 GB across 20 partitions (2 GB each), but the right side grows from 100 MB to 2 GB. The net result is still dramatically better because 20 parallel tasks each processing 2 GB is far faster than one task processing 40 GB.
Salting for Aggregations
For aggregation skew, salting uses a two-stage approach: first aggregate with the salted key, then aggregate again to combine the partial results.
from pyspark.sql.functions import col, rand, floor, sum, count, avg, concat_ws
SALT_BUCKETS = 10
# Stage 1: Partial aggregation with salt
partial_agg = (
transactions
.withColumn("salt", floor(rand() * SALT_BUCKETS).cast("int"))
.groupBy("merchant_id", "salt")
.agg(
count("*").alias("partial_count"),
sum("amount").alias("partial_sum")
)
)
# Stage 2: Final aggregation -- combine partial results
final_agg = (
partial_agg
.groupBy("merchant_id")
.agg(
sum("partial_count").alias("tx_count"),
sum("partial_sum").alias("total_amount")
)
.withColumn("avg_amount", col("total_amount") / col("tx_count"))
)
-- SQL equivalent (materialize the salt in a subquery so the same
-- value is projected and grouped on)
WITH salted AS (
  SELECT *, CAST(FLOOR(RAND() * 10) AS INT) AS salt
  FROM transactions
),
partial AS (
  SELECT
    merchant_id,
    salt,
    COUNT(*) AS partial_count,
    SUM(amount) AS partial_sum
  FROM salted
  GROUP BY merchant_id, salt
)
SELECT
  merchant_id,
  SUM(partial_count) AS tx_count,
  SUM(partial_sum) AS total_amount,
  SUM(partial_sum) / SUM(partial_count) AS avg_amount
FROM partial
GROUP BY merchant_id;
Important: Salting for aggregations only works with decomposable aggregations -- functions where you can combine partial results. SUM, COUNT, MIN, MAX are decomposable. AVG is not directly decomposable (you need SUM/COUNT). MEDIAN, PERCENTILE, and COUNT(DISTINCT) are not decomposable and require different approaches.
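The decomposability rule in one worked example (plain Python): AVG must travel through the two stages as a (sum, count) pair and be divided only at the end -- averaging the partial averages gives the wrong answer.

```python
# Partial results for one merchant, split across two salt buckets
partials = [
    {"partial_sum": 900.0, "partial_count": 3},   # salt bucket 0
    {"partial_sum": 100.0, "partial_count": 1},   # salt bucket 1
]

total_sum = sum(p["partial_sum"] for p in partials)
total_count = sum(p["partial_count"] for p in partials)

correct_avg = total_sum / total_count   # combine sums and counts, then divide
naive_avg = sum(p["partial_sum"] / p["partial_count"] for p in partials) / len(partials)

print(correct_avg)  # 250.0
print(naive_avg)    # 200.0 -- averaging the partial averages is WRONG
```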
Selective Salting (Salt Only Hot Keys)
Full salting replicates the entire right side. If you know which keys are skewed, you can salt only those keys:
# Identify the hot keys
hot_keys = (
orders.groupBy("customer_id")
.count()
.filter(col("count") > 1000000) # Keys with >1M rows
.select("customer_id")
.collect()
)
hot_key_set = {row.customer_id for row in hot_keys}
# Broadcast the hot key set
hot_key_broadcast = spark.sparkContext.broadcast(hot_key_set)
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
is_hot = udf(lambda x: x in hot_key_broadcast.value, BooleanType())
# Split into hot and cold
orders_hot = orders.filter(is_hot(col("customer_id")))
orders_cold = orders.filter(~is_hot(col("customer_id")))
# Salt only the hot portion
orders_hot_salted = orders_hot.withColumn(
"salt", floor(rand() * 20).cast("int")
).withColumn(
"salted_key", concat_ws("_", col("customer_id").cast("string"), col("salt").cast("string"))
)
# Replicate customers only for hot keys
customers_hot = customers.filter(col("customer_id").isin([r.customer_id for r in hot_keys]))
salt_range = spark.range(20).withColumnRenamed("id", "salt")
customers_hot_salted = customers_hot.crossJoin(salt_range).withColumn(
"salted_key", concat_ws("_", col("customer_id").cast("string"), col("salt").cast("string"))
)
# Join hot and cold separately, then union
result_hot = orders_hot_salted.join(
    customers_hot_salted.drop("customer_id", "salt"),  # avoid duplicate columns
    "salted_key"
).drop("salt", "salted_key")
result_cold = orders_cold.join(customers, "customer_id")
result = result_hot.unionByName(result_cold, allowMissingColumns=True)
This avoids replicating the entire right side -- only the hot keys are replicated. The cold (non-skewed) keys join normally without any overhead.
Isolate-and-Broadcast
When you know the skewed keys and the data for those keys on the non-skewed side is small enough to broadcast, you can isolate the skewed keys and use a broadcast join for them while joining the rest normally.
from pyspark.sql.functions import broadcast
# Step 1: Identify skewed keys
skewed_keys = ["US", "CN", "IN"] # Known high-volume countries
# Step 2: Split the large table
orders_skewed = orders.filter(col("country_code").isin(skewed_keys))
orders_normal = orders.filter(~col("country_code").isin(skewed_keys))
# Step 3: Split the small table to match
customers_skewed = customers.filter(col("country_code").isin(skewed_keys))
customers_normal = customers.filter(~col("country_code").isin(skewed_keys))
# Step 4: Broadcast join for skewed keys (no shuffle)
result_skewed = orders_skewed.join(
broadcast(customers_skewed), "country_code"
)
# Step 5: Normal join for non-skewed keys (evenly distributed)
result_normal = orders_normal.join(customers_normal, "country_code")
# Step 6: Union results
result = result_skewed.unionByName(result_normal)
This technique works well when:
- You know which keys are skewed
- The data for those keys on the non-skewed side fits in memory for broadcast
- The skewed keys are a small number of distinct values (even if they contain most of the rows)
Two-Phase Aggregation
Two-phase aggregation is Spark's built-in mechanism for reducing shuffle data during aggregations. Spark already does this automatically via HashAggregate (partial aggregation on the map side, final aggregation on the reduce side). However, you can explicitly implement a more aggressive version for severely skewed aggregations.
# Spark's built-in two-phase aggregation:
# Phase 1 (partial, before shuffle): each partition computes partial counts/sums
# Phase 2 (final, after shuffle): merge partial results
transactions.groupBy("merchant_id").agg(sum("amount"))
# Spark plans this as: HashAggregate(partial) → Exchange → HashAggregate(final)
For severely skewed keys where the partial aggregation does not reduce enough data (because the skewed key dominates a single partition), the salting approach described above is the manual two-phase aggregation.
For non-decomposable functions like approx_count_distinct or percentile_approx:
# Two-phase with approximate functions
from pyspark.sql.functions import approx_count_distinct, percentile_approx, array, lit
# These are already approximate and handle partial aggregation internally
transactions.groupBy("merchant_id").agg(
approx_count_distinct("customer_id", 0.05).alias("unique_customers"),
percentile_approx("amount", array(lit(0.5), lit(0.95)), 100).alias("amount_percentiles")
)
Null Key Handling
Null keys are the most fixable form of skew because the solution is straightforward: handle nulls separately.
For Inner Joins (Nulls Never Match)
# Filter nulls BEFORE the join -- they will never match anyway
orders_clean = orders.filter(col("customer_id").isNotNull())
result = orders_clean.join(customers, "customer_id")
This is the simplest and most impactful fix. If 20% of your rows have null join keys and you are doing an inner join, you just eliminated 20% of your shuffle data and removed the skew entirely.
For Outer Joins (Nulls Must Be Preserved)
# Separate null and non-null rows
orders_with_key = orders.filter(col("customer_id").isNotNull())
orders_null_key = orders.filter(col("customer_id").isNull())
# Join only the non-null rows
joined = orders_with_key.join(customers, "customer_id", "left")
# Add back the null-key rows; unionByName fills the missing
# customer columns with nulls
result = joined.unionByName(orders_null_key, allowMissingColumns=True)
Replace Nulls with Distributed Random Values
For aggregations where null is a valid group:
from pyspark.sql.functions import when, concat, lit, rand, floor, count, sum
# Replace nulls with randomized placeholder values
# This distributes null rows across multiple partitions
orders_fixed = orders.withColumn(
"customer_id_safe",
when(
col("customer_id").isNull(),
concat(lit("NULL_"), floor(rand() * 100).cast("string"))
).otherwise(col("customer_id").cast("string"))
)
# Aggregate with the safe key
partial = orders_fixed.groupBy("customer_id_safe").agg(
count("*").alias("cnt"),
sum("amount").alias("total")
)
# Combine null groups back together
from pyspark.sql.functions import substring
result = partial.withColumn(
"customer_id",
when(
substring(col("customer_id_safe"), 1, 5) == "NULL_",
lit(None)
).otherwise(col("customer_id_safe"))
).groupBy("customer_id").agg(
sum("cnt").alias("tx_count"),
sum("total").alias("total_amount")
)
Repartitioning
Manual repartitioning can fix skew before it happens, but it comes with trade-offs.
Repartition by a Better Key
If the join key is skewed but there is a more uniformly distributed column available:
```python
from pyspark.sql.functions import col, hash

# Instead of partitioning by skewed country_code,
# repartition by a composite key or hash
orders_repartitioned = orders.repartition(
    200,
    hash(col("country_code"), col("order_id")) % 200
)
```
Repartition with a Specific Partition Count
Increasing the number of partitions can reduce the impact of skew by making each partition smaller:
```python
# More partitions = smaller max partition size
spark.conf.set("spark.sql.shuffle.partitions", "2000")

# Or repartition explicitly
orders.repartition(2000, "customer_id")
```
However, this does not fix fundamental skew -- if one key has 90% of the data, it still goes to one partition regardless of partition count.
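To see why raising the partition count cannot rescue a hot key, here is a small plain-Python model of hash partitioning (a local stand-in for Spark's partitioner, not Spark itself; names and row counts are illustrative):

```python
from collections import Counter

def partition_counts(keys, num_partitions):
    """Rows per shuffle partition under hash partitioning: hash(key) % N."""
    return Counter(hash(k) % num_partitions for k in keys)

# 90% of rows carry one hot key; the rest spread over 1,000 other keys
rows = ["hot_key"] * 9000 + [f"key_{i}" for i in range(1000)]

# Whatever the partition count, the hot key's partition holds all 9,000 rows
for n in (200, 2000, 20000):
    assert max(partition_counts(rows, n).values()) >= 9000
```

Every occurrence of a key hashes to the same value, so all of its rows land in one partition regardless of how many partitions exist.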
Repartition by Hash of Multiple Columns
```python
# Distribute by a composite key for more uniform partitioning
orders.repartition(500, "customer_id", "order_date")
```
Warning: Repartitioning always triggers a full shuffle. Only repartition when the downstream operations will benefit enough to justify the shuffle cost. Do not repartition before a broadcast join (which does not need partitioning) or before a simple filter (which does not redistribute data).
Skew with Apache Iceberg
Iceberg tables have their own skew characteristics that interact with Spark's processing. Understanding Iceberg-specific skew -- partition skew, file-level skew, and how compaction addresses them -- is critical for maintaining healthy table performance.
Partition Skew Detection
Iceberg's partitions metadata table reveals partition-level data distribution. Use it to identify skewed partitions:
```sql
-- Check partition sizes and file counts
SELECT
  partition,
  SUM(file_count) AS total_files,
  SUM(record_count) AS total_records,
  SUM(total_data_file_size_in_bytes) / 1048576 AS total_size_mb
FROM my_catalog.my_db.my_table.partitions
GROUP BY partition
ORDER BY total_size_mb DESC;
```
Skew indicators:
- One partition has 10x or more data than the average
- One partition has 10x or more files than the average
- The largest partition is orders of magnitude larger than the median
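These indicators are easy to script against the query output. A hypothetical helper in plain Python (the function name and sample sizes are illustrative, not part of any API):

```python
from statistics import median

def partition_skew_ratio(sizes):
    """Largest partition relative to the median partition size."""
    return max(sizes) / median(sizes)

# Five ordinary daily partitions and one Black-Friday-sized outlier (MB)
sizes_mb = [120, 118, 125, 122, 119, 4800]
assert partition_skew_ratio(sizes_mb) > 10  # would be flagged as skewed
```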
File-Level Skew Detection
Even within a single partition, file sizes can be skewed. The files metadata table shows individual file sizes:
```sql
-- Detect file size variance within partitions
SELECT
  partition,
  COUNT(*) AS file_count,
  MIN(file_size_in_bytes) / 1048576 AS min_file_mb,
  AVG(file_size_in_bytes) / 1048576 AS avg_file_mb,
  MAX(file_size_in_bytes) / 1048576 AS max_file_mb,
  (MAX(file_size_in_bytes) - MIN(file_size_in_bytes))
    / NULLIF(AVG(file_size_in_bytes), 0) AS size_variance_ratio,
  SUM(CASE WHEN file_size_in_bytes < 8388608 THEN 1 ELSE 0 END) AS small_files_under_8mb,
  SUM(CASE WHEN file_size_in_bytes > 536870912 THEN 1 ELSE 0 END) AS large_files_over_512mb
FROM my_catalog.my_db.my_table.files
GROUP BY partition
HAVING COUNT(*) > 1
ORDER BY size_variance_ratio DESC;
```
File skew indicators:
- `size_variance_ratio` > 10 (the largest file is roughly 10x the average)
- A high count of `small_files_under_8mb` (many tiny files from streaming or skewed writes)
- Any `large_files_over_512mb` (oversized files from skewed partitions)
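The same ratio can be sanity-checked locally. A plain-Python mirror of the SQL's `size_variance_ratio` (illustrative helper, not a library function):

```python
def size_variance_ratio(file_sizes):
    """(max - min) / avg -- the same ratio the SQL query computes per partition."""
    avg = sum(file_sizes) / len(file_sizes)
    return (max(file_sizes) - min(file_sizes)) / avg if avg else 0.0

MB = 1024 * 1024
healthy = [120 * MB, 128 * MB, 132 * MB]   # uniform files near 128 MB
skewed = [1 * MB] * 20 + [800 * MB]        # tiny streaming files plus one giant

assert size_variance_ratio(healthy) < 1
assert size_variance_ratio(skewed) > 10
```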
How Partition Skew Affects Query Performance
When Spark reads an Iceberg table, each data file becomes one or more Spark tasks. A partition with 500 files generates 500 tasks while a partition with 5 files generates 5 tasks. If the 500-file partition also contains the majority of the data, the scan stage will be skewed.
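The arithmetic behind that skewed scan stage is the straggler rule from the opening of this post: stage time is the maximum task time, not the average. A plain-Python back-of-the-envelope check (the task times are illustrative):

```python
# 199 tasks finish in 10 s; one straggler runs 30 minutes
per_partition_seconds = [10] * 199 + [1800]

wall_clock = max(per_partition_seconds)                          # stage duration
ideal = sum(per_partition_seconds) / len(per_partition_seconds)  # balanced stage

assert wall_clock == 1800
assert wall_clock / ideal > 90   # ~95x slower than a perfectly balanced stage
```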
Additionally, Iceberg's partition pruning eliminates entire partitions from scans. If a query targets only the skewed partition, all the data lands on a subset of executors:
```python
from pyspark.sql.functions import col

# This query hits only the skewed partition
events = spark.table("catalog.db.events")
black_friday = events.filter(col("event_date") == "2026-11-27")
# All 5 TB of Black Friday data is read -- potentially overwhelming executors
```
Compaction Fixes
Iceberg's rewrite_data_files procedure addresses file-level skew by rewriting files to target sizes.
Bin-Pack Strategy (Default)
Bin-pack combines small files into larger ones without changing data order. It is the fastest and cheapest compaction strategy:
```sql
CALL my_catalog.system.rewrite_data_files(
  table => 'my_db.my_table',
  strategy => 'binpack',
  options => map(
    'target-file-size-bytes', '134217728',      -- 128 MB target
    'min-file-size-bytes', '67108864',          -- 64 MB minimum (don't rewrite files above this)
    'max-file-size-bytes', '201326592',         -- 192 MB maximum
    'min-input-files', '5',                     -- Minimum files in a group to trigger rewrite
    'max-concurrent-file-group-rewrites', '10'  -- Parallel rewrite groups
  )
);
```
Sort Strategy (For Query-Aligned Compaction)
Sort compaction rewrites files and sorts the data, which improves partition pruning and min/max statistics for downstream queries:
```sql
CALL my_catalog.system.rewrite_data_files(
  table => 'my_db.my_table',
  strategy => 'sort',
  sort_order => 'customer_id ASC NULLS LAST, event_date DESC',
  options => map(
    'target-file-size-bytes', '134217728',
    'max-concurrent-file-group-rewrites', '5'
  )
);
```
Compacting Specific Skewed Partitions
When you know which partition is skewed, target it specifically:
```sql
-- Compact only the Black Friday partition
CALL my_catalog.system.rewrite_data_files(
  table => 'my_db.events',
  strategy => 'binpack',
  where => 'event_date = "2026-11-27"',
  options => map(
    'target-file-size-bytes', '134217728',
    'min-input-files', '2'
  )
);
```
Rewrite Manifests
After compaction, manifests may also need rewriting for optimal query planning:
```sql
CALL my_catalog.system.rewrite_manifests(
  table => 'my_db.my_table',
  use_caching => false  -- Disable caching for large tables to avoid memory pressure
);
```
Iceberg Write-Time Skew Prevention
Prevent file-level skew at write time with proper Iceberg table properties:
```sql
-- Set target file size for writes
ALTER TABLE my_catalog.my_db.my_table
SET TBLPROPERTIES (
  'write.target-file-size-bytes' = '134217728',       -- 128 MB
  'write.distribution-mode' = 'hash',                 -- Hash-distribute before write
  'write.parquet.row-group-size-bytes' = '134217728'  -- Parquet row group size
);
```
The `write.distribution-mode` property controls how data is distributed before writing:
| Mode | Behavior | Skew Impact |
|---|---|---|
| `none` | No redistribution; tasks write directly | Fast writes but file sizes depend on input distribution |
| `hash` | Hash-partition by partition key before write | Even file sizes within partitions; adds shuffle cost |
| `range` | Range-partition by sort key before write | Best for sorted tables; adds shuffle cost |
For skew-prone tables, hash or range distribution modes add a shuffle cost but produce uniformly-sized files, preventing downstream query skew.
Monitoring Iceberg Table Health for Skew
Build a monitoring check that runs periodically:
```python
from pyspark.sql.functions import avg, col, count, max, min, sum

# Iceberg table health check for skew
def check_iceberg_skew(spark, table_name, skew_threshold=10):
    files_df = spark.table(f"{table_name}.files")

    # Per-partition file statistics
    partition_stats = files_df.groupBy("partition").agg(
        count("*").alias("file_count"),
        sum("file_size_in_bytes").alias("total_bytes"),
        sum("record_count").alias("total_records"),
        avg("file_size_in_bytes").alias("avg_file_bytes"),
        max("file_size_in_bytes").alias("max_file_bytes"),
        min("file_size_in_bytes").alias("min_file_bytes")
    )

    # Calculate global statistics
    global_avg = partition_stats.agg(
        avg("total_bytes").alias("global_avg_bytes"),
        avg("file_count").alias("global_avg_files")
    ).collect()[0]

    # Flag skewed partitions
    skewed = partition_stats.filter(
        (col("total_bytes") > global_avg.global_avg_bytes * skew_threshold) |
        (col("file_count") > global_avg.global_avg_files * skew_threshold) |
        (col("max_file_bytes") > col("avg_file_bytes") * skew_threshold)
    )
    return skewed
```
Complete Configuration Reference
AQE Skew Join Configurations
| Configuration | Default | Description |
|---|---|---|
| `spark.sql.adaptive.enabled` | true | Master switch for AQE |
| `spark.sql.adaptive.skewJoin.enabled` | true | Enable automatic skew join handling |
| `spark.sql.adaptive.skewJoin.skewedPartitionFactor` | 5.0 | Skew if partition > factor x median |
| `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes` | 256 MB | Skew if partition > this absolute size |
| `spark.sql.adaptive.forceOptimizeSkewedJoin` | false | Force skew optimization (Spark 3.3+) |
AQE Coalesce Configurations
| Configuration | Default | Description |
|---|---|---|
| `spark.sql.adaptive.coalescePartitions.enabled` | true | Enable partition coalescing |
| `spark.sql.adaptive.coalescePartitions.parallelismFirst` | true | Maximize parallelism over target size |
| `spark.sql.adaptive.coalescePartitions.minPartitionSize` | 1 MB | Minimum partition size after coalescing |
| `spark.sql.adaptive.coalescePartitions.initialPartitionNum` | (none) | Initial partition count before coalescing |
| `spark.sql.adaptive.advisoryPartitionSizeInBytes` | 64 MB | Target partition size (shared with skew join) |
AQE Rebalance Configurations
| Configuration | Default | Description |
|---|---|---|
| `spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled` | true | Split skewed partitions during rebalance |
| `spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor` | 0.2 | Merge partitions smaller than factor x advisory size |
Shuffle and Partitioning
| Configuration | Default | Description |
|---|---|---|
| `spark.sql.shuffle.partitions` | 200 | Number of partitions for shuffles |
| `spark.sql.autoBroadcastJoinThreshold` | 10485760 (10 MB) | Max table size for auto-broadcast (skew avoidance) |
| `spark.sql.adaptive.autoBroadcastJoinThreshold` | (none) | AQE-specific broadcast threshold; defaults to `autoBroadcastJoinThreshold` |
| `spark.sql.adaptive.localShuffleReader.enabled` | true | Use local shuffle reader after AQE conversions |
Memory (Relevant to Skew-Induced Spill)
| Configuration | Default | Description |
|---|---|---|
| `spark.executor.memory` | 1g | Executor JVM heap size |
| `spark.memory.fraction` | 0.6 | Fraction of (heap - 300 MB) for unified memory |
| `spark.memory.storageFraction` | 0.5 | Initial split for storage within unified memory |
| `spark.executor.memoryOverhead` | executorMemory * 0.10 (min 384 MB) | Off-heap memory per executor |
Task Retry and Failure (Skew Crash Recovery)
| Configuration | Default | Description |
|---|---|---|
| `spark.task.maxFailures` | 4 | Max task retry attempts before job failure |
| `spark.executor.heartbeatInterval` | 10s | Heartbeat interval; long GC pauses that block heartbeats get the executor marked lost |
| `spark.network.timeout` | 120s | Network timeout for RPC operations |
Iceberg Write Properties (Skew Prevention)
| Configuration | Default | Description |
|---|---|---|
| `write.target-file-size-bytes` | 536870912 (512 MB) | Target file size for writes |
| `write.distribution-mode` | none | Data distribution before write: none, hash, range |
| `write.parquet.row-group-size-bytes` | 134217728 (128 MB) | Parquet row group size |
Anti-Patterns That Cause Skew
These are the patterns that reliably create skew. Recognizing them in code review prevents skew before it happens.
Anti-Pattern 1: Joining on Low-Cardinality Keys
```python
from pyspark.sql.functions import broadcast

# BAD: country_code has ~200 distinct values, "US" dominates
orders.join(customers, "country_code")

# BETTER: Join on a high-cardinality key
orders.join(customers, "customer_id")

# IF YOU MUST join on low-cardinality: use broadcast
orders.join(broadcast(country_dim), "country_code")
```
Low-cardinality join keys guarantee that a small number of partitions receive most of the data. If you must join on such keys, broadcast one side to avoid the shuffle entirely.
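The parallelism ceiling is easy to demonstrate locally, with plain Python standing in for Spark's hash partitioner: the number of non-empty shuffle partitions can never exceed the key cardinality.

```python
# ~200 distinct country codes hashed into 2,000 shuffle partitions:
# at most 200 partitions can receive any data, so >= 1,800 stay empty
country_codes = [f"C{i:03d}" for i in range(200)]
num_partitions = 2000

used = {hash(code) % num_partitions for code in country_codes}
assert len(used) <= len(country_codes)
```

Hash collisions can only make it worse: fewer partitions used, each holding more keys.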
Anti-Pattern 2: GROUP BY on Columns with Hot Values
```python
from pyspark.sql.functions import sum

# BAD: merchant_id "amazon" has 90% of transactions
transactions.groupBy("merchant_id").agg(sum("amount"))

# BETTER: Use salting for the aggregation
# (See salting for aggregations above)
```
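Salting for aggregations is covered in detail earlier in this post; as a sanity check that the two-phase approach returns the same answer, here is the idea in plain Python (no Spark; the merchant name, salt count, and amounts are illustrative):

```python
import random

random.seed(7)       # deterministic for the example
SALT_BUCKETS = 8

# 1,000 transactions for the hot merchant -- one giant group without salting
rows = [("amazon", amount) for amount in range(1000)]

# Phase 1: aggregate on (merchant + random salt), splitting the hot group
partials = {}
for merchant, amount in rows:
    salted_key = f"{merchant}_{random.randrange(SALT_BUCKETS)}"
    partials[salted_key] = partials.get(salted_key, 0) + amount

# Phase 2: strip the salt and merge the partial sums
final = {}
for salted_key, subtotal in partials.items():
    merchant = salted_key.rsplit("_", 1)[0]
    final[merchant] = final.get(merchant, 0) + subtotal

assert len(partials) == SALT_BUCKETS        # work split across 8 smaller groups
assert final["amazon"] == sum(range(1000))  # identical result, skew removed
```

The first pass does the heavy lifting across many partitions; the second pass merges a handful of partial sums, so no single task ever sees the whole hot group.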
Anti-Pattern 3: Ignoring Null Values in Join Columns
```python
from pyspark.sql.functions import col

# BAD: 30% of orders have null customer_id
orders.join(customers, orders.customer_id == customers.customer_id, "left")

# BETTER: Filter nulls for inner joins
orders.filter(col("customer_id").isNotNull()).join(customers, "customer_id")

# BETTER: Handle nulls separately for outer joins
# (See null key handling above)
```
Anti-Pattern 4: Using collect_list / collect_set on Skewed Groups
```python
from pyspark.sql.functions import collect_list, count, slice

# BAD: "amazon" group has 500M rows → 500M-element array on one executor
transactions.groupBy("merchant_id").agg(collect_list("item_id"))

# BETTER: Limit the collection or use a different approach
transactions.groupBy("merchant_id").agg(
    count("*").alias("item_count"),
    slice(collect_list("item_id"), 1, 1000).alias("sample_items")
)
```
collect_list on a skewed group creates an array with millions or billions of elements on a single executor. This causes OOM even on executors with hundreds of GB of memory.
Anti-Pattern 5: Repartitioning by a Skewed Column
```python
# BAD: repartition by a skewed column concentrates data
orders.repartition("country_code")  # "US" partition is 100x larger

# BETTER: repartition by a high-cardinality column
orders.repartition("order_id")

# BETTER: repartition by multiple columns
orders.repartition("country_code", "order_date")
```
Anti-Pattern 6: Static spark.sql.shuffle.partitions = 200
```python
# BAD: 200 partitions for a 2 TB dataset = 10 GB per partition
spark.conf.set("spark.sql.shuffle.partitions", "200")

# BETTER: set high and let AQE coalesce
spark.conf.set("spark.sql.shuffle.partitions", "2000")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "134217728")  # 128 MB
```
With the default 200 partitions, large datasets produce oversized partitions that are prone to skew-like symptoms even without key distribution problems. Set a higher initial partition count and let AQE coalesce the small ones.
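As a rule of thumb, the initial partition count can be derived from the shuffle size and a target partition size. A hypothetical sizing helper (plain Python, not a Spark API; at runtime, AQE's advisory partition size does the real work):

```python
import math

def recommended_shuffle_partitions(shuffle_bytes, target_bytes=128 * 1024 * 1024):
    """Partition count that puts the average partition near the target size."""
    return max(1, math.ceil(shuffle_bytes / target_bytes))

GB, TB = 1024 ** 3, 1024 ** 4

# The default 200 partitions leave ~10 GB per partition for a 2 TB shuffle
assert 2 * TB / 200 > 10 * GB

# Sizing for 128 MB partitions instead calls for 16,384 of them
assert recommended_shuffle_partitions(2 * TB) == 16384
```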
Anti-Pattern 7: Writing Without Distribution Mode
```python
# BAD: direct write without distribution -- file sizes depend on input partition sizes
df.writeTo("catalog.db.events").append()

# BETTER: set distribution mode on the table
# ALTER TABLE catalog.db.events SET TBLPROPERTIES ('write.distribution-mode' = 'hash')
```
Without a distribution mode, Iceberg writes files directly from whatever partitioning the DataFrame has. If the DataFrame partitions are skewed, the output files will be skewed.
Anti-Pattern 8: Chaining Multiple Skewed Operations
```python
from pyspark.sql.functions import broadcast

# BAD: skew compounds through the pipeline
step1 = raw_events.groupBy("user_id").agg(...)  # Skewed on popular users
step2 = step1.join(user_profiles, "user_id")    # Same skew carries forward
step3 = step2.groupBy("region").agg(...)        # Additional skew on popular regions
# Each step makes the skew worse

# BETTER: fix skew at the source and repartition between stages
step1 = raw_events.groupBy("user_id").agg(...)
step1_balanced = step1.repartition(200)         # Rebalance before next shuffle
step2 = step1_balanced.join(broadcast(user_profiles), "user_id")  # Broadcast avoids shuffle
step3 = step2.groupBy("region").agg(...)
```
Decision Guide: Which Fix to Apply
| Scenario | Recommended Fix | Why |
|---|---|---|
| Skewed sort-merge join, Spark 3.2+ | Enable AQE skew join (default) | Automatic, zero code changes |
| Full outer join with skew | Salting or isolate-and-broadcast | AQE does not support full outer |
| Aggregation skew (GROUP BY) | Salting for aggregations | AQE does not handle aggregation skew |
| Null key skew, inner join | Filter nulls before join | Nulls never match; remove them |
| Null key skew, outer join | Handle nulls separately | Avoid skewed null partition |
| Known hot keys, small lookup table | Isolate-and-broadcast | Most efficient for known hot keys |
| Write skew / uneven file sizes | Set write.distribution-mode=hash | Prevents skew at write time |
| Iceberg file skew after writes | rewrite_data_files compaction | Fixes existing file skew |
| Too many small partitions after shuffle | AQE coalesce partitions (default) | Automatic partition merging |
| General skew with unknown keys | Salting (full) | Works without knowing which keys are hot |
How Cazpian Handles This
On Cazpian, Spark compute pools are configured with AQE enabled and tuned skew detection thresholds based on cluster size and workload profile. When you run queries against Iceberg tables on Cazpian, the platform monitors shuffle metrics and task duration distributions in real time, automatically surfacing skew warnings when straggler tasks are detected. Cazpian's managed compaction service monitors Iceberg table file sizes and partition distributions, triggering targeted rewrite_data_files operations when skew thresholds are exceeded. For tables with known hot-key patterns, Cazpian's query advisor recommends salting strategies and broadcast join configurations, helping you fix skew before it reaches production. The platform's cost dashboard correlates cluster utilization with skew metrics, showing exactly how much compute spend is wasted on idle executors waiting for straggler tasks.
What's Next
This post covered every dimension of data skew in Apache Spark -- from identification to optimization. For related topics in the Spark and Iceberg ecosystem, see our other posts:
- Spark Broadcast Joins -- eliminate shuffles for small-large joins, the primary alternative to skewed sort-merge joins.
- Spark Caching and Persistence -- caching enables accurate size estimates that prevent accidental skewed joins.
- Storage Partitioned Joins -- eliminate shuffles entirely for Iceberg tables with compatible bucket partitioning.
- Iceberg Query Performance Tuning -- partition pruning, bloom filters, and read-side optimization.
- Mastering Iceberg File Sizes -- write controls and compaction for optimal file sizes.
- Iceberg Table Design -- choosing partition transforms to minimize write skew.
- Iceberg Metrics Reporting -- monitoring scan and commit health for skew detection.