Spark JDBC Data Source: The Complete Optimization Guide for Reads, Writes, and Pushdown

· 43 min read
Cazpian Engineering
Platform Engineering Team


You have a 500 million row table in PostgreSQL. You write spark.read.jdbc(url, "orders", properties) and hit run. Thirty minutes later, the job is still running. One executor is at 100% CPU. The other 49 are idle. Your database server is pegged at a single core, streaming rows through a single JDBC connection while your 50-node Spark cluster sits there doing nothing.

This is the default behavior of Spark JDBC reads. No partitioning. No parallelism. One thread, one connection, one query: SELECT * FROM orders. Every row flows through a single pipe. It is the number one performance mistake data engineers make with Spark JDBC, and it is the default.

This post covers everything you need to know to fix it and to optimize every aspect of Spark JDBC reads and writes. We start with why the default is so slow, then go deep on parallel reads, all pushdown optimizations, fetchSize and batchSize tuning, database-specific configurations, write optimizations, advanced patterns, monitoring and debugging, anti-patterns, and a complete configuration reference.

The Single-Thread Default Problem

When you call spark.read.jdbc() or spark.read.format("jdbc").load() without specifying partitioning options, Spark creates exactly one partition. One partition means one task. One task means one executor thread. One executor thread means one JDBC connection issuing one SQL query.

[Diagram: Spark JDBC optimization -- single-thread vs parallel reads, partitionColumn range splitting, predicate and aggregate pushdown flow, fetchSize tuning, and database-specific configurations]

# THE #1 MISTAKE: reading without partition options
df = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://db-host:5432/production") \
.option("dbtable", "orders") \
.option("user", "spark_reader") \
.option("password", "secret") \
.load()

# What Spark actually does:
# 1. Opens ONE JDBC connection
# 2. Executes: SELECT * FROM orders
# 3. Fetches ALL rows through that ONE connection
# 4. Loads everything into ONE partition on ONE executor
# 5. The other 49 executors sit completely idle

What Happens Internally

Spark sends a single SELECT "col1", "col2", ... FROM orders query to the database. The database executes it, and the JDBC driver begins streaming rows back. The Spark executor fetches rows in batches controlled by fetchSize (default 0, which means the JDBC driver picks its own default -- often 10 for Oracle, all rows for PostgreSQL). All rows land in a single Spark partition.

Your 50-Node Cluster During a Single-Thread JDBC Read:

Executor 1: [==========================================] 100% CPU (reading all 500M rows)
Executor 2: [ ] 0% (idle)
Executor 3: [ ] 0% (idle)
...
Executor 50: [ ] 0% (idle)

Database: [=====] Single connection, single core, sequential scan

Time: 45 minutes (same as a single laptop)
Cost: 50x what you should be paying

Why This Matters

The entire point of using Spark is distributed parallel processing. A single-thread JDBC read negates that completely. You get zero benefit from your cluster. The read time is identical to what you would get running a Python script with psycopg2 on a single laptop. But you are paying for 50 nodes.

Worse, the single partition creates a downstream bottleneck. Every subsequent operation -- filter, join, aggregation -- starts from a single partition. Spark must repartition the data before it can parallelize anything, adding a full shuffle on top of the already slow read.

The Fix Preview

The fix is to tell Spark how to split the read into multiple parallel queries. There are two approaches:

  1. Numeric/date/timestamp column partitioning -- partitionColumn, lowerBound, upperBound, numPartitions
  2. Custom predicates -- manually defined WHERE clauses for each partition

Both approaches cause Spark to issue multiple concurrent JDBC connections, each executing a different query that reads a subset of the data. Instead of one query reading 500 million rows, you get 50 queries each reading ~10 million rows in parallel.

Parallel Reads with partitionColumn

The primary mechanism for parallel JDBC reads uses four options that must be specified together: partitionColumn, lowerBound, upperBound, and numPartitions.

df = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://db-host:5432/production") \
.option("dbtable", "orders") \
.option("user", "spark_reader") \
.option("password", "secret") \
.option("partitionColumn", "order_id") \
.option("lowerBound", "1") \
.option("upperBound", "10000000") \
.option("numPartitions", "50") \
.option("fetchSize", "10000") \
.load()

How Spark Generates WHERE Clauses

Spark computes a stride value and generates a WHERE clause for each partition. The math is:

stride = (upperBound - lowerBound) / numPartitions

For the example above: stride = (10,000,000 - 1) / 50 = 199,999.98, which Spark truncates to 199,999 and then adjusts for the lost fraction (see "How Spark Actually Computes the Stride" below), producing boundaries at roughly 200,000-row intervals.

Spark then generates these partition queries:

Partition   WHERE Clause                                       Rows Covered
0           WHERE order_id IS NULL OR order_id < 200000        NULLs + IDs 1 to 199,999
1           WHERE order_id >= 200000 AND order_id < 400000     IDs 200,000 to 399,999
2           WHERE order_id >= 400000 AND order_id < 600000     IDs 400,000 to 599,999
...         ...                                                ...
49          WHERE order_id >= 9800001                          IDs 9,800,001 and above
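The clause generation can be approximated in plain Python. This is a simplified sketch using an integer stride; real Spark uses the decimal arithmetic and remainder redistribution described in "How Spark Actually Computes the Stride" below:

```python
def jdbc_partition_clauses(column, lower, upper, num_partitions):
    """Simplified sketch of Spark's JDBC partition WHERE clauses.

    First partition absorbs NULLs and everything below the first boundary;
    last partition is unbounded on the upper end.
    """
    stride = (upper - lower) // num_partitions
    clauses = []
    bound = lower + stride
    for i in range(num_partitions):
        if i == 0:
            clauses.append(f"{column} < {bound} OR {column} IS NULL")
        elif i == num_partitions - 1:
            clauses.append(f"{column} >= {bound - stride}")
        else:
            clauses.append(f"{column} >= {bound - stride} AND {column} < {bound}")
        bound += stride
    return clauses
```

For example, `jdbc_partition_clauses("id", 0, 400, 4)` yields four clauses with boundaries at 100, 200, and 300.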

Critical details about this behavior:

The first partition includes NULLs. Partition 0 always gets the predicate column IS NULL OR column < firstUpperBound. This means all rows with NULL values in the partition column go to partition 0. If your column has many NULLs, partition 0 will be significantly larger than the others.

The last partition is unbounded on the upper end. Partition N-1 gets column >= lastLowerBound with no upper bound. This means all rows with values above upperBound still get read -- they all land in the last partition.

lowerBound and upperBound do NOT filter data. This is the most commonly misunderstood aspect. These values only control how Spark divides the range into partitions. They do not add a WHERE clause that excludes rows outside the range. Every row in the table is read regardless of the bounds.

# COMMON MISCONCEPTION: "This only reads orders with IDs 1 to 10,000,000"
# WRONG. This reads ALL orders. The bounds only control partition boundaries.
df = spark.read \
.format("jdbc") \
.option("dbtable", "orders") \
.option("partitionColumn", "order_id") \
.option("lowerBound", "1") \
.option("upperBound", "10000000") \
.option("numPartitions", "50") \
.load()

# If order_id goes up to 50,000,000:
# Partitions 0-48: each reads ~200K rows (evenly distributed)
# Partition 49: reads 40,200,000 rows (everything above 9,800,001)
# MASSIVE SKEW in partition 49

How Spark Actually Computes the Stride

The Spark source code in JDBCRelation.scala uses precise decimal arithmetic to avoid overflow:

upperStride = upperBound / numPartitions  (with 18 decimal places)
lowerStride = lowerBound / numPartitions (with 18 decimal places)
preciseStride = upperStride - lowerStride
stride = preciseStride.toLong (truncated to long)

Spark then adjusts for precision loss by computing lostNumOfStrides and shifting the lower bound to redistribute the lost fractional strides evenly across partitions. This prevents the systematic accumulation of rounding errors that would cause the last partition to be significantly larger.
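A rough Python analogue of the truncation step (using the `decimal` module in place of Scala's BigDecimal; the redistribution of lost strides is omitted):

```python
from decimal import Decimal

def truncated_stride(lower, upper, num_partitions):
    # Divide each bound separately with exact decimal arithmetic,
    # subtract, then truncate toward zero (like BigDecimal.toLong)
    upper_stride = Decimal(upper) / Decimal(num_partitions)
    lower_stride = Decimal(lower) / Decimal(num_partitions)
    return int(upper_stride - lower_stride)
```

For the running example, `truncated_stride(1, 10_000_000, 50)` gives 199,999, and the 0.98 lost per stride is what Spark redistributes across partitions.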

Column Requirements

The partitionColumn must be one of these types:

  • Numeric types: IntegerType, LongType, DecimalType, FloatType, DoubleType, ShortType, ByteType
  • Date type: DateType -- bounds specified as yyyy-MM-dd strings
  • Timestamp type: TimestampType -- bounds specified as yyyy-MM-dd HH:mm:ss strings
# Partitioning on a date column
df = spark.read \
.format("jdbc") \
.option("dbtable", "events") \
.option("partitionColumn", "event_date") \
.option("lowerBound", "2024-01-01") \
.option("upperBound", "2026-01-01") \
.option("numPartitions", "24") \
.load()
# Each partition covers approximately one month

# Partitioning on a timestamp column
df = spark.read \
.format("jdbc") \
.option("dbtable", "logs") \
.option("partitionColumn", "created_at") \
.option("lowerBound", "2025-01-01 00:00:00") \
.option("upperBound", "2026-01-01 00:00:00") \
.option("numPartitions", "365") \
.load()
# Each partition covers approximately one day

Choosing Optimal Values

partitionColumn selection:

  1. Use an indexed column. Each partition generates a range query (WHERE col >= X AND col < Y). Without an index, every partition triggers a full table scan. With 50 partitions, that is 50 full table scans instead of 50 index range scans.

  2. Use a uniformly distributed column. If the column has gaps or clustering, some partitions will have far more rows than others. Auto-increment primary keys are ideal. Timestamps work well if data arrives at a steady rate.

  3. Prefer integer primary keys. They have the best index performance, uniform distribution (for auto-increment), and simple stride math.

lowerBound and upperBound selection:

-- Query your database to find optimal bounds
SELECT MIN(order_id), MAX(order_id), COUNT(*) FROM orders;
-- Result: MIN=1, MAX=9,847,293, COUNT=9,500,000

-- Use these as your bounds
-- lowerBound = 1
-- upperBound = 9847293

Set lowerBound to the actual minimum value and upperBound to the actual maximum value. If you set upperBound too low, the last partition absorbs all excess rows. If you set lowerBound too high, the first partition absorbs all rows below the bound plus all NULLs.

numPartitions selection:

The right value depends on cluster size, database capacity, and table size:

# Rule of thumb:
# numPartitions = min(2x-4x total executor cores,
#                     database_max_connections / 2,
#                     table_rows / 1_000_000)

# For a 50-executor cluster with 4 cores each (200 cores),
# database max connections = 200, reading a 500M-row table:
# numPartitions = min(400-800, 100, 500) = 100

# But never exceed what your database can handle.
# If the database allows 100 concurrent connections and other apps use 50,
# numPartitions should be <= 50.
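The rule of thumb can be captured as a small helper. This is one possible heuristic, not an official Spark formula; the 2x-core multiplier and 1M-rows-per-partition target are assumptions you should tune:

```python
def suggest_num_partitions(total_executor_cores, db_max_connections,
                           table_rows, reserved_connections=0,
                           rows_per_partition=1_000_000):
    # Take the smallest of the compute, connection, and data-size constraints
    core_cap = total_executor_cores * 2                       # keep every core busy
    connection_cap = max(1, db_max_connections - reserved_connections)
    size_cap = max(1, table_rows // rows_per_partition)       # ~1M rows per task
    return max(1, min(core_cap, connection_cap, size_cap))
```

With 200 cores, 200 connections, and 500M rows this suggests 200 partitions; reserve 50 connections for other apps and the connection cap wins.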

Handling Data Skew in Partition Columns

Even with correct bounds, data distribution might be non-uniform. An order_id that auto-increments is ideal. But a customer_id where 80% of orders belong to 1% of customers will create massive skew.

# BAD: customer_id is skewed — a few customers have millions of orders
df = spark.read \
.format("jdbc") \
.option("partitionColumn", "customer_id") \
.option("lowerBound", "1") \
.option("upperBound", "1000000") \
.option("numPartitions", "100") \
.load()
# Partition containing customer_id range 1-10000 might have 50% of all rows
# if the biggest customers have the lowest IDs

# GOOD: order_id is auto-increment, uniformly distributed
df = spark.read \
.format("jdbc") \
.option("partitionColumn", "order_id") \
.option("lowerBound", "1") \
.option("upperBound", "50000000") \
.option("numPartitions", "100") \
.load()
# Each partition gets ~500,000 rows regardless of customer distribution

Handling Gaps in the Partition Column

If your partition column has large gaps (deleted rows, non-sequential IDs), some partitions will be empty while others are overloaded:

ID Range:   1 -------- 1,000,000 (gap: IDs 100,001 to 900,000 deleted)
Actual data: IDs 1-100,000 and IDs 900,001-1,000,000

With numPartitions=10, stride=100,000:
Partition 0: WHERE id < 100001 → 100,000 rows
Partition 1: WHERE id >= 100001 AND id < 200001 → 0 rows (gap)
Partition 2: WHERE id >= 200001 AND id < 300001 → 0 rows (gap)
...
Partition 8: WHERE id >= 800001 AND id < 900001 → 0 rows (gap)
Partition 9: WHERE id >= 900001 → 100,000 rows

Result: 8 empty partitions, 2 loaded partitions. Parallelism = 2, not 10.

When gaps exist, use custom predicates instead, or choose a different column.
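One way to build gap-aware custom predicates is to derive range boundaries from the actual data (for example, by sampling quantiles of the ID column with a database percentile query) and generate mutually exclusive, exhaustive clauses from them. A minimal sketch:

```python
def predicates_from_boundaries(column, boundaries):
    """Build non-overlapping, exhaustive range predicates from a sorted
    list of boundary values taken from the real data distribution."""
    preds = [f"{column} < {boundaries[0]} OR {column} IS NULL"]
    for lo, hi in zip(boundaries, boundaries[1:]):
        preds.append(f"{column} >= {lo} AND {column} < {hi}")
    preds.append(f"{column} >= {boundaries[-1]}")
    return preds
```

For the gapped table above, boundaries [100000, 900001] produce three populated partitions instead of ten mostly-empty ones.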

Custom Partition Predicates

When your partition column is not numeric/date/timestamp, or when the built-in partitioning produces skew due to data distribution, you can define custom predicates. Each predicate becomes the WHERE clause for one partition.

# Define one predicate per partition — each becomes a separate JDBC query
predicates = [
"region = 'US-EAST'",
"region = 'US-WEST'",
"region = 'EU-WEST'",
"region = 'EU-EAST'",
"region = 'APAC'",
"region NOT IN ('US-EAST','US-WEST','EU-WEST','EU-EAST','APAC') OR region IS NULL"
]

df = spark.read.jdbc(
url="jdbc:postgresql://db-host:5432/production",
table="orders",
predicates=predicates,
properties={"user": "spark_reader", "password": "secret", "fetchSize": "10000"}
)

# Spark executes 6 parallel queries:
# SELECT * FROM orders WHERE region = 'US-EAST'
# SELECT * FROM orders WHERE region = 'US-WEST'
# SELECT * FROM orders WHERE region = 'EU-WEST'
# SELECT * FROM orders WHERE region = 'EU-EAST'
# SELECT * FROM orders WHERE region = 'APAC'
# SELECT * FROM orders WHERE region NOT IN (...) OR region IS NULL

Critical Rule: Predicates Must Be Mutually Exclusive and Collectively Exhaustive

If your predicates overlap, rows will be duplicated. If they miss rows, data will be lost. There is no validation. Spark trusts you.

# BAD: overlapping predicates — rows with amount=100 appear in BOTH partitions
predicates = [
"amount <= 100",
"amount >= 100" # overlap at amount=100
]

# BAD: missing rows — rows with amount IS NULL are lost
predicates = [
"amount < 1000",
"amount >= 1000"
# NULL values are not covered by either predicate
]

# CORRECT: mutually exclusive and collectively exhaustive including NULLs
predicates = [
"amount < 1000 OR amount IS NULL",
"amount >= 1000 AND amount < 10000",
"amount >= 10000"
]

String Column Partitioning Using Hash

For string columns, use database hash functions to create numeric ranges:

# PostgreSQL: use hashtext() to partition on a string column
num_partitions = 20
predicates = [
f"MOD(ABS(hashtext(customer_name)), {num_partitions}) = {i}"
for i in range(num_partitions)
]
# Add a catch-all for NULLs to the first partition
predicates[0] = f"(MOD(ABS(hashtext(customer_name)), {num_partitions}) = 0) OR customer_name IS NULL"

df = spark.read.jdbc(url, "customers", predicates=predicates, properties=props)

# MySQL: use CRC32 instead
predicates_mysql = [
f"MOD(CRC32(customer_name), {num_partitions}) = {i}"
for i in range(num_partitions)
]

# Oracle: use ORA_HASH
predicates_oracle = [
f"ORA_HASH(customer_name, {num_partitions - 1}) = {i}"
for i in range(num_partitions)
]

Date Range Predicates

For complex date-based partitioning where built-in date partitioning is insufficient:

from datetime import date, timedelta

start_date = date(2024, 1, 1)
end_date = date(2026, 1, 1)
interval_days = 30

predicates = []
current = start_date
while current < end_date:
    next_date = min(current + timedelta(days=interval_days), end_date)
    if current == start_date:
        # First partition: include everything before start and NULLs
        predicates.append(
            f"(order_date < '{next_date}') OR order_date IS NULL"
        )
    elif next_date >= end_date:
        # Last partition: include everything from current onwards
        predicates.append(f"order_date >= '{current}'")
    else:
        predicates.append(
            f"order_date >= '{current}' AND order_date < '{next_date}'"
        )
    current = next_date

df = spark.read.jdbc(url, "orders", predicates=predicates, properties=props)

Complex WHERE Clauses as Predicates

Predicates can be any valid SQL WHERE clause, including joins, subqueries, and function calls:

# Partition by first letter of last_name
import string

predicates = []
for letter in string.ascii_uppercase:
    predicates.append(f"UPPER(SUBSTRING(last_name, 1, 1)) = '{letter}'")
# Catch-all for non-alpha characters and NULLs
predicates.append(
    "UPPER(SUBSTRING(last_name, 1, 1)) NOT IN ("
    + ",".join(f"'{c}'" for c in string.ascii_uppercase)
    + ") OR last_name IS NULL"
)

df = spark.read.jdbc(url, "customers", predicates=predicates, properties=props)
# Creates 27 partitions: one per letter + one catch-all

Pushdown Optimizations

Pushdown optimizations allow Spark to translate DataFrame operations into SQL and push them to the database for execution. Instead of reading all data and processing it in Spark, the database does the filtering, aggregation, or sorting, and Spark receives only the result. This dramatically reduces network I/O and leverages database indexes.

Predicate Pushdown (Filter Pushdown)

Predicate pushdown translates Spark DataFrame .filter() or .where() operations into SQL WHERE clauses that execute on the database.

# With predicate pushdown ENABLED (default):
df = spark.read.format("jdbc").option("url", url).option("dbtable", "orders").load()
result = df.filter("status = 'SHIPPED' AND total > 100.0")

# Spark sends to database:
# SELECT col1, col2, ... FROM orders WHERE status = 'SHIPPED' AND total > 100.0

# With predicate pushdown DISABLED:
df = spark.read.format("jdbc") \
.option("url", url) \
.option("dbtable", "orders") \
.option("pushDownPredicate", "false") \
.load()
result = df.filter("status = 'SHIPPED' AND total > 100.0")

# Spark sends to database:
# SELECT col1, col2, ... FROM orders
# Then filters ALL rows in Spark — reads entire table over the network

Controlled by: pushDownPredicate (default: true)

What gets pushed down:

  • Equality: =, !=, <>, IN
  • Comparison: <, >, <=, >=
  • Null checks: IS NULL, IS NOT NULL
  • String operations: LIKE, STARTS_WITH (depends on dialect)
  • Logical operators: AND, OR, NOT

What does NOT get pushed down:

  • User-defined functions (UDFs)
  • Complex expressions that cannot be translated to the target SQL dialect
  • Filters on computed columns that are not representable in the database
  • Filters involving type mismatches that require casting

Column Pruning (Projection Pushdown)

Column pruning is always active and cannot be disabled. When you select specific columns, Spark only requests those columns from the database.

# Spark sends: SELECT order_id, total FROM orders
# NOT: SELECT * FROM orders
df = spark.read.format("jdbc").option("dbtable", "orders").load()
result = df.select("order_id", "total")

# This is why you should ALWAYS select only needed columns:
# BAD: reads all 50 columns from database, transfers them over network
all_cols = spark.read.format("jdbc").option("dbtable", "orders").load()
result = all_cols.select("order_id")

# GOOD: same result but Spark sends SELECT order_id FROM orders
specific = spark.read.format("jdbc").option("dbtable", "orders").load().select("order_id")

Column pruning happens automatically through Spark's query optimizer. When Spark sees that downstream operations only need certain columns, it pushes the column selection into the JDBC scan.

Aggregate Pushdown

Since Spark 3.2, aggregate operations can be pushed down to JDBC sources. The database computes the aggregation and returns only the result, rather than sending all rows to Spark.

# With aggregate pushdown ENABLED (default since Spark 3.5):
from pyspark.sql.functions import avg, count, max, min, sum

df = spark.read.format("jdbc").option("dbtable", "orders").load()
result = df.groupBy("status").agg(
    count("*").alias("order_count"),
    sum("total").alias("total_revenue"),
    avg("total").alias("avg_order_value"),
    min("total").alias("min_order"),
    max("total").alias("max_order")
)

# Spark sends to database:
# SELECT status, COUNT(*), SUM(total), AVG(total), MIN(total), MAX(total)
# FROM orders GROUP BY status
# Database returns ~5 rows (one per status) instead of 500 million rows

# Without pushdown, Spark reads all 500M rows and aggregates in Spark

Controlled by: pushDownAggregate (default: true since Spark 3.5)

Supported aggregate functions:

  • COUNT(*), COUNT(col), COUNT(DISTINCT col)
  • SUM(col)
  • AVG(col)
  • MIN(col), MAX(col)
  • VAR_POP(col), VAR_SAMP(col)
  • STDDEV_POP(col), STDDEV_SAMP(col)
  • COVAR_POP(col1, col2), COVAR_SAMP(col1, col2)
  • CORR(col1, col2)
  • REGR_INTERCEPT, REGR_SLOPE, REGR_R2, REGR_SXY

How it handles numPartitions > 1:

When numPartitions is 1 or the GROUP BY key equals the partitionColumn, Spark can push the complete aggregate. When numPartitions > 1, Spark pushes a partial aggregate to each partition and then performs a final aggregate in Spark to combine results from all partitions. This is because each partition's query only sees a subset of the data and cannot produce the final aggregated result alone.

# numPartitions=1: complete pushdown
# Database returns final result directly
df = spark.read.format("jdbc") \
.option("dbtable", "orders") \
.option("numPartitions", "1") \
.load()
df.groupBy("status").count()
# Database executes: SELECT status, COUNT(*) FROM orders GROUP BY status

# numPartitions=10: partial pushdown + Spark final aggregate
# Each partition gets a partial aggregate, Spark combines them
df = spark.read.format("jdbc") \
.option("dbtable", "orders") \
.option("partitionColumn", "order_id") \
.option("lowerBound", "1") \
.option("upperBound", "10000000") \
.option("numPartitions", "10") \
.load()
df.groupBy("status").count()
# Each partition sends: SELECT status, COUNT(*) FROM orders WHERE order_id >= X AND order_id < Y GROUP BY status
# Spark sums the partial counts
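The final combine step Spark performs over partial counts amounts to summing per-group tallies from each partition. A toy illustration in plain Python:

```python
from collections import Counter

def merge_partial_counts(partial_results):
    """Sum per-partition GROUP BY counts, as Spark's final aggregate does
    when a partial COUNT was pushed to each partition query."""
    total = Counter()
    for partial in partial_results:
        total.update(partial)
    return dict(total)
```

Two partitions reporting {'SHIPPED': 3, 'NEW': 1} and {'SHIPPED': 2} merge into {'SHIPPED': 5, 'NEW': 1}. Note that AVG is handled by pushing SUM and COUNT separately, since averages of partial averages would be wrong.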

Limit Pushdown

Limit pushdown sends the LIMIT clause to the database, avoiding reading the entire table when you only need a few rows.

# With limit pushdown ENABLED (default since Spark 3.5):
df = spark.read.format("jdbc").option("dbtable", "orders").load()
result = df.limit(100)

# Spark sends: SELECT col1, col2, ... FROM orders LIMIT 100
# Database returns 100 rows instead of 500 million

# Limit + Sort (Top-N) pushdown:
result = df.orderBy("total", ascending=False).limit(10)
# Spark sends: SELECT ... FROM orders ORDER BY total DESC LIMIT 10
# Database uses its index on 'total' to return the top 10 efficiently

Controlled by: pushDownLimit (default: true since Spark 3.5)

Behavior with numPartitions > 1: When partitioned, Spark pushes the LIMIT to each partition query and then applies a final LIMIT in Spark. Each partition returns up to N rows, and Spark keeps only N total. This reads more than N rows from the database but far fewer than the entire table.
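The per-partition LIMIT plus final LIMIT can be sketched as:

```python
def combine_limited_partitions(partition_rows, n):
    # Each partition query already carries LIMIT n, so every list holds
    # at most n rows; Spark then applies one final LIMIT n overall
    rows = []
    for part in partition_rows:
        rows.extend(part[:n])
    return rows[:n]
```

Worst case, the database returns n rows per partition (n * numPartitions total) but Spark keeps only n, which is still far cheaper than a full scan.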

Offset Pushdown

Offset pushdown sends the OFFSET clause to the database.

# With offset pushdown ENABLED (default since Spark 3.5):
df = spark.read.format("jdbc").option("dbtable", "orders").load()
result = df.offset(1000).limit(100)

# Spark sends: SELECT ... FROM orders LIMIT 100 OFFSET 1000

Controlled by: pushDownOffset (default: true since Spark 3.5)

Important limitation: Offset pushdown only works when numPartitions = 1. With multiple partitions, applying OFFSET to each partition query would skip different rows in each partition, producing incorrect results. Spark applies the offset client-side instead.
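A toy illustration of why pushing OFFSET into each partition query would be wrong, versus the client-side skip Spark actually performs:

```python
def per_partition_offset(partition_rows, offset):
    # INCORRECT with multiple partitions: skips `offset` rows in EVERY partition
    skipped = []
    for part in partition_rows:
        skipped.extend(part[offset:])
    return skipped

def client_side_offset(partition_rows, offset):
    # What Spark does when numPartitions > 1: combine first, then skip once
    combined = [row for part in partition_rows for row in part]
    return combined[offset:]
```

With two partitions and offset 1, the per-partition version drops two rows instead of one.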

TableSample Pushdown

TABLESAMPLE operations can be pushed to the database.

# With tablesample pushdown ENABLED (default since Spark 3.5):
df = spark.read.format("jdbc").option("dbtable", "orders").load()
result = df.sample(0.01) # 1% sample

# Database executes the sampling, returns only ~1% of rows

Controlled by: pushDownTableSample (default: true since Spark 3.5)

How to Verify Pushdown Is Happening

Use explain() or explain(True) to inspect the physical plan and confirm which operations were pushed down:

df = spark.read.format("jdbc") \
.option("url", url) \
.option("dbtable", "orders") \
.load()

# Apply operations
from pyspark.sql.functions import count, sum

result = df.filter("status = 'SHIPPED'") \
    .filter("total > 100") \
    .select("order_id", "status", "total") \
    .groupBy("status") \
    .agg(count("*").alias("cnt"), sum("total").alias("revenue"))

# Inspect the physical plan
result.explain(True)

What to look for in the output:

== Physical Plan ==
*(1) HashAggregate(keys=[status#10], functions=[count(1), sum(total#12)])
+- *(1) Scan JDBCRelation(orders) [numPartitions=1] ...
PushedAggregates: [COUNT(*), SUM(total)],
PushedFilters: [IsNotNull(status), EqualTo(status,SHIPPED), IsNotNull(total), GreaterThan(total,100.0)],
PushedGroupby: [status],
ReadSchema: struct<status:string,cnt:bigint,revenue:decimal(38,2)>

Key fields to check:

Field              Meaning
PushedFilters      Filter conditions sent to database
PushedAggregates   Aggregate functions sent to database
PushedGroupby      GROUP BY columns sent to database
ReadSchema         Columns actually read from database

If PushedFilters is empty [] when you expected filters, the pushdown failed. Common reasons:

  • The filter involves a UDF or expression the JDBC dialect does not support
  • Type mismatch between the Spark filter and the database column type
  • pushDownPredicate is set to false
# Quick check: compare explain plans with and without pushdown
df_push = spark.read.format("jdbc") \
.option("dbtable", "orders") \
.option("pushDownPredicate", "true") \
.load().filter("status = 'SHIPPED'")

df_nopush = spark.read.format("jdbc") \
.option("dbtable", "orders") \
.option("pushDownPredicate", "false") \
.load().filter("status = 'SHIPPED'")

df_push.explain() # Should show PushedFilters: [EqualTo(status,SHIPPED)]
df_nopush.explain() # Should show PushedFilters: [] with a Filter node above the scan

Nested Schema Pruning

spark.sql.optimizer.nestedSchemaPruning (default: true) enables pruning of nested struct fields. For JDBC sources that return JSON or nested data types, this allows Spark to request only the nested fields actually used in the query rather than the entire nested structure.

# If a column contains JSON that Spark parses into a struct:
# With nested pruning, Spark only requests the fields you access
df.select("address.city", "address.zip_code")
# Instead of reading the entire 'address' struct with all fields

Connection and Performance Tuning

fetchSize

The fetchSize option controls how many rows the JDBC driver retrieves from the database per network round trip. It has a dramatic impact on read performance.

df = spark.read.format("jdbc") \
.option("dbtable", "orders") \
.option("fetchSize", "10000") \
.load()

Default values vary by database:

Database     Default fetchSize                  Behavior
PostgreSQL   0 (loads all rows into memory)     Extremely dangerous for large tables
Oracle       10                                 Very slow for large reads; needs tuning
MySQL        0 (loads all rows into memory)     Similar to PostgreSQL
SQL Server   128                                Moderate; needs tuning for large tables
DB2          0                                  Driver-dependent

The impact is real:

Reading 10 million rows with fetchSize=10 (Oracle default):
Round trips: 1,000,000
Network overhead: massive
Time: ~45 minutes

Reading 10 million rows with fetchSize=10,000:
Round trips: 1,000
Network overhead: minimal
Time: ~2 minutes
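The round-trip arithmetic behind those numbers is simple ceiling division:

```python
def jdbc_round_trips(total_rows, fetch_size):
    # One network round trip per fetched batch (ceiling division)
    return -(-total_rows // fetch_size)
```

10 million rows at the Oracle default of 10 rows per fetch means a million round trips; at fetchSize=10,000 it is a thousand.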

Recommended values:

Table Size        Recommended fetchSize   Rationale
< 100K rows       1,000                   Low memory, fast enough
100K - 10M rows   10,000                  Good balance of memory and speed
10M - 100M rows   10,000 - 50,000         Depends on row width
> 100M rows       50,000 - 100,000        Reduce round trips aggressively

Memory math: Each fetched batch is held in the JDBC driver's memory. For a table with 100-byte average row width, fetchSize=10,000 uses ~1 MB per fetch. For a table with 10 KB average row width, fetchSize=10,000 uses ~100 MB per fetch. Size your fetchSize based on row width, not just row count.
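Sizing by row width can be mechanized. A minimal sketch; the 32 MB per-task budget and the floor/ceiling clamps are assumptions, not Spark defaults:

```python
def fetch_size_for_memory_budget(avg_row_bytes, budget_bytes=32 * 1024 * 1024,
                                 floor=1_000, ceiling=100_000):
    # Largest fetchSize whose in-flight batch fits the per-task memory budget
    fit = budget_bytes // avg_row_bytes
    return max(floor, min(ceiling, fit))
```

A 100-byte row easily supports the 100,000 ceiling, while a 10 KB row gets clamped to roughly 3,000 rows per fetch.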

batchSize for Writes

The batchSize option controls how many rows are sent to the database per INSERT batch during writes.

df.write.format("jdbc") \
.option("url", url) \
.option("dbtable", "target_table") \
.option("batchSize", "10000") \
.save()

Default: 1000

Individual INSERT statements are extremely slow. Batch inserts group multiple rows into a single database call. Increasing batchSize from the default 1000 to 10,000 or higher reduces network round trips and allows the database to optimize the batch insert.

Recommended values:

Database     Recommended batchSize   Notes
PostgreSQL   10,000 - 50,000         Handles large batches well
MySQL        10,000 - 50,000         Use rewriteBatchedStatements=true in URL
Oracle       10,000 - 100,000        Large batches perform best
SQL Server   10,000 - 50,000         Depends on row width

queryTimeout

df = spark.read.format("jdbc") \
.option("dbtable", "orders") \
.option("queryTimeout", "300") \
.load()
# Kills the JDBC query if it runs longer than 300 seconds

Default: 0 (no timeout)

Set a timeout to prevent runaway queries from locking database resources indefinitely. This is especially important for production pipelines where a hung JDBC connection can block other users.

customSchema

customSchema overrides the default Spark type inference for JDBC columns. This is critical when the JDBC driver reports incorrect types or when you need specific Spark types.

df = spark.read.format("jdbc") \
.option("dbtable", "financial_data") \
.option("customSchema", "amount DECIMAL(38,10), id LONG, name STRING") \
.load()

# Without customSchema:
# - DECIMAL(38,10) in the database might become DoubleType (losing precision)
# - BIGINT might become IntegerType (truncation risk)
# - VARCHAR(MAX) might become incorrect type

Common use cases:

  • Forcing DECIMAL precision for financial data (avoiding DoubleType rounding)
  • Mapping BIGINT to LongType when the driver reports IntegerType
  • Using StringType for columns where the JDBC driver reports an unsupported type
  • Mapping TIMESTAMP WITHOUT TIME ZONE to TimestampNTZType (use preferTimestampNTZ option)

sessionInitStatement

sessionInitStatement executes SQL on the database connection immediately after it is opened, before any data reads. Use it for session-level database settings.

# Oracle: enable direct path reads for faster full table scans
df = spark.read.format("jdbc") \
.option("dbtable", "large_table") \
.option("sessionInitStatement",
"""BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""") \
.load()

# PostgreSQL: set work_mem for this session to speed up sorts
df = spark.read.format("jdbc") \
.option("dbtable", "large_table") \
.option("sessionInitStatement", "SET work_mem = '512MB'") \
.load()

# PostgreSQL: set statement timeout for safety
df = spark.read.format("jdbc") \
.option("dbtable", "large_table") \
.option("sessionInitStatement", "SET statement_timeout = '600s'") \
.load()

# MySQL: set session variables
df = spark.read.format("jdbc") \
.option("dbtable", "large_table") \
.option("sessionInitStatement",
"SET SESSION net_read_timeout=600, SESSION net_write_timeout=600") \
.load()

createTableColumnTypes and createTableOptions

These options control table creation during writes:

# createTableColumnTypes: specify exact column types for the target table
df.write.format("jdbc") \
.option("url", url) \
.option("dbtable", "new_table") \
.option("createTableColumnTypes",
"id BIGINT, name VARCHAR(255), amount DECIMAL(18,2), description TEXT") \
.save()

# createTableOptions: append database-specific clauses to CREATE TABLE
# MySQL: specify engine and character set
df.write.format("jdbc") \
.option("url", url) \
.option("dbtable", "new_table") \
.option("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4") \
.save()

# PostgreSQL: specify tablespace or partitioning
df.write.format("jdbc") \
.option("url", url) \
.option("dbtable", "new_table") \
.option("createTableOptions", "TABLESPACE fast_ssd") \
.save()

Writing Optimizations

Parallel Writes with Repartition

The number of concurrent write connections equals the number of DataFrame partitions (bounded by numPartitions if specified). Each partition opens its own JDBC connection and writes its rows independently.

# SLOW: DataFrame has 1 partition after JDBC read without partitioning
# Writes through single connection
df.write.format("jdbc").option("dbtable", "target").save()

# FAST: repartition before writing for parallel writes
df.repartition(20).write.format("jdbc") \
.option("url", url) \
.option("dbtable", "target") \
.option("batchSize", "10000") \
.save()
# 20 parallel connections, each inserting batches of 10,000 rows

# Control max connections with numPartitions on write
df.repartition(50).write.format("jdbc") \
.option("url", url) \
.option("dbtable", "target") \
.option("numPartitions", "20") \
.option("batchSize", "10000") \
.save()
# Even though DataFrame has 50 partitions, write uses at most 20 connections
# Spark coalesces partitions to fit numPartitions

Sizing write parallelism:

# Too few partitions: slow writes, one connection does most of the work
# Too many partitions: database connection pool exhaustion, lock contention
# Sweet spot: 10-50 partitions for most databases

# For a table with 100M rows:
# 20 partitions × 10,000 batchSize = 200,000 rows per round trip across all connections
# Total round trips: 100,000,000 / 200,000 = 500 round trips
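The arithmetic above can be wrapped in a small helper. This is a rough heuristic, not a Spark API — `plan_write_parallelism` and its default caps are invented for illustration; tune them to your database's connection limits.

```python
def plan_write_parallelism(total_rows, batch_size=10_000,
                           rows_per_partition=5_000_000,
                           max_connections=50):
    """Suggest a JDBC write partition count and the resulting number of
    INSERT batches per partition. Heuristic only: cap parallelism by what
    the database's connection pool can absorb, and keep each partition
    large enough to amortize per-batch overhead."""
    partitions = max(1, min(max_connections,
                            -(-total_rows // rows_per_partition)))  # ceil div
    batches_per_partition = -(-(total_rows // partitions) // batch_size)
    return partitions, batches_per_partition

# 100M rows -> 20 connections, each issuing ~500 batches of 10,000 rows
parts, batches = plan_write_parallelism(100_000_000)
```

Feed the result into `repartition(parts)` and the `batchSize` option as shown earlier.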

Truncate Mode vs Overwrite

When using SaveMode.Overwrite, Spark normally drops the existing table and recreates it. This destroys indexes, constraints, grants, and triggers.

# DEFAULT Overwrite behavior: DROP TABLE + CREATE TABLE + INSERT
# Destroys indexes, constraints, foreign keys, grants
df.write.format("jdbc") \
.option("url", url) \
.option("dbtable", "target") \
.mode("overwrite") \
.save()

# BETTER: truncate mode preserves table structure
df.write.format("jdbc") \
.option("url", url) \
.option("dbtable", "target") \
.option("truncate", "true") \
.mode("overwrite") \
.save()
# Executes TRUNCATE TABLE target (fast, preserves structure)
# Then inserts new data
# Indexes, constraints, grants all preserved

truncate is supported by: MySQL, DB2, SQL Server, Derby, Oracle, and PostgreSQL (with cascadeTruncate for tables with foreign keys).

# PostgreSQL with cascade truncate for tables with foreign key references
df.write.format("jdbc") \
.option("url", url) \
.option("dbtable", "target") \
.option("truncate", "true") \
.option("cascadeTruncate", "true") \
.mode("overwrite") \
.save()
# Executes: TRUNCATE TABLE target CASCADE
# WARNING: this cascades to referencing tables

isolationLevel

Controls the transaction isolation level for write operations.

df.write.format("jdbc") \
.option("url", url) \
.option("dbtable", "target") \
.option("isolationLevel", "READ_COMMITTED") \
.save()

Default: READ_UNCOMMITTED

Available levels:

  • NONE -- no transaction isolation (fastest, least safe)
  • READ_UNCOMMITTED -- allows dirty reads (default)
  • READ_COMMITTED -- prevents dirty reads
  • REPEATABLE_READ -- prevents non-repeatable reads
  • SERIALIZABLE -- strictest isolation (slowest)

The default READ_UNCOMMITTED maximizes write throughput. Each partition writes in its own transaction, so if one partition fails, partitions that have already committed are not rolled back -- Spark JDBC writes are not atomic across the whole DataFrame. Note that not every database honors every level (PostgreSQL treats READ_UNCOMMITTED as READ_COMMITTED, and Oracle does not support it at all); the driver falls back to a supported level when necessary. Use NONE to eliminate transaction overhead entirely if your database supports it and your use case tolerates partial writes.
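If partial writes are not acceptable, a common workaround is to write into a staging table with full parallelism and then swap it in atomically. A minimal sketch, assuming PostgreSQL rename semantics; the function name and the swap step are illustrative, and the swap itself must run through your own database client, not Spark:

```python
def overwrite_via_staging(df, url, table, user, password):
    """Write to <table>_staging with parallel JDBC inserts.
    Sketch only: the swap into place must happen in a single
    database transaction, outside Spark."""
    staging = f"{table}_staging"
    (df.write.format("jdbc")
        .option("url", url)
        .option("dbtable", staging)
        .option("user", user)
        .option("password", password)
        .option("batchSize", "10000")
        .mode("overwrite")
        .save())
    # Then, through any JDBC client, in one transaction (PostgreSQL syntax):
    #   BEGIN;
    #   ALTER TABLE orders RENAME TO orders_old;
    #   ALTER TABLE orders_staging RENAME TO orders;
    #   DROP TABLE orders_old;
    #   COMMIT;
```

Readers see either the old table or the fully loaded new one, never a half-written target.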

Database-Specific Optimizations

PostgreSQL

PostgreSQL's JDBC driver has a critical default: fetchSize=0 fetches ALL rows into memory at once. For a 500 million row table, this means loading the entire table into the JDBC driver's heap before Spark sees a single row.

# PostgreSQL: MUST set fetchSize
df = spark.read.format("jdbc") \
.option("url", "jdbc:postgresql://host:5432/db") \
.option("dbtable", "large_table") \
.option("fetchSize", "10000") \
.option("partitionColumn", "id") \
.option("lowerBound", "1") \
.option("upperBound", "50000000") \
.option("numPartitions", "50") \
.load()

PostgreSQL cursor mode: When fetchSize is non-zero, the PostgreSQL JDBC driver switches to cursor-based fetching. However, this only works when autoCommit is disabled on the connection. Spark handles this internally -- when you set fetchSize, Spark disables autoCommit for the read connection. But if anything re-enables autoCommit on that connection, cursor mode is silently disabled and the driver reverts to loading the entire result set into memory.

# PostgreSQL: optimize session for large reads
df = spark.read.format("jdbc") \
.option("url", "jdbc:postgresql://host:5432/db?ApplicationName=spark_etl") \
.option("dbtable", "large_table") \
.option("fetchSize", "10000") \
.option("sessionInitStatement",
"SET work_mem = '256MB'; SET statement_timeout = '600s'") \
.option("partitionColumn", "id") \
.option("lowerBound", "1") \
.option("upperBound", "50000000") \
.option("numPartitions", "50") \
.load()

PostgreSQL write optimization with rewriteBatchedStatements:

# Add rewriteBatchedStatements to the URL for faster batch inserts
df.write.format("jdbc") \
.option("url",
"jdbc:postgresql://host:5432/db?rewriteBatchedStatements=true") \
.option("dbtable", "target") \
.option("batchSize", "10000") \
.save()

MySQL

MySQL has similar defaults to PostgreSQL -- the driver loads all rows into memory by default.

# MySQL: enable streaming result sets with fetchSize
# MySQL's streaming requires fetchSize=Integer.MIN_VALUE for true streaming
# But this limits you to one row at a time. Instead use useCursorFetch:
df = spark.read.format("jdbc") \
.option("url",
"jdbc:mysql://host:3306/db?useCursorFetch=true&defaultFetchSize=10000") \
.option("dbtable", "large_table") \
.option("fetchSize", "10000") \
.option("partitionColumn", "id") \
.option("lowerBound", "1") \
.option("upperBound", "50000000") \
.option("numPartitions", "50") \
.load()

MySQL write optimization:

# MySQL: rewriteBatchedStatements converts individual INSERTs to multi-value INSERTs
# This is the single biggest MySQL write optimization
df.write.format("jdbc") \
.option("url",
"jdbc:mysql://host:3306/db?rewriteBatchedStatements=true&useServerPrepStmts=false") \
.option("dbtable", "target") \
.option("batchSize", "10000") \
.save()
# Without rewriteBatchedStatements: INSERT INTO t VALUES (1,'a'); INSERT INTO t VALUES (2,'b'); ...
# With rewriteBatchedStatements: INSERT INTO t VALUES (1,'a'), (2,'b'), (3,'c'), ...
# 10-50x faster writes

Oracle

Oracle's default fetchSize of 10 is extremely low. This is the most common Oracle JDBC performance problem.

# Oracle: increase fetchSize dramatically from default of 10
df = spark.read.format("jdbc") \
.option("url", "jdbc:oracle:thin:@//host:1521/service") \
.option("dbtable", "large_table") \
.option("fetchSize", "10000") \
.option("partitionColumn", "id") \
.option("lowerBound", "1") \
.option("upperBound", "50000000") \
.option("numPartitions", "50") \
.option("sessionInitStatement",
"""BEGIN
execute immediate 'alter session set "_serial_direct_read"=true';
execute immediate 'alter session set "_very_large_object_threshold"=1073741824';
END;""") \
.load()

Oracle-specific tips:

  • Use _serial_direct_read=true to bypass buffer cache for full table scans (direct path reads)
  • Oracle supports ROWID-based partitioning if no good numeric column exists
  • For Oracle Real Application Clusters (RAC), use partitionColumn on a column that naturally distributes across RAC nodes
# Oracle: use ROWID for partitioning when no good numeric column exists
# First, find the ROWID range
# SELECT MIN(ROWID), MAX(ROWID) FROM large_table;

# Then use custom predicates based on ROWID ranges
predicates = [
"ROWID <= 'AAAoXAAAEAABqAAAA'",
"ROWID > 'AAAoXAAAEAABqAAAA' AND ROWID <= 'AAAoXAAAEAABqAAAB'",
# ... more ranges
"ROWID > 'AAAoXAAAEAABqAAAZ'"
]
df = spark.read.jdbc(url, "large_table", predicates=predicates, properties=props)
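When computing ROWID extent boundaries is impractical, hashing any stable column gives evenly sized slices with no bound queries at all. The helper below is hypothetical, not part of Spark; it emits Oracle `ORA_HASH` predicates by default (substitute your database's hash function elsewhere):

```python
def hash_partition_predicates(column, num_buckets=16, hash_fn="ORA_HASH"):
    """Mutually exclusive, collectively exhaustive predicates that slice
    a table by a hash of any stable column. ORA_HASH is Oracle-specific;
    swap in your database's hash function as needed."""
    return [f"MOD({hash_fn}({column}), {num_buckets}) = {i}"
            for i in range(num_buckets)]

predicates = hash_partition_predicates("customer_id", num_buckets=20)
# df = spark.read.jdbc(url, "orders", predicates=predicates, properties=props)
```

The trade-off: hash predicates balance perfectly but cannot use an index range scan, so each bucket query scans the table unless the database can parallelize the hash evaluation.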

SQL Server

# SQL Server: use Microsoft JDBC driver with specific options
df = spark.read.format("jdbc") \
.option("url",
"jdbc:sqlserver://host:1433;databaseName=mydb"
";encrypt=true;trustServerCertificate=true"
";selectMethod=cursor" # Enable server-side cursor
";responseBuffering=adaptive") \
.option("dbtable", "large_table") \
.option("fetchSize", "10000") \
.option("partitionColumn", "id") \
.option("lowerBound", "1") \
.option("upperBound", "50000000") \
.option("numPartitions", "50") \
.load()

SQL Server write optimization:

# For high-throughput writes, use the Spark Microsoft SQL Server connector
# instead of the generic JDBC connector:
# spark-shell --packages com.microsoft.azure:spark-mssql-connector_2.12:1.3.0

# Or use generic JDBC with tuning:
df.write.format("jdbc") \
.option("url",
"jdbc:sqlserver://host:1433;databaseName=mydb") \
.option("dbtable", "target") \
.option("batchSize", "10000") \
.option("isolationLevel", "READ_COMMITTED") \
.save()

General JDBC URL Parameters That Help

# PostgreSQL
url = ("jdbc:postgresql://host:5432/db"
"?ApplicationName=spark_etl" # Identifies connections in pg_stat_activity
"&rewriteBatchedStatements=true" # Batch write optimization
"&loginTimeout=30" # Connection timeout
"&socketTimeout=600") # Socket read timeout

# MySQL
url = ("jdbc:mysql://host:3306/db"
"?useCursorFetch=true" # Server-side cursor
"&defaultFetchSize=10000" # Default fetch size
"&rewriteBatchedStatements=true" # Multi-value INSERT
"&useServerPrepStmts=false" # Disable server prep for batch
"&connectTimeout=30000" # Connection timeout (ms)
"&socketTimeout=600000" # Socket timeout (ms)
"&useCompression=true") # Compress data transfer

# Oracle
url = ("jdbc:oracle:thin:@//host:1521/service"
"?oracle.net.CONNECT_TIMEOUT=30000" # Connection timeout
"&oracle.net.READ_TIMEOUT=600000" # Read timeout
"&defaultRowPrefetch=10000") # Alternative to fetchSize

# SQL Server
url = ("jdbc:sqlserver://host:1433"
";databaseName=mydb"
";encrypt=true"
";trustServerCertificate=true"
";selectMethod=cursor" # Server-side cursor
";responseBuffering=adaptive" # Adaptive buffering
";loginTimeout=30" # Login timeout
";queryTimeout=600") # Query timeout

Common Mistakes and Anti-Patterns

Anti-Pattern 1: Reading Without Partitioning

This is the most common mistake. A single-thread read through one JDBC connection.

# BAD: single thread, single connection
df = spark.read.format("jdbc") \
.option("dbtable", "orders") \
.load()
# Time for 100M rows: ~45 minutes

# GOOD: parallel reads with proper partitioning
df = spark.read.format("jdbc") \
.option("dbtable", "orders") \
.option("partitionColumn", "order_id") \
.option("lowerBound", "1") \
.option("upperBound", "100000000") \
.option("numPartitions", "50") \
.option("fetchSize", "10000") \
.load()
# Time for 100M rows: ~2 minutes (25x faster)

Anti-Pattern 2: Non-Indexed partitionColumn

Using a non-indexed column as the partitionColumn causes each partition query to trigger a full table scan.

# BAD: created_at has no index — each partition query does a full table scan
df = spark.read.format("jdbc") \
.option("dbtable", "events") \
.option("partitionColumn", "created_at") \
.option("lowerBound", "2024-01-01") \
.option("upperBound", "2026-01-01") \
.option("numPartitions", "50") \
.load()
# 50 partitions × full table scan each = 50 full table scans
# Your DBA will call you

# GOOD: use the indexed primary key
df = spark.read.format("jdbc") \
.option("dbtable", "events") \
.option("partitionColumn", "event_id") \
.option("lowerBound", "1") \
.option("upperBound", "500000000") \
.option("numPartitions", "50") \
.load()
# 50 index range scans — fast and efficient

Anti-Pattern 3: Wrong Bounds Causing Skew

# BAD: upperBound way below actual max — last partition gets 90% of data
# Actual max order_id is 100,000,000 but you set upperBound to 10,000,000
df = spark.read.format("jdbc") \
.option("partitionColumn", "order_id") \
.option("lowerBound", "1") \
.option("upperBound", "10000000") \
.option("numPartitions", "50") \
.load()
# Partition 49: WHERE order_id >= 9800001 (reads 90,200,000 rows)
# Partitions 0-48: each reads ~200,000 rows
# Result: partition 49 takes 450x longer than the others

# GOOD: query actual bounds first
# SELECT MIN(order_id), MAX(order_id) FROM orders → 1, 100000000
df = spark.read.format("jdbc") \
.option("partitionColumn", "order_id") \
.option("lowerBound", "1") \
.option("upperBound", "100000000") \
.option("numPartitions", "50") \
.load()
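To avoid hardcoding bounds that drift stale, you can query MIN/MAX immediately before the partitioned read. `read_with_exact_bounds` is a hypothetical wrapper sketched here; it costs one extra query per job, which is cheap when the partition column is indexed:

```python
def read_with_exact_bounds(spark, url, table, col, num_partitions,
                           fetch_size=10_000):
    """Query MIN/MAX of the partition column first, then read with bounds
    that match the data exactly. One extra small query per job."""
    row = (spark.read.format("jdbc")
           .option("url", url)
           .option("dbtable", f"(SELECT MIN({col}) AS lo, MAX({col}) AS hi "
                              f"FROM {table}) AS b")
           .load()
           .collect()[0])
    return (spark.read.format("jdbc")
            .option("url", url)
            .option("dbtable", table)
            .option("partitionColumn", col)
            .option("lowerBound", str(row["lo"]))
            .option("upperBound", str(row["hi"]))
            .option("numPartitions", str(num_partitions))
            .option("fetchSize", str(fetch_size))
            .load())
```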

Anti-Pattern 4: Not Using Pushdown

# BAD: reading entire table, then filtering in Spark
df = spark.read.format("jdbc") \
.option("dbtable", "orders") \
.option("pushDownPredicate", "false") \
.load()
result = df.filter("status = 'SHIPPED' AND order_date > '2025-01-01'")
# Reads ALL 100M rows over network, then filters in Spark to keep 1M
# 99% of network transfer is wasted

# GOOD: let pushdown do the work (default behavior)
df = spark.read.format("jdbc") \
.option("dbtable", "orders") \
.load()
result = df.filter("status = 'SHIPPED' AND order_date > '2025-01-01'")
# Database filters first, sends only 1M matching rows
# 99x less network transfer

Anti-Pattern 5: fetchSize Too Small or Too Large

# BAD: default fetchSize on Oracle (10 rows per round trip)
df = spark.read.format("jdbc") \
.option("url", "jdbc:oracle:thin:@//host:1521/service") \
.option("dbtable", "large_table") \
.load()
# 100M rows / 10 per fetch = 10,000,000 round trips
# Network latency of 1ms per round trip = 10,000 seconds = 2.7 hours of just waiting

# BAD: fetchSize too large on PostgreSQL (loads entire result into memory)
df = spark.read.format("jdbc") \
.option("url", "jdbc:postgresql://host:5432/db") \
.option("dbtable", "large_table") \
.option("fetchSize", "10000000") \
.load()
# 10M rows × 1 KB per row = 10 GB loaded into JDBC driver memory per partition
# OOM crash

# GOOD: balanced fetchSize
df = spark.read.format("jdbc") \
.option("dbtable", "large_table") \
.option("fetchSize", "10000") \
.load()
# 100M rows / 10,000 per fetch = 10,000 round trips
# At 1ms latency: 10 seconds of network wait — acceptable
# Memory: 10,000 × 1 KB = 10 MB per partition — safe
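The memory-versus-round-trip tradeoff can be made explicit with a back-of-the-envelope helper. A rough heuristic under an assumed per-partition memory budget — not a Spark API:

```python
def suggest_fetch_size(avg_row_bytes, memory_budget_mb=16,
                       floor=1_000, ceiling=100_000):
    """Largest fetchSize that fits a per-partition memory budget,
    clamped to a sane range. Rough heuristic only."""
    rows = (memory_budget_mb * 1024 * 1024) // max(avg_row_bytes, 1)
    return max(floor, min(ceiling, rows))

suggest_fetch_size(1024)       # 1 KB rows, 16 MB budget -> 16384
suggest_fetch_size(1_000_000)  # very wide rows -> clamped to the 1000 floor
```

Estimate `avg_row_bytes` from the table's storage size divided by its row count, or from the schema's column widths.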

Anti-Pattern 6: Not Using Connection Pooling for Large Clusters

When numPartitions is high, each partition opens and closes its own JDBC connection. The connection setup overhead (TCP handshake, SSL negotiation, authentication) adds up.

# BAD: 200 partitions = 200 connection setups
df = spark.read.format("jdbc") \
.option("numPartitions", "200") \
.load()

# BETTER: use database-side connection pooling (PgBouncer, ProxySQL)
# Configure your JDBC URL to connect through the pooler:
df = spark.read.format("jdbc") \
.option("url", "jdbc:postgresql://pgbouncer-host:6432/db") \
.option("numPartitions", "200") \
.load()

Anti-Pattern 7: SELECT * When You Need Two Columns

# BAD: reads all 50 columns from database
df = spark.read.format("jdbc").option("dbtable", "orders").load()
result = df.select("order_id", "total").filter("total > 100")

# The column pruning optimization should handle this, but verify with explain()
# If the ReadSchema still shows all columns, use a subquery:

# GUARANTEED column pruning via subquery:
df = spark.read.format("jdbc") \
.option("dbtable", "(SELECT order_id, total FROM orders WHERE total > 100) AS subq") \
.load()

Anti-Pattern 8: Using query Option with Partitioning

# BAD: query option CANNOT be used with partitionColumn
# This will throw an error
df = spark.read.format("jdbc") \
.option("query", "SELECT * FROM orders WHERE year = 2025") \
.option("partitionColumn", "order_id") \
.option("lowerBound", "1") \
.option("upperBound", "10000000") \
.option("numPartitions", "50") \
.load()
# ERROR: Options 'query' and 'partitionColumn' can not be specified together

# GOOD: use dbtable with a subquery instead
df = spark.read.format("jdbc") \
.option("dbtable", "(SELECT * FROM orders WHERE year = 2025) AS filtered") \
.option("partitionColumn", "order_id") \
.option("lowerBound", "1") \
.option("upperBound", "10000000") \
.option("numPartitions", "50") \
.load()
# Spark appends partition WHERE clauses to the subquery
# Each partition runs: SELECT * FROM (SELECT * FROM orders WHERE year = 2025) AS filtered
# WHERE order_id >= X AND order_id < Y

Advanced Patterns

Using dbtable with Subquery for Pre-Filtering

The dbtable option accepts any valid SQL FROM clause, including subqueries. This lets you push complex logic to the database before Spark even starts reading.

# Pre-filter and pre-aggregate in the database
df = spark.read.format("jdbc") \
.option("url", url) \
.option("dbtable", """
(SELECT
customer_id,
COUNT(*) as order_count,
SUM(total) as lifetime_value,
MAX(order_date) as last_order_date
FROM orders
WHERE status != 'CANCELLED'
GROUP BY customer_id
HAVING COUNT(*) >= 5
) AS active_customers
""") \
.option("partitionColumn", "customer_id") \
.option("lowerBound", "1") \
.option("upperBound", "1000000") \
.option("numPartitions", "20") \
.option("fetchSize", "10000") \
.load()

# The database does the heavy lifting (filter + aggregate)
# Spark reads only the aggregated result (~100K rows instead of 100M)

Important: When using subqueries with dbtable, Spark appends its partition WHERE clauses to the outer query. The subquery must be wrapped in parentheses and have an alias.

Incremental Reads with Watermark Columns

For incremental ETL, read only rows that changed since the last run:

from pyspark.sql import functions as F

# Get the high watermark from the last successful run
last_watermark = spark.read.format("jdbc") \
.option("dbtable",
"(SELECT watermark_value as hw FROM etl_watermarks WHERE table_name = 'orders') AS wm") \
.load().collect()[0]["hw"]

# Read only new/changed rows since the watermark
new_rows = spark.read.format("jdbc") \
.option("dbtable", f"""
(SELECT * FROM orders
WHERE updated_at > '{last_watermark}'
) AS incremental
""") \
.option("partitionColumn", "order_id") \
.option("lowerBound", "1") \
.option("upperBound", "100000000") \
.option("numPartitions", "20") \
.option("fetchSize", "10000") \
.load()

# Process incremental data
processed = new_rows.transform(apply_business_logic)

# Write to target (Iceberg table for ACID guarantees)
processed.writeTo("catalog.db.orders_lakehouse").append()

# Update watermark (spark.sql UPDATE only works against catalogs that support
# it, such as Iceberg; if etl_watermarks lives in the source database, issue
# this statement through a plain JDBC connection instead)
new_watermark = new_rows.agg(F.max("updated_at")).collect()[0][0]
spark.sql(f"""
UPDATE etl_watermarks
SET watermark_value = '{new_watermark}', last_run = current_timestamp()
WHERE table_name = 'orders'
""")

Reading from Read Replicas

Direct your JDBC reads to read replicas to avoid impacting the primary database:

# Production write traffic goes to primary
# Spark ETL reads from replica
df = spark.read.format("jdbc") \
.option("url", "jdbc:postgresql://read-replica-host:5432/production") \
.option("dbtable", "orders") \
.option("partitionColumn", "order_id") \
.option("lowerBound", "1") \
.option("upperBound", "100000000") \
.option("numPartitions", "50") \
.option("fetchSize", "10000") \
.load()

# For Aurora PostgreSQL, use the reader endpoint
url = "jdbc:postgresql://mydb.cluster-ro-xyz.us-east-1.rds.amazonaws.com:5432/production"

# For Cloud SQL with read replicas
url = "jdbc:postgresql://read-replica-ip:5432/production"

Caching JDBC Results for Repeated Use

JDBC reads are expensive network operations. If you need the same data for multiple operations, cache it:

# BAD: reads from database twice
df = spark.read.format("jdbc").option("dbtable", "orders").load()
count = df.count() # JDBC read #1
avg = df.agg(F.avg("total")).collect() # JDBC read #2 (agg alone is lazy; collect triggers it)

# GOOD: cache after the first read
df = spark.read.format("jdbc") \
.option("dbtable", "orders") \
.option("partitionColumn", "order_id") \
.option("lowerBound", "1") \
.option("upperBound", "100000000") \
.option("numPartitions", "50") \
.option("fetchSize", "10000") \
.load()
df.cache() # or df.persist(StorageLevel.MEMORY_AND_DISK)

count = df.count() # Reads from cache
avg = df.agg(F.avg("total")).collect() # Reads from cache

# IMPORTANT: unpersist when done
df.unpersist()

Combining JDBC Reads with Broadcast Joins

When joining a large JDBC table with a small lookup table, broadcast the small table:

from pyspark.sql.functions import broadcast

# Large table: orders (100M rows) — read in parallel from database
orders = spark.read.format("jdbc") \
.option("dbtable", "orders") \
.option("partitionColumn", "order_id") \
.option("lowerBound", "1") \
.option("upperBound", "100000000") \
.option("numPartitions", "50") \
.option("fetchSize", "10000") \
.load()

# Small table: product_categories (10K rows) — single read is fine
categories = spark.read.format("jdbc") \
.option("dbtable", "product_categories") \
.load()

# Broadcast the small table to avoid shuffling the large table
enriched = orders.join(broadcast(categories), "category_id")

Using Spark JDBC with Iceberg for Data Lake Ingestion

The most powerful pattern: read from JDBC, write to Iceberg. This creates an open, queryable copy of your database tables in your data lake with full ACID support, time travel, and schema evolution.

# Configure Iceberg catalog
spark.conf.set("spark.sql.catalog.lakehouse", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.lakehouse.type", "hadoop")
spark.conf.set("spark.sql.catalog.lakehouse.warehouse", "s3://my-lake/warehouse")

# Read from PostgreSQL
orders = spark.read.format("jdbc") \
.option("url", "jdbc:postgresql://replica:5432/production") \
.option("dbtable", "orders") \
.option("partitionColumn", "order_id") \
.option("lowerBound", "1") \
.option("upperBound", "100000000") \
.option("numPartitions", "50") \
.option("fetchSize", "10000") \
.load()

# Write to Iceberg table (first time: create)
orders.writeTo("lakehouse.db.orders") \
.tableProperty("format-version", "2") \
.partitionedBy(F.months("order_date")) \
.createOrReplace()

# Incremental sync (subsequent runs): MERGE INTO
spark.sql("""
MERGE INTO lakehouse.db.orders AS target
USING (
SELECT * FROM jdbc_orders_incremental
) AS source
ON target.order_id = source.order_id
WHEN MATCHED AND source.updated_at > target.updated_at
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
""")

# Now you can query with time travel, schema evolution, and full SQL
spark.sql("SELECT * FROM lakehouse.db.orders VERSION AS OF 42")
spark.sql("SELECT * FROM lakehouse.db.orders TIMESTAMP AS OF '2025-06-01 00:00:00'")

Using prepareQuery for Complex SQL

The prepareQuery option lets you prepend SQL statements (like WITH clauses or temp table creation) that are not supported in subqueries:

# Read using a CTE (WITH clause) that cannot be embedded in a subquery
df = spark.read.format("jdbc") \
.option("url", url) \
.option("prepareQuery", """
WITH ranked_orders AS (
SELECT *, ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY order_date DESC) as rn
FROM orders
)
""") \
.option("query", "SELECT * FROM ranked_orders WHERE rn = 1") \
.load()
# Spark sends:
# WITH ranked_orders AS (...)
# SELECT <columns> FROM (SELECT * FROM ranked_orders WHERE rn = 1) spark_gen_alias

Complete Configuration Reference

Read Options

| Option | Default | Recommended | Description |
|---|---|---|---|
| url | (none) | -- | JDBC connection URL |
| dbtable | (none) | -- | Table name or subquery |
| query | (none) | -- | Read query (cannot use with partitionColumn) |
| prepareQuery | (none) | -- | SQL prefix for query (CTEs, temp tables) |
| driver | (none) | -- | JDBC driver class name |
| partitionColumn | (none) | Indexed PK | Column for parallel reads (numeric/date/timestamp) |
| lowerBound | (none) | MIN(partitionColumn) | Lower bound for partition stride |
| upperBound | (none) | MAX(partitionColumn) | Upper bound for partition stride |
| numPartitions | (none) | 2-4x executor cores | Number of parallel read partitions |
| fetchSize | 0 | 10000 | Rows per JDBC fetch round trip |
| queryTimeout | 0 | 300-600 | Query timeout in seconds |
| customSchema | (none) | Use for DECIMAL | Override type inference for specific columns |
| sessionInitStatement | (none) | DB-specific tuning | SQL executed on each new session |
| pushDownPredicate | true | true | Push filters to database |
| pushDownAggregate | true | true | Push aggregates to database |
| pushDownLimit | true | true | Push LIMIT to database |
| pushDownOffset | true | true | Push OFFSET to database |
| pushDownTableSample | true | true | Push TABLESAMPLE to database |
| preferTimestampNTZ | false | Depends on use case | Map TIMESTAMP WITHOUT TIME ZONE to TimestampNTZ |
| connectionProvider | (none) | -- | Named connection provider |

Write Options

| Option | Default | Recommended | Description |
|---|---|---|---|
| url | (none) | -- | JDBC connection URL |
| dbtable | (none) | -- | Target table name |
| driver | (none) | -- | JDBC driver class name |
| numPartitions | (none) | 10-50 | Max parallel write connections |
| batchSize | 1000 | 10000-50000 | Rows per INSERT batch |
| isolationLevel | READ_UNCOMMITTED | READ_UNCOMMITTED | Transaction isolation level |
| truncate | false | true for overwrite | TRUNCATE instead of DROP+CREATE on overwrite |
| cascadeTruncate | DB default | false (unless needed) | CASCADE with TRUNCATE |
| createTableOptions | (none) | DB-specific | Appended to CREATE TABLE statement |
| createTableColumnTypes | (none) | Use for precision | Override column types in CREATE TABLE |
| queryTimeout | 0 | 600 | Statement timeout in seconds |

Authentication Options

| Option | Default | Description |
|---|---|---|
| user | (none) | Database username |
| password | (none) | Database password |
| keytab | (none) | Kerberos keytab file path |
| principal | (none) | Kerberos principal name |
| refreshKrb5Config | false | Refresh Kerberos config before connecting |

Spark SQL Configuration Properties

| Property | Default | Description |
|---|---|---|
| spark.sql.jdbc.pushdown | true | Global predicate pushdown enable/disable |
| spark.sql.optimizer.nestedSchemaPruning.enabled | true | Prune nested struct fields |

Monitoring and Debugging JDBC Reads

Verifying Parallel Reads Are Happening

# Check the number of partitions after reading
df = spark.read.format("jdbc") \
.option("dbtable", "orders") \
.option("partitionColumn", "order_id") \
.option("lowerBound", "1") \
.option("upperBound", "100000000") \
.option("numPartitions", "50") \
.load()

print(f"Number of partitions: {df.rdd.getNumPartitions()}")
# Should print: Number of partitions: 50
# If it prints 1, your partitioning options were ignored or invalid

Spark UI Indicators for JDBC Stages

In the Spark UI at http://<driver>:4040:

Jobs tab: Look for the job triggered by your JDBC read action. Click it to see stages.

Stages tab: A properly partitioned JDBC read shows a stage with N tasks (where N = numPartitions). Each task represents one JDBC connection reading one partition.

Stage 0 (JDBC Read):
Total Tasks: 50
Active Tasks: 50 (initially)

Task 0: Reading partition WHERE order_id IS NULL OR order_id < 2000001
Task 1: Reading partition WHERE order_id >= 2000001 AND order_id < 4000001
...
Task 49: Reading partition WHERE order_id >= 98000001

What to look for:

  • All tasks running simultaneously: Confirms parallel reads
  • Task durations roughly equal: Confirms balanced partitioning
  • One task dramatically longer: Indicates skewed partitioning (wrong bounds or uneven distribution)
  • Only 1 task: Partitioning not applied -- check your options

Task metrics to monitor:

  • Input Size / Records: Rows read per partition. Should be roughly equal.
  • Duration: Time per partition. Large variance = skew.
  • GC Time: If high, fetchSize might be too large for the executor heap.
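The same metrics can be pulled programmatically. A minimal skew check using `spark_partition_id` — it triggers an extra pass over the data, so run it during development rather than in production pipelines:

```python
def partition_row_counts(df):
    """Rows per Spark partition; large variance across partitions
    means skewed JDBC bounds or an uneven partition column."""
    from pyspark.sql.functions import spark_partition_id
    return (df.groupBy(spark_partition_id().alias("partition"))
              .count()
              .orderBy("partition"))

# partition_row_counts(df).show(100)
```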

SQL Query Logging on the Database Side

Enable query logging in your database to see exactly what Spark is sending:

-- PostgreSQL: enable query logging
ALTER SYSTEM SET log_statement = 'all';
ALTER SYSTEM SET log_min_duration_statement = 0;
SELECT pg_reload_conf();

-- Now check pg_stat_activity for Spark queries:
SELECT pid, state, query, query_start, backend_type
FROM pg_stat_activity
WHERE application_name = 'spark_etl'
ORDER BY query_start DESC;

-- MySQL: enable general log
SET GLOBAL general_log = 'ON';
SET GLOBAL general_log_file = '/var/log/mysql/general.log';

-- Oracle: check V$SQL for recent Spark queries
SELECT sql_text, executions, elapsed_time/1000000 as elapsed_sec
FROM V$SQL
WHERE sql_text LIKE '%orders%'
ORDER BY last_active_time DESC;

-- SQL Server: use Extended Events or sys.dm_exec_requests
SELECT r.session_id, r.status, t.text as sql_text, r.start_time
FROM sys.dm_exec_requests r
CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) t
WHERE t.text LIKE '%orders%';

How to See What SQL Spark Is Sending

# Method 1: Use explain() to see the scan details
df = spark.read.format("jdbc").option("dbtable", "orders").load()
result = df.filter("status = 'SHIPPED'").select("order_id", "total")
result.explain(True) # Shows logical and physical plans with PushedFilters

# Method 2: Enable Spark SQL logging
# In log4j.properties or spark-defaults.conf:
# log4j.logger.org.apache.spark.sql.execution.datasources.jdbc=DEBUG

# Method 3: Check the SQL tab in Spark UI
# After running a query, the SQL tab shows the plan with JDBC scan details
# including PushedFilters, ReadSchema, and the number of partitions

# Method 4: inspect the full query execution via the JVM DataFrame handle
# (PySpark DataFrames have no queryExecution attribute; go through _jdf)
print(result._jdf.queryExecution().toString())

# Method 5: register a QueryExecutionListener to capture queries programmatically
# (in PySpark this uses the py4j callback pattern: a plain class with an inner
# Java class declaring the implemented interface; requires the gateway's
# callback server to be running)
class JDBCQueryListener:
    def onSuccess(self, funcName, qe, durationNs):
        print(f"Query: {qe.logical().toString()}")

    def onFailure(self, funcName, qe, exception):
        pass

    class Java:
        implements = ["org.apache.spark.sql.util.QueryExecutionListener"]

spark._jsparkSession.listenerManager().register(JDBCQueryListener())

Debugging Slow JDBC Reads Checklist

| Symptom | Diagnosis | Fix |
|---|---|---|
| Only 1 task in the JDBC read stage | Partitioning not configured | Add partitionColumn, lowerBound, upperBound, numPartitions |
| 50 tasks but one takes 100x longer | Wrong bounds or skewed column | Query actual MIN/MAX, choose better partition column |
| All tasks slow | Low fetchSize or full table scans | Increase fetchSize, ensure partition column is indexed |
| OOM on executor during read | fetchSize too high or PostgreSQL loading all rows | Reduce fetchSize, ensure fetchSize > 0 for PostgreSQL |
| Database CPU at 100% | Too many parallel queries or missing indexes | Reduce numPartitions, add indexes on partition column |
| Connection timeout errors | Too many connections or slow database | Reduce numPartitions, use connection pooler |
| PushedFilters empty in explain | Pushdown disabled or unsupported filter | Check pushDownPredicate, simplify filter expressions |
| Reading all columns when only 2 needed | Column pruning not working through subquery | Use dbtable subquery with explicit SELECT |
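Checking for an empty PushedFilters entry can be scripted. A small helper that scrapes the physical plan string; it relies on PySpark's internal `_jdf` handle and on the plan's textual format, so treat it strictly as a debugging aid:

```python
def pushed_filters(df):
    """Return the PushedFilters fragment of the physical plan, or None.
    Uses the internal _jdf bridge -- debugging aid, not a stable API."""
    plan = df._jdf.queryExecution().executedPlan().toString()
    for line in plan.splitlines():
        if "PushedFilters" in line:
            return line.strip()
    return None
```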

Using SQL Syntax

All of the above can also be expressed using Spark SQL:

-- Create a JDBC source view
CREATE TEMPORARY VIEW jdbc_orders
USING jdbc
OPTIONS (
url 'jdbc:postgresql://host:5432/production',
dbtable 'orders',
user 'spark_reader',
password 'secret',
partitionColumn 'order_id',
lowerBound '1',
upperBound '100000000',
numPartitions '50',
fetchSize '10000',
pushDownPredicate 'true',
pushDownAggregate 'true'
);

-- Query with automatic pushdown
SELECT status, COUNT(*), SUM(total)
FROM jdbc_orders
WHERE status = 'SHIPPED' AND order_date > '2025-01-01'
GROUP BY status;
-- Spark pushes the filter AND the aggregate to the database

-- Write results to another JDBC table
CREATE TEMPORARY VIEW jdbc_target
USING jdbc
OPTIONS (
url 'jdbc:postgresql://host:5432/production',
dbtable 'order_summary',
user 'spark_writer',
password 'secret'
);

INSERT INTO jdbc_target
SELECT status, COUNT(*) as cnt, SUM(total) as revenue
FROM jdbc_orders
WHERE status = 'SHIPPED'
GROUP BY status;

-- Write JDBC data to Iceberg
INSERT INTO lakehouse.db.orders_snapshot
SELECT * FROM jdbc_orders;

How Cazpian Handles This

On Cazpian, Spark compute pools are preconfigured with optimized JDBC defaults for every major database engine. When you configure a JDBC connection in Cazpian, the platform automatically sets appropriate fetchSize values based on the target database type -- 10,000 for Oracle, cursor-mode-compatible settings for PostgreSQL, and useCursorFetch-enabled URLs for MySQL. Cazpian's query advisor analyzes your JDBC read patterns and recommends optimal numPartitions and partitionColumn settings based on table statistics, flagging single-thread reads before they reach production. For Iceberg-based data lake ingestion pipelines, Cazpian provides managed JDBC-to-Iceberg sync with automatic incremental watermark tracking, MERGE INTO for upserts, and schema drift detection when source database schemas evolve. The platform's cost dashboard shows JDBC read times alongside cluster utilization, making it clear when a misconfigured JDBC read is wasting compute on idle executors.

What's Next

This post covered every dimension of Spark JDBC data source optimization -- from the single-thread default problem to parallel reads, pushdown optimizations, database-specific tuning, write performance, and advanced ingestion patterns. For related topics in the Spark and Iceberg ecosystem, see our other posts: