TypeScript · Intermediate

Node.js Stream Pipeline with Transform

Build memory-efficient data-processing pipelines with Node.js streams that handle large files chunk by chunk instead of loading them into memory.

typescript
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline, Transform } from 'node:stream';
import type { TransformCallback } from 'node:stream';
import { StringDecoder } from 'node:string_decoder';
import { promisify } from 'node:util';
import { createGzip } from 'node:zlib';

// Promise-returning wrapper around the callback-style `pipeline` from
// 'node:stream', so the script below can `await` the whole stream chain.
// NOTE(review): 'node:stream/promises' exports an equivalent promise-based
// `pipeline`; consider switching if the minimum supported Node version allows.
const pipelineAsync = promisify(pipeline);

/**
 * Streaming CSV → JSON Lines transform.
 *
 * The first non-blank line is treated as the header. Every following
 * non-blank line becomes an object keyed by the header names. Only rows whose
 * `amount` column parses (via `parseFloat`) to a number greater than
 * `minAmount` are emitted, each as one `JSON.stringify(row) + '\n'` string.
 *
 * Limitation: fields are split on every `,` and trimmed, so quoted fields that
 * contain commas or line breaks (RFC 4180) are not supported.
 */
class CSVTransform extends Transform {
  /** Trailing partial line carried over until its terminating '\n' arrives. */
  private buffer = '';
  /** Column names from the header line; `null` until the header has been read. */
  private header: string[] | null = null;
  /** Incremental UTF-8 decoder so multi-byte characters split across chunks survive. */
  private readonly decoder = new StringDecoder('utf8');
  /** Rows are emitted only when `amount` is strictly greater than this. */
  private readonly minAmount: number;

  /**
   * @param minAmount Exclusive lower bound for the `amount` column; defaults to 100.
   */
  constructor(minAmount = 100) {
    super({ objectMode: true });
    this.minAmount = minAmount;
  }

  override _transform(
    chunk: Buffer | string,
    _encoding: BufferEncoding,
    callback: TransformCallback
  ): void {
    // In objectMode a string may be written directly; Buffers go through the
    // decoder, which holds back an incomplete trailing UTF-8 sequence.
    this.buffer += typeof chunk === 'string' ? chunk : this.decoder.write(chunk);
    const lines = this.buffer.split('\n');
    // The last element is an unfinished line ('' if the chunk ended on '\n').
    this.buffer = lines.pop() ?? '';
    for (const line of lines) this.processLine(line);
    callback();
  }

  override _flush(callback: TransformCallback): void {
    // Drain any bytes still held by the decoder, then parse the final
    // unterminated line exactly like every other line (filter + JSON).
    this.buffer += this.decoder.end();
    this.processLine(this.buffer);
    this.buffer = '';
    callback();
  }

  /** Records the header, or converts, filters and emits one data row. */
  private processLine(line: string): void {
    if (line.trim() === '') return; // ignore blank lines, e.g. a trailing newline
    // trim() also strips the '\r' left over from CRLF line endings.
    const values = line.split(',').map((v) => v.trim());
    if (this.header === null) {
      this.header = values;
      return;
    }
    // Object.fromEntries defines own properties, so a column literally named
    // "__proto__" is kept as data instead of hitting the prototype setter.
    const row: Record<string, string> = Object.fromEntries(
      this.header.map((name, i): [string, string] => [name, values[i] ?? ''])
    );
    const amount = row.amount;
    if (amount && parseFloat(amount) > this.minAmount) {
      this.push(JSON.stringify(row) + '\n');
    }
  }
}

// Pipeline: read CSV → filter/convert to JSON Lines → gzip → write.
// Paths default to input.csv / output.jsonl.gz and may be overridden with
// CLI arguments: <input.csv> <output.jsonl.gz>.
// pipeline() forwards an error from any stage and tears down every stream,
// so one try/catch covers the whole chain.
const [inputPath = 'input.csv', outputPath = 'output.jsonl.gz'] = process.argv.slice(2);

try {
  await pipelineAsync(
    createReadStream(inputPath),
    new CSVTransform(),
    createGzip(),
    createWriteStream(outputPath)
  );
  console.log('Pipeline complete');
} catch (err: unknown) {
  // Report the failure (with stack, if any) and signal it via the exit code
  // instead of crashing on an unhandled rejection.
  console.error('Pipeline failed:', err);
  process.exitCode = 1;
}

Use Cases

  • Processing large CSV files without loading them entirely into memory
  • ETL pipelines with backpressure handling
  • Generating compressed files from data streams

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.