Bash ETL Pipeline Script
Build a complete ETL script in Bash with logging, error handling, notifications, and idempotent runs.
#!/usr/bin/env bash
set -euo pipefail
# Configuration
DATE=${1:-$(date -d yesterday +%Y-%m-%d)}
DATA_DIR="/data/raw/${DATE}"
OUTPUT_DIR="/data/processed/${DATE}"
LOG_FILE="/var/log/etl/${DATE}.log"
mkdir -p "$DATA_DIR" "$OUTPUT_DIR" "$(dirname "$LOG_FILE")"
log() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a "$LOG_FILE"; }
trap 'log "ERROR: ETL failed at line $LINENO"; exit 1' ERR
# --- Extract ---
log "EXTRACT: Downloading data for ${DATE}"
curl -sf "https://api.example.com/export?date=${DATE}" \
-H "Authorization: Bearer ${API_TOKEN}" \
-o "${DATA_DIR}/raw.json.gz"
gunzip -f "${DATA_DIR}/raw.json.gz"
log "EXTRACT: $(wc -l < "${DATA_DIR}/raw.json") lines downloaded"
# --- Transform ---
log "TRANSFORM: Cleaning and converting data"
python3 - <<'PYEOF'
import json, csv, sys
with open(sys.argv[1]) as f:
data = [json.loads(line) for line in f]
with open(sys.argv[2], 'w', newline='') as f:
w = csv.DictWriter(f, fieldnames=['id','email','amount','date'])
w.writeheader()
for r in data:
if r.get('amount', 0) > 0:
w.writerow({
'id': r['id'],
'email': r['email'].lower().strip(),
'amount': round(r['amount'], 2),
'date': r['created_at'][:10],
})
PYEOF
log "TRANSFORM: $(wc -l < "${OUTPUT_DIR}/clean.csv") rows after cleaning"
# --- Load ---
log "LOAD: Importing to database"
psql "$DATABASE_URL" -c "\\COPY staging.orders FROM '${OUTPUT_DIR}/clean.csv' CSV HEADER"
psql "$DATABASE_URL" -c "CALL merge_staging_to_production();"
log "DONE: ETL pipeline completed for ${DATE}"Use Cases
- Automating daily data extract and load jobs
- Idempotent ETL scripts for cron scheduling
- Data pipeline with logging and error handling
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
Python ETL Pipeline Example
Complete extract-transform-load pipeline with error handling, logging, and incremental processing.
Best for: Automating data ingestion from CSV to warehouse
Bash Pipeline Monitoring and Alerting
Monitor data pipeline health with row counts, runtime tracking, SLA checks, and Slack alerting.
Best for: Monitoring data pipeline health and freshness
Read Large CSV in Chunks with Pandas
Process CSV files larger than RAM by reading in chunks — memory-efficient ETL pattern for data pipelines.
Best for: Processing multi-GB CSV files without running out of memory
Airflow DAG with Python Operators
Create an Apache Airflow DAG with task dependencies, retries, and XCom data passing between tasks.
Best for: Orchestrating daily ETL pipelines