bash · intermediate

Bash ETL Pipeline Script

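Build a complete ETL script in Bash that downloads a daily export, cleans it with an inline Python step, and loads it into PostgreSQL, with timestamped logging and fail-fast error handling. Runs are keyed by date, so re-running a date overwrites that date's files.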

bash
#!/usr/bin/env bash
set -euo pipefail

# Configuration
# Run date: first argument, defaulting to yesterday (GNU date syntax)
DATE="${1:-$(date -d yesterday +%Y-%m-%d)}"
DATA_DIR="/data/raw/${DATE}"
OUTPUT_DIR="/data/processed/${DATE}"
LOG_FILE="/var/log/etl/${DATE}.log"

mkdir -p "$DATA_DIR" "$OUTPUT_DIR" "$(dirname "$LOG_FILE")"

log() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a "$LOG_FILE"; }

trap 'log "ERROR: ETL failed at line $LINENO"; exit 1' ERR

# --- Extract ---
log "EXTRACT: Downloading data for ${DATE}"
curl -sf "https://api.example.com/export?date=${DATE}" \
  -H "Authorization: Bearer ${API_TOKEN}" \
  -o "${DATA_DIR}/raw.json.gz"

gunzip -f "${DATA_DIR}/raw.json.gz"
log "EXTRACT: $(wc -l < "${DATA_DIR}/raw.json") lines downloaded"

# --- Transform ---
log "TRANSFORM: Cleaning and converting data"
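# Pass the input and output paths as arguments; the heredoc reads them from sys.argv.
python3 - "${DATA_DIR}/raw.json" "${OUTPUT_DIR}/clean.csv" <<'PYEOF'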
import json, csv, sys

with open(sys.argv[1]) as f:
    data = [json.loads(line) for line in f]

with open(sys.argv[2], 'w', newline='') as f:
    w = csv.DictWriter(f, fieldnames=['id','email','amount','date'])
    w.writeheader()
    for r in data:
        if (r.get('amount') or 0) > 0:  # treat a missing or null amount as 0
            w.writerow({
                'id': r['id'],
                'email': r['email'].lower().strip(),
                'amount': round(r['amount'], 2),
                'date': r['created_at'][:10],
            })
PYEOF
# Subtract 1 so the CSV header is not counted as a data row.
log "TRANSFORM: $(( $(wc -l < "${OUTPUT_DIR}/clean.csv") - 1 )) rows after cleaning"

# --- Load ---
log "LOAD: Importing to database"
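# \copy is a psql meta-command and is case-sensitive, so it must be lowercase.
psql "$DATABASE_URL" -v ON_ERROR_STOP=1 -c "\\copy staging.orders FROM '${OUTPUT_DIR}/clean.csv' CSV HEADER"
psql "$DATABASE_URL" -v ON_ERROR_STOP=1 -c "CALL merge_staging_to_production();"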

log "DONE: ETL pipeline completed for ${DATE}"
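
The description mentions notifications, which the script itself does not send. One way to add them is a small helper called from the failure trap. The sketch below assumes a hypothetical `ALERT_WEBHOOK_URL` environment variable pointing at a webhook that accepts a `{"text": "..."}` JSON body; the `json_payload` and `notify` names are illustrative and not part of the original script.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Hypothetical setting: a webhook that accepts {"text": "..."} JSON bodies.
ALERT_WEBHOOK_URL="${ALERT_WEBHOOK_URL:-}"

# Escape backslashes and double quotes, then wrap the message in a JSON object.
# Newlines and other control characters are not handled in this sketch.
json_payload() {
  local msg=$1
  msg=${msg//\\/\\\\}
  msg=${msg//\"/\\\"}
  printf '{"text":"%s"}' "$msg"
}

# Send a notification; does nothing when no webhook is configured.
# A failed delivery is ignored so it cannot mask the original error.
notify() {
  [[ -n "$ALERT_WEBHOOK_URL" ]] || return 0
  curl -sf -X POST -H 'Content-Type: application/json' \
    -d "$(json_payload "$1")" "$ALERT_WEBHOOK_URL" >/dev/null || true
}

# In the ETL script, the ERR trap could call notify before exiting:
# trap 'log "ERROR: ETL failed at line $LINENO"; notify "ETL failed for ${DATE} at line $LINENO"; exit 1' ERR
```

Calling `notify "ETL completed for ${DATE}"` just before the final `DONE` log line would cover the success case in the same way.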

Use Cases

  • Automating daily data extract and load jobs
  • Idempotent ETL scripts for cron scheduling
  • Data pipeline with logging and error handling
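
For cron scheduling, one common way to make a step safe to re-run is a completion marker: the step is skipped when a marker file for that date already exists, and the marker is written only after the step succeeds. The following is a minimal sketch under that assumption; `STATE_DIR` and `run_once` are hypothetical names, not part of the script above.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Hypothetical location for the marker files.
STATE_DIR="${STATE_DIR:-/var/run/etl}"

# Run "$@" at most once per key: skip when a done-marker exists,
# and write the marker only after the command succeeds.
run_once() {
  local key=$1; shift
  local marker="${STATE_DIR}/${key}.done"
  mkdir -p "$STATE_DIR"
  if [[ -e "$marker" ]]; then
    echo "skip: ${key} already completed"
    return 0
  fi
  # Propagate the failure without writing the marker.
  "$@" || return $?
  date '+%Y-%m-%dT%H:%M:%S' > "$marker"
}

# Example: guard the load step so a re-run for the same date does not load twice.
# run_once "load-${DATE}" psql "$DATABASE_URL" -c "CALL merge_staging_to_production();"
```

Deleting the marker file forces a re-run for that date. Overlapping cron runs are a separate concern; wrapping the cron command in `flock -n` (from util-linux) is one common way to prevent them.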
