Generator Pipeline for Data Processing
Chain generators to build memory-efficient data processing pipelines for large files and streams.
import csv
import io
from typing import Generator, Iterable
def read_lines(filepath: str) -> Generator[str, None, None]:
    """Lazily yield the lines of *filepath*, trailing newline stripped.

    Args:
        filepath: Path to a text file.

    Yields:
        Each line without its trailing newline. Text mode's universal
        newlines already normalize CRLF, so stripping the newline is
        sufficient.
    """
    # Explicit encoding: the original relied on the locale default,
    # which varies by platform and can break on non-ASCII data.
    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            yield line.rstrip("\n")
def parse_csv_rows(lines: Iterable[str]) -> Generator[dict, None, None]:
    """Parse an iterable of CSV lines into dicts keyed by the header row.

    Args:
        lines: An iterable of CSV-formatted lines; the first line is
            treated as the header.

    Yields:
        One dict per data row, mapping header names to field values.

    Note:
        csv.DictReader accepts any iterable of strings, so the input is
        consumed lazily. The original joined every line into a single
        in-memory string first, which defeated the streaming pipeline.
    """
    yield from csv.DictReader(lines)
def filter_rows(
    rows: Iterable[dict], key: str, value: str
) -> Generator[dict, None, None]:
    """Yield only the rows whose *key* field equals *value*.

    Rows missing *key* are dropped (dict.get returns None, which never
    equals a string value).
    """
    yield from (record for record in rows if record.get(key) == value)
def transform(
    rows: Iterable[dict], fn: callable
) -> Generator[dict, None, None]:
    """Lazily apply *fn* to every row, preserving order."""
    yield from map(fn, rows)
def batch(
    items: Iterable, size: int = 100
) -> Generator[list, None, None]:
    """Group *items* into lists of at most *size* elements.

    Args:
        items: Any iterable, consumed lazily.
        size: Maximum batch length; must be >= 1.

    Yields:
        Lists of length *size*; the final batch may be shorter.

    Raises:
        ValueError: If *size* is less than 1. (The original silently
            accumulated every item into one unbounded batch in that
            case.)
    """
    if size < 1:
        raise ValueError("size must be a positive integer")
    chunk: list = []
    for item in items:
        chunk.append(item)
        if len(chunk) == size:
            yield chunk
            chunk = []
    # Flush the final partial batch, if any.
    if chunk:
        yield chunk
# Usage — compose a pipeline:
# lines = read_lines("data.csv")
# rows = parse_csv_rows(lines)
# active = filter_rows(rows, "status", "active")
# cleaned = transform(active, lambda r: {**r, "email": r["email"].lower()})
# for chunk in batch(cleaned, size=50):
#     db.bulk_insert(chunk)

Use Cases
- Large file ETL
- Log processing
- Streaming data transforms
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
Partial Prerendering with Suspense
Combine static shells with streamed dynamic content using React Suspense for instant page loads.
LRU Cache with TTL Support
Extend functools.lru_cache with time-based expiration for caching expensive function calls with staleness control.
functools.cache and lru_cache
Memoize expensive function calls with functools.cache and lru_cache for automatic result caching.
Redis Cache Get/Set Helper
Type-safe Redis cache wrapper with automatic JSON serialization, TTL support, and cache-aside pattern.