
Spark Caching and Persistence: The Complete Guide for Iceberg and Cazpian

· 30 min read
Cazpian Engineering
Platform Engineering Team


You are running the same 500 GB join three times in a single pipeline — once for a daily summary, once for a top-products report, once for customer segmentation. Each query reads from S3, shuffles terabytes across the network, builds hash maps, and aggregates from scratch. That is 1.5 TB of redundant I/O, three redundant shuffles, and three redundant sort-merge joins.

Spark caching eliminates this waste. You compute the expensive join once, store the result in executor memory, and every subsequent query reads from that in-memory copy instead of going back to object storage. The improvement is not incremental — it is typically 10-100x faster for repeated access patterns.

But caching does something else that is less obvious and equally powerful: it makes Spark's query optimizer smarter. When a table is cached, Spark knows its exact in-memory size. If that size falls below the broadcast join threshold, the optimizer automatically converts a Sort-Merge Join into a Broadcast Hash Join — eliminating the shuffle entirely, without you writing a single hint.

This post covers every dimension of Spark caching. We start with internals, walk through every storage level, show all the ways to cache, explain the columnar storage format that makes DataFrame caching special, dive into memory management, discuss how much data you should actually cache (spoiler: not terabytes), show you how to read the Spark UI Storage tab, and cover the pitfalls that catch production workloads.

How Spark Caching Works Internally

Understanding the internals explains why caching sometimes fails silently, why partial caching happens, and why memory estimates are often wrong.

[Diagram: Spark caching and persistence, showing storage levels, the in-memory columnar format, memory management with eviction, and how caching enables automatic broadcast join conversion]

Lazy Evaluation — Nothing Happens on .cache()

When you call .cache() or .persist(), nothing happens immediately. These are lazy operations that mark the DataFrame's logical plan as "cacheable." The data is not read, computed, or stored until an action triggers execution — .count(), .collect(), .show(), .write(), or any operation that produces a result.

df = spark.read.parquet("s3://bucket/500gb-table")
df.cache() # Nothing happens. No data is read. No memory is used.
df.count() # NOW: reads from S3, computes, and stores in memory.
df.show(10) # Reads from cache — instant.

This means if your action only touches a subset of partitions (e.g., .take(10) or .limit(100)), only those partitions are materialized and cached. Unless you run an action that touches every partition (like .count() or a full .write()), you are working with a partial cache.

BlockManager — Where Cached Data Lives

BlockManager is Spark's local key-value store running on every node (driver and executors). It manages blocks representing cached partitions, shuffle outputs, and broadcast variables.

Each RDD/DataFrame partition maps to exactly one block with an ID like rdd_42_7 (RDD 42, partition 7). When a partition is computed during an action:

  1. BlockManager.putIterator() is called with the computed iterator and the StorageLevel
  2. For deserialized storage (MEMORY_ONLY, MEMORY_AND_DISK): values are "unrolled" one by one into MemoryStore, with periodic checks for remaining space
  3. For serialized storage (MEMORY_ONLY_SER, MEMORY_AND_DISK_SER): values are serialized into compact byte arrays first
  4. If memory is full and the storage level allows disk, the block spills to DiskStore
  5. If memory is full and the storage level is MEMORY_ONLY, the partition is simply not cached — it will be recomputed from lineage on next access

LRU Eviction

MemoryStore uses a LinkedHashMap in access-order mode for LRU (Least Recently Used) eviction. When a new block needs space:

  1. Iterates from least-recently accessed to most-recently accessed
  2. Skips blocks from the same RDD (to avoid evicting parts of the dataset you are actively caching)
  3. For MEMORY_AND_DISK levels: evicted blocks spill to disk
  4. For MEMORY_ONLY levels: evicted blocks are dropped entirely (recomputed from lineage later)
  5. Continues until enough space is freed or no more eligible blocks remain

This eviction behavior means caching is not all-or-nothing. A heavily loaded cluster may cache 80% of your data in memory, spill 15% to disk, and drop 5% — all transparently.
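The unroll-and-evict rules above can be modeled in a few lines. This is an illustrative sketch in plain Python, not Spark's actual Scala MemoryStore, and it makes one simplification: whether a victim spills or drops is decided by the incoming block's storage level rather than the victim's own.

```python
from collections import OrderedDict

class MemoryStoreModel:
    """Toy LRU model of MemoryStore: access-ordered map, same-RDD blocks skipped."""

    def __init__(self, capacity_bytes):
        self.capacity = capacity_bytes
        self.blocks = OrderedDict()  # block_id -> size; iteration order = LRU order
        self.disk = {}               # spilled blocks (MEMORY_AND_DISK behavior)
        self.dropped = []            # dropped blocks (MEMORY_ONLY behavior)

    def get(self, block_id):
        if block_id not in self.blocks:
            return False
        self.blocks.move_to_end(block_id)  # mark as most-recently used
        return True

    def put(self, block_id, size, rdd_id, use_disk):
        while sum(self.blocks.values()) + size > self.capacity:
            # Pick the least-recently used block NOT belonging to the RDD
            # currently being cached (rule 2 above).
            victim = next((b for b in self.blocks
                           if not b.startswith(f"rdd_{rdd_id}_")), None)
            if victim is None:
                # No eligible victims: spill or drop the incoming block itself.
                if use_disk:
                    self.disk[block_id] = size
                else:
                    self.dropped.append(block_id)
                return False
            vsize = self.blocks.pop(victim)
            if use_disk:
                self.disk[victim] = vsize      # spill to DiskStore
            else:
                self.dropped.append(victim)    # recomputed from lineage later
        self.blocks[block_id] = size
        return True

store = MemoryStoreModel(capacity_bytes=100)
store.put("rdd_1_0", 60, rdd_id=1, use_disk=True)
store.put("rdd_1_1", 30, rdd_id=1, use_disk=True)
store.put("rdd_2_0", 40, rdd_id=2, use_disk=True)  # evicts rdd_1_0 (LRU) to disk
print(sorted(store.blocks), sorted(store.disk))    # → ['rdd_1_1', 'rdd_2_0'] ['rdd_1_0']
```

Note how caching rdd_2_0 quietly pushed the oldest block of rdd_1 to disk: exactly the transparent partial-cache behavior described above.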

All Storage Levels Explained

Spark offers eight storage levels. Choosing the right one is the difference between a fast cache and one that causes more problems than it solves.

Storage Level Comparison

| Storage Level | In Memory | On Disk | Serialized | Replicated | Default For |
|---|---|---|---|---|---|
| MEMORY_ONLY | Yes | No | No | 1x | rdd.cache() |
| MEMORY_AND_DISK | Yes | Spillover | No | 1x | df.cache() |
| MEMORY_ONLY_SER | Yes | No | Yes | 1x | |
| MEMORY_AND_DISK_SER | Yes | Spillover | Yes | 1x | |
| DISK_ONLY | No | Yes | Yes | 1x | |
| MEMORY_ONLY_2 | Yes | No | No | 2x | |
| MEMORY_AND_DISK_2 | Yes | Spillover | No | 2x | |
| OFF_HEAP | Yes (off-heap) | No | Yes | 1x | |

When to Use Each

MEMORY_AND_DISK — the default for DataFrames and the right choice 90% of the time. Partitions that fit in memory are stored as deserialized Java objects (fastest read access). Partitions that do not fit spill to local disk rather than being dropped. You never lose cached data unless the executor itself dies.

MEMORY_ONLY — the default for RDDs. Fastest when everything fits in memory, but partitions that do not fit are simply dropped and recomputed from lineage on next access. Use this when recomputation is cheap and memory is abundant.

MEMORY_ONLY_SER — stores each partition as a single serialized byte array. Uses 2-3x less memory than deserialized, but every read requires deserialization (CPU cost). Use when GC pressure from millions of deserialized Java objects is causing pauses.

MEMORY_AND_DISK_SER — serialized in memory with disk spillover. The most memory-efficient option that still guarantees data is not lost. Use for large datasets in memory-constrained clusters.

DISK_ONLY — no memory caching at all. All reads require disk I/O. Rarely useful in practice — if you need disk persistence, writing to a Parquet table is usually better.

MEMORY_ONLY_2 / MEMORY_AND_DISK_2 — replicate each partition to two executors. If one executor dies, the replica is immediately available without recomputation. Use for latency-critical interactive applications where waiting for recomputation is unacceptable. Doubles memory usage.

OFF_HEAP — stores serialized data outside the JVM heap. Zero GC overhead since off-heap memory is not managed by the garbage collector. Requires spark.memory.offHeap.enabled=true and spark.memory.offHeap.size to be set. Use when GC pauses are a significant problem (large caches, streaming workloads).

Serialized vs Deserialized — Memory Impact

The memory difference is substantial:

| Format | Relative Size | Example (1M mixed-type rows) |
|---|---|---|
| Parquet on disk (compressed, columnar) | 1x baseline | 100 MB |
| Deserialized Java objects (MEMORY_ONLY) | 3-5x disk size | 300-500 MB |
| Serialized with Java (MEMORY_ONLY_SER) | 1.5-3x disk size | 150-300 MB |
| Serialized with Kryo (MEMORY_ONLY_SER + Kryo) | 1-2x disk size | 100-200 MB |
| DataFrame columnar cache (compressed) | 1-2x disk size | 100-200 MB |

If you use _SER levels, configure Kryo serialization for significantly better compression:

spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

All Ways to Cache in Spark

DataFrame API

from pyspark import StorageLevel

# cache() — uses default MEMORY_AND_DISK for DataFrames
df.cache()

# persist() — specify a storage level explicitly
df.persist(StorageLevel.MEMORY_ONLY)
df.persist(StorageLevel.MEMORY_AND_DISK_SER)
df.persist(StorageLevel.DISK_ONLY)
df.persist(StorageLevel.OFF_HEAP)

# Trigger materialization (cache is lazy — needs an action)
df.count()

# Uncache
df.unpersist() # Lazy removal
df.unpersist(blocking=True) # Blocks until all blocks are removed

Scala equivalent:

import org.apache.spark.storage.StorageLevel

df.cache() // MEMORY_AND_DISK
df.persist(StorageLevel.MEMORY_ONLY_SER)
df.unpersist()

Spark SQL

-- Eager caching — immediately materializes the entire table
CACHE TABLE dim_customer;

-- Lazy caching — materializes on first access
CACHE LAZY TABLE dim_customer;

-- Cache with a specific storage level
CACHE TABLE dim_customer OPTIONS ('storageLevel' 'MEMORY_AND_DISK_SER');

-- Cache the result of a query
CACHE TABLE active_customers AS
SELECT * FROM customers WHERE status = 'ACTIVE';

-- Cache a lazy query
CACHE LAZY TABLE recent_orders AS
SELECT * FROM orders WHERE order_date >= '2026-01-01';

-- Uncache
UNCACHE TABLE dim_customer;

-- Refresh stale cache (invalidate + reload on next access)
REFRESH TABLE dim_customer;

The default storage level for CACHE TABLE is MEMORY_AND_DISK.

Catalog API

# Cache a table (lazy by default)
spark.catalog.cacheTable("my_catalog.my_db.dim_customer")

# Check if a table is cached
spark.catalog.isCached("my_catalog.my_db.dim_customer") # True/False

# Uncache a specific table
spark.catalog.uncacheTable("my_catalog.my_db.dim_customer")

# Clear ALL cached data in the current session
spark.catalog.clearCache()

Eager vs Lazy — Which to Use

| Method | Eager/Lazy | Behavior |
|---|---|---|
| CACHE TABLE | Eager | Immediately runs a full scan to materialize all partitions |
| CACHE LAZY TABLE | Lazy | Marks for caching; materializes when first accessed |
| df.cache() / df.persist() | Lazy | Marks for caching; materializes on first action |
| spark.catalog.cacheTable() | Lazy | Same as CACHE LAZY TABLE |

Use eager caching (CACHE TABLE) for cache warming — when you want data in memory before user queries arrive. Use lazy caching for everything else to avoid caching data that might never be accessed.

In-Memory Columnar Storage — How DataFrames Cache Differently

When you cache an RDD, each partition is stored as a collection of row-based Java objects. When you cache a DataFrame, Spark uses a fundamentally different and more efficient approach: in-memory columnar storage.

How It Works

  1. df.cache() wraps the logical plan in an InMemoryRelation operator
  2. When materialized, data is converted into CachedBatch objects — each batch contains column vectors rather than rows
  3. Each column is stored contiguously in memory, enabling column-specific compression

Automatic Per-Column Compression

When spark.sql.inMemoryColumnarStorage.compressed=true (default), Spark selects a compression codec per column based on data characteristics:

| Codec | How It Works | Best For |
|---|---|---|
| RunLengthEncoding | Stores repeated values as (value, count) pairs | Sorted columns with repeats |
| DictionaryEncoding | Builds dictionary of unique values, stores indices | Low-cardinality strings (status, category) |
| BooleanBitSet | Packs booleans as individual bits (8 per byte) | Boolean columns |
| IntDelta | Stores deltas between consecutive integers | Sorted/monotonic integer columns |
| LongDelta | Same as IntDelta for long values | Timestamps, auto-increment IDs |

This compression is why a cached DataFrame is often smaller than you'd expect — and why it can fall below the broadcast join threshold.
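A toy illustration of how two of these codecs collapse a low-cardinality column. This is plain Python to make the idea concrete, not Spark's actual codec implementations:

```python
def run_length_encode(column):
    """Collapse runs of repeated values into (value, count) pairs."""
    encoded = []
    for v in column:
        if encoded and encoded[-1][0] == v:
            encoded[-1] = (v, encoded[-1][1] + 1)
        else:
            encoded.append((v, 1))
    return encoded

def dictionary_encode(column):
    """Replace each value with a small integer index into a dictionary."""
    dictionary, indices = {}, []
    for v in column:
        indices.append(dictionary.setdefault(v, len(dictionary)))
    return dictionary, indices

# A sorted status column: 1,000,000 values, only 3 distinct
status = ["ACTIVE"] * 700_000 + ["CHURNED"] * 200_000 + ["TRIAL"] * 100_000

rle = run_length_encode(status)
dictionary, indices = dictionary_encode(status)
print(len(rle))         # → 3  (three (value, count) pairs replace 1M strings)
print(len(dictionary))  # → 3  (1M strings become 1M tiny ints + a 3-entry dict)
```

A sorted, repetitive column like this collapses to almost nothing under run-length encoding, which is exactly why cached DataFrames often land well below their on-disk Parquet size.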

Column Pruning on Cached Data

When you query a cached DataFrame but only reference a subset of columns, Spark reads only those columns from the cached batches:

# Cache a wide table with 200 columns
wide_table = spark.table("events").cache()
wide_table.count()

# This query reads only 3 columns from cache, not all 200
result = wide_table.select("event_id", "event_type", "timestamp")

This is a major advantage over RDD caching, where the entire row must be deserialized even if only one field is needed.

Predicate Pushdown on Cached Data

Spark maintains min/max statistics per batch per column for cached data. When a filter is applied, InMemoryTableScanExec skips entire batches whose range does not overlap with the predicate:

# Cached table has 1000 batches of 10,000 rows each
# Filter: WHERE temperature > 100
# Batch #47 has min=20, max=95 for temperature
# → Batch #47 is skipped entirely without reading any rows

This is controlled by spark.sql.inMemoryColumnarStorage.partitionPruning=true (default).
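The skipping logic can be sketched as a toy model. This is illustrative only; the real pruning happens inside InMemoryTableScanExec:

```python
# Each cached batch carries min/max stats per column; a batch whose range
# cannot overlap the predicate is skipped without touching its rows.
batches = [
    {"min": 20,  "max": 95,  "rows": list(range(20, 96))},    # like batch #47
    {"min": 80,  "max": 140, "rows": list(range(80, 141))},
    {"min": 101, "max": 180, "rows": list(range(101, 181))},
]

def scan_greater_than(batches, threshold):
    """Evaluate `col > threshold`, skipping batches via min/max stats."""
    matched, batches_scanned = [], 0
    for batch in batches:
        if batch["max"] <= threshold:  # no row can match: skip the whole batch
            continue
        batches_scanned += 1
        matched.extend(r for r in batch["rows"] if r > threshold)
    return matched, batches_scanned

matched, scanned = scan_greater_than(batches, 100)
print(scanned)  # → 2  (the min=20/max=95 batch is never read)
```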

DataFrame Cache vs RDD Cache

| Aspect | DataFrame Cache | RDD Cache |
|---|---|---|
| Storage format | Columnar (CachedBatch) | Row-based (Java objects or serialized bytes) |
| Default level | MEMORY_AND_DISK | MEMORY_ONLY |
| Column pruning | Yes — reads only needed columns | No — reads entire objects |
| Predicate pushdown | Yes — batch-level min/max pruning | No |
| Compression | Automatic per-column codecs | None (deserialized) or generic serialization |
| Memory efficiency | Higher | Lower |

The takeaway: always use DataFrame caching over RDD caching when possible.

How Caching Converts Sort-Merge to Broadcast Join

This is one of the most underappreciated benefits of caching. It directly connects to our broadcast join guide.

The Problem with Size Estimates

When a table is uncached, Spark estimates its size from file metadata — Parquet footer stats, catalog statistics, or heuristic defaults. These estimates are often wildly inaccurate:

  • A 200 MB Parquet file might decompress to 800 MB in memory
  • A heavily filtered subquery might produce 5 MB from a 500 MB table, but Spark estimates 500 MB
  • Stale ANALYZE TABLE statistics might report sizes from months ago

When the estimate is wrong, Spark makes wrong join decisions. It plans a Sort-Merge Join (with shuffle) when a Broadcast Hash Join (without shuffle) would have been 10x faster.

How Caching Fixes This

When a table is cached, Spark knows its exact in-memory size because the data is materialized in BlockManager and the byte count is tracked. The InMemoryRelation reports this precise size to the Catalyst optimizer. If it falls below spark.sql.autoBroadcastJoinThreshold (default 10 MB), Spark automatically switches to Broadcast Hash Join.

Step-by-Step Pattern

# Without caching: Spark estimates dim_product as 200 MB (Parquet file size)
# 200 MB > 10 MB threshold → Sort-Merge Join with shuffle
result = fact_sales.join(dim_product, "product_id")
result.explain() # Shows SortMergeJoin + Exchange (shuffle)

# With caching: materialize and let Spark see the real size
dim_product = spark.table("dim_product").cache()
dim_product.count() # Materialized: actual cached size = 45 MB (columnar compressed)

# Increase threshold to accommodate
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "52428800") # 50 MB

# Now: Spark sees exact 45 MB < 50 MB → Broadcast Hash Join automatically
result = fact_sales.join(dim_product, "product_id")
result.explain() # Shows BroadcastHashJoin — no shuffle!

The Same Pattern with Filtered Data

This is even more powerful with filtered subqueries:

# users table is 10 GB on disk — Spark won't broadcast it
# But after filtering, only 25,000 rows remain (2 MB)

from pyspark.sql.functions import col

vip_users = (
    spark.table("users")
    .filter((col("tier") == "VIP") & (col("region") == "US"))
    .cache()
)
vip_users.count() # Materialized: 2 MB. Spark now knows the exact size.

# 2 MB < 10 MB default threshold → automatic Broadcast Hash Join
result = events.join(vip_users, "user_id")
result.explain() # BroadcastHashJoin — zero shuffle on the events table

SQL Equivalent

-- Cache a filtered result
CACHE TABLE vip_users AS
SELECT user_id, name, tier
FROM users
WHERE tier = 'VIP' AND region = 'US';

-- Join uses broadcast automatically if cached size < threshold
SELECT e.*, v.name, v.tier
FROM events e
JOIN vip_users v ON e.user_id = v.user_id;

How This Complements AQE

AQE (Adaptive Query Execution) can also convert Sort-Merge to Broadcast at runtime. The difference:

  • Caching enables broadcast at plan time — Spark plans the broadcast upfront, avoiding the shuffle entirely
  • AQE enables broadcast at runtime — Spark discovers the small size after a shuffle stage completes, then converts. The shuffle for the small side has already happened

Caching is strictly better when you know the data will be reused. AQE is the safety net for ad-hoc queries where you did not think to cache.
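If you lean on AQE as that safety net, verify it is enabled (the default since Spark 3.2) and, optionally, tune the runtime-only broadcast threshold. Shown here as spark-defaults entries; the 50MB value is an example, not a recommendation:

```
spark.sql.adaptive.enabled=true
# Runtime broadcast threshold used only by AQE; when unset, it falls back
# to spark.sql.autoBroadcastJoinThreshold
spark.sql.adaptive.autoBroadcastJoinThreshold=50MB
```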

Memory Management Deep Dive

Caching consumes executor memory. Understanding how Spark's memory model works prevents the most common caching failures: silent eviction, execution slowdowns, and OOM crashes.

Unified Memory Model

Since Spark 1.6, a single memory region is shared between storage (cached data, broadcast variables) and execution (shuffles, joins, aggregations, sorts):

Executor JVM Heap: 10 GB
├── Reserved Memory: 300 MB (hardcoded, non-configurable)
├── User Memory: (10240 - 300) × 0.4 = 3976 MB
│   └── User data structures, UDF variables, internal metadata
└── Unified Memory: (10240 - 300) × 0.6 = 5964 MB
    ├── Storage Memory (initial): 5964 × 0.5 = 2982 MB
    │   └── Cached partitions, broadcast variables
    └── Execution Memory (initial): 5964 × 0.5 = 2982 MB
        └── Shuffle buffers, join hash maps, sort buffers
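The arithmetic in this breakdown can be wrapped in a tiny helper (a sketch; the 300 MB reserved constant and the 0.6/0.5 defaults are the ones described above):

```python
def spark_memory_regions(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Compute the unified-memory regions for a given executor heap size (MB)."""
    reserved_mb = 300                          # hardcoded reserved memory
    usable = heap_mb - reserved_mb
    unified = usable * memory_fraction         # shared by storage + execution
    return {
        "user_mb": round(usable * (1 - memory_fraction)),
        "unified_mb": round(unified),
        "storage_initial_mb": round(unified * storage_fraction),
        "execution_initial_mb": round(unified * (1 - storage_fraction)),
    }

regions = spark_memory_regions(10 * 1024)      # 10 GB executor heap
print(regions["unified_mb"], regions["storage_initial_mb"])  # → 5964 2982
```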

The Key Asymmetry: Execution Wins

The split between storage and execution is not a hard boundary — it is dynamic. But the rules are asymmetric:

  • Execution can evict cached blocks. If execution needs more memory than its initial allocation, it evicts cached data (LRU order) to make room. Running tasks always have priority.
  • Storage cannot evict execution. If cached data wants to expand into execution's space, it can only use space that execution has voluntarily freed. Storage must wait.

This means heavy shuffle or join operations will silently evict your cached data if memory is tight. You will not get an error — the cache just disappears, and subsequent reads recompute from source.

Initial State:
[===== Storage (50%) =====][===== Execution (50%) =====]

Cache grows into idle execution space:
[============ Storage (80%) ============][= Execution (20%) =]

Heavy shuffle needs memory — evicts cached blocks:
[== Storage (30%) ==][========== Execution (70%) ==========]

Key Configuration

| Configuration | Default | Description |
|---|---|---|
| spark.memory.fraction | 0.6 | Fraction of (heap - 300 MB) for unified memory |
| spark.memory.storageFraction | 0.5 | Initial split for storage (also the floor below which execution cannot evict) |
| spark.memory.offHeap.enabled | false | Enable off-heap memory for caching |
| spark.memory.offHeap.size | 0 | Off-heap memory size per executor |

Sizing Guidance

For a workload that needs to cache 50 GB across 10 executors (5 GB per executor):

# Option A: On-heap caching (simpler, subject to GC)
--executor-memory 12g
spark.memory.fraction=0.6 # 7 GB unified
spark.memory.storageFraction=0.6 # 4.2 GB initial storage (enough for 5 GB with spill)

# Option B: Off-heap caching (no GC pauses)
--executor-memory 6g # Smaller heap = less GC
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=6g # Off-heap for cache
spark.executor.memoryOverhead=1g
# Total container: 6 + 6 + 1 = 13 GB

Adjust spark.memory.storageFraction based on your workload:

  • Cache-heavy (interactive queries, repeated joins): 0.6-0.7
  • Execution-heavy (heavy shuffles/joins with minimal caching): 0.2-0.3
  • Balanced (default): 0.5

How Much Data Should You Actually Cache?

We have seen production environments where teams cache terabytes of data — entire fact tables, full data lakes, everything they can fit. This is almost always counterproductive.

Why Caching TBs Is Problematic

GC storms. Deserialized caches (the default MEMORY_AND_DISK) store data as Java objects on the JVM heap. Each cached row creates multiple objects — String wrappers, Integer boxes, array backing stores. Caching 500 GB across a cluster means billions of Java objects. The garbage collector must traverse all of them during every GC cycle, causing stop-the-world pauses of seconds to minutes. Research shows GC accounts for 16-47% of total elapsed time in cache-heavy Spark workloads.

Eviction thrashing. When cached data exceeds available storage memory, LRU eviction kicks in. If you cache 2 TB but only have 500 GB of storage memory, Spark constantly evicts old blocks to make room for new ones. Each eviction either spills to disk (slow) or drops the block entirely (requires recomputation). You end up spending more time evicting and recomputing than you save by caching.

Execution memory starvation. Under the unified memory model, execution can evict cached data. If your shuffles and joins need memory, they will evict cached blocks — silently. You think your data is cached, but half of it has been evicted by a concurrent shuffle. The next query recomputes from source, and you have gained nothing.

Network and disk overhead. Even with MEMORY_AND_DISK, spilled partitions go to local executor disks — SSDs or spinning disks depending on your cluster. Reading from local disk is faster than S3 but still orders of magnitude slower than memory. If most of your cache is on disk, you have not gained much over reading from a well-configured Parquet table.

| Cluster Storage Memory | Max Recommended Cache | Reasoning |
|---|---|---|
| 50 GB total | 25-35 GB | Leave 30-50% for execution and headroom |
| 200 GB total | 100-140 GB | Same ratio; monitor eviction in Storage tab |
| 1 TB total | 400-600 GB | At this scale, GC pressure becomes significant |
| 5 TB+ total | Do not cache everything | Cache only hot/reused data; read cold data from source |

What to Cache vs What to Leave on Disk

Cache these (small, reused frequently):

  • Dimension tables (customers, products, regions) — typically 1-500 MB
  • Filtered subsets used in multiple downstream operations
  • Expensive intermediate results (multi-table joins, complex aggregations) reused 3+ times
  • Lookup/reference tables used in enrichment

Do not cache these (large, used once or rarely):

  • Fact tables (billions of rows, hundreds of GB) — read from Parquet/Iceberg instead
  • Data used in a single downstream operation (caching overhead > benefit)
  • Streaming source data that changes every micro-batch
  • Full table scans where partition pruning on source is more effective

The Rule of Thumb

Cache the small, expensive, reused data. Leave the large, cheap-to-read, used-once data on object storage. A well-tuned Spark application typically caches 5-20% of its total data volume, not 100%.

If you find yourself caching more than 50% of your cluster's storage memory, you are almost certainly over-caching. Check the Spark UI Storage tab (see next section) for eviction patterns and fraction-cached percentages.
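Both rules of thumb can be captured in a quick sanity check (illustrative thresholds taken directly from the guidance above):

```python
def cache_plan_warnings(planned_cache_gb, total_data_gb, storage_memory_gb):
    """Flag over-caching per the 5-20%-of-data and 50%-of-storage guidelines."""
    warnings = []
    if planned_cache_gb > 0.2 * total_data_gb:
        warnings.append("caching more than ~20% of total data volume")
    if planned_cache_gb > 0.5 * storage_memory_gb:
        warnings.append("cache exceeds 50% of cluster storage memory")
    return warnings

# Plan: cache 600 GB out of a 1 TB dataset on 1 TB of storage memory
print(cache_plan_warnings(600, 1024, 1024))  # triggers both warnings
```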

Reading the Spark UI Storage Tab

The Storage tab is your primary tool for understanding cache health. After caching data, navigate to http://<driver>:4040/storage (or the Spark History Server for completed applications).

What You See

The Storage tab lists every cached RDD and DataFrame with:

| Column | What It Tells You |
|---|---|
| RDD Name | The name or description of the cached data (e.g., In-memory table dim_customer) |
| Storage Level | The storage level in use (e.g., Disk Memory Deserialized 1x Replicated) |
| Cached Partitions | How many partitions are currently cached out of the total |
| Fraction Cached | Percentage of partitions successfully cached (100% = fully cached) |
| Size in Memory | Total bytes consumed across all executors' memory |
| Size on Disk | Total bytes spilled to disk (for MEMORY_AND_DISK levels) |

What to Look For

Fraction Cached < 100%. This means some partitions were evicted or never materialized. If you see 60% cached, 40% of your reads go back to the source — and you are paying the overhead of caching without getting the full benefit. Either increase executor memory, reduce what you are caching, or accept the partial cache.

Size on Disk > 0 with MEMORY_AND_DISK. Memory was insufficient, and partitions spilled to disk. Disk reads are 10-100x slower than memory reads. If a large fraction of your cache is on disk, consider:

  • Increasing spark.memory.storageFraction
  • Increasing executor memory
  • Reducing what you cache
  • Using MEMORY_AND_DISK_SER for smaller in-memory footprint

Large Size in Memory relative to cluster total. If your cached data consumes 80%+ of total storage memory, execution operations (shuffles, joins) will evict cached blocks. Look for performance degradation in shuffle-heavy stages as evidence.

Multiple cached items with low partition counts. If you see 20 cached DataFrames each with a few partitions, you may be over-caching intermediate results. Consolidate or uncache data that is no longer needed.

Clicking Into a Cached Item

Click on any cached RDD/DataFrame name to see the per-executor breakdown:

  • Which executors hold which partitions
  • Memory and disk usage per executor
  • Distribution of cached data across the cluster

This helps diagnose skewed caching — if one executor holds 80% of the cached data while others hold 20%, your cache is unevenly distributed, likely because the underlying data is skewed.

Programmatic Access to Cache Info

# Check if a specific DataFrame is cached
print(df.is_cached) # True / False
print(df.storageLevel) # StorageLevel(True, True, False, True, 1)

# Check via catalog
print(spark.catalog.isCached("dim_customer")) # True / False

# Detailed storage info via SparkContext (JVM bridge)
for rdd_info in spark.sparkContext._jsc.sc().getRDDStorageInfo():
    print(f"Name: {rdd_info.name()}")
    print(f"Cached Partitions: {rdd_info.numCachedPartitions()}")
    print(f"Memory Used: {rdd_info.memSize() / 1048576:.1f} MB")
    print(f"Disk Used: {rdd_info.diskSize() / 1048576:.1f} MB")

Spark History Server

For completed applications, the Spark History Server (http://<history-server>:18080) preserves the Storage tab. This lets you analyze cache behavior post-hoc — critical for debugging production jobs that ran overnight.

Look at the event timeline alongside the Storage tab: if cached data appears and then disappears mid-job, it was evicted by execution memory pressure. Correlate with shuffle stages to identify the culprit.

Real-World Optimization Patterns

Pattern 1: Iterative ML Training — 20x Speedup

Caching delivers the most dramatic improvements in iterative algorithms. Without caching, each iteration re-reads from S3:

# Without caching: each iteration reads 50 GB from S3
training_data = spark.read.parquet("s3://bucket/training_data")

for i in range(100):
    model = train_iteration(training_data, model)
# 100 iterations × 50 GB = 5 TB of redundant S3 reads

# With caching: data loaded once, reused 100 times
training_data = spark.read.parquet("s3://bucket/training_data").cache()
training_data.count() # Materialize once

for i in range(100):
    model = train_iteration(training_data, model) # Reads from memory
# 100 iterations × memory reads = seconds, not minutes

Benchmarks show 20x speedup for logistic regression and up to 100x for iterative graph algorithms.

Pattern 2: Multi-Query Intermediate Results

Compute an expensive transformation once, use it for multiple reports:

from pyspark.sql.functions import avg, col, count, desc, sum

# Expensive: 3-way join + aggregation
enriched_orders = (
    orders
    .join(customers, "customer_id")
    .join(products, "product_id")
    .withColumn("total", col("quantity") * col("price"))
    .cache()
)
enriched_orders.count() # Materialize

# Three reports from the same cached result — no recomputation
daily_summary = enriched_orders.groupBy("order_date").agg(sum("total"))
top_products = enriched_orders.groupBy("product_id").agg(sum("total")).orderBy(desc("total"))
customer_segments = enriched_orders.groupBy("customer_tier").agg(avg("total"), count("*"))

Without caching, each report re-executes the 3-way join from scratch.

Pattern 3: Dimension Table Caching for Star Schema

Cache dimension tables once at the start of the session:

# Cache all dimension tables at application startup
dim_customer = spark.table("dim_customer").cache()
dim_product = spark.table("dim_product").cache()
dim_store = spark.table("dim_store").cache()
dim_date = spark.table("dim_date").cache()

# Force materialization
for dim in [dim_customer, dim_product, dim_store, dim_date]:
    dim.count()

# All subsequent joins benefit from cached, possibly broadcast, dimension tables
q1_report = fact_sales.filter(col("quarter") == 1).join(dim_customer, "customer_id")
q2_report = fact_sales.filter(col("quarter") == 2).join(dim_customer, "customer_id")
product_mix = fact_sales.join(dim_product, "product_id").join(dim_store, "store_id")

-- SQL equivalent: eager cache at session start
CACHE TABLE dim_customer;
CACHE TABLE dim_product;
CACHE TABLE dim_store;
CACHE TABLE dim_date;

Pattern 4: Cache Filtered Hot Data

For interactive analysis, cache only the subset you are exploring:

from pyspark.sql.functions import col, current_date, date_sub

# Cache only last 7 days — manageable size
hot_data = (
    spark.table("events")
    .filter(col("event_date") >= date_sub(current_date(), 7))
    .cache()
)
hot_data.count()

# Fast interactive exploration from cache
hot_data.describe("event_value").show()
hot_data.groupBy("event_type").count().show()
hot_data.filter(col("event_value") > 1000).show(20)

# Historical queries still go to S3 — no wasted memory
historical = spark.table("events").filter(col("event_date") < "2026-01-01")

Pattern 5: Cache + Repartition for Shuffle-Free Downstream Joins

Repartitioning before caching aligns partitions with downstream join keys:

# Repartition by join key, then cache
customer_orders = (
    orders
    .repartition(200, "customer_id")
    .cache()
)
customer_orders.count()

# Downstream joins on customer_id are partition-aligned — reduced shuffle
result = customer_orders.join(dim_customer, "customer_id")

Warning: Repartitioning adds an initial shuffle cost. Only do this when the cached data will be used for multiple downstream operations on the same key.

Pattern 6: Coalesce After Filter Before Caching

When a filter dramatically reduces data volume, consolidate near-empty partitions:

# Original: 1000 partitions, but filter keeps only 5% of data
# Without coalesce: 950 near-empty partitions wasting memory on overhead
filtered = big_table.filter(col("rare_event") == True)

# Better: consolidate into fewer partitions before caching
filtered = (
    big_table
    .filter(col("rare_event") == True)
    .coalesce(50)
    .cache()
)
filtered.count()

Downsides and Pitfalls

Memory Pressure

Cached data competes directly with execution memory. If you cache too much:

  • Shuffles and joins will evict cached blocks to make room
  • You pay the cost of caching (compute + store) AND the cost of recomputation when blocks are evicted
  • Shuffle operations may spill to disk because cache is consuming the memory they need

Over-Caching Anti-Pattern

# ANTI-PATTERN: caching every intermediate result
df1 = spark.read.parquet("...").cache()
df2 = df1.filter(...).cache()
df3 = df2.groupBy(...).agg(...).cache()
df4 = df3.join(other).cache()
# All four DataFrames compete for memory — most will be evicted

Cache only DataFrames that are reused multiple times. If a DataFrame is used once, caching adds overhead with zero benefit.

Stale Cache

Cached data is a snapshot at the time of materialization. It does not reflect changes to the underlying source:

df = spark.table("my_table").cache()
df.count() # Returns 1,000,000 rows

# External process inserts 500K rows into my_table
df.count() # Still returns 1,000,000 — reading from stale cache!

# Must explicitly invalidate
df.unpersist()
df = spark.table("my_table").cache()
df.count() # Now returns 1,500,000

For Spark SQL tables: REFRESH TABLE my_table invalidates the cache and reloads metadata.

GC Pressure

Large deserialized caches create millions of Java objects on the heap, causing:

  • Extended GC pauses (seconds to minutes on large heaps)
  • Stop-the-world events that can trigger executor heartbeat timeouts
  • Task failures when GC time exceeds thresholds

Mitigation: Use MEMORY_ONLY_SER with Kryo serialization or OFF_HEAP to reduce object count.

Cache Lost on Restart

  • Cached data is lost when the Spark application terminates
  • Cached data is lost when individual executors die (unless using _2 replicated levels)
  • Dynamic allocation can kill idle executors, destroying their cached data. Configure spark.dynamicAllocation.cachedExecutorIdleTimeout (default: infinity) to protect executors with cached partitions

Partial Caching

Actions that touch only some partitions result in partial caching:

df = spark.read.parquet("...").cache()  # 1000 partitions
df.take(10) # Only materializes 1-2 partitions
# Storage tab shows: Cached Partitions: 2 / 1000

Always run .count() or another full-scan action to materialize the complete cache.

Predicate Pushdown to Source Is Lost

Once a DataFrame is cached, the Catalyst optimizer operates on the InMemoryRelation instead of the original source. Predicates are applied to the cached data — they are not pushed down to the storage layer (Parquet, Iceberg, S3):

# Uncached: Spark pushes filter to Parquet reader — reads only matching row groups
df = spark.read.parquet("...").filter(col("date") == "2026-01-01")

# Cached: filter applied after reading from cache — all cached data is scanned
# (batch-level min/max pruning still works, but not file-level Parquet pushdown)
df = spark.read.parquet("...").cache()
df.count()
result = df.filter(col("date") == "2026-01-01")

For large tables where partition pruning eliminates 99% of files, caching the full table and then filtering is worse than reading directly from source with pushdown.

Broadcast Hints Can Be Lost After Caching

from pyspark.sql.functions import broadcast

# The broadcast hint may be lost after caching
dim = broadcast(spark.table("dim_table")).cache()
dim.count()
# The cached InMemoryRelation does not carry the broadcast hint
# Spark re-evaluates based on cached size vs threshold

If you need guaranteed broadcast, either rely on the threshold-based automatic decision or apply the hint at join time, not at cache time.

Cache vs Checkpoint vs Temporary View

These three concepts are frequently confused. They serve different purposes.

Comparison

| Feature | cache() / persist() | checkpoint() | createOrReplaceTempView() |
| --- | --- | --- | --- |
| Stores data | Memory / local disk | HDFS / reliable storage | Does NOT store data |
| Preserves lineage | Yes | No (truncates lineage) | Yes (IS the lineage) |
| Survives executor failure | Recompute from lineage | Yes (data on HDFS) | Recomputes |
| Survives driver restart | No | Yes | No |
| Performance | Fastest (memory) | Slower (write to HDFS + read back) | No caching — recomputes every time |
| Primary use case | Repeated access in same app | Break long lineage chains | SQL alias for a query |

createOrReplaceTempView() Is NOT Caching

This is the most common misconception:

df.filter(col("status") == "active").createOrReplaceTempView("active_users")

# This does NOT cache or materialize anything!
# Every query re-executes the full plan from source
spark.sql("SELECT count(*) FROM active_users") # Full computation
spark.sql("SELECT * FROM active_users WHERE age > 30") # Full computation again

To actually cache a temp view:

CREATE OR REPLACE TEMP VIEW active_users AS
SELECT * FROM users WHERE status = 'active';

CACHE TABLE active_users; -- NOW it is cached in memory

When to Use Checkpoint

checkpoint() writes data to HDFS and truncates the lineage — the checkpointed DataFrame has no dependency on its parents:

# Set checkpoint directory
spark.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints")

# Always cache before checkpoint to avoid double computation
df = expensive_computation().cache()
df.count() # Materialize in cache
checkpointed = df.checkpoint() # Writes from cache to HDFS
df.unpersist() # Release cache; checkpoint persists on HDFS

Use checkpoint for:

  • Long lineage chains (100+ transformations) that cause slow planning or stack overflows
  • Iterative algorithms — checkpoint every N iterations to keep lineage manageable
  • Fault tolerance — checkpointed data survives executor and driver failures

localCheckpoint() is faster (writes to local disk instead of HDFS) but not fault-tolerant.

Decision Guide

| Scenario | Use |
| --- | --- |
| DataFrame reused 2-3 times in same job | cache() |
| Very long lineage chain causing slow planning | checkpoint() or localCheckpoint() |
| Iterative algorithm (100+ iterations) | cache() + checkpoint() every N iterations |
| Results needed by another Spark application | Write to Parquet/Iceberg table |
| SQL-friendly naming without materialization | createOrReplaceTempView() |
| SQL-friendly naming WITH materialization | createOrReplaceTempView() + CACHE TABLE |

Caching with Iceberg Tables

Iceberg tables interact with Spark caching in specific ways that require attention.

Iceberg Catalog Caching vs DataFrame Caching

Iceberg has its own caching layer — CachingCatalog — that is separate from Spark's DataFrame caching. CachingCatalog caches table metadata (metadata.json, manifest lists, manifest files) to avoid re-reading them from object storage on every query. This is enabled by default:

spark.sql.catalog.my_catalog.cache-enabled=true
spark.sql.catalog.my_catalog.cache.expiration-interval-ms=30000 # 30 seconds

This metadata caching is transparent and generally beneficial. DataFrame caching (df.cache()) caches the actual data — the rows — and is what this blog post has been discussing.

Stale Cache with Iceberg Snapshots

When an Iceberg table receives new writes (creating new snapshots), a cached DataFrame still holds data from the old snapshot:

df = spark.table("catalog.db.my_table").cache()
df.count() # Returns 10M rows from snapshot 123

# Another process runs: INSERT INTO catalog.db.my_table VALUES (...)
# New snapshot 124 is created

df.count() # Still returns 10M — reading from stale cache (snapshot 123)

# Must invalidate and re-cache
df.unpersist()
df = spark.table("catalog.db.my_table").cache()
df.count() # Now returns 10M + new rows from snapshot 124

In SQL:

-- Refresh to pick up new snapshots
REFRESH TABLE catalog.db.my_table;
-- Re-cache
CACHE TABLE catalog.db.my_table;

Caching Iceberg Metadata Tables

Iceberg's virtual metadata tables (files, manifests, snapshots, partitions) can be cached for repeated analysis:

# Cache metadata tables for a health-check session
files = spark.table("catalog.db.my_table.files").cache()
manifests = spark.table("catalog.db.my_table.manifests").cache()

# Run multiple diagnostic queries from cache
files.filter(col("file_size_in_bytes") < 67108864).count() # Small file count
files.groupBy("partition").agg(avg("file_size_in_bytes")).show()

These become stale if the table receives new writes during your session. For one-time analysis this is fine; for long-running monitoring, do not cache.

Disable Caching During Maintenance

Some Iceberg maintenance procedures have built-in caching controls:

-- rewrite_manifests with caching disabled to avoid executor memory issues
CALL catalog.system.rewrite_manifests(
table => 'db.my_table',
use_caching => false
);

The use_caching parameter (default true) controls whether Spark caches intermediate results during the procedure. For large tables, set false to avoid memory pressure.

Time Travel + Caching

Caching a time-travel query pins data to a specific snapshot:

# Cache two snapshots for comparison
current = spark.table("catalog.db.my_table").cache()
historical = (
spark.read
.option("snapshot-id", 123456789)
.table("catalog.db.my_table")
.cache()
)
current.count()
historical.count()

# Compare
new_records = current.subtract(historical)
deleted_records = historical.subtract(current)

# Clean up — each snapshot consumes independent memory
current.unpersist()
historical.unpersist()

Complete Configuration Reference

Caching and Memory Configurations

| Configuration | Default | Description |
| --- | --- | --- |
| spark.memory.fraction | 0.6 | Fraction of (heap - 300 MB) for unified memory (storage + execution) |
| spark.memory.storageFraction | 0.5 | Initial/minimum fraction of unified memory for storage (cache floor) |
| spark.memory.offHeap.enabled | false | Enable off-heap memory for caching |
| spark.memory.offHeap.size | 0 | Off-heap memory size per executor |

In-Memory Columnar Storage

| Configuration | Default | Description |
| --- | --- | --- |
| spark.sql.inMemoryColumnarStorage.compressed | true | Enable automatic per-column compression for cached DataFrames |
| spark.sql.inMemoryColumnarStorage.batchSize | 10000 | Rows per CachedBatch. Larger = better compression, higher peak memory |
| spark.sql.inMemoryColumnarStorage.partitionPruning | true | Enable batch-level predicate pruning on cached data |

Serialization

| Configuration | Default | Description |
| --- | --- | --- |
| spark.serializer | JavaSerializer | Serializer for _SER levels. Set to KryoSerializer for better performance |
| spark.kryoserializer.buffer.max | 64m | Maximum Kryo buffer size |

Dynamic Allocation

| Configuration | Default | Description |
| --- | --- | --- |
| spark.dynamicAllocation.cachedExecutorIdleTimeout | infinity | Idle timeout for executors holding cached data. Set to prevent premature executor removal |

Broadcast Join

| Configuration | Default | Description |
| --- | --- | --- |
| spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | Max cached/estimated table size for auto-broadcast |

Iceberg Catalog Caching

| Configuration | Default | Description |
| --- | --- | --- |
| spark.sql.catalog.&lt;name&gt;.cache-enabled | true | Enable/disable Iceberg catalog metadata caching |
| spark.sql.catalog.&lt;name&gt;.cache.expiration-interval-ms | 30000 | Cache TTL in ms; -1 = no expiry, 0 = no caching |

How Cazpian Handles This

On Cazpian, Spark compute pools are pre-configured with tuned memory fractions and storage allocations based on the cluster size and workload profile. When you cache dimension tables through Cazpian's query interface, the platform monitors the Storage tab metrics automatically — tracking fraction cached, memory vs disk usage, and eviction rates. If eviction pressure is detected, Cazpian surfaces recommendations to adjust memory allocation or reduce cache scope. For Iceberg tables, Cazpian's catalog integration handles cache invalidation when new snapshots are committed, so cached data stays fresh without manual REFRESH TABLE calls.

What's Next

This post covered everything about Spark caching — from internal mechanics to production tuning. For related optimizations in the Spark and Iceberg ecosystem, see our other posts: