Apache Airflow DAG Example
Airflow DAG with task dependencies, retries, SLA, and PythonOperator for daily data pipeline.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
# Default arguments inherited by every task in the DAG (each task may override).
default_args = {
    "owner": "data-team",
    "depends_on_past": False,  # runs do not wait on the previous run's success
    "email_on_failure": True,
    "email": ["alerts@company.com"],
    "retries": 2,  # retry a failed task twice before marking it failed
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(hours=1),  # kill any task exceeding one hour
}
def extract_data(**context):
    """Pull source records for the run's logical date.

    Pushes the raw record count to XCom under key ``raw_count`` and
    returns the extracted records (also published as the task's XCom
    return value by Airflow).
    """
    logical_date = context["ds"]
    # Simulated extraction for the execution date.
    rows = [{"id": 1, "date": logical_date, "value": 100}]
    context["ti"].xcom_push(key="raw_count", value=len(rows))
    return rows
def transform_data(**context):
    """Read the raw record count pushed by the ``extract`` task and
    return a summary dict ``{"processed": <count>, "status": "success"}``.
    """
    count = context["ti"].xcom_pull(key="raw_count", task_ids="extract")
    # Transform logic
    return {"processed": count, "status": "success"}
def load_data(**context):
    """Fetch the transform summary from XCom and load it to the warehouse.

    The load itself is a stub: it only logs the processed-record count.
    """
    summary = context["ti"].xcom_pull(task_ids="transform")
    # Load to warehouse
    print(f"Loading {summary['processed']} records")
# Daily ETL DAG: extract -> transform -> load -> validate.
with DAG(
    dag_id="daily_etl_pipeline",
    default_args=default_args,
    description="Daily ETL pipeline for sales data",
    schedule="0 6 * * *",  # cron: every day at 06:00
    start_date=datetime(2024, 1, 1),
    catchup=False,  # do not backfill runs missed before deployment
    max_active_runs=1,  # serialize runs so they never overlap
    tags=["etl", "production"],
) as dag:
    # Tasks exchange data via XCom (raw_count and the transform summary).
    extract = PythonOperator(task_id="extract", python_callable=extract_data)
    transform = PythonOperator(task_id="transform", python_callable=transform_data)
    load = PythonOperator(task_id="load", python_callable=load_data)
    validate = BashOperator(
        task_id="validate",
        # {{ ds }} is templated to the run's logical date at execution time
        bash_command='echo "Validation complete for {{ ds }}"',
    )
    extract >> transform >> load >> validate

Use Cases
- Orchestrating daily data pipelines
- Scheduling ETL jobs with dependency management
- Building production workflow automation
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
Kubernetes Deployment Configuration
Production-ready Kubernetes deployment with replicas, resource limits, health checks, and rolling updates.
Linux Cron Job Setup Examples
Common crontab entries for database backups, log cleanup, health checks, and certificate renewal.
Pandas DataFrame Transformations
Common pandas DataFrame transformations including column operations, type casting, and string methods.
Pandas DataFrame Filtering Techniques
Filter DataFrames using boolean masks, query syntax, isin, between, and string matching methods.