Python — Intermediate

Python Streaming Data Processing

Process streaming data with generators, windowed aggregation, and memory-efficient line-by-line reading.

python
import json
import time
from collections import deque
from datetime import datetime
from typing import Iterator, Any


def stream_jsonl(filepath: str) -> Iterator[dict]:
    """Yield one parsed JSON object per non-empty line of *filepath*.

    Reads the file lazily, so inputs larger than memory can be
    processed.

    Args:
        filepath: Path to a JSON Lines (.jsonl) file.

    Yields:
        The decoded object for each non-blank line, in file order.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    # Explicit UTF-8: JSON is UTF-8 by spec; the platform default
    # encoding (e.g. cp1252 on Windows) would mangle non-ASCII data.
    with open(filepath, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:  # tolerate blank/whitespace-only lines
                yield json.loads(line)


def windowed_aggregate(
    stream: Iterator[dict],
    window_seconds: int = 60,
    key_field: str = "event_type",
    value_field: str = "amount",
) -> Iterator[dict[str, Any]]:
    """Tumbling-window aggregation (count + sum per key) over a stream.

    Windows are based on wall-clock *arrival* time: once
    ``window_seconds`` of processing time have elapsed, one summary
    dict is yielded and the accumulators reset. When the stream ends,
    the final partial window is flushed so trailing records are not
    lost (the original implementation silently dropped them).

    Args:
        stream: Iterator of record dicts.
        window_seconds: Window length in seconds of wall-clock time.
        key_field: Field used to group records; records missing it are
            grouped under ``"unknown"``.
        value_field: Numeric field summed per key; a missing value
            contributes 0.

    Yields:
        One dict per window with ISO-formatted ``window_start`` /
        ``window_end`` timestamps and per-key
        ``{"count": int, "sum": float}`` aggregations.
    """
    counts: dict[str, int] = {}
    sums: dict[str, float] = {}
    window_start = time.time()

    def _summary(start: float, end: float) -> dict[str, Any]:
        # counts and sums are always updated together, so iterating
        # counts alone covers every key seen in the window.
        return {
            "window_start": datetime.fromtimestamp(start).isoformat(),
            "window_end": datetime.fromtimestamp(end).isoformat(),
            "aggregations": {
                k: {"count": counts[k], "sum": sums.get(k, 0.0)}
                for k in counts
            },
        }

    for record in stream:
        now = time.time()

        # Emit the current window once its time span has elapsed.
        if now - window_start >= window_seconds:
            yield _summary(window_start, now)
            counts.clear()
            sums.clear()
            window_start = now

        # Accumulate. (The original also buffered every record in a
        # deque that was never read — removed to keep memory flat.)
        key = record.get(key_field, "unknown")
        counts[key] = counts.get(key, 0) + 1
        sums[key] = sums.get(key, 0.0) + float(record.get(value_field, 0))

    # Bug fix: flush the final partial window at end-of-stream.
    if counts:
        yield _summary(window_start, time.time())


def process_stream(filepath: str) -> None:
    """Read JSONL events from *filepath* and print each 60-second
    window's aggregation as pretty-printed JSON."""
    windows = windowed_aggregate(stream_jsonl(filepath), window_seconds=60)
    for result in windows:
        print(json.dumps(result, indent=2))


if __name__ == "__main__":
    # Run only when executed as a script, not when imported as a module.
    process_stream("data/events.jsonl")

Use Cases

  • Processing large event log files efficiently
  • Windowed aggregation for analytics dashboards
  • Memory-efficient data stream processing

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.