typescript · intermediate
Node.js Stream Pipeline with Transform
Build efficient data processing pipelines using Node.js streams for large file handling.
typescript — Press ⌘/Ctrl + Shift + C to copy
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline, Transform } from 'node:stream';
import { StringDecoder } from 'node:string_decoder';
import { promisify } from 'node:util';
import { createGzip } from 'node:zlib';
// Promise-returning wrapper around the callback-style `pipeline`, so the
// whole stream chain can be awaited.
const pipelineAsync = promisify(pipeline);
/**
 * Transform stream that turns raw CSV bytes into newline-delimited JSON.
 *
 * The first non-blank line is taken as the header. Every later line becomes an
 * object keyed by the (trimmed) header names, and is emitted as one
 * `JSON.stringify(row) + '\n'` string only when its `amount` column parses to a
 * number greater than 100.
 *
 * Limitations: fields are separated with a plain `split(',')`, so quoted
 * fields containing commas or newlines are not supported.
 */
class CSVTransform extends Transform {
  /** Text received after the last newline seen so far (an incomplete line). */
  private buffer = '';
  /** Column names taken from the header line. */
  private header: string[] = [];
  /** Number of non-blank lines processed so far; 0 means the header is still pending. */
  private lineCount = 0;
  /**
   * Stateful UTF-8 decoder. It holds back the trailing bytes of a multi-byte
   * character that is split across two chunks instead of emitting U+FFFD.
   */
  private readonly decoder = new StringDecoder('utf8');

  constructor() {
    super({ objectMode: true });
  }

  /**
   * Decodes the chunk, processes every complete line in it and keeps the
   * trailing partial line for the next call.
   *
   * @param chunk - Raw bytes (or already-decoded text) from upstream.
   * @param _encoding - Ignored; input is always decoded as UTF-8.
   * @param callback - Invoked once the chunk has been fully consumed.
   */
  _transform(
    chunk: Buffer | string,
    _encoding: string,
    callback: (error?: Error | null) => void
  ): void {
    this.buffer += typeof chunk === 'string' ? chunk : this.decoder.write(chunk);
    const lines = this.buffer.split('\n');
    // The last element is whatever follows the final '\n' — possibly an
    // incomplete line, so hold it back until more data (or the flush) arrives.
    this.buffer = lines.pop() ?? '';
    for (const line of lines) {
      this.processLine(line);
    }
    callback();
  }

  /**
   * Handles the final line when the input does not end with a newline. The
   * line goes through the same header/filter/format path as every other line
   * rather than being forwarded as raw CSV text.
   *
   * @param callback - Invoked once any remaining data has been processed.
   */
  _flush(callback: (error?: Error | null) => void): void {
    // decoder.end() returns any bytes still held back from a truncated character.
    const tail = this.buffer + this.decoder.end();
    this.buffer = '';
    this.processLine(tail);
    callback();
  }

  /** Parses one CSV line: records the header, or filters and emits a data row. */
  private processLine(line: string): void {
    // Blank lines carry no data and must not be mistaken for the header.
    if (line.trim() === '') return;

    if (this.lineCount === 0) {
      this.header = line.split(',').map((h) => h.trim());
    } else {
      const values = line.split(',');
      const row: Record<string, string> = {};
      this.header.forEach((h, i) => {
        // Missing trailing columns become empty strings.
        row[h] = values[i]?.trim() ?? '';
      });
      // Transform: filter + format
      if (row.amount && parseFloat(row.amount) > 100) {
        this.push(JSON.stringify(row) + '\n');
      }
    }
    this.lineCount++;
  }
}
// Stream input.csv through the CSV transform, gzip the result and write it to
// output.jsonl.gz. Stages are created in the order they are chained.
const csvSource = createReadStream('input.csv');
const rowFilter = new CSVTransform();
const compressor = createGzip();
const gzipSink = createWriteStream('output.jsonl.gz');
await pipelineAsync(csvSource, rowFilter, compressor, gzipSink);
console.log('Pipeline complete');
Use Cases
- Processing large CSV files without loading into memory
- ETL pipelines with backpressure handling
- Compressed file generation from data streams
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
typescript · advanced
Streams: Readable, Writable, Transform
Build custom readable, writable, and transform streams for efficient data processing in Node.js.
Best for: Processing large files without loading them into memory
#nodejs #streams
typescript · advanced
Async Iterators for Stream Processing
Process streams and async data sources with for-await-of loops and custom async iterators in Node.js.
Best for: Paginated API data consumption
#nodejs #async-iterator
typescript · advanced
Node.js Worker Threads for Parallel Processing
Use Worker Threads to run CPU-intensive tasks in parallel without blocking the event loop.
Best for: CPU-intensive data processing without blocking
#nodejs #worker-threads
typescript · intermediate
TypeScript Typed Event Emitter
Create type-safe event emitters in Node.js with full TypeScript support and autocomplete.
Best for: Type-safe pub/sub communication between modules
#nodejs #events