
Spark Runtime Metrics Collection with DriverPlugin, ExecutorPlugin, and SparkListener

Cazpian Engineering · Platform Engineering Team · 23 min read


You tuned your Spark cluster. You picked the right join strategies. You enabled AQE. But you are still flying blind. When a job takes twice as long as yesterday, you open the Spark UI, scroll through 200 stages, and guess. When an Iceberg scan suddenly plans for 12 seconds instead of 2, you have no history to compare against. You cannot trend what you do not collect.

Apache Spark ships a powerful but underused plugin system — DriverPlugin, ExecutorPlugin, and SparkListener — that lets you tap into every metric the engine produces at runtime. Combined with Iceberg's MetricsReporter, you get a unified view of compute and storage performance for every query, every task, and every table scan. This post shows you how to build that pipeline from scratch, store the metrics at scale, and turn raw numbers into actionable performance insights.

Why the Spark UI Is Not Enough

The Spark UI is great for debugging a single job in real time. It is terrible for everything else:

| Limitation | Impact |
|---|---|
| No historical retention | UI data disappears when the application ends unless you configure a History Server |
| No cross-job comparison | Cannot compare Tuesday's run against Wednesday's to find regressions |
| No alerting | No way to fire a Slack message when shuffle spill exceeds 10 GB |
| No Iceberg context | Zero visibility into scan planning duration, manifest pruning, or commit contention |
| Manual inspection | You must click through stages and tasks one at a time — does not scale to hundreds of daily jobs |

You need structured metrics, persisted to a queryable store, with enough dimensionality to slice by job, stage, table, and date.

Architecture Overview

The complete metrics pipeline has four collection layers feeding into a unified storage backend:


┌─────────────────────────────────────────────────────────────────────┐
│ Spark Application │
│ │
│ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ │
│ │ DriverPlugin │ │ ExecutorPlugin │ │ SparkListener │ │
│ │ (driver JVM) │ │ (each executor) │ │ (driver JVM) │ │
│ │ │ │ │ │ │ │
│ │ • Init resources │ │ • Per-task hooks │ │ • Job/Stage/Task │ │
│ │ • Aggregate │ │ • JVM metrics │ │ lifecycle │ │
│ │ • Shutdown flush │ │ • Custom counters │ │ • Full TaskMetrics│ │
│ └────────┬─────────┘ └────────┬──────────┘ └────────┬─────────┘ │
│ │ │ │ │
│ ┌────────┴─────────────────────┴───────────────────────┴─────────┐ │
│ │ QueryExecutionListener (driver JVM) │ │
│ │ • SQL text, logical/physical plans, duration per query │ │
│ └────────┬───────────────────────────────────────────────────────┘ │
│ │ │
│ ┌────────┴───────────────────────────────────────────────────────┐ │
│ │ Iceberg MetricsReporter (driver JVM) │ │
│ │ • ScanReport: planning duration, manifests, files pruned │ │
│ │ • CommitReport: commit duration, attempts, files added │ │
│ └────────┬───────────────────────────────────────────────────────┘ │
└───────────┼─────────────────────────────────────────────────────────┘
            │
            ▼
  ┌─────────────────┐
  │  Metrics Store  │
  │  (see §Storage) │
  └─────────────────┘

Each layer captures a different dimension:

| Layer | Runs On | What It Captures |
|---|---|---|
| DriverPlugin | Driver JVM | Initialization, resource setup, aggregation, shutdown hooks |
| ExecutorPlugin | Every executor JVM | Per-task metrics, JVM-level stats, custom counters |
| SparkListener | Driver JVM | Complete job/stage/task lifecycle with full TaskMetrics |
| QueryExecutionListener | Driver JVM | SQL text, query plans (logical, optimized, physical), wall-clock duration |
| Iceberg MetricsReporter | Driver JVM | Scan planning, manifest pruning, commit duration and retry counts |

The Spark Plugin API (SPARK-29397)

Introduced in Spark 3.0 via SPARK-29397, the plugin API gives you lifecycle hooks on both the driver and every executor. You register a single SparkPlugin that returns a DriverPlugin and an ExecutorPlugin.

SparkPlugin — The Entry Point

import org.apache.spark.api.plugin.SparkPlugin;
import org.apache.spark.api.plugin.DriverPlugin;
import org.apache.spark.api.plugin.ExecutorPlugin;

public class MetricsPlugin implements SparkPlugin {

    @Override
    public DriverPlugin driverPlugin() {
        return new MetricsDriverPlugin();
    }

    @Override
    public ExecutorPlugin executorPlugin() {
        return new MetricsExecutorPlugin();
    }
}

Register it with a single Spark config:

spark.plugins=com.cazpian.spark.MetricsPlugin

That is it. Spark will instantiate your plugin on startup, call driverPlugin() once on the driver, and call executorPlugin() on every executor as they register.

DriverPlugin — Driver-Side Lifecycle

The DriverPlugin runs in the driver JVM. Its primary jobs are initializing shared resources (database connections, metric registries) and aggregating data at shutdown.

import org.apache.spark.api.plugin.DriverPlugin;
import org.apache.spark.api.plugin.PluginContext;
import org.apache.spark.SparkContext;

import java.util.Map;
import java.util.Collections;

public class MetricsDriverPlugin implements DriverPlugin {

private MetricsStore metricsStore;

@Override
public Map<String, String> init(SparkContext sc, PluginContext ctx) {
// Called once when the driver starts.
// Return value is a map passed to every ExecutorPlugin.init().
String appId = sc.applicationId();
String appName = sc.appName();

// Initialize the metrics store (DB connection, HTTP client, etc.)
metricsStore = MetricsStoreFactory.create(
sc.getConf().get("spark.cazpian.metrics.backend", "postgres")
);
metricsStore.init();

// Register SparkListener for task/stage/job events
sc.addSparkListener(new MetricsSparkListener(metricsStore, appId, appName));

// Return metadata to executor plugins
return Map.of(
"appId", appId,
"appName", appName,
"metricsBackend", sc.getConf().get("spark.cazpian.metrics.backend", "postgres")
);
}

@Override
public void registerMetrics(String appId, PluginContext ctx) {
// Optional: register custom Spark metrics via the Codahale registry.
// ctx.metricRegistry() returns the shared MetricRegistry.
}

@Override
public void receive(Object message) {
// Receive messages sent from ExecutorPlugins via PluginContext.send().
// Useful for executor → driver metric aggregation.
if (message instanceof ExecutorMetricsBatch batch) {
metricsStore.writeBatch(batch);
}
}

@Override
public void shutdown() {
// Called when the SparkContext is stopped.
// Flush buffered metrics, close connections.
if (metricsStore != null) {
metricsStore.flush();
metricsStore.close();
}
}
}

Key points:

  • init() returns a Map<String, String> that is automatically forwarded to every executor's ExecutorPlugin.init() — use this to pass connection strings, application IDs, or feature flags without relying on broadcast variables.
  • receive() accepts messages sent from executors via PluginContext.send() — this is your executor→driver communication channel.
  • shutdown() is your last chance to flush buffered metrics before the JVM exits.

ExecutorPlugin — Executor-Side Hooks

The ExecutorPlugin runs on every executor JVM. It has hooks for initialization, per-task callbacks, and shutdown.

import org.apache.spark.api.plugin.ExecutorPlugin;
import org.apache.spark.api.plugin.PluginContext;

import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class MetricsExecutorPlugin implements ExecutorPlugin {

private PluginContext pluginContext;
private String executorId;
private String appId;
private final ConcurrentLinkedQueue<TaskMetricsSnapshot> buffer =
new ConcurrentLinkedQueue<>();
private ScheduledExecutorService scheduler;

@Override
public void init(PluginContext ctx, Map<String, String> extraConf) {
// Called once when the executor registers with the driver.
// extraConf contains the map returned by DriverPlugin.init().
this.pluginContext = ctx;
this.executorId = ctx.executorID();
this.appId = extraConf.getOrDefault("appId", "unknown");

// Start a periodic flush to send buffered metrics to the driver
scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(this::flushToDriver, 10, 10, TimeUnit.SECONDS);
}

@Override
public void onTaskStart() {
// Called at the beginning of every task on this executor.
// Capture start timestamp for custom duration tracking.
}

@Override
public void onTaskSucceeded() {
// Called when a task completes successfully.
// Note: TaskMetrics are NOT available here — use SparkListener instead.
}

@Override
public void onTaskFailed(Throwable failureReason) {
// Called when a task fails. Log the failure reason.
buffer.add(TaskMetricsSnapshot.failure(executorId, failureReason));
}

@Override
public void shutdown() {
// Flush remaining metrics and stop the scheduler.
flushToDriver();
if (scheduler != null) {
scheduler.shutdown();
}
}

private void flushToDriver() {
if (buffer.isEmpty()) return;
ExecutorMetricsBatch batch = new ExecutorMetricsBatch(executorId);
TaskMetricsSnapshot snapshot;
while ((snapshot = buffer.poll()) != null) {
batch.add(snapshot);
}
// Send to driver via the plugin communication channel
pluginContext.send(batch);
}
}

Important limitation: The ExecutorPlugin task callbacks (onTaskSucceeded, onTaskFailed) do not provide access to TaskMetrics. To get shuffle bytes, spill sizes, GC time, and other per-task metrics, you need a SparkListener on the driver side listening for SparkListenerTaskEnd events.
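What the executor-side callbacks are still good for is custom wall-clock timing: Spark invokes onTaskStart and onTaskSucceeded/onTaskFailed on the same thread that runs the task, so a ThreadLocal start timestamp is safe. A minimal, Spark-free sketch of that pattern (class and field names here are illustrative, not Spark API):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Per-task wall-clock timing for use inside ExecutorPlugin callbacks.
// The callbacks fire on the task's own thread, so ThreadLocal state
// correctly pairs each start with its end even under concurrency.
class TaskTimer {

    record TaskTiming(long threadId, long durationNanos) {}

    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    private final Queue<TaskTiming> completed = new ConcurrentLinkedQueue<>();

    // Call from ExecutorPlugin.onTaskStart()
    void onTaskStart() {
        startNanos.set(System.nanoTime());
    }

    // Call from ExecutorPlugin.onTaskSucceeded() / onTaskFailed()
    void onTaskEnd() {
        Long start = startNanos.get();
        if (start != null) {
            completed.add(new TaskTiming(
                Thread.currentThread().getId(), System.nanoTime() - start));
            startNanos.remove(); // avoid stale state on reused task threads
        }
    }

    int completedCount() {
        return completed.size();
    }
}
```

The buffered timings can then be drained into the periodic flushToDriver() batch alongside the failure snapshots.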

The DriverPlugin ↔ ExecutorPlugin Communication Channel

The plugin API provides a built-in message-passing mechanism:

ExecutorPlugin                          DriverPlugin
      │                                      │
      │  pluginContext.send(message)         │
      │ ───────────────────────────────────► │
      │                                      │  receive(message)
      │                                      │
  • Executor → Driver: Call pluginContext.send(Object) from the executor. The driver's DriverPlugin.receive(Object) is called with the message. Messages must be serializable.
  • Driver → Executor: Use the Map<String, String> returned from DriverPlugin.init() for one-time configuration. For runtime communication, use Spark's broadcast or accumulator mechanisms.

This channel is ideal for aggregating executor-local JVM metrics (heap usage, GC counts, custom counters) at the driver for centralized storage.
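The ExecutorMetricsBatch and TaskMetricsSnapshot types used in the examples are plain application classes, not Spark API. A minimal sketch of what they might look like — the key constraint is that anything handed to PluginContext.send() must be serializable, because Spark ships it over its RPC layer (field names here are illustrative):

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// One buffered observation from an executor. Records serialize fine as
// long as every component type does (String and long both do).
record TaskMetricsSnapshot(String executorId, long heapUsedBytes,
                           String failureMessage) implements Serializable {

    static TaskMetricsSnapshot failure(String executorId, Throwable cause) {
        return new TaskMetricsSnapshot(executorId, -1L, String.valueOf(cause));
    }
}

// The message type sent via pluginContext.send(); must be Serializable.
class ExecutorMetricsBatch implements Serializable {

    private final String executorId;
    private final List<TaskMetricsSnapshot> snapshots = new ArrayList<>();

    ExecutorMetricsBatch(String executorId) {
        this.executorId = executorId;
    }

    void add(TaskMetricsSnapshot snapshot) {
        snapshots.add(snapshot);
    }

    String executorId() { return executorId; }

    List<TaskMetricsSnapshot> snapshots() { return snapshots; }
}
```

Keep these messages small and flat — they are serialized on every flush, so avoid dragging in non-serializable references like loggers or connections.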

SparkListener — The Metrics Goldmine

While the plugin API gives you lifecycle hooks, SparkListener is where the real metrics live. It runs on the driver and receives callbacks for every significant event in the Spark execution lifecycle.

Available Event Callbacks

| Event | When It Fires | Key Data |
|---|---|---|
| onJobStart | Job submitted | Job ID, stage IDs, submission time |
| onJobEnd | Job completed | Job ID, result (success/failure), completion time |
| onStageSubmitted | Stage scheduled | Stage ID, stage name, number of tasks |
| onStageCompleted | Stage finished | Stage ID, full TaskMetrics aggregated, failure reason |
| onTaskStart | Task begins | Task ID, executor ID, host, partition ID |
| onTaskEnd | Task finishes | Task ID, full TaskMetrics, duration, GC time |
| onExecutorAdded | Executor registers | Executor ID, host, total cores |
| onExecutorRemoved | Executor deregisters | Executor ID, removal reason |
| onBlockManagerAdded | Block manager starts | Executor ID, max memory |
| onEnvironmentUpdate | Configs changed | Full Spark properties, classpath, JVM info |

TaskMetrics — Every Number You Need

The TaskMetrics object available in onTaskEnd and onStageCompleted contains the complete picture of what a task did:

Core Metrics:

| Metric | Method | What It Tells You |
|---|---|---|
| Executor run time | executorRunTime | Wall-clock time the task ran (ms) |
| Executor CPU time | executorCpuTime | CPU time consumed (ns) — ratio to run time shows I/O wait |
| JVM GC time | jvmGCTime | Time spent in garbage collection (ms) |
| Result size | resultSize | Size of the serialized task result sent back to the driver |
| Result serialization time | resultSerializationTime | Time spent serializing the result |
| Memory bytes spilled | memoryBytesSpilled | Data spilled from memory to disk — indicates memory pressure |
| Disk bytes spilled | diskBytesSpilled | Data written to disk during spills |
| Peak execution memory | peakExecutionMemory | Maximum execution memory used during the task |

Shuffle Write Metrics:

| Metric | Method | What It Tells You |
|---|---|---|
| Bytes written | shuffleWriteMetrics.bytesWritten | Total shuffle data produced |
| Records written | shuffleWriteMetrics.recordsWritten | Total shuffle records produced |
| Write time | shuffleWriteMetrics.writeTime | Time spent writing shuffle files (ns) |

Shuffle Read Metrics:

| Metric | Method | What It Tells You |
|---|---|---|
| Remote bytes read | shuffleReadMetrics.remoteBytesRead | Shuffle data fetched from other executors |
| Local bytes read | shuffleReadMetrics.localBytesRead | Shuffle data read from the same executor |
| Fetch wait time | shuffleReadMetrics.fetchWaitTime | Time spent waiting for shuffle blocks — high values indicate network bottlenecks |
| Records read | shuffleReadMetrics.recordsRead | Total shuffle records consumed |
| Remote blocks fetched | shuffleReadMetrics.remoteBlocksFetched | Number of remote shuffle blocks |
| Local blocks fetched | shuffleReadMetrics.localBlocksFetched | Number of local shuffle blocks |

Input/Output Metrics:

| Metric | Method | What It Tells You |
|---|---|---|
| Bytes read | inputMetrics.bytesRead | Data read from data sources (HDFS, S3, Parquet) |
| Records read | inputMetrics.recordsRead | Records read from data sources |
| Bytes written | outputMetrics.bytesWritten | Data written to output sinks |
| Records written | outputMetrics.recordsWritten | Records written to output sinks |
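One easy mistake when deriving ratios from these numbers: executorRunTime and jvmGCTime are reported in milliseconds, but executorCpuTime is in nanoseconds. A small helper that normalizes the units before computing the health ratios used later in this post (method names are illustrative, not Spark API):

```java
// Unit-aware ratio helpers for raw TaskMetrics values.
// executorRunTime / jvmGCTime are ms; executorCpuTime is ns.
final class MetricRatios {

    private MetricRatios() {}

    // CPU time over wall-clock run time: ~1.0 means CPU-bound,
    // low values indicate I/O wait, shuffle fetch stalls, or GC pauses.
    static double cpuUtilization(long cpuTimeNs, long runTimeMs) {
        if (runTimeMs <= 0) return 0.0;
        return (cpuTimeNs / 1_000_000.0) / runTimeMs; // convert ns -> ms
    }

    // Fraction of the task's wall-clock time spent in GC (both ms).
    static double gcRatio(long gcTimeMs, long runTimeMs) {
        if (runTimeMs <= 0) return 0.0;
        return (double) gcTimeMs / runTimeMs;
    }
}
```

Forgetting the nanosecond conversion inflates the CPU ratio by a factor of a million and quietly produces nonsense dashboards, so it is worth centralizing in one place.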

Implementation — MetricsSparkListener

import org.apache.spark.scheduler.*;
import org.apache.spark.executor.TaskMetrics;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MetricsSparkListener extends SparkListener {

private final MetricsStore store;
private final String appId;
private final String appName;
private final Map<Integer, Long> jobStartTimes = new ConcurrentHashMap<>();
private final Map<Integer, Long> stageStartTimes = new ConcurrentHashMap<>();

public MetricsSparkListener(MetricsStore store, String appId, String appName) {
this.store = store;
this.appId = appId;
this.appName = appName;
}

@Override
public void onJobStart(SparkListenerJobStart event) {
jobStartTimes.put(event.jobId(), System.currentTimeMillis());
store.writeJobEvent(JobEvent.builder()
.appId(appId)
.appName(appName)
.jobId(event.jobId())
.stageIds(event.stageIds())
.submissionTime(event.time())
.properties(extractProperties(event.properties()))
.build()
);
}

@Override
public void onJobEnd(SparkListenerJobEnd event) {
long startTime = jobStartTimes.getOrDefault(event.jobId(), 0L);
long duration = System.currentTimeMillis() - startTime;
store.writeJobCompletion(JobCompletion.builder()
.appId(appId)
.jobId(event.jobId())
.result(event.jobResult().toString())
.durationMs(duration)
.completionTime(event.time())
.build()
);
}

@Override
public void onStageCompleted(SparkListenerStageCompleted event) {
StageInfo info = event.stageInfo();
TaskMetrics metrics = info.taskMetrics();

if (metrics == null) return; // Stage was cancelled

store.writeStageMetrics(StageMetricsRecord.builder()
.appId(appId)
.stageId(info.stageId())
.stageName(info.name())
.attemptNumber(info.attemptNumber())
.numTasks(info.numTasks())
// Core metrics
.executorRunTimeMs(metrics.executorRunTime())
.executorCpuTimeNs(metrics.executorCpuTime())
.jvmGcTimeMs(metrics.jvmGCTime())
.memoryBytesSpilled(metrics.memoryBytesSpilled())
.diskBytesSpilled(metrics.diskBytesSpilled())
.peakExecutionMemory(metrics.peakExecutionMemory())
// Shuffle write
.shuffleBytesWritten(metrics.shuffleWriteMetrics().bytesWritten())
.shuffleRecordsWritten(metrics.shuffleWriteMetrics().recordsWritten())
.shuffleWriteTimeNs(metrics.shuffleWriteMetrics().writeTime())
// Shuffle read
.shuffleRemoteBytesRead(metrics.shuffleReadMetrics().remoteBytesRead())
.shuffleLocalBytesRead(metrics.shuffleReadMetrics().localBytesRead())
.shuffleFetchWaitTimeMs(metrics.shuffleReadMetrics().fetchWaitTime())
.shuffleRecordsRead(metrics.shuffleReadMetrics().recordsRead())
// I/O
.inputBytesRead(metrics.inputMetrics().bytesRead())
.inputRecordsRead(metrics.inputMetrics().recordsRead())
.outputBytesWritten(metrics.outputMetrics().bytesWritten())
.outputRecordsWritten(metrics.outputMetrics().recordsWritten())
.build()
);
}

@Override
public void onTaskEnd(SparkListenerTaskEnd event) {
TaskMetrics metrics = event.taskMetrics();
if (metrics == null) return;

store.writeTaskMetrics(TaskMetricsRecord.builder()
.appId(appId)
.stageId(event.stageId())
.taskId(event.taskInfo().taskId())
.executorId(event.taskInfo().executorId())
.host(event.taskInfo().host())
.partitionId(event.taskInfo().partitionId())
.durationMs(event.taskInfo().duration())
.successful(event.taskInfo().successful())
.gcTimeMs(metrics.jvmGCTime())
.peakMemory(metrics.peakExecutionMemory())
.memorySpilled(metrics.memoryBytesSpilled())
.diskSpilled(metrics.diskBytesSpilled())
.shuffleBytesWritten(metrics.shuffleWriteMetrics().bytesWritten())
.shuffleBytesRead(
metrics.shuffleReadMetrics().remoteBytesRead()
+ metrics.shuffleReadMetrics().localBytesRead()
)
.inputBytesRead(metrics.inputMetrics().bytesRead())
.outputBytesWritten(metrics.outputMetrics().bytesWritten())
.build()
);
}
}

QueryExecutionListener — SQL-Level Metrics

For SQL/DataFrame queries, the QueryExecutionListener gives you access to the full query plan and execution duration:

import org.apache.spark.sql.util.QueryExecutionListener;
import org.apache.spark.sql.execution.QueryExecution;

public class MetricsQueryListener implements QueryExecutionListener {

private final MetricsStore store;
private final String appId;

public MetricsQueryListener(MetricsStore store, String appId) {
this.store = store;
this.appId = appId;
}

@Override
public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
store.writeQueryMetrics(QueryMetricsRecord.builder()
.appId(appId)
.funcName(funcName)
.durationMs(durationNs / 1_000_000)
.logicalPlan(qe.logical().toString())
.optimizedPlan(qe.optimizedPlan().toString())
.physicalPlan(qe.executedPlan().toString())
.succeeded(true)
.build()
);
}

@Override
public void onFailure(String funcName, QueryExecution qe, Exception exception) {
store.writeQueryMetrics(QueryMetricsRecord.builder()
.appId(appId)
.funcName(funcName)
.physicalPlan(qe.executedPlan() != null ? qe.executedPlan().toString() : "N/A")
.succeeded(false)
.errorMessage(exception.getMessage())
.build()
);
}
}

Register it alongside the SparkListener in your DriverPlugin.init():

@Override
public Map<String, String> init(SparkContext sc, PluginContext ctx) {
// ... existing setup ...

// Register the SparkListener
sc.addSparkListener(new MetricsSparkListener(metricsStore, appId, appName));

// Register the QueryExecutionListener
SparkSession.active().listenerManager()
.register(new MetricsQueryListener(metricsStore, appId));

return Map.of("appId", appId, "appName", appName);
}

This captures query-level performance data that complements the task-level metrics from the SparkListener. You can correlate a slow query's plan with the stage and task metrics that executed it.

Integrating Iceberg MetricsReporter

If you are running Iceberg tables (and if you are reading this blog, you probably are), you should also collect Iceberg's own metrics alongside Spark compute metrics. Iceberg provides ScanReport and CommitReport through its MetricsReporter interface.

For a deep dive on every Iceberg metric, thresholds, and health diagnostics, see our Iceberg Metrics Reporting post.

Unified MetricsReporter

import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.metrics.CommitReport;

public class UnifiedIcebergMetricsReporter implements MetricsReporter {

private final MetricsStore store;

public UnifiedIcebergMetricsReporter(MetricsStore store) {
this.store = store;
}

@Override
public void report(MetricsReport report) {
if (report instanceof ScanReport scan) {
store.writeIcebergScanMetrics(IcebergScanRecord.builder()
.tableName(scan.tableName())
.snapshotId(scan.snapshotId())
.planningDurationMs(scan.scanMetrics()
.totalPlanningDuration().totalDuration().toMillis())
.resultDataFiles(scan.scanMetrics()
.resultDataFiles().value())
.resultDeleteFiles(scan.scanMetrics()
.resultDeleteFiles().value())
.totalDataManifests(scan.scanMetrics()
.totalDataManifests().value())
.skippedDataManifests(scan.scanMetrics()
.skippedDataManifests().value())
.totalFileSizeBytes(scan.scanMetrics()
.totalFileSizeInBytes().value())
.build()
);
} else if (report instanceof CommitReport commit) {
store.writeIcebergCommitMetrics(IcebergCommitRecord.builder()
.tableName(commit.tableName())
.operation(commit.operation())
.commitDurationMs(commit.commitMetrics()
.totalDuration().totalDuration().toMillis())
.attempts(commit.commitMetrics()
.attempts().value())
.addedDataFiles(commit.commitMetrics()
.addedDataFiles().value())
.removedDataFiles(commit.commitMetrics()
.removedDataFiles().value())
.addedDeleteFiles(commit.commitMetrics()
.addedDeleteFiles().value())
.addedRecords(commit.commitMetrics()
.addedRecords().value())
.build()
);
}
}
}

Register the reporter in your Spark session config:

spark.sql.catalog.my_catalog.metrics-reporter-impl=com.cazpian.iceberg.UnifiedIcebergMetricsReporter

Or register it programmatically inside your DriverPlugin.init() by adding it to the Iceberg catalog configuration.

Putting It All Together — The Unified Plugin

Here is the complete SparkPlugin implementation that wires everything together:

public class CazpianMetricsPlugin implements SparkPlugin {

@Override
public DriverPlugin driverPlugin() {
return new DriverPlugin() {
private MetricsStore store;

@Override
public Map<String, String> init(SparkContext sc, PluginContext ctx) {
String appId = sc.applicationId();
String appName = sc.appName();
String backend = sc.getConf()
.get("spark.cazpian.metrics.backend", "postgres");

// Initialize storage
store = MetricsStoreFactory.create(backend);
store.init();

// Register SparkListener
sc.addSparkListener(
new MetricsSparkListener(store, appId, appName));

// Register QueryExecutionListener
SparkSession.active().listenerManager()
.register(new MetricsQueryListener(store, appId));

return Map.of(
"appId", appId,
"appName", appName
);
}

@Override
public void receive(Object msg) {
if (msg instanceof ExecutorMetricsBatch batch) {
store.writeBatch(batch);
}
}

@Override
public void shutdown() {
if (store != null) {
store.flush();
store.close();
}
}
};
}

@Override
public ExecutorPlugin executorPlugin() {
return new MetricsExecutorPlugin();
}
}

Deploy with two configs:

# Register the unified metrics plugin
spark.plugins=com.cazpian.spark.CazpianMetricsPlugin

# Register Iceberg's MetricsReporter for scan/commit metrics
spark.sql.catalog.my_catalog.metrics-reporter-impl=com.cazpian.iceberg.UnifiedIcebergMetricsReporter

Storage Backends — Beyond PostgreSQL

PostgreSQL works great when you have a handful of Spark jobs producing a few thousand metric rows per day. It stops working when:

  • You have hundreds of concurrent jobs each producing 10,000+ task-level metrics
  • You need sub-second dashboard queries over weeks of historical data
  • Your metrics volume exceeds 50 million rows/day
  • You need automatic downsampling (keep 1-second resolution for 7 days, 1-minute for 90 days, 1-hour for 1 year)

Here are the alternatives, ranked by how well they fit Spark metrics workloads:

Comparison Matrix

| Backend | Ingestion Rate | Query Speed | Downsampling | Operational Overhead | Best For |
|---|---|---|---|---|---|
| PostgreSQL | ~5K rows/sec | Slow at scale | Manual | Low (you know it) | Small teams, < 50 jobs/day |
| TimescaleDB | ~50K rows/sec | Fast (hypertables) | Built-in policies | Low (PG extension) | Teams already on PostgreSQL |
| ClickHouse | ~500K rows/sec | Sub-second on billions | TTL + materialized views | Medium | High-volume production |
| QuestDB | ~1M rows/sec | Sub-second | Downsampling SQL | Low | Ultra-high-throughput time series |
| VictoriaMetrics | ~800K samples/sec | Fast (PromQL) | Automatic | Low (single binary) | Prometheus/Grafana ecosystem |
| Iceberg on S3 | Batch (Spark job) | Moderate (Spark query) | Partition-based retention | Minimal | Cost-optimized archival |
| DuckDB | ~200K rows/sec | Fast (local analytics) | Manual | None (embedded) | Single-node analysis, dev/test |

Option 1: TimescaleDB — The Easiest Upgrade

If you are already using PostgreSQL, TimescaleDB is a drop-in extension that adds time-series capabilities with zero migration friction.

-- Install TimescaleDB extension
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- Create the task metrics table
CREATE TABLE task_metrics (
time TIMESTAMPTZ NOT NULL,
app_id TEXT NOT NULL,
stage_id INT NOT NULL,
task_id BIGINT NOT NULL,
executor_id TEXT,
host TEXT,
duration_ms BIGINT,
gc_time_ms BIGINT,
peak_memory BIGINT,
memory_spilled BIGINT,
disk_spilled BIGINT,
shuffle_write_bytes BIGINT,
shuffle_read_bytes BIGINT,
input_bytes BIGINT,
output_bytes BIGINT
);

-- Convert to a hypertable (automatic time-based partitioning)
SELECT create_hypertable('task_metrics', 'time');

-- Automatic compression after 7 days (10x storage reduction)
ALTER TABLE task_metrics SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'app_id',
timescaledb.compress_orderby = 'time DESC'
);
SELECT add_compression_policy('task_metrics', INTERVAL '7 days');

-- Automatic retention: drop data older than 90 days
SELECT add_retention_policy('task_metrics', INTERVAL '90 days');

-- Continuous aggregate for dashboard queries (1-minute rollups)
CREATE MATERIALIZED VIEW task_metrics_1m
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 minute', time) AS bucket,
app_id,
COUNT(*) AS task_count,
AVG(duration_ms) AS avg_duration_ms,
MAX(duration_ms) AS max_duration_ms,
SUM(shuffle_write_bytes) AS total_shuffle_write,
SUM(memory_spilled) AS total_memory_spilled,
SUM(gc_time_ms) AS total_gc_time
FROM task_metrics
GROUP BY bucket, app_id;

Why TimescaleDB for Spark metrics:

  • Uses standard PostgreSQL wire protocol — your existing JDBC writer works unchanged
  • Continuous aggregates replace manual rollup jobs
  • Compression policies shrink older data automatically
  • Same SQL you already know

Option 2: ClickHouse — Production-Grade at Scale

For high-volume environments (hundreds of Spark jobs, millions of tasks per day), ClickHouse provides columnar compression, sub-second analytical queries, and exceptional write throughput.

CREATE TABLE spark_task_metrics (
timestamp DateTime64(3),
app_id LowCardinality(String),
app_name LowCardinality(String),
stage_id UInt32,
task_id UInt64,
executor_id LowCardinality(String),
host LowCardinality(String),
duration_ms UInt64,
gc_time_ms UInt64,
peak_memory UInt64,
memory_spilled UInt64,
disk_spilled UInt64,
shuffle_write_bytes UInt64,
shuffle_read_bytes UInt64,
input_bytes UInt64,
output_bytes UInt64
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (app_id, timestamp, stage_id, task_id)
TTL timestamp + INTERVAL 90 DAY
SETTINGS index_granularity = 8192;

-- Materialized view for stage-level rollups
CREATE MATERIALIZED VIEW spark_stage_metrics_mv
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (app_id, timestamp, stage_id)
AS SELECT
toStartOfMinute(timestamp) AS timestamp,
app_id,
stage_id,
countState() AS task_count,
avgState(duration_ms) AS avg_duration_ms,
maxState(duration_ms) AS max_duration_ms,
sumState(shuffle_write_bytes) AS total_shuffle_write,
sumState(memory_spilled) AS total_memory_spilled
FROM spark_task_metrics
GROUP BY timestamp, app_id, stage_id;

Why ClickHouse for Spark metrics:

  • Handles millions of inserts per second with batched writes
  • Columnar storage gives 10–20x compression over row stores
  • LowCardinality type is perfect for executor IDs, hostnames, app names
  • TTL-based retention — no cron jobs needed
  • Built-in Grafana data source for dashboards

Option 3: VictoriaMetrics — If You Live in Prometheus/Grafana

If your infrastructure already uses Prometheus and Grafana, VictoriaMetrics is a high-performance, long-term storage backend that speaks PromQL natively.

// Write metrics in the Prometheus exposition format via the
// VictoriaMetrics import endpoint. httpPost is a small helper
// (not shown) that POSTs a string body to a URL.
public class VictoriaMetricsStore implements MetricsStore {

    private final String vmUrl;

    public VictoriaMetricsStore(String vmUrl) {
        this.vmUrl = vmUrl;
    }

    @Override
    public void writeTaskMetrics(TaskMetricsRecord record) {
        // Each line: metric_name{labels} value timestamp
        String metrics = String.format(
            "spark_task_duration_ms{app_id=\"%s\",stage_id=\"%d\",executor=\"%s\"} %d %d\n"
            + "spark_task_gc_time_ms{app_id=\"%s\",stage_id=\"%d\",executor=\"%s\"} %d %d\n"
            + "spark_task_shuffle_write_bytes{app_id=\"%s\",stage_id=\"%d\",executor=\"%s\"} %d %d\n"
            + "spark_task_memory_spilled{app_id=\"%s\",stage_id=\"%d\",executor=\"%s\"} %d %d\n",
            record.appId(), record.stageId(), record.executorId(),
            record.durationMs(), record.timestamp(),
            record.appId(), record.stageId(), record.executorId(),
            record.gcTimeMs(), record.timestamp(),
            record.appId(), record.stageId(), record.executorId(),
            record.shuffleBytesWritten(), record.timestamp(),
            record.appId(), record.stageId(), record.executorId(),
            record.memorySpilled(), record.timestamp()
        );
        httpPost(vmUrl + "/api/v1/import/prometheus", metrics);
    }
}

Then query from Grafana with standard PromQL:

# Average task duration by app over the last hour
avg by (app_id) (avg_over_time(spark_task_duration_ms[1h]))

# 99th percentile shuffle write per series over 5-minute windows
quantile_over_time(0.99, spark_task_shuffle_write_bytes[5m])

# Total memory spilled per app over 5-minute windows
sum by (app_id) (sum_over_time(spark_task_memory_spilled[5m]))

Why VictoriaMetrics for Spark metrics:

  • Single binary deployment — no cluster to manage
  • PromQL support means instant Grafana dashboards
  • 10x less storage than Prometheus with the same data
  • Automatic downsampling for long-term retention

Option 4: Iceberg Tables on S3 — Metrics as a Data Product

This is the most cost-effective approach for long-term archival. Write your metrics as Iceberg tables — stored on S3, queried with Spark itself or Trino.

import java.util.ArrayList;
import java.util.List;

public class IcebergMetricsStore implements MetricsStore {

    private static final int BATCH_SIZE = 1_000;

    private final SparkSession spark;
    private final List<StageMetricsRecord> bufferedRecords = new ArrayList<>();

    public IcebergMetricsStore(SparkSession spark) {
        this.spark = spark;
    }

    @Override
    public void writeStageMetrics(StageMetricsRecord record) {
        // Buffer records and write as a batch using a Spark DataFrame
        bufferedRecords.add(record);

        if (bufferedRecords.size() >= BATCH_SIZE) {
            Dataset<Row> df = spark.createDataFrame(bufferedRecords, StageMetricsRecord.class);
            try {
                df.writeTo("metrics_catalog.spark_metrics.stage_metrics").append();
            } catch (NoSuchTableException e) {
                throw new RuntimeException("Metrics table does not exist", e);
            }
            bufferedRecords.clear();
        }
    }
}

-- Create the metrics table
CREATE TABLE metrics_catalog.spark_metrics.stage_metrics (
timestamp TIMESTAMP,
app_id STRING,
stage_id INT,
stage_name STRING,
executor_run_time_ms BIGINT,
gc_time_ms BIGINT,
shuffle_write_bytes BIGINT,
shuffle_read_bytes BIGINT,
memory_spilled BIGINT,
input_bytes BIGINT,
output_bytes BIGINT
)
USING iceberg
PARTITIONED BY (days(timestamp))
TBLPROPERTIES (
'write.target-file-size-bytes' = '134217728',
'history.expire.max-snapshot-age-ms' = '604800000'
);

Why Iceberg for metrics archival:

  • Near-zero storage cost on S3
  • Partition pruning makes time-range queries fast
  • Query with Spark, Trino, DuckDB, or any Iceberg-compatible engine
  • Schema evolution — add new metric columns without rewriting data
  • Time travel — go back and requery last week's metric snapshot

Choosing a Backend — A Phased Approach

Do not over-engineer from day one. Start with what works and scale as your volume grows:

Phase 1 (< 20 jobs/day): PostgreSQL or TimescaleDB. Simple, reliable, SQL you know. Write from the SparkListener directly via JDBC. Good for teams of 1–5 engineers.

Phase 2 (20–200 jobs/day): Add ClickHouse or QuestDB for task-level metrics. Keep PostgreSQL for job/query-level summaries. Add Grafana dashboards. Buffer writes with an in-memory queue and flush every 5–10 seconds to avoid overwhelming the database.

Phase 3 (200+ jobs/day): Write hot metrics (last 7 days) to ClickHouse/VictoriaMetrics. Archive older data to Iceberg tables on S3. Use materialized views or continuous aggregates for dashboard queries. Set up alerting on spill rates, GC ratios, and scan planning duration thresholds.

                     Hot path (7 days)                Cold path (archival)

                  ┌──────────────────┐              ┌────────────────────┐
SparkListener ──► │ ClickHouse or    │ ── TTL ──►   │ Iceberg on S3      │
                  │ VictoriaMetrics  │              │ (partitioned by    │
                  └────────┬─────────┘              │  day, compressed)  │
                           │                        └────────────────────┘
                           ▼
                  ┌──────────────────┐
                  │     Grafana      │
                  │    Dashboards    │
                  └──────────────────┘

Key Metrics to Track and Alert On

Not every metric deserves a dashboard panel. Focus on the signals that actually predict performance problems:

Compute Health

| Metric | Healthy Range | Alert Threshold | What To Do |
| --- | --- | --- | --- |
| GC time / executor run time | < 5% | > 10% | Increase executor memory or reduce data per partition |
| Memory bytes spilled | 0 | > 100 MB per stage | Increase spark.executor.memory or reduce shuffle.partitions |
| Disk bytes spilled | 0 | > 0 | Indicates serious memory pressure — resize executors |
| Shuffle fetch wait time | < 100 ms | > 500 ms per task | Network bottleneck — check data locality or increase bandwidth |
| CPU time / run time ratio | > 80% | < 50% | High I/O wait — check storage throughput or data skew |
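These thresholds are cheap to evaluate against the per-stage metrics you are already collecting. A minimal sketch — the class and method names below are ours for illustration, not part of any Spark API:

```java
// Hypothetical threshold checks mirroring the compute-health table above.
final class ComputeHealthChecks {
    // Alert when GC time exceeds 10% of executor run time
    static boolean gcRatioAlert(long gcTimeMs, long executorRunTimeMs) {
        return executorRunTimeMs > 0 && (double) gcTimeMs / executorRunTimeMs > 0.10;
    }

    // Any disk spill at all indicates serious memory pressure
    static boolean diskSpillAlert(long diskBytesSpilled) {
        return diskBytesSpilled > 0;
    }
}
```

Wire these into whatever consumes your flushed batches; evaluating them at write time means alerts fire within one flush interval of the offending stage.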

Shuffle Health

| Metric | Healthy Range | Alert Threshold | What To Do |
| --- | --- | --- | --- |
| Shuffle write bytes per stage | Stable ±20% | 2x increase vs baseline | Data grew or join exploded — check query plan |
| Remote/local read ratio | Stable | Sudden shift to remote | Data locality degraded — check executor placement |
| Shuffle records skew (max/median) | < 3x | > 10x | Data skew — see our data skew guide |
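The skew ratio in the last row is derived from per-task shuffle record counts within a stage. A sketch, assuming you already aggregate those counts per stage (the helper name is ours):

```java
import java.util.Arrays;

// Illustrative helper: max/median ratio over per-task shuffle record counts.
final class SkewCheck {
    static double skewRatio(long[] recordsPerTask) {
        long[] sorted = recordsPerTask.clone();
        Arrays.sort(sorted);
        long median = sorted[sorted.length / 2];
        long max = sorted[sorted.length - 1];
        // A zero median with nonzero max is extreme skew; all-zero means no skew
        if (median == 0) return max == 0 ? 1.0 : Double.POSITIVE_INFINITY;
        return (double) max / median;
    }
}
```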

Iceberg Table Health

| Metric | Healthy Range | Alert Threshold | What To Do |
| --- | --- | --- | --- |
| Scan planning duration | < 2 sec | > 5 sec | Too many manifests — run rewrite_manifests |
| Skipped/total manifests ratio | > 80% | < 50% | Partition pruning not effective — check partition strategy |
| Result delete files | 0 | > 100 per scan | Run compaction to merge delete files |
| Commit attempts | 1 | > 3 | Writer contention — see our Iceberg table design guide |
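The manifest prune ratio is a simple division with a zero guard. A sketch with our own naming, fed from the ScanReport fields you capture in the MetricsReporter:

```java
// Illustrative: fraction of data manifests skipped during scan planning.
// Treats an empty scan (zero manifests) as fully pruned rather than dividing by zero.
final class ScanHealth {
    static double manifestPruneRatio(long skippedDataManifests, long totalDataManifests) {
        if (totalDataManifests == 0) return 1.0;
        return (double) skippedDataManifests / totalDataManifests;
    }
}
```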

Production Considerations

Thread Safety

The SparkListener runs on the LiveListenerBus thread. Your onTaskEnd will be called thousands of times during a job. Never block on I/O in the listener callback — buffer events and flush asynchronously:

private final BlockingQueue<TaskMetricsRecord> buffer =
    new LinkedBlockingQueue<>(100_000);
private final ScheduledExecutorService flusher =
    Executors.newSingleThreadScheduledExecutor();
private final MetricsStore store;
private final String appId;
private final String appName;

public MetricsSparkListener(MetricsStore store, String appId, String appName) {
    this.store = store;
    this.appId = appId;
    this.appName = appName;

    // Flush up to 5,000 buffered records every 5 seconds on a background thread
    flusher.scheduleAtFixedRate(() -> {
        List<TaskMetricsRecord> batch = new ArrayList<>();
        buffer.drainTo(batch, 5000);
        if (!batch.isEmpty()) {
            store.writeBatch(batch);
        }
    }, 5, 5, TimeUnit.SECONDS);
}

@Override
public void onTaskEnd(SparkListenerTaskEnd event) {
    TaskMetrics m = event.taskMetrics();
    if (m == null) return;
    buffer.offer(buildRecord(event, m)); // Non-blocking; drops the record if the buffer is full
}

Graceful Shutdown

Always flush remaining metrics when the application ends:

@Override
public void shutdown() {
    // Flush remaining buffered records
    List<TaskMetricsRecord> remaining = new ArrayList<>();
    buffer.drainTo(remaining);
    if (!remaining.isEmpty()) {
        store.writeBatch(remaining);
    }
    flusher.shutdown();
    store.close();
}
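Note that the drain above can race with an in-flight scheduled flush. A slightly more defensive ordering stops the scheduler first and waits for it before the final drain — a self-contained sketch with String standing in for the record type:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Stop the flusher BEFORE draining so a background flush cannot run
// concurrently with the final drain. Returns whatever was still buffered.
final class ShutdownHelper {
    static List<String> stopAndDrain(ScheduledExecutorService flusher,
                                     BlockingQueue<String> buffer) {
        flusher.shutdown();                                 // no new flushes get scheduled
        try {
            flusher.awaitTermination(10, TimeUnit.SECONDS); // let an in-flight flush finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();             // preserve status, drain anyway
        }
        List<String> remaining = new ArrayList<>();
        buffer.drainTo(remaining);                          // safe: no competing consumer now
        return remaining;
    }
}
```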

JAR Packaging and Deployment

Package your metrics plugin as a fat JAR (with dependencies shaded) and deploy it alongside your Spark jobs:

<!-- Maven shade plugin configuration -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <configuration>
        <relocations>
            <!-- Shade database drivers to avoid classpath conflicts -->
            <relocation>
                <pattern>org.postgresql</pattern>
                <shadedPattern>shaded.org.postgresql</shadedPattern>
            </relocation>
        </relocations>
    </configuration>
</plugin>

Deploy:

spark-submit \
  --jars /path/to/cazpian-metrics-plugin.jar \
  --conf spark.plugins=com.cazpian.spark.CazpianMetricsPlugin \
  --conf spark.cazpian.metrics.backend=clickhouse \
  --conf spark.sql.catalog.my_catalog.metrics-reporter-impl=com.cazpian.iceberg.UnifiedIcebergMetricsReporter \
  your-application.jar

Metrics Overhead

The plugin adds minimal overhead when implemented correctly:

| Component | Overhead | Notes |
| --- | --- | --- |
| SparkListener callbacks | < 1 ms per callback | Asynchronous buffering — never blocks task execution |
| ExecutorPlugin hooks | < 0.5 ms per task | onTaskStart/onTaskSucceeded are lightweight |
| Periodic flush | < 50 ms per batch | Runs on a background thread every 5–10 seconds |
| Network writes | Depends on backend | Batch writes amortize connection overhead |
| Total per-task overhead | < 2 ms | Negligible for tasks running 100 ms – minutes |

Grafana Dashboard Queries

Once your metrics are flowing into a time-series store, here are the key dashboard panels to create:

Job Duration Trend (ClickHouse)

SELECT
    toStartOfHour(timestamp) AS hour,
    app_name,
    avg(duration_ms) / 1000 AS avg_duration_sec,
    max(duration_ms) / 1000 AS max_duration_sec
FROM spark_job_metrics
WHERE timestamp > now() - INTERVAL 7 DAY
GROUP BY hour, app_name
ORDER BY hour;

Shuffle Spill Heatmap (ClickHouse)

SELECT
    toStartOfMinute(timestamp) AS minute,
    stage_id,
    sum(memory_spilled) / (1024*1024) AS spill_mb,
    sum(gc_time_ms) / 1000 AS gc_sec
FROM spark_task_metrics
WHERE app_id = '{app_id}'
  AND timestamp > now() - INTERVAL 1 DAY
GROUP BY minute, stage_id
ORDER BY minute;

Iceberg Scan Planning Trend (TimescaleDB)

SELECT
    time_bucket('1 hour', timestamp) AS hour,
    table_name,
    avg(planning_duration_ms) AS avg_planning_ms,
    max(planning_duration_ms) AS max_planning_ms,
    avg(result_data_files) AS avg_files_scanned,
    avg(skipped_data_manifests::float / NULLIF(total_data_manifests, 0)) AS prune_ratio
FROM iceberg_scan_metrics
WHERE timestamp > now() - INTERVAL '7 days'
GROUP BY hour, table_name
ORDER BY hour;

What We Covered

| Section | Key Takeaway |
| --- | --- |
| Why not just Spark UI | No history, no alerting, no Iceberg context, no cross-job comparison |
| SparkPlugin API | Single entry point (spark.plugins config) wires DriverPlugin + ExecutorPlugin |
| DriverPlugin | Initializes resources, registers listeners, receives executor messages, flushes at shutdown |
| ExecutorPlugin | Per-task hooks on every executor, communicates back to driver via send() |
| SparkListener | The metrics goldmine — 30+ per-task metrics including shuffle, GC, spill, I/O |
| QueryExecutionListener | SQL text, query plans, and wall-clock duration per query |
| Iceberg MetricsReporter | ScanReport and CommitReport for table-level read/write health |
| Storage backends | PostgreSQL → TimescaleDB → ClickHouse/VictoriaMetrics → Iceberg on S3 |
| Production patterns | Async buffering, graceful shutdown, shaded JAR deployment, < 2 ms overhead |

The metrics you do not collect are the performance problems you cannot diagnose. Start with a SparkListener writing to PostgreSQL, and scale to ClickHouse or VictoriaMetrics as your job volume grows. The plugin API makes it a single config change to wire into any Spark cluster.