Reading from External Sources: REST APIs, S3, and Custom ItemReaders

Introduction

Not all batch input comes from files or databases. You may need to pull orders from an e-commerce API, sync products from a supplier feed, or process CSV exports stored in Amazon S3. Spring Batch handles all of these: S3 objects plug into FlatFileItemReader and MultiResourceItemReader as ordinary Spring Resources, and the clean single-method ItemReader interface covers everything else.

In this article you will build:

  1. A custom ItemReader that pages through a REST API
  2. An S3 reader that downloads files on demand
  3. A composite reader that merges multiple sources into one step

The ItemReader Contract

The entire ItemReader interface is one method:

public interface ItemReader<T> {
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}

Return the next item, or null when the source is exhausted. Spring Batch stops reading when it sees null.
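
To make the contract concrete, here is a minimal sketch with illustrative names (for this exact case Spring Batch also ships a ready-made ListItemReader):

// A minimal reader over a fixed list; returning null ends the step's input
public class InMemoryOrderIdReader implements ItemReader<String> {

    private final Iterator<String> ids = List.of("A-1", "A-2", "A-3").iterator();

    @Override
    public String read() {
        return ids.hasNext() ? ids.next() : null;
    }
}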

For readers that need open/close lifecycle (REST client, S3 client, database cursor), implement ItemStream as well:

public interface ItemStream {
    void open(ExecutionContext executionContext) throws ItemStreamException;
    void update(ExecutionContext executionContext) throws ItemStreamException;
    void close() throws ItemStreamException;
}

Extend AbstractItemStreamItemReader<T> to get both interfaces with default no-op implementations.


Reading from a REST API

REST APIs typically paginate results with page and size parameters. The reader must:

  1. Fetch the first page on first call.
  2. Return items one at a time from the current page.
  3. Fetch the next page when the current one is exhausted.
  4. Return null when the last page is empty.
  5. Save the current page number in ExecutionContext for restartability.

The reader

@Component
public class OrderApiItemReader extends AbstractItemStreamItemReader<Order> {

    private static final String PAGE_KEY = "order.api.page";

    private final RestClient restClient;
    private final String apiUrl;
    private final int pageSize;

    private Queue<Order> currentPage = new LinkedList<>();
    private int currentPageNumber = 0;
    private boolean exhausted = false;

    public OrderApiItemReader(RestClient restClient,
                               @Value("${orders.api.url}") String apiUrl,
                               @Value("${orders.api.page-size:100}") int pageSize) {
        this.restClient = restClient;
        this.apiUrl = apiUrl;
        this.pageSize = pageSize;
        setName("orderApiItemReader");
    }

    @Override
    public void open(ExecutionContext ctx) {
        super.open(ctx);
        // Restore page position on restart
        if (ctx.containsKey(PAGE_KEY)) {
            currentPageNumber = ctx.getInt(PAGE_KEY);
        }
        fetchPage(currentPageNumber);
    }

    @Override
    public Order read() {
        if (currentPage.isEmpty() && !exhausted) {
            currentPageNumber++;
            fetchPage(currentPageNumber);
        }
        // poll() returns null once the buffer is empty and no pages remain,
        // which is exactly the end-of-input signal Spring Batch expects.
        // (Checking exhausted alone would drop buffered items from a short last page.)
        return currentPage.poll();
    }

    @Override
    public void update(ExecutionContext ctx) {
        super.update(ctx);
        // Persist the page number before each chunk commit. On restart the saved
        // page is re-fetched from the start, so align chunk size with page size
        // (or make writes idempotent) to tolerate re-read items.
        ctx.putInt(PAGE_KEY, currentPageNumber);
    }

    private void fetchPage(int page) {
        OrderApiResponse response = restClient.get()
                .uri(apiUrl + "?page={page}&size={size}", page, pageSize)
                .retrieve()
                .body(OrderApiResponse.class);

        if (response == null || response.getOrders() == null || response.getOrders().isEmpty()) {
            exhausted = true;
            return;
        }

        currentPage.addAll(response.getOrders());

        // If we got fewer items than requested, this is the last page
        if (response.getOrders().size() < pageSize) {
            exhausted = true;
        }
    }
}

Supporting classes

@Data
public class OrderApiResponse {
    private List<Order> orders;
    private int totalPages;
    private int currentPage;
}

@Bean
public RestClient ordersRestClient(@Value("${orders.api.token}") String apiToken) {
    return RestClient.builder()
            .baseUrl("https://api.example.com")
            .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + apiToken)
            .build();
}

Handling rate limiting

Add retry with exponential backoff for APIs that return 429 Too Many Requests:

private void fetchPage(int page) {
    int attempts = 0;
    while (attempts < 3) {
        try {
            OrderApiResponse response = restClient.get()
                    .uri(apiUrl + "?page={page}&size={size}", page, pageSize)
                    .retrieve()
                    .body(OrderApiResponse.class);
            // process response ...
            return;
        } catch (HttpClientErrorException e) {
            if (e.getStatusCode() == HttpStatus.TOO_MANY_REQUESTS) {
                attempts++;
                long backoffMs = (long) Math.pow(2, attempts) * 1000;
                log.warn("Rate limited. Retrying in {}ms (attempt {})", backoffMs, attempts);
                try { Thread.sleep(backoffMs); } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new ItemStreamException("Interrupted during rate-limit backoff", ie);
                }
            } else {
                throw e;
            }
        }
    }
    throw new ItemStreamException("API rate limit exceeded after 3 retries on page " + page);
}
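
If you already use Spring Retry, the manual loop can be replaced with a RetryTemplate. A sketch, assuming spring-retry is on the classpath:

// Equivalent exponential backoff via Spring Retry instead of a hand-rolled loop
RetryTemplate retryTemplate = RetryTemplate.builder()
        .maxAttempts(3)
        .exponentialBackoff(1000, 2.0, 10000)  // initial delay ms, multiplier, max delay ms
        .retryOn(HttpClientErrorException.TooManyRequests.class)
        .build();

OrderApiResponse response = retryTemplate.execute(ctx -> restClient.get()
        .uri(apiUrl + "?page={page}&size={size}", page, pageSize)
        .retrieve()
        .body(OrderApiResponse.class));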

Reading Files from Amazon S3

S3 objects are just Resource objects from Spring’s perspective. With Spring Cloud AWS on the classpath, the ResourceLoader resolves s3:// locations to S3Resource instances, which plug straight into FlatFileItemReader and MultiResourceItemReader.

Dependency

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-s3</artifactId>
</dependency>
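
The version is managed by the Spring Cloud AWS BOM; the version number below is illustrative, so check the project for the current release:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.awspring.cloud</groupId>
            <artifactId>spring-cloud-aws-dependencies</artifactId>
            <version>3.1.1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>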

Single S3 file reader

@Bean
@StepScope  // required: the jobParameters SpEL expression resolves at step runtime
public FlatFileItemReader<Order> s3OrderReader(
        ResourceLoader resourceLoader,
        @Value("${batch.s3.bucket}") String bucket,
        @Value("#{jobParameters['s3Key']}") String s3Key) {

    // Spring Cloud AWS resolves s3:// locations to S3Resource, an implementation
    // of Spring's Resource interface
    Resource s3Resource = resourceLoader.getResource("s3://" + bucket + "/" + s3Key);

    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    tokenizer.setNames("customerId", "amount", "orderDate", "status");

    DefaultLineMapper<Order> lineMapper = new DefaultLineMapper<>();
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(new OrderFieldSetMapper());

    return new FlatFileItemReaderBuilder<Order>()
            .name("s3OrderReader")
            .resource(s3Resource)
            .lineMapper(lineMapper)
            .linesToSkip(1)
            .encoding("UTF-8")
            .build();
}

Reading multiple S3 files with MultiResourceItemReader

@Bean
@StepScope  // list the bucket when the step starts, not at application startup
public MultiResourceItemReader<Order> s3MultiFileOrderReader(
        S3Client s3Client,
        ResourceLoader resourceLoader,
        FlatFileItemReader<Order> delegateReader,  // must not have a Resource of its own
        @Value("${batch.s3.bucket}") String bucket,
        @Value("${batch.s3.prefix}") String prefix) {

    // List all objects under the prefix
    // (use listObjectsV2Paginator if more than 1000 keys can match)
    ListObjectsV2Response listing = s3Client.listObjectsV2(
            ListObjectsV2Request.builder().bucket(bucket).prefix(prefix).build());

    // Sort by key so the file order is deterministic across runs and restarts
    Resource[] resources = listing.contents().stream()
            .filter(obj -> obj.key().endsWith(".csv"))
            .sorted(Comparator.comparing(S3Object::key))
            .map(obj -> resourceLoader.getResource("s3://" + bucket + "/" + obj.key()))
            .toArray(Resource[]::new);

    return new MultiResourceItemReaderBuilder<Order>()
            .name("s3MultiFileOrderReader")
            .delegate(delegateReader)
            .resources(resources)
            .build();
}

Download-then-read pattern

For large files or when streaming from S3 is unreliable, download to a temp file first:

@Component
@RequiredArgsConstructor
public class S3DownloadTasklet implements Tasklet {

    private final S3Client s3Client;

    @Value("${batch.s3.bucket}")
    private String bucket;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
            throws Exception {
        String s3Key = chunkContext.getStepContext()
                .getJobParameters().get("s3Key").toString();

        Path localFile = Paths.get("/tmp/batch-input", Paths.get(s3Key).getFileName().toString());
        Files.createDirectories(localFile.getParent());
        Files.deleteIfExists(localFile);  // ResponseTransformer.toFile fails if the file exists

        s3Client.getObject(
                GetObjectRequest.builder().bucket(bucket).key(s3Key).build(),
                ResponseTransformer.toFile(localFile));

        // Store local path in job execution context for the next step
        chunkContext.getStepContext()
                .getStepExecution().getJobExecution()
                .getExecutionContext()
                .putString("localFilePath", localFile.toString());

        return RepeatStatus.FINISHED;
    }
}
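
The next step can then resolve that path with a step-scoped reader. A sketch, with illustrative bean names:

// @StepScope defers bean creation until the step runs, so the SpEL expression
// can read the path the tasklet stored in the job's ExecutionContext
@Bean
@StepScope
public FlatFileItemReader<Order> downloadedOrderReader(
        @Value("#{jobExecutionContext['localFilePath']}") String localFilePath) {

    return new FlatFileItemReaderBuilder<Order>()
            .name("downloadedOrderReader")
            .resource(new FileSystemResource(localFilePath))
            .delimited()
            .names("customerId", "amount", "orderDate", "status")
            .fieldSetMapper(new OrderFieldSetMapper())
            .linesToSkip(1)
            .build();
}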

Custom ItemReader: Reading from a Message Queue

A Kafka-based reader for real-time batch windows: it drains a topic until a cutoff timestamp, then signals end-of-input. (For plain offset-based reads, Spring Batch’s built-in KafkaItemReader is sketched after this class.)

public class KafkaWindowItemReader extends AbstractItemStreamItemReader<OrderEvent> {

    private static final String OFFSET_KEY = "kafka.offset";

    private final KafkaConsumer<String, OrderEvent> consumer;
    private final String topic;
    private final Duration pollTimeout;
    private final long windowEndTime;

    private final Queue<OrderEvent> buffer = new LinkedList<>();
    private boolean windowClosed = false;

    public KafkaWindowItemReader(KafkaConsumer<String, OrderEvent> consumer,
                                 String topic, Duration pollTimeout, long windowEndTime) {
        this.consumer = consumer;
        this.topic = topic;
        this.pollTimeout = pollTimeout;
        this.windowEndTime = windowEndTime;
        setName("kafkaWindowItemReader");
    }

    @Override
    public void open(ExecutionContext ctx) {
        super.open(ctx);
        consumer.subscribe(List.of(topic));
        // On restart, seek each assigned partition to the offset saved by update()
        // under OFFSET_KEY + "." + partition (e.g. from a ConsumerRebalanceListener)
        // ... seek logic ...
    }

    @Override
    public OrderEvent read() {
        // Keep polling until events arrive or the window closes; a single empty
        // poll must not be mistaken for end-of-input
        while (buffer.isEmpty()) {
            if (windowClosed || System.currentTimeMillis() >= windowEndTime) {
                windowClosed = true;
                return null;
            }
            ConsumerRecords<String, OrderEvent> records = consumer.poll(pollTimeout);
            records.forEach(r -> buffer.add(r.value()));
        }
        return buffer.poll();
    }

    @Override
    public void update(ExecutionContext ctx) {
        super.update(ctx);
        // Persist current offset
        consumer.assignment().forEach(tp ->
            ctx.putLong(OFFSET_KEY + "." + tp.partition(),
                        consumer.position(tp)));
    }

    @Override
    public void close() {
        consumer.close();
        super.close();
    }
}
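
If you only need offset-based consumption without a window cutoff, the built-in KafkaItemReader is simpler. A sketch; the topic, partitions, and consumer properties are illustrative, and JsonDeserializer assumes spring-kafka is on the classpath:

@Bean
public KafkaItemReader<String, OrderEvent> kafkaOrderReader() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-batch");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());

    // KafkaItemReader stores its offsets in the ExecutionContext, so a
    // restarted job resumes from the last committed chunk
    return new KafkaItemReaderBuilder<String, OrderEvent>()
            .name("kafkaOrderReader")
            .consumerProperties(props)
            .topic("orders")
            .partitions(0, 1, 2)
            .pollTimeout(Duration.ofSeconds(5))
            .build();
}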

Composing Multiple Sources

When a single step needs to read from two sources (e.g., a database + a CSV), use a composite reader that drains its delegates in sequence. (Recent Spring Batch releases ship a built-in CompositeItemReader; the hand-rolled version below shows the mechanics and works on any version.)

public class CompositeOrderItemReader extends AbstractItemStreamItemReader<Order> {

    private static final String INDEX_KEY = "composite.delegate.index";

    private final List<ItemStreamReader<Order>> delegates;
    private int currentDelegateIndex = 0;

    public CompositeOrderItemReader(List<ItemStreamReader<Order>> delegates) {
        this.delegates = delegates;
        setName("compositeOrderItemReader");
    }

    @Override
    public void open(ExecutionContext ctx) {
        super.open(ctx);
        delegates.forEach(d -> d.open(ctx));
        // Resume at the delegate that was active when the job last stopped
        if (ctx.containsKey(INDEX_KEY)) {
            currentDelegateIndex = ctx.getInt(INDEX_KEY);
        }
    }

    @Override
    public Order read() throws Exception {
        while (currentDelegateIndex < delegates.size()) {
            Order item = delegates.get(currentDelegateIndex).read();
            if (item != null) return item;
            currentDelegateIndex++;  // move to next source
        }
        return null;  // all sources exhausted
    }

    @Override
    public void update(ExecutionContext ctx) {
        super.update(ctx);
        ctx.putInt(INDEX_KEY, currentDelegateIndex);
        delegates.forEach(d -> d.update(ctx));
    }

    @Override
    public void close() {
        delegates.forEach(ItemStream::close);
        super.close();
    }
}

Wire it in configuration:

@Bean
public CompositeOrderItemReader compositeOrderReader(
        JdbcPagingItemReader<Order> dbReader,
        FlatFileItemReader<Order> csvReader) {

    return new CompositeOrderItemReader(List.of(dbReader, csvReader));
}
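
A single step can then drain both sources in order. A sketch:

@Bean
public Step importAllSourcesStep(CompositeOrderItemReader compositeOrderReader,
                                 JdbcBatchItemWriter<Order> writer,
                                 JobRepository jobRepository,
                                 PlatformTransactionManager tx) {
    return new StepBuilder("importAllSourcesStep", jobRepository)
            .<Order, Order>chunk(100, tx)
            .reader(compositeOrderReader)
            .writer(writer)
            .build();
}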

Complete Job: Import Orders from API + CSV

@Configuration
@RequiredArgsConstructor
public class MultiSourceImportJobConfig {

    private final DataSource dataSource;
    private final JobRepository jobRepository;
    private final PlatformTransactionManager tx;

    @Bean
    public Step importFromApiStep(OrderApiItemReader apiReader,
                                   JdbcBatchItemWriter<Order> writer) {
        return new StepBuilder("importFromApiStep", jobRepository)
                .<Order, Order>chunk(100, tx)
                .reader(apiReader)
                .writer(writer)
                .faultTolerant()
                .retry(Exception.class)
                .retryLimit(3)
                .build();
    }

    @Bean
    public Step importFromCsvStep(FlatFileItemReader<Order> csvReader,
                                   JdbcBatchItemWriter<Order> writer) {
        return new StepBuilder("importFromCsvStep", jobRepository)
                .<Order, Order>chunk(500, tx)
                .reader(csvReader)
                .writer(writer)
                .faultTolerant()
                .skip(FlatFileParseException.class)
                .skipLimit(100)
                .build();
    }

    @Bean
    public JdbcBatchItemWriter<Order> orderInsertWriter() {
        return new JdbcBatchItemWriterBuilder<Order>()
                .dataSource(dataSource)
                .sql("INSERT IGNORE INTO orders (customer_id, amount, order_date, status) " +
                     "VALUES (:customerId, :amount, :orderDate, :status)")
                .beanMapped()
                .build();
    }

    @Bean
    public Job multiSourceImportJob(Step importFromApiStep, Step importFromCsvStep) {
        return new JobBuilder("multiSourceImportJob", jobRepository)
                .start(importFromApiStep)
                .next(importFromCsvStep)
                .build();
    }
}
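
To launch the job, pass the s3Key parameter the step-scoped readers expect. A sketch; the key and the identifying run.id parameter are illustrative:

JobParameters params = new JobParametersBuilder()
        .addString("s3Key", "exports/orders.csv")
        .addLong("run.id", System.currentTimeMillis())  // keeps each run's parameters unique
        .toJobParameters();

jobLauncher.run(multiSourceImportJob, params);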

Key Takeaways

  • Implement ItemReader<T> (returning null to signal end) plus ItemStream for lifecycle management. Extend AbstractItemStreamItemReader<T> to get both.
  • Save your position in the ExecutionContext from update(), which Spring Batch calls before each chunk commit, so a restarted job resumes near the point of failure instead of rereading everything.
  • For S3, let Spring Cloud AWS resolve s3:// locations to S3Resource objects; they implement Resource and plug directly into FlatFileItemReader.
  • For REST APIs, implement pagination inside the reader: buffer one page, return items one at a time, and fetch the next page when the buffer empties.
  • Handle rate limiting inside the reader: retry with exponential backoff before propagating the exception.
  • Combine multiple sources sequentially with a composite reader that advances through its delegates, moving on whenever the current one returns null.

What’s Next

Part 2 (Readers) is complete. Article 9 starts Part 3 — Writers. You will learn FlatFileItemWriter for CSV output, JdbcBatchItemWriter for high-throughput MySQL inserts, and how to tune batch insert performance.