Spark Caching and Persistence: The Complete Guide for Iceberg and Cazpian
You are running the same 500 GB join three times in a single pipeline — once for a daily summary, once for a top-products report, once for customer segmentation. Each query reads from S3, shuffles terabytes across the network, builds hash maps, and aggregates from scratch. That is 1.5 TB of reads where 500 GB would do, plus two redundant shuffles and two redundant sort-merge joins.
Spark caching eliminates this waste. You compute the expensive join once, store the result in executor memory, and every subsequent query reads from that in-memory copy instead of going back to object storage. The improvement is not incremental — it is typically 10-100x faster for repeated access patterns.
But caching does something else that is less obvious and equally powerful: it makes Spark's query optimizer smarter. When a table is cached, Spark knows its exact in-memory size. If that size falls below the broadcast join threshold, the optimizer automatically converts a Sort-Merge Join into a Broadcast Hash Join — eliminating the shuffle entirely, without you writing a single hint.
This post covers every dimension of Spark caching. We start with internals, walk through every storage level, show all the ways to cache, explain the columnar storage format that makes DataFrame caching special, dive into memory management, discuss how much data you should actually cache (spoiler: not terabytes), show you how to read the Spark UI Storage tab, and cover the pitfalls that catch production workloads.
How Spark Caching Works Internally
Understanding the internals explains why caching sometimes fails silently, why partial caching happens, and why memory estimates are often wrong.
Lazy Evaluation — Nothing Happens on .cache()
When you call .cache() or .persist(), nothing happens immediately. These are lazy operations that mark the DataFrame's logical plan as "cacheable." The data is not read, computed, or stored until an action triggers execution — .count(), .collect(), .show(), .write(), or any operation that produces a result.
df = spark.read.parquet("s3://bucket/500gb-table")
df.cache() # Nothing happens. No data is read. No memory is used.
df.count() # NOW: reads from S3, computes, and stores in memory.
df.show(10) # Reads from cache — instant.
This means if your action only touches a subset of partitions (e.g., .take(10) or .limit(100).collect()), only those partitions are materialized and cached. Unless you run an action that touches every partition (like .count() or a full .write()), you are working with a partial cache.
BlockManager — Where Cached Data Lives
BlockManager is Spark's local key-value store running on every node (driver and executors). It manages blocks representing cached partitions, shuffle outputs, and broadcast variables.
Each RDD/DataFrame partition maps to exactly one block with an ID like rdd_42_7 (RDD 42, partition 7). When a partition is computed during an action:
- BlockManager.putIterator() is called with the computed iterator and the StorageLevel
- For deserialized storage (MEMORY_ONLY, MEMORY_AND_DISK): values are "unrolled" one by one into MemoryStore, with periodic checks for remaining space
- For serialized storage (MEMORY_ONLY_SER, MEMORY_AND_DISK_SER): values are serialized into compact byte arrays first
- If memory is full and the storage level allows disk, the block spills to DiskStore
- If memory is full and the storage level is MEMORY_ONLY, the partition is simply not cached — it will be recomputed from lineage on next access
LRU Eviction
MemoryStore uses a LinkedHashMap in access-order mode for LRU (Least Recently Used) eviction. When a new block needs space:
- Iterates from least-recently accessed to most-recently accessed
- Skips blocks from the same RDD (to avoid evicting parts of the dataset you are actively caching)
- For MEMORY_AND_DISK levels: evicted blocks spill to disk
- For MEMORY_ONLY levels: evicted blocks are dropped entirely (recomputed from lineage later)
- Continues until enough space is freed or no more eligible blocks remain
This eviction behavior means caching is not all-or-nothing. A heavily loaded cluster may cache 80% of your data in memory, spill 15% to disk, and drop 5% — all transparently.
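The eviction loop can be sketched in plain Python. This is a simplified model, not Spark's actual MemoryStore code: the block IDs, sizes, and the whole MemoryStoreSketch API are made up for illustration.

```python
from collections import OrderedDict

class MemoryStoreSketch:
    """Toy model of MemoryStore's LRU eviction. Illustrative only:
    block IDs, sizes, and this API are invented for the sketch."""

    def __init__(self, capacity_bytes):
        self.capacity = capacity_bytes
        self.used = 0
        # Access-order iteration over OrderedDict via move_to_end mimics
        # Java's LinkedHashMap(accessOrder=true)
        self.blocks = OrderedDict()  # block_id -> size in bytes

    def get(self, block_id):
        self.blocks.move_to_end(block_id)  # reading marks most-recently used
        return self.blocks[block_id]

    def put(self, block_id, size, rdd_id):
        dropped = []
        # Walk victims from least- to most-recently used
        for victim in list(self.blocks):
            if self.used + size <= self.capacity:
                break
            if victim.startswith(f"rdd_{rdd_id}_"):
                continue  # never evict sibling blocks of the RDD being cached
            self.used -= self.blocks.pop(victim)
            dropped.append(victim)
        if self.used + size <= self.capacity:
            self.blocks[block_id] = size
            self.used += size
            return True, dropped   # cached in memory
        return False, dropped      # caller spills to DiskStore or drops the block

store = MemoryStoreSketch(capacity_bytes=100)
store.put("rdd_1_0", 40, rdd_id=1)
store.put("rdd_1_1", 40, rdd_id=1)
store.get("rdd_1_0")                              # rdd_1_1 is now least-recently used
ok, dropped = store.put("rdd_2_0", 50, rdd_id=2)  # evicts rdd_1_1 only
```

Note how the same-RDD skip means a `put` can fail even when eviction would technically free enough space, which is exactly when the real MemoryStore spills or drops the partition.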
All Storage Levels Explained
Spark offers eight storage levels. Choosing the right one is the difference between a fast cache and one that causes more problems than it solves.
Storage Level Comparison
| Storage Level | In Memory | On Disk | Serialized | Replicated | Default For |
|---|---|---|---|---|---|
| MEMORY_ONLY | Yes | No | No | 1x | rdd.cache() |
| MEMORY_AND_DISK | Yes | Spillover | No | 1x | df.cache() |
| MEMORY_ONLY_SER | Yes | No | Yes | 1x | — |
| MEMORY_AND_DISK_SER | Yes | Spillover | Yes | 1x | — |
| DISK_ONLY | No | Yes | Yes | 1x | — |
| MEMORY_ONLY_2 | Yes | No | No | 2x | — |
| MEMORY_AND_DISK_2 | Yes | Spillover | No | 2x | — |
| OFF_HEAP | Yes (off-heap) | Spillover | Yes | 1x | — |
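Each level in the table is just a combination of five flags. Here is a plain-Python sketch of that encoding, mirroring the constructor order (useDisk, useMemory, useOffHeap, deserialized, replication); the flag values are our reading of the Spark source, so treat them as illustrative:

```python
from collections import namedtuple

# Same field order as org.apache.spark.storage.StorageLevel's constructor
StorageLevel = namedtuple(
    "StorageLevel",
    ["use_disk", "use_memory", "use_off_heap", "deserialized", "replication"],
)

MEMORY_ONLY         = StorageLevel(False, True,  False, True,  1)
MEMORY_AND_DISK     = StorageLevel(True,  True,  False, True,  1)
MEMORY_ONLY_SER     = StorageLevel(False, True,  False, False, 1)
MEMORY_AND_DISK_SER = StorageLevel(True,  True,  False, False, 1)
DISK_ONLY           = StorageLevel(True,  False, False, False, 1)
MEMORY_ONLY_2       = MEMORY_ONLY._replace(replication=2)
MEMORY_AND_DISK_2   = MEMORY_AND_DISK._replace(replication=2)
OFF_HEAP            = StorageLevel(True,  True,  True,  False, 1)

# This flag tuple is why df.storageLevel prints as
# StorageLevel(True, True, False, True, 1) for the DataFrame default.
```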
When to Use Each
MEMORY_AND_DISK — the default for DataFrames and the right choice 90% of the time. Partitions that fit in memory are stored as deserialized Java objects (fastest read access). Partitions that do not fit spill to local disk rather than being dropped. You never lose cached data unless the executor itself dies.
MEMORY_ONLY — the default for RDDs. Fastest when everything fits in memory, but partitions that do not fit are simply dropped and recomputed from lineage on next access. Use this when recomputation is cheap and memory is abundant.
MEMORY_ONLY_SER — stores each partition as a single serialized byte array. Uses 2-3x less memory than deserialized, but every read requires deserialization (CPU cost). Use when GC pressure from millions of deserialized Java objects is causing pauses.
MEMORY_AND_DISK_SER — serialized in memory with disk spillover. The most memory-efficient option that still guarantees data is not lost. Use for large datasets in memory-constrained clusters.
DISK_ONLY — no memory caching at all. All reads require disk I/O. Rarely useful in practice — if you need disk persistence, writing to a Parquet table is usually better.
MEMORY_ONLY_2 / MEMORY_AND_DISK_2 — replicate each partition to two executors. If one executor dies, the replica is immediately available without recomputation. Use for latency-critical interactive applications where waiting for recomputation is unacceptable. Doubles memory usage.
OFF_HEAP — stores serialized data outside the JVM heap. Zero GC overhead since off-heap memory is not managed by the garbage collector. Requires spark.memory.offHeap.enabled=true and spark.memory.offHeap.size to be set. Use when GC pauses are a significant problem (large caches, streaming workloads).
Serialized vs Deserialized — Memory Impact
The memory difference is substantial:
| Format | Relative Size | Example (1M mixed-type rows) |
|---|---|---|
| Parquet on disk (compressed, columnar) | 1x baseline | 100 MB |
| Deserialized Java objects (MEMORY_ONLY) | 3-5x disk size | 300-500 MB |
| Serialized with Java (MEMORY_ONLY_SER) | 1.5-3x disk size | 150-300 MB |
| Serialized with Kryo (MEMORY_ONLY_SER + Kryo) | 1-2x disk size | 100-200 MB |
| DataFrame columnar cache (compressed) | 1-2x disk size | 100-200 MB |
If you use _SER levels, configure Kryo serialization for significantly better compression:
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
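The overhead gap is easy to demonstrate in plain Python. This is an analogy for JVM object headers and boxing, not Spark's actual memory layout:

```python
import pickle
import sys

rows = list(range(100_000))

# "Deserialized": every element is a full Python object with its own header,
# roughly analogous to boxed Java objects on the heap
deserialized_bytes = sys.getsizeof(rows) + sum(sys.getsizeof(x) for x in rows)

# "Serialized": one compact byte buffer, analogous to a _SER partition
serialized_bytes = len(pickle.dumps(rows))

print(f"deserialized ~{deserialized_bytes / 1e6:.1f} MB, "
      f"serialized ~{serialized_bytes / 1e6:.1f} MB")
# The serialized form is several times smaller: the same effect the
# _SER storage levels (and Kryo) exploit on the JVM.
```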
All Ways to Cache in Spark
DataFrame API
from pyspark import StorageLevel
# cache() — uses default MEMORY_AND_DISK for DataFrames
df.cache()
# persist() — specify a storage level explicitly
df.persist(StorageLevel.MEMORY_ONLY)
df.persist(StorageLevel.MEMORY_AND_DISK_SER)
df.persist(StorageLevel.DISK_ONLY)
df.persist(StorageLevel.OFF_HEAP)
# Trigger materialization (cache is lazy — needs an action)
df.count()
# Uncache
df.unpersist() # Lazy removal
df.unpersist(blocking=True) # Blocks until all blocks are removed
import org.apache.spark.storage.StorageLevel
df.cache() // MEMORY_AND_DISK
df.persist(StorageLevel.MEMORY_ONLY_SER)
df.unpersist()
Spark SQL
-- Eager caching — immediately materializes the entire table
CACHE TABLE dim_customer;
-- Lazy caching — materializes on first access
CACHE LAZY TABLE dim_customer;
-- Cache with a specific storage level
CACHE TABLE dim_customer OPTIONS ('storageLevel' 'MEMORY_AND_DISK_SER');
-- Cache the result of a query
CACHE TABLE active_customers AS
SELECT * FROM customers WHERE status = 'ACTIVE';
-- Cache a lazy query
CACHE LAZY TABLE recent_orders AS
SELECT * FROM orders WHERE order_date >= '2026-01-01';
-- Uncache
UNCACHE TABLE dim_customer;
-- Refresh stale cache (invalidate + reload on next access)
REFRESH TABLE dim_customer;
The default storage level for CACHE TABLE is MEMORY_AND_DISK.
Catalog API
# Cache a table (lazy by default)
spark.catalog.cacheTable("my_catalog.my_db.dim_customer")
# Check if a table is cached
spark.catalog.isCached("my_catalog.my_db.dim_customer") # True/False
# Uncache a specific table
spark.catalog.uncacheTable("my_catalog.my_db.dim_customer")
# Clear ALL cached data in the current session
spark.catalog.clearCache()
Eager vs Lazy — Which to Use
| Method | Eager/Lazy | Behavior |
|---|---|---|
| CACHE TABLE | Eager | Immediately runs a full scan to materialize all partitions |
| CACHE LAZY TABLE | Lazy | Marks for caching; materializes when first accessed |
| df.cache() / df.persist() | Lazy | Marks for caching; materializes on first action |
| spark.catalog.cacheTable() | Lazy | Same as CACHE LAZY TABLE |
Use eager caching (CACHE TABLE) for cache warming — when you want data in memory before user queries arrive. Use lazy caching for everything else to avoid caching data that might never be accessed.
In-Memory Columnar Storage — How DataFrames Cache Differently
When you cache an RDD, each partition is stored as a collection of row-based Java objects. When you cache a DataFrame, Spark uses a fundamentally different and more efficient approach: in-memory columnar storage.
How It Works
- df.cache() wraps the logical plan in an InMemoryRelation operator
- When materialized, data is converted into CachedBatch objects — each batch contains column vectors rather than rows
- Each column is stored contiguously in memory, enabling column-specific compression
Automatic Per-Column Compression
When spark.sql.inMemoryColumnarStorage.compressed=true (default), Spark selects a compression codec per column based on data characteristics:
| Codec | How It Works | Best For |
|---|---|---|
| RunLengthEncoding | Stores repeated values as (value, count) pairs | Sorted columns with repeats |
| DictionaryEncoding | Builds dictionary of unique values, stores indices | Low-cardinality strings (status, category) |
| BooleanBitSet | Packs booleans as individual bits (8 per byte) | Boolean columns |
| IntDelta | Stores deltas between consecutive integers | Sorted/monotonic integer columns |
| LongDelta | Same as IntDelta for long values | Timestamps, auto-increment IDs |
This compression is why a cached DataFrame is often smaller than you'd expect — and why it can fall below the broadcast join threshold.
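Two of these codecs are simple enough to sketch in a few lines of plain Python. This is a simplified illustration; Spark's actual implementations live in org.apache.spark.sql.execution.columnar.compression:

```python
def run_length_encode(column):
    """Store repeated values as [value, count] pairs:
    compact for sorted columns with runs of repeats."""
    out = []
    for v in column:
        if out and out[-1][0] == v:
            out[-1][1] += 1
        else:
            out.append([v, 1])
    return out

def dictionary_encode(column):
    """Store each unique value once plus a small integer index per row:
    compact for low-cardinality strings like status or category."""
    dictionary = {}
    indices = []
    for v in column:
        if v not in dictionary:
            dictionary[v] = len(dictionary)
        indices.append(dictionary[v])
    return list(dictionary), indices

status = ["ACTIVE"] * 4 + ["CLOSED"] * 2 + ["ACTIVE"]
print(run_length_encode(status))   # [['ACTIVE', 4], ['CLOSED', 2], ['ACTIVE', 1]]
print(dictionary_encode(status))   # (['ACTIVE', 'CLOSED'], [0, 0, 0, 0, 1, 1, 0])
```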
Column Pruning on Cached Data
When you query a cached DataFrame but only reference a subset of columns, Spark reads only those columns from the cached batches:
# Cache a wide table with 200 columns
wide_table = spark.table("events").cache()
wide_table.count()
# This query reads only 3 columns from cache, not all 200
result = wide_table.select("event_id", "event_type", "timestamp")
This is a major advantage over RDD caching, where the entire row must be deserialized even if only one field is needed.
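The difference is easy to picture with a toy layout in plain Python (purely illustrative; column names are examples, not a real schema):

```python
# Row layout (RDD-style): reading one field still touches every row object
rows = [
    {"event_id": 1, "event_type": "click", "timestamp": 100, "payload": "..."},
    {"event_id": 2, "event_type": "view",  "timestamp": 101, "payload": "..."},
]
event_types_from_rows = [r["event_type"] for r in rows]  # deserializes whole rows

# Columnar layout (CachedBatch-style): each column is one contiguous list
columns = {
    "event_id":   [1, 2],
    "event_type": ["click", "view"],
    "timestamp":  [100, 101],
    "payload":    ["...", "..."],
}
event_types_from_columns = columns["event_type"]  # the other columns are never read
```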
Predicate Pushdown on Cached Data
Spark maintains min/max statistics per batch per column for cached data. When a filter is applied, InMemoryTableScanExec skips entire batches whose range does not overlap with the predicate:
# Cached table has 1000 batches of 10,000 rows each
# Filter: WHERE temperature > 100
# Batch #47 has min=20, max=95 for temperature
# → Batch #47 is skipped entirely without reading any rows
This is controlled by spark.sql.inMemoryColumnarStorage.partitionPruning=true (default).
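Batch-level pruning can be sketched in plain Python. The batch statistics and predicate here are invented for illustration; the real logic is the stats-based filter inside InMemoryTableScanExec:

```python
from dataclasses import dataclass

@dataclass
class CachedBatchSketch:
    rows: list       # temperature values in this batch
    min_temp: float  # per-batch min statistic
    max_temp: float  # per-batch max statistic

def scan_with_pruning(batches, lower_bound):
    """Evaluate WHERE temperature > lower_bound, skipping batches
    whose [min, max] range cannot overlap the predicate."""
    batches_scanned, result = 0, []
    for b in batches:
        if b.max_temp <= lower_bound:
            continue  # whole batch skipped without reading a single row
        batches_scanned += 1
        result.extend(t for t in b.rows if t > lower_bound)
    return batches_scanned, result

batches = [
    CachedBatchSketch(rows=[20, 50, 95], min_temp=20, max_temp=95),    # pruned
    CachedBatchSketch(rows=[90, 120, 140], min_temp=90, max_temp=140), # scanned
]
batches_scanned, hot = scan_with_pruning(batches, lower_bound=100)
print(batches_scanned, hot)  # 1 [120, 140]
```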
DataFrame Cache vs RDD Cache
| Aspect | DataFrame Cache | RDD Cache |
|---|---|---|
| Storage format | Columnar (CachedBatch) | Row-based (Java objects or serialized bytes) |
| Default level | MEMORY_AND_DISK | MEMORY_ONLY |
| Column pruning | Yes — reads only needed columns | No — reads entire objects |
| Predicate pushdown | Yes — batch-level min/max pruning | No |
| Compression | Automatic per-column codecs | None (deserialized) or generic serialization |
| Memory efficiency | Higher | Lower |
The takeaway: always use DataFrame caching over RDD caching when possible.
How Caching Converts Sort-Merge to Broadcast Join
This is one of the most underappreciated benefits of caching. It directly connects to our broadcast join guide.
The Problem with Size Estimates
When a table is uncached, Spark estimates its size from file metadata — Parquet footer stats, catalog statistics, or heuristic defaults. These estimates are often wildly inaccurate:
- A 200 MB Parquet file might decompress to 800 MB in memory
- A heavily filtered subquery might produce 5 MB from a 500 MB table, but Spark estimates 500 MB
- Stale ANALYZE TABLE statistics might report sizes from months ago
When the estimate is wrong, Spark makes wrong join decisions. It plans a Sort-Merge Join (with shuffle) when a Broadcast Hash Join (without shuffle) would have been 10x faster.
How Caching Fixes This
When a table is cached, Spark knows its exact in-memory size because the data is materialized in BlockManager and the byte count is tracked. The InMemoryRelation reports this precise size to the Catalyst optimizer. If it falls below spark.sql.autoBroadcastJoinThreshold (default 10 MB), Spark automatically switches to Broadcast Hash Join.
Step-by-Step Pattern
# Without caching: Spark estimates dim_product as 200 MB (Parquet file size)
# 200 MB > 10 MB threshold → Sort-Merge Join with shuffle
result = fact_sales.join(dim_product, "product_id")
result.explain() # Shows SortMergeJoin + Exchange (shuffle)
# With caching: materialize and let Spark see the real size
dim_product = spark.table("dim_product").cache()
dim_product.count() # Materialized: actual cached size = 45 MB (columnar compressed)
# Increase threshold to accommodate
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "52428800") # 50 MB
# Now: Spark sees exact 45 MB < 50 MB → Broadcast Hash Join automatically
result = fact_sales.join(dim_product, "product_id")
result.explain() # Shows BroadcastHashJoin — no shuffle!
The Same Pattern with Filtered Data
This is even more powerful with filtered subqueries:
# users table is 10 GB on disk — Spark won't broadcast it
# But after filtering, only 25,000 rows remain (2 MB)
vip_users = (
spark.table("users")
.filter((col("tier") == "VIP") & (col("region") == "US"))
.cache()
)
vip_users.count() # Materialized: 2 MB. Spark now knows the exact size.
# 2 MB < 10 MB default threshold → automatic Broadcast Hash Join
result = events.join(vip_users, "user_id")
result.explain() # BroadcastHashJoin — zero shuffle on the events table
SQL Equivalent
-- Cache a filtered result
CACHE TABLE vip_users AS
SELECT user_id, name, tier
FROM users
WHERE tier = 'VIP' AND region = 'US';
-- Join uses broadcast automatically if cached size < threshold
SELECT e.*, v.name, v.tier
FROM events e
JOIN vip_users v ON e.user_id = v.user_id;
How This Complements AQE
AQE (Adaptive Query Execution) can also convert Sort-Merge to Broadcast at runtime. The difference:
- Caching enables broadcast at plan time — Spark plans the broadcast upfront, avoiding the shuffle entirely
- AQE enables broadcast at runtime — Spark discovers the small size after a shuffle stage completes, then converts. The shuffle for the small side has already happened
Caching is strictly better when you know the data will be reused. AQE is the safety net for ad-hoc queries where you did not think to cache.
Memory Management Deep Dive
Caching consumes executor memory. Understanding how Spark's memory model works prevents the most common caching failures: silent eviction, execution slowdowns, and OOM crashes.
Unified Memory Model
Since Spark 1.6, a single memory region is shared between storage (cached data, broadcast variables) and execution (shuffles, joins, aggregations, sorts):
Executor JVM Heap: 10 GB
├── Reserved Memory: 300 MB (hardcoded, non-configurable)
├── User Memory: (10240 - 300) × 0.4 = 3976 MB
│ └── User data structures, UDF variables, internal metadata
└── Unified Memory: (10240 - 300) × 0.6 = 5964 MB
├── Storage Memory (initial): 5964 × 0.5 = 2982 MB
│ └── Cached partitions, broadcast variables
└── Execution Memory (initial): 5964 × 0.5 = 2982 MB
└── Shuffle buffers, join hash maps, sort buffers
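The arithmetic above can be reproduced with a small helper using the same formulas and the defaults spark.memory.fraction=0.6 and spark.memory.storageFraction=0.5 (figures in MB; a sketch of the model, not Spark code):

```python
def unified_memory_breakdown(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Spark 1.6+ unified memory model, on-heap, all values in MB."""
    reserved = 300                          # hardcoded reserved system memory
    usable = heap_mb - reserved
    unified = usable * memory_fraction      # shared by storage + execution
    user = usable - unified                 # user data structures, UDF state
    storage = unified * storage_fraction    # initial (soft) storage region
    execution = unified - storage
    return {
        "reserved": reserved,
        "user": user,
        "unified": unified,
        "storage_initial": storage,
        "execution_initial": execution,
    }

mem = unified_memory_breakdown(10 * 1024)   # the 10 GB executor from above
print(mem)
# unified comes out near 5964 MB, with storage and execution
# each starting near 2982 MB, matching the diagram.
```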
The Key Asymmetry: Execution Wins
The split between storage and execution is not a hard boundary — it is dynamic. But the rules are asymmetric:
- Execution can evict cached blocks. If execution needs more memory than its initial allocation, it evicts cached data (LRU order) to make room. Running tasks always have priority.
- Storage cannot evict execution. If cached data wants to expand into execution's space, it can only use space that execution has voluntarily freed. Storage must wait.
This means heavy shuffle or join operations will silently evict your cached data if memory is tight. You will not get an error — the cache just disappears, and subsequent reads recompute from source.
Initial State:
[===== Storage (50%) =====][===== Execution (50%) =====]
Cache grows into idle execution space:
[============ Storage (80%) ============][= Execution (20%) =]
Heavy shuffle needs memory — evicts cached blocks:
[== Storage (30%) ==][========== Execution (70%) ==========]
Key Configuration
| Configuration | Default | Description |
|---|---|---|
| spark.memory.fraction | 0.6 | Fraction of (heap - 300 MB) for unified memory |
| spark.memory.storageFraction | 0.5 | Initial split for storage (also the floor below which execution cannot evict) |
| spark.memory.offHeap.enabled | false | Enable off-heap memory for caching |
| spark.memory.offHeap.size | 0 | Off-heap memory size per executor |
Sizing Guidance
For a workload that needs to cache 50 GB across 10 executors (5 GB per executor):
# Option A: On-heap caching (simpler, subject to GC)
--executor-memory 12g
spark.memory.fraction=0.6 # 7 GB unified
spark.memory.storageFraction=0.6 # 4.2 GB initial storage (enough for 5 GB with spill)
# Option B: Off-heap caching (no GC pauses)
--executor-memory 6g # Smaller heap = less GC
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=6g # Off-heap for cache
spark.executor.memoryOverhead=1g
# Total container: 6 + 6 + 1 = 13 GB
Adjust spark.memory.storageFraction based on your workload:
- Cache-heavy (interactive queries, repeated joins): 0.6-0.7
- Execution-heavy (heavy shuffles/joins with minimal caching): 0.2-0.3
- Balanced (default): 0.5
How Much Data Should You Cache? The Recommended Limits
We have seen production environments where teams cache terabytes of data — entire fact tables, full data lakes, everything they can fit. This is almost always counterproductive.
Why Caching TBs Is Problematic
GC storms. Deserialized caches (the default MEMORY_AND_DISK) store data as Java objects on the JVM heap. Each cached row creates multiple objects — String wrappers, Integer boxes, array backing stores. Caching 500 GB across a cluster means billions of Java objects. The garbage collector must traverse all of them during every GC cycle, causing stop-the-world pauses of seconds to minutes. Research shows GC accounts for 16-47% of total elapsed time in cache-heavy Spark workloads.
Eviction thrashing. When cached data exceeds available storage memory, LRU eviction kicks in. If you cache 2 TB but only have 500 GB of storage memory, Spark constantly evicts old blocks to make room for new ones. Each eviction either spills to disk (slow) or drops the block entirely (requires recomputation). You end up spending more time evicting and recomputing than you save by caching.
Execution memory starvation. Under the unified memory model, execution can evict cached data. If your shuffles and joins need memory, they will evict cached blocks — silently. You think your data is cached, but half of it has been evicted by a concurrent shuffle. The next query recomputes from source, and you have gained nothing.
Network and disk overhead. Even with MEMORY_AND_DISK, spilled partitions go to local executor disks — SSDs or spinning disks depending on your cluster. Reading from local disk is faster than S3 but still orders of magnitude slower than memory. If most of your cache is on disk, you have not gained much over reading from a well-configured Parquet table.
Recommended Cache Size Guidelines
| Cluster Storage Memory | Max Recommended Cache | Reasoning |
|---|---|---|
| 50 GB total | 25-35 GB | Leave 30-50% for execution and headroom |
| 200 GB total | 100-140 GB | Same ratio; monitor eviction in Storage tab |
| 1 TB total | 400-600 GB | At this scale, GC pressure becomes significant |
| 5 TB+ total | Do not cache everything | Cache only hot/reused data; read cold data from source |
What to Cache vs What to Leave on Disk
Cache these (small, reused frequently):
- Dimension tables (customers, products, regions) — typically 1-500 MB
- Filtered subsets used in multiple downstream operations
- Expensive intermediate results (multi-table joins, complex aggregations) reused 3+ times
- Lookup/reference tables used in enrichment
Do not cache these (large, used once or rarely):
- Fact tables (billions of rows, hundreds of GB) — read from Parquet/Iceberg instead
- Data used in a single downstream operation (caching overhead > benefit)
- Streaming source data that changes every micro-batch
- Full table scans where partition pruning on source is more effective
The Rule of Thumb
Cache the small, expensive, reused data. Leave the large, cheap-to-read, used-once data on object storage. A well-tuned Spark application typically caches 5-20% of its total data volume, not 100%.
If you find yourself caching more than 50% of your cluster's storage memory, you are almost certainly over-caching. Check the Spark UI Storage tab (see next section) for eviction patterns and fraction-cached percentages.
Reading the Spark UI Storage Tab
The Storage tab is your primary tool for understanding cache health. After caching data, navigate to http://<driver>:4040/storage (or the Spark History Server for completed applications).
What You See
The Storage tab lists every cached RDD and DataFrame with:
| Column | What It Tells You |
|---|---|
| RDD Name | The name or description of the cached data (e.g., In-memory table dim_customer) |
| Storage Level | The storage level in use (e.g., Disk Memory Deserialized 1x Replicated) |
| Cached Partitions | How many partitions are currently cached out of the total |
| Fraction Cached | Percentage of partitions successfully cached (100% = fully cached) |
| Size in Memory | Total bytes consumed across all executors' memory |
| Size on Disk | Total bytes spilled to disk (for MEMORY_AND_DISK levels) |
What to Look For
Fraction Cached < 100%. This means some partitions were evicted or never materialized. If you see 60% cached, 40% of your reads go back to the source — and you are paying the overhead of caching without getting the full benefit. Either increase executor memory, reduce what you are caching, or accept the partial cache.
Size on Disk > 0 with MEMORY_AND_DISK. Memory was insufficient, and partitions spilled to disk. Disk reads are 10-100x slower than memory reads. If a large fraction of your cache is on disk, consider:
- Increasing spark.memory.storageFraction
- Increasing executor memory
- Reducing what you cache
- Using MEMORY_AND_DISK_SER for a smaller in-memory footprint
Large Size in Memory relative to cluster total. If your cached data consumes 80%+ of total storage memory, execution operations (shuffles, joins) will evict cached blocks. Look for performance degradation in shuffle-heavy stages as evidence.
Multiple cached items with low partition counts. If you see 20 cached DataFrames each with a few partitions, you may be over-caching intermediate results. Consolidate or uncache data that is no longer needed.
Clicking Into a Cached Item
Click on any cached RDD/DataFrame name to see the per-executor breakdown:
- Which executors hold which partitions
- Memory and disk usage per executor
- Distribution of cached data across the cluster
This helps diagnose skewed caching — if one executor holds 80% of the cached data while others hold 20%, your cache is unevenly distributed, likely because the underlying data is skewed.
Programmatic Access to Cache Info
# Check if a specific DataFrame is cached
print(df.is_cached) # True / False
print(df.storageLevel) # StorageLevel(True, True, False, True, 1)
# Check via catalog
print(spark.catalog.isCached("dim_customer")) # True / False
# Detailed storage info via SparkContext (JVM bridge)
for rdd_info in spark.sparkContext._jsc.sc().getRDDStorageInfo():
print(f"Name: {rdd_info.name()}")
print(f"Cached Partitions: {rdd_info.numCachedPartitions()}")
print(f"Memory Used: {rdd_info.memSize() / 1048576:.1f} MB")
print(f"Disk Used: {rdd_info.diskSize() / 1048576:.1f} MB")
Spark History Server
For completed applications, the Spark History Server (http://<history-server>:18080) preserves the Storage tab. This lets you analyze cache behavior post-hoc — critical for debugging production jobs that ran overnight.
Look at the event timeline alongside the Storage tab: if cached data appears and then disappears mid-job, it was evicted by execution memory pressure. Correlate with shuffle stages to identify the culprit.
Real-World Optimization Patterns
Pattern 1: Iterative ML Training — 20x Speedup
Caching delivers the most dramatic improvements in iterative algorithms. Without caching, each iteration re-reads from S3:
# Without caching: each iteration reads 50 GB from S3
training_data = spark.read.parquet("s3://bucket/training_data")
for i in range(100):
model = train_iteration(training_data, model)
# 100 iterations × 50 GB = 5 TB of redundant S3 reads
# With caching: data loaded once, reused 100 times
training_data = spark.read.parquet("s3://bucket/training_data").cache()
training_data.count() # Materialize once
for i in range(100):
model = train_iteration(training_data, model) # Reads from memory
# 100 iterations × memory reads = seconds, not minutes
Benchmarks show 20x speedup for logistic regression and up to 100x for iterative graph algorithms.
Pattern 2: Multi-Query Intermediate Results
Compute an expensive transformation once, use it for multiple reports:
# Expensive: 3-way join + aggregation
enriched_orders = (
orders
.join(customers, "customer_id")
.join(products, "product_id")
.withColumn("total", col("quantity") * col("price"))
.cache()
)
enriched_orders.count() # Materialize
# Three reports from the same cached result — no recomputation
daily_summary = enriched_orders.groupBy("order_date").agg(sum("total"))
top_products = enriched_orders.groupBy("product_id").agg(sum("total")).orderBy(desc("total"))
customer_segments = enriched_orders.groupBy("customer_tier").agg(avg("total"), count("*"))
Without caching, each report re-executes the 3-way join from scratch.
Pattern 3: Dimension Table Caching for Star Schema
Cache dimension tables once at the start of the session:
# Cache all dimension tables at application startup
dim_customer = spark.table("dim_customer").cache()
dim_product = spark.table("dim_product").cache()
dim_store = spark.table("dim_store").cache()
dim_date = spark.table("dim_date").cache()
# Force materialization
for dim in [dim_customer, dim_product, dim_store, dim_date]:
dim.count()
# All subsequent joins benefit from cached, possibly broadcast, dimension tables
q1_report = fact_sales.filter(col("quarter") == 1).join(dim_customer, "customer_id")
q2_report = fact_sales.filter(col("quarter") == 2).join(dim_customer, "customer_id")
product_mix = fact_sales.join(dim_product, "product_id").join(dim_store, "store_id")
-- SQL equivalent: eager cache at session start
CACHE TABLE dim_customer;
CACHE TABLE dim_product;
CACHE TABLE dim_store;
CACHE TABLE dim_date;
Pattern 4: Cache Filtered Hot Data
For interactive analysis, cache only the subset you are exploring:
# Cache only last 7 days — manageable size
hot_data = (
spark.table("events")
.filter(col("event_date") >= date_sub(current_date(), 7))
.cache()
)
hot_data.count()
# Fast interactive exploration from cache
hot_data.describe("event_value").show()
hot_data.groupBy("event_type").count().show()
hot_data.filter(col("event_value") > 1000).show(20)
# Historical queries still go to S3 — no wasted memory
historical = spark.table("events").filter(col("event_date") < "2026-01-01")
Pattern 5: Cache + Repartition for Shuffle-Free Downstream Joins
Repartitioning before caching aligns partitions with downstream join keys:
# Repartition by join key, then cache
customer_orders = (
orders
.repartition(200, "customer_id")
.cache()
)
customer_orders.count()
# Downstream joins on customer_id are partition-aligned — reduced shuffle
result = customer_orders.join(dim_customer, "customer_id")
Warning: Repartitioning adds an initial shuffle cost. Only do this when the cached data will be used for multiple downstream operations on the same key.
Pattern 6: Coalesce After Filter Before Caching
When a filter dramatically reduces data volume, consolidate near-empty partitions:
# Original: 1000 partitions, but filter keeps only 5% of data
# Without coalesce: 950 near-empty partitions wasting memory on overhead
filtered = big_table.filter(col("rare_event") == True)
# Better: consolidate into fewer partitions before caching
filtered = (
big_table
.filter(col("rare_event") == True)
.coalesce(50)
.cache()
)
filtered.count()
Downsides and Pitfalls
Memory Pressure
Cached data competes directly with execution memory. If you cache too much:
- Shuffles and joins will evict cached blocks to make room
- You pay the cost of caching (compute + store) AND the cost of recomputation when blocks are evicted
- Shuffle operations may spill to disk because cache is consuming the memory they need
Over-Caching Anti-Pattern
# ANTI-PATTERN: caching every intermediate result
df1 = spark.read.parquet("...").cache()
df2 = df1.filter(...).cache()
df3 = df2.groupBy(...).agg(...).cache()
df4 = df3.join(other).cache()
# All four DataFrames compete for memory — most will be evicted
Cache only DataFrames that are reused multiple times. If a DataFrame is used once, caching adds overhead with zero benefit.
Stale Cache
Cached data is a snapshot at the time of materialization. It does not reflect changes to the underlying source:
df = spark.table("my_table").cache()
df.count() # Returns 1,000,000 rows
# External process inserts 500K rows into my_table
df.count() # Still returns 1,000,000 — reading from stale cache!
# Must explicitly invalidate
df.unpersist()
df = spark.table("my_table").cache()
df.count() # Now returns 1,500,000
For Spark SQL tables: REFRESH TABLE my_table invalidates the cache and reloads metadata.
GC Pressure
Large deserialized caches create millions of Java objects on the heap, causing:
- Extended GC pauses (seconds to minutes on large heaps)
- Stop-the-world events that can trigger executor heartbeat timeouts
- Task failures when GC time exceeds thresholds
Mitigation: Use MEMORY_ONLY_SER with Kryo serialization or OFF_HEAP to reduce object count.
Cache Lost on Restart
- Cached data is lost when the Spark application terminates
- Cached data is lost when individual executors die (unless using _2 replicated levels)
- Dynamic allocation can kill idle executors, destroying their cached data. Configure spark.dynamicAllocation.cachedExecutorIdleTimeout (default: infinity) to protect executors with cached partitions
Partial Caching
Actions that touch only some partitions result in partial caching:
df = spark.read.parquet("...").cache() # 1000 partitions
df.take(10) # Only materializes 1-2 partitions
# Storage tab shows: Cached Partitions: 2 / 1000
Always run .count() or another full-scan action to materialize the complete cache.
Predicate Pushdown to Source Is Lost
Once a DataFrame is cached, the Catalyst optimizer operates on the InMemoryRelation instead of the original source. Predicates are applied to the cached data — they are not pushed down to the storage layer (Parquet, Iceberg, S3):
# Uncached: Spark pushes filter to Parquet reader — reads only matching row groups
df = spark.read.parquet("...").filter(col("date") == "2026-01-01")
# Cached: filter applied after reading from cache — all cached data is scanned
# (batch-level min/max pruning still works, but not file-level Parquet pushdown)
df = spark.read.parquet("...").cache()
df.count()
result = df.filter(col("date") == "2026-01-01")
For large tables where partition pruning eliminates 99% of files, caching the full table and then filtering is worse than reading directly from source with pushdown.
Broadcast Hints Can Be Lost After Caching
from pyspark.sql.functions import broadcast
# The broadcast hint may be lost after caching
dim = broadcast(spark.table("dim_table")).cache()
dim.count()
# The cached InMemoryRelation does not carry the broadcast hint
# Spark re-evaluates based on cached size vs threshold
If you need guaranteed broadcast, either rely on the threshold-based automatic decision or apply the hint at join time, not at cache time.
Cache vs Checkpoint vs Temporary View
These three concepts are frequently confused. They serve different purposes.
Comparison
| Feature | cache() / persist() | checkpoint() | createOrReplaceTempView() |
|---|---|---|---|
| Stores data | Memory / local disk | HDFS / reliable storage | Does NOT store data |
| Preserves lineage | Yes | No (truncates lineage) | Yes (IS the lineage) |
| Survives executor failure | Recompute from lineage | Yes (data on HDFS) | Recomputes |
| Survives driver restart | No | Yes | No |
| Performance | Fastest (memory) | Slower (write to HDFS + read back) | No caching — recomputes every time |
| Primary use case | Repeated access in same app | Break long lineage chains | SQL alias for a query |
createOrReplaceTempView() Is NOT Caching
This is the most common misconception:
df.filter(col("status") == "active").createOrReplaceTempView("active_users")
# This does NOT cache or materialize anything!
# Every query re-executes the full plan from source
spark.sql("SELECT count(*) FROM active_users") # Full computation
spark.sql("SELECT * FROM active_users WHERE age > 30") # Full computation again
To actually cache a temp view:
CREATE OR REPLACE TEMP VIEW active_users AS
SELECT * FROM users WHERE status = 'active';
CACHE TABLE active_users; -- NOW it is cached in memory
When to Use Checkpoint
checkpoint() writes data to HDFS and truncates the lineage — the checkpointed DataFrame has no dependency on its parents:
# Set checkpoint directory
spark.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints")
# Always cache before checkpoint to avoid double computation
df = expensive_computation().cache()
df.count() # Materialize in cache
checkpointed = df.checkpoint() # Writes from cache to HDFS
df.unpersist() # Release cache; checkpoint persists on HDFS
Use checkpoint for:
- Long lineage chains (100+ transformations) that cause slow planning or stack overflows
- Iterative algorithms — checkpoint every N iterations to keep lineage manageable
- Fault tolerance — checkpointed data survives executor and driver failures
localCheckpoint() is faster (writes to local disk instead of HDFS) but not fault-tolerant.
Decision Guide
| Scenario | Use |
|---|---|
| DataFrame reused 2-3 times in same job | cache() |
| Very long lineage chain causing slow planning | checkpoint() or localCheckpoint() |
| Iterative algorithm (100+ iterations) | cache() + checkpoint() every N iterations |
| Results needed by another Spark application | Write to Parquet/Iceberg table |
| SQL-friendly naming without materialization | createOrReplaceTempView() |
| SQL-friendly naming WITH materialization | createOrReplaceTempView() + CACHE TABLE |
Caching with Iceberg Tables
Iceberg tables interact with Spark caching in specific ways that require attention.
Iceberg Catalog Caching vs DataFrame Caching
Iceberg has its own caching layer — CachingCatalog — that is separate from Spark's DataFrame caching. CachingCatalog caches table metadata (metadata.json, manifest lists, manifest files) to avoid re-reading them from object storage on every query. This is enabled by default:
spark.sql.catalog.my_catalog.cache-enabled=true
spark.sql.catalog.my_catalog.cache.expiration-interval-ms=30000 # 30 seconds
This metadata caching is transparent and generally beneficial. DataFrame caching (df.cache()) caches the actual data — the rows — and is what this blog post has been discussing.
Stale Cache with Iceberg Snapshots
When an Iceberg table receives new writes (creating new snapshots), a cached DataFrame still holds data from the old snapshot:
df = spark.table("catalog.db.my_table").cache()
df.count() # Returns 10M rows from snapshot 123
# Another process runs: INSERT INTO catalog.db.my_table VALUES (...)
# New snapshot 124 is created
df.count() # Still returns 10M — reading from stale cache (snapshot 123)
# Must invalidate and re-cache
df.unpersist()
df = spark.table("catalog.db.my_table").cache()
df.count() # Now returns 10M + new rows from snapshot 124
In SQL:
-- Refresh to pick up new snapshots
REFRESH TABLE catalog.db.my_table;
-- Re-cache
CACHE TABLE catalog.db.my_table;
Caching Iceberg Metadata Tables
Iceberg's virtual metadata tables (files, manifests, snapshots, partitions) can be cached for repeated analysis:
# Cache metadata tables for a health-check session
files = spark.table("catalog.db.my_table.files").cache()
manifests = spark.table("catalog.db.my_table.manifests").cache()
# Run multiple diagnostic queries from cache
files.filter(col("file_size_in_bytes") < 67108864).count() # Small file count
files.groupBy("partition").agg(avg("file_size_in_bytes")).show()
These become stale if the table receives new writes during your session. For one-time analysis this is fine; for long-running monitoring, do not cache.
Disable Caching During Maintenance
Some Iceberg maintenance procedures have built-in caching controls:
-- rewrite_manifests with caching disabled to avoid executor memory issues
CALL catalog.system.rewrite_manifests(
table => 'db.my_table',
use_caching => false
);
The use_caching parameter (default true) controls whether Spark caches intermediate results during the procedure. For large tables, set it to false to avoid memory pressure.
Time Travel + Caching
Caching a time-travel query pins data to a specific snapshot:
# Cache two snapshots for comparison
current = spark.table("catalog.db.my_table").cache()
historical = (
spark.read
.option("snapshot-id", 123456789)
.table("catalog.db.my_table")
.cache()
)
current.count()
historical.count()
# Compare
new_records = current.subtract(historical)
deleted_records = historical.subtract(current)
# Clean up — each snapshot consumes independent memory
current.unpersist()
historical.unpersist()
Complete Configuration Reference
Caching and Memory Configurations
| Configuration | Default | Description |
|---|---|---|
| spark.memory.fraction | 0.6 | Fraction of (heap - 300 MB) for unified memory (storage + execution) |
| spark.memory.storageFraction | 0.5 | Initial/minimum fraction of unified memory for storage (cache floor) |
| spark.memory.offHeap.enabled | false | Enable off-heap memory for caching |
| spark.memory.offHeap.size | 0 | Off-heap memory size per executor |
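These two fractions translate into a simple layout. The sketch below approximates the sizing for a given heap (real numbers vary slightly with JVM overheads; the ~300 MB reserved amount is Spark's hardcoded reservation):

```python
def unified_memory_breakdown(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Approximate Spark unified-memory sizing from the configs above.

    Spark reserves ~300 MB of the heap, splits the remainder by
    spark.memory.fraction, and spark.memory.storageFraction sets the
    eviction-protected floor for cached blocks within that pool.
    """
    reserved = 300
    usable = heap_mb - reserved
    unified = usable * memory_fraction          # shared storage + execution pool
    storage_floor = unified * storage_fraction  # cache portion protected from eviction
    return round(unified), round(storage_floor)

# 8 GiB executor heap with the defaults from the table
unified, storage = unified_memory_breakdown(8192)
# unified ≈ 4735 MB shared pool, storage floor ≈ 2368 MB
```

So with defaults, a little under 30% of the raw heap is guaranteed to cached blocks; the rest of the unified pool is borrowed back and forth between storage and execution.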
In-Memory Columnar Storage
| Configuration | Default | Description |
|---|---|---|
| spark.sql.inMemoryColumnarStorage.compressed | true | Enable automatic per-column compression for cached DataFrames |
| spark.sql.inMemoryColumnarStorage.batchSize | 10000 | Rows per CachedBatch. Larger = better compression, higher peak memory |
| spark.sql.inMemoryColumnarStorage.partitionPruning | true | Enable batch-level predicate pruning on cached data |
Serialization
| Configuration | Default | Description |
|---|---|---|
| spark.serializer | JavaSerializer | Serializer for _SER levels. Set to KryoSerializer for better performance |
| spark.kryoserializer.buffer.max | 64m | Maximum Kryo buffer size |
Dynamic Allocation
| Configuration | Default | Description |
|---|---|---|
| spark.dynamicAllocation.cachedExecutorIdleTimeout | infinity | Idle timeout for executors holding cached data. Set to prevent premature executor removal |
Broadcast (Related to Caching)
| Configuration | Default | Description |
|---|---|---|
| spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | Max cached/estimated table size for auto-broadcast |
Iceberg Catalog Caching
| Configuration | Default | Description |
|---|---|---|
| spark.sql.catalog.&lt;name&gt;.cache-enabled | true | Enable/disable Iceberg catalog metadata caching |
| spark.sql.catalog.&lt;name&gt;.cache.expiration-interval-ms | 30000 | Cache TTL in ms; -1 = no expiry, 0 = no caching |
How Cazpian Handles This
On Cazpian, Spark compute pools are pre-configured with tuned memory fractions and storage allocations based on the cluster size and workload profile. When you cache dimension tables through Cazpian's query interface, the platform monitors the Storage tab metrics automatically — tracking fraction cached, memory vs disk usage, and eviction rates. If eviction pressure is detected, Cazpian surfaces recommendations to adjust memory allocation or reduce cache scope. For Iceberg tables, Cazpian's catalog integration handles cache invalidation when new snapshots are committed, so cached data stays fresh without manual REFRESH TABLE calls.
What's Next
This post covered everything about Spark caching — from internal mechanics to production tuning. For related optimizations in the Spark and Iceberg ecosystem, see our other posts:
- Spark Broadcast Joins — how caching enables automatic broadcast, plus the complete broadcast join guide.
- Storage Partitioned Joins — eliminate shuffles for large-to-large joins using Iceberg bucket partitioning.
- Iceberg Query Performance Tuning — partition pruning, bloom filters, and Spark read configs.
- Iceberg Metrics Reporting — monitoring scan and commit health for table diagnostics.
- Iceberg Table Design — choosing partition transforms, bucket counts, and write properties.