Listeners: Hooking into Job, Step, and Chunk Lifecycle Events
Introduction
Spring Batch emits lifecycle events at every stage of execution — before and after a job runs, before and after each step, before and after each chunk, and before and after each individual read/process/write call. Listeners let you hook into these events without modifying your core batch logic.
Common uses:
- Log start/end times and item counts
- Send success/failure notifications (Slack, email, PagerDuty)
- Publish metrics to Prometheus or CloudWatch
- Log every skipped item to a dead-letter table
- Reset resources before a step begins
Listener Hierarchy
Job
 ├── JobExecutionListener         beforeJob / afterJob
 │
 └── Step
      ├── StepExecutionListener   beforeStep / afterStep
      ├── ChunkListener           beforeChunk / afterChunk / afterChunkError
      ├── ItemReadListener        beforeRead / afterRead / onReadError
      ├── ItemProcessListener     beforeProcess / afterProcess / onProcessError
      ├── ItemWriteListener       beforeWrite / afterWrite / onWriteError
      └── SkipListener            onSkipInRead / onSkipInProcess / onSkipInWrite
JobExecutionListener
@Component
public class ImportJobListener implements JobExecutionListener {

    private static final Logger log = LoggerFactory.getLogger(ImportJobListener.class);

    @Override
    public void beforeJob(JobExecution jobExecution) {
        log.info("=== Job [{}] started. ExecutionId={}",
                jobExecution.getJobInstance().getJobName(),
                jobExecution.getId());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        BatchStatus status = jobExecution.getStatus();
        long durationMs = Duration.between(
                jobExecution.getStartTime(),
                jobExecution.getEndTime()).toMillis();
        if (status == BatchStatus.COMPLETED) {
            long totalWritten = jobExecution.getStepExecutions().stream()
                    .mapToLong(StepExecution::getWriteCount).sum();
            log.info("=== Job [{}] COMPLETED in {}ms. Total written: {}",
                    jobExecution.getJobInstance().getJobName(), durationMs, totalWritten);
        } else {
            String error = jobExecution.getAllFailureExceptions().stream()
                    .map(Throwable::getMessage)
                    .collect(Collectors.joining("; "));
            log.error("=== Job [{}] {} after {}ms. Errors: {}",
                    jobExecution.getJobInstance().getJobName(), status, durationMs, error);
        }
    }
}
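The afterJob hook is also the natural place for the notification use case listed in the introduction. Below is a minimal, self-contained sketch of building a Slack-style webhook payload from the same values the listener already computes; the class and method names are illustrative, not part of Spring Batch:

```java
// Hypothetical helper: build a Slack incoming-webhook JSON body from job
// results, as afterJob() might before POSTing it. Names are illustrative.
public class JobNotifier {

    public static String buildPayload(String jobName, String status, long durationMs) {
        // Slack incoming webhooks accept a JSON body with a "text" field
        String emoji = "COMPLETED".equals(status) ? ":white_check_mark:" : ":x:";
        return String.format(
                "{\"text\": \"%s Job *%s* finished with status %s in %d ms\"}",
                emoji, jobName, status, durationMs);
    }

    public static void main(String[] args) {
        System.out.println(buildPayload("importJob", "COMPLETED", 1234));
    }
}
```

In the listener itself you would POST this body to your webhook URL (for example with `java.net.http.HttpClient`), or swap the channel for email or PagerDuty on failure paths.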
Register on the job:
@Bean
public Job importJob(JobRepository jobRepository, Step step, ImportJobListener listener) {
    return new JobBuilder("importJob", jobRepository)
            .listener(listener)
            .start(step)
            .build();
}
StepExecutionListener
@Component
public class OrderStepListener implements StepExecutionListener {

    private static final Logger log = LoggerFactory.getLogger(OrderStepListener.class);

    private Instant stepStartTime;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        stepStartTime = Instant.now();
        log.info("Step [{}] starting", stepExecution.getStepName());
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        long elapsed = Duration.between(stepStartTime, Instant.now()).toMillis();
        log.info("Step [{}] finished in {}ms — read={}, write={}, skip={}, rollback={}",
                stepExecution.getStepName(),
                elapsed,
                stepExecution.getReadCount(),
                stepExecution.getWriteCount(),
                stepExecution.getSkipCount(),
                stepExecution.getRollbackCount());
        // Publish result to job context for next step or listeners
        stepExecution.getJobExecution().getExecutionContext()
                .putLong(stepExecution.getStepName() + ".writeCount",
                        stepExecution.getWriteCount());
        // Return custom ExitStatus or null to keep the default
        return null;
    }
}
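The afterStep return value is what drives conditional job flow: a non-null ExitStatus replaces the default. As a self-contained illustration of the decision logic only (the exit-code names are hypothetical, not Spring Batch constants), the listener could map step counts to a custom code and return `new ExitStatus(code)`:

```java
// Illustrative helper: choose a custom exit code from step counts, as
// afterStep() might before returning new ExitStatus(code).
public class ExitCodes {

    public static String exitCodeFor(long skipCount, long writeCount) {
        if (writeCount == 0) return "NOOP";               // nothing written
        if (skipCount > 0) return "COMPLETED_WITH_SKIPS"; // route to a review step
        return "COMPLETED";
    }

    public static void main(String[] args) {
        System.out.println(exitCodeFor(3, 100));
    }
}
```

In the job definition you could then branch on the custom code, e.g. `.on("COMPLETED_WITH_SKIPS").to(reviewStep)`.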
Register on the step:
@Bean
public Step importStep(JobRepository jobRepository, ..., OrderStepListener listener) {
    return new StepBuilder("importStep", jobRepository)
            .<Order, Order>chunk(100, tx)
            .reader(reader)
            .writer(writer)
            .listener(listener)
            .build();
}
ChunkListener
ChunkListener fires around each chunk transaction — useful for metrics and per-chunk logging.
@Component
public class OrderChunkListener implements ChunkListener {

    private static final Logger log = LoggerFactory.getLogger(OrderChunkListener.class);

    // Mutable state is safe here only because the step runs single-threaded;
    // a multi-threaded step would need thread-safe counters.
    private Instant chunkStart;
    private long totalChunks = 0;

    @Override
    public void beforeChunk(ChunkContext context) {
        chunkStart = Instant.now();
    }

    @Override
    public void afterChunk(ChunkContext context) {
        long ms = Duration.between(chunkStart, Instant.now()).toMillis();
        totalChunks++;
        StepExecution se = context.getStepContext().getStepExecution();
        log.debug("Chunk {} completed in {}ms — total written so far: {}",
                totalChunks, ms, se.getWriteCount());
    }

    @Override
    public void afterChunkError(ChunkContext context) {
        log.error("Chunk {} failed after {}ms",
                totalChunks,
                Duration.between(chunkStart, Instant.now()).toMillis());
    }
}
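Because the chunk listener sees both elapsed time and cumulative write counts, it is also a convenient place to derive throughput. The arithmetic is trivial; a hypothetical helper (not part of the listener above) would be:

```java
// Hypothetical helper: derive throughput from a cumulative write count and
// elapsed milliseconds, as afterChunk() might log alongside the counts.
public class Throughput {

    public static double itemsPerSecond(long writeCount, long elapsedMs) {
        if (elapsedMs <= 0) return 0.0; // avoid division by zero on the first chunk
        return writeCount * 1000.0 / elapsedMs;
    }

    public static void main(String[] args) {
        System.out.println(itemsPerSecond(500, 2000)); // 500 items in 2 seconds
    }
}
```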
Item-Level Listeners
ItemReadListener
@Component
public class OrderReadListener implements ItemReadListener<Order> {

    private static final Logger log = LoggerFactory.getLogger(OrderReadListener.class);

    @Override
    public void beforeRead() {}

    @Override
    public void afterRead(Order item) {
        log.trace("Read order {}", item.getOrderId());
    }

    @Override
    public void onReadError(Exception ex) {
        if (ex instanceof FlatFileParseException pe) {
            // getCause() can be null, e.g. when a line has too few tokens
            String cause = pe.getCause() != null ? pe.getCause().getMessage() : pe.getMessage();
            log.error("Read error at line {}: [{}] — {}",
                    pe.getLineNumber(), pe.getInput(), cause);
        } else {
            log.error("Unexpected read error: {}", ex.getMessage(), ex);
        }
    }
}
ItemProcessListener
@Component
public class OrderProcessListener implements ItemProcessListener<Order, ProcessedOrder> {

    private static final Logger log = LoggerFactory.getLogger(OrderProcessListener.class);

    @Override
    public void beforeProcess(Order item) {}

    @Override
    public void afterProcess(Order item, ProcessedOrder result) {
        if (result == null) {
            log.debug("Order {} filtered during processing", item.getOrderId());
        }
    }

    @Override
    public void onProcessError(Order item, Exception ex) {
        log.error("Processing failed for order {}: {}", item.getOrderId(), ex.getMessage());
    }
}
ItemWriteListener
@Component
public class OrderWriteListener implements ItemWriteListener<ProcessedOrder> {

    private static final Logger log = LoggerFactory.getLogger(OrderWriteListener.class);

    @Override
    public void beforeWrite(Chunk<? extends ProcessedOrder> items) {
        log.debug("Writing chunk of {} items", items.size());
    }

    @Override
    public void afterWrite(Chunk<? extends ProcessedOrder> items) {
        log.debug("Successfully wrote {} items", items.size());
    }

    @Override
    public void onWriteError(Exception ex, Chunk<? extends ProcessedOrder> items) {
        log.error("Write failed for {} items: {}", items.size(), ex.getMessage());
        items.forEach(item ->
                log.error("  Failed item: orderId={}", item.getOrderId()));
    }
}
SkipListener — Dead Letter Logging
SkipListener is called when an item is skipped (after exhausting retry attempts). Use it to log skipped items to a dead-letter table for manual review.
@Component
@RequiredArgsConstructor
public class OrderSkipListener implements SkipListener<Order, ProcessedOrder> {

    private final JdbcTemplate jdbcTemplate;

    @Override
    public void onSkipInRead(Throwable t) {
        String input = "";
        int lineNumber = -1;
        if (t instanceof FlatFileParseException pe) {
            input = pe.getInput();
            lineNumber = pe.getLineNumber();
        }
        logToDeadLetter("READ", input, lineNumber, t.getMessage());
    }

    @Override
    public void onSkipInProcess(Order item, Throwable t) {
        logToDeadLetter("PROCESS",
                "orderId=" + item.getOrderId(), -1, t.getMessage());
    }

    @Override
    public void onSkipInWrite(ProcessedOrder item, Throwable t) {
        logToDeadLetter("WRITE",
                "orderId=" + item.getOrderId(), -1, t.getMessage());
    }

    private void logToDeadLetter(String phase, String payload, int lineNumber, String error) {
        jdbcTemplate.update(
            "INSERT INTO batch_dead_letter (phase, payload, line_number, error_message, created_at) " +
            "VALUES (?, ?, ?, ?, NOW())",
            phase, payload, lineNumber, error);
    }
}
Dead-letter table:
CREATE TABLE batch_dead_letter (
    id            BIGINT AUTO_INCREMENT PRIMARY KEY,
    phase         VARCHAR(20) NOT NULL,
    payload       TEXT,
    line_number   INT DEFAULT -1,
    error_message VARCHAR(1000),
    created_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
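One detail worth guarding against: error_message is capped at VARCHAR(1000), so an exception message longer than that would make the dead-letter insert itself fail. A small self-contained sketch of the kind of truncation the listener could apply before the insert (the helper name is illustrative):

```java
// Hypothetical helper: cap an error message to the column width before it
// goes into batch_dead_letter, tolerating null messages.
public class DeadLetterSupport {

    public static String truncate(String message, int maxLength) {
        if (message == null) return "";
        return message.length() <= maxLength ? message : message.substring(0, maxLength);
    }

    public static void main(String[] args) {
        System.out.println(truncate("x".repeat(2000), 1000).length()); // prints 1000
    }
}
```

In logToDeadLetter you would pass `truncate(error, 1000)` instead of the raw message.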
Register the skip listener on the step:
.faultTolerant()
.skip(FlatFileParseException.class)
.skip(OrderValidationException.class)
.skipLimit(500)
.listener(orderSkipListener)
Annotation-Based Listeners
Instead of implementing listener interfaces, annotate methods on any bean:
@Component
public class OrderBatchEventHandler {

    private static final Logger log = LoggerFactory.getLogger(OrderBatchEventHandler.class);

    @BeforeJob
    public void beforeJob(JobExecution jobExecution) {
        log.info("Job starting: {}", jobExecution.getJobInstance().getJobName());
    }

    @AfterJob
    public void afterJob(JobExecution jobExecution) {
        log.info("Job finished: {}", jobExecution.getStatus());
    }

    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        log.info("Step starting: {}", stepExecution.getStepName());
    }

    @AfterStep
    public ExitStatus afterStep(StepExecution stepExecution) {
        return null; // return null to keep the default ExitStatus
    }

    @OnReadError
    public void onReadError(Exception ex) {
        log.error("Read error: {}", ex.getMessage());
    }

    @OnSkipInWrite
    public void onSkipInWrite(Object item, Throwable t) {
        log.warn("Skipped item in write: {}", item);
    }
}
Register the annotated bean as a listener. Note that the step builder only wires the step-level annotations; for @BeforeJob / @AfterJob to fire, register the same bean on the JobBuilder as well:

new StepBuilder("step", jobRepository)
    .listener(orderBatchEventHandler) // Spring detects the step-level annotations
    // ...

new JobBuilder("job", jobRepository)
    .listener(orderBatchEventHandler) // wires @BeforeJob / @AfterJob
    // ...
Metrics with Micrometer
Publish batch metrics to Prometheus (or any Micrometer registry):
@Component
@RequiredArgsConstructor
public class BatchMetricsListener implements JobExecutionListener, StepExecutionListener {

    private final MeterRegistry registry;

    @Override
    public void afterJob(JobExecution jobExecution) {
        String jobName = jobExecution.getJobInstance().getJobName();
        String status = jobExecution.getStatus().name();
        Timer.builder("batch.job.duration")
                .tag("job", jobName)
                .tag("status", status)
                .register(registry)
                .record(Duration.between(
                        jobExecution.getStartTime(),
                        jobExecution.getEndTime()));
        Counter.builder("batch.job.runs")
                .tag("job", jobName)
                .tag("status", status)
                .register(registry)
                .increment();
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        String stepName = stepExecution.getStepName();
        // Note: when name + tags already exist, Micrometer returns the existing
        // meter, so these gauges keep reporting the first registered StepExecution.
        Gauge.builder("batch.step.write.count",
                        stepExecution, StepExecution::getWriteCount)
                .tag("step", stepName)
                .register(registry);
        Gauge.builder("batch.step.skip.count",
                        stepExecution, StepExecution::getSkipCount)
                .tag("step", stepName)
                .register(registry);
        return null;
    }
}
Complete Listener Setup
@Bean
public Job importOrdersJob(
        JobRepository jobRepository,
        Step importStep,
        ImportJobListener jobListener,
        BatchMetricsListener metricsListener) {
    return new JobBuilder("importOrdersJob", jobRepository)
            .listener(jobListener)
            .listener(metricsListener)
            .start(importStep)
            .build();
}
@Bean
public Step importOrdersStep(
        JobRepository jobRepository,
        PlatformTransactionManager tx,
        FlatFileItemReader<Order> reader,
        ItemProcessor<Order, ProcessedOrder> processor,
        JdbcBatchItemWriter<ProcessedOrder> writer,
        OrderStepListener stepListener,
        OrderChunkListener chunkListener,
        OrderReadListener readListener,
        OrderSkipListener skipListener,
        BatchMetricsListener metricsListener) {
    return new StepBuilder("importOrdersStep", jobRepository)
            .<Order, ProcessedOrder>chunk(200, tx)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .faultTolerant()
            .skip(FlatFileParseException.class)
            .skip(OrderValidationException.class)
            .skipLimit(500)
            .listener(stepListener)
            .listener(chunkListener)
            .listener(readListener)
            // register the SkipListener after faultTolerant() so it is wired
            // as a skip listener, not just a generic step listener
            .listener(skipListener)
            .listener(metricsListener)
            .build();
}
Key Takeaways
- Seven listener types cover the full lifecycle: Job → Step → Chunk → Read/Process/Write/Skip.
- SkipListener.onSkipInRead/Process/Write is called after an item exhausts its retry attempts. Use it to write dead-letter records.
- Annotation-based listeners (@BeforeJob, @AfterStep, @OnSkipInWrite, etc.) avoid implementing interfaces — useful when a class serves multiple listener roles.
- StepExecutionListener.afterStep() can change the step's ExitStatus for conditional job flow.
- Register metrics listeners on both the job (for job-level counters/timers) and the step (for per-step gauges).
What’s Next
Part 6 (Listeners) is complete. Article 17 starts Part 7 — Error Handling. You will learn retry logic: configuring retry policies, backoff strategies, and handling transient failures like database deadlocks and HTTP 503 errors.