typescript · intermediate
Pub/Sub Messaging Pattern
Implement publish/subscribe messaging with topics, filtered subscriptions, and message replay.
typescript — Press ⌘/Ctrl + Shift + C to copy
/**
 * Envelope wrapped around every published payload.
 *
 * @typeParam T - Type of the payload carried in `data`.
 */
type Message<T = unknown> = {
// Identifier assigned at publish time (PubSub generates `msg-<n>` from a per-instance counter).
id: string;
// Topic the message was published to.
topic: string;
// The published payload.
data: T;
// Publish time as returned by `Date.now()` (milliseconds since the Unix epoch).
timestamp: number;
// Optional free-form string key/value pairs supplied by the publisher.
metadata?: Record<string, string>;
};
/**
 * Callback invoked with each delivered message. It may be synchronous or
 * return a promise; PubSub awaits the result when publishing and replaying.
 */
type Subscriber<T = unknown> = (message: Message<T>) => void | Promise<void>;
/**
 * Predicate attached to a subscription: the handler receives the message only
 * when the predicate returns `true`.
 */
type Filter<T = unknown> = (message: Message<T>) => boolean;
/**
 * In-process publish/subscribe bus with per-topic filtered subscriptions and a
 * bounded per-topic message history that can be replayed.
 */
class PubSub {
  /** Active subscriptions per topic. A topic key exists only while it has subscribers. */
  private readonly subscribers = new Map<string, { handler: Subscriber; filter?: Filter }[]>();
  /** Most recent messages per topic, oldest first, capped at `maxHistory`. */
  private readonly history = new Map<string, Message[]>();
  private readonly maxHistory: number;
  /** Source of the monotonically increasing `msg-<n>` ids. */
  private counter = 0;

  /**
   * @param maxHistory - Maximum number of messages retained per topic for
   *   replay. `0` disables history. Defaults to 100.
   * @throws RangeError if `maxHistory` is not a non-negative integer.
   */
  constructor(maxHistory = 100) {
    if (!Number.isInteger(maxHistory) || maxHistory < 0) {
      throw new RangeError(`maxHistory must be a non-negative integer, got ${maxHistory}`);
    }
    this.maxHistory = maxHistory;
  }

  /**
   * Registers `handler` for `topic`.
   *
   * @param topic - Topic name to listen on.
   * @param handler - Called with every message published to `topic` that passes `filter`.
   * @param filter - Optional predicate; when given, only messages for which it
   *   returns `true` are delivered.
   * @returns A function that removes this subscription. Calling it more than once is a no-op.
   */
  subscribe<T>(topic: string, handler: Subscriber<T>, filter?: Filter<T>): () => void {
    const entry = { handler: handler as Subscriber, filter: filter as Filter | undefined };

    let subs = this.subscribers.get(topic);
    if (!subs) {
      subs = [];
      this.subscribers.set(topic, subs);
    }
    subs.push(entry);

    return () => {
      const current = this.subscribers.get(topic);
      if (!current) return;
      const idx = current.indexOf(entry);
      if (idx < 0) return;
      current.splice(idx, 1);
      // Drop the topic once its last subscriber is gone so dynamically named
      // topics do not accumulate empty lists and topics() stays accurate.
      if (current.length === 0) this.subscribers.delete(topic);
    };
  }

  /**
   * Publishes `data` on `topic`: records the message in history, then delivers
   * it to every matching subscriber.
   *
   * Every subscriber is attempted even if another one throws, and the returned
   * promise settles only after all handlers have settled.
   *
   * @param topic - Topic to publish to.
   * @param data - Payload to deliver.
   * @param metadata - Optional string key/value pairs attached to the message.
   * @throws The error from the first failing subscriber (in subscription
   *   order), rethrown after all subscribers have been attempted.
   */
  async publish<T>(topic: string, data: T, metadata?: Record<string, string>): Promise<void> {
    const message: Message<T> = {
      id: `msg-${++this.counter}`,
      topic,
      data,
      timestamp: Date.now(),
      metadata,
    };

    // Store in history, evicting the oldest entry once the cap is exceeded.
    if (this.maxHistory > 0) {
      let hist = this.history.get(topic);
      if (!hist) {
        hist = [];
        this.history.set(topic, hist);
      }
      hist.push(message as Message);
      if (hist.length > this.maxHistory) hist.shift();
    }

    // Snapshot so that (un)subscribing from inside a handler cannot disturb
    // the delivery currently in progress.
    const targets = [...(this.subscribers.get(topic) ?? [])];

    // Running filter + handler inside an async wrapper turns a synchronous
    // throw into a rejection of that subscriber's own promise, so it cannot
    // prevent the remaining subscribers from being called.
    const results = await Promise.allSettled(
      targets.map(async ({ handler, filter }) => {
        if (filter && !filter(message as Message)) return;
        await handler(message as Message);
      }),
    );

    const failure = results.find(
      (r): r is PromiseRejectedResult => r.status === 'rejected',
    );
    if (failure) throw failure.reason;
  }

  /**
   * Replays stored messages for `topic` to `handler`, oldest first, awaiting
   * each call before making the next.
   *
   * @param topic - Topic whose history is replayed.
   * @param handler - Receives each replayed message.
   * @param since - When given, only messages with `timestamp >= since` are replayed.
   */
  async replay(topic: string, handler: Subscriber, since?: number): Promise<void> {
    const stored = this.history.get(topic) ?? [];
    // Always iterate a copy: the handler may publish to this topic while we
    // await it, which would otherwise mutate the array mid-iteration.
    const selected =
      since === undefined ? [...stored] : stored.filter(m => m.timestamp >= since);
    for (const msg of selected) await handler(msg);
  }

  /** Returns the names of topics that currently have at least one subscriber. */
  topics(): string[] {
    return [...this.subscribers.keys()];
  }
}
// Usage
const bus = new PubSub();

// Subscribe to all orders
bus.subscribe('order', (msg) => {
  console.log(`[All Orders] ${msg.id}:`, msg.data);
});

// Subscribe only to high-value orders
bus.subscribe<{ amount: number; item: string }>(
  'order',
  (msg) => console.log(`[High Value] $${msg.data.amount}`),
  // msg.data is already typed through the explicit type argument, so no cast is needed.
  (msg) => msg.data.amount > 100,
);

// Publish. publish() returns a promise that rejects when a handler fails, so
// attach a catch instead of leaving the promise floating; `void` marks the
// fire-and-forget as intentional.
const reportPublishError = (err: unknown): void => {
  console.error('[order] publish failed:', err);
};
void bus.publish('order', { amount: 50, item: 'Widget' }).catch(reportPublishError);
void bus.publish('order', { amount: 500, item: 'Server' }).catch(reportPublishError);
void bus.publish('order', { amount: 25, item: 'Cable' }).catch(reportPublishError);
Use Cases
- In-process event messaging
- Module decoupling
- Event replay and auditing
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
typescript · intermediate
TypeScript Typed Event Emitter
Create type-safe event emitters in Node.js with full TypeScript support and autocomplete.
Best for: Type-safe pub/sub communication between modules
#nodejs #events
typescript · intermediate
Event-Driven Architecture Pattern
Build loosely coupled systems with typed event bus, async event handlers, and domain event patterns.
Best for: Microservice communication
#nodejs #events
typescript · advanced
JavaScript Proxy Handler Pattern
Use Proxy and Reflect to create observable objects, validation layers, and dynamic API clients.
Best for: Observable state management
#nodejs #proxy
typescript · intermediate
Middleware Chain Pattern
Implement the middleware/pipeline pattern for composable request processing without Express.
Best for: Custom HTTP framework
#nodejs #middleware