typescriptadvanced

Node.js In-Memory Task Queue

A simple in-memory task queue with concurrency control, retries, and priority support.

typescript
interface Task<T = unknown> {
  id: string;
  fn: () => Promise<T>;
  priority: number;
  retries: number;
  maxRetries: number;
}

class TaskQueue {
  private queue: Task[] = [];
  private running = 0;
  private concurrency: number;
  private results = new Map<string, { status: string; result?: unknown; error?: string }>();

  constructor(concurrency = 3) {
    this.concurrency = concurrency;
  }

  add(id: string, fn: () => Promise<unknown>, priority = 0, maxRetries = 2) {
    this.queue.push({ id, fn, priority, retries: 0, maxRetries });
    this.queue.sort((a, b) => b.priority - a.priority);
    this.process();
  }

  private async process() {
    while (this.running < this.concurrency && this.queue.length > 0) {
      const task = this.queue.shift()!;
      this.running++;

      try {
        const result = await task.fn();
        this.results.set(task.id, { status: 'done', result });
        console.log(`Task ${task.id} completed`);
      } catch (err) {
        task.retries++;
        if (task.retries <= task.maxRetries) {
          console.log(`Task ${task.id} retry ${task.retries}/${task.maxRetries}`);
          this.queue.push(task);
        } else {
          this.results.set(task.id, { status: 'failed', error: (err as Error).message });
          console.error(`Task ${task.id} failed permanently`);
        }
      } finally {
        this.running--;
        this.process();
      }
    }
  }

  getStatus(id: string) {
    if (this.results.has(id)) return this.results.get(id);
    if (this.queue.some((t) => t.id === id)) return { status: 'queued' };
    return { status: 'unknown' };
  }

  get pending() { return this.queue.length; }
  get active() { return this.running; }
}

// Usage
const queue = new TaskQueue(2);

for (let i = 0; i < 10; i++) {
  queue.add(`task-${i}`, async () => {
    await new Promise((r) => setTimeout(r, 1000));
    if (Math.random() < 0.3) throw new Error('Random failure');
    return `Result ${i}`;
  }, i % 3);
}

Use Cases

  • Rate-limited API call processing
  • Background job processing without Redis
  • Batch operations with concurrency control

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.