Python Batch Processing Script
Process large files in configurable batches with progress tracking, error handling, and resume support.
import csv
import json
import logging
from pathlib import Path
from typing import Iterator
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
log = logging.getLogger(__name__)
def read_batches(filepath: str, batch_size: int = 1000) -> Iterator[list[dict]]:
    """Lazily yield rows from a CSV file as lists of at most *batch_size* dicts.

    Rows are parsed with ``csv.DictReader`` (header row becomes the keys).
    The final batch may be shorter than *batch_size*; an empty file yields
    nothing. Memory use is bounded by one batch.
    """
    with open(filepath, newline="") as handle:
        pending: list[dict] = []
        for record in csv.DictReader(handle):
            pending.append(record)
            if len(pending) == batch_size:
                yield pending
                pending = []
        # Flush whatever is left over after the last full batch.
        if pending:
            yield pending
def process_batch(batch: list[dict]) -> list[dict]:
    """Normalize a batch of CSV rows, keeping only rows with a positive amount.

    Each returned row is a *copy* of the input row with:
      * ``amount`` parsed to ``float`` — missing, empty, or unparseable
        amounts are treated as 0 and the row is dropped, instead of the
        original behavior of raising and failing the entire batch;
      * ``name`` stripped and title-cased (``None`` is treated as ``""``,
        which ``csv.DictReader`` produces for short rows).

    The input rows are NOT mutated (the original implementation edited the
    caller's dicts in place).

    Args:
        batch: Raw string-valued rows, e.g. from ``csv.DictReader``.

    Returns:
        New dicts for every row whose parsed amount is strictly positive.
    """
    results: list[dict] = []
    for raw in batch:
        row = dict(raw)  # copy so the caller's data is left untouched
        try:
            # `or 0` also catches "" and None, which float() would reject.
            row["amount"] = float(row.get("amount") or 0)
        except (TypeError, ValueError):
            # Unparseable amount (e.g. "N/A"): drop this row rather than
            # aborting the whole batch.
            row["amount"] = 0.0
        row["name"] = (row.get("name") or "").strip().title()
        if row["amount"] > 0:
            results.append(row)
    return results
def run(
    source: str,
    target: str,
    batch_size: int = 1000,
    checkpoint_file: str = ".checkpoint",
) -> None:
    """Stream the *source* CSV through ``process_batch`` into *target* as JSONL.

    Progress is checkpointed per batch so an interrupted run can resume:
    the checkpoint file holds the count of fully written batches, and the
    output is opened in append mode so rows written earlier are kept.
    A batch that raises is logged and skipped; processing continues.

    Args:
        source: Path of the input CSV file.
        target: Path of the JSONL output file (parent dirs are created).
        batch_size: Rows per batch.
        checkpoint_file: Where the resume counter is stored.
    """
    target_path = Path(target)
    target_path.parent.mkdir(parents=True, exist_ok=True)

    checkpoint = Path(checkpoint_file)
    # Batches confirmed written by a previous, interrupted run.
    processed_batches = int(checkpoint.read_text()) if checkpoint.exists() else 0

    total_rows = 0
    errors = 0
    with open(target_path, "a", encoding="utf-8") as out:
        for i, batch in enumerate(read_batches(source, batch_size)):
            if i < processed_batches:
                continue  # already written by a previous run
            try:
                results = process_batch(batch)
                out.writelines(json.dumps(row) + "\n" for row in results)
                # Flush BEFORE recording progress, so the checkpoint never
                # claims rows that are still sitting in the write buffer.
                out.flush()
                total_rows += len(results)
                checkpoint.write_text(str(i + 1))
            except Exception as e:
                errors += 1
                log.error("Batch %d failed: %s", i, e)
    if errors == 0:
        # BUG FIX: only discard the checkpoint after a clean run. The
        # original deleted it unconditionally, so re-running after failures
        # reprocessed every batch and appended duplicate rows to *target*.
        checkpoint.unlink(missing_ok=True)
    log.info("Done: %d rows processed, %d errors", total_rows, errors)
if __name__ == "__main__":
    # Example invocation; adjust paths and batch size for your data set.
    run("data/input.csv", "data/output.jsonl", batch_size=5000)
- Processing large CSV files that don't fit in memory
- Resumable data migration scripts
- ETL jobs with checkpoint support
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
Python ETL Pipeline Example
Complete extract-transform-load pipeline with error handling, logging, and incremental processing.
Database Sync Script in Python
Sync data between two databases with upsert logic, batch processing, and change detection.
Pandas DataFrame Transformations
Common pandas DataFrame transformations including column operations, type casting, and string methods.
Nested JSON Flattening in Python
Flatten deeply nested JSON structures into flat dictionaries suitable for DataFrames or CSV export.