python · intermediate

Generator Pipeline for Data Processing

Chain generators to build memory-efficient data processing pipelines for large files and streams.

python
import csv
import io
from typing import Callable, Generator, Iterable


def read_lines(filepath: str) -> Generator[str, None, None]:
    """Lazily yield each line of ``filepath`` with trailing newlines removed.

    The file is opened in text mode using the default encoding and is read
    one line at a time, so it is never held in memory as a whole. The file
    handle is released once the generator is exhausted or closed.
    """
    with open(filepath, "r") as handle:
        yield from (raw.rstrip("\n") for raw in handle)


def _newline_terminated(lines: Iterable[str]) -> Generator[str, None, None]:
    """Yield one newline-terminated string per physical line in ``lines``."""
    for line in lines:
        # Split defensively: an item that still embeds a line break is handed
        # to the csv module as separate physical lines rather than as one
        # string with a newline in the middle.
        for part in line.split("\n"):
            yield part + "\n"


def parse_csv_rows(lines: Iterable[str]) -> Generator[dict, None, None]:
    """Parse CSV lines into dictionaries, consuming the input lazily.

    The first row read supplies the field names; every following row is
    yielded as a dict keyed by those names. Blank lines are skipped by
    ``csv.DictReader``.

    Lines are pulled from ``lines`` only as rows are requested, so the whole
    input is never held in memory at once. Each line is re-terminated with a
    newline before it reaches the parser, so a quoted field that spans
    several input lines keeps its embedded line breaks.

    Args:
        lines: Iterable of CSV text lines, typically without their trailing
            newline.

    Yields:
        One dict per data row, mapping column name to the string value.
    """
    yield from csv.DictReader(_newline_terminated(lines))


def filter_rows(
    rows: Iterable[dict], key: str, value: str
) -> Generator[dict, None, None]:
    """Yield only the rows whose ``key`` entry equals ``value``.

    The lookup uses ``dict.get``, so a row that lacks ``key`` is compared as
    if it held ``None`` for it. Rows are passed through unchanged and in
    their original order.
    """
    yield from (record for record in rows if record.get(key) == value)


def transform(
    rows: Iterable[dict], fn: Callable[[dict], dict]
) -> Generator[dict, None, None]:
    """Apply a transformation function to each row, lazily.

    Args:
        rows: Iterable of row dictionaries.
        fn: Function called once per row, in order; whatever it returns is
            yielded in place of that row.

    Yields:
        The result of ``fn(row)`` for each input row.
    """
    for row in rows:
        yield fn(row)


def batch(
    items: Iterable, size: int = 100
) -> Generator[list, None, None]:
    """Group items into lists of at most ``size`` elements.

    Every yielded list holds exactly ``size`` items except possibly the
    last, which holds whatever remains. Nothing is yielded for empty input.

    Args:
        items: Any iterable; it is consumed lazily, one batch at a time.
        size: Maximum number of items per batch. Must be at least 1.

    Yields:
        Lists of consecutive items, in input order.

    Raises:
        ValueError: If ``size`` is less than 1. Because this is a generator
            function, the error surfaces when iteration starts.
    """
    # A non-positive size would never fill a batch, so the whole input would
    # pile up in one list; reject it instead of silently buffering everything.
    if size < 1:
        raise ValueError(f"size must be at least 1, got {size!r}")
    chunk: list = []
    for item in items:
        chunk.append(item)
        if len(chunk) >= size:
            yield chunk
            # Rebind rather than clear, so the list already handed to the
            # consumer is not mutated.
            chunk = []
    if chunk:
        yield chunk


# Usage — compose a pipeline:
# lines = read_lines("data.csv")
# rows = parse_csv_rows(lines)
# active = filter_rows(rows, "status", "active")
# cleaned = transform(active, lambda r: {**r, "email": r["email"].lower()})
# for chunk in batch(cleaned, size=50):
#     db.bulk_insert(chunk)

Use Cases

  • Large file ETL
  • Log processing
  • Streaming data transforms

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.