In-Memory Event Bus — Pub/Sub Pattern
Build a type-safe in-memory event bus with subscribe, publish, and async event delivery.
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
/**
 * A minimal in-process publish/subscribe event bus keyed by the event's runtime class.
 *
 * <p>Handlers are looked up by {@code event.getClass()} only, so a handler registered for a
 * supertype or interface is <em>not</em> invoked for events of a subtype.
 *
 * <p>The registry is a {@link ConcurrentHashMap} of {@link CopyOnWriteArrayList}s, so handlers
 * may be added or removed while a publish is iterating; a publish in progress works on the
 * snapshot of handlers it started with.
 */
public class EventBus {

    /** Registered handlers, keyed by the exact event class they were subscribed for. */
    private final Map<Class<?>, List<Consumer<?>>> subscribers = new ConcurrentHashMap<>();

    /** Runs handlers for {@link #publishAsync(Object)}; released by {@link #shutdown()}. */
    private final ExecutorService executor = Executors.newCachedThreadPool();

    /**
     * Registers {@code handler} to receive events whose runtime class is exactly
     * {@code eventType}. The same handler may be registered more than once, in which case it
     * is invoked once per registration.
     *
     * @param eventType the event class to listen for
     * @param handler   the callback; may accept {@code T} or any supertype of it
     * @param <T>       the event type
     * @throws NullPointerException if either argument is {@code null}
     */
    public <T> void subscribe(Class<T> eventType, Consumer<? super T> handler) {
        Objects.requireNonNull(eventType, "eventType");
        // Fail here rather than with an NPE in the middle of some later publish.
        Objects.requireNonNull(handler, "handler");
        subscribers.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>())
                .add(handler);
    }

    /**
     * Delivers {@code event} to every handler registered for its runtime class, on the
     * calling thread and in registration order.
     *
     * <p>Every handler is invoked even if an earlier one throws, matching
     * {@link #publishAsync(Object)} where handlers also run independently. After all
     * handlers have run, the first {@link RuntimeException} is rethrown with any later
     * ones attached as suppressed exceptions.
     *
     * @param event the event to deliver
     * @param <T>   the event type
     * @throws NullPointerException if {@code event} is {@code null}
     * @throws RuntimeException     the first exception thrown by a handler, if any
     */
    public <T> void publish(T event) {
        Objects.requireNonNull(event, "event");
        List<Consumer<?>> handlers = subscribers.get(event.getClass());
        if (handlers == null) {
            return;
        }
        RuntimeException firstFailure = null;
        for (Consumer<?> handler : handlers) {
            try {
                deliver(handler, event);
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else if (e != firstFailure) {
                    // addSuppressed rejects self-suppression, hence the identity check.
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Submits one task per registered handler to the internal executor and returns a future
     * that completes when all of them have finished.
     *
     * <p>If any handler throws, the returned future completes exceptionally. When no handler
     * is registered for the event's class, an already-completed future is returned.
     * Submitting after {@link #shutdown()} is rejected by the executor, which surfaces as a
     * {@link RejectedExecutionException} thrown from this method.
     *
     * @param event the event to deliver
     * @param <T>   the event type
     * @return a future tracking completion of all handler invocations
     * @throws NullPointerException if {@code event} is {@code null}
     */
    public <T> CompletableFuture<Void> publishAsync(T event) {
        Objects.requireNonNull(event, "event");
        List<Consumer<?>> handlers = subscribers.get(event.getClass());
        if (handlers == null) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<?>[] futures = handlers.stream()
                .map(handler -> CompletableFuture.runAsync(() -> deliver(handler, event), executor))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures);
    }

    /**
     * Removes one registration of {@code handler} for {@code eventType}, if present.
     * Matching uses {@code equals}, so pass the same handler object that was given to
     * {@link #subscribe(Class, Consumer)}. Does nothing when no such registration exists.
     *
     * @param eventType the event class the handler was registered for
     * @param handler   the handler to remove
     * @param <T>       the event type
     */
    public <T> void unsubscribe(Class<T> eventType, Consumer<? super T> handler) {
        List<Consumer<?>> handlers = subscribers.get(eventType);
        if (handlers != null) {
            handlers.remove(handler);
        }
    }

    /**
     * Initiates an orderly shutdown of the executor used by {@link #publishAsync(Object)}.
     * Already-submitted handler tasks still run; this method does not wait for them.
     */
    public void shutdown() {
        executor.shutdown();
    }

    /**
     * Invokes {@code handler} with {@code event}.
     *
     * <p>The unchecked cast is safe because handlers are stored under the class they were
     * subscribed for and are only looked up by an event's exact runtime class.
     */
    @SuppressWarnings("unchecked")
    private static <T> void deliver(Consumer<?> handler, T event) {
        ((Consumer<T>) handler).accept(event);
    }

    // Event records

    /** Sample event: a user account was created. */
    record UserCreated(String userId, String email) {}

    /** Sample event: an order was placed. */
    record OrderPlaced(String orderId, double total) {}

    /** Sample event: a payment attempt for an order finished. */
    record PaymentProcessed(String orderId, boolean success) {}

    /**
     * Demo: registers a few handlers, publishes one event synchronously and one
     * asynchronously, then shuts the bus down.
     */
    public static void main(String[] args) throws Exception {
        EventBus bus = new EventBus();
        try {
            // Subscribe
            bus.subscribe(UserCreated.class, e ->
                    System.out.println("Send welcome email to: " + e.email()));
            bus.subscribe(UserCreated.class, e ->
                    System.out.println("Create user profile for: " + e.userId()));
            bus.subscribe(OrderPlaced.class, e ->
                    System.out.printf("Process order %s ($%.2f)%n", e.orderId(), e.total()));

            // Publish
            bus.publish(new UserCreated("u-123", "alice@test.com"));
            bus.publishAsync(new OrderPlaced("ord-456", 99.99)).get();
        } finally {
            // Always release the pool, even if a handler failed, so its worker threads
            // do not keep the JVM alive.
            bus.shutdown();
        }
    }
}
// Use Cases
- Decoupled event-driven architectures
- Domain event handling in DDD
- In-process message passing
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
Observer Pattern — Event System
Build a type-safe event system using the Observer pattern with generics and functional interfaces.
Best for: Decoupled event-driven architectures
Builder Pattern — Fluent Object Construction
Implement the Builder pattern for complex objects with validation, immutability, and method chaining.
Best for: Constructing complex objects with many optional parameters
Singleton Pattern — Thread-Safe Approaches
Implement thread-safe singletons in Java: enum, holder class, double-checked locking, and eager init.
Best for: Application-wide configuration managers
Strategy Pattern with Lambdas
Implement the Strategy pattern using interfaces and Java lambdas for flexible algorithm selection.
Best for: Swappable pricing or discount algorithms