typescriptintermediate

Pub/Sub Messaging Pattern

Implement publish/subscribe messaging with topics, filtered subscriptions, and message replay.

typescript
type Message<T = unknown> = {
  id: string;
  topic: string;
  data: T;
  timestamp: number;
  metadata?: Record<string, string>;
};

type Subscriber<T = unknown> = (message: Message<T>) => void | Promise<void>;
type Filter<T = unknown> = (message: Message<T>) => boolean;

class PubSub {
  private subscribers = new Map<string, { handler: Subscriber; filter?: Filter }[]>();
  private history = new Map<string, Message[]>();
  private maxHistory = 100;
  private counter = 0;

  subscribe<T>(topic: string, handler: Subscriber<T>, filter?: Filter<T>): () => void {
    if (!this.subscribers.has(topic)) this.subscribers.set(topic, []);
    const entry = { handler: handler as Subscriber, filter: filter as Filter | undefined };
    this.subscribers.get(topic)!.push(entry);

    // Return unsubscribe function
    return () => {
      const subs = this.subscribers.get(topic);
      if (subs) {
        const idx = subs.indexOf(entry);
        if (idx >= 0) subs.splice(idx, 1);
      }
    };
  }

  async publish<T>(topic: string, data: T, metadata?: Record<string, string>): Promise<void> {
    const message: Message<T> = {
      id: `msg-${++this.counter}`,
      topic,
      data,
      timestamp: Date.now(),
      metadata,
    };

    // Store in history
    if (!this.history.has(topic)) this.history.set(topic, []);
    const hist = this.history.get(topic)!;
    hist.push(message as Message);
    if (hist.length > this.maxHistory) hist.shift();

    // Notify subscribers
    const subs = this.subscribers.get(topic) ?? [];
    await Promise.all(
      subs
        .filter(s => !s.filter || s.filter(message as Message))
        .map(s => s.handler(message as Message)),
    );
  }

  // Replay messages from history
  async replay(topic: string, handler: Subscriber, since?: number) {
    const msgs = this.history.get(topic) ?? [];
    const filtered = since ? msgs.filter(m => m.timestamp >= since) : msgs;
    for (const msg of filtered) await handler(msg);
  }

  topics() { return [...this.subscribers.keys()]; }
}

// Usage
const bus = new PubSub();

// Subscribe to all orders
bus.subscribe('order', (msg) => {
  console.log(`[All Orders] ${msg.id}:`, msg.data);
});

// Subscribe only to high-value orders
bus.subscribe<{ amount: number; item: string }>(
  'order',
  (msg) => console.log(`[High Value] $${msg.data.amount}`),
  (msg) => (msg.data as { amount: number }).amount > 100,
);

// Publish
bus.publish('order', { amount: 50, item: 'Widget' });
bus.publish('order', { amount: 500, item: 'Server' });
bus.publish('order', { amount: 25, item: 'Cable' });

Use Cases

  • In-process event messaging
  • Module decoupling
  • Event replay and auditing

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.