Python Streaming Data Processing
Process streaming data with generators, windowed aggregation, and memory-efficient line-by-line reading.
import json
import time
from collections import deque
from datetime import datetime
from typing import Iterator, Any
def stream_jsonl(filepath: str) -> Iterator[dict]:
    """Lazily yield one parsed JSON object per non-blank line of *filepath*.

    Reads the file line by line, so memory use is independent of file size.
    Raises ``json.JSONDecodeError`` on a malformed line.
    """
    with open(filepath) as handle:
        for raw in handle:
            record = raw.strip()
            if not record:
                # Skip blank lines rather than feeding them to the parser.
                continue
            yield json.loads(record)
def windowed_aggregate(
    stream: Iterator[dict],
    window_seconds: int = 60,
    key_field: str = "event_type",
    value_field: str = "amount",
) -> Iterator[dict[str, Any]]:
    """Tumbling-window aggregation over a data stream.

    Groups records by ``record[key_field]`` (missing keys fall back to
    ``"unknown"``), accumulating a count and a sum of ``record[value_field]``
    per key. Each time *window_seconds* of wall-clock time have elapsed, one
    summary dict is yielded and a fresh window begins.

    Fixes over the previous version:
    - the final partial window is flushed when the stream is exhausted, so
      trailing records are no longer silently dropped;
    - removed the ``window`` deque, which buffered every record but was only
      ever cleared — dead state that grew with window size.

    Yielded shape::

        {"window_start": <ISO timestamp>, "window_end": <ISO timestamp>,
         "aggregations": {key: {"count": int, "sum": float}}}
    """
    counts: dict[str, int] = {}
    sums: dict[str, float] = {}
    window_start = time.time()

    def _summary(start: float, end: float) -> dict[str, Any]:
        # counts and sums always share the same key set, so iterating
        # counts alone is sufficient (no need to union the two key views).
        # NOTE(review): fromtimestamp() produces naive local-time stamps;
        # confirm whether downstream consumers expect UTC.
        return {
            "window_start": datetime.fromtimestamp(start).isoformat(),
            "window_end": datetime.fromtimestamp(end).isoformat(),
            "aggregations": {
                k: {"count": counts[k], "sum": sums.get(k, 0.0)} for k in counts
            },
        }

    for record in stream:
        now = time.time()
        # Emit and reset once the current window's time span has elapsed.
        if now - window_start >= window_seconds:
            yield _summary(window_start, now)
            counts.clear()
            sums.clear()
            window_start = now
        # Accumulate this record into the current window.
        key = record.get(key_field, "unknown")
        counts[key] = counts.get(key, 0) + 1
        sums[key] = sums.get(key, 0.0) + float(record.get(value_field, 0))

    # Flush the trailing partial window so the last records are not lost.
    if counts:
        yield _summary(window_start, time.time())
def process_stream(filepath: str) -> None:
    """Aggregate a JSONL event file in 60-second windows and print each summary.

    Streams *filepath* through ``stream_jsonl`` and ``windowed_aggregate``,
    pretty-printing every emitted window as JSON.
    """
    windows = windowed_aggregate(stream_jsonl(filepath), window_seconds=60)
    for summary in windows:
        print(json.dumps(summary, indent=2))
# Script entry point: aggregate the default event log when run directly.
# (Repairs the original line, which had page text fused onto the call and
# therefore failed to parse.)
if __name__ == "__main__":
    process_stream("data/events.jsonl")
- Processing large event log files efficiently
- Windowed aggregation for analytics dashboards
- Memory-efficient data stream processing
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
Streaming API Response
Stream long-running API responses using ReadableStream and TransformStream for real-time data delivery.
Pandas DataFrame Transformations
Common pandas DataFrame transformations including column operations, type casting, and string methods.
Python ETL Pipeline Example
Complete extract-transform-load pipeline with error handling, logging, and incremental processing.
Python Batch Processing Script
Process large files in configurable batches with progress tracking, error handling, and resume support.