CompletableFuture: Async Pipelines and Non-Blocking Composition
The Problem with Future<T>
Java 5 introduced Future<T> for async computation. The problem: you can only get the result by calling get(), which blocks:
// Java 7
ExecutorService pool = Executors.newFixedThreadPool(4);
Future<User> future = pool.submit(() -> fetchUser(userId));
// ...
User user = future.get(); // BLOCKS until done
If you have multiple async operations that depend on each other, you end up with a chain of blocking get() calls — effectively synchronous execution with extra overhead.
CompletableFuture solves this with a fluent, callback-based API: instead of blocking to get a result, you register what should happen when the result is ready.
Creating CompletableFutures
supplyAsync — async computation returning a value
// Runs in ForkJoinPool.commonPool() by default
CompletableFuture<User> future = CompletableFuture.supplyAsync(() -> fetchUser(42L));
// With custom executor
ExecutorService pool = Executors.newFixedThreadPool(10);
CompletableFuture<User> future2 = CompletableFuture.supplyAsync(
() -> fetchUser(42L), pool);
runAsync — async computation with no return value
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> sendEmail(user));
Completed futures
// Already-completed future — useful for testing and default values
CompletableFuture<String> done = CompletableFuture.completedFuture("Hello");
// Failed future
CompletableFuture<String> failed = new CompletableFuture<>();
failed.completeExceptionally(new RuntimeException("Something went wrong"));
Transforming Results
thenApply — transform the result (synchronous, same thread)
CompletableFuture<String> name = CompletableFuture
.supplyAsync(() -> fetchUser(42L))
.thenApply(User::getName);
thenApplyAsync — transform on a different thread
CompletableFuture<String> name = CompletableFuture
.supplyAsync(() -> fetchUser(42L))
.thenApplyAsync(User::getName, pool); // runs getName on pool thread
The Async suffix variants submit the callback to a thread pool (common pool or custom) rather than running on the completing thread. Use Async when the callback itself is expensive.
thenCompose — chain async operations that return CompletableFuture
// fetchUser returns CompletableFuture<User>
// fetchOrders(user) returns CompletableFuture<List<Order>>
// WRONG: thenApply gives CompletableFuture<CompletableFuture<List<Order>>>
CompletableFuture<CompletableFuture<List<Order>>> nested =
CompletableFuture.supplyAsync(() -> fetchUser(42L))
.thenApply(user -> fetchOrders(user.getId())); // wrong!
// RIGHT: thenCompose flattens
CompletableFuture<List<Order>> orders =
CompletableFuture.supplyAsync(() -> fetchUser(42L))
.thenCompose(user -> fetchOrders(user.getId())); // correct
thenCompose is to CompletableFuture what flatMap is to Optional and Stream.
Consuming Results
thenAccept — consume the result without returning a value
CompletableFuture.supplyAsync(() -> fetchUser(42L))
.thenAccept(user -> System.out.println("Fetched: " + user.getName()));
thenRun — run a Runnable after completion (ignores the result)
CompletableFuture.supplyAsync(() -> processData())
.thenRun(() -> System.out.println("Processing complete"));
Combining Multiple Futures
thenCombine — combine two independent futures
// Fetch user and orders in parallel, combine when both are done
CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> fetchUser(id));
CompletableFuture<List<Order>> orderFuture = CompletableFuture.supplyAsync(() -> fetchOrders(id));
CompletableFuture<UserWithOrders> combined = userFuture
.thenCombine(orderFuture, (user, orders) -> new UserWithOrders(user, orders));
allOf — wait for all futures to complete
List<CompletableFuture<User>> futures = userIds.stream()
.map(id -> CompletableFuture.supplyAsync(() -> fetchUser(id)))
.collect(Collectors.toList());
// Wait for all to complete
CompletableFuture<Void> all = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0]));
// Collect results after all complete
CompletableFuture<List<User>> allUsers = all.thenApply(v ->
futures.stream()
.map(CompletableFuture::join) // join() doesn't throw checked exception
.collect(Collectors.toList())
);
anyOf — return the result of the first future to complete
CompletableFuture<Object> fastest = CompletableFuture.anyOf(
CompletableFuture.supplyAsync(() -> fetchFromPrimaryDB()),
CompletableFuture.supplyAsync(() -> fetchFromReplicaDB()),
CompletableFuture.supplyAsync(() -> fetchFromCache())
);
// Cast to the expected type
String result = (String) fastest.get();
Error Handling
exceptionally — provide a fallback on exception
CompletableFuture<User> safe = CompletableFuture
.supplyAsync(() -> fetchUser(id))
.exceptionally(ex -> {
log.error("Failed to fetch user {}: {}", id, ex.getMessage());
return User.anonymous();
});
exceptionally only runs if the previous stage completed exceptionally. If successful, it passes the result through unchanged.
handle — handle both success and failure
CompletableFuture<String> result = CompletableFuture
.supplyAsync(() -> fetchUser(id))
.handle((user, ex) -> {
if (ex != null) {
log.error("Error: {}", ex.getMessage());
return "fallback-user";
}
return user.getName();
});
handle always runs — regardless of success or failure. The ex parameter is null on success and non-null on failure.
whenComplete — side effects on completion (doesn’t transform the result)
CompletableFuture<User> user = CompletableFuture
.supplyAsync(() -> fetchUser(id))
.whenComplete((result, ex) -> {
if (ex != null) metrics.recordFailure();
else metrics.recordSuccess();
});
// The future's value/exception is still propagated unchanged
Blocking to Get the Result
When you need to block and get the final result (e.g., at the edge of an async system):
// get() — throws checked ExecutionException, InterruptedException
try {
User user = future.get();
User userWithTimeout = future.get(5, TimeUnit.SECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
// handle
}
// join() — throws unchecked CompletionException (preferred in lambdas)
User user = future.join();
// getNow(fallback) — returns fallback if not yet complete
User user = future.getNow(User.anonymous());
Full Pipeline Example
Fetch a user, their orders, and the shipping status of each order — all in parallel where possible:
CompletableFuture<Report> report = CompletableFuture
.supplyAsync(() -> fetchUser(userId), ioPool) // fetch user
.thenCompose(user ->
CompletableFuture.supplyAsync(() -> fetchOrders(user.getId()), ioPool)
.thenApply(orders -> new UserWithOrders(user, orders))
)
.thenCompose(uwo -> {
// Fetch shipping status for each order in parallel
List<CompletableFuture<OrderStatus>> statusFutures = uwo.getOrders().stream()
.map(order -> CompletableFuture.supplyAsync(
() -> fetchShippingStatus(order.getId()), ioPool))
.collect(Collectors.toList());
return CompletableFuture.allOf(statusFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> {
List<OrderStatus> statuses = statusFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
return new Report(uwo.getUser(), uwo.getOrders(), statuses);
});
})
.exceptionally(ex -> {
log.error("Report generation failed", ex);
return Report.empty();
});
// Blocking get at the HTTP handler boundary
Report r = report.get(10, TimeUnit.SECONDS);
CompletableFuture vs Thread + Future
| Approach | Blocking? | Composable? | Exception handling |
|---|---|---|---|
Future.get() | Yes | No | Checked ExecutionException |
CompletableFuture | No (callbacks) | Yes | exceptionally / handle |
CompletableFuture.get() | Yes (if needed) | Yes | Checked / unchecked |
Common Pitfalls
Forgetting to use a custom pool for I/O
// WRONG: blocks ForkJoinPool.commonPool() threads with I/O
CompletableFuture.supplyAsync(() -> httpClient.get("/api/users"));
// RIGHT: use a dedicated I/O thread pool
ExecutorService ioPool = Executors.newCachedThreadPool();
CompletableFuture.supplyAsync(() -> httpClient.get("/api/users"), ioPool);
Using get() inside async callbacks
// WRONG: deadlock risk — get() inside a ForkJoin callback can exhaust the pool
CompletableFuture<String> result = future.thenApply(user ->
anotherFuture.get() // blocking inside callback!
);
// RIGHT: use thenCompose to chain async operations
CompletableFuture<String> result = future.thenCompose(user -> anotherFuture);
Not handling exceptions
// WRONG: unhandled exception — future fails silently
CompletableFuture.supplyAsync(() -> riskyOperation());
// RIGHT: always add error handling
CompletableFuture.supplyAsync(() -> riskyOperation())
.exceptionally(ex -> { log.error("Failed", ex); return fallback; });
Summary
| Method | Purpose |
|---|---|
supplyAsync(Supplier) | Start async computation returning a value |
runAsync(Runnable) | Start async computation with no return |
thenApply(Function) | Transform result (sync) |
thenApplyAsync(Function) | Transform result (async, new thread) |
thenCompose(Function) | Chain async operation (flatMap equivalent) |
thenAccept(Consumer) | Consume result, return void |
thenRun(Runnable) | Run after completion, ignore result |
thenCombine(other, fn) | Combine two independent futures |
allOf(futures...) | Wait for all to complete |
anyOf(futures...) | First to complete wins |
exceptionally(fn) | Fallback on exception |
handle(fn) | Handle both success and failure |
whenComplete(fn) | Side effects on completion |
join() | Block for result (unchecked) |
get() | Block for result (checked) |
Next Step
New APIs: Base64, StampedLock, Nashorn JavaScript Engine →
Part of the DevOps Monk Java tutorial series: Java 8 → Java 11 → Java 17 → Java 21