Transforming Data with ItemProcessor: Validation, Filtering, and Enrichment

Introduction

ItemProcessor<I, O> sits between the reader and the writer. It receives one item at a time and returns a transformed item — or null to filter the item out entirely. Processors are optional: if your job reads and writes the same type with no transformation needed, you can omit them.

The processor interface is one method:

public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

Return the processed item to pass it to the writer. Return null to filter it out: the item is counted in filter_count (the FILTER_COUNT column of BATCH_STEP_EXECUTION) and never reaches the writer.
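To see the contract in isolation, here is a dependency-free sketch. It declares a local copy of the one-method interface shown above, so it runs without Spring Batch on the classpath. The `runChunk` loop is an assumption for illustration, not the framework's real chunk processing: non-null results go to a list standing in for the writer, and null results only increment a counter standing in for filter_count.

```java
import java.util.ArrayList;
import java.util.List;

// Dependency-free sketch: a local stand-in for Spring Batch's ItemProcessor
// so the null-filtering contract can be run without the framework.
public class FilterContractDemo {

    @FunctionalInterface
    interface ItemProcessor<I, O> {
        O process(I item) throws Exception;
    }

    // Outcome of one simulated chunk: what reached the "writer" and how many were filtered
    record ChunkResult<O>(List<O> written, int filterCount) {}

    // Simplified chunk loop: null results are counted as filtered and never passed on
    static <I, O> ChunkResult<O> runChunk(List<I> items, ItemProcessor<I, O> processor) throws Exception {
        List<O> written = new ArrayList<>();
        int filtered = 0;
        for (I item : items) {
            O result = processor.process(item);
            if (result == null) {
                filtered++;          // stands in for filter_count
            } else {
                written.add(result); // stands in for the writer
            }
        }
        return new ChunkResult<>(written, filtered);
    }

    public static void main(String[] args) throws Exception {
        // Drop negative numbers, double the rest
        ItemProcessor<Integer, Integer> doubler = n -> n < 0 ? null : n * 2;
        ChunkResult<Integer> r = runChunk(List.of(1, -2, 3), doubler);
        System.out.println(r.written() + " filtered=" + r.filterCount()); // [2, 6] filtered=1
    }
}
```

The -2 is filtered: it is counted but never appears in the list that plays the writer's role.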


Validation

Bean Validation with ValidatingItemProcessor

The simplest approach to validation combines Jakarta Bean Validation annotations on your domain object with Spring Batch's ValidatingItemProcessor.

Add the validator dependency:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-validation</artifactId>
</dependency>

Annotate your domain object:

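import jakarta.validation.constraints.DecimalMax;
import jakarta.validation.constraints.DecimalMin;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.PastOrPresent;
import jakarta.validation.constraints.Pattern;
import java.math.BigDecimal;
import java.time.LocalDate;
import lombok.Data;

@Data
public class Order {
    private Long orderId;

    @NotNull
    private Long customerId;

    @NotNull
    @DecimalMin(value = "0.01", message = "Amount must be positive")
    @DecimalMax(value = "999999.99")
    private BigDecimal amount;

    @NotNull
    @PastOrPresent
    private LocalDate orderDate;

    @NotBlank
    @Pattern(regexp = "PENDING|COMPLETED|SHIPPED|CANCELLED")
    private String status;

    // Fields used by the processors later in this article
    private String rawCustomerId;     // customer ID as imported, possibly zero-padded
    private boolean manuallyApproved; // set for high-value orders that were reviewed
    private String customerTier;      // filled in by enrichment
}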

Wire up the processor:

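@Bean
public ValidatingItemProcessor<Order> validatingOrderProcessor(
        org.springframework.validation.Validator validator) {
    // SpringValidator adapts a Spring Validator (Spring Boot's auto-configured
    // LocalValidatorFactoryBean implements it) to Spring Batch's Validator interface.
    // It has no constructor argument; the delegate is set via setValidator().
    SpringValidator<Order> springValidator = new SpringValidator<>();
    springValidator.setValidator(validator);

    ValidatingItemProcessor<Order> processor = new ValidatingItemProcessor<>(springValidator);
    processor.setFilter(false); // false (default) = throw ValidationException; true = filter (return null)
    return processor;
}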

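With setFilter(false) (the default), a validation failure throws org.springframework.batch.item.validator.ValidationException, which fails the step unless you declare it skippable. With setFilter(true), invalid items are filtered: process() returns null and filter_count is incremented. Filtered items are not skips, so a SkipListener never sees them. To log invalid items but continue processing, keep setFilter(false), add .faultTolerant().skip(ValidationException.class) to the step, and log the item in SkipListener.onSkipInProcess(). Alternatively, keep setFilter(true) and log in ItemProcessListener.afterProcess(), which receives a null result for filtered items.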
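The two behaviours of the filter flag can be modelled without Spring on the classpath. The sketch below is hypothetical: SimpleValidatingProcessor and the local ValidationException only mimic the documented semantics of ValidatingItemProcessor.setFilter(), with a java.util.function.Predicate standing in for the real Validator.

```java
import java.util.function.Predicate;

// Hypothetical stand-in mimicking ValidatingItemProcessor's filter flag:
// filter=false -> throw on invalid items, filter=true -> return null (filtered).
public class ValidatingFilterDemo {

    // Local stand-in for org.springframework.batch.item.validator.ValidationException
    static class ValidationException extends RuntimeException {
        ValidationException(String msg) { super(msg); }
    }

    static final class SimpleValidatingProcessor<T> {
        private final Predicate<T> isValid;
        private boolean filter = false; // same default as ValidatingItemProcessor

        SimpleValidatingProcessor(Predicate<T> isValid) { this.isValid = isValid; }

        void setFilter(boolean filter) { this.filter = filter; }

        T process(T item) {
            if (isValid.test(item)) {
                return item;
            }
            if (filter) {
                return null; // filtered: would count toward filter_count, no exception
            }
            throw new ValidationException("Invalid item: " + item);
        }
    }

    public static void main(String[] args) {
        SimpleValidatingProcessor<Integer> p = new SimpleValidatingProcessor<>(n -> n > 0);
        try {
            p.process(-1);
        } catch (ValidationException e) {
            System.out.println("threw: " + e.getMessage()); // threw: Invalid item: -1
        }
        p.setFilter(true);
        System.out.println(p.process(-1)); // null
    }
}
```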

Custom validation processor

For business rule validation that goes beyond annotation constraints:

@Component
public class OrderValidationProcessor implements ItemProcessor<Order, Order> {

    private static final Logger log = LoggerFactory.getLogger(OrderValidationProcessor.class);

    @Override
    public Order process(Order order) throws Exception {
        // Business rule: orders over $10,000 need a manual approval flag
        if (order.getAmount().compareTo(new BigDecimal("10000")) > 0
                && !order.isManuallyApproved()) {
            log.warn("Filtering high-value unapproved order {}: amount={}",
                    order.getOrderId(), order.getAmount());
            return null; // filter out
        }

        // Business rule: order date cannot be more than 30 days in the past
        if (order.getOrderDate().isBefore(LocalDate.now().minusDays(30))) {
            log.warn("Filtering stale order {}: orderDate={}", order.getOrderId(), order.getOrderDate());
            return null;
        }

        return order;
    }
}
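Because the 30-day rule calls LocalDate.now(), the processor is awkward to unit test. One option, shown here as an assumption rather than part of the original processor, is to pull both rules into a pure method that takes a java.time.Clock. OrderView is a hypothetical trimmed-down order carrying only the fields the rules read.

```java
import java.math.BigDecimal;
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Sketch: the two business rules from OrderValidationProcessor as a pure method.
// The Clock parameter is an added assumption for testability.
public class OrderRulesDemo {

    static final BigDecimal APPROVAL_THRESHOLD = new BigDecimal("10000");

    // Hypothetical order shape with just the fields the rules need
    record OrderView(long orderId, BigDecimal amount, boolean manuallyApproved, LocalDate orderDate) {}

    /** Returns true if the order passes both rules, false if the processor would return null. */
    static boolean passesRules(OrderView order, Clock clock) {
        boolean highValueUnapproved = order.amount().compareTo(APPROVAL_THRESHOLD) > 0
                && !order.manuallyApproved();
        if (highValueUnapproved) {
            return false;
        }
        LocalDate cutoff = LocalDate.now(clock).minusDays(30);
        return !order.orderDate().isBefore(cutoff); // exactly 30 days old still passes
    }

    public static void main(String[] args) {
        Clock fixed = Clock.fixed(Instant.parse("2024-06-30T00:00:00Z"), ZoneOffset.UTC);
        OrderView big = new OrderView(1, new BigDecimal("15000"), false, LocalDate.of(2024, 6, 29));
        OrderView stale = new OrderView(2, new BigDecimal("50"), false, LocalDate.of(2024, 5, 1));
        OrderView ok = new OrderView(3, new BigDecimal("50"), false, LocalDate.of(2024, 5, 31));
        System.out.println(passesRules(big, fixed));   // false
        System.out.println(passesRules(stale, fixed)); // false
        System.out.println(passesRules(ok, fixed));    // true
    }
}
```

Note the boundaries: an amount of exactly 10000 passes because the check is compareTo(...) > 0, and an order dated exactly 30 days ago passes because isBefore is strict.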

Transformation

Type conversion

When the reader's output type differs from the writer's input type, the processor converts between them:

public class OrderToCsvRecordProcessor implements ItemProcessor<Order, OrderCsvRecord> {

    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("dd/MM/yyyy");

    @Override
    public OrderCsvRecord process(Order order) {
        return new OrderCsvRecord(
                order.getOrderId().toString(),
                order.getCustomerId().toString(),
                String.format(java.util.Locale.ROOT, "%.2f", order.getAmount()), // '.' decimal separator regardless of JVM locale
                order.getOrderDate().format(FMT),
                order.getStatus()
        );
    }
}
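The `%.2f` pattern is locale-sensitive: String.format without a Locale uses the JVM default, which in many European locales prints a comma as the decimal separator and can corrupt a comma-delimited CSV. This standalone sketch (class and method names are illustrative) shows the difference, along with the dd/MM/yyyy date output.

```java
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Illustrative helpers mirroring the formatting done in OrderToCsvRecordProcessor
public class CsvFormattingDemo {

    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("dd/MM/yyyy");

    // Two decimal places, HALF_UP rounding, decimal separator chosen by the locale
    static String formatAmount(BigDecimal amount, Locale locale) {
        return String.format(locale, "%.2f", amount);
    }

    static String formatDate(LocalDate date) {
        return date.format(FMT);
    }

    public static void main(String[] args) {
        BigDecimal amount = new BigDecimal("1234.5");
        System.out.println(formatAmount(amount, Locale.ROOT));    // 1234.50
        System.out.println(formatAmount(amount, Locale.GERMANY)); // 1234,50
        System.out.println(formatDate(LocalDate.of(2024, 3, 5))); // 05/03/2024
    }
}
```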

Field normalization

@Component
public class OrderNormalizationProcessor implements ItemProcessor<Order, Order> {

    @Override
    public Order process(Order order) {
        // Normalize status: trim whitespace and uppercase (Locale.ROOT avoids locale-specific case rules)
        if (order.getStatus() != null) {
            order.setStatus(order.getStatus().trim().toUpperCase(java.util.Locale.ROOT));
        }

        // Normalize customer ID from the imported string; Long.parseLong already ignores leading zeros
        if (order.getRawCustomerId() != null) {
            order.setCustomerId(Long.parseLong(order.getRawCustomerId().trim()));
        }

        return order;
    }
}
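The two normalizations can be checked in isolation. This sketch pulls them into static helpers (the names are illustrative). It relies on two JDK facts: Long.parseLong accepts leading zeros, and String.stripLeading() takes no arguments and strips only whitespace, so it cannot be used to remove zeros.

```java
import java.util.Locale;

// Illustrative helpers mirroring OrderNormalizationProcessor's logic
public class NormalizationDemo {

    static String normalizeStatus(String status) {
        // Locale.ROOT avoids locale-specific case mappings (e.g. Turkish dotted/dotless i)
        return status.trim().toUpperCase(Locale.ROOT);
    }

    static long parseCustomerId(String raw) {
        // Long.parseLong accepts leading zeros, so "000123" parses to 123
        return Long.parseLong(raw.trim());
    }

    public static void main(String[] args) {
        System.out.println(normalizeStatus("  shipped ")); // SHIPPED
        System.out.println(parseCustomerId("000123"));     // 123
        System.out.println("000123".stripLeading());       // 000123 (only whitespace is stripped)
    }
}
```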

Enrichment

Enrichment processors add data from secondary sources — databases, caches, or external services — to each item.

Database lookup enrichment

@Component
@RequiredArgsConstructor
public class OrderEnrichmentProcessor implements ItemProcessor<Order, Order> {

    private final JdbcTemplate jdbcTemplate;

    @Override
    public Order process(Order order) {
        // Look up customer tier (queryForObject throws EmptyResultDataAccessException if no row matches)
        String tier = jdbcTemplate.queryForObject(
                "SELECT tier FROM customers WHERE customer_id = ?",
                String.class,
                order.getCustomerId());

        order.setCustomerTier(tier);

        // Apply tier-based discount
        if ("GOLD".equals(tier)) {
            order.setAmount(order.getAmount().multiply(new BigDecimal("0.90")));
        } else if ("SILVER".equals(tier)) {
            order.setAmount(order.getAmount().multiply(new BigDecimal("0.95")));
        }

        return order;
    }
}

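Warning: this executes one SELECT per item. A chunk of 100 items issues 100 queries, so the query count grows with the number of items rather than the number of distinct customers, and in a multi-threaded step every thread issues its own stream of queries concurrently. Cache the customer data to avoid this: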

Cache-backed enrichment

@Component
@RequiredArgsConstructor
public class CachedOrderEnrichmentProcessor implements ItemProcessor<Order, Order> {

    private final JdbcTemplate jdbcTemplate;

    // Simple per-step cache — populated on first access, cleared when step ends
    private final Map<Long, String> customerTierCache = new ConcurrentHashMap<>();

    @Override
    public Order process(Order order) {
        String tier = customerTierCache.computeIfAbsent(
                order.getCustomerId(),
                id -> jdbcTemplate.queryForObject(
                        "SELECT tier FROM customers WHERE customer_id = ?",
                        String.class, id));

        order.setCustomerTier(tier);
        return order;
    }

    // Clear the cache when the step ends so the next execution (including a restart) reloads current data
    @AfterStep
    public ExitStatus afterStep(StepExecution stepExecution) {
        customerTierCache.clear();
        return null;
    }
}

The class does not implement StepExecutionListener; register it on the step as a listener so Spring Batch detects the @AfterStep method:

.listener(cachedOrderEnrichmentProcessor)
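The effect of computeIfAbsent on query volume can be measured without a database. In this sketch, CountingLookup is a hypothetical stand-in for the JdbcTemplate call: it counts invocations and returns a made-up tier.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Compares lookups made by a per-item query versus a computeIfAbsent cache
public class LookupCountDemo {

    // Hypothetical stand-in for "SELECT tier FROM customers WHERE customer_id = ?"
    static final class CountingLookup {
        final AtomicInteger calls = new AtomicInteger();

        String tierFor(long customerId) {
            calls.incrementAndGet();
            return customerId % 2 == 0 ? "GOLD" : "SILVER"; // made-up tiers
        }
    }

    static int naiveQueries(List<Long> customerIds) {
        CountingLookup db = new CountingLookup();
        for (long id : customerIds) {
            db.tierFor(id); // one query per item
        }
        return db.calls.get();
    }

    static int cachedQueries(List<Long> customerIds) {
        CountingLookup db = new CountingLookup();
        Map<Long, String> cache = new ConcurrentHashMap<>();
        for (long id : customerIds) {
            cache.computeIfAbsent(id, db::tierFor); // query only on first sight of an ID
        }
        return db.calls.get();
    }

    public static void main(String[] args) {
        List<Long> ids = List.of(1L, 2L, 1L, 3L, 2L, 1L);
        System.out.println("naive=" + naiveQueries(ids) + " cached=" + cachedQueries(ids)); // naive=6 cached=3
    }
}
```

With six items from three distinct customers, the uncached version makes six lookups and the cached version three: the query count now tracks distinct keys rather than items.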

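For reference tables small enough to hold in memory, you can load the whole table once before the first chunk instead of filling the cache lazily with computeIfAbsent. A @BeforeStep method on the same class does this (ItemStream.open() also works, but only if the processor implements ItemStream):

@BeforeStep
public void loadCustomerTiers(StepExecution stepExecution) {
    // The cast selects the RowCallbackHandler overload (invoked once per row); a value-returning
    // lambda such as rs -> map.put(...) would be ambiguous with ResultSetExtractor
    jdbcTemplate.query("SELECT customer_id, tier FROM customers",
            (RowCallbackHandler) rs -> {
                String tier = rs.getString(2);
                if (tier != null) { // ConcurrentHashMap does not accept null values
                    customerTierCache.put(rs.getLong(1), tier);
                }
            });
}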
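One detail matters for either loading style: ConcurrentHashMap rejects null keys and values, so a customer row with a NULL tier makes put() throw a NullPointerException, whereas computeIfAbsent simply stores nothing when the mapping function returns null. The sketch below uses a hypothetical CustomerRow record in place of the JDBC result set.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of eager preloading into a ConcurrentHashMap, skipping null tiers
public class PreloadDemo {

    // Hypothetical row shape standing in for "SELECT customer_id, tier FROM customers"
    record CustomerRow(long customerId, String tier) {}

    static Map<Long, String> preload(List<CustomerRow> rows) {
        Map<Long, String> cache = new ConcurrentHashMap<>();
        for (CustomerRow row : rows) {
            // put(key, null) would throw NullPointerException, so rows without a tier are skipped
            if (row.tier() != null) {
                cache.put(row.customerId(), row.tier());
            }
        }
        return cache;
    }

    public static void main(String[] args) {
        Map<Long, String> cache = preload(List.of(new CustomerRow(1, "GOLD"), new CustomerRow(2, null)));
        System.out.println(cache); // {1=GOLD}
    }
}
```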

Filtering vs Skipping

Two different mechanisms for removing items from the output:

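|                  | Filter (return null)         | Skip (exception + skip config)                           |
|------------------|------------------------------|----------------------------------------------------------|
| How              | Processor returns null       | Exception thrown, handled by the faultTolerant() config  |
| Count tracked in | filter_count                 | read_skip_count, process_skip_count or write_skip_count  |
| Use when         | Item is valid but not needed | Item is malformed or violates a constraint               |
| Example          | Order already processed      | CSV line has the wrong column count                      |
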
// Filter — item deliberately excluded, no error
@Override
public Order process(Order order) {
    if (alreadyProcessed(order.getOrderId())) {
        return null;  // filtered, not an error
    }
    return order;
}

// Skip — in step configuration, not in processor
.faultTolerant()
.skip(OrderValidationException.class)
.skipLimit(100)
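The difference in bookkeeping can be sketched without the framework. This is a hypothetical and heavily simplified model: the real fault-tolerant step also rolls back and re-processes the chunk when a processor throws, and its skip limit counts read, process and write skips together. Only the counters and the limit check are modelled here, using a local OrderValidationException.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model of filter vs skip accounting in a fault-tolerant step
public class FilterVsSkipDemo {

    // Local stand-in for the application's skippable exception
    static class OrderValidationException extends RuntimeException {
        OrderValidationException(String msg) { super(msg); }
    }

    static final class Counts {
        int written, filterCount, processSkipCount;
    }

    static <I, O> Counts run(List<I> items, Function<I, O> processor, int skipLimit) {
        Counts c = new Counts();
        for (I item : items) {
            try {
                O out = processor.apply(item);
                if (out == null) {
                    c.filterCount++;      // filter: deliberate exclusion, no error
                } else {
                    c.written++;
                }
            } catch (OrderValidationException e) {
                c.processSkipCount++;     // skip: error on bad data
                if (c.processSkipCount > skipLimit) {
                    // Spring Batch throws SkipLimitExceededException in this situation
                    throw new IllegalStateException("Skip limit " + skipLimit + " exceeded", e);
                }
            }
        }
        return c;
    }

    public static void main(String[] args) {
        Function<Integer, Integer> processor = n -> {
            if (n == 2) return null;                                         // already processed
            if (n == 4) throw new OrderValidationException("bad order " + n); // malformed
            return n;
        };
        Counts c = run(List.of(1, 2, 3, 4, 5), processor, 100);
        System.out.println(c.written + " written, " + c.filterCount + " filtered, "
                + c.processSkipCount + " skipped"); // 3 written, 1 filtered, 1 skipped
    }
}
```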

Complete Example: Order Processing Processor Chain

@Component
@RequiredArgsConstructor
public class OrderProcessingProcessor implements ItemProcessor<Order, ProcessedOrder> {

    private final JdbcTemplate jdbcTemplate;
    private final Validator validator;
    private final Map<Long, String> tierCache = new ConcurrentHashMap<>();

    @Override
    public ProcessedOrder process(Order order) throws Exception {
        // Step 1: Bean validation
        Set<ConstraintViolation<Order>> violations = validator.validate(order);
        if (!violations.isEmpty()) {
            String msg = violations.stream()
                    .map(v -> v.getPropertyPath() + ": " + v.getMessage())
                    .collect(Collectors.joining(", "));
            throw new OrderValidationException("Invalid order " + order.getOrderId() + ": " + msg);
        }

        // Step 2: Filter already-processed orders (plain COUNT(*) keeps the SQL portable)
        Integer count = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM processed_orders WHERE order_id = ?",
                Integer.class, order.getOrderId());
        if (count != null && count > 0) return null;

        // Step 3: Enrich with customer tier
        String tier = tierCache.computeIfAbsent(order.getCustomerId(),
                id -> jdbcTemplate.queryForObject(
                        "SELECT tier FROM customers WHERE customer_id = ?",
                        String.class, id));

        // Step 4: Calculate final amount with discount (a null tier gets no discount)
        BigDecimal discount = tier == null ? BigDecimal.ZERO : switch (tier) {
            case "GOLD"   -> new BigDecimal("0.10");
            case "SILVER" -> new BigDecimal("0.05");
            default       -> BigDecimal.ZERO;
        };
        BigDecimal finalAmount = order.getAmount()
                .subtract(order.getAmount().multiply(discount))
                .setScale(2, RoundingMode.HALF_UP);

        return new ProcessedOrder(
                order.getOrderId(),
                order.getCustomerId(),
                finalAmount,
                order.getOrderDate(),
                "PROCESSED",
                tier,
                discount
        );
    }

    // Clear the tier cache when the step ends (called because the processor is registered as a listener)
    @AfterStep
    public ExitStatus afterStep(StepExecution stepExecution) {
        tierCache.clear();
        return null;
    }
}
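The discount arithmetic in step 4 is easy to check on its own. This sketch assumes two-decimal currency amounts rounded HALF_UP, which is an assumption about the business rules rather than something the original specifies. Without the setScale call, BigDecimal multiplication widens the scale, so 100.00 minus 10% comes out as 90.0000.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Standalone version of the tier-discount calculation; rounding mode is an assumption
public class DiscountDemo {

    static BigDecimal discountFor(String tier) {
        if (tier == null) {
            return BigDecimal.ZERO; // switch on a null String would throw NullPointerException
        }
        return switch (tier) {
            case "GOLD"   -> new BigDecimal("0.10");
            case "SILVER" -> new BigDecimal("0.05");
            default       -> BigDecimal.ZERO;
        };
    }

    static BigDecimal finalAmount(BigDecimal amount, String tier) {
        BigDecimal discount = discountFor(tier);
        return amount.subtract(amount.multiply(discount)).setScale(2, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        System.out.println(finalAmount(new BigDecimal("100.00"), "GOLD"));  // 90.00
        System.out.println(finalAmount(new BigDecimal("19.99"), "SILVER")); // 18.99
        System.out.println(finalAmount(new BigDecimal("50"), null));        // 50.00
        BigDecimal unrounded = new BigDecimal("100.00")
                .subtract(new BigDecimal("100.00").multiply(new BigDecimal("0.10")));
        System.out.println(unrounded); // 90.0000
    }
}
```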

Wire into the step with skip on validation errors:

@Bean
public Step processOrdersStep(
        JobRepository jobRepository,
        PlatformTransactionManager transactionManager,
        JdbcPagingItemReader<Order> reader,
        OrderProcessingProcessor processor,
        JdbcBatchItemWriter<ProcessedOrder> writer) {

    return new StepBuilder("processOrdersStep", jobRepository)
            .<Order, ProcessedOrder>chunk(100, transactionManager)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .listener(processor)    // for @AfterStep cache clear
            .faultTolerant()
            .skip(OrderValidationException.class)
            .skipLimit(500)
            .build();
}

Key Takeaways

  • ItemProcessor.process() returns the transformed item, or null to filter it. A null result increments filter_count, not any skip count.
  • Use ValidatingItemProcessor + Bean Validation annotations for declarative validation. Use a custom processor for business rule validation.
  • Enrichment processors that query a database on every item are an N+1 problem. Load reference data into a Map at step start or use computeIfAbsent with a per-step cache.
  • Distinguish between filtering (deliberate exclusion, return null) and skipping (error handling for bad data, exception + faultTolerant config).
  • Keep processors focused on one responsibility. Chain them with CompositeItemProcessor (Article 12) rather than putting all logic in one class.
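Article 12 covers CompositeItemProcessor itself; the underlying idea can be sketched without the framework. This dependency-free sketch uses java.util.function.Function as a stand-in for ItemProcessor, and the status-cleaning steps are hypothetical. Like CompositeItemProcessor, it returns null as soon as one step returns null, without calling the remaining steps.

```java
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.function.Function;

// Sketch of chaining single-purpose steps where a null result filters the item
public class ProcessorChainDemo {

    // Applies each step in order; a null result stops the chain
    static <T> T runChain(T item, List<Function<T, T>> steps) {
        T current = item;
        for (Function<T, T> step : steps) {
            current = step.apply(current);
            if (current == null) {
                return null; // later steps never see a filtered item
            }
        }
        return current;
    }

    static final Set<String> KNOWN_STATUSES = Set.of("PENDING", "COMPLETED", "SHIPPED", "CANCELLED");

    // Hypothetical status-cleaning chain: trim, uppercase, then filter unknown values
    static final List<Function<String, String>> STATUS_CHAIN = List.of(
            String::trim,
            s -> s.toUpperCase(Locale.ROOT),
            s -> KNOWN_STATUSES.contains(s) ? s : null);

    public static void main(String[] args) {
        System.out.println(runChain("  shipped ", STATUS_CHAIN)); // SHIPPED
        System.out.println(runChain(" lost ", STATUS_CHAIN));     // null
    }
}
```

Each step stays small and testable on its own, which is the point of the takeaway above.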

What’s Next

Article 12 covers CompositeItemProcessor for chaining multiple processors, calling external APIs asynchronously within a processor, and advanced filtering patterns.