Transforming Data with ItemProcessor: Validation, Filtering, and Enrichment
Introduction
ItemProcessor<I, O> sits between the reader and the writer. It receives one item at a time and returns a transformed item — or null to filter the item out entirely. Processors are optional: if your job reads and writes the same type with no transformation needed, you can omit them.
The processor interface is one method:
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
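Return a processed item to pass it to the writer. Return null to filter it out: the item is counted in filter_count in BATCH_STEP_EXECUTION and never reaches the writer. Filtering is not the same as skipping, which is exception-driven and tracked separately in skip_count.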
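To make the null contract concrete, here is a minimal sketch that runs without Spring Batch on the classpath. The ItemProcessor interface below is a local stand-in with the same shape as the framework's, and ChunkSimulator is a hypothetical driver invented for illustration. It only mimics how a null return drops an item and bumps a filter counter.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in with the same shape as Spring Batch's ItemProcessor
// (assumption: declared here only so the sketch compiles on its own).
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

// Hypothetical driver that mimics the filter contract: null means "drop the item".
class ChunkSimulator<I, O> {
    final List<O> written = new ArrayList<>();
    int filterCount = 0;

    void run(List<I> items, ItemProcessor<I, O> processor) throws Exception {
        for (I item : items) {
            O result = processor.process(item);
            if (result == null) {
                filterCount++;        // filtered: never reaches the writer
            } else {
                written.add(result);  // handed to the (simulated) writer
            }
        }
    }
}

class FilterContractDemo {
    public static void main(String[] args) throws Exception {
        // Keep even numbers, filter out odd ones.
        ItemProcessor<Integer, String> evensOnly =
                n -> n % 2 == 0 ? "item-" + n : null;

        ChunkSimulator<Integer, String> sim = new ChunkSimulator<>();
        sim.run(List.of(1, 2, 3, 4, 5), evensOnly);

        System.out.println(sim.written);      // [item-2, item-4]
        System.out.println(sim.filterCount);  // 3
    }
}
```

The real framework does considerably more per chunk (transactions, listeners, metadata updates), so treat this purely as a model of the return-value contract.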
Validation
Bean Validation with ValidatingItemProcessor
The simplest declarative approach combines Jakarta Bean Validation annotations on your domain object with Spring Batch’s ValidatingItemProcessor.
Add the validator dependency:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
Annotate your domain object:
@Data
public class Order {
@NotNull
private Long customerId;
@NotNull
@DecimalMin(value = "0.01", message = "Amount must be positive")
@DecimalMax(value = "999999.99")
private BigDecimal amount;
@NotNull
@PastOrPresent
private LocalDate orderDate;
@NotBlank
@Pattern(regexp = "PENDING|COMPLETED|SHIPPED|CANCELLED")
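private String status;
// Fields referenced by later examples in this article (names taken from those examples, types assumed)
private Long orderId;
private String rawCustomerId;
private boolean manuallyApproved;
private String customerTier;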
}
Wire up the processor:
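@Bean
public ValidatingItemProcessor<Order> validatingOrderProcessor(
org.springframework.validation.Validator validator) {
// SpringValidator adapts a Spring Validator to Spring Batch's own Validator contract
SpringValidator<Order> springValidator = new SpringValidator<>();
springValidator.setValidator(validator);
ValidatingItemProcessor<Order> processor = new ValidatingItemProcessor<>(springValidator);
processor.setFilter(false); // false = throw ValidationException on invalid items, true = filter them out
return processor;
}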
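With setFilter(false) (the default), a validation failure throws ValidationException. With setFilter(true), invalid items are filtered out silently and filter_count is incremented. Filtered items never reach a SkipListener, so if you want to log invalid items and keep processing, leave setFilter(false), make the step fault tolerant with skip(ValidationException.class), and log in the SkipListener's onSkipInProcess callback.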
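The difference between the two modes can be sketched in plain Java. ModelValidatingProcessor below is not the library class, only a simplified stand-in that mimics the two behaviours described above. The Predicate-based validity check and the use of IllegalArgumentException in place of ValidationException are assumptions made so the sketch runs on its own.

```java
import java.util.function.Predicate;

// Simplified model of the filter switch (assumption: NOT the library class).
class ModelValidatingProcessor<T> {
    private final Predicate<T> isValid;  // hypothetical validity check
    private boolean filter = false;      // strict mode by default

    ModelValidatingProcessor(Predicate<T> isValid) {
        this.isValid = isValid;
    }

    void setFilter(boolean filter) {
        this.filter = filter;
    }

    T process(T item) {
        if (isValid.test(item)) {
            return item;                 // valid items pass through unchanged
        }
        if (filter) {
            return null;                 // filter mode: drop the item quietly
        }
        // Strict mode: surface the failure so a skip policy can react to it.
        throw new IllegalArgumentException("Invalid item: " + item);
    }
}

class FilterSwitchDemo {
    public static void main(String[] args) {
        ModelValidatingProcessor<Integer> p = new ModelValidatingProcessor<>(n -> n > 0);

        p.setFilter(true);
        System.out.println(p.process(-5));       // null

        p.setFilter(false);
        try {
            p.process(-5);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());  // Invalid item: -5
        }
    }
}
```

The design point is that only the strict mode produces an exception, and an exception is what skip configuration and skip listeners react to.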
Custom validation processor
For business rule validation that goes beyond annotation constraints:
@Component
public class OrderValidationProcessor implements ItemProcessor<Order, Order> {
private static final Logger log = LoggerFactory.getLogger(OrderValidationProcessor.class);
@Override
public Order process(Order order) throws Exception {
// Business rule: orders over $10,000 need a manual approval flag
if (order.getAmount().compareTo(new BigDecimal("10000")) > 0
&& !order.isManuallyApproved()) {
log.warn("Filtering high-value unapproved order {}: amount={}",
order.getOrderId(), order.getAmount());
return null; // filter out
}
// Business rule: order date cannot be more than 30 days in the past
if (order.getOrderDate().isBefore(LocalDate.now().minusDays(30))) {
log.warn("Filtering stale order {}: orderDate={}", order.getOrderId(), order.getOrderDate());
return null;
}
return order;
}
}
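One design option is to pull rules like these into pure functions so they can be tested without a Spring context or a system clock. The sketch below is an illustration under that assumption. OrderRules and its method names are hypothetical, and passing "today" as a parameter is a swapped-in technique, not what the processor above does.

```java
import java.math.BigDecimal;
import java.time.LocalDate;

// Hypothetical helper (not one of the article's classes): the two business
// rules as pure functions, with "today" passed in so tests are deterministic.
class OrderRules {
    static final BigDecimal APPROVAL_THRESHOLD = new BigDecimal("10000");
    static final int MAX_AGE_DAYS = 30;

    // True when the amount exceeds the threshold and there is no manual approval.
    static boolean isUnapprovedHighValue(BigDecimal amount, boolean manuallyApproved) {
        return amount.compareTo(APPROVAL_THRESHOLD) > 0 && !manuallyApproved;
    }

    // True when the order date is more than MAX_AGE_DAYS before the given day.
    static boolean isStale(LocalDate orderDate, LocalDate today) {
        return orderDate.isBefore(today.minusDays(MAX_AGE_DAYS));
    }

    public static void main(String[] args) {
        LocalDate today = LocalDate.of(2024, 3, 31);
        System.out.println(isUnapprovedHighValue(new BigDecimal("10000.01"), false)); // true
        System.out.println(isUnapprovedHighValue(new BigDecimal("10000.00"), false)); // false
        System.out.println(isStale(LocalDate.of(2024, 3, 1), today));   // false: exactly 30 days old
        System.out.println(isStale(LocalDate.of(2024, 2, 29), today));  // true: 31 days old
    }
}
```

A processor could delegate to helpers like these and keep only the logging and the return-null decision for itself.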
Transformation
Type conversion
When reader output type differs from writer input type:
public class OrderToCsvRecordProcessor implements ItemProcessor<Order, OrderCsvRecord> {
private static final DateTimeFormatter FMT =
DateTimeFormatter.ofPattern("dd/MM/yyyy");
@Override
public OrderCsvRecord process(Order order) {
return new OrderCsvRecord(
order.getOrderId().toString(),
order.getCustomerId().toString(),
String.format("%.2f", order.getAmount()),
order.getOrderDate().format(FMT),
order.getStatus()
);
}
}
Field normalization
@Component
public class OrderNormalizationProcessor implements ItemProcessor<Order, Order> {
@Override
public Order process(Order order) {
// Normalize status to uppercase
order.setStatus(order.getStatus().trim().toUpperCase());
// Normalize customer ID: Long.parseLong accepts leading zeros, so imported IDs like "000123" parse to 123
if (order.getRawCustomerId() != null) {
order.setCustomerId(Long.parseLong(order.getRawCustomerId().trim()));
}
return order;
}
}
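The two normalization rules can be checked in isolation with a small standalone sketch. NormalizationSketch and its method names are hypothetical. Using Locale.ROOT for the upper-casing is an added assumption, chosen so the result does not depend on the JVM's default locale.

```java
import java.util.Locale;

// Hypothetical standalone helpers mirroring the two normalization rules.
class NormalizationSketch {
    // Trim surrounding whitespace and upper-case with a fixed locale.
    static String normalizeStatus(String raw) {
        return raw.trim().toUpperCase(Locale.ROOT);
    }

    // Long.parseLong tolerates leading zeros, so no manual stripping is needed.
    static long parseCustomerId(String raw) {
        return Long.parseLong(raw.trim());
    }

    public static void main(String[] args) {
        System.out.println(normalizeStatus("  shipped "));  // SHIPPED
        System.out.println(parseCustomerId("000123"));      // 123
        System.out.println(parseCustomerId(" 0000 "));      // 0
    }
}
```

Note that parseCustomerId still throws NumberFormatException for non-numeric input, which a step could treat as a skippable error.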
Enrichment
Enrichment processors add data from secondary sources — databases, caches, or external services — to each item.
Database lookup enrichment
@Component
@RequiredArgsConstructor
public class OrderEnrichmentProcessor implements ItemProcessor<Order, Order> {
private final JdbcTemplate jdbcTemplate;
@Override
public Order process(Order order) {
// Look up customer tier from DB
String tier = jdbcTemplate.queryForObject(
"SELECT tier FROM customers WHERE customer_id = ?",
String.class,
order.getCustomerId());
order.setCustomerTier(tier);
// Apply tier-based discount
if ("GOLD".equals(tier)) {
order.setAmount(order.getAmount().multiply(new BigDecimal("0.90")));
} else if ("SILVER".equals(tier)) {
order.setAmount(order.getAmount().multiply(new BigDecimal("0.95")));
}
return order;
}
}
Warning: this executes one SELECT per item, so a step that processes N items issues N lookup queries (the classic N+1 pattern). A multi-threaded step issues the same total, but up to thread-count of those queries hit the database concurrently. Cache the customer data to avoid this:
Cache-backed enrichment
@Component
@RequiredArgsConstructor
public class CachedOrderEnrichmentProcessor implements ItemProcessor<Order, Order> {
private final JdbcTemplate jdbcTemplate;
// Simple per-step cache — populated on first access, cleared when step ends
private final Map<Long, String> customerTierCache = new ConcurrentHashMap<>();
@Override
public Order process(Order order) {
String tier = customerTierCache.computeIfAbsent(
order.getCustomerId(),
id -> jdbcTemplate.queryForObject(
"SELECT tier FROM customers WHERE customer_id = ?",
String.class, id));
order.setCustomerTier(tier);
return order;
}
// Clear the cache when the step ends to avoid stale data across restarts
@AfterStep
public ExitStatus afterStep(StepExecution stepExecution) {
customerTierCache.clear();
return null;
}
}
Register the processor as a listener on the step so that its @AfterStep method is called:
.listener(cachedOrderEnrichmentProcessor)
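The effect of the cache on query volume can be seen in a standalone sketch. TierCacheSketch is hypothetical: the Function-based loader stands in for the JDBC lookup, and the counter exists only to show how often the loader runs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical sketch: the loader stands in for the per-customer JDBC query.
class TierCacheSketch {
    private final Map<Long, String> cache = new ConcurrentHashMap<>();
    private final Function<Long, String> loader;
    final AtomicInteger lookups = new AtomicInteger();  // counts loader invocations

    TierCacheSketch(Function<Long, String> loader) {
        this.loader = loader;
    }

    String tierFor(Long customerId) {
        // The loader runs only when the key is absent from the cache.
        return cache.computeIfAbsent(customerId, id -> {
            lookups.incrementAndGet();
            return loader.apply(id);
        });
    }

    // Mirrors the end-of-step cleanup.
    void clear() {
        cache.clear();
    }

    public static void main(String[] args) {
        TierCacheSketch tiers = new TierCacheSketch(id -> id % 2 == 0 ? "GOLD" : "SILVER");
        for (long id : new long[] {1L, 2L, 1L, 1L, 2L}) {
            tiers.tierFor(id);
        }
        System.out.println(tiers.lookups.get());  // 2 lookups for 5 items
    }
}
```

Five items with two distinct customers cost two lookups instead of five; the saving grows with the number of repeat customers in the input.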
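When most of the reference table will be needed and it fits in memory, load it with a single query when the step opens instead of computing-if-absent. This assumes the processor also implements ItemStream, whose open() method is overridden here:
@Override
public void open(ExecutionContext ctx) {
// The cast selects the RowCallbackHandler overload; a bare lambda is ambiguous with ResultSetExtractor
jdbcTemplate.query("SELECT customer_id, tier FROM customers",
(RowCallbackHandler) rs -> customerTierCache.put(rs.getLong(1), rs.getString(2)));
}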
Filtering vs Skipping
Two different mechanisms for removing items from the output:
| | Filter (return null) | Skip (exception + skip config) |
|---|---|---|
| How | Processor returns null | Exception thrown, caught by faultTolerant |
| Count tracked in | filter_count | skip_count |
| Use when | Item is valid but not needed | Item is malformed or violates a constraint |
| Example | Order already processed | CSV line has wrong column count |
// Filter — item deliberately excluded, no error
@Override
public Order process(Order order) {
if (alreadyProcessed(order.getOrderId())) {
return null; // filtered, not an error
}
return order;
}
// Skip — in step configuration, not in processor
.faultTolerant()
.skip(OrderValidationException.class)
.skipLimit(100)
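How the two counters diverge can be modelled in plain Java. Everything in the sketch below is a stand-in: ItemProcessor is a local copy of the interface shape, MalformedItemException is a hypothetical skippable exception, and FilterVsSkipModel is a simplified model that ignores transactions, rollbacks and chunk re-processing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Local stand-in with the same shape as Spring Batch's ItemProcessor.
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

// Hypothetical exception playing the role of a configured skippable exception.
class MalformedItemException extends Exception {
    MalformedItemException(String message) {
        super(message);
    }
}

// Simplified model of filter vs skip bookkeeping (not the framework's logic).
class FilterVsSkipModel<I, O> {
    final List<O> written = new ArrayList<>();
    int filterCount = 0;
    int skipCount = 0;
    private final int skipLimit;

    FilterVsSkipModel(int skipLimit) {
        this.skipLimit = skipLimit;
    }

    void run(List<I> items, ItemProcessor<I, O> processor) throws Exception {
        for (I item : items) {
            try {
                O out = processor.process(item);
                if (out == null) {
                    filterCount++;             // deliberate exclusion, not an error
                } else {
                    written.add(out);
                }
            } catch (MalformedItemException e) {
                if (skipCount >= skipLimit) {  // limit already used up: fail the run
                    throw new IllegalStateException("Skip limit " + skipLimit + " exceeded", e);
                }
                skipCount++;                   // tolerated error
            }
        }
    }
}

class FilterVsSkipDemo {
    // Filters already-processed ids and throws on ids that are not numeric.
    static ItemProcessor<String, Integer> processor(Set<String> alreadyProcessed) {
        return id -> {
            if (alreadyProcessed.contains(id)) {
                return null;
            }
            try {
                return Integer.parseInt(id);
            } catch (NumberFormatException e) {
                throw new MalformedItemException("Not a numeric id: " + id);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        FilterVsSkipModel<String, Integer> model = new FilterVsSkipModel<>(1);
        model.run(List.of("10", "20", "x", "30"), processor(Set.of("20")));
        System.out.println(model.written);      // [10, 30]
        System.out.println(model.filterCount);  // 1
        System.out.println(model.skipCount);    // 1
    }
}
```

In this model a second malformed id would exceed the limit of 1 and abort the run, which is the role skipLimit plays in the step configuration.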
Complete Example: Order Processing Processor Chain
@Component
@RequiredArgsConstructor
public class OrderProcessingProcessor implements ItemProcessor<Order, ProcessedOrder> {
private final JdbcTemplate jdbcTemplate;
private final Validator validator;
private final Map<Long, String> tierCache = new ConcurrentHashMap<>();
@Override
public ProcessedOrder process(Order order) throws Exception {
// Step 1: Bean validation
Set<ConstraintViolation<Order>> violations = validator.validate(order);
if (!violations.isEmpty()) {
String msg = violations.stream()
.map(v -> v.getPropertyPath() + ": " + v.getMessage())
.collect(Collectors.joining(", "));
throw new OrderValidationException("Invalid order " + order.getOrderId() + ": " + msg);
}
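// Step 2: Filter already-processed orders
// COUNT(*) read as an Integer is portable; a boolean "COUNT(*) > 0" column is not supported by every database
Integer processedCount = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM processed_orders WHERE order_id = ?",
Integer.class, order.getOrderId());
if (processedCount != null && processedCount > 0) return null;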
// Step 3: Enrich with customer tier
String tier = tierCache.computeIfAbsent(order.getCustomerId(),
id -> jdbcTemplate.queryForObject(
"SELECT tier FROM customers WHERE customer_id = ?",
String.class, id));
// Step 4: Calculate final amount with discount
BigDecimal discount = switch (tier) {
case "GOLD" -> new BigDecimal("0.10");
case "SILVER" -> new BigDecimal("0.05");
default -> BigDecimal.ZERO;
};
BigDecimal finalAmount = order.getAmount()
.subtract(order.getAmount().multiply(discount));
return new ProcessedOrder(
order.getOrderId(),
order.getCustomerId(),
finalAmount,
order.getOrderDate(),
"PROCESSED",
tier,
discount
);
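}
// Clear the tier cache when the step ends; called because the step registers this processor as a listener
@AfterStep
public ExitStatus afterStep(StepExecution stepExecution) {
tierCache.clear();
return null; // null leaves the step's exit status unchanged
}
}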
Wire into the step with skip on validation errors:
@Bean
public Step processOrdersStep(
JobRepository jobRepository,
PlatformTransactionManager tx,
JdbcPagingItemReader<Order> reader,
OrderProcessingProcessor processor,
JdbcBatchItemWriter<ProcessedOrder> writer) {
return new StepBuilder("processOrdersStep", jobRepository)
.<Order, ProcessedOrder>chunk(100, tx)
.reader(reader)
.processor(processor)
.writer(writer)
.listener(processor) // for @AfterStep cache clear
.faultTolerant()
.skip(OrderValidationException.class)
.skipLimit(500)
.build();
}
Key Takeaways
- ItemProcessor.process() returns the transformed item or null to filter. null increments filter_count, not skip_count.
- Use ValidatingItemProcessor + Bean Validation annotations for declarative validation. Use a custom processor for business rule validation.
- Enrichment processors that query a database on every item are an N+1 problem. Load reference data into a Map at step start or use computeIfAbsent with a per-step cache.
- Distinguish between filtering (deliberate exclusion, return null) and skipping (error handling for bad data, exception + faultTolerant config).
- Keep processors focused on one responsibility. Chain them with CompositeItemProcessor (Article 12) rather than putting all logic in one class.
What’s Next
Article 12 covers CompositeItemProcessor for chaining multiple processors, calling external APIs asynchronously within a processor, and advanced filtering patterns.