java · intermediate

In-Memory Event Bus — Pub/Sub Pattern

Build a type-safe in-memory event bus with subscribe, publish, and async event delivery.

java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;

/**
 * Minimal in-process publish/subscribe event bus.
 *
 * <p>Handlers are registered per event class and looked up by the published event's exact
 * runtime class ({@code event.getClass()}); handlers registered for a supertype or interface
 * of the event are not notified.
 *
 * <p>Registrations live in a {@link ConcurrentHashMap} of {@link CopyOnWriteArrayList}s, so
 * subscribing, unsubscribing and publishing may happen from different threads; a publish
 * iterates over the handler list as it was when iteration started.
 *
 * <p>Asynchronous delivery runs on an internal cached thread pool. Call {@link #shutdown()}
 * when the bus is no longer needed so its worker threads can terminate.
 */
public class EventBus {
    private final Map<Class<?>, List<Consumer<?>>> subscribers = new ConcurrentHashMap<>();
    private final ExecutorService executor = Executors.newCachedThreadPool();

    /**
     * Registers {@code handler} for events whose runtime class is exactly {@code eventType}.
     * Registering the same handler twice causes it to be invoked twice per event.
     *
     * @param eventType the event class to listen for
     * @param handler   callback invoked with each matching event; may accept a supertype of
     *                  {@code T}
     * @throws NullPointerException if either argument is {@code null}
     */
    public <T> void subscribe(Class<T> eventType, Consumer<? super T> handler) {
        Objects.requireNonNull(eventType, "eventType");
        // Reject null here: the backing list would happily store it and the failure would
        // only surface later, inside publish(), far away from the faulty registration.
        Objects.requireNonNull(handler, "handler");
        subscribers.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>())
            .add(handler);
    }

    /**
     * Delivers {@code event} to every registered handler on the calling thread, in
     * registration order.
     *
     * <p>A handler that throws a {@link RuntimeException} does not prevent the remaining
     * handlers from running. After all handlers have been invoked, the first such exception
     * is rethrown with any later ones attached via {@link Throwable#addSuppressed}.
     *
     * @param event the event to deliver
     * @throws NullPointerException if {@code event} is {@code null}
     * @throws RuntimeException     the first exception thrown by a handler, if any
     */
    public <T> void publish(T event) {
        RuntimeException firstFailure = null;
        for (Consumer<? super T> handler : handlersFor(event)) {
            try {
                handler.accept(event);
            } catch (RuntimeException ex) {
                if (firstFailure == null) {
                    firstFailure = ex;
                } else if (ex != firstFailure) {
                    // Guard against self-suppression when two handlers throw the same instance.
                    firstFailure.addSuppressed(ex);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Submits one task per registered handler to the internal executor and returns a future
     * tracking all of them.
     *
     * @param event the event to deliver
     * @return a future that completes when every handler has finished; it completes
     *         exceptionally if any handler threw, and is already complete when there are no
     *         handlers for the event's class
     * @throws NullPointerException if {@code event} is {@code null}
     */
    public <T> CompletableFuture<Void> publishAsync(T event) {
        CompletableFuture<?>[] deliveries = handlersFor(event).stream()
            .map(handler -> CompletableFuture.runAsync(() -> handler.accept(event), executor))
            .toArray(CompletableFuture[]::new);

        return CompletableFuture.allOf(deliveries);
    }

    /**
     * Removes one registration of {@code handler} for {@code eventType}. Does nothing when
     * the handler (or the event type) is not registered.
     *
     * @param eventType the event class the handler was registered for
     * @param handler   the handler instance previously passed to {@link #subscribe}
     * @throws NullPointerException if {@code eventType} is {@code null}
     */
    public <T> void unsubscribe(Class<T> eventType, Consumer<? super T> handler) {
        Objects.requireNonNull(eventType, "eventType");
        List<Consumer<?>> handlers = subscribers.get(eventType);
        if (handlers != null) {
            handlers.remove(handler);
        }
    }

    /**
     * Initiates an orderly shutdown of the executor used by {@link #publishAsync}: tasks
     * already submitted still run, new submissions are rejected.
     */
    public void shutdown() {
        executor.shutdown();
    }

    /** Returns the handlers registered for the exact runtime class of {@code event}. */
    @SuppressWarnings("unchecked")
    private <T> List<Consumer<? super T>> handlersFor(T event) {
        Objects.requireNonNull(event, "event");
        List<Consumer<?>> registered = subscribers.get(event.getClass());
        if (registered == null) {
            return List.of();
        }
        // Safe for the way the result is used: subscribe() only stores consumers that accept
        // X under the key X.class, the lookup key is this event's own class, and callers pass
        // nothing but this event to the returned handlers.
        return (List<Consumer<? super T>>) (List<?>) registered;
    }

    // Event records
    record UserCreated(String userId, String email) {}
    record OrderPlaced(String orderId, double total) {}
    record PaymentProcessed(String orderId, boolean success) {}

    /** Demo: registers a few handlers, then publishes one event synchronously and one asynchronously. */
    public static void main(String[] args) throws Exception {
        EventBus bus = new EventBus();
        try {
            // Subscribe
            bus.subscribe(UserCreated.class, e ->
                System.out.println("Send welcome email to: " + e.email()));
            bus.subscribe(UserCreated.class, e ->
                System.out.println("Create user profile for: " + e.userId()));
            bus.subscribe(OrderPlaced.class, e ->
                System.out.printf("Process order %s ($%.2f)%n", e.orderId(), e.total()));

            // Publish
            bus.publish(new UserCreated("u-123", "alice@test.com"));
            bus.publishAsync(new OrderPlaced("ord-456", 99.99)).get();
        } finally {
            // Always release the pool; its non-daemon threads would otherwise delay JVM exit
            // when a handler failure propagates out of the block above.
            bus.shutdown();
        }
    }
}

Use Cases

  • Decoupled event-driven architectures
  • Domain event handling in DDD
  • In-process message passing

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.