CompletableFuture Combinators — allOf, anyOf, compose
Compose async operations with CompletableFuture: allOf, anyOf, thenCombine, handle, and timeout.
import java.util.concurrent.*;
import java.util.List;
import java.util.stream.Collectors;
public class FutureCombinators {
static final ExecutorService pool = Executors.newFixedThreadPool(4);
// Simulate async service call
static <T> CompletableFuture<T> asyncCall(String name, T result, long delayMs) {
return CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(delayMs); }
catch (InterruptedException e) { Thread.currentThread().interrupt(); }
System.out.printf("[%s] completed%n", name);
return result;
}, pool);
}
// allOf — wait for all
static CompletableFuture<List<String>> fetchAll() {
List<CompletableFuture<String>> futures = List.of(
asyncCall("user", "Alice", 200),
asyncCall("orders", "3 orders", 300),
asyncCall("prefs", "dark mode", 100)
);
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}
// anyOf — first to complete wins
static CompletableFuture<Object> raceServices() {
return CompletableFuture.anyOf(
asyncCall("cache", "cached-result", 50),
asyncCall("db", "db-result", 200),
asyncCall("remote", "remote-result", 500)
);
}
// thenCombine — merge two futures
static CompletableFuture<String> combineResults() {
CompletableFuture<String> name = asyncCall("name", "Alice", 100);
CompletableFuture<Integer> age = asyncCall("age", 30, 150);
return name.thenCombine(age, (n, a) -> n + " (age " + a + ")");
}
// Chain with error handling
static CompletableFuture<String> resilientCall() {
return asyncCall("primary", "data", 100)
.thenApply(String::toUpperCase)
.exceptionally(ex -> {
System.err.println("Primary failed: " + ex.getMessage());
return "FALLBACK";
})
.thenCompose(result ->
asyncCall("enrich", result + " + enriched", 50)
)
.orTimeout(2, TimeUnit.SECONDS)
.handle((result, ex) -> {
if (ex != null) return "TIMEOUT";
return result;
});
}
// Collect all results (even partial failures)
static <T> CompletableFuture<List<T>> allSettled(
List<CompletableFuture<T>> futures, T fallback) {
List<CompletableFuture<T>> safe = futures.stream()
.map(f -> f.exceptionally(ex -> fallback))
.toList();
return CompletableFuture.allOf(safe.toArray(new CompletableFuture[0]))
.thenApply(v -> safe.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}
public static void main(String[] args) throws Exception {
// All
List<String> all = fetchAll().get();
System.out.println("All: " + all);
// Race
Object winner = raceServices().get();
System.out.println("Winner: " + winner);
// Combine
String combined = combineResults().get();
System.out.println("Combined: " + combined);
// Resilient
String resilient = resilientCall().get();
System.out.println("Resilient: " + resilient);
pool.shutdown();
}
}Use Cases
- Aggregating results from multiple microservices
- Racing multiple data sources for fastest response
- Building resilient async pipelines with fallbacks
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
CompletableFuture — Async Programming
Chain async operations with CompletableFuture: thenApply, thenCompose, allOf, and exception handling.
Best for: Parallel API calls to multiple services
Virtual Threads — Lightweight Concurrency
Use Java 21 virtual threads for massive concurrency without thread pool tuning or reactive frameworks.
Best for: High-concurrency web servers handling thousands of requests
ExecutorService — Thread Pool Management
Create and manage thread pools with ExecutorService: fixed, cached, scheduled, and custom pools.
Best for: Managing concurrent task execution
Concurrent Collections — Thread-Safe Maps
Use ConcurrentHashMap, CopyOnWriteArrayList, and BlockingQueue for thread-safe data structures.
Best for: Thread-safe caching in multi-threaded applications