Part 13 of 16

CompletableFuture: Async Pipelines and Non-Blocking Composition

The Problem with Future<T>

Java 5 introduced Future<T> for async computation. The problem: you can only get the result by calling get(), which blocks:

// Plain Future<T>, as available since Java 5 (lambda syntax used for brevity)
ExecutorService pool = Executors.newFixedThreadPool(4);
Future<User> future = pool.submit(() -> fetchUser(userId));

// ...

User user = future.get(); // BLOCKS until done

If you have multiple async operations that depend on each other, you end up with a chain of blocking get() calls — effectively synchronous execution with extra overhead.

CompletableFuture solves this with a fluent, callback-based API: instead of blocking to get a result, you register what should happen when the result is ready.
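To make the contrast concrete, here is a minimal sketch of the same two-step flow written both ways. The helpers fetchUserName and fetchGreeting are hypothetical stand-ins for remote calls, invented for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class BlockingVsCallbacks {

    // Hypothetical stand-ins for remote calls; not part of any real API.
    static String fetchUserName(long id) { return "user-" + id; }
    static String fetchGreeting(String name) { return "Hello, " + name; }

    // Future<T> style: each dependent step needs a blocking get() first.
    static String withFuture(ExecutorService pool, long id)
            throws InterruptedException, ExecutionException {
        Future<String> name = pool.submit(() -> fetchUserName(id));
        String resolved = name.get();                        // blocks the caller
        Future<String> greeting = pool.submit(() -> fetchGreeting(resolved));
        return greeting.get();                               // blocks again
    }

    // CompletableFuture style: both steps are registered up front; nothing blocks here.
    static CompletableFuture<String> withCompletableFuture(ExecutorService pool, long id) {
        return CompletableFuture
            .supplyAsync(() -> fetchUserName(id), pool)
            .thenApply(BlockingVsCallbacks::fetchGreeting);
    }

    // Runs both variants, returns their results, and shuts the pool down afterwards.
    static String[] runBoth(long id) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            String blocking = withFuture(pool, id);
            String callbacks = withCompletableFuture(pool, id).join(); // block only at the edge
            return new String[] { blocking, callbacks };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Both variants produce the same value. The difference is that the second one returns a future the caller can keep composing instead of a thread parked in get().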


Creating CompletableFutures

supplyAsync — async computation returning a value

// Runs in ForkJoinPool.commonPool() by default
CompletableFuture<User> future = CompletableFuture.supplyAsync(() -> fetchUser(42L));

// With custom executor
ExecutorService pool = Executors.newFixedThreadPool(10);
CompletableFuture<User> future2 = CompletableFuture.supplyAsync(
    () -> fetchUser(42L), pool);

runAsync — async computation with no return value

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> sendEmail(user));

Completed futures

// Already-completed future — useful for testing and default values
CompletableFuture<String> done = CompletableFuture.completedFuture("Hello");

// Failed future
CompletableFuture<String> failed = new CompletableFuture<>();
failed.completeExceptionally(new RuntimeException("Something went wrong"));
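As a small sketch of how these pre-completed futures behave, the helper below builds a failed future the Java 8 way and inspects a future's state. The class and method names are made up for this example, and cancellation is deliberately not handled.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class CompletedFutures {

    // Builds an already-failed future (Java 9+ also offers
    // CompletableFuture.failedFuture for the same purpose).
    static <T> CompletableFuture<T> failed(Throwable cause) {
        CompletableFuture<T> f = new CompletableFuture<>();
        f.completeExceptionally(cause);
        return f;
    }

    // Describes a future's current state. Assumes the future was not cancelled.
    static String describe(CompletableFuture<String> f) {
        if (!f.isDone()) {
            return "pending";
        }
        if (f.isCompletedExceptionally()) {
            try {
                f.join();
                return "unreachable";
            } catch (CompletionException e) {
                // join() wraps the original exception in a CompletionException
                return "failed: " + e.getCause().getMessage();
            }
        }
        return "value: " + f.join();
    }
}
```

In tests, this kind of helper lets you feed success and failure cases into code that expects a CompletableFuture without starting any threads.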

Transforming Results

thenApply — transform the result (synchronous, same thread)

CompletableFuture<String> name = CompletableFuture
    .supplyAsync(() -> fetchUser(42L))
    .thenApply(User::getName);

thenApplyAsync — transform on a different thread

CompletableFuture<String> name = CompletableFuture
    .supplyAsync(() -> fetchUser(42L))
    .thenApplyAsync(User::getName, pool); // runs getName on pool thread

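The non-Async methods run the callback on whichever thread completes the previous stage, or on the calling thread if that stage has already completed. The Async variants instead submit the callback to an executor (the common pool by default, or one you pass in). Use Async when the callback is expensive or must not hold up the completing thread.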
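Which thread actually runs a callback can be observed directly. The sketch below is an illustration with made-up thread names: it records the executing thread for a callback registered before completion, one registered after completion, and an Async callback with a custom executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CallbackThreads {

    // Returns the names of the threads that ran each of the three callbacks.
    static String[] observe() {
        ExecutorService pool =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "pool-thread"));
        try {
            // Case 1: registered BEFORE completion, so it runs on the completing thread.
            CompletableFuture<String> pending = new CompletableFuture<>();
            CompletableFuture<String> onCompleter =
                pending.thenApply(v -> Thread.currentThread().getName());
            Thread completer = new Thread(() -> pending.complete("x"), "completer-thread");
            completer.start();
            completer.join();

            // Case 2: registered AFTER completion, so it runs on the registering thread.
            CompletableFuture<String> onCaller = CompletableFuture.completedFuture("x")
                .thenApply(v -> Thread.currentThread().getName());

            // Case 3: the Async variant hands the callback to the given executor.
            CompletableFuture<String> onPool = CompletableFuture.completedFuture("x")
                .thenApplyAsync(v -> Thread.currentThread().getName(), pool);

            return new String[] { onCompleter.join(), onCaller.join(), onPool.join() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The second case is the one that tends to surprise people: a plain thenApply on an already-completed future runs on the thread that registers it.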

thenCompose — chain async operations that return CompletableFuture

// fetchUser(id) is a blocking call that returns User
// fetchOrders(userId) returns CompletableFuture<List<Order>>

// WRONG: thenApply gives CompletableFuture<CompletableFuture<List<Order>>>
CompletableFuture<CompletableFuture<List<Order>>> nested =
    CompletableFuture.supplyAsync(() -> fetchUser(42L))
        .thenApply(user -> fetchOrders(user.getId())); // wrong!

// RIGHT: thenCompose flattens
CompletableFuture<List<Order>> orders =
    CompletableFuture.supplyAsync(() -> fetchUser(42L))
        .thenCompose(user -> fetchOrders(user.getId())); // correct

thenCompose is to CompletableFuture what flatMap is to Optional and Stream.
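The analogy can be shown side by side. In this sketch, findEmail and findEmailAsync are hypothetical lookups invented for the illustration; the point is only that flatMap and thenCompose play the same flattening role.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

final class FlatMapAnalogy {

    // Hypothetical lookups, invented for this illustration.
    static Optional<String> findEmail(String userName) {
        return "alice".equals(userName)
            ? Optional.of("alice@example.com")
            : Optional.empty();
    }

    static CompletableFuture<String> findEmailAsync(String userName) {
        return CompletableFuture.supplyAsync(() -> userName + "@example.com");
    }

    // map / thenApply would nest the container (Optional<Optional<String>> or
    // CompletableFuture<CompletableFuture<String>>); flatMap / thenCompose flatten it.
    static Optional<String> emailOf(Optional<String> userName) {
        return userName.flatMap(FlatMapAnalogy::findEmail);
    }

    static CompletableFuture<String> emailOfAsync(CompletableFuture<String> userName) {
        return userName.thenCompose(FlatMapAnalogy::findEmailAsync);
    }
}
```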


Consuming Results

thenAccept — consume the result without returning a value

CompletableFuture.supplyAsync(() -> fetchUser(42L))
    .thenAccept(user -> System.out.println("Fetched: " + user.getName()));

thenRun — run a Runnable after completion (ignores the result)

CompletableFuture.supplyAsync(() -> processData())
    .thenRun(() -> System.out.println("Processing complete"));

Combining Multiple Futures

thenCombine — combine two independent futures

// Fetch user and orders in parallel, combine when both are done
CompletableFuture<User> userFuture   = CompletableFuture.supplyAsync(() -> fetchUser(id));
CompletableFuture<List<Order>> orderFuture = CompletableFuture.supplyAsync(() -> fetchOrders(id));

CompletableFuture<UserWithOrders> combined = userFuture
    .thenCombine(orderFuture, (user, orders) -> new UserWithOrders(user, orders));

allOf — wait for all futures to complete

List<CompletableFuture<User>> futures = userIds.stream()
    .map(id -> CompletableFuture.supplyAsync(() -> fetchUser(id)))
    .collect(Collectors.toList());

// Wait for all to complete
CompletableFuture<Void> all = CompletableFuture.allOf(
    futures.toArray(new CompletableFuture[0]));

// Collect results after all complete
CompletableFuture<List<User>> allUsers = all.thenApply(v ->
    futures.stream()
        .map(CompletableFuture::join) // join() doesn't throw checked exception
        .collect(Collectors.toList())
);
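Because the allOf-then-join pattern comes up often, it can be wrapped in a small generic helper. This is a sketch; the class and method names (FutureLists, allAsList) are my own and not part of the JDK.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class FutureLists {

    // Turns a list of futures into a future of a list, preserving input order.
    // If any input fails, the returned future fails as well.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture<?>[0]))
            .thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)   // does not block: every future is complete here
                .collect(Collectors.toList()));
    }
}
```

With this in place, the snippet above reduces to FutureLists.allAsList(futures).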

anyOf — return the result of the first future to complete

CompletableFuture<Object> fastest = CompletableFuture.anyOf(
    CompletableFuture.supplyAsync(() -> fetchFromPrimaryDB()),
    CompletableFuture.supplyAsync(() -> fetchFromReplicaDB()),
    CompletableFuture.supplyAsync(() -> fetchFromCache())
);

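// anyOf returns CompletableFuture<Object>, so cast to the expected type.
// If the first future to finish failed, fastest fails as well.
String result = (String) fastest.get();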
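If the cast bothers you, one option is a thin typed wrapper around anyOf. The helper below is a sketch with a made-up name (firstOf); it is only safe when every input really produces a T, which the signature enforces.

```java
import java.util.concurrent.CompletableFuture;

final class AnyOfTyped {

    // Typed variant of anyOf: completes with the result (or failure) of
    // whichever input future finishes first.
    @SafeVarargs
    static <T> CompletableFuture<T> firstOf(CompletableFuture<? extends T>... futures) {
        return CompletableFuture.anyOf(futures).thenApply(result -> {
            // Safe because every input future produces a subtype of T.
            @SuppressWarnings("unchecked")
            T typed = (T) result;
            return typed;
        });
    }
}
```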

Error Handling

exceptionally — provide a fallback on exception

CompletableFuture<User> safe = CompletableFuture
    .supplyAsync(() -> fetchUser(id))
    .exceptionally(ex -> {
        log.error("Failed to fetch user {}: {}", id, ex.getMessage());
        return User.anonymous();
    });

exceptionally only runs if the previous stage completed exceptionally. If successful, it passes the result through unchanged.
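One detail worth knowing: the Throwable passed to exceptionally is often a CompletionException wrapping the original exception, for example when the failure was thrown inside supplyAsync. The sketch below uses a hypothetical unwrap helper to get at the real cause before deciding on a fallback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ExceptionallyCause {

    // Returns the underlying cause if ex is a CompletionException wrapper.
    static Throwable unwrap(Throwable ex) {
        return (ex instanceof CompletionException && ex.getCause() != null)
            ? ex.getCause()
            : ex;
    }

    // Simulates a fetch that may fail, then recovers with a fallback value.
    static CompletableFuture<String> fetchOrFallback(boolean fail) {
        return CompletableFuture
            .supplyAsync(() -> {
                if (fail) {
                    throw new IllegalStateException("backend down");
                }
                return "alice";
            })
            .exceptionally(ex ->
                "fallback after " + unwrap(ex).getClass().getSimpleName());
    }
}
```

Checking unwrap(ex) rather than ex itself keeps instanceof tests and log messages pointed at the exception your code actually threw.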

handle — handle both success and failure

CompletableFuture<String> result = CompletableFuture
    .supplyAsync(() -> fetchUser(id))
    .handle((user, ex) -> {
        if (ex != null) {
            log.error("Error: {}", ex.getMessage());
            return "fallback-user";
        }
        return user.getName();
    });

handle always runs — regardless of success or failure. The ex parameter is null on success and non-null on failure.

whenComplete — side effects on completion (doesn’t transform the result)

CompletableFuture<User> user = CompletableFuture
    .supplyAsync(() -> fetchUser(id))
    .whenComplete((result, ex) -> {
        if (ex != null) metrics.recordFailure();
        else metrics.recordSuccess();
    });
// The future's value/exception is still propagated unchanged
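The difference between handle and whenComplete is easiest to see on a future that has already failed. In this sketch (class name and messages are illustrative), handle recovers with a new value, while whenComplete only observes the failure and passes it on.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class HandleVsWhenComplete {

    // Returns: handle's recovered value, whether the whenComplete stage
    // is still failed, and how many failures were recorded.
    static String[] compare() {
        AtomicInteger failures = new AtomicInteger();

        CompletableFuture<String> source = new CompletableFuture<>();
        source.completeExceptionally(new IllegalStateException("boom"));

        // handle replaces the outcome: the returned future completes normally.
        CompletableFuture<String> handled =
            source.handle((value, ex) -> ex != null ? "recovered" : value);

        // whenComplete only observes: the returned future is still failed.
        CompletableFuture<String> observed =
            source.whenComplete((value, ex) -> {
                if (ex != null) {
                    failures.incrementAndGet();
                }
            });

        return new String[] {
            handled.join(),
            String.valueOf(observed.isCompletedExceptionally()),
            String.valueOf(failures.get())
        };
    }
}
```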

Blocking to Get the Result

When you need to block and get the final result (e.g., at the edge of an async system):

// get() throws checked ExecutionException and InterruptedException; the timed overload adds TimeoutException
try {
    User user = future.get();
    User userWithTimeout = future.get(5, TimeUnit.SECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
    // handle
}

// join() — throws unchecked CompletionException (preferred in lambdas)
User user = future.join();

// getNow(fallback) — returns fallback if not yet complete
User current = future.getNow(User.anonymous());
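The three accessors report failure differently, which the following sketch makes visible. The class and helper names are invented for the example; each method returns a short description instead of throwing, so the behaviour is easy to compare.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockingAccess {

    // Builds an already-failed future for the demonstration.
    static CompletableFuture<String> failedWith(Throwable cause) {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(cause);
        return f;
    }

    // get(): failures arrive as a checked ExecutionException.
    static String viaGet(CompletableFuture<String> f, long timeoutMillis) {
        try {
            return f.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            return "ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        } catch (TimeoutException e) {
            return "TimeoutException";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt flag
            return "InterruptedException";
        }
    }

    // join(): failures arrive as an unchecked CompletionException.
    static String viaJoin(CompletableFuture<String> f) {
        try {
            return f.join();
        } catch (CompletionException e) {
            return "CompletionException caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    // getNow(): never blocks; returns the fallback while the future is incomplete.
    static String viaGetNow(CompletableFuture<String> f) {
        return f.getNow("fallback");
    }
}
```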

Full Pipeline Example

Fetch a user, their orders, and the shipping status of each order — all in parallel where possible:

CompletableFuture<Report> report = CompletableFuture
    .supplyAsync(() -> fetchUser(userId), ioPool)          // fetch user
    .thenCompose(user ->
        CompletableFuture.supplyAsync(() -> fetchOrders(user.getId()), ioPool)
            .thenApply(orders -> new UserWithOrders(user, orders))
    )
    .thenCompose(uwo -> {
        // Fetch shipping status for each order in parallel
        List<CompletableFuture<OrderStatus>> statusFutures = uwo.getOrders().stream()
            .map(order -> CompletableFuture.supplyAsync(
                () -> fetchShippingStatus(order.getId()), ioPool))
            .collect(Collectors.toList());

        return CompletableFuture.allOf(statusFutures.toArray(new CompletableFuture[0]))
            .thenApply(v -> {
                List<OrderStatus> statuses = statusFutures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
                return new Report(uwo.getUser(), uwo.getOrders(), statuses);
            });
    })
    .exceptionally(ex -> {
        log.error("Report generation failed", ex);
        return Report.empty();
    });

// Blocking get at the HTTP handler boundary
Report r = report.get(10, TimeUnit.SECONDS);

CompletableFuture vs Thread + Future

| Approach | Blocking? | Composable? | Exception handling |
| --- | --- | --- | --- |
| Future.get() | Yes | No | Checked ExecutionException |
| CompletableFuture | No (callbacks) | Yes | exceptionally / handle |
| CompletableFuture.get() / join() | Yes (if needed) | Yes | Checked (get) / unchecked (join) |

Common Pitfalls

Forgetting to use a custom pool for I/O

// WRONG: blocks ForkJoinPool.commonPool() threads with I/O
CompletableFuture.supplyAsync(() -> httpClient.get("/api/users"));

// RIGHT: use a dedicated I/O thread pool
ExecutorService ioPool = Executors.newCachedThreadPool();
CompletableFuture.supplyAsync(() -> httpClient.get("/api/users"), ioPool);
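A cached pool grows without bound under load, so a bounded pool with recognizable thread names is a common alternative. The sketch below is one possible setup rather than a recommendation for every workload; the names (IoPools, newIoPool, the "io-pool-" prefix) and the choice of daemon threads are my assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class IoPools {

    // Creates a fixed-size pool whose threads are named io-pool-1, io-pool-2, ...
    // Daemon threads are an assumption here, so the pool never blocks JVM exit.
    static ExecutorService newIoPool(int size) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, "io-pool-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
        return Executors.newFixedThreadPool(size, factory);
    }

    // Reports which thread a supplyAsync task ran on when given the pool.
    static String runningThreadName(ExecutorService ioPool) {
        return CompletableFuture
            .supplyAsync(() -> Thread.currentThread().getName(), ioPool)
            .join();
    }
}
```

Named threads make it obvious in thread dumps and logs whether blocking calls are landing on the I/O pool or leaking onto the common pool.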

Using get() inside async callbacks

// WRONG: blocking inside a callback ties up a pool thread; with enough of these
// the pool is exhausted and nothing can make progress
// (shown without the try/catch that get()'s checked exceptions require)
CompletableFuture<String> blocking = future.thenApply(user ->
    anotherFuture.get() // blocking inside callback!
);

// RIGHT: use thenCompose to chain async operations
CompletableFuture<String> chained = future.thenCompose(user -> anotherFuture);

Not handling exceptions

// WRONG: unhandled exception — future fails silently
CompletableFuture.supplyAsync(() -> riskyOperation());

// RIGHT: always add error handling
CompletableFuture.supplyAsync(() -> riskyOperation())
    .exceptionally(ex -> { log.error("Failed", ex); return fallback; });
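For fire-and-forget futures where no sensible fallback value exists, an alternative is to attach a reporter that records the failure and leaves the outcome untouched. This is a sketch; reportFailures and its Consumer-based reporter are names I made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

final class FireAndForget {

    // Attaches a failure reporter without changing the future's result.
    // The reporter receives the underlying cause, not the CompletionException wrapper.
    static <T> CompletableFuture<T> reportFailures(
            CompletableFuture<T> future, Consumer<Throwable> reporter) {
        return future.whenComplete((value, ex) -> {
            if (ex != null) {
                Throwable cause =
                    (ex instanceof CompletionException && ex.getCause() != null)
                        ? ex.getCause()
                        : ex;
                reporter.accept(cause);
            }
        });
    }
}
```

A typical call would pass a logger, for example reportFailures(future, ex -> log.error("Failed", ex)), assuming a log object like the one used in the snippets above.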

Summary

| Method | Purpose |
| --- | --- |
| supplyAsync(Supplier) | Start async computation returning a value |
| runAsync(Runnable) | Start async computation with no return |
| thenApply(Function) | Transform result (on the completing thread) |
| thenApplyAsync(Function) | Transform result (async, on an executor thread) |
| thenCompose(Function) | Chain async operation (flatMap equivalent) |
| thenAccept(Consumer) | Consume result, return void |
| thenRun(Runnable) | Run after completion, ignore result |
| thenCombine(other, fn) | Combine two independent futures |
| allOf(futures...) | Wait for all to complete |
| anyOf(futures...) | First to complete wins |
| exceptionally(fn) | Fallback on exception |
| handle(fn) | Handle both success and failure |
| whenComplete(fn) | Side effects on completion |
| join() | Block for result (unchecked) |
| get() | Block for result (checked) |

Next Step

New APIs: Base64, StampedLock, Nashorn JavaScript Engine →

Part of the DevOps Monk Java tutorial series: Java 8, Java 11, Java 17, Java 21