Bash Pipeline Monitoring and Alerting
Monitor data pipeline health with row counts, runtime tracking, SLA checks, and Slack alerting.
#!/usr/bin/env bash
set -euo pipefail
# Config
DB_URL="${DATABASE_URL}"
SLACK_WEBHOOK="${SLACK_WEBHOOK_URL}"
SLA_HOUR=8 # pipeline must complete by 8 AM
log() { echo "[$(date '+%H:%M:%S')] $*"; }
alert() {
local level=$1 msg=$2
local emoji="✅"; [[ "$level" == "warn" ]] && emoji="⚠️"; [[ "$level" == "error" ]] && emoji="❌"
curl -sf -X POST "$SLACK_WEBHOOK" \
-H 'Content-Type: application/json' \
-d "{\"text\": \"${emoji} Pipeline Monitor: ${msg}\"}"
}
# 1. Check table row counts (detect anomalies)
log "Checking row counts..."
while IFS='|' read -r table expected_min; do
count=$(psql "$DB_URL" -t -c "SELECT COUNT(*) FROM ${table} WHERE created_at >= CURRENT_DATE")
count=$(echo "$count" | tr -d ' ')
if (( count < expected_min )); then
alert "warn" "${table}: only ${count} rows today (expected >=${expected_min})"
else
log " ${table}: ${count} rows (OK)"
fi
done <<EOF
orders|100
events|1000
users|10
EOF
# 2. Check freshness
log "Checking data freshness..."
STALE=$(psql "$DB_URL" -t -c "
SELECT table_name FROM (
SELECT 'orders' AS table_name, MAX(updated_at) AS last_update FROM orders
UNION ALL
SELECT 'events', MAX(created_at) FROM events
) t WHERE last_update < NOW() - INTERVAL '2 hours'
")
if [[ -n "$(echo "$STALE" | tr -d '[:space:]')" ]]; then
alert "warn" "Stale tables: $(echo $STALE | tr -s ' ' ',')"
fi
# 3. SLA check
CURRENT_HOUR=$(date +%H)
if (( CURRENT_HOUR >= SLA_HOUR )); then
PIPELINE_DONE=$(psql "$DB_URL" -t -c "
SELECT COUNT(*) FROM pipeline_runs
WHERE run_date = CURRENT_DATE AND status = 'success'
" | tr -d ' ')
if (( PIPELINE_DONE == 0 )); then
alert "error" "SLA BREACH: pipeline not complete by ${SLA_HOUR}:00!"
fi
fi
log "Monitoring complete"Use Cases
- Monitoring data pipeline health and freshness
- SLA breach detection and alerting
- Automated anomaly detection for row counts
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
Bash ETL Pipeline Script
Build a complete ETL script in Bash with logging, error handling, notifications, and idempotent runs.
Best for: Automating daily data extract and load jobs
Python ETL Pipeline Example
Complete extract-transform-load pipeline with error handling, logging, and incremental processing.
Best for: Automating data ingestion from CSV to warehouse
Cron Data Sync — Database to S3
Automated script to export database tables to compressed CSV and sync to S3 on a schedule.
Best for: Nightly database exports to cloud storage
Spark Submit — Job Launcher Script
Launch PySpark jobs with spark-submit including cluster configuration, dependencies, and monitoring.
Best for: Launching PySpark batch jobs on YARN clusters