Reading from External Sources: REST APIs, S3, and Custom ItemReaders
Introduction
Not all batch input comes from files or databases. You may need to pull orders from an e-commerce API, sync products from a supplier feed, or process CSV exports stored in Amazon S3. Spring Batch covers these cases with Spring's Resource abstraction (S3 objects plug straight into FlatFileItemReader and MultiResourceItemReader) and a minimal ItemReader interface for everything else.
In this article you will build:
- A custom ItemReader that pages through a REST API
- An S3 reader that downloads files on demand
- A composite reader that merges multiple sources into one step
The ItemReader Contract
The entire ItemReader interface is one method:
public interface ItemReader<T> {
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
Return the next item, or null when the source is exhausted. Spring Batch stops reading when it sees null.
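To see the contract in isolation, here is a minimal, illustrative reader backed by an in-memory list (the class name is made up for this example; Spring Batch also ships a ready-made ListItemReader in org.springframework.batch.item.support that does the same thing):
public class InMemoryOrderReader implements ItemReader<Order> {

    private final Iterator<Order> iterator;

    public InMemoryOrderReader(List<Order> orders) {
        this.iterator = orders.iterator();
    }

    @Override
    public Order read() {
        // Hand back one item per call; null tells Spring Batch the input is finished
        return iterator.hasNext() ? iterator.next() : null;
    }
}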
For readers that need an open/close lifecycle (a REST client, an S3 client, a database cursor), implement ItemStream as well:
public interface ItemStream {
    void open(ExecutionContext executionContext) throws ItemStreamException;
    void update(ExecutionContext executionContext) throws ItemStreamException;
    void close() throws ItemStreamException;
}
Extend AbstractItemStreamItemReader<T> to get both interfaces in one base class: it provides no-op ItemStream methods and a setName() hook for namespacing execution-context keys, leaving only read() for you to implement.
Reading from a REST API
REST APIs typically paginate results with page and size parameters. The reader must:
- Fetch the first page on first call.
- Return items one at a time from the current page.
- Fetch the next page when the current one is exhausted.
- Return null when the last page is empty.
- Save the current page number in ExecutionContext for restartability.
The reader
@Component
public class OrderApiItemReader extends AbstractItemStreamItemReader<Order> {
private static final String PAGE_KEY = "order.api.page";
private final RestClient restClient;
private final String apiUrl;
private final int pageSize;
private Queue<Order> currentPage = new LinkedList<>();
private int currentPageNumber = 0;
private boolean exhausted = false;
public OrderApiItemReader(RestClient restClient,
@Value("${orders.api.url}") String apiUrl,
@Value("${orders.api.page-size:100}") int pageSize) {
this.restClient = restClient;
this.apiUrl = apiUrl;
this.pageSize = pageSize;
setName("orderApiItemReader");
}
@Override
public void open(ExecutionContext ctx) {
super.open(ctx);
// Restore page position on restart
if (ctx.containsKey(PAGE_KEY)) {
currentPageNumber = ctx.getInt(PAGE_KEY);
}
fetchPage(currentPageNumber);
}
    @Override
    public Order read() {
        // Refill the buffer when it runs dry, unless the API has no more pages
        if (currentPage.isEmpty() && !exhausted) {
            currentPageNumber++;
            fetchPage(currentPageNumber);
        }
        // poll() returns null on an empty buffer, which tells Spring Batch the input is exhausted
        return currentPage.poll();
    }
@Override
public void update(ExecutionContext ctx) {
super.update(ctx);
// Persist current page number after each successful chunk
ctx.putInt(PAGE_KEY, currentPageNumber);
}
private void fetchPage(int page) {
OrderApiResponse response = restClient.get()
.uri(apiUrl + "?page={page}&size={size}", page, pageSize)
.retrieve()
.body(OrderApiResponse.class);
if (response == null || response.getOrders().isEmpty()) {
exhausted = true;
return;
}
currentPage.addAll(response.getOrders());
// If we got fewer items than requested, this is the last page
if (response.getOrders().size() < pageSize) {
exhausted = true;
}
}
}
Supporting classes
@Data
public class OrderApiResponse {
private List<Order> orders;
private int totalPages;
private int currentPage;
}
@Bean
public RestClient ordersRestClient(@Value("${orders.api.token}") String apiToken) {
    // orders.api.token: supply the API token via configuration
    return RestClient.builder()
            .baseUrl("https://api.example.com")
            .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + apiToken)
            .build();
}
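The Order domain class itself is assumed throughout this article; a minimal version consistent with the field names used by the CSV tokenizer and the insert SQL later on (the field types are assumptions) could look like:
@Data
public class Order {
    private Long customerId;
    private BigDecimal amount;
    private LocalDate orderDate;
    private String status;
}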
Handling rate limiting
Add retry with exponential backoff for APIs that return 429 Too Many Requests:
private void fetchPage(int page) {
int attempts = 0;
while (attempts < 3) {
try {
OrderApiResponse response = restClient.get()
.uri(apiUrl + "?page={page}&size={size}", page, pageSize)
.retrieve()
.body(OrderApiResponse.class);
// handle the response exactly as in the simple fetchPage above (buffer the orders, flag exhaustion) ...
return;
} catch (HttpClientErrorException e) {
if (e.getStatusCode() == HttpStatus.TOO_MANY_REQUESTS) {
attempts++;
long backoffMs = (long) Math.pow(2, attempts) * 1000;
log.warn("Rate limited. Retrying in {}ms (attempt {})", backoffMs, attempts);
try { Thread.sleep(backoffMs); } catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ItemStreamException("Interrupted during rate-limit backoff", ie);
}
} else {
throw e;
}
}
}
throw new ItemStreamException("API rate limit exceeded after 3 retries on page " + page);
}
Reading Files from Amazon S3
S3 files are just Resource objects from Spring's perspective. Spring Cloud AWS resolves s3:// locations to Resource instances, so an S3 object can be fed straight into FlatFileItemReader.
Dependency
<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-s3</artifactId>
</dependency>
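Spring Cloud AWS versions are normally managed through its BOM rather than pinned on each dependency; an illustrative dependencyManagement entry (use the version that matches your Spring Boot line):
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.awspring.cloud</groupId>
            <artifactId>spring-cloud-aws-dependencies</artifactId>
            <version>3.1.1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>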
Single S3 file reader
@Bean
@StepScope
public FlatFileItemReader<Order> s3OrderReader(
        ResourceLoader resourceLoader,
        @Value("${batch.s3.bucket}") String bucket,
        @Value("#{jobParameters['s3Key']}") String s3Key) {
    // Spring Cloud AWS registers an s3:// protocol resolver, so the ResourceLoader
    // hands back a Resource backed by the S3 object. @StepScope is required for the
    // jobParameters expression to resolve.
    Resource s3Resource = resourceLoader.getResource("s3://" + bucket + "/" + s3Key);

    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    tokenizer.setNames("customerId", "amount", "orderDate", "status");

    DefaultLineMapper<Order> lineMapper = new DefaultLineMapper<>();
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(new OrderFieldSetMapper());

    return new FlatFileItemReaderBuilder<Order>()
            .name("s3OrderReader")
            .resource(s3Resource)
            .lineMapper(lineMapper)
            .linesToSkip(1)
            .encoding("UTF-8")
            .build();
}
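The OrderFieldSetMapper used above is not shown elsewhere in the article; a minimal sketch, assuming the Order field types listed earlier:
public class OrderFieldSetMapper implements FieldSetMapper<Order> {

    @Override
    public Order mapFieldSet(FieldSet fieldSet) {
        // Column names match those set on the DelimitedLineTokenizer
        Order order = new Order();
        order.setCustomerId(fieldSet.readLong("customerId"));
        order.setAmount(fieldSet.readBigDecimal("amount"));
        order.setOrderDate(LocalDate.parse(fieldSet.readString("orderDate")));
        order.setStatus(fieldSet.readString("status"));
        return order;
    }
}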
Reading multiple S3 files with MultiResourceItemReader
@Bean
public MultiResourceItemReader<Order> s3MultiFileOrderReader(
        S3Client s3Client,
        ResourceLoader resourceLoader,
        FlatFileItemReader<Order> delegateReader,
        @Value("${batch.s3.bucket}") String bucket,
        @Value("${batch.s3.prefix}") String prefix) {
// List all objects under the prefix
ListObjectsV2Response listing = s3Client.listObjectsV2(
ListObjectsV2Request.builder().bucket(bucket).prefix(prefix).build());
Resource[] resources = listing.contents().stream()
.filter(obj -> obj.key().endsWith(".csv"))
.sorted(Comparator.comparing(S3Object::key))
            .map(obj -> resourceLoader.getResource("s3://" + bucket + "/" + obj.key()))
.toArray(Resource[]::new);
return new MultiResourceItemReaderBuilder<Order>()
.name("s3MultiFileOrderReader")
.delegate(delegateReader)
.resources(resources)
.build();
}
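One caveat: the delegate passed to MultiResourceItemReader must be built without a fixed resource, because the multi-resource reader injects each S3 Resource into the delegate in turn. A minimal delegate bean (illustrative) might look like this:
@Bean
public FlatFileItemReader<Order> delegateReader() {
    // No .resource(...) here; MultiResourceItemReader sets it per file
    return new FlatFileItemReaderBuilder<Order>()
            .name("delegateOrderReader")
            .delimited()
            .names("customerId", "amount", "orderDate", "status")
            .fieldSetMapper(new OrderFieldSetMapper())
            .linesToSkip(1)
            .build();
}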
Download-then-read pattern
For large files or when streaming from S3 is unreliable, download to a temp file first:
@Component
@RequiredArgsConstructor
public class S3DownloadTasklet implements Tasklet {
private final S3Client s3Client;
@Value("${batch.s3.bucket}")
private String bucket;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
String s3Key = chunkContext.getStepContext()
.getJobParameters().get("s3Key").toString();
Path localFile = Paths.get("/tmp/batch-input", Paths.get(s3Key).getFileName().toString());
Files.createDirectories(localFile.getParent());
s3Client.getObject(
GetObjectRequest.builder().bucket(bucket).key(s3Key).build(),
ResponseTransformer.toFile(localFile));
// Store local path in job execution context for the next step
chunkContext.getStepContext()
.getStepExecution().getJobExecution()
.getExecutionContext()
.putString("localFilePath", localFile.toString());
return RepeatStatus.FINISHED;
}
}
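A follow-up step can then read the downloaded file with a step-scoped reader that resolves the promoted path from the job execution context; a minimal sketch reusing the field layout from earlier:
@Bean
@StepScope
public FlatFileItemReader<Order> downloadedOrderReader(
        @Value("#{jobExecutionContext['localFilePath']}") String localFilePath) {
    return new FlatFileItemReaderBuilder<Order>()
            .name("downloadedOrderReader")
            .resource(new FileSystemResource(localFilePath))
            .delimited()
            .names("customerId", "amount", "orderDate", "status")
            .fieldSetMapper(new OrderFieldSetMapper())
            .linesToSkip(1)
            .build();
}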
Custom ItemReader: Reading from a Message Queue
A Kafka-based reader for real-time batch windows:
public class KafkaWindowItemReader extends AbstractItemStreamItemReader<OrderEvent> {
private static final String OFFSET_KEY = "kafka.offset";
private final KafkaConsumer<String, OrderEvent> consumer;
private final String topic;
private final Duration pollTimeout;
private final long windowEndTime;
    private Queue<OrderEvent> buffer = new LinkedList<>();
    private boolean windowClosed = false;

    public KafkaWindowItemReader(KafkaConsumer<String, OrderEvent> consumer, String topic,
                                 Duration pollTimeout, long windowEndTime) {
        this.consumer = consumer;
        this.topic = topic;
        this.pollTimeout = pollTimeout;
        this.windowEndTime = windowEndTime;
        setName("kafkaWindowItemReader");
    }
    @Override
    public void open(ExecutionContext ctx) {
        super.open(ctx);
        consumer.subscribe(List.of(topic));
        // On restart, seek each assigned partition back to the offset saved by update()
        // (update() stores one entry per partition under OFFSET_KEY + "." + partition)
        // ... seek logic ...
    }
    @Override
    public OrderEvent read() {
        // Keep polling until an event arrives or the batch window closes
        while (buffer.isEmpty() && !windowClosed) {
            if (System.currentTimeMillis() >= windowEndTime) {
                windowClosed = true;
                return null;
            }
            ConsumerRecords<String, OrderEvent> records = consumer.poll(pollTimeout);
            records.forEach(r -> buffer.add(r.value()));
        }
        return buffer.poll();
    }
@Override
public void update(ExecutionContext ctx) {
super.update(ctx);
// Persist current offset
consumer.assignment().forEach(tp ->
ctx.putLong(OFFSET_KEY + "." + tp.partition(),
consumer.position(tp)));
}
@Override
public void close() {
consumer.close();
super.close();
}
}
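Wiring the reader is mostly a matter of deciding when the window ends; a hedged sketch, assuming the window length arrives as a windowMinutes job parameter and a KafkaConsumer bean configured elsewhere (the topic name is illustrative):
@Bean
@StepScope
public KafkaWindowItemReader kafkaWindowReader(
        KafkaConsumer<String, OrderEvent> orderEventConsumer,
        @Value("#{jobParameters['windowMinutes']}") Long windowMinutes) {
    // Close the window a fixed number of minutes after the step opens
    long windowEndTime = System.currentTimeMillis()
            + Duration.ofMinutes(windowMinutes).toMillis();
    return new KafkaWindowItemReader(orderEventConsumer, "order-events",
            Duration.ofSeconds(5), windowEndTime);
}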
Composing Multiple Sources
When a single step needs to read from two sources (e.g., a database + a CSV), use a custom composite reader:
public class CompositeOrderItemReader extends AbstractItemStreamItemReader<Order> {
private final List<ItemStreamReader<Order>> delegates;
private int currentDelegateIndex = 0;
public CompositeOrderItemReader(List<ItemStreamReader<Order>> delegates) {
this.delegates = delegates;
setName("compositeOrderItemReader");
}
@Override
public void open(ExecutionContext ctx) {
super.open(ctx);
delegates.forEach(d -> d.open(ctx));
}
@Override
public Order read() throws Exception {
while (currentDelegateIndex < delegates.size()) {
Order item = delegates.get(currentDelegateIndex).read();
if (item != null) return item;
currentDelegateIndex++; // move to next source
}
return null; // all sources exhausted
}
@Override
public void update(ExecutionContext ctx) {
super.update(ctx);
delegates.forEach(d -> d.update(ctx));
}
@Override
public void close() {
delegates.forEach(ItemStream::close);
super.close();
}
}
Wire it in configuration:
@Bean
public CompositeOrderItemReader compositeOrderReader(
JdbcPagingItemReader<Order> dbReader,
FlatFileItemReader<Order> csvReader) {
return new CompositeOrderItemReader(List.of(dbReader, csvReader));
}
Complete Job: Import Orders from API + CSV
@Configuration
@RequiredArgsConstructor
public class MultiSourceImportJobConfig {
private final DataSource dataSource;
private final JobRepository jobRepository;
private final PlatformTransactionManager tx;
@Bean
public Step importFromApiStep(OrderApiItemReader apiReader,
JdbcBatchItemWriter<Order> writer) {
return new StepBuilder("importFromApiStep", jobRepository)
.<Order, Order>chunk(100, tx)
.reader(apiReader)
.writer(writer)
.faultTolerant()
.retry(Exception.class)
.retryLimit(3)
.build();
}
@Bean
public Step importFromCsvStep(FlatFileItemReader<Order> csvReader,
JdbcBatchItemWriter<Order> writer) {
return new StepBuilder("importFromCsvStep", jobRepository)
.<Order, Order>chunk(500, tx)
.reader(csvReader)
.writer(writer)
.faultTolerant()
.skip(FlatFileParseException.class)
.skipLimit(100)
.build();
}
@Bean
public JdbcBatchItemWriter<Order> orderInsertWriter() {
return new JdbcBatchItemWriterBuilder<Order>()
.dataSource(dataSource)
.sql("INSERT IGNORE INTO orders (customer_id, amount, order_date, status) " +
"VALUES (:customerId, :amount, :orderDate, :status)")
.beanMapped()
.build();
}
@Bean
public Job multiSourceImportJob(Step importFromApiStep, Step importFromCsvStep) {
return new JobBuilder("multiSourceImportJob", jobRepository)
.start(importFromApiStep)
.next(importFromCsvStep)
.build();
}
}
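To launch the job, supply the parameters the step-scoped beans expect; a minimal launcher sketch, assuming the CSV step is wired to the S3 reader and therefore needs the s3Key parameter (the key passed in is whichever object you want to import):
@Component
@RequiredArgsConstructor
public class MultiSourceImportLauncher {

    private final JobLauncher jobLauncher;
    private final Job multiSourceImportJob;

    public void runImport(String s3Key) throws Exception {
        JobParameters params = new JobParametersBuilder()
                .addString("s3Key", s3Key)
                // unique parameter so each run creates a fresh JobInstance
                .addLong("run.ts", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(multiSourceImportJob, params);
    }
}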
Key Takeaways
- Implement ItemReader<T> (returning null to signal the end of input) plus ItemStream for lifecycle management. Extend AbstractItemStreamItemReader<T> to get both.
- Save your position in the ExecutionContext from update() on every chunk so restarts are efficient.
- For S3, use Spring Cloud AWS: it exposes S3 objects as Spring Resources that plug directly into FlatFileItemReader and MultiResourceItemReader.
- For REST APIs, implement pagination inside the reader: buffer one page, return items one at a time, fetch the next page when the buffer empties.
- Rate-limit handling belongs in the reader — use exponential backoff before propagating the exception.
- Combine multiple sources sequentially with a composite reader that cycles through its delegates until all of them return null.
What’s Next
Part 2 (Readers) is complete. Article 9 starts Part 3 — Writers. You will learn FlatFileItemWriter for CSV output, JdbcBatchItemWriter for high-throughput MySQL inserts, and how to tune batch insert performance.