java · advanced

CompletableFuture Combinators — allOf, anyOf, compose

Compose async operations with CompletableFuture: allOf, anyOf, thenCombine, thenCompose, exceptionally, handle, and orTimeout.

java
import java.util.concurrent.*;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Demonstrates common {@link CompletableFuture} combinators: waiting for all
 * futures, racing several futures, merging two results, and building a
 * pipeline with a fallback and a timeout.
 *
 * <p>Every simulated call runs on the shared {@link #pool}. That pool uses
 * non-daemon threads, so it must be shut down for the JVM to exit.
 */
public class FutureCombinators {

    /** Executor shared by every simulated service call. */
    static final ExecutorService pool = Executors.newFixedThreadPool(4);

    /** Maximum time, in seconds, that {@link #resilientCall()} waits for its pipeline. */
    private static final long RESILIENT_TIMEOUT_SECONDS = 2;

    /**
     * Simulates an asynchronous service call that sleeps and then yields a value.
     *
     * @param name    label printed when the call completes
     * @param result  value the returned future completes with
     * @param delayMs simulated latency in milliseconds
     * @param <T>     type of the result
     * @return a future that completes with {@code result} after the delay, or
     *         completes exceptionally if the worker thread is interrupted
     */
    static <T> CompletableFuture<T> asyncCall(String name, T result, long delayMs) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                // Restore the flag and fail the future: an interrupted call did not
                // really complete, so it must not report success.
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            }
            System.out.printf("[%s] completed%n", name);
            return result;
        }, pool);
    }

    /**
     * allOf — starts three simulated calls and waits for all of them.
     *
     * @return a future holding the three results in the order the calls were
     *         listed; it completes exceptionally if any call fails
     */
    static CompletableFuture<List<String>> fetchAll() {
        List<CompletableFuture<String>> futures = List.of(
            asyncCall("user", "Alice", 200),
            asyncCall("orders", "3 orders", 300),
            asyncCall("prefs", "dark mode", 100)
        );
        return joinAll(futures);
    }

    /**
     * anyOf — starts three simulated calls and takes whichever completes first.
     *
     * <p>The slower calls are not cancelled; they keep running on the pool.
     *
     * @return a future completed with the outcome of the first call to finish
     */
    static CompletableFuture<Object> raceServices() {
        return CompletableFuture.anyOf(
            asyncCall("cache", "cached-result", 50),
            asyncCall("db", "db-result", 200),
            asyncCall("remote", "remote-result", 500)
        );
    }

    /**
     * thenCombine — runs two independent calls and merges their results.
     *
     * @return a future holding the name and age formatted as {@code "name (age N)"}
     */
    static CompletableFuture<String> combineResults() {
        CompletableFuture<String> name = asyncCall("name", "Alice", 100);
        CompletableFuture<Integer> age = asyncCall("age", 30, 150);

        return name.thenCombine(age, (n, a) -> n + " (age " + a + ")");
    }

    /**
     * Chains a primary call, a fallback, an enrichment call and a timeout.
     *
     * <p>The returned future never completes exceptionally. A failure of the
     * primary stage is replaced with {@code "FALLBACK"} before enrichment. If the
     * pipeline exceeds the timeout the result is {@code "TIMEOUT"}; any other
     * failure after the fallback stage yields {@code "FAILED"}.
     *
     * @return a future holding the enriched value or one of the sentinel strings
     */
    static CompletableFuture<String> resilientCall() {
        return asyncCall("primary", "data", 100)
            .thenApply(String::toUpperCase)
            .exceptionally(ex -> {
                System.err.println("Primary failed: " + ex.getMessage());
                return "FALLBACK";
            })
            .thenCompose(result ->
                asyncCall("enrich", result + " + enriched", 50)
            )
            .orTimeout(RESILIENT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
            .handle((result, ex) -> {
                if (ex == null) {
                    return result;
                }
                Throwable cause = unwrap(ex);
                if (cause instanceof TimeoutException) {
                    return "TIMEOUT";
                }
                // Not every failure is a timeout; report other errors as such.
                System.err.println("Pipeline failed: " + cause);
                return "FAILED";
            });
    }

    /**
     * Collects every result, substituting {@code fallback} for each future that failed.
     *
     * @param futures  futures to wait for
     * @param fallback value used in place of any future that completed exceptionally
     * @param <T>      element type
     * @return a future holding one entry per input future, in input order
     */
    static <T> CompletableFuture<List<T>> allSettled(
            List<CompletableFuture<T>> futures, T fallback) {
        List<CompletableFuture<T>> safe = futures.stream()
            .map(f -> f.exceptionally(ex -> fallback))
            .toList();
        return joinAll(safe);
    }

    /** Waits for every future, then gathers their values in list order. */
    private static <T> CompletableFuture<List<T>> joinAll(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
            // Every future is already done here, so join() does not block.
            .thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    /** Strips {@link CompletionException} wrappers to reach the underlying failure. */
    private static Throwable unwrap(Throwable ex) {
        Throwable current = ex;
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /**
     * Runs each demo in turn and prints its result.
     *
     * @param args unused
     * @throws Exception if waiting on any demo future fails
     */
    public static void main(String[] args) throws Exception {
        try {
            // All
            List<String> all = fetchAll().get();
            System.out.println("All: " + all);

            // Race
            Object winner = raceServices().get();
            System.out.println("Winner: " + winner);

            // Combine
            String combined = combineResults().get();
            System.out.println("Combined: " + combined);

            // Resilient
            String resilient = resilientCall().get();
            System.out.println("Resilient: " + resilient);
        } finally {
            // The pool threads are non-daemon: without this the JVM would hang
            // whenever one of the calls above throws.
            pool.shutdown();
        }
    }
}

Use Cases

  • Aggregating results from multiple microservices
  • Racing multiple data sources for fastest response
  • Building resilient async pipelines with fallbacks

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.