Advanced Processing: CompositeItemProcessor, External APIs, and Async Processing
Introduction
Real batch jobs often need more than one transformation step per item. You might validate first, then normalize, then enrich, then convert to the output type. CompositeItemProcessor chains multiple single-responsibility processors together. For I/O-bound enrichment steps, AsyncItemProcessor runs processing concurrently on a thread pool — giving you parallelism without rewriting your step as multi-threaded.
CompositeItemProcessor
CompositeItemProcessor chains a list of processors. The output of each processor becomes the input to the next. If any processor returns null, the chain stops and the item is filtered.
Basic chain
@Bean
public CompositeItemProcessor<Order, ProcessedOrder> orderProcessingChain(
        OrderValidationProcessor validationProcessor,
        OrderNormalizationProcessor normalizationProcessor,
        OrderEnrichmentProcessor enrichmentProcessor,
        OrderToProcessedOrderProcessor conversionProcessor) {
    CompositeItemProcessor<Order, ProcessedOrder> composite =
            new CompositeItemProcessor<>();
    composite.setDelegates(List.of(
            validationProcessor,    // Order → Order (or null if invalid)
            normalizationProcessor, // Order → Order (uppercase, trim)
            enrichmentProcessor,    // Order → Order (add customer tier)
            conversionProcessor     // Order → ProcessedOrder
    ));
    return composite;
}
Type safety note: The generic types of the composite (<Order, ProcessedOrder>) represent the first input and last output. Intermediate processors must have compatible types — each one’s output must match the next one’s input. The intermediate processors in this example are all ItemProcessor<Order, Order>.
Chain with type change in the middle
// Step 1: validate and normalize raw input
@Component
public class RawOrderValidator implements ItemProcessor<RawOrder, Order> {
    @Override
    public Order process(RawOrder raw) {
        if (!raw.isValid()) return null;
        return raw.toOrder();
    }
}

// Step 2: enrich the clean Order
@Component
public class OrderEnricher implements ItemProcessor<Order, Order> {
    @Override
    public Order process(Order order) {
        // ... enrichment ...
        return order;
    }
}

// CompositeItemProcessor<RawOrder, Order>
@Bean
public CompositeItemProcessor<RawOrder, Order> pipeline(
        RawOrderValidator validator,
        OrderEnricher enricher) {
    CompositeItemProcessor<RawOrder, Order> chain = new CompositeItemProcessor<>();
    chain.setDelegates(List.of(validator, enricher));
    return chain;
}
Short-circuit on null
If RawOrderValidator.process() returns null for an invalid item, CompositeItemProcessor short-circuits — OrderEnricher is never called for that item. The item is filtered (counted in filter_count).
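The short-circuit semantics can be sketched in plain Java. This is a simplified model of what CompositeItemProcessor does internally, not its actual source; the class and variable names here are illustrative only:

```java
import java.util.List;
import java.util.function.Function;

public class ChainDemo {
    // Minimal model of a composite processor: feed the item through each
    // delegate in order; a null result stops the chain immediately.
    static Object runChain(Object item, List<Function<Object, Object>> delegates) {
        Object current = item;
        for (Function<Object, Object> delegate : delegates) {
            current = delegate.apply(current);
            if (current == null) {
                return null; // item is filtered; later delegates never run
            }
        }
        return current;
    }

    public static void main(String[] args) {
        Function<Object, Object> validator = s -> ((String) s).isBlank() ? null : s;
        Function<Object, Object> normalizer = s -> ((String) s).toUpperCase();

        System.out.println(runChain("order-1", List.of(validator, normalizer))); // ORDER-1
        System.out.println(runChain("   ", List.of(validator, normalizer)));     // null: filtered
    }
}
```

Note that the real CompositeItemProcessor behaves the same way: the blank item never reaches the normalizer.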
Calling External Services from a Processor
Processors often call REST APIs to enrich items — fraud checks, address validation, pricing services. There are three patterns, depending on throughput requirements.
Pattern 1: Synchronous per-item call (simple, low throughput)
@Slf4j // provides the log field used below
@Component
@RequiredArgsConstructor
public class FraudCheckProcessor implements ItemProcessor<Order, Order> {
    private final RestClient fraudClient;

    @Override
    public Order process(Order order) throws Exception {
        FraudResult result = fraudClient.post()
                .uri("/check")
                .body(Map.of("orderId", order.getOrderId(),
                        "amount", order.getAmount(),
                        "customerId", order.getCustomerId()))
                .retrieve()
                .body(FraudResult.class);
        if (result.isFraudulent()) {
            log.warn("Fraud detected for order {}", order.getOrderId());
            return null; // filter out fraudulent orders
        }
        order.setFraudScore(result.getScore());
        return order;
    }
}
This works fine for low volumes or when the fraud API is fast. For high volumes, each item blocks a thread while waiting for the HTTP response.
Pattern 2: Batch the external call at chunk level
A custom writer that calls the external API for an entire chunk at once:
@Component
@RequiredArgsConstructor
public class BatchFraudCheckWriter implements ItemWriter<Order> {
    private final RestClient fraudClient;
    private final JdbcBatchItemWriter<Order> downstreamWriter;

    @Override
    public void write(Chunk<? extends Order> chunk) throws Exception {
        // Copy into a List<Order> rather than casting the wildcard list
        List<Order> orders = new ArrayList<>(chunk.getItems());
        // One API call for the entire chunk
        Map<Long, FraudResult> results = fraudClient.post()
                .uri("/check/bulk")
                .body(orders.stream().map(Order::getOrderId).collect(Collectors.toList()))
                .retrieve()
                .body(new ParameterizedTypeReference<Map<Long, FraudResult>>() {});
        // Filter out fraudulent orders
        List<Order> cleanOrders = orders.stream()
                .filter(o -> !results.get(o.getOrderId()).isFraudulent())
                .peek(o -> o.setFraudScore(results.get(o.getOrderId()).getScore()))
                .collect(Collectors.toList());
        downstreamWriter.write(new Chunk<>(cleanOrders));
    }
}
This is much more efficient — one network round trip per chunk instead of one per item.
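The arithmetic behind that claim is worth making concrete. A quick sketch, assuming a hypothetical 50 ms round-trip latency and a chunk size of 200 (both numbers are illustrative, not from the article's configuration):

```java
public class RoundTripMath {
    // Number of bulk API calls needed to cover `items` items, one call per chunk.
    static long bulkCalls(long items, int chunkSize) {
        return (items + chunkSize - 1) / chunkSize; // ceiling division
    }

    public static void main(String[] args) {
        long items = 1_000_000;
        double latencyMs = 50; // assumed round-trip latency

        // Pattern 1: one call per item
        System.out.printf("per-item:  %,d calls (~%.0f min of pure network wait)%n",
                items, items * latencyMs / 60_000);
        // Pattern 2: one call per chunk of 200
        System.out.printf("per-chunk: %,d calls (~%.0f min of pure network wait)%n",
                bulkCalls(items, 200), bulkCalls(items, 200) * latencyMs / 60_000);
    }
}
```

With these assumed numbers, per-item calling spends roughly 833 minutes waiting on the network versus about 4 minutes for chunk-level batching: a 200x reduction in round trips.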
Pattern 3: AsyncItemProcessor for concurrent processing
AsyncItemProcessor wraps your processor and submits each item to a TaskExecutor thread pool. The step collects the Future<O> results and passes them to AsyncItemWriter.
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-integration</artifactId>
</dependency>
@Bean
public AsyncItemProcessor<Order, Order> asyncFraudCheckProcessor(
        FraudCheckProcessor syncProcessor) throws Exception {
    AsyncItemProcessor<Order, Order> async = new AsyncItemProcessor<>();
    async.setDelegate(syncProcessor);
    async.setTaskExecutor(new SimpleAsyncTaskExecutor()); // or a bounded ThreadPoolTaskExecutor
    async.afterPropertiesSet();
    return async;
}

@Bean
public AsyncItemWriter<Order> asyncOrderWriter(
        JdbcBatchItemWriter<Order> syncWriter) throws Exception {
    AsyncItemWriter<Order> async = new AsyncItemWriter<>();
    async.setDelegate(syncWriter);
    async.afterPropertiesSet();
    return async;
}
The step type signature changes to use Future:
@Bean
public Step asyncProcessingStep(
        JdbcPagingItemReader<Order> reader,
        AsyncItemProcessor<Order, Order> asyncProcessor,
        AsyncItemWriter<Order> asyncWriter) {
    return new StepBuilder("asyncProcessingStep", jobRepository)
            .<Order, Future<Order>>chunk(100, tx) // note: Future<Order>, not Order
            .reader(reader)
            .processor(asyncProcessor)
            .writer(asyncWriter)
            .build();
}
How it works: AsyncItemProcessor submits each item to the thread pool and returns a Future<O> immediately. The step collects all futures for the chunk. AsyncItemWriter calls future.get() to resolve each result, then writes all resolved items. The step still processes one chunk at a time, but within a chunk the processor calls run concurrently.
Thread safety: your delegate processor must be thread-safe. Avoid shared mutable state. The RestClient itself is thread-safe.
Throughput: for an external call with 200 ms latency and 8 threads, throughput improves by up to roughly 8x over sequential processing, assuming the external service can absorb the extra concurrent load and the threads stay saturated.
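That estimate follows from simple arithmetic: for an I/O-bound call, each thread completes one item per round trip, so ideal throughput is threads divided by latency. A small sketch of the estimate (an idealized model that ignores queuing and service-side limits):

```java
public class AsyncThroughput {
    // Idealized items/second for an I/O-bound processor:
    // each thread finishes one item per round trip.
    static double itemsPerSecond(int threads, double latencyMs) {
        return threads * 1000.0 / latencyMs;
    }

    public static void main(String[] args) {
        System.out.println(itemsPerSecond(1, 200)); // sequential baseline: 5 items/s
        System.out.println(itemsPerSecond(8, 200)); // 8 threads: 40 items/s, ~8x
    }
}
```

In practice the gain flattens once the external service becomes the bottleneck, which is why a bounded ThreadPoolTaskExecutor is usually a better choice than an unbounded one.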
ScriptItemProcessor
For simple transformations, use a ScriptItemProcessor with a Groovy or JavaScript script — useful for configurable transformation rules stored outside the application:
@Bean
public ScriptItemProcessor<Order, Order> scriptOrderProcessor() throws Exception {
    ScriptItemProcessor<Order, Order> processor = new ScriptItemProcessor<>();
    processor.setScript(new ClassPathResource("processors/order-transform.groovy"));
    processor.setItemBindingVariableName("order");
    return processor;
}

// order-transform.groovy
if (order.status == 'NEW') {
    order.status = 'PENDING'
}
order.amount = order.amount * 1.1 // add 10% tax
return order
Complete Example: Multi-Stage Order Pipeline
Six-stage pipeline using CompositeItemProcessor:
- Parse and validate raw CSV input
- Normalize field formats
- Check for duplicates (filter already-processed orders)
- Enrich with customer tier from cache
- Apply pricing rules (discount/tax)
- Convert to output DTO
// Stage 1 + 2: parse, validate, normalize
@Component
public class ParseAndNormalizeProcessor implements ItemProcessor<RawOrderCsv, Order> {
    @Override
    public Order process(RawOrderCsv raw) {
        if (raw.getCustomerId() == null || raw.getAmount() == null) return null;
        Order order = new Order();
        order.setCustomerId(Long.parseLong(raw.getCustomerId().trim()));
        order.setAmount(new BigDecimal(raw.getAmount().trim()));
        order.setOrderDate(LocalDate.parse(raw.getOrderDate().trim()));
        order.setStatus(raw.getStatus().trim().toUpperCase());
        return order;
    }
}
// Stage 3: deduplication filter
@Component
@RequiredArgsConstructor
public class DeduplicationProcessor implements ItemProcessor<Order, Order> {
    private final JdbcTemplate jdbc;
    private final Set<Long> processedIds = ConcurrentHashMap.newKeySet();

    @Override
    public Order process(Order order) {
        if (processedIds.contains(order.getOrderId())) return null;
        boolean exists = Boolean.TRUE.equals(jdbc.queryForObject(
                "SELECT COUNT(*) > 0 FROM processed_orders WHERE order_id = ?",
                Boolean.class, order.getOrderId()));
        if (exists) return null;
        processedIds.add(order.getOrderId());
        return order;
    }
}
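The in-memory half of this filter (the Set of already-seen ids) can be exercised in isolation. A minimal sketch with the database check omitted; the class name is hypothetical:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class DedupDemo {
    private final Set<Long> seen = ConcurrentHashMap.newKeySet();

    // Returns null for ids already seen in this run, mirroring the
    // processor contract (null = item filtered).
    Long process(Long orderId) {
        // Set.add() is atomic and returns false if the id was already present,
        // so there is no separate contains-then-add step.
        return seen.add(orderId) ? orderId : null;
    }

    public static void main(String[] args) {
        DedupDemo dedup = new DedupDemo();
        List<Long> kept = List.of(1L, 2L, 1L, 3L, 2L).stream()
                .map(dedup::process)
                .filter(id -> id != null)
                .collect(Collectors.toList());
        System.out.println(kept); // [1, 2, 3]
    }
}
```

Design note: relying on the boolean returned by Set.add() avoids the check-then-act race that a separate contains() followed by add() introduces when the step runs on multiple threads.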
// Stage 4: enrichment (cached)
@Component
@RequiredArgsConstructor
public class CustomerTierEnrichmentProcessor implements ItemProcessor<Order, Order> {
    private final JdbcTemplate jdbc;
    private final Map<Long, String> tierCache = new ConcurrentHashMap<>();

    @Override
    public Order process(Order order) {
        String tier = tierCache.computeIfAbsent(order.getCustomerId(),
                id -> jdbc.queryForObject(
                        "SELECT tier FROM customers WHERE customer_id = ?",
                        String.class, id));
        order.setCustomerTier(tier);
        return order;
    }
}
// Stage 5: pricing rules
@Component
public class PricingRulesProcessor implements ItemProcessor<Order, Order> {
    private static final Map<String, BigDecimal> DISCOUNTS = Map.of(
            "GOLD", new BigDecimal("0.10"),
            "SILVER", new BigDecimal("0.05"),
            "BRONZE", BigDecimal.ZERO
    );

    @Override
    public Order process(Order order) {
        BigDecimal discount = DISCOUNTS.getOrDefault(order.getCustomerTier(), BigDecimal.ZERO);
        order.setFinalAmount(order.getAmount().multiply(BigDecimal.ONE.subtract(discount)));
        return order;
    }
}
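The discount arithmetic is easy to verify in isolation. A self-contained sketch of the same calculation (hypothetical class name, same discount table):

```java
import java.math.BigDecimal;
import java.util.Map;

public class PricingDemo {
    static final Map<String, BigDecimal> DISCOUNTS = Map.of(
            "GOLD", new BigDecimal("0.10"),
            "SILVER", new BigDecimal("0.05"));

    // finalAmount = amount * (1 - discount); unknown tiers get no discount
    static BigDecimal finalAmount(BigDecimal amount, String tier) {
        BigDecimal discount = DISCOUNTS.getOrDefault(tier, BigDecimal.ZERO);
        return amount.multiply(BigDecimal.ONE.subtract(discount));
    }

    public static void main(String[] args) {
        System.out.println(finalAmount(new BigDecimal("100.00"), "GOLD"));    // 90.0000
        System.out.println(finalAmount(new BigDecimal("100.00"), "UNKNOWN")); // 100.00
    }
}
```

One caveat worth noting: BigDecimal.multiply() adds the scales of its operands (100.00 times 0.90 yields 90.0000), so in a real pipeline you would normally round before persisting, for example with setScale(2, RoundingMode.HALF_UP).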
// Stage 6: output conversion
@Component
public class OrderToProcessedOrderConverter implements ItemProcessor<Order, ProcessedOrder> {
    @Override
    public ProcessedOrder process(Order order) {
        return new ProcessedOrder(
                order.getOrderId(),
                order.getCustomerId(),
                order.getFinalAmount(),
                order.getOrderDate(),
                "PROCESSED",
                order.getCustomerTier(),
                LocalDateTime.now()
        );
    }
}
// Compose the pipeline
@Bean
public CompositeItemProcessor<RawOrderCsv, ProcessedOrder> fullOrderPipeline(
        ParseAndNormalizeProcessor parseNormalize,
        DeduplicationProcessor dedup,
        CustomerTierEnrichmentProcessor tierEnrich,
        PricingRulesProcessor pricing,
        OrderToProcessedOrderConverter convert) {
    CompositeItemProcessor<RawOrderCsv, ProcessedOrder> pipeline =
            new CompositeItemProcessor<>();
    pipeline.setDelegates(List.of(parseNormalize, dedup, tierEnrich, pricing, convert));
    return pipeline;
}
@Bean
public Step fullOrderProcessingStep(
        FlatFileItemReader<RawOrderCsv> csvReader,
        CompositeItemProcessor<RawOrderCsv, ProcessedOrder> pipeline,
        JdbcBatchItemWriter<ProcessedOrder> writer) {
    return new StepBuilder("fullOrderProcessingStep", jobRepository)
            .<RawOrderCsv, ProcessedOrder>chunk(200, tx)
            .reader(csvReader)
            .processor(pipeline)
            .writer(writer)
            .faultTolerant()
            .skip(Exception.class)
            .skipLimit(100)
            .build();
}
Key Takeaways
- CompositeItemProcessor chains processors in order. Each processor's output is the next one's input. A null at any stage short-circuits the chain for that item.
- Keep each processor focused on one responsibility — it makes testing, debugging, and reordering easier.
- For external API calls: use synchronous per-item calls for low volume, bulk API calls at the writer level for high volume, or AsyncItemProcessor for concurrent I/O-bound processing.
- AsyncItemProcessor requires AsyncItemWriter and changes the step's output type to Future<O>.
- Thread safety matters in async and multi-threaded contexts — use ConcurrentHashMap for shared caches and avoid mutable instance fields without synchronisation.
What’s Next
Part 4 (Processors) is complete. Article 13 starts Part 5 — Job and Step Configuration. You will learn how to build multi-step jobs with conditional flows, decision steps, and parallel flows using Spring Batch’s step builder DSL.