Python · Advanced

Apache Airflow DAG Example

Airflow DAG with task dependencies, retries, execution timeouts, and PythonOperator tasks for a daily data pipeline.

python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

# Shared defaults applied to every task in the DAG defined below.
default_args = dict(
    owner="data-team",
    # Each run stands alone: a failed yesterday does not block today.
    depends_on_past=False,
    email_on_failure=True,
    email=["alerts@company.com"],
    # Retry a failed task twice, waiting five minutes between attempts.
    retries=2,
    retry_delay=timedelta(minutes=5),
    # Fail any task instance that runs for more than one hour.
    execution_timeout=timedelta(hours=1),
)


def extract_data(**context):
    """Pull raw records for the run's logical date.

    Pushes the record count to XCom under ``raw_count`` for downstream
    tasks, and returns the records themselves (Airflow also stores the
    return value in XCom as ``return_value``).
    """
    run_date = context["ds"]
    # Placeholder extraction: a single synthetic row tagged with the run date.
    records = [{"id": 1, "date": run_date, "value": 100}]
    task_instance = context["ti"]
    task_instance.xcom_push(key="raw_count", value=len(records))
    return records


def transform_data(**context):
    """Apply transformations and return a small status payload.

    Reads the ``raw_count`` value that the upstream ``extract`` task
    pushed to XCom.
    """
    task_instance = context["ti"]
    raw_count = task_instance.xcom_pull(key="raw_count", task_ids="extract")
    # Transform logic would go here; we report what was processed.
    return {"processed": raw_count, "status": "success"}


def load_data(**context):
    """Load the transformed payload into the warehouse (stubbed as a print).

    Pulls the ``transform`` task's return value from XCom (stored under
    the implicit ``return_value`` key).
    """
    task_instance = context["ti"]
    result = task_instance.xcom_pull(task_ids="transform")
    print(f"Loading {result['processed']} records")


# Daily ETL DAG: one serialized run per day at 06:00, no backfill.
with DAG(
    dag_id="daily_etl_pipeline",
    default_args=default_args,
    description="Daily ETL pipeline for sales data",
    schedule="0 6 * * *",  # every day at 06:00
    start_date=datetime(2024, 1, 1),
    catchup=False,  # skip historical intervals before deployment
    max_active_runs=1,  # never let two daily runs overlap
    tags=["etl", "production"],
) as dag:

    # Final sanity check, templated with the run's logical date.
    validate = BashOperator(
        task_id="validate",
        bash_command='echo "Validation complete for {{ ds }}"',
    )

    extract = PythonOperator(task_id="extract", python_callable=extract_data)
    transform = PythonOperator(task_id="transform", python_callable=transform_data)
    load = PythonOperator(task_id="load", python_callable=load_data)

    # Linear pipeline: extract -> transform -> load -> validate.
    extract.set_downstream(transform)
    transform.set_downstream(load)
    load.set_downstream(validate)

Use Cases

  • Orchestrating daily data pipelines
  • Scheduling ETL jobs with dependency management
  • Building production workflow automation

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.