python · intermediate
Generator Pipeline for Data Processing
Chain generators to build memory-efficient data processing pipelines for large files and streams.
python — Press ⌘/Ctrl + Shift + C to copy
import csv
import io
from typing import Callable, Generator, Iterable
def read_lines(filepath: str) -> Generator[str, None, None]:
    """Lazily yield the lines of a text file, one at a time.

    The file is opened in text mode and iterated line by line, so the
    whole file is never read into memory at once. Trailing ``"\\n"``
    characters are removed from each line before it is yielded.

    Args:
        filepath: Path of the file to read.

    Yields:
        Each line of the file without its trailing newline.
    """
    with open(filepath, "r") as handle:
        # The file stays open until the generator is exhausted or closed.
        yield from (raw.rstrip("\n") for raw in handle)
def parse_csv_rows(lines: Iterable[str]) -> Generator[dict, None, None]:
    """Parse CSV lines into dictionaries, one row at a time.

    The first record supplies the column names; every later record is
    yielded as a dict keyed by those names. Input is consumed lazily: a
    line is pulled from ``lines`` only when the CSV reader needs it, so
    the full input is never buffered in memory.

    Args:
        lines: Iterable of CSV text lines without trailing newlines
            (for example the output of ``read_lines``).

    Yields:
        One dict per CSV data row.
    """
    # csv.DictReader accepts any iterable of strings, so hand it a lazy
    # generator instead of joining everything into one in-memory buffer.
    # The newline is re-appended so that a quoted field spanning several
    # input lines keeps its embedded line breaks.
    reader = csv.DictReader(line + "\n" for line in lines)
    yield from reader
def filter_rows(
    rows: Iterable[dict], key: str, value: str
) -> Generator[dict, None, None]:
    """Lazily keep only the rows whose ``key`` entry equals ``value``.

    Rows are looked up with ``dict.get``, so a row that lacks ``key`` is
    compared as ``None`` rather than raising ``KeyError``.

    Args:
        rows: Iterable of row dictionaries.
        key: Name of the field to inspect in each row.
        value: Value the field must equal for the row to be kept.

    Yields:
        The matching rows, unchanged and in input order.
    """
    yield from (record for record in rows if record.get(key) == value)
def transform(
    rows: Iterable[dict], fn: Callable[[dict], dict]
) -> Generator[dict, None, None]:
    """Apply a transformation function to each row.

    Args:
        rows: Iterable of row dictionaries; consumed lazily.
        fn: Function called once per row. Its return value is yielded
            in place of the row.

    Yields:
        ``fn(row)`` for each row, in input order.
    """
    for row in rows:
        yield fn(row)
def batch(
    items: Iterable, size: int = 100
) -> Generator[list, None, None]:
    """Group items into fixed-size batches.

    Items are consumed lazily and collected into lists of ``size``
    elements. The final batch may be shorter when the input length is
    not a multiple of ``size``; an empty input yields nothing.

    Args:
        items: Iterable of items to group.
        size: Number of items per batch; must be at least 1.

    Yields:
        Lists of up to ``size`` consecutive items.

    Raises:
        ValueError: If ``size`` is less than 1. Because this is a
            generator, the error is raised when iteration starts.
    """
    # A non-positive size would never match len(chunk) == size, so the
    # whole input would silently pile up in a single list.
    if size < 1:
        raise ValueError("size must be at least 1")
    chunk: list = []
    for item in items:
        chunk.append(item)
        if len(chunk) == size:
            yield chunk
            # Start a new list so the batch already yielded is not mutated.
            chunk = []
    if chunk:
        # Emit the trailing partial batch.
        yield chunk
# Usage — compose a pipeline:
# lines = read_lines("data.csv")
# rows = parse_csv_rows(lines)
# active = filter_rows(rows, "status", "active")
# cleaned = transform(active, lambda r: {**r, "email": r["email"].lower()})
# for chunk in batch(cleaned, size=50):
#     db.bulk_insert(chunk)
Use Cases
- Large file ETL
- Log processing
- Streaming data transforms
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
typescript · advanced
Multipart Upload Stream Handler
Handles multipart file uploads with streaming to disk without buffering the entire file in memory.
Best for: Large file uploads
#upload#streaming
typescript · advanced
JSON Stream Parser
Parses large JSON arrays from a readable stream without loading the entire file into memory.
Best for: Processing large log files
#streaming#json
typescript · advanced
Partial Prerendering with Suspense
Combine static shells with streamed dynamic content using React Suspense for instant page loads.
Best for: E-commerce product pages
#ppr#suspense
typescript · intermediate
Next.js Streaming with Suspense
Stream server components with Suspense boundaries for progressive page loading and better TTFB.
Best for: Progressive loading for data-heavy dashboards
#nextjs#streaming