Listeners: Hooking into Job, Step, and Chunk Lifecycle Events

Introduction

Spring Batch emits lifecycle events at every stage of execution — before and after a job runs, before and after each step, before and after each chunk, and before and after each individual read/process/write call. Listeners let you hook into these events without modifying your core batch logic.

Common uses:

  • Log start/end times and item counts
  • Send success/failure notifications (Slack, email, PagerDuty)
  • Publish metrics to Prometheus or CloudWatch
  • Log every skipped item to a dead-letter table
  • Reset resources before a step begins

Listener Hierarchy

Job
├── JobExecutionListener          beforeJob / afterJob
│
└── Step
    ├── StepExecutionListener     beforeStep / afterStep
    ├── ChunkListener             beforeChunk / afterChunk / afterChunkError
    ├── ItemReadListener          beforeRead / afterRead / onReadError
    ├── ItemProcessListener       beforeProcess / afterProcess / onProcessError
    ├── ItemWriteListener         beforeWrite / afterWrite / onWriteError
    └── SkipListener              onSkipInRead / onSkipInProcess / onSkipInWrite
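
To make the nesting concrete, here is a toy model of the order in which these callbacks fire for a simple chunk-oriented step with no skips or retries. It is plain Java, not Spring Batch code: the class name ListenerOrderSketch and its simulate method are made up for illustration, and the ordering reflects my understanding of the framework's read-all, then process-all, then write-once chunk loop, so verify it against the version you run.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper: records callback names in the order a simple chunk step would fire them.
final class ListenerOrderSketch {

    private ListenerOrderSketch() {}

    static List<String> simulate(List<String> items, int chunkSize) {
        List<String> events = new ArrayList<>();
        Iterator<String> reader = items.iterator();

        events.add("beforeJob");
        events.add("beforeStep");

        boolean exhausted = false;
        while (!exhausted) {
            events.add("beforeChunk");
            List<String> chunk = new ArrayList<>();

            // Read phase: beforeRead fires on every attempt, afterRead only when an item came back.
            while (chunk.size() < chunkSize) {
                events.add("beforeRead");
                if (!reader.hasNext()) {
                    exhausted = true; // models the reader returning null at end of input
                    break;
                }
                String item = reader.next();
                events.add("afterRead(" + item + ")");
                chunk.add(item);
            }

            // Process phase: one before/after pair per item that was read.
            for (String item : chunk) {
                events.add("beforeProcess(" + item + ")");
                events.add("afterProcess(" + item + ")");
            }

            // Write phase: one before/after pair per chunk, skipped for an empty chunk.
            if (!chunk.isEmpty()) {
                events.add("beforeWrite(" + chunk.size() + ")");
                events.add("afterWrite(" + chunk.size() + ")");
            }
            events.add("afterChunk");
        }

        events.add("afterStep");
        events.add("afterJob");
        return events;
    }
}
```

Calling ListenerOrderSketch.simulate(List.of("A", "B", "C"), 2) yields two beforeChunk/afterChunk pairs: the first chunk reads A and B before either is processed, and the second reads C, hits end of input, then processes and writes the single remaining item.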

JobExecutionListener

@Component
public class ImportJobListener implements JobExecutionListener {

    private static final Logger log = LoggerFactory.getLogger(ImportJobListener.class);

    @Override
    public void beforeJob(JobExecution jobExecution) {
        log.info("=== Job [{}] started. ExecutionId={}",
                jobExecution.getJobInstance().getJobName(),
                jobExecution.getId());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        BatchStatus status = jobExecution.getStatus();
        long durationMs = Duration.between(
                jobExecution.getStartTime(),
                jobExecution.getEndTime()).toMillis();

        if (status == BatchStatus.COMPLETED) {
            long totalWritten = jobExecution.getStepExecutions().stream()
                    .mapToLong(StepExecution::getWriteCount).sum();
            log.info("=== Job [{}] COMPLETED in {}ms. Total written: {}",
                    jobExecution.getJobInstance().getJobName(), durationMs, totalWritten);
        } else {
            String error = jobExecution.getAllFailureExceptions().stream()
                    .map(Throwable::getMessage)
                    .collect(Collectors.joining("; "));
            log.error("=== Job [{}] {} after {}ms. Errors: {}",
                    jobExecution.getJobInstance().getJobName(), status, durationMs, error);
        }
    }
}

Register on the job:

@Bean
public Job importJob(JobRepository jobRepository, Step step, ImportJobListener listener) {
    return new JobBuilder("importJob", jobRepository)
            .listener(listener)
            .start(step)
            .build();
}

StepExecutionListener

@Component
public class OrderStepListener implements StepExecutionListener {

    private static final Logger log = LoggerFactory.getLogger(OrderStepListener.class);
    private Instant stepStartTime;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        stepStartTime = Instant.now();
        log.info("Step [{}] starting", stepExecution.getStepName());
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        long elapsed = Duration.between(stepStartTime, Instant.now()).toMillis();

        log.info("Step [{}] finished in {}ms — read={}, write={}, skip={}, rollback={}",
                stepExecution.getStepName(),
                elapsed,
                stepExecution.getReadCount(),
                stepExecution.getWriteCount(),
                stepExecution.getSkipCount(),
                stepExecution.getRollbackCount());

        // Publish result to job context for next step or listeners
        stepExecution.getJobExecution().getExecutionContext()
                .putLong(stepExecution.getStepName() + ".writeCount",
                         stepExecution.getWriteCount());

        // Return custom ExitStatus or null to keep the default
        return null;
    }
}

Register on the step:

@Bean
public Step importStep(JobRepository jobRepository, ..., OrderStepListener listener) {
    return new StepBuilder("importStep", jobRepository)
            .<Order, Order>chunk(100, tx)
            .reader(reader)
            .writer(writer)
            .listener(listener)
            .build();
}
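
The afterStep method above returns null to keep the default ExitStatus. When a later step should branch on the outcome, the listener can return a custom status instead. The sketch below isolates only the decision and keeps it free of Spring types so it can run standalone. The class name StepOutcome and the exit code "COMPLETED WITH SKIPS" are illustrative choices, not names taken from this article's code.

```java
// Hypothetical helper: decides whether afterStep should override the step's exit code.
final class StepOutcome {

    // Illustrative custom code; any string works as long as the job flow matches on it.
    static final String COMPLETED_WITH_SKIPS = "COMPLETED WITH SKIPS";

    private StepOutcome() {}

    /**
     * Returns a custom exit code, or null to keep the default.
     *
     * @param currentExitCode the step's current exit code, e.g. "COMPLETED" or "FAILED"
     * @param skipCount       the number of items skipped in the step
     */
    static String exitCodeFor(String currentExitCode, long skipCount) {
        // Never mask a failure: only relabel steps that did not fail.
        if (!"FAILED".equals(currentExitCode) && skipCount > 0) {
            return COMPLETED_WITH_SKIPS;
        }
        return null;
    }
}
```

Inside afterStep, this would be used roughly as: compute the code from stepExecution.getExitStatus().getExitCode() and stepExecution.getSkipCount(), then return new ExitStatus(code) when the code is non-null and null otherwise. A job flow can then route on that code with .on("COMPLETED WITH SKIPS").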

ChunkListener

ChunkListener fires around each chunk transaction — useful for metrics and per-chunk logging.

@Component
public class OrderChunkListener implements ChunkListener {

    private static final Logger log = LoggerFactory.getLogger(OrderChunkListener.class);
    private Instant chunkStart;
    private long totalChunks = 0;

    @Override
    public void beforeChunk(ChunkContext context) {
        chunkStart = Instant.now();
    }

    @Override
    public void afterChunk(ChunkContext context) {
        long ms = Duration.between(chunkStart, Instant.now()).toMillis();
        totalChunks++;
        StepExecution se = context.getStepContext().getStepExecution();
        log.debug("Chunk {} completed in {}ms — total written so far: {}",
                totalChunks, ms, se.getWriteCount());
    }

    @Override
    public void afterChunkError(ChunkContext context) {
        // totalChunks counts completed chunks, so the failing chunk is the next one
        log.error("Chunk {} failed after {}ms",
                totalChunks + 1,
                Duration.between(chunkStart, Instant.now()).toMillis());
    }
}
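
One caveat: the listener above keeps chunkStart and totalChunks in plain fields on a singleton bean. That is fine for a single-threaded step, but the fields would race if the step ran chunks on several threads. A minimal sketch of a thread-safe accumulator that afterChunk could delegate to is shown below. ChunkStats is a hypothetical class and uses only the JDK.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper: accumulates chunk timings safely across threads.
final class ChunkStats {

    private final LongAdder chunks = new LongAdder();
    private final LongAdder totalNanos = new LongAdder();

    // Record one completed chunk and how long it took.
    void record(long elapsedNanos) {
        chunks.increment();
        totalNanos.add(elapsedNanos);
    }

    long chunkCount() {
        return chunks.sum();
    }

    // Average chunk duration in milliseconds, or 0.0 before any chunk has been recorded.
    double averageMillis() {
        long count = chunks.sum();
        return count == 0 ? 0.0 : totalNanos.sum() / 1_000_000.0 / count;
    }
}
```

The per-chunk start time needs the same care. Rather than a shared field, it could be stored on the ChunkContext passed to beforeChunk (it supports setAttribute/getAttribute) and read back in afterChunk.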

Item-Level Listeners

ItemReadListener

@Component
public class OrderReadListener implements ItemReadListener<Order> {

    private static final Logger log = LoggerFactory.getLogger(OrderReadListener.class);

    @Override
    public void beforeRead() {}

    @Override
    public void afterRead(Order item) {
        log.trace("Read order {}", item.getOrderId());
    }

    @Override
    public void onReadError(Exception ex) {
        if (ex instanceof FlatFileParseException pe) {
            // Pass the exception itself: getCause() can be null for parse errors
            log.error("Read error at line {}: [{}]",
                    pe.getLineNumber(), pe.getInput(), pe);
        } else {
            log.error("Unexpected read error: {}", ex.getMessage(), ex);
        }
    }
}

ItemProcessListener

@Component
public class OrderProcessListener implements ItemProcessListener<Order, ProcessedOrder> {

    private static final Logger log = LoggerFactory.getLogger(OrderProcessListener.class);

    @Override
    public void beforeProcess(Order item) {}

    @Override
    public void afterProcess(Order item, ProcessedOrder result) {
        if (result == null) {
            log.debug("Order {} filtered during processing", item.getOrderId());
        }
    }

    @Override
    public void onProcessError(Order item, Exception ex) {
        log.error("Processing failed for order {}: {}", item.getOrderId(), ex.getMessage());
    }
}

ItemWriteListener

@Component
public class OrderWriteListener implements ItemWriteListener<ProcessedOrder> {

    private static final Logger log = LoggerFactory.getLogger(OrderWriteListener.class);

    @Override
    public void beforeWrite(Chunk<? extends ProcessedOrder> items) {
        log.debug("Writing chunk of {} items", items.size());
    }

    @Override
    public void afterWrite(Chunk<? extends ProcessedOrder> items) {
        log.debug("Successfully wrote {} items", items.size());
    }

    @Override
    public void onWriteError(Exception ex, Chunk<? extends ProcessedOrder> items) {
        log.error("Write failed for {} items: {}", items.size(), ex.getMessage());
        items.forEach(item ->
            log.error("  Failed item: orderId={}", item.getOrderId()));
    }
}

SkipListener — Dead Letter Logging

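SkipListener is called when an item is skipped, that is, when it throws an exception configured as skippable and the skip limit has not yet been reached. If retry is also configured, the skip happens only after the retry attempts are exhausted. The callbacks are invoked just before the chunk's transaction commits. Use them to log skipped items to a dead-letter table for manual review.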

@Component
@RequiredArgsConstructor
public class OrderSkipListener implements SkipListener<Order, ProcessedOrder> {

    private final JdbcTemplate jdbcTemplate;

    @Override
    public void onSkipInRead(Throwable t) {
        String input = "";
        int lineNumber = -1;
        if (t instanceof FlatFileParseException pe) {
            input = pe.getInput();
            lineNumber = pe.getLineNumber();
        }
        logToDeadLetter("READ", input, lineNumber, t.getMessage());
    }

    @Override
    public void onSkipInProcess(Order item, Throwable t) {
        logToDeadLetter("PROCESS",
                "orderId=" + item.getOrderId(), -1, t.getMessage());
    }

    @Override
    public void onSkipInWrite(ProcessedOrder item, Throwable t) {
        logToDeadLetter("WRITE",
                "orderId=" + item.getOrderId(), -1, t.getMessage());
    }

    private void logToDeadLetter(String phase, String payload, int lineNumber, String error) {
        jdbcTemplate.update(
                "INSERT INTO batch_dead_letter (phase, payload, line_number, error_message, created_at) " +
                "VALUES (?, ?, ?, ?, NOW())",
                phase, payload, lineNumber, error);
    }
}

Dead-letter table:

CREATE TABLE batch_dead_letter (
    id            BIGINT AUTO_INCREMENT PRIMARY KEY,
    phase         VARCHAR(20)   NOT NULL,
    payload       TEXT,
    line_number   INT           DEFAULT -1,
    error_message VARCHAR(1000),
    created_at    TIMESTAMP     DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
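
The error_message column is capped at 1000 characters, while exception messages can be longer than that or null. Depending on the database's SQL mode, an oversized value may be rejected, which would turn a skip into a failure. A small guard like the sketch below could be applied before the insert. DeadLetterText is a hypothetical helper, not part of the listener shown earlier.

```java
// Hypothetical helper: builds an error string that always fits error_message VARCHAR(1000).
final class DeadLetterText {

    static final int MAX_ERROR_LENGTH = 1000; // must match the column definition

    private DeadLetterText() {}

    static String errorText(Throwable t) {
        String message = t.getMessage();
        // Include the exception type so a null message still leaves a useful record.
        String text = t.getClass().getSimpleName() + (message == null ? "" : ": " + message);
        return text.length() <= MAX_ERROR_LENGTH ? text : text.substring(0, MAX_ERROR_LENGTH);
    }
}
```

In the skip listener, this would replace the direct t.getMessage() calls, for example logToDeadLetter("READ", input, lineNumber, DeadLetterText.errorText(t)).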

Register the skip listener on the step:

.faultTolerant()
.skip(FlatFileParseException.class)
.skip(OrderValidationException.class)
.skipLimit(500)
.listener(orderSkipListener)

Annotation-Based Listeners

Instead of implementing listener interfaces, annotate methods on any bean:

@Component
public class OrderBatchEventHandler {

    private static final Logger log = LoggerFactory.getLogger(OrderBatchEventHandler.class);

    @BeforeJob
    public void beforeJob(JobExecution jobExecution) {
        log.info("Job starting: {}", jobExecution.getJobInstance().getJobName());
    }

    @AfterJob
    public void afterJob(JobExecution jobExecution) {
        log.info("Job finished: {}", jobExecution.getStatus());
    }

    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        log.info("Step starting: {}", stepExecution.getStepName());
    }

    @AfterStep
    public ExitStatus afterStep(StepExecution stepExecution) {
        return null; // return null to keep default ExitStatus
    }

    @OnReadError
    public void onReadError(Exception ex) {
        log.error("Read error: {}", ex.getMessage());
    }

    @OnSkipInWrite
    public void onSkipInWrite(Object item, Throwable t) {
        log.warn("Skipped item in write: {}", item);
    }
}

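Register the annotated bean as a listener. Step-level annotations are picked up when the bean is registered on the step builder. The @BeforeJob and @AfterJob methods run only if the same bean is also registered on the job builder, and skip annotations such as @OnSkipInWrite take effect only on a fault-tolerant step:

new StepBuilder("step", jobRepository)
    .listener(orderBatchEventHandler)  // the builder scans the bean for listener annotations
    // ...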

Metrics with Micrometer

Publish batch metrics to Prometheus (or any Micrometer registry):

@Component
@RequiredArgsConstructor
public class BatchMetricsListener implements JobExecutionListener, StepExecutionListener {

    private final MeterRegistry registry;

    @Override
    public void afterJob(JobExecution jobExecution) {
        String jobName = jobExecution.getJobInstance().getJobName();
        String status   = jobExecution.getStatus().name();

        Timer.builder("batch.job.duration")
                .tag("job", jobName)
                .tag("status", status)
                .register(registry)
                .record(Duration.between(
                        jobExecution.getStartTime(),
                        jobExecution.getEndTime()));

        Counter.builder("batch.job.runs")
                .tag("job", jobName)
                .tag("status", status)
                .register(registry)
                .increment();
    }

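    // Micrometer gauges hold only a weak reference to the observed object, so a gauge
    // bound directly to a StepExecution reports NaN once that execution is collected.
    // Keep strongly referenced holders here and update them after each step instead.
    private final Map<String, AtomicLong> gaugeValues = new ConcurrentHashMap<>();

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        String stepName = stepExecution.getStepName();

        setGauge("batch.step.write.count", stepName, stepExecution.getWriteCount());
        setGauge("batch.step.skip.count", stepName, stepExecution.getSkipCount());

        return null;
    }

    // Registers the gauge on first use; later runs of the same step only update the value.
    private void setGauge(String name, String stepName, long value) {
        gaugeValues.computeIfAbsent(name + "|" + stepName, key -> {
            AtomicLong holder = new AtomicLong();
            Gauge.builder(name, holder, AtomicLong::get)
                 .tag("step", stepName)
                 .register(registry);
            return holder;
        }).set(value);
    }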
}

Complete Listener Setup

@Bean
public Job importOrdersJob(
        JobRepository jobRepository,
        Step importOrdersStep,
        ImportJobListener jobListener,
        BatchMetricsListener metricsListener) {

    return new JobBuilder("importOrdersJob", jobRepository)
            .listener(jobListener)
            .listener(metricsListener)
            .start(importOrdersStep)
            .build();
}

@Bean
public Step importOrdersStep(
        JobRepository jobRepository,
        PlatformTransactionManager tx,
        FlatFileItemReader<Order> reader,
        ItemProcessor<Order, ProcessedOrder> processor,
        JdbcBatchItemWriter<ProcessedOrder> writer,
        OrderStepListener stepListener,
        OrderChunkListener chunkListener,
        OrderReadListener readListener,
        OrderSkipListener skipListener,
        BatchMetricsListener metricsListener) {

    return new StepBuilder("importOrdersStep", jobRepository)
            .<Order, ProcessedOrder>chunk(200, tx)
            .reader(reader)
            .processor(processor)
            .writer(writer)
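            .listener(stepListener)
            .listener(chunkListener)
            .listener(readListener)
            .listener(metricsListener)
            .faultTolerant()
            .skip(FlatFileParseException.class)
            .skip(OrderValidationException.class)
            .skipLimit(500)
            // Register the SkipListener after faultTolerant(): only the
            // fault-tolerant builder accepts the SkipListener interface
            .listener(skipListener)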
            .build();
}

Key Takeaways

  • Seven listener types cover the full lifecycle: Job → Step → Chunk → Read/Process/Write/Skip.
  • SkipListener.onSkipInRead/Process/Write is called when an item is skipped because of a skippable exception (after retries are exhausted, if retry is configured). Use it to write dead-letter records.
  • Annotation-based listeners (@BeforeJob, @AfterStep, @OnSkipInWrite, etc.) avoid implementing interfaces — useful when a class serves multiple listener roles.
  • StepExecutionListener.afterStep() can change the step’s ExitStatus for conditional job flow.
  • Register metrics listeners on both the job (for job-level counters/timers) and the step (for per-step gauges).

What’s Next

Part 6 (Listeners) is complete. Article 17 starts Part 7 — Error Handling. You will learn retry logic: configuring retry policies, backoff strategies, and handling transient failures like database deadlocks and HTTP 503 errors.