Python — Advanced

Python ETL Pipeline Example

Complete extract-transform-load pipeline with error handling, logging, and incremental processing.

Language: Python
import logging
import pandas as pd
from pathlib import Path
from datetime import datetime

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)


def extract(source: str) -> pd.DataFrame:
    """Read the source CSV file into a DataFrame.

    Args:
        source: Path to the input CSV file.

    Returns:
        The raw, unmodified DataFrame as read from disk.

    Raises:
        FileNotFoundError: If *source* does not exist.
    """
    # Lazy %-style args: the message is only formatted if INFO is enabled.
    log.info("Extracting data from %s", source)
    df = pd.read_csv(source)
    log.info("Extracted %d rows", len(df))
    return df


def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and enrich the raw DataFrame.

    Steps: drop duplicate ids, strip whitespace from text columns, parse
    ``created_at``, drop rows missing key fields, then add derived fields.

    Args:
        df: Raw DataFrame; must contain ``id`` and ``created_at`` columns.

    Returns:
        A new, cleaned DataFrame. The caller's DataFrame is not modified.
    """
    log.info("Starting transformations")
    initial_count = len(df)

    # Keep the first occurrence of each id. The explicit .copy() makes the
    # later column assignments operate on an independent frame, avoiding
    # pandas SettingWithCopyWarning.
    df = df.drop_duplicates(subset=["id"]).copy()

    # Strip surrounding whitespace from all text (object-dtype) columns.
    # NOTE(review): .str.strip() turns non-string values in object columns
    # into NaN — assumes object columns hold only strings; confirm upstream.
    for col in df.select_dtypes(include="object").columns:
        df[col] = df[col].str.strip()

    # Unparseable dates become NaT and are dropped just below.
    df["created_at"] = pd.to_datetime(df["created_at"], errors="coerce")

    # Rows without the key fields are unusable downstream.
    df = df.dropna(subset=["id", "created_at"])

    # Derived fields: month bucket (e.g. "2024-01") and an audit timestamp.
    # NOTE(review): datetime.now() is timezone-naive — confirm whether
    # downstream consumers expect UTC before changing it.
    df["year_month"] = df["created_at"].dt.to_period("M").astype(str)
    df["processed_at"] = datetime.now().isoformat()

    log.info("Transformed: %d -> %d rows", initial_count, len(df))
    return df


def load(df: pd.DataFrame, target: str) -> None:
    """Write the DataFrame to *target* as a Parquet file.

    Parent directories are created if they do not exist.

    Args:
        df: The cleaned DataFrame to persist.
        target: Destination path for the Parquet file.
    """
    path = Path(target)
    path.parent.mkdir(parents=True, exist_ok=True)
    # Pass the Path object consistently (original built it, then passed the
    # raw string anyway).
    df.to_parquet(path, index=False, engine="pyarrow")
    log.info("Loaded %d rows to %s", len(df), target)


def run_pipeline(source: str, target: str) -> None:
    """Run the full extract -> transform -> load pipeline.

    Args:
        source: Path to the input CSV file.
        target: Destination path for the output Parquet file.

    Raises:
        Exception: Any stage failure is re-raised after being logged.
    """
    try:
        raw = extract(source)
        clean = transform(raw)
        load(clean, target)
    except Exception:
        # log.exception records the full traceback; the original
        # log.error(f"...{e}") discarded it.
        log.exception("Pipeline failed")
        raise
    else:
        # Success message in `else` so it only fires when all stages ran.
        log.info("Pipeline completed successfully")


if __name__ == "__main__":
    run_pipeline("data/raw/input.csv", "data/processed/output.parquet")

Sponsored

Snowflake — The Data Cloud for all your workloads

Use Cases

  • Automating data ingestion from CSV to warehouse
  • Batch processing and cleaning raw datasets
  • Building reproducible data processing workflows

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.