Python ETL Pipeline Example
Complete extract-transform-load (ETL) pipeline that reads a raw CSV file, cleans it, and writes the result to Parquet, with error handling and logging. Each run reprocesses the entire input file.
import logging
import pandas as pd
from pathlib import Path
from datetime import datetime
# Module-level logger shared by every pipeline stage.
log = logging.getLogger(__name__)

# Configure the root logger at import time: INFO level, timestamped lines.
# (Per the logging docs, basicConfig is a no-op if the root logger already
# has handlers.)
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
def extract(source: str) -> pd.DataFrame:
    """Read the raw CSV at *source* into a DataFrame.

    Logs the source before reading and the number of rows read afterwards.

    Args:
        source: Path (or anything ``pd.read_csv`` accepts) of the input CSV.

    Returns:
        The parsed frame, exactly as ``pd.read_csv`` produced it.
    """
    log.info(f"Extracting data from {source}")
    frame = pd.read_csv(source)
    row_count = len(frame)
    log.info(f"Extracted {row_count} rows")
    return frame
def _strip_if_str(value):
    """Return *value* without surrounding whitespace if it is a str, else unchanged."""
    return value.strip() if isinstance(value, str) else value


def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and enrich the raw extract.

    Steps, in order:

    1. Strip surrounding whitespace from string values in object columns.
       Non-string values (numbers, ``None``, NaN) are left untouched.
    2. Parse ``created_at`` to datetimes; unparseable values become ``NaT``.
    3. Drop rows missing ``id`` or ``created_at``.
    4. Drop duplicate ``id`` values, keeping the first remaining row.
    5. Add ``year_month`` (e.g. ``"2024-01"``) and ``processed_at`` (the
       naive local-time ISO timestamp of this call, identical on every row).

    Args:
        df: Raw frame; must contain ``id`` and ``created_at`` columns.

    Returns:
        A new cleaned frame. The input frame is not modified.

    Raises:
        KeyError: if ``id`` or ``created_at`` is missing from *df*.
    """
    log.info("Starting transformations")
    initial_count = len(df)

    # Work on an explicit copy so the caller's frame is never mutated.
    df = df.copy()

    # Clean text fields first, so ids that differ only by surrounding
    # whitespace are recognised as duplicates below. Series.str.strip() would
    # turn non-string entries of an object column into NaN, so strip per value.
    for col in df.select_dtypes(include="object").columns:
        df[col] = df[col].map(_strip_if_str)

    # Parse dates
    df["created_at"] = pd.to_datetime(df["created_at"], errors="coerce")

    # Remove invalid rows BEFORE de-duplicating: otherwise an invalid first
    # occurrence of an id would win and then be dropped, losing a valid row.
    df = df.dropna(subset=["id", "created_at"])
    df = df.drop_duplicates(subset=["id"])

    # Add derived fields; assign() returns a fresh frame, avoiding
    # chained-assignment issues on the filtered result.
    df = df.assign(
        year_month=df["created_at"].dt.to_period("M").astype(str),
        processed_at=datetime.now().isoformat(),
    )

    log.info(f"Transformed: {initial_count} -> {len(df)} rows")
    return df
def load(df: pd.DataFrame, target: str) -> None:
    """Write *df* to a Parquet file at *target*, creating parent directories.

    The frame is first written to a hidden temporary file in the same
    directory and then renamed over *target* with ``Path.replace``. A failed
    or interrupted write therefore never leaves a truncated file at *target*;
    on POSIX the final rename is atomic. The temporary file is removed if
    writing or renaming fails.

    Args:
        df: Frame to write (the index is not stored).
        target: Destination file path.

    Raises:
        Whatever ``DataFrame.to_parquet`` or the rename raises (for example
        when the pyarrow engine is not installed); the error is re-raised
        after cleanup.
    """
    path = Path(target)
    path.parent.mkdir(parents=True, exist_ok=True)
    # Same directory => same filesystem, which keeps the rename cheap/atomic.
    tmp_path = path.with_name(f".{path.name}.tmp")
    try:
        df.to_parquet(tmp_path, index=False, engine="pyarrow")
        tmp_path.replace(path)
    finally:
        # No-op after a successful rename; removes a partial file otherwise.
        tmp_path.unlink(missing_ok=True)
    log.info(f"Loaded {len(df)} rows to {target}")
def run_pipeline(source: str, target: str) -> None:
    """Run extract -> transform -> load from the *source* CSV to the *target* Parquet file.

    Any exception raised by a stage is logged together with its traceback and
    then re-raised unchanged, so callers still see the original error.

    Args:
        source: Input CSV path, passed to ``extract``.
        target: Output Parquet path, passed to ``load``.
    """
    try:
        raw = extract(source)
        clean = transform(raw)
        load(clean, target)
    except Exception as e:
        # log.exception records the traceback as well; log.error dropped it,
        # leaving no stack in the log when a caller catches the re-raise.
        log.exception(f"Pipeline failed: {e}")
        raise
    else:
        log.info("Pipeline completed successfully")
if __name__ == "__main__":
    import argparse

    # Both paths are optional positionals whose defaults are the original
    # hard-coded locations, so a bare invocation uses the same files as before.
    parser = argparse.ArgumentParser(
        description="Run the CSV -> Parquet ETL pipeline."
    )
    parser.add_argument(
        "source",
        nargs="?",
        default="data/raw/input.csv",
        help="input CSV file (default: %(default)s)",
    )
    parser.add_argument(
        "target",
        nargs="?",
        default="data/processed/output.parquet",
        help="output Parquet file (default: %(default)s)",
    )
    args = parser.parse_args()
    run_pipeline(args.source, args.target)
Snowflake — The Data Cloud for all your workloads
Use Cases
- Automating data ingestion from CSV files into Parquet for a data warehouse or lake
- Batch processing and cleaning raw datasets
- Building reproducible data processing workflows
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
Bash ETL Pipeline Script
Build a complete ETL script in Bash with logging, error handling, notifications, and idempotent runs.
Best for: Automating daily data extract and load jobs
Python Batch Processing Script
Process large files in configurable batches with progress tracking, error handling, and resume support.
Best for: Processing large CSV files that don't fit in memory
Database Sync Script in Python
Sync data between two databases with upsert logic, batch processing, and change detection.
Best for: Replicating data between databases
Read Large CSV in Chunks with Pandas
Process CSV files larger than RAM by reading in chunks — memory-efficient ETL pattern for data pipelines.
Best for: Processing multi-GB CSV files without running out of memory