Python ETL Pipeline Example
Complete extract-transform-load pipeline with error handling, logging, and incremental processing.
import logging
import pandas as pd
from pathlib import Path
from datetime import datetime
# Configure the root logger once at import time: timestamped INFO-level
# messages so every pipeline stage's progress is visible on the console.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
# Module-level logger shared by all pipeline stages below.
log = logging.getLogger(__name__)
def extract(source: str) -> pd.DataFrame:
    """Read the raw CSV at *source* into a DataFrame.

    Args:
        source: Path to a CSV file readable by ``pd.read_csv``.

    Returns:
        The freshly loaded DataFrame.

    Raises:
        FileNotFoundError: If *source* does not exist.
        pd.errors.ParserError: If the file is not parseable as CSV.
    """
    # Lazy %-style logging args: the message is only formatted when the
    # INFO level is enabled, instead of eagerly via an f-string.
    log.info("Extracting data from %s", source)
    df = pd.read_csv(source)
    log.info("Extracted %d rows", len(df))
    return df
def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and enrich the raw frame.

    Steps: de-duplicate on ``id``, strip whitespace from string values,
    parse ``created_at``, drop rows missing an id or a valid timestamp,
    and add ``year_month`` / ``processed_at`` derived columns.

    Args:
        df: Raw frame; must contain ``id`` and ``created_at`` columns.

    Returns:
        A new cleaned DataFrame; the caller's frame is not modified.
    """
    log.info("Starting transformations")
    initial_count = len(df)
    # Remove duplicates. drop_duplicates returns a copy, so all later
    # in-place column assignments never touch the caller's frame.
    df = df.drop_duplicates(subset=["id"])
    # Clean text fields. BUG FIX: the previous `.str.strip()` silently
    # turned every non-string value in an object column into NaN; the
    # type-guarded map() strips strings and passes other values through.
    for col in df.select_dtypes(include="object").columns:
        df[col] = df[col].map(lambda v: v.strip() if isinstance(v, str) else v)
    # Parse dates; unparseable values become NaT and are dropped below.
    df["created_at"] = pd.to_datetime(df["created_at"], errors="coerce")
    # Remove rows without an id or a valid timestamp.
    df = df.dropna(subset=["id", "created_at"])
    # Derived fields for downstream partitioning and audit trails.
    # NOTE(review): processed_at is a naive local timestamp — presumably
    # fine for this pipeline; confirm if consumers expect UTC.
    df["year_month"] = df["created_at"].dt.to_period("M").astype(str)
    df["processed_at"] = datetime.now().isoformat()
    log.info("Transformed: %d -> %d rows", initial_count, len(df))
    return df
def load(df: pd.DataFrame, target: str) -> None:
    """Persist *df* as a Parquet file at *target*.

    Creates any missing parent directories before writing; the index is
    not written to the output file.
    """
    destination = Path(target)
    # Make sure the output directory tree exists before writing.
    destination.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(target, index=False, engine="pyarrow")
    log.info(f"Loaded {len(df)} rows to {target}")
def run_pipeline(source: str, target: str) -> None:
    """Run the full extract -> transform -> load pipeline.

    Args:
        source: Path to the input CSV.
        target: Path for the output Parquet file.

    Raises:
        Exception: Any failure from a stage is logged and re-raised so
            callers (schedulers, CI) still observe the error.
    """
    try:
        cleaned = transform(extract(source))
        load(cleaned, target)
    except Exception as e:
        log.error(f"Pipeline failed: {e}")
        raise
    else:
        log.info("Pipeline completed successfully")
if __name__ == "__main__":
    # Default script paths. BUG FIX: removed the stray "Sponsored" token
    # (web-scrape artifact) fused onto this line, which made it a syntax error.
    run_pipeline("data/raw/input.csv", "data/processed/output.parquet")
Snowflake — The Data Cloud for all your workloads
Use Cases
- Automating data ingestion from CSV to warehouse
- Batch processing and cleaning raw datasets
- Building reproducible data processing workflows
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
Python Batch Processing Script
Process large files in configurable batches with progress tracking, error handling, and resume support.
Database Sync Script in Python
Sync data between two databases with upsert logic, batch processing, and change detection.
GitHub Actions CI/CD Pipeline
Complete GitHub Actions workflow with test, build, and deploy stages for a Node.js application.
Pandas DataFrame Transformations
Common pandas DataFrame transformations including column operations, type casting, and string methods.