typescript · advanced
Streams: Readable, Writable, Transform
Build custom readable, writable, and transform streams for efficient data processing in Node.js.
typescript · Press ⌘/Ctrl + Shift + C to copy
import { Readable, Writable, Transform, pipeline } from 'stream';
import { promisify } from 'util';
import { createGzip } from 'zlib';
import { createWriteStream } from 'fs';
// Promise-returning wrapper around the callback-style `pipeline`, so that a
// whole stream chain can be awaited.
const pipelineAsync = promisify(pipeline);
/**
 * Custom Readable: an object-mode source that emits the integers
 * `0 .. max - 1` in ascending order.
 *
 * Every emitted item has the shape `{ value, timestamp }`, where `timestamp`
 * is `Date.now()` taken when the item is produced. One item is pushed per
 * `_read()` call; once `max` items have been produced the stream is ended.
 */
class NumberStream extends Readable {
  private current = 0;

  constructor(private max: number) {
    super({ objectMode: true });
  }

  _read() {
    const exhausted = this.current >= this.max;
    if (exhausted) {
      // Pushing `null` is how a readable signals end-of-stream.
      this.push(null);
      return;
    }

    // Advance the counter before pushing, then hand out the previous value.
    const value = this.current;
    this.current = value + 1;
    this.push({ value, timestamp: Date.now() });
  }
}
/**
 * Custom Transform: an object-mode stage that forwards each chunk with an
 * extra `doubled` field set to `chunk.value * 2`.
 *
 * The chunk's own properties are copied onto a new object; the incoming
 * chunk itself is not mutated.
 */
class DoubleTransform extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  /**
   * @param chunk     Item to enrich.
   * @param _encoding Unused.
   * @param callback  Called once with `null` and the enriched copy.
   */
  _transform(
    chunk: { value: number; timestamp: number },
    _encoding: string,
    callback: (error?: Error | null, data?: any) => void
  ) {
    const enriched = { ...chunk, doubled: chunk.value * 2 };
    callback(null, enriched);
  }
}
/**
 * Custom Transform: an object-mode filter that forwards only the chunks for
 * which `predicate` returns a truthy value; all other chunks are dropped.
 *
 * If `predicate` throws, the failure is reported through the transform
 * callback instead of escaping `_transform` with the callback never invoked.
 * Stream implementations are required to propagate errors via the callback.
 */
class FilterTransform extends Transform {
  constructor(private predicate: (item: any) => boolean) {
    super({ objectMode: true });
  }

  /**
   * @param chunk     Candidate item, passed to the predicate as-is.
   * @param _encoding Unused.
   * @param callback  Called exactly once: with `(null, chunk)` to forward the
   *                  item, with no arguments to skip it, or with an `Error`
   *                  when the predicate throws.
   */
  _transform(
    chunk: any,
    _encoding: string,
    callback: (error?: Error | null, data?: any) => void
  ) {
    let keep: boolean;
    try {
      keep = this.predicate(chunk);
    } catch (err) {
      // Normalise non-Error throwables so the callback always gets an Error.
      callback(err instanceof Error ? err : new Error(String(err)));
      return;
    }

    if (keep) {
      callback(null, chunk);
    } else {
      callback(); // skip
    }
  }
}
/**
 * Custom Writable: an object-mode sink that logs every chunk it receives,
 * prefixed with a running 1-based index, and logs the total number of
 * chunks written when the stream is finalised.
 */
class LogWriter extends Writable {
  private count = 0;

  constructor() {
    super({ objectMode: true });
  }

  /**
   * @param chunk     Item to log; passed to `console.log` unchanged.
   * @param _encoding Unused.
   * @param callback  Called with no arguments once the item has been logged.
   */
  _write(
    chunk: unknown,
    _encoding: string,
    callback: (error?: Error | null) => void
  ) {
    this.count++;
    console.log(` [${this.count}]`, chunk);
    callback();
  }

  /**
   * Logs the running total of written items.
   *
   * @param callback Called with no arguments after the total is logged.
   */
  _final(callback: (error?: Error | null) => void) {
    console.log(` Total: ${this.count} items`);
    callback();
  }
}
/**
 * Pipeline demo: composes the custom streams and awaits their completion.
 *
 * 1. Object pipeline: the numbers 0–9 are doubled, items whose `doubled`
 *    value is not greater than 8 are filtered out, and the rest are logged.
 * 2. Text pipeline: three newline-terminated strings are upper-cased and
 *    written to `process.stdout`.
 *
 * The returned promise rejects if either awaited pipeline rejects.
 */
async function run() {
  console.log('Object stream pipeline:');

  const numbers = new NumberStream(10);
  const doubler = new DoubleTransform();
  const aboveEight = new FilterTransform((item) => item.doubled > 8);
  const logger = new LogWriter();
  await pipelineAsync(numbers, doubler, aboveEight, logger);

  console.log('\nText transform pipeline:');

  // String transform: upper-case every chunk.
  const shout = new Transform({
    transform(chunk, _encoding, callback) {
      const text = chunk.toString();
      callback(null, text.toUpperCase());
    },
  });

  // Readable built from an in-memory array of lines.
  const lines = Readable.from(['hello\n', 'world\n', 'stream\n']);

  await pipelineAsync(lines, shout, process.stdout);
}
run().catch(console.error);
Use Cases
- Processing large files without loading into memory
- Data transformation pipelines
- ETL and data processing
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
typescript · intermediate
Node.js Stream Pipeline with Transform
Build efficient data processing pipelines using Node.js streams for large file handling.
Best for: Processing large CSV files without loading into memory
#nodejs #streams
typescript · advanced
Async Iterators for Stream Processing
Process streams and async data sources with for-await-of loops and custom async iterators in Node.js.
Best for: Paginated API data consumption
#nodejs #async-iterator
typescript · advanced
Node.js Worker Threads for Parallel Processing
Use Worker Threads to run CPU-intensive tasks in parallel without blocking the event loop.
Best for: CPU-intensive data processing without blocking
#nodejs #worker-threads
typescript · intermediate
TypeScript Typed Event Emitter
Create type-safe event emitters in Node.js with full TypeScript support and autocomplete.
Best for: Type-safe pub/sub communication between modules
#nodejs #events