Advanced Writers: JpaItemWriter, CompositeItemWriter, and Custom Writers

Introduction

The previous article covered the two most common writers: FlatFileItemWriter for files and JdbcBatchItemWriter for database inserts. This article covers advanced scenarios:

  • Persisting JPA entities with JpaItemWriter
  • Writing to multiple destinations simultaneously with CompositeItemWriter
  • Routing items to different writers based on content with ClassifierCompositeItemWriter
  • Building custom writers for REST APIs, message queues, and cloud storage
  • Combining a writer with a post-step cleanup tasklet (shown at the end of the fan-out example)

JpaItemWriter

JpaItemWriter calls EntityManager.merge() on each item. It is appropriate when your output is a managed JPA entity and you want Hibernate to handle the INSERT vs UPDATE decision automatically (merge = insert if new, update if existing).

@Bean
public JpaItemWriter<Order> jpaOrderWriter(EntityManagerFactory emf) {
    JpaItemWriter<Order> writer = new JpaItemWriter<>();
    writer.setEntityManagerFactory(emf);
    return writer;
}
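
For reference, a minimal sketch of the Order mapping these examples assume; the @PrePersist/@PreUpdate audit hooks are the ones mentioned below, and the assigned (non-generated) id is deliberate, since an IDENTITY id would disable Hibernate's JDBC insert batching:

@Entity
@Table(name = "orders")
public class Order {

    @Id
    private Long orderId;   // assigned id, not IDENTITY

    private Long customerId;
    private BigDecimal amount;
    private LocalDate orderDate;
    private String status;

    @PrePersist
    void onInsert() { /* audit hook, fires before INSERT */ }

    @PreUpdate
    void onUpdate() { /* audit hook, fires before UPDATE */ }

    // getters and setters omitted
}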

When to use JpaItemWriter vs JdbcBatchItemWriter

|              | JpaItemWriter                     | JdbcBatchItemWriter          |
|--------------|-----------------------------------|------------------------------|
| SQL control  | None — Hibernate generates SQL    | Full control                 |
| Associations | Cascades respected                | Must handle manually         |
| Performance  | Lower — entity lifecycle overhead | Higher — raw JDBC batch      |
| Upsert logic | merge() handles new vs existing   | Use ON DUPLICATE KEY         |
| Batch mode   | Needs hibernate.jdbc.batch_size   | Automatic via executeBatch() |

For bulk inserts on large tables, JdbcBatchItemWriter with rewriteBatchedStatements=true is significantly faster. Use JpaItemWriter when you need Hibernate cascade operations or auditing (@PrePersist, @PreUpdate).
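
With MySQL, the rewrite flag goes on the JDBC URL. A sketch, assuming a local schema named orders:

spring.datasource.url=jdbc:mysql://localhost:3306/orders?rewriteBatchedStatements=true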

Enabling Hibernate batch mode for JpaItemWriter

spring.jpa.properties.hibernate.jdbc.batch_size=50
spring.jpa.properties.hibernate.order_inserts=true
spring.jpa.properties.hibernate.order_updates=true
spring.jpa.properties.hibernate.generate_statistics=false

Without batch_size, Hibernate sends one SQL statement per entity — equivalent to not batching at all.
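
To confirm batching is active, temporarily flip the statistics flag from the snippet above; Hibernate's session metrics then report how many JDBC batches each session executed (exact log wording varies by Hibernate version):

spring.jpa.properties.hibernate.generate_statistics=true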


CompositeItemWriter

CompositeItemWriter fans out to multiple writers in sequence. Every writer receives the same chunk of items. Use it when you need to write to two destinations at once — for example, insert into a database and also write a CSV audit file.

@Bean
public CompositeItemWriter<Order> compositeOrderWriter(
        JdbcBatchItemWriter<Order> dbWriter,
        FlatFileItemWriter<Order> auditCsvWriter) {

    CompositeItemWriter<Order> composite = new CompositeItemWriter<>();
    composite.setDelegates(List.of(dbWriter, auditCsvWriter));
    return composite;
}

All delegates receive the same list of items. If any delegate throws, the chunk transaction rolls back. Order matters: put the most critical writer first — if the second writer fails you want the transaction to roll back the first writer’s work too.

CompositeItemWriter with different output types

If your processor outputs a wrapper object, each delegate extracts what it needs:

@Data
@AllArgsConstructor
public class OrderResult {
    private Order order;            // for DB insert
    private OrderAuditRecord audit; // for audit CSV
}

@Bean
public ItemWriter<OrderResult> dbInsertWriter(DataSource dataSource) {
    JdbcBatchItemWriter<Order> delegate = new JdbcBatchItemWriterBuilder<Order>()
            .dataSource(dataSource)
            .sql("INSERT INTO orders ...")
            .beanMapped()
            .build();

    // Wrap: extract Order from OrderResult before passing to delegate
    return chunk -> delegate.write(
            new Chunk<>(chunk.getItems().stream()
                    .map(OrderResult::getOrder)
                    .collect(Collectors.toList()))
    );
}

ClassifierCompositeItemWriter

ClassifierCompositeItemWriter routes each item to a different writer based on a Classifier<C, T> function. Use it when different item types need different destinations.

Routing orders by status

@Bean
public ClassifierCompositeItemWriter<Order> routingWriter(
        JdbcBatchItemWriter<Order> completedWriter,
        JdbcBatchItemWriter<Order> pendingWriter,
        JdbcBatchItemWriter<Order> cancelledWriter) {

    BackToBackPatternClassifier<Order, ItemWriter<? super Order>> classifier =
            new BackToBackPatternClassifier<>();

    // setRouterDelegate takes an Object, so the lambda needs an explicit
    // Classifier target type or it will not compile
    Classifier<Order, String> router = Order::getStatus;
    classifier.setRouterDelegate(router);

    classifier.setMatcherMap(Map.of(
            "COMPLETED",  completedWriter,
            "PENDING",    pendingWriter,
            "CANCELLED",  cancelledWriter
    ));

    ClassifierCompositeItemWriter<Order> writer = new ClassifierCompositeItemWriter<>();
    writer.setClassifier(classifier);
    return writer;
}

Each item is classified and sent to exactly one writer. Unlike CompositeItemWriter, items are not broadcast — each item goes to one destination only.

Simpler classifier with a lambda

@Bean
public ClassifierCompositeItemWriter<Order> statusRoutingWriter(
        JdbcBatchItemWriter<Order> highValueWriter,
        JdbcBatchItemWriter<Order> standardWriter) {

    ClassifierCompositeItemWriter<Order> writer = new ClassifierCompositeItemWriter<>();
    writer.setClassifier(order ->
            order.getAmount().compareTo(new BigDecimal("1000")) > 0
                    ? highValueWriter
                    : standardWriter
    );
    return writer;
}

Note: ClassifierCompositeItemWriter does not propagate open/update/close to its delegates, so any delegate that implements ItemStream (file-based writers such as FlatFileItemWriter) must be registered with the step explicitly. JdbcBatchItemWriter holds no stream state and needs no registration. Assuming a classifier that routes to two CSV delegates:

@Bean
public Step routingStep(JobRepository jobRepository,
                        PlatformTransactionManager tx,
                        ClassifierCompositeItemWriter<Order> routingWriter,
                        FlatFileItemWriter<Order> highValueCsvWriter,
                        FlatFileItemWriter<Order> standardCsvWriter,
                        JdbcPagingItemReader<Order> reader) {

    return new StepBuilder("routingStep", jobRepository)
            .<Order, Order>chunk(200, tx)
            .reader(reader)
            .writer(routingWriter)
            .stream(highValueCsvWriter)   // register for open/update/close
            .stream(standardCsvWriter)
            .build();
}

Custom ItemWriter

For destinations not covered by built-in writers — REST APIs, message queues, S3, Elasticsearch — implement ItemWriter<T> directly.

REST API writer

@Component
@RequiredArgsConstructor
public class OrderApiItemWriter implements ItemWriter<Order> {

    private final RestClient restClient;
    private final String apiUrl;

    @Override
    public void write(Chunk<? extends Order> chunk) throws Exception {
        // Send the entire chunk as one bulk request for efficiency;
        // getItems() already returns a usable List, so no cast is needed
        List<? extends Order> orders = chunk.getItems();

        restClient.post()
                .uri(apiUrl + "/bulk")
                .contentType(MediaType.APPLICATION_JSON)
                .body(orders)
                .retrieve()
                .toBodilessEntity();
    }
}
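
The writer assumes a RestClient bean exists in the context (and that apiUrl is resolvable as a constructor argument, typically via @Value). A minimal configuration sketch with a hypothetical base URL:

@Bean
public RestClient restClient() {
    // Hypothetical endpoint; add timeouts and interceptors as needed
    return RestClient.builder()
            .baseUrl("https://reporting.example.com")
            .build();
}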

If the API does not support bulk requests, send items individually and add retry logic:

@Override
public void write(Chunk<? extends Order> chunk) throws Exception {
    for (Order order : chunk) {
        int attempts = 0;
        while (true) {
            try {
                restClient.post()
                        .uri(apiUrl + "/orders")
                        .body(order)
                        .retrieve()
                        .toBodilessEntity();
                break;
            } catch (HttpServerErrorException e) {
                if (++attempts >= 3) throw e;
                Thread.sleep(500L * attempts);
            }
        }
    }
}
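
Hand-rolled retry loops work, but the step's fault-tolerance support can own the policy instead. A sketch, assuming the single-item writer above and a reader from an earlier article:

@Bean
public Step apiWriteStep(JobRepository jobRepository,
                         PlatformTransactionManager tx,
                         JdbcPagingItemReader<Order> reader,
                         OrderApiItemWriter writer) {

    return new StepBuilder("apiWriteStep", jobRepository)
            .<Order, Order>chunk(100, tx)
            .reader(reader)
            .writer(writer)
            .faultTolerant()
            .retry(HttpServerErrorException.class)   // retry the chunk on 5xx
            .retryLimit(3)
            .build();
}

A retried chunk is re-written in full, so the endpoint must tolerate duplicate submissions.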

S3 writer — upload chunk as a file

@Component
@RequiredArgsConstructor
public class S3OrderWriter implements ItemWriter<Order>, ItemStream {

    private final S3Client s3Client;
    private final String bucket;
    private final ObjectMapper mapper;

    private Path tempFile;
    private BufferedWriter fileWriter;
    private int chunkCount = 0;

    @Override
    public void open(ExecutionContext ctx) {
        try {
            tempFile = Files.createTempFile("batch-orders-", ".jsonl");
            fileWriter = Files.newBufferedWriter(tempFile, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new ItemStreamException("Failed to open temp file", e);
        }
    }

    @Override
    public void write(Chunk<? extends Order> chunk) throws Exception {
        for (Order order : chunk) {
            fileWriter.write(mapper.writeValueAsString(order));
            fileWriter.newLine();
        }
        chunkCount++;
    }

    @Override
    public void close() {
        try {
            fileWriter.close();
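            // Caveat: close() also runs when the step fails; production code
            // may want to check the step's exit status (for example in a
            // StepExecutionListener) before uploading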
            // Upload accumulated file to S3
            s3Client.putObject(
                    PutObjectRequest.builder().bucket(bucket)
                            .key("batch-output/orders-" + System.currentTimeMillis() + ".jsonl")
                            .build(),
                    RequestBody.fromFile(tempFile));
            Files.deleteIfExists(tempFile);
        } catch (IOException e) {
            throw new ItemStreamException("Failed to upload to S3", e);
        }
    }

    @Override
    public void update(ExecutionContext ctx) {
        ctx.putInt("s3.chunk.count", chunkCount);
    }
}

Database writer with custom SQL and error tracking

@Component
@RequiredArgsConstructor
public class OrderWithErrorTrackingWriter implements ItemWriter<Order> {

    private final JdbcTemplate jdbcTemplate;

    @Override
    public void write(Chunk<? extends Order> chunk) throws Exception {
        List<Object[]> params = chunk.getItems().stream()
                .map(o -> new Object[]{
                        o.getCustomerId(), o.getAmount(),
                        o.getOrderDate(), o.getStatus()
                })
                .collect(Collectors.toList());

        int[] updateCounts = jdbcTemplate.batchUpdate(
                "INSERT INTO orders (customer_id, amount, order_date, status) " +
                "VALUES (?, ?, ?, ?)",
                params);

        // A JDBC driver may return Statement.SUCCESS_NO_INFO (-2) for rows
        // that succeeded, so treat only EXECUTE_FAILED (-3) or an update
        // count of 0 as a failure
        long failures = Arrays.stream(updateCounts)
                .filter(count -> count == Statement.EXECUTE_FAILED || count == 0)
                .count();

        if (failures > 0) {
            throw new WriteFailedException(failures + " rows failed to insert");
        }
    }
}

Complete Example: Fan-Out Writer

Process orders and simultaneously write to MySQL (operational), a CSV audit file, and a REST reporting API, then run a post-step cleanup tasklet. One caveat: the REST call is not transactional, so a chunk that rolls back after the call cannot recall a report that was already sent.

@Configuration
@RequiredArgsConstructor
public class OrderFanOutJobConfig {

    private final DataSource dataSource;
    private final JobRepository jobRepository;
    private final PlatformTransactionManager tx;
    private final RestClient reportingClient;

    @Bean
    public JdbcBatchItemWriter<Order> operationalDbWriter() {
        return new JdbcBatchItemWriterBuilder<Order>()
                .dataSource(dataSource)
                .sql("INSERT INTO processed_orders (order_id, customer_id, amount, order_date, status) " +
                     "VALUES (:orderId, :customerId, :amount, :orderDate, :status)")
                .beanMapped()
                .build();
    }

    @Bean
    public FlatFileItemWriter<Order> auditCsvWriter() {
        BeanWrapperFieldExtractor<Order> extractor = new BeanWrapperFieldExtractor<>();
        extractor.setNames(new String[]{"orderId", "customerId", "amount", "orderDate", "status"});

        DelimitedLineAggregator<Order> aggregator = new DelimitedLineAggregator<>();
        aggregator.setDelimiter(",");
        aggregator.setFieldExtractor(extractor);

        return new FlatFileItemWriterBuilder<Order>()
                .name("auditCsvWriter")
                .resource(new FileSystemResource("/audit/orders-" +
                        LocalDate.now() + ".csv"))
                .lineAggregator(aggregator)
                .headerCallback(w -> w.write("orderId,customerId,amount,orderDate,status"))
                .build();
    }

    @Bean
    public ItemWriter<Order> reportingApiWriter() {
        return chunk -> reportingClient.post()
                .uri("/reports/orders/bulk")
                .body(chunk.getItems())
                .retrieve()
                .toBodilessEntity();
    }

    @Bean
    public CompositeItemWriter<Order> fanOutWriter() {
        CompositeItemWriter<Order> writer = new CompositeItemWriter<>();
        writer.setDelegates(List.of(
                operationalDbWriter(),
                auditCsvWriter(),
                reportingApiWriter()
        ));
        return writer;
    }

    @Bean
    public Step fanOutStep(JdbcPagingItemReader<Order> reader) {
        return new StepBuilder("fanOutStep", jobRepository)
                .<Order, Order>chunk(100, tx)
                .reader(reader)
                .writer(fanOutWriter())
                .stream(auditCsvWriter())   // must register FlatFileItemWriter as stream
                .build();
    }
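
    // Post-step cleanup: once the fan-out step finishes, remove any staging
    // files the writers left behind. A minimal sketch; the /audit/tmp path
    // is hypothetical.
    @Bean
    public Step cleanupStep() {
        Tasklet cleanup = (contribution, chunkContext) -> {
            try (Stream<Path> files = Files.list(Path.of("/audit/tmp"))) {
                files.map(Path::toFile).forEach(File::delete);
            }
            return RepeatStatus.FINISHED;
        };
        return new StepBuilder("cleanupStep", jobRepository)
                .tasklet(cleanup, tx)
                .build();
    }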

    @Bean
    public Job fanOutJob(Step fanOutStep, Step cleanupStep) {
        return new JobBuilder("fanOutJob", jobRepository)
                .start(fanOutStep)
                .next(cleanupStep)   // cleanup runs after the fan-out step
                .build();
    }
}

Key Takeaways

  • JpaItemWriter uses EntityManager.merge() — it handles insert vs update automatically but is slower than JDBC. Enable hibernate.jdbc.batch_size or it serialises writes.
  • CompositeItemWriter broadcasts every item to all delegates. Ordering matters — the most critical writer goes first.
  • ClassifierCompositeItemWriter routes each item to exactly one writer based on a classifier function. It does not manage delegate lifecycles, so register file-based delegates as ItemStreams in the step.
  • For custom writers (REST, S3, queues), implement ItemWriter<T> and optionally ItemStream for lifecycle management.
  • Writers receive a Chunk<T> — process the entire chunk in one network/database call where possible for maximum throughput.

What’s Next

Part 3 (Writers) is complete. Article 11 starts Part 4 — Processors. You will learn how to transform, validate, filter, and enrich items with ItemProcessor, including chaining processors and calling external services safely.