Configuring Jobs and Steps: Flows, Decisions, and Conditional Execution

Introduction

So far every example has had a single step. Real batch jobs usually have multiple steps — validate input, import data, generate a report, send a notification, clean up temp files. Spring Batch’s JobBuilder DSL lets you compose these steps into flows with conditional branching, parallel execution, and decision logic.

This article covers:

  • Linear multi-step jobs
  • Conditional step transitions using ExitStatus
  • JobExecutionDecider for runtime branching
  • Parallel flows with split
  • Nested flows with FlowStep
  • The Flow abstraction for reusable step sequences

Linear Multi-Step Job

The simplest multi-step job runs steps in sequence. If any step fails, the job stops.

@Bean
public Job importOrdersJob(JobRepository jobRepository,
                            Step downloadStep,
                            Step validateStep,
                            Step importStep,
                            Step reportStep) {
    return new JobBuilder("importOrdersJob", jobRepository)
            .start(downloadStep)
            .next(validateStep)
            .next(importStep)
            .next(reportStep)
            .build();
}

By default, if validateStep finishes with ExitStatus.FAILED, the job stops with a FAILED status, and importStep and reportStep do not run.


Conditional Step Transitions

Use .on(exitCode).to(nextStep) to branch based on a step’s ExitStatus.

@Bean
public Job importOrdersJob(JobRepository jobRepository,
                            Step downloadStep,
                            Step validateStep,
                            Step importStep,
                            Step cleanupStep,
                            Step notifyFailureStep) {

    return new JobBuilder("importOrdersJob", jobRepository)
            .start(downloadStep)
            .next(validateStep)
            .on("FAILED").to(notifyFailureStep)   // if validate fails → notify
            .from(validateStep).on("*").to(importStep)  // otherwise → import
            .from(importStep).on("COMPLETED").to(cleanupStep)
            .from(importStep).on("FAILED").to(notifyFailureStep)
            .end()
            .build();
}

Pattern matching rules for .on(pattern):

Pattern         Matches
"COMPLETED"     Exact match
"FAILED"        Exact match
"*"             Any exit status
"PARTIAL_*"     Any status starting with PARTIAL_
"C*"            Any status starting with C

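Transitions are not evaluated in declaration order. Spring Batch sorts the transitions declared for a step from most specific to least specific, so an exact match such as "FAILED" takes precedence over "*" wherever it was declared. If a step ends with an exit status that no transition covers, the job fails.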
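To make the wildcard rules in the table concrete, here is a minimal sketch of the matching in plain Java. It is a simplified model for illustration only, not Spring Batch's own matcher; the class name ExitCodePattern is hypothetical. Besides "*" (zero or more characters), it also handles "?" (exactly one character), which Spring Batch patterns support as well.

```java
// Simplified model of exit-code wildcard matching; NOT Spring Batch's own code.
class ExitCodePattern {

    // '*' matches zero or more characters, '?' matches exactly one character.
    static boolean matches(String pattern, String exitCode) {
        return matches(pattern, 0, exitCode, 0);
    }

    private static boolean matches(String p, int pi, String s, int si) {
        if (pi == p.length()) {
            return si == s.length(); // pattern used up: match only if the code is too
        }
        char c = p.charAt(pi);
        if (c == '*') {
            // Let '*' absorb 0, 1, 2, ... characters and try to match the rest.
            for (int k = si; k <= s.length(); k++) {
                if (matches(p, pi + 1, s, k)) {
                    return true;
                }
            }
            return false;
        }
        if (si == s.length()) {
            return false; // pattern still expects a character
        }
        return (c == '?' || c == s.charAt(si)) && matches(p, pi + 1, s, si + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("COMPLETED", "COMPLETED"));    // true
        System.out.println(matches("*", "FAILED"));               // true
        System.out.println(matches("PARTIAL_*", "PARTIAL_LOAD")); // true
        System.out.println(matches("C*", "FAILED"));              // false
    }
}
```

Note that "C*" matches both COMPLETED and any custom code that happens to start with C, so broad prefixes are worth avoiding when custom exit codes are in play.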

Ending the job from a transition

.on("FAILED").end()                     // job ends with BatchStatus COMPLETED (logical end)
.on("FAILED").fail()                    // job ends with BatchStatus FAILED
.on("FAILED").stopAndRestart(someStep)  // job ends with BatchStatus STOPPED; a restart resumes at someStep

Custom ExitStatus from a Listener

Set a custom exit status in a StepExecutionListener to drive conditional branching:

@Component
public class ValidationStepListener implements StepExecutionListener {

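    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // Keep the original exit status if the step itself failed; a custom
        // code could otherwise replace FAILED and hide the failure.
        if (stepExecution.getStatus() == BatchStatus.FAILED) {
            return stepExecution.getExitStatus();
        }

        long skipCount = stepExecution.getSkipCount();
        long writeCount = stepExecution.getWriteCount();

        if (skipCount == 0) {
            return new ExitStatus("CLEAN");          // no skips
        } else if ((double) skipCount / (skipCount + writeCount) > 0.1) {
            return new ExitStatus("HIGH_ERROR_RATE"); // more than 10% of items skipped
        } else {
            return new ExitStatus("COMPLETED_WITH_SKIPS");
        }
    }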
}

The job definition then branches on these custom codes:

.from(validateStep).on("CLEAN").to(fullImportStep)
.from(validateStep).on("COMPLETED_WITH_SKIPS").to(partialImportStep)
.from(validateStep).on("HIGH_ERROR_RATE").to(alertStep)
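The threshold arithmetic is easy to get wrong, so it can help to check it outside Spring Batch. The sketch below restates the listener's three outcomes as a pure function in plain Java. The class SkipRateClassifier is hypothetical, and like the listener it assumes that every item is either skipped or written (items filtered out by a processor are not counted).

```java
// Plain-Java restatement of the listener's thresholds, for checking the arithmetic.
class SkipRateClassifier {

    static final double MAX_ERROR_RATE = 0.1; // same 10% threshold as the listener

    // skipCount: items skipped; writeCount: items written successfully.
    static String classify(long skipCount, long writeCount) {
        if (skipCount == 0) {
            return "CLEAN";
        }
        // Error rate = skipped items as a fraction of all skipped + written items.
        double errorRate = (double) skipCount / (skipCount + writeCount);
        return errorRate > MAX_ERROR_RATE ? "HIGH_ERROR_RATE" : "COMPLETED_WITH_SKIPS";
    }

    public static void main(String[] args) {
        System.out.println(classify(0, 1_000));  // CLEAN
        System.out.println(classify(50, 950));   // 50 / 1000 = 5%   -> COMPLETED_WITH_SKIPS
        System.out.println(classify(200, 800));  // 200 / 1000 = 20% -> HIGH_ERROR_RATE
    }
}
```

Keeping the rule in a small function like this also makes it straightforward to unit test without starting a job.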

JobExecutionDecider

A JobExecutionDecider makes a routing decision at runtime, outside of any step. Use it when the branching logic depends on job-level state — for example, checking a database flag, reading a configuration value, or inspecting how many records a previous step processed.

@Component
public class OrderVolumeDecider implements JobExecutionDecider {

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution,
                                       StepExecution stepExecution) {
        // stepExecution is the most recently executed step; it can be null
        // if the decider is the first element in the flow
        long writeCount = stepExecution.getWriteCount();

        if (writeCount == 0) {
            return new FlowExecutionStatus("NO_DATA");
        } else if (writeCount > 100_000) {
            return new FlowExecutionStatus("HIGH_VOLUME");
        } else {
            return new FlowExecutionStatus("NORMAL");
        }
    }
}

Register the decider in the job with .next(decider) and branch on the statuses it returns:

@Bean
public Job importOrdersJob(JobRepository jobRepository,
                            Step importStep,
                            Step lightReportStep,
                            Step heavyReportStep,
                            Step noDataStep,
                            OrderVolumeDecider decider) {

    return new JobBuilder("importOrdersJob", jobRepository)
            .start(importStep)
            .next(decider)                         // decision after importStep
            .on("NO_DATA").to(noDataStep)
            .from(decider).on("HIGH_VOLUME").to(heavyReportStep)
            .from(decider).on("NORMAL").to(lightReportStep)
            .end()
            .build();
}
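One way to picture what the builder calls above declare is as a lookup table from (current state, status) to next state. The following toy model in plain Java is only an illustration: the class DeciderFlowModel and its string-keyed table are hypothetical and are not how Spring Batch represents flows internally. It reuses the thresholds from OrderVolumeDecider and assumes every step completes successfully.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the job above: a table of "state/status" -> next state.
class DeciderFlowModel {

    private final Map<String, String> transitions = new HashMap<>();

    DeciderFlowModel() {
        transitions.put("importStep/COMPLETED", "decider");
        transitions.put("decider/NO_DATA", "noDataStep");
        transitions.put("decider/HIGH_VOLUME", "heavyReportStep");
        transitions.put("decider/NORMAL", "lightReportStep");
    }

    // Same thresholds as OrderVolumeDecider.
    static String decide(long writeCount) {
        if (writeCount == 0) {
            return "NO_DATA";
        }
        if (writeCount > 100_000) {
            return "HIGH_VOLUME";
        }
        return "NORMAL";
    }

    // Returns the states visited when importStep wrote writeCount items.
    List<String> run(long writeCount) {
        List<String> visited = new ArrayList<>();
        String state = "importStep";
        while (state != null) {
            visited.add(state);
            // The decider does no work of its own; it only produces a status.
            // Every real step is assumed to end with COMPLETED in this model.
            String status = state.equals("decider") ? decide(writeCount) : "COMPLETED";
            state = transitions.get(state + "/" + status); // null ends the walk
        }
        return visited;
    }
}
```

For example, new DeciderFlowModel().run(250_000) visits importStep, decider, and heavyReportStep, while a write count of exactly 100_000 still routes to lightReportStep because the comparison is strictly greater than.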

Parallel Flows with Split

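Run independent steps concurrently using .split(). Each flow added to the split is submitted to the TaskExecutor, so the branches run in separate threads as long as the executor has enough threads available.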

@Bean
public Job parallelReportJob(JobRepository jobRepository,
                              Step importStep,
                              Step salesReportStep,
                              Step inventoryReportStep,
                              Step customerReportStep,
                              TaskExecutor taskExecutor) {

    // Define parallel flows
    Flow salesFlow = new FlowBuilder<SimpleFlow>("salesFlow")
            .start(salesReportStep)
            .build();

    Flow inventoryFlow = new FlowBuilder<SimpleFlow>("inventoryFlow")
            .start(inventoryReportStep)
            .build();

    Flow customerFlow = new FlowBuilder<SimpleFlow>("customerFlow")
            .start(customerReportStep)
            .build();

    // Run all three flows in parallel after importStep
    Flow parallelReportFlow = new FlowBuilder<SimpleFlow>("parallelReportFlow")
            .split(taskExecutor)
            .add(salesFlow, inventoryFlow, customerFlow)
            .build();

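    // JobBuilder.flow(step) starts a flow-based job, which is what allows
    // a Flow (rather than a Step) to follow importStep via next(...).
    return new JobBuilder("parallelReportJob", jobRepository)
            .flow(importStep)
            .next(parallelReportFlow)
            .end()
            .build();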
}

@Bean
public TaskExecutor parallelFlowExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(3);
    executor.setMaxPoolSize(3);
    executor.setThreadNamePrefix("parallel-flow-");
    executor.initialize();
    return executor;
}

The job waits for all parallel flows to complete before proceeding. If any flow fails, the job fails.

Important: steps in parallel flows must not share mutable state or the same database rows — race conditions will cause data corruption or deadlocks.
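The wait-for-all behaviour can be illustrated with plain java.util.concurrent. The sketch below is an analogue only and does not show how Spring Batch implements split: the class SplitModel, the string statuses, and the rule of treating any exception as FAILED are all assumptions made for the example. It expects at least one branch.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java analogue of a split: run branches concurrently, wait for all, aggregate.
class SplitModel {

    // Returns "COMPLETED" only if every branch returned "COMPLETED".
    static String runSplit(List<Callable<String>> branches) {
        // One thread per branch, mirroring the pool of 3 used for three flows.
        ExecutorService pool = Executors.newFixedThreadPool(branches.size());
        try {
            // invokeAll blocks until every branch has finished, successfully or not.
            List<Future<String>> results = pool.invokeAll(branches);
            String aggregate = "COMPLETED";
            for (Future<String> result : results) {
                try {
                    if (!"COMPLETED".equals(result.get())) {
                        aggregate = "FAILED";
                    }
                } catch (ExecutionException e) {
                    aggregate = "FAILED"; // the branch threw an exception
                }
            }
            return aggregate;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return "STOPPED";
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the branches share nothing but the executor, no locking is needed here. The moment two branches write to the same object or the same rows, that stops being true.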


Reusable Flows

Extract commonly used step sequences into a Flow bean and reuse them across jobs.

@Bean
public Flow notificationFlow(Step sendEmailStep, Step sendSlackStep) {
    return new FlowBuilder<SimpleFlow>("notificationFlow")
            .start(sendEmailStep)
            .next(sendSlackStep)
            .build();
}

@Bean
public Flow cleanupFlow(Step deleteTempFilesStep, Step archiveOutputStep) {
    return new FlowBuilder<SimpleFlow>("cleanupFlow")
            .start(deleteTempFilesStep)
            .next(archiveOutputStep)
            .build();
}

// Reuse in multiple jobs
@Bean
public Job importJobA(JobRepository jobRepository,
                       Step importAStep,
                       Flow notificationFlow,
                       Flow cleanupFlow) {
    return new JobBuilder("importJobA", jobRepository)
            .flow(importAStep)            // flow-based builder, so Flows can follow a Step
            .next(notificationFlow)
            .next(cleanupFlow)
            .end()
            .build();
}

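FlowStep: Nesting a Flow Inside a Job

FlowStep wraps a Flow as a single step, so a complex flow can be nested inside a larger job and referenced like any other step. (To nest an entire Job rather than a Flow, Spring Batch provides JobStep.)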

@Bean
public Step reportingFlowStep(Flow reportingFlow, JobRepository jobRepository) {
    return new StepBuilder("reportingFlowStep", jobRepository)
            .flow(reportingFlow)
            .build();
}

@Bean
public Job masterJob(JobRepository jobRepository,
                      Step importStep,
                      Step reportingFlowStep) {
    return new JobBuilder("masterJob", jobRepository)
            .start(importStep)
            .next(reportingFlowStep)
            .build();
}

Complete Example: Order Import Job with Full Orchestration

@Configuration
@RequiredArgsConstructor
public class OrderImportJobConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager tx;
    private final DataSource dataSource;

    // ── Steps ───────────────────────────────────────────────────────────────

    @Bean
    public Step downloadFilesStep() {
        return new StepBuilder("downloadFilesStep", jobRepository)
                .tasklet(new S3DownloadTasklet(...), tx)
                .build();
    }

    @Bean
    public Step validateSchemaStep() {
        return new StepBuilder("validateSchemaStep", jobRepository)
                .<RawOrder, RawOrder>chunk(1000, tx)
                .reader(rawOrderCsvReader())
                .processor(schemaValidationProcessor())
                .writer(validOrderWriter())
                .listener(validationListener())
                .faultTolerant()
                .skip(ValidationException.class)
                .skipLimit(Integer.MAX_VALUE)
                .build();
    }

    @Bean
    public Step importOrdersStep() {
        return new StepBuilder("importOrdersStep", jobRepository)
                .<Order, ProcessedOrder>chunk(500, tx)
                .reader(validatedOrderReader())
                .processor(fullOrderPipeline())
                .writer(processedOrderWriter())
                .build();
    }

    @Bean
    public Step generateSummaryReportStep() { /* ... */ }
    @Bean
    public Step sendSuccessNotificationStep() { /* ... */ }
    @Bean
    public Step sendFailureNotificationStep() { /* ... */ }
    @Bean
    public Step cleanupTempFilesStep() { /* ... */ }

    // ── Decider ─────────────────────────────────────────────────────────────

    @Bean
    public JobExecutionDecider validationResultDecider() {
        return (jobExecution, stepExecution) -> {
            long skipCount = stepExecution.getSkipCount();
            if (skipCount == 0) return new FlowExecutionStatus("VALID");
            if (skipCount > 1000) return new FlowExecutionStatus("TOO_MANY_ERRORS");
            return new FlowExecutionStatus("VALID_WITH_WARNINGS");
        };
    }

    // ── Job ─────────────────────────────────────────────────────────────────

    @Bean
    public Job orderImportJob() {
        return new JobBuilder("orderImportJob", jobRepository)
                .start(downloadFilesStep())
                .next(validateSchemaStep())
                .next(validationResultDecider())
                    .on("TOO_MANY_ERRORS").to(sendFailureNotificationStep())
                .from(validationResultDecider())
                    .on("VALID").to(importOrdersStep())
                .from(validationResultDecider())
                    .on("VALID_WITH_WARNINGS").to(importOrdersStep())
                .from(importOrdersStep())
                    .on("COMPLETED").to(generateSummaryReportStep())
                .from(importOrdersStep())
                    .on("FAILED").to(sendFailureNotificationStep())
                .from(generateSummaryReportStep())
                    .on("*").to(sendSuccessNotificationStep())
                .from(sendSuccessNotificationStep())
                    .on("*").to(cleanupTempFilesStep())
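                // Note: the failure path also runs cleanup and then ends normally,
                // so the job finishes as COMPLETED even after a failure notification.
                .from(sendFailureNotificationStep())
                    .on("*").to(cleanupTempFilesStep())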
                .end()
                .build();
    }
}

Key Takeaways

  • Use .next() for linear flows. Use .on().to() / .from() for conditional branching based on ExitStatus.
  • Set custom ExitStatus in a StepExecutionListener.afterStep() to drive branching on business outcomes.
  • Use JobExecutionDecider for runtime routing that reads job-level state (counters, flags, configuration).
  • .split() runs parallel flows concurrently. Steps in different branches must be independent.
  • Extract reusable step sequences into Flow beans. Wrap them in a FlowStep to nest them in a larger job.

What’s Next

Article 14 covers JobParameters, ExecutionContext, and job restartability — how to pass runtime parameters into jobs, share state between steps, and design jobs that restart cleanly after failure.