TypeScript · Advanced

Async Iterators for Stream Processing

Process streams and async data sources with for-await-of loops and custom async iterators in Node.js.

typescript
import { createReadStream } from 'fs';
import { createInterface } from 'readline';

// Read a file line by line with an async iterator.
//
// Yields each line of `filePath` without its line terminator; with
// `crlfDelay: Infinity`, "\r\n" is always treated as a single line break.
//
// The file stream is released whenever iteration stops: after the last line,
// when the consumer exits early (`break` / `return`), or when an error is
// thrown. Closing the readline interface on its own only relinquishes the
// input stream, it does not destroy it.
//
// Errors emitted by the file stream (for example ENOENT) are rethrown to the
// consumer of the generator.
async function* readLines(filePath: string): AsyncGenerator<string> {
  const input = createReadStream(filePath);
  const rl = createInterface({ input, crlfDelay: Infinity });

  // Without a listener, an 'error' event on the stream is an uncaught
  // exception. Record it and close the interface so the loop below ends and
  // the error can be rethrown from this generator.
  let streamError: Error | undefined;
  input.once('error', (err) => {
    streamError = err;
    rl.close();
  });

  try {
    for await (const line of rl) {
      yield line;
    }
    if (streamError !== undefined) throw streamError;
  } finally {
    // Runs on normal completion, early exit by the consumer, and on errors.
    rl.close();
    input.destroy();
  }
}

// Pull-based pagination as an async iterator.
//
// Flattens a cursor-paginated source into a single stream of items. The
// cursor starts at 0 and advances by the number of items received. The next
// page is requested only after the consumer has pulled every item of the
// current page, so a slow consumer naturally throttles fetching.
//
// Iteration ends when a page reports `hasMore: false`, or when a page is
// empty: an empty page cannot advance the cursor, so continuing would request
// the same cursor forever.
async function* paginate<T>(
  fetchPage: (cursor: number) => Promise<{ items: T[]; hasMore: boolean }>,
): AsyncGenerator<T> {
  let cursor = 0;
  while (true) {
    const { items, hasMore } = await fetchPage(cursor);
    for (const item of items) yield item;
    if (!hasMore || items.length === 0) return;
    cursor += items.length;
  }
}

// Lazy map over an async iterable.
//
// Pulls one element at a time from `source`, applies `fn` (which may be
// synchronous or return a promise), and yields the resolved result in the
// same order as the input.
async function* asyncMap<T, U>(
  source: AsyncIterable<T>,
  fn: (item: T) => U | Promise<U>,
): AsyncGenerator<U> {
  for await (const value of source) {
    const mapped = await fn(value);
    yield mapped;
  }
}

// Lazy filter over an async iterable.
//
// Yields only the elements of `source` for which `predicate` resolves to a
// truthy value. The predicate may be synchronous or return a promise; input
// order is preserved.
async function* asyncFilter<T>(
  source: AsyncIterable<T>,
  predicate: (item: T) => boolean | Promise<boolean>,
): AsyncGenerator<T> {
  for await (const candidate of source) {
    const keep = await predicate(candidate);
    if (!keep) continue;
    yield candidate;
  }
}

// Batch an async iterable into arrays.
//
// Collects elements of `source` into arrays and yields each array once it
// holds at least `size` elements; a final, shorter array is yielded if
// elements remain when the source ends. An empty source yields nothing.
//
// `size` must be >= 1. `NaN`, zero and negative sizes are rejected with a
// RangeError on the first pull: `NaN` would otherwise buffer the entire
// source into a single batch, and sizes below 1 would silently degrade to
// one-element batches.
async function* asyncBatch<T>(
  source: AsyncIterable<T>,
  size: number,
): AsyncGenerator<T[]> {
  // Written as a negated comparison so that NaN is rejected as well.
  if (!(size >= 1)) {
    throw new RangeError(`asyncBatch: size must be >= 1, received ${size}`);
  }

  let batch: T[] = [];
  for await (const item of source) {
    batch.push(item);
    if (batch.length >= size) {
      yield batch;
      // Start a fresh array so the batch handed to the consumer is never mutated.
      batch = [];
    }
  }
  if (batch.length > 0) yield batch;
}

// Usage
//
// Demo pipeline: a simulated paginated API of 55 users served 10 per page is
// filtered to users whose id is a multiple of 3, grouped into batches of 5,
// and each batch is printed as a comma-separated list of names.
async function main() {
  // In-memory stand-in for the remote data set.
  const allUsers: { id: number; name: string }[] = [];
  for (let id = 0; id < 55; id++) {
    allUsers.push({ id, name: `User ${id}` });
  }

  // Simulated page fetch: up to 10 users starting at `cursor`.
  const fetchUsers = async (cursor: number) => {
    const end = cursor + 10;
    return { items: allUsers.slice(cursor, end), hasMore: end < allUsers.length };
  };

  // Chain: paginate -> filter -> batch
  const pipeline = asyncBatch(
    asyncFilter(paginate(fetchUsers), (user) => user.id % 3 === 0),
    5,
  );

  for await (const group of pipeline) {
    const names = group.map((user) => user.name);
    console.log(`Batch: ${names.join(', ')}`);
  }
}

// Run the demo. Report a failure and set a non-zero exit code instead of
// leaving the returned promise's rejection unhandled.
main().catch((error: unknown) => {
  console.error(error);
  process.exitCode = 1;
});

Use Cases

  • Paginated API data consumption
  • Large file processing
  • Stream transformation pipelines

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.