python · intermediate
Python Batch Processing Script
Process large files in configurable batches with progress tracking, error handling, and resume support.
python — Press ⌘/Ctrl + Shift + C to copy
import csv
import json
import logging
from pathlib import Path
from typing import Iterator
# Configure the root logger at import time: INFO level, "<timestamp> <message>" lines.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
def read_batches(filepath: str, batch_size: int = 1000) -> Iterator[list[dict]]:
    """Lazily yield the rows of a CSV file in lists of up to ``batch_size``.

    The file is parsed with ``csv.DictReader``, so the first line is taken as
    the header and every row is a dict keyed by column name. Only one batch is
    held in memory at a time.

    Args:
        filepath: Path of the CSV file to read.
        batch_size: Number of rows per yielded batch.

    Yields:
        A fresh list of row dicts. Every batch holds ``batch_size`` rows except
        possibly the last, which carries the remainder. A file with no data
        rows yields nothing.
    """
    pending: list[dict] = []
    with open(filepath, newline="") as handle:
        for record in csv.DictReader(handle):
            pending.append(record)
            if len(pending) < batch_size:
                continue
            yield pending
            # Rebind rather than clear(): the consumer may still hold the
            # list that was just yielded.
            pending = []
    # Whatever is left over forms the final, shorter batch.
    if pending:
        yield pending
def process_batch(batch: list[dict]) -> list[dict]:
    """Normalize a batch of rows and keep those with a positive amount.

    For each row, ``amount`` is converted to ``float`` and ``name`` is
    stripped and title-cased. Rows whose amount is not greater than zero are
    dropped.

    A missing, ``None`` or blank ``amount`` counts as 0 and a missing or
    ``None`` ``name`` counts as ``""``. ``csv.DictReader`` produces ``""`` for
    empty cells and ``None`` for fields absent from a short row, so one
    incomplete row no longer raises and takes the whole batch down with it.

    The input rows are not modified; each result is a new dict.

    Args:
        batch: Row dicts, typically as produced by ``csv.DictReader``.

    Returns:
        New row dicts, in input order, for rows with ``amount > 0``.

    Raises:
        ValueError: If an ``amount`` is a non-blank string that is not a
            valid number.
    """
    results = []
    for row in batch:
        raw_amount = row.get("amount")
        if raw_amount is None or (
            isinstance(raw_amount, str) and not raw_amount.strip()
        ):
            # Treat "absent" and "blank" alike, matching the default for a
            # missing key.
            amount = 0.0
        else:
            amount = float(raw_amount)

        raw_name = row.get("name")
        name = ("" if raw_name is None else raw_name).strip().title()

        if amount > 0:
            # Copy instead of mutating the caller's row. Existing keys keep
            # their position; missing ones are appended as amount, then name.
            results.append({**row, "amount": amount, "name": name})
    return results
def run(
    source: str,
    target: str,
    batch_size: int = 1000,
    checkpoint_file: str = ".checkpoint",
) -> None:
    """Process ``source`` in batches and append the results to ``target`` as JSON Lines.

    After each successful batch the number of completed batches is stored in
    ``checkpoint_file``. If that file exists on start-up, that many leading
    batches are skipped. Resuming is therefore only meaningful with the same
    ``source`` and the same ``batch_size`` as the interrupted run.

    A batch that raises is logged, counted as an error and skipped; it is not
    retried. The checkpoint file is removed once the loop finishes, whether or
    not any batch failed. ``target`` is always opened in append mode, so
    running again after a completed run appends the rows a second time.

    Args:
        source: Path of the input CSV file.
        target: Path of the output ``.jsonl`` file; parent directories are
            created if needed.
        batch_size: Number of input rows per batch.
        checkpoint_file: Path of the file holding the completed-batch count.

    Raises:
        ValueError: If an existing checkpoint file does not contain an integer.
        OSError: If the source, target or checkpoint cannot be opened or read.
    """
    target_path = Path(target)
    target_path.parent.mkdir(parents=True, exist_ok=True)

    checkpoint = Path(checkpoint_file)
    # The checkpoint is written to a sibling temp file and renamed into place,
    # so an interrupted write cannot leave a truncated checkpoint behind.
    checkpoint_tmp = checkpoint.with_name(checkpoint.name + ".tmp")
    processed_batches = int(checkpoint.read_text()) if checkpoint.exists() else 0

    total_rows = 0
    errors = 0

    with open(target_path, "a") as out:
        for i, batch in enumerate(read_batches(source, batch_size)):
            if i < processed_batches:
                # Completed in an earlier run.
                continue
            try:
                results = process_batch(batch)
                # Serialize the whole batch before writing anything, so a
                # serialization error cannot leave a half-written batch.
                payload = "".join(json.dumps(row) + "\n" for row in results)
                out.write(payload)
                # Flush before recording progress, so the checkpoint never
                # gets ahead of the output if the process is killed.
                out.flush()
                total_rows += len(results)
                checkpoint_tmp.write_text(str(i + 1))
                checkpoint_tmp.replace(checkpoint)
            except Exception as e:
                # Batch boundary: log and continue with the next batch.
                errors += 1
                log.error(f"Batch {i} failed: {e}")

    checkpoint.unlink(missing_ok=True)
    log.info(f"Done: {total_rows} rows processed, {errors} errors")
if __name__ == "__main__":
run("data/input.csv", "data/output.jsonl", batch_size=5000)
Use Cases
- Processing large CSV files that don't fit in memory
- Resumable data migration scripts
- ETL jobs with checkpoint support
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
python · advanced
Python ETL Pipeline Example
Complete extract-transform-load pipeline with error handling, logging, and incremental processing.
Best for: Automating data ingestion from CSV to warehouse
#etl #pipeline
python · advanced
Database Sync Script in Python
Sync data between two databases with upsert logic, batch processing, and change detection.
Best for: Replicating data between databases
#database #sync
python · beginner
Pandas DataFrame Transformations
Common pandas DataFrame transformations including column operations, type casting, and string methods.
Best for: Cleaning raw data files for analysis
#pandas #dataframe
python · intermediate
Nested JSON Flattening in Python
Flatten deeply nested JSON structures into flat dictionaries suitable for DataFrames or CSV export.
Best for: Converting API responses to flat tables
#json #flattening