Python · Intermediate

Python Batch Processing Script

Process large CSV files in configurable batches with checkpoint-based resume support and per-batch error handling.

python
import csv
import json
import logging
from pathlib import Path
from typing import Iterator

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
log = logging.getLogger(__name__)


def read_batches(filepath: str, batch_size: int = 1000) -> Iterator[list[dict]]:
    """Yield rows from a CSV file as lists of at most *batch_size* dicts.

    The file is streamed, so memory use is bounded by a single batch
    regardless of file size. The final batch may be shorter.

    Args:
        filepath: Path to a CSV file with a header row.
        batch_size: Maximum rows per yielded batch; must be >= 1.

    Raises:
        ValueError: If batch_size < 1 (raised on first iteration, since
            this is a generator).
        FileNotFoundError: If filepath does not exist.
    """
    if batch_size < 1:
        # A non-positive batch size would silently degenerate into
        # one-row batches; fail loudly instead.
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    batch: list[dict] = []
    # Explicit encoding: decoding must not depend on the host locale.
    with open(filepath, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            batch.append(row)
            if len(batch) >= batch_size:
                yield batch
                batch = []
    if batch:
        yield batch


def process_batch(batch: list[dict]) -> list[dict]:
    """Normalize each row in-place and keep only rows with a positive amount.

    Each row gets its "amount" coerced to float and its "name" stripped
    and title-cased. Rows whose amount is missing, blank, or <= 0 are
    dropped from the result (the input rows themselves are still mutated).

    Args:
        batch: Rows as produced by csv.DictReader.

    Returns:
        The subset of (mutated) rows whose amount is > 0.

    Raises:
        ValueError: If an "amount" value is a non-empty, non-numeric string.
    """
    results = []
    for row in batch:
        # `or 0` covers both a missing key and an empty CSV field ("");
        # float("") would otherwise raise ValueError.
        row["amount"] = float(row.get("amount") or 0)
        # `or ""` also covers None, which DictReader uses to pad short rows;
        # None.strip() would otherwise raise AttributeError.
        row["name"] = (row.get("name") or "").strip().title()
        if row["amount"] > 0:
            results.append(row)
    return results


def run(
    source: str,
    target: str,
    batch_size: int = 1000,
    checkpoint_file: str = ".checkpoint",
) -> None:
    """Convert *source* CSV to *target* JSONL in batches, with resume support.

    The checkpoint file holds the count of completed batches. If it exists,
    the run resumes after the last completed batch and appends to *target*;
    otherwise *target* is truncated and written from scratch.

    Args:
        source: Input CSV path (must have a header row).
        target: Output JSONL path; parent directories are created as needed.
        batch_size: Rows per batch passed to read_batches.
        checkpoint_file: Path used to persist resume progress.
    """
    target_path = Path(target)
    target_path.parent.mkdir(parents=True, exist_ok=True)
    checkpoint = Path(checkpoint_file)

    resuming = checkpoint.exists()
    processed_batches = int(checkpoint.read_text()) if resuming else 0
    total_rows = 0
    errors = 0

    # Append only when resuming. The original unconditional "a" mode made a
    # fresh run append to stale output, duplicating previously written rows.
    mode = "a" if resuming else "w"
    with open(target_path, mode, encoding="utf-8") as out:
        for i, batch in enumerate(read_batches(source, batch_size)):
            if i < processed_batches:
                continue  # already completed in a previous run
            try:
                results = process_batch(batch)
                for row in results:
                    out.write(json.dumps(row) + "\n")
                total_rows += len(results)
                # Advance the checkpoint only after the batch is fully written.
                checkpoint.write_text(str(i + 1))
            except Exception as e:
                errors += 1
                log.error("Batch %d failed: %s", i, e)

    # Only mark the run complete when every batch succeeded; otherwise keep
    # the checkpoint so a rerun resumes instead of reprocessing from scratch.
    # NOTE(review): a batch that fails before a later success is still skipped
    # on resume — retrying those would need a separate failed-batch record.
    if errors == 0:
        checkpoint.unlink(missing_ok=True)
    log.info("Done: %d rows processed, %d errors", total_rows, errors)


if __name__ == "__main__":
    # Example invocation; adjust paths and batch size for your dataset.
    run(
        source="data/input.csv",
        target="data/output.jsonl",
        batch_size=5000,
    )

Use Cases

  • Processing large CSV files that don't fit in memory
  • Resumable data migration scripts
  • ETL jobs with checkpoint support

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.