Advanced Writers: JpaItemWriter, CompositeItemWriter, and Custom Writers
Introduction
The previous article covered the two most common writers: FlatFileItemWriter for files and JdbcBatchItemWriter for database inserts. This article covers advanced scenarios:
- Persisting JPA entities with JpaItemWriter
- Writing to multiple destinations simultaneously with CompositeItemWriter
- Routing items to different writers based on content with ClassifierCompositeItemWriter
- Building custom writers for REST APIs, message queues, and cloud storage
- Combining a writer with a post-step cleanup tasklet
JpaItemWriter
JpaItemWriter calls EntityManager.merge() on each item. It is appropriate when your output is a managed JPA entity and you want Hibernate to handle the INSERT vs UPDATE decision automatically (merge = insert if new, update if existing).
@Bean
public JpaItemWriter<Order> jpaOrderWriter(EntityManagerFactory emf) {
    JpaItemWriter<Order> writer = new JpaItemWriter<>();
    writer.setEntityManagerFactory(emf);
    return writer;
}
When to use JpaItemWriter vs JdbcBatchItemWriter
| | JpaItemWriter | JdbcBatchItemWriter |
|---|---|---|
| SQL control | None — Hibernate generates SQL | Full control |
| Associations | Cascades respected | Must handle manually |
| Performance | Lower — entity lifecycle overhead | Higher — raw JDBC batch |
| Upsert logic | merge() handles new vs existing | Use ON DUPLICATE KEY |
| Batch mode | Needs hibernate.jdbc.batch_size | Automatic via executeBatch() |
For bulk inserts on large tables, JdbcBatchItemWriter with rewriteBatchedStatements=true is significantly faster. Use JpaItemWriter when you need Hibernate cascade operations or auditing (@PrePersist, @PreUpdate).
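For reference, rewriteBatchedStatements is a MySQL Connector/J flag set on the JDBC URL; an illustrative datasource entry (host, port, and schema are placeholders):

```properties
# Lets the MySQL driver rewrite batched INSERTs into multi-row statements
spring.datasource.url=jdbc:mysql://localhost:3306/mydb?rewriteBatchedStatements=true
```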
Enabling Hibernate batch mode for JpaItemWriter
spring.jpa.properties.hibernate.jdbc.batch_size=50
spring.jpa.properties.hibernate.order_inserts=true
spring.jpa.properties.hibernate.order_updates=true
spring.jpa.properties.hibernate.generate_statistics=false
Without batch_size, Hibernate sends one SQL statement per entity — equivalent to not batching at all.
CompositeItemWriter
CompositeItemWriter fans out to multiple writers in sequence. Every writer receives the same chunk of items. Use it when you need to write to two destinations at once — for example, insert into a database and also write a CSV audit file.
@Bean
public CompositeItemWriter<Order> compositeOrderWriter(
        JdbcBatchItemWriter<Order> dbWriter,
        FlatFileItemWriter<Order> auditCsvWriter) {
    CompositeItemWriter<Order> composite = new CompositeItemWriter<>();
    composite.setDelegates(List.of(dbWriter, auditCsvWriter));
    return composite;
}
All delegates receive the same list of items, in registration order. If any delegate throws, the chunk transaction rolls back. Order still matters: transactional delegates roll back together, but non-transactional ones (file writers, REST calls) cannot be undone — place them after the writers most likely to fail, so a database error does not leave behind an audit line or API call for data that was never committed.
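The broadcast contract can be sketched without Spring; a minimal plain-Java analogue (SimpleWriter and SimpleComposite are illustrative names, not Spring Batch classes):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of CompositeItemWriter's fan-out contract: every delegate sees
// the same chunk, in registration order; the first failure aborts the rest.
interface SimpleWriter<T> {
    void write(List<? extends T> items) throws Exception;
}

class SimpleComposite<T> implements SimpleWriter<T> {
    private final List<SimpleWriter<T>> delegates;

    SimpleComposite(List<SimpleWriter<T>> delegates) {
        this.delegates = delegates;
    }

    @Override
    public void write(List<? extends T> items) throws Exception {
        for (SimpleWriter<T> delegate : delegates) {
            delegate.write(items); // same list handed to each delegate
        }
    }
}

class CompositeDemo {
    public static void main(String[] args) throws Exception {
        List<String> db = new ArrayList<>();
        List<String> audit = new ArrayList<>();
        SimpleComposite<String> composite =
            new SimpleComposite<String>(List.of(db::addAll, audit::addAll));
        composite.write(List.of("order-1", "order-2"));
        // Both destinations received the full chunk
        System.out.println(db.equals(audit) && db.size() == 2); // prints true
    }
}
```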
CompositeItemWriter with different output types
If your processor outputs a wrapper object, each delegate extracts what it needs:
@Data
@AllArgsConstructor
public class OrderResult {
    private Order order;            // for DB insert
    private OrderAuditRecord audit; // for audit CSV
}

@Bean
public ItemWriter<OrderResult> dbInsertWriter(DataSource dataSource) {
    JdbcBatchItemWriter<Order> delegate = new JdbcBatchItemWriterBuilder<Order>()
            .dataSource(dataSource)
            .sql("INSERT INTO orders ...")
            .beanMapped()
            .build();
    // Wrap: extract Order from OrderResult before passing to the delegate
    return chunk -> delegate.write(
            new Chunk<>(chunk.getItems().stream()
                    .map(OrderResult::getOrder)
                    .collect(Collectors.toList()))
    );
}
ClassifierCompositeItemWriter
ClassifierCompositeItemWriter routes each item to a different writer based on a Classifier<C, T> function. Use it when different item types need different destinations.
Routing orders by status
@Bean
public ClassifierCompositeItemWriter<Order> routingWriter(
        JdbcBatchItemWriter<Order> completedWriter,
        JdbcBatchItemWriter<Order> pendingWriter,
        JdbcBatchItemWriter<Order> cancelledWriter) {
    BackToBackPatternClassifier<Order, ItemWriter<? super Order>> classifier =
            new BackToBackPatternClassifier<>();
    // setRouterDelegate takes Object, so a lambda needs an explicit target type
    classifier.setRouterDelegate((Classifier<Order, String>) order -> order.getStatus());
    classifier.setMatcherMap(Map.of(
            "COMPLETED", completedWriter,
            "PENDING", pendingWriter,
            "CANCELLED", cancelledWriter
    ));
    ClassifierCompositeItemWriter<Order> writer = new ClassifierCompositeItemWriter<>();
    writer.setClassifier(classifier);
    return writer;
}
Each item is classified and sent to exactly one writer. Unlike CompositeItemWriter, items are not broadcast — each item goes to one destination only.
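The route-to-exactly-one contract can also be sketched without Spring; a minimal plain-Java analogue (RoutingSketch is an illustrative name, not a Spring Batch class):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Sketch of ClassifierCompositeItemWriter's routing contract: each item is
// classified once and delivered to exactly one destination, never broadcast.
class RoutingSketch {
    static Map<String, List<String>> route(List<String> items,
                                           Function<String, String> classifier) {
        Map<String, List<String>> destinations = new HashMap<>();
        for (String item : items) {
            destinations
                .computeIfAbsent(classifier.apply(item), k -> new ArrayList<>())
                .add(item); // exactly one destination per item
        }
        return destinations;
    }

    public static void main(String[] args) {
        Map<String, List<String>> routed = route(
            List.of("order-1:COMPLETED", "order-2:PENDING", "order-3:COMPLETED"),
            item -> item.substring(item.indexOf(':') + 1));
        System.out.println(routed.get("COMPLETED").size()); // prints 2
        System.out.println(routed.get("PENDING").size());   // prints 1
    }
}
```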
Simpler classifier with a lambda
@Bean
public ClassifierCompositeItemWriter<Order> statusRoutingWriter(
        JdbcBatchItemWriter<Order> highValueWriter,
        JdbcBatchItemWriter<Order> standardWriter) {
    ClassifierCompositeItemWriter<Order> writer = new ClassifierCompositeItemWriter<>();
    writer.setClassifier(order ->
            order.getAmount().compareTo(new BigDecimal("1000")) > 0
                    ? highValueWriter
                    : standardWriter
    );
    return writer;
}
Note: ClassifierCompositeItemWriter does not propagate ItemStream callbacks to its delegates. Any delegate that implements ItemStream — such as a FlatFileItemWriter — must be registered on the step with .stream(...) so Spring Batch opens and closes it correctly. JdbcBatchItemWriter does not implement ItemStream and needs no registration:

@Bean
public Step routingStep(JobRepository jobRepository,
                        PlatformTransactionManager tx,
                        ClassifierCompositeItemWriter<Order> routingWriter,
                        FlatFileItemWriter<Order> highValueFileWriter,
                        FlatFileItemWriter<Order> standardFileWriter,
                        JdbcPagingItemReader<Order> reader) {
    return new StepBuilder("routingStep", jobRepository)
            .<Order, Order>chunk(200, tx)
            .reader(reader)
            .writer(routingWriter)
            .stream(highValueFileWriter) // file delegates need open/close callbacks
            .stream(standardFileWriter)
            .build();
}
Custom ItemWriter
For destinations not covered by built-in writers — REST APIs, message queues, S3, Elasticsearch — implement ItemWriter<T> directly.
REST API writer
@Component
public class OrderApiItemWriter implements ItemWriter<Order> {
    private final RestClient restClient;
    private final String apiUrl;

    // A plain String cannot be autowired; bind it from configuration
    // (the property name here is illustrative)
    public OrderApiItemWriter(RestClient restClient,
                              @Value("${orders.api.base-url}") String apiUrl) {
        this.restClient = restClient;
        this.apiUrl = apiUrl;
    }

    @Override
    public void write(Chunk<? extends Order> chunk) throws Exception {
        // Send the entire chunk as one batch request for efficiency
        restClient.post()
                .uri(apiUrl + "/bulk")
                .contentType(MediaType.APPLICATION_JSON)
                .body(chunk.getItems())
                .retrieve()
                .toBodilessEntity();
    }
}
If the API does not support bulk requests, send items individually but build in retry:
@Override
public void write(Chunk<? extends Order> chunk) throws Exception {
    for (Order order : chunk) {
        int attempts = 0;
        while (true) {
            try {
                restClient.post()
                        .uri(apiUrl + "/orders")
                        .body(order)
                        .retrieve()
                        .toBodilessEntity();
                break;
            } catch (HttpServerErrorException e) {
                if (++attempts >= 3) throw e;
                Thread.sleep(500L * attempts);
            }
        }
    }
}
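The retry loop above can be extracted into a reusable helper; a minimal plain-Java sketch (Retry and withBackoff are illustrative names, not a Spring Batch API):

```java
import java.util.concurrent.Callable;

// Retries a call up to maxAttempts with linear backoff; the final failure
// is rethrown so the surrounding chunk transaction rolls back.
final class Retry {
    static <T> T withBackoff(Callable<T> call, int maxAttempts, long baseDelayMs)
            throws Exception {
        int attempts = 0;
        while (true) {
            try {
                return call.call();
            } catch (Exception e) {
                if (++attempts >= maxAttempts) throw e; // give up
                Thread.sleep(baseDelayMs * attempts);   // e.g. 500ms, 1000ms, ...
            }
        }
    }
}

class RetryDemo {
    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice, succeeds on the third attempt
        String result = Retry.withBackoff(() -> {
            if (++calls[0] < 3) throw new IllegalStateException("transient");
            return "ok";
        }, 3, 10);
        System.out.println(result + " after " + calls[0] + " attempts"); // prints ok after 3 attempts
    }
}
```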
S3 writer — upload chunk as a file
@Component
public class S3OrderWriter implements ItemWriter<Order>, ItemStream {
    private final S3Client s3Client;
    private final String bucket;
    private final ObjectMapper mapper;
    private Path tempFile;
    private BufferedWriter fileWriter;
    private int chunkCount = 0;

    // Bind the bucket name from configuration (property name is illustrative)
    public S3OrderWriter(S3Client s3Client,
                         @Value("${batch.s3.bucket}") String bucket,
                         ObjectMapper mapper) {
        this.s3Client = s3Client;
        this.bucket = bucket;
        this.mapper = mapper;
    }

    @Override
    public void open(ExecutionContext ctx) {
        try {
            tempFile = Files.createTempFile("batch-orders-", ".jsonl");
            fileWriter = Files.newBufferedWriter(tempFile, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new ItemStreamException("Failed to open temp file", e);
        }
    }

    @Override
    public void write(Chunk<? extends Order> chunk) throws Exception {
        for (Order order : chunk) {
            fileWriter.write(mapper.writeValueAsString(order));
            fileWriter.newLine();
        }
        chunkCount++;
    }

    @Override
    public void close() {
        try {
            fileWriter.close();
            // Upload the accumulated file to S3
            s3Client.putObject(
                    PutObjectRequest.builder()
                            .bucket(bucket)
                            .key("batch-output/orders-" + System.currentTimeMillis() + ".jsonl")
                            .build(),
                    RequestBody.fromFile(tempFile));
            Files.deleteIfExists(tempFile);
        } catch (IOException e) {
            throw new ItemStreamException("Failed to upload to S3", e);
        }
    }

    @Override
    public void update(ExecutionContext ctx) {
        ctx.putInt("s3.chunk.count", chunkCount);
    }
}
Database writer with custom SQL and error tracking
@Component
@RequiredArgsConstructor
public class OrderWithErrorTrackingWriter implements ItemWriter<Order> {
    private final JdbcTemplate jdbcTemplate;

    @Override
    public void write(Chunk<? extends Order> chunk) throws Exception {
        List<Object[]> params = chunk.getItems().stream()
                .map(o -> new Object[]{
                        o.getCustomerId(), o.getAmount(),
                        o.getOrderDate(), o.getStatus()
                })
                .collect(Collectors.toList());
        int[] updateCounts = jdbcTemplate.batchUpdate(
                "INSERT INTO orders (customer_id, amount, order_date, status) " +
                "VALUES (?, ?, ?, ?)",
                params);
        // Verify all rows were inserted. In batch mode the driver may return
        // Statement.SUCCESS_NO_INFO (-2), which is not a failure.
        long failures = Arrays.stream(updateCounts)
                .filter(count -> count == 0 || count == Statement.EXECUTE_FAILED)
                .count();
        if (failures > 0) {
            throw new WriteFailedException(failures + " rows failed to insert");
        }
    }
}
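A nuance of JDBC batch updates: drivers may report Statement.SUCCESS_NO_INFO (-2) for batched statements, which should not be treated as a failure. The count check can be isolated and verified without a database; a minimal sketch (UpdateCountCheck is an illustrative name):

```java
import java.sql.Statement;
import java.util.Arrays;

// Counts genuine batch-update failures: a count of 0 (no row affected) or
// EXECUTE_FAILED (-3). SUCCESS_NO_INFO (-2) means "succeeded, count unknown".
class UpdateCountCheck {
    static long countFailures(int[] updateCounts) {
        return Arrays.stream(updateCounts)
                .filter(c -> c == 0 || c == Statement.EXECUTE_FAILED)
                .count();
    }

    public static void main(String[] args) {
        // one row inserted, one no-info success, one failed statement
        int[] counts = {1, Statement.SUCCESS_NO_INFO, Statement.EXECUTE_FAILED};
        System.out.println(countFailures(counts)); // prints 1
    }
}
```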
Complete Example: Fan-Out Writer
Process orders and simultaneously write to MySQL (operational) + CSV (audit) + a REST reporting API.
@Configuration
@RequiredArgsConstructor
public class OrderFanOutJobConfig {
    private final DataSource dataSource;
    private final JobRepository jobRepository;
    private final PlatformTransactionManager tx;
    private final RestClient reportingClient;

    @Bean
    public JdbcBatchItemWriter<Order> operationalDbWriter() {
        return new JdbcBatchItemWriterBuilder<Order>()
                .dataSource(dataSource)
                .sql("INSERT INTO processed_orders (order_id, customer_id, amount, order_date, status) " +
                     "VALUES (:orderId, :customerId, :amount, :orderDate, :status)")
                .beanMapped()
                .build();
    }

    @Bean
    public FlatFileItemWriter<Order> auditCsvWriter() {
        BeanWrapperFieldExtractor<Order> extractor = new BeanWrapperFieldExtractor<>();
        extractor.setNames(new String[]{"orderId", "customerId", "amount", "orderDate", "status"});
        DelimitedLineAggregator<Order> aggregator = new DelimitedLineAggregator<>();
        aggregator.setDelimiter(",");
        aggregator.setFieldExtractor(extractor);
        return new FlatFileItemWriterBuilder<Order>()
                .name("auditCsvWriter")
                .resource(new FileSystemResource("/audit/orders-" + LocalDate.now() + ".csv"))
                .lineAggregator(aggregator)
                .headerCallback(w -> w.write("orderId,customerId,amount,orderDate,status"))
                .build();
    }

    @Bean
    public ItemWriter<Order> reportingApiWriter() {
        return chunk -> reportingClient.post()
                .uri("/reports/orders/bulk")
                .body(chunk.getItems())
                .retrieve()
                .toBodilessEntity();
    }

    @Bean
    public CompositeItemWriter<Order> fanOutWriter() {
        CompositeItemWriter<Order> writer = new CompositeItemWriter<>();
        writer.setDelegates(List.of(
                operationalDbWriter(),
                auditCsvWriter(),
                reportingApiWriter()
        ));
        return writer;
    }

    @Bean
    public Step fanOutStep(JdbcPagingItemReader<Order> reader) {
        return new StepBuilder("fanOutStep", jobRepository)
                .<Order, Order>chunk(100, tx)
                .reader(reader)
                .writer(fanOutWriter())
                .stream(auditCsvWriter()) // must register FlatFileItemWriter as a stream
                .build();
    }

    @Bean
    public Job fanOutJob(Step fanOutStep) {
        return new JobBuilder("fanOutJob", jobRepository)
                .start(fanOutStep)
                .build();
    }
}
Key Takeaways
- JpaItemWriter uses EntityManager.merge() — it handles insert vs update automatically but is slower than raw JDBC. Enable hibernate.jdbc.batch_size or it serialises writes.
- CompositeItemWriter broadcasts every item to all delegates; if any delegate throws, the chunk transaction rolls back.
- ClassifierCompositeItemWriter routes each item to exactly one writer based on a classifier function. Register delegates that implement ItemStream on the step.
- For custom writers (REST, S3, queues), implement ItemWriter<T> and optionally ItemStream for lifecycle management.
- Writers receive a Chunk<T> — process the entire chunk in one network/database call where possible for maximum throughput.
What’s Next
Part 3 (Writers) is complete. Article 11 starts Part 4 — Processors. You will learn how to transform, validate, filter, and enrich items with ItemProcessor, including chaining processors and calling external services safely.