TypeScript · Intermediate

Semaphore for Concurrency Limiting

Limit how many async operations run at the same time using a semaphore. Useful for rate limiting, connection pooling, and batch processing.

typescript
/**
 * Counting semaphore for limiting how many async operations hold a slot at
 * the same time.
 *
 * Callers either pair `acquire()` with `release()` themselves, or use `run()`
 * which does the pairing and releases the slot even when the task throws.
 * Waiters are served in the order they called `acquire()`.
 */
class Semaphore {
  /** Resolve callbacks of callers waiting for a slot, oldest first. */
  private readonly queue: (() => void)[] = [];
  /** Number of slots currently held. */
  private running = 0;

  /**
   * @param maxConcurrency Maximum number of slots that may be held at once.
   * @throws RangeError If `maxConcurrency` is zero, negative, or NaN. With
   *   such a limit no slot could ever be granted and every `acquire()` would
   *   wait forever.
   */
  constructor(private readonly maxConcurrency: number) {
    // Written as a negated `>` so that NaN is rejected as well.
    if (!(maxConcurrency > 0)) {
      throw new RangeError(
        `maxConcurrency must be a positive number, got ${maxConcurrency}`,
      );
    }
  }

  /**
   * Takes a slot, waiting until one is free if the limit has been reached.
   *
   * @returns A promise that resolves once the caller holds a slot.
   */
  async acquire(): Promise<void> {
    if (this.running < this.maxConcurrency) {
      this.running++;
      return;
    }
    // At capacity: park the caller. `release()` transfers its slot directly
    // to the parked caller, so `running` is not touched on this path.
    return new Promise<void>((resolve) => {
      this.queue.push(resolve);
    });
  }

  /**
   * Gives back a slot previously obtained with `acquire()`. If callers are
   * waiting, the slot is handed to the oldest one.
   *
   * @throws Error If no slot is currently held. Allowing this would push the
   *   counter below zero and let more than `maxConcurrency` tasks run.
   */
  release(): void {
    if (this.running <= 0) {
      throw new Error('Semaphore.release() called without a matching acquire()');
    }
    const next = this.queue.shift();
    if (next) {
      // Hand the slot over; the number of held slots stays the same.
      next();
      return;
    }
    this.running--;
  }

  /**
   * Runs `fn` while holding a slot and releases the slot when `fn` settles,
   * whether it resolves or rejects.
   *
   * @param fn Async task to execute under the concurrency limit.
   * @returns The value `fn` resolves with; rejections from `fn` propagate.
   */
  async run<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquire();
    try {
      return await fn();
    } finally {
      this.release();
    }
  }

  /** Number of slots currently held. */
  get active(): number { return this.running; }
  /** Number of callers waiting for a slot. */
  get pending(): number { return this.queue.length; }
}

/**
 * Maps `items` through the async function `fn`, with the number of
 * simultaneously running calls capped by a `Semaphore` built from
 * `concurrency`.
 *
 * @param items Values to process.
 * @param concurrency Limit passed to the `Semaphore` constructor.
 * @param fn Async mapper; receives each item and its index.
 * @returns The mapped results in the same order as `items`. Rejects as soon
 *   as any call to `fn` rejects.
 */
async function mapConcurrent<T, R>(
  items: T[],
  concurrency: number,
  fn: (item: T, index: number) => Promise<R>,
): Promise<R[]> {
  const limiter = new Semaphore(concurrency);
  // Every item is submitted immediately; the limiter decides when each one
  // actually starts.
  const launch = (item: T, index: number): Promise<R> =>
    limiter.run(() => fn(item, index));
  const inFlight = items.map(launch);
  return Promise.all(inFlight);
}

// Usage: process 20 simulated requests, at most 5 at a time.
async function main() {
  // Timer-based pause used to stand in for network latency.
  const sleep = (ms: number) => new Promise(done => setTimeout(done, ms));

  const urls: string[] = [];
  for (let n = 0; n < 20; n++) {
    urls.push(`https://api.example.com/item/${n}`);
  }

  // Fake fetch: waits a random 50-250 ms, logs, and reports success.
  const simulateFetch = async (url: string, i: number) => {
    const delay = Math.random() * 200 + 50;
    await sleep(delay);
    console.log(`  [${i}] Fetched ${url} (${delay.toFixed(0)}ms)`);
    return { url, status: 200 };
  };

  console.time('concurrent');
  const results = await mapConcurrent(urls, 5, simulateFetch);
  console.timeEnd('concurrent');
  console.log(`Fetched ${results.length} items`);
}

main();

Use Cases

  • API rate limiting
  • Database connection pooling
  • Batch file processing

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.