bash · intermediate

Bash Pipeline Monitoring and Alerting

Monitor data pipeline health with row-count checks, data freshness checks, SLA checks, and Slack alerting.

bash
#!/usr/bin/env bash
set -euo pipefail

# Config
# Both endpoints are taken from the environment. The ${VAR:?msg} form aborts
# at startup with a readable message when a variable is unset OR empty, so an
# empty value can never reach psql or curl later in the run.
DB_URL="${DATABASE_URL:?DATABASE_URL must be set to the PostgreSQL connection string}"
SLACK_WEBHOOK="${SLACK_WEBHOOK_URL:?SLACK_WEBHOOK_URL must be set to the Slack webhook URL}"
SLA_HOUR=8  # pipeline must complete by 8 AM (hour of day as reported by `date +%H`)

# Write one line to stdout: the current time as [HH:MM:SS], a space, then all
# arguments joined into a single message.
log() {
  printf '[%s] %s\n' "$(date '+%H:%M:%S')" "$*"
}

#######################################
# Post a message to the Slack webhook.
# Globals:   SLACK_WEBHOOK (read)
# Arguments: $1 - severity: "warn" or "error"; any other value gets the
#                 check-mark emoji
#            $2 - message text; it is JSON-escaped before being sent
# Outputs:   curl's output for the response goes to stdout; a diagnostic goes
#            to stderr when the POST fails
# Returns:   0 on success, curl's exit status otherwise
#######################################
alert() {
  local level=$1 msg=$2
  local emoji
  case "$level" in
    warn)  emoji="⚠️" ;;
    error) emoji="❌" ;;
    *)     emoji="✅" ;;
  esac

  # Escape the message for use inside a JSON string literal. The backslash
  # must be handled first so the escapes added afterwards are not doubled.
  # Replacements are quoted variables so they are always taken literally.
  local bs='\' dq='"'
  msg=${msg//"$bs"/"$bs$bs"}
  msg=${msg//"$dq"/"$bs$dq"}
  msg=${msg//$'\n'/"${bs}n"}
  msg=${msg//$'\r'/"${bs}r"}
  msg=${msg//$'\t'/"${bs}t"}

  # -f: fail on HTTP errors; -sS: no progress meter, but still print errors.
  local rc=0
  curl -fsS -X POST "$SLACK_WEBHOOK" \
    -H 'Content-Type: application/json' \
    -d "{\"text\": \"${emoji} Pipeline Monitor: ${msg}\"}" || rc=$?
  if (( rc != 0 )); then
    printf 'alert: Slack POST failed (curl exit %d)\n' "$rc" >&2
  fi
  return "$rc"
}

# 1. Check table row counts (detect anomalies)
# Each here-doc line is "<table>|<minimum rows expected today>". A table whose
# count of rows with created_at >= CURRENT_DATE is below its minimum raises a
# "warn" alert. Table names are interpolated straight into the SQL text, so
# this list must remain a trusted, hard-coded one.
log "Checking row counts..."
while IFS='|' read -r table expected_min; do
  # -X ignores ~/.psqlrc; -At prints the bare value without padding or header.
  # Options are placed before the connection string so they are parsed the
  # same way on every platform.
  if ! count=$(psql -X -At -d "$DB_URL" \
    -c "SELECT COUNT(*) FROM ${table} WHERE created_at >= CURRENT_DATE"); then
    # A monitor that dies silently when the database is unreachable hides the
    # very outage it should report: alert first, then abort as before.
    alert "error" "${table}: row-count query failed"
    exit 1
  fi
  count=${count//[[:space:]]/}

  # Guard the arithmetic comparison: empty or non-numeric output would be a
  # syntax error (or be evaluated as an expression) inside (( )).
  if [[ ! "$count" =~ ^[0-9]+$ ]]; then
    alert "error" "${table}: row-count query returned non-numeric output"
    exit 1
  fi

  if (( count < expected_min )); then
    alert "warn" "${table}: only ${count} rows today (expected >=${expected_min})"
  else
    log "  ${table}: ${count} rows (OK)"
  fi
done <<'EOF'
orders|100
events|1000
users|10
EOF

# 2. Check freshness
# Reports orders (by updated_at) and events (by created_at) whose newest
# timestamp is more than 2 hours old. An empty table has a NULL maximum, and
# NULL < x is never true, so NULL is matched explicitly; otherwise a table
# with no rows at all would never be reported as stale.
log "Checking data freshness..."
# -X ignores ~/.psqlrc; -At prints one bare table name per line.
if ! STALE=$(psql -X -At -d "$DB_URL" -c "
  SELECT table_name FROM (
    SELECT 'orders' AS table_name, MAX(updated_at) AS last_update FROM orders
    UNION ALL
    SELECT 'events', MAX(created_at) FROM events
  ) t WHERE last_update IS NULL OR last_update < NOW() - INTERVAL '2 hours'
"); then
  # Alert before aborting so an unreachable database is not a silent failure.
  alert "error" "Freshness query failed"
  exit 1
fi
if [[ -n "${STALE//[[:space:]]/}" ]]; then
  # Join the newline-separated names with commas without relying on unquoted
  # word splitting.
  alert "warn" "Stale tables: ${STALE//$'\n'/,}"
fi

# 3. SLA check
# Once the current hour has reached SLA_HOUR, raise an "error" alert if
# pipeline_runs has no row for today with status 'success'.
CURRENT_HOUR=$(date +%H)
# date +%H is zero-padded. Bash arithmetic reads a leading 0 as octal, so "08"
# and "09" are invalid numbers and the comparison below would error out during
# exactly those two hours. 10# forces base-10 interpretation.
CURRENT_HOUR=$(( 10#$CURRENT_HOUR ))
if (( CURRENT_HOUR >= SLA_HOUR )); then
  # -X ignores ~/.psqlrc; -At prints the bare count without padding or header.
  if ! PIPELINE_DONE=$(psql -X -At -d "$DB_URL" -c "
    SELECT COUNT(*) FROM pipeline_runs
    WHERE run_date = CURRENT_DATE AND status = 'success'
  "); then
    # Alert before aborting so an unreachable database is not a silent failure.
    alert "error" "SLA check query failed"
    exit 1
  fi
  PIPELINE_DONE=${PIPELINE_DONE//[[:space:]]/}

  # Guard the arithmetic comparison against empty or non-numeric output.
  if [[ ! "$PIPELINE_DONE" =~ ^[0-9]+$ ]]; then
    alert "error" "SLA check query returned non-numeric output"
    exit 1
  fi

  if (( PIPELINE_DONE == 0 )); then
    alert "error" "SLA BREACH: pipeline not complete by ${SLA_HOUR}:00!"
  fi
fi

log "Monitoring complete"

Use Cases

  • Monitoring data pipeline health and freshness
  • SLA breach detection and alerting
  • Automated anomaly detection for row counts

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.