TypeScript · Advanced

Streams: Readable, Writable, Transform

Build custom readable, writable, and transform streams for efficient data processing in Node.js.

typescript
import { Readable, Writable, Transform, pipeline } from 'stream';
import { promisify } from 'util';
import { createGzip } from 'zlib';
import { createWriteStream } from 'fs';

// Promise-returning wrapper around the callback-style `pipeline`, so a whole
// stream chain can be awaited.
const pipelineAsync = promisify(pipeline);

// Custom Readable: object-mode source of sequential numbered records.
// Yields { value, timestamp } for value = 0, 1, ..., max - 1 and then ends.
class NumberStream extends Readable {
  private nextValue = 0;

  constructor(private max: number) {
    super({ objectMode: true });
  }

  // Supplies one record per call, or the end-of-stream marker once `max`
  // records have been produced.
  _read() {
    const exhausted = this.nextValue >= this.max;
    if (exhausted) {
      // Pushing null tells the stream there is no more data.
      this.push(null);
      return;
    }

    const value = this.nextValue;
    this.nextValue += 1;
    this.push({ value, timestamp: Date.now() });
  }
}

// Custom Transform: annotates every record with twice its value.
class DoubleTransform extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  // Emits a shallow copy of the incoming record extended with a `doubled`
  // field; the incoming record itself is not modified.
  _transform(
    chunk: { value: number; timestamp: number },
    _encoding: string,
    callback: (error?: Error | null, data?: any) => void
  ) {
    const twice = chunk.value * 2;
    const enriched = { ...chunk, doubled: twice };
    callback(null, enriched);
  }
}

// Custom Transform: filter
// Object-mode transform that forwards only the chunks for which `predicate`
// returns a truthy value; all other chunks are dropped.
class FilterTransform extends Transform {
  constructor(private predicate: (item: any) => boolean) {
    super({ objectMode: true });
  }

  // Evaluates the predicate for one chunk and reports the outcome through
  // `callback`:
  //   - callback(null, chunk) when the chunk is accepted,
  //   - callback()            when the chunk is skipped,
  //   - callback(error)       when the predicate itself throws.
  _transform(
    chunk: any,
    _encoding: string,
    callback: (error?: Error | null, data?: any) => void
  ) {
    let keep: boolean;
    try {
      keep = this.predicate(chunk);
    } catch (err) {
      // Hand predicate failures to the stream as an error instead of letting
      // them escape as a synchronous exception from _transform. Only the
      // predicate call is guarded, so `callback` is never invoked twice.
      callback(err instanceof Error ? err : new Error(String(err)));
      return;
    }

    if (keep) {
      callback(null, chunk);
    } else {
      callback(); // skip
    }
  }
}

// Custom Writable: consume data
// Object-mode sink that prints every chunk with a running index and, when the
// stream is finished, prints how many chunks it received.
class LogWriter extends Writable {
  private count = 0;

  constructor() {
    super({ objectMode: true });
  }

  // Logs one chunk prefixed with its 1-based position, then acknowledges it
  // by invoking `callback` with no error.
  _write(
    chunk: any,
    _encoding: string,
    callback: (error?: Error | null) => void
  ) {
    this.count += 1;
    console.log(`  [${this.count}]`, chunk);
    callback();
  }

  // Prints the total number of chunks written, then signals completion via
  // `callback`.
  _final(callback: (error?: Error | null) => void) {
    console.log(`  Total: ${this.count} items`);
    callback();
  }
}

// Pipeline: compose streams safely
// Runs two demos in sequence; each pipeline is awaited, so the text demo
// starts only after the object demo has completed.
async function run() {
  console.log('Object stream pipeline:');

  // Keeps only records whose `doubled` field exceeds 8.
  const isLargeEnough = (item: any) => item.doubled > 8;

  // NumberStream(10) -> DoubleTransform -> FilterTransform -> LogWriter
  await pipelineAsync(
    new NumberStream(10),
    new DoubleTransform(),
    new FilterTransform(isLargeEnough),
    new LogWriter()
  );

  console.log('\nText transform pipeline:');

  // Readable source built from an in-memory array of lines.
  const lines = Readable.from(['hello\n', 'world\n', 'stream\n']);

  // String transform: converts each chunk to text and upper-cases it.
  const toUpper = new Transform({
    transform(chunk, _encoding, done) {
      const text = chunk.toString();
      done(null, text.toUpperCase());
    },
  });

  await pipelineAsync(lines, toUpper, process.stdout);
}

// Start the demo; a rejection from run() is reported through console.error.
run().catch(console.error);

Use Cases

  • Processing large files without loading them entirely into memory
  • Data transformation pipelines
  • ETL and data processing

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.