typescript · advanced
Async Iterators for Stream Processing
Process streams and async data sources with for-await-of loops and custom async iterators in Node.js.
typescript — Press ⌘/Ctrl + Shift + C to copy
import { createReadStream } from 'fs';
import { createInterface } from 'readline';
// Read file line by line with async iterator
/**
 * Lazily yields the lines of a text file, one line per iteration.
 *
 * The file is opened as a read stream and split into lines by `readline`;
 * `crlfDelay: Infinity` makes a `\r\n` pair count as a single line break.
 *
 * The underlying stream is released in a `finally` block, so it is destroyed
 * whether the consumer reads to the end, stops early (`break` / `return()`),
 * or an error is thrown while iterating.
 *
 * @param filePath - Path of the file to read.
 * @returns An async generator producing each line without its line terminator.
 */
async function* readLines(filePath: string): AsyncGenerator<string> {
  // Keep a handle on the stream so it can be destroyed explicitly; closing the
  // readline interface alone does not destroy its input.
  const input = createReadStream(filePath);
  const rl = createInterface({
    input,
    crlfDelay: Infinity,
  });
  try {
    for await (const line of rl) {
      yield line;
    }
  } finally {
    rl.close();
    input.destroy();
  }
}
// Paginated async iterator: the next page is fetched only when the consumer asks for more items
/**
 * Flattens an offset-paginated data source into a single async stream of items.
 *
 * Pages are requested one at a time, and only when the consumer pulls past the
 * items of the previous page. The cursor starts at 0 and advances by the number
 * of items received.
 *
 * Iteration ends when a page reports `hasMore: false`, or when a page contains
 * no items. Because the cursor advances by `items.length`, an empty page would
 * otherwise make the generator request the same cursor forever.
 *
 * @param fetchPage - Loads the page starting at `cursor`; resolves with the
 *   page's items and whether more data exists after it.
 * @returns An async generator yielding every item of every page, in order.
 */
async function* paginate<T>(
  fetchPage: (cursor: number) => Promise<{ items: T[]; hasMore: boolean }>,
): AsyncGenerator<T> {
  let cursor = 0;
  while (true) {
    const { items, hasMore } = await fetchPage(cursor);
    for (const item of items) yield item;
    // Stop on the last page, or when no progress was made (empty page).
    if (!hasMore || items.length === 0) return;
    cursor += items.length;
  }
}
// Transform async iterator (map)
/**
 * Applies `fn` to each element of an async source and yields the results in
 * source order.
 *
 * `fn` may be synchronous or return a promise; each result is awaited before
 * it is yielded, so elements are transformed one at a time.
 *
 * @param source - Async iterable supplying the input elements.
 * @param fn - Transformation applied to every element.
 * @returns An async generator of the transformed values.
 */
async function* asyncMap<T, U>(
  source: AsyncIterable<T>,
  fn: (item: T) => U | Promise<U>,
): AsyncGenerator<U> {
  for await (const element of source) {
    const mapped: U = await fn(element);
    yield mapped;
  }
}
// Filter async iterator
/**
 * Yields only those elements of an async source for which `predicate` holds.
 *
 * The predicate may be synchronous or return a promise; its result is awaited
 * before the next element is pulled from the source.
 *
 * @param source - Async iterable supplying the candidate elements.
 * @param predicate - Test deciding whether an element is kept.
 * @returns An async generator of the accepted elements, in source order.
 */
async function* asyncFilter<T>(
  source: AsyncIterable<T>,
  predicate: (item: T) => boolean | Promise<boolean>,
): AsyncGenerator<T> {
  for await (const candidate of source) {
    const keep = await predicate(candidate);
    if (!keep) continue;
    yield candidate;
  }
}
// Batch async iterator
/**
 * Groups the elements of an async source into arrays of up to `size` elements.
 *
 * A batch is yielded as soon as it reaches `size` elements. Any elements left
 * over when the source ends are yielded as a final, shorter batch. Every
 * yielded array is a fresh instance, so consumers may keep or mutate it.
 *
 * @param source - Async iterable supplying the elements to group.
 * @param size - Maximum number of elements per batch; must be at least 1.
 * @returns An async generator of batches, preserving source order.
 * @throws RangeError on first iteration if `size` is less than 1 or `NaN`.
 *   Without this check, zero or negative sizes would silently produce
 *   one-element batches and `NaN` would buffer the whole source.
 */
async function* asyncBatch<T>(
  source: AsyncIterable<T>,
  size: number,
): AsyncGenerator<T[]> {
  // Written as a negated comparison so that NaN is rejected as well.
  if (!(size >= 1)) {
    throw new RangeError(`asyncBatch: size must be >= 1, received ${size}`);
  }
  let batch: T[] = [];
  for await (const item of source) {
    batch.push(item);
    if (batch.length >= size) {
      yield batch;
      batch = [];
    }
  }
  // Flush the trailing partial batch, if any.
  if (batch.length > 0) yield batch;
}
// Usage
/**
 * Demo driver: builds the pipeline paginate -> filter -> batch over an
 * in-memory list of 55 users and logs each resulting batch of names.
 */
async function main() {
  // In-memory stand-in for a paginated API returning 10 users per page.
  const users = Array.from({ length: 55 }, (_, id) => ({ id, name: `User ${id}` }));
  const pageSize = 10;
  const fetchUsers = async (offset: number) => {
    const end = offset + pageSize;
    return { items: users.slice(offset, end), hasMore: end < users.length };
  };

  // Keep users whose id is a multiple of 3, then group them five at a time.
  const everyThird = asyncFilter(paginate(fetchUsers), (user) => user.id % 3 === 0);
  for await (const group of asyncBatch(everyThird, 5)) {
    const names = group.map((user) => user.name).join(', ');
    console.log(`Batch: ${names}`);
  }
}
main();
Use Cases
- Paginated API data consumption
- Large file processing
- Stream transformation pipelines
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
typescript · intermediate
Node.js Stream Pipeline with Transform
Build efficient data processing pipelines using Node.js streams for large file handling.
Best for: Processing large CSV files without loading them into memory
#nodejs #streams
typescript · advanced
Node.js In-Memory Task Queue
A simple in-memory task queue with concurrency control, retries, and priority support.
Best for: Rate-limited API call processing
#nodejs #queue
typescript · intermediate
Event Loop and Timers
Understand Node.js event loop phases with setTimeout, setInterval, setImmediate, and process.nextTick.
Best for: Understanding async execution order
#nodejs #event-loop
typescript · advanced
Streams: Readable, Writable, Transform
Build custom readable, writable, and transform streams for efficient data processing in Node.js.
Best for: Processing large files without loading them into memory
#nodejs #streams