Part 7 of 15

Structured Concurrency (JEP 453): Safe, Readable Concurrent Code

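Preview Feature: requires --enable-preview at compile time and at run time. The examples here use the API as previewed in Java 21 through 24 (JEPs 453, 462, 480, 499). JDK 25 (JEP 505) reworks it around static factory methods and joiners, so ShutdownOnFailure and ShutdownOnSuccess do not exist there. Finalization is expected in a future release.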

The Unstructured Concurrency Problem

When you submit tasks to an ExecutorService, the tasks are logically related but structurally unconnected — the executor doesn’t know they belong together:

// Unstructured concurrency — looks simple, hides serious problems
ExecutorService executor = Executors.newCachedThreadPool();
Future<Order>    orderFuture    = executor.submit(() -> fetchOrder(orderId));
Future<Customer> customerFuture = executor.submit(() -> fetchCustomer(customerId));

try {
    Order    order    = orderFuture.get();    // blocks
    Customer customer = customerFuture.get(); // blocks
    return new OrderSummary(order, customer);
} catch (ExecutionException e) {
    // orderFuture failed — but customerFuture is STILL RUNNING
    // Thread leak until customerFuture eventually completes
    throw new RuntimeException(e);
}

Problems:

  • If orderFuture throws, customerFuture keeps running — thread leak
  • Cancellation must be done manually and is easy to forget
  • Exceptions from subtasks are wrapped in ExecutionException — stack traces are confusing
  • Observability tools see disconnected threads, not a task tree
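
The first two problems are easy to reproduce with stable APIs only. The following is a minimal sketch, not production code: `failingOrder` and `slowCustomer` are hypothetical stand-ins for fetchOrder and fetchCustomer, and the 30-second sleep simply simulates a slow remote call.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class LeakDemo {

    /** Returns true if the sibling task was still running when the failure surfaced. */
    static boolean siblingOutlivesFailure() {
        // Hypothetical stand-ins for fetchOrder and fetchCustomer.
        Callable<String> failingOrder = () -> { throw new IllegalStateException("order service down"); };
        Callable<String> slowCustomer = () -> { Thread.sleep(30_000); return "customer"; };

        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            Future<String> customerFuture = executor.submit(slowCustomer);
            Future<String> orderFuture = executor.submit(failingOrder);
            try {
                orderFuture.get();
                return false; // not reached: the order task always fails
            } catch (ExecutionException e) {
                // The failure is visible here, yet nothing has stopped the sibling.
                boolean stillRunning = !customerFuture.isDone();
                customerFuture.cancel(true); // the manual step that is easy to forget
                return stillRunning;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted", e);
            }
        } finally {
            executor.shutdownNow(); // demo cleanup so the JVM can exit
        }
    }

    public static void main(String[] args) {
        // prints "sibling still running after failure: true"
        System.out.println("sibling still running after failure: " + siblingOutlivesFailure());
    }
}
```

Without the explicit cancel(true) and shutdownNow() calls, the customer task would occupy a thread for the full 30 seconds even though nobody is waiting for its result.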

Structured Concurrency Principles

Structured concurrency applies the same principle to threads that structured programming applied to control flow: the lifetime of a concurrent task must be nested within its parent’s lifetime. A parent task waits for all its children before returning.

flowchart TD
    Parent["Parent task\n(fetchOrderSummary)"]
    C1["Child task\n(fetchOrder)"]
    C2["Child task\n(fetchCustomer)"]
    Join["scope.join() — parent waits here\nuntil both children finish"]
    Result["Parent uses results\nand returns"]

    Parent --> C1
    Parent --> C2
    C1 --> Join
    C2 --> Join
    Join --> Result

The scope’s close(), invoked by try-with-resources, waits for every child to finish before the parent continues. The guarantee comes from the StructuredTaskScope API itself, not from cleanup code you have to remember to write.


StructuredTaskScope — The Core API

// Enable preview: --enable-preview --release 21
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;

record OrderSummary(Order order, Customer customer) {}

OrderSummary fetchOrderSummary(String orderId, String customerId)
        throws InterruptedException, ExecutionException {

    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        StructuredTaskScope.Subtask<Order>    orderTask    =
            scope.fork(() -> fetchOrder(orderId));
        StructuredTaskScope.Subtask<Customer> customerTask =
            scope.fork(() -> fetchCustomer(customerId));

        scope.join();           // wait for both to complete
        scope.throwIfFailed();  // throws ExecutionException wrapping the first failure, if any

        return new OrderSummary(orderTask.get(), customerTask.get());
    }
}

What happens if fetchOrder fails:

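  1. The failure shuts the scope down: the thread running fetchCustomer is interrupted if it is still running
  2. scope.join() returns as soon as the scope has shut down
  3. scope.throwIfFailed() throws an ExecutionException whose cause is the exception from fetchOrder
  4. The try-with-resources closes the scope, and close() waits for the interrupted subtask to finish, so no thread is leaked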
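
To make the fail-fast behaviour described above concrete, here is a rough emulation built only on stable java.util.concurrent classes. It is a sketch under stated assumptions, not how StructuredTaskScope is implemented: `joinAllOrFail` and `demo` are hypothetical names, and the executor's shutdownNow() plus awaitTermination() stand in for the scope's shutdown and close().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class FailFastJoin {

    /** Returns all results in submission order, or throws as soon as one task fails. */
    static <T> List<T> joinAllOrFail(List<Callable<T>> tasks) {
        ExecutorService executor = Executors.newCachedThreadPool();
        CompletionService<T> completions = new ExecutorCompletionService<>(executor);
        List<Future<T>> futures = new ArrayList<>();
        try {
            for (Callable<T> task : tasks) {
                futures.add(completions.submit(task));
            }
            // Consume completions in finish order so a failure is noticed immediately.
            for (int i = 0; i < futures.size(); i++) {
                completions.take().get();
            }
            List<T> results = new ArrayList<>();
            for (Future<T> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (ExecutionException e) {
            throw new IllegalStateException("subtask failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        } finally {
            executor.shutdownNow();       // interrupt whatever is still running
            awaitTermination(executor);   // then wait for it, as close() would
        }
    }

    private static void awaitTermination(ExecutorService executor) {
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Demo: one task fails at once; its sibling would sleep for 30 s unless interrupted. */
    static String demo() {
        AtomicBoolean siblingInterrupted = new AtomicBoolean(false);
        Callable<String> failing = () -> { throw new IllegalArgumentException("order service down"); };
        Callable<String> slow = () -> {
            try {
                Thread.sleep(30_000);
                return "customer";
            } catch (InterruptedException e) {
                siblingInterrupted.set(true);
                throw e;
            }
        };
        try {
            joinAllOrFail(List.of(failing, slow));
            return "unexpected success";
        } catch (IllegalStateException e) {
            return e.getCause().getMessage() + ", sibling interrupted: " + siblingInterrupted.get();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "order service down, sibling interrupted: true"
    }
}
```

The emulation needs roughly forty lines of bookkeeping that the scope replaces with fork(), join(), and throwIfFailed(), which is the main argument for the structured API.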

ShutdownOnFailure — All-or-Nothing

ShutdownOnFailure cancels all remaining subtasks the moment any one subtask fails:

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var task1 = scope.fork(() -> callServiceA());
    var task2 = scope.fork(() -> callServiceB());
    var task3 = scope.fork(() -> callServiceC());

    scope.join();
    scope.throwIfFailed();  // throws if any task failed

    return merge(task1.get(), task2.get(), task3.get());
}

sequenceDiagram
    participant Scope
    participant TaskA
    participant TaskB
    participant TaskC

    Scope->>TaskA: fork
    Scope->>TaskB: fork
    Scope->>TaskC: fork
    TaskB-->>Scope: RuntimeException
    Note over Scope: ShutdownOnFailure:\ncancel A and C immediately
    Scope->>TaskA: interrupt()
    Scope->>TaskC: interrupt()
    Scope->>Scope: scope.throwIfFailed()\nrethrows TaskB's exception

ShutdownOnSuccess — First Success Wins

ShutdownOnSuccess completes the scope the moment any one subtask succeeds. Useful for redundant calls to multiple providers — use the fastest response:

try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Price>()) {
    scope.fork(() -> getPriceFromProviderA(productId));
    scope.fork(() -> getPriceFromProviderB(productId));
    scope.fork(() -> getPriceFromProviderC(productId));

    scope.join();
    return scope.result();  // result of the first successful task
}
// The other tasks are interrupted as soon as one succeeds; close() then waits for them to finish

sequenceDiagram
    participant Scope
    participant ProviderA
    participant ProviderB
    participant ProviderC

    Scope->>ProviderA: fork
    Scope->>ProviderB: fork
    Scope->>ProviderC: fork
    Note over ProviderA: slow...
    ProviderB-->>Scope: Price(99.99) — FIRST SUCCESS
    Note over Scope: ShutdownOnSuccess:\ncancel A and C
    Scope->>ProviderA: interrupt()
    Scope->>ProviderC: interrupt()
    Scope->>Scope: scope.result() = 99.99
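
For comparison, the closest stable-API counterpart to this policy is ExecutorService.invokeAny, which returns the result of one task that completed successfully and cancels the tasks that have not completed. The sketch below assumes three hypothetical providers simulated with sleeps; the names and prices are illustrative only.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FirstSuccess {

    /** Returns the first successful quote; a failing provider is ignored. */
    static String fastestQuote() {
        // Hypothetical providers: A is slow, B is fast, C is down.
        Callable<String> providerA = () -> { Thread.sleep(30_000); return "A: 101.50"; };
        Callable<String> providerB = () -> { Thread.sleep(50); return "B: 99.99"; };
        Callable<String> providerC = () -> { throw new IllegalStateException("provider C is down"); };

        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // Blocks until one task succeeds, then cancels the unfinished ones.
            return executor.invokeAny(List.of(providerA, providerB, providerC));
        } catch (ExecutionException e) {
            // Reached only when no task completes successfully.
            throw new IllegalStateException("no provider answered", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a quote", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(fastestQuote()); // prints "B: 99.99"
    }
}
```

invokeAny covers the simple case, but it offers no per-subtask handles and no nesting, which is where the scope-based version is more flexible.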

Subtask States

After scope.join(), subtasks are in one of three states:

var task = scope.fork(() -> fetchOrder(id));
scope.join();

switch (task.state()) {
    case SUCCESS     -> { Order order = task.get(); /* use the result */ }
    case FAILED      -> { Throwable ex = task.exception(); /* handle or log */ }
    case UNAVAILABLE -> { /* no result: cancelled by shutdown, or forked after shutdown */ }
}

task.get() throws IllegalStateException unless the subtask completed successfully, and task.exception() throws it unless the subtask failed. Always check state() or call throwIfFailed() first.


Nested Scopes — Hierarchical Task Trees

Scopes can be nested — each level waits for its children:

OrderSummary fetchOrderSummary(String orderId, String customerId) 
        throws Exception {

    try (var outerScope = new StructuredTaskScope.ShutdownOnFailure()) {

        var orderTask = outerScope.fork(() -> {
            // Inner scope for parallel sub-tasks of fetching order
            try (var innerScope = new StructuredTaskScope.ShutdownOnFailure()) {
                var detailsTask  = innerScope.fork(() -> fetchOrderDetails(orderId));
                var itemsTask    = innerScope.fork(() -> fetchOrderItems(orderId));
                innerScope.join();
                innerScope.throwIfFailed();
                return new Order(detailsTask.get(), itemsTask.get());
            }
        });

        var customerTask = outerScope.fork(() -> fetchCustomer(customerId));

        outerScope.join();
        outerScope.throwIfFailed();

        return new OrderSummary(orderTask.get(), customerTask.get());
    }
}

flowchart TD
    OS["outer scope"]
    OT["orderTask"]
    CT["customerTask"]
    IS["inner scope"]
    DT["detailsTask"]
    IT["itemsTask"]

    OS --> OT & CT
    OT --> IS
    IS --> DT & IT

Custom Shutdown Policies

Extend StructuredTaskScope directly and override handleComplete to implement a custom policy:

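import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.StructuredTaskScope;

// Policy: succeed once `quorum` subtasks have succeeded (for example, 2 out of 3)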
class QuorumScope<T> extends StructuredTaskScope<T> {
    private final List<T> results = new CopyOnWriteArrayList<>();
    private final int quorum;

    QuorumScope(int quorum) {
        this.quorum = quorum;
    }

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        if (subtask.state() == Subtask.State.SUCCESS) {
            results.add(subtask.get());
            if (results.size() >= quorum) {
                shutdown();  // signal: enough successes, cancel remaining
            }
        }
    }

    List<T> results() {
        super.ensureOwnerAndJoined();
        if (results.size() < quorum) throw new RuntimeException("Quorum not reached");
        return results;
    }
}

// Usage
try (var scope = new QuorumScope<String>(2)) {
    scope.fork(() -> fetchFromReplica1());
    scope.fork(() -> fetchFromReplica2());
    scope.fork(() -> fetchFromReplica3());
    scope.join();
    return scope.results().get(0);  // use first result that arrived
}
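
The same quorum idea can be tried out without preview flags. The sketch below is an approximation using ExecutorCompletionService, not a replacement for the scope: `awaitQuorum` and the four replicas are hypothetical, and unlike close() this version interrupts leftover tasks without waiting for them to finish.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class QuorumJoin {

    /** Collects results in completion order until `quorum` tasks have succeeded. */
    static <T> List<T> awaitQuorum(List<Callable<T>> tasks, int quorum) {
        ExecutorService executor = Executors.newCachedThreadPool();
        CompletionService<T> completions = new ExecutorCompletionService<>(executor);
        List<T> results = new ArrayList<>();
        try {
            for (Callable<T> task : tasks) {
                completions.submit(task);
            }
            for (int finished = 0; finished < tasks.size() && results.size() < quorum; finished++) {
                try {
                    results.add(completions.take().get());
                } catch (ExecutionException e) {
                    // A failed replica simply does not count toward the quorum.
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for quorum", e);
        } finally {
            executor.shutdownNow(); // interrupt replicas that are no longer needed
        }
        if (results.size() < quorum) {
            throw new IllegalStateException("Quorum not reached");
        }
        return results;
    }

    /** Demo with simulated replicas: one fast, one down, one slower, one very slow. */
    static List<String> demo() {
        Callable<String> replica1 = () -> { Thread.sleep(20); return "replica1"; };
        Callable<String> replica2 = () -> { throw new IllegalStateException("replica2 is down"); };
        Callable<String> replica3 = () -> { Thread.sleep(300); return "replica3"; };
        Callable<String> replica4 = () -> { Thread.sleep(30_000); return "replica4"; };
        return awaitQuorum(List.of(replica1, replica2, replica3, replica4), 2);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[replica1, replica3]"
    }
}
```

The failed replica is skipped, the quorum is met by the two fastest successes, and the slowest replica is interrupted instead of running for its full 30 seconds.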

Integration with Scoped Values

Scoped values (JEP 446) are inherited automatically by forked tasks inside a scope:

static final ScopedValue<RequestContext> CONTEXT = ScopedValue.newInstance();

void handleRequest(Request req) throws Exception {
    ScopedValue.where(CONTEXT, new RequestContext(req.traceId(), req.userId()))
        .call(() -> {   // call(), not run(): the body throws checked exceptions
            try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
                scope.fork(() -> {
                    // CONTEXT.get() returns the parent's value automatically
                    log.info("Processing in context: {}", CONTEXT.get().traceId());
                    return processOrder();
                });
                scope.join();
                scope.throwIfFailed();
                return null;   // call() expects a result; nothing to return here
            }
        });
}

Enabling Preview Features

<!-- pom.xml -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
        <release>21</release>
        <compilerArgs><arg>--enable-preview</arg></compilerArgs>
    </configuration>
</plugin>
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-surefire-plugin</artifactId>
    <configuration>
        <argLine>--enable-preview</argLine>
    </configuration>
</plugin>

Structured Concurrency vs ExecutorService

| Aspect | ExecutorService | StructuredTaskScope |
|---|---|---|
| Task relationships | Unrelated to executor | Parent-child enforced |
| Failure propagation | Manual future.get() | Automatic via throwIfFailed() |
| Cancellation | Manual for each future | Automatic on scope close |
| Thread leaks | Possible on exception | Impossible: scope waits |
| Observability | Flat thread list | Hierarchical task tree |
| Result access | future.get() (blocking) | subtask.get() after join |

Key Takeaways

  • StructuredTaskScope is used with try-with-resources: its close() waits for every subtask to finish before the parent continues
  • ShutdownOnFailure: cancels all remaining tasks when any task fails — use for all-or-nothing parallel fetches
  • ShutdownOnSuccess: cancels all remaining tasks when any task succeeds — use for hedged requests
  • After scope.join(), a subtask's state is SUCCESS, FAILED, or UNAVAILABLE (it was cancelled before completing); get() is valid only for SUCCESS
  • Nested scopes form a task hierarchy — parent cancellation propagates to all descendants
  • Scoped values are inherited automatically by forked tasks
  • Requires --enable-preview in Java 21; finalization is expected in a future release

Next: Scoped Values (JEP 446) — replace ThreadLocal with ScopedValue for safe, efficient context propagation across virtual threads.