java · advanced

Structured Concurrency (Java 21+)

Use StructuredTaskScope to run parallel subtasks with automatic cancellation and error propagation. In Java 21 this is a preview API, so compile and run the example with --enable-preview.

java
import java.util.concurrent.*;
import java.time.Duration;
import java.util.List;

public class StructuredConcurrencyDemo {

    record User(String name, String email) {}
    record Order(String id, double total) {}
    record Dashboard(User user, List<Order> orders, String recommendation) {}

    // Simulate service calls
    static User fetchUser(String id) throws InterruptedException {
        Thread.sleep(200);
        return new User("Alice", "alice@example.com");
    }

    static List<Order> fetchOrders(String userId) throws InterruptedException {
        Thread.sleep(300);
        return List.of(new Order("ord-1", 99.99), new Order("ord-2", 49.50));
    }

    static String fetchRecommendation(String userId) throws InterruptedException {
        Thread.sleep(150);
        return "You might like: Widget Pro";
    }

    // StructuredTaskScope.ShutdownOnFailure — cancel all on first failure
    static Dashboard loadDashboard(String userId) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // Fork parallel subtasks
            Subtask<User> userTask = scope.fork(() -> fetchUser(userId));
            Subtask<List<Order>> ordersTask = scope.fork(() -> fetchOrders(userId));
            Subtask<String> recTask = scope.fork(() -> fetchRecommendation(userId));

            // Wait for all to complete (or first failure)
            scope.join()           // wait for all
                 .throwIfFailed(); // propagate exception

            // All succeeded — get results
            return new Dashboard(
                userTask.get(),
                ordersTask.get(),
                recTask.get()
            );
        }
    }

    // ShutdownOnSuccess — return first successful result
    static String fetchFromAnyMirror(String resource) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
            scope.fork(() -> fetchFromMirror("us-east", resource));
            scope.fork(() -> fetchFromMirror("eu-west", resource));
            scope.fork(() -> fetchFromMirror("ap-south", resource));

            scope.join();
            return scope.result(); // first to succeed
        }
    }

    static String fetchFromMirror(String region, String resource)
            throws InterruptedException {
        Thread.sleep((long) (Math.random() * 500));
        return "[" + region + "] " + resource;
    }

    public static void main(String[] args) throws Exception {
        // Parallel dashboard load
        long start = System.currentTimeMillis();
        Dashboard dashboard = loadDashboard("user-123");
        long elapsed = System.currentTimeMillis() - start;

        System.out.printf("Dashboard loaded in %dms%n", elapsed); // ~300ms (not 650ms)
        System.out.println("User: " + dashboard.user().name());
        System.out.println("Orders: " + dashboard.orders().size());
        System.out.println("Rec: " + dashboard.recommendation());

        // Race to fastest mirror
        String data = fetchFromAnyMirror("config.json");
        System.out.println("Got: " + data);
    }
}

Use Cases

  • Parallel API aggregation with auto-cancellation
  • Racing multiple data sources for fastest response
  • Reliable concurrent task management: subtasks cannot outlive the scope that forked them

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.