Python · Intermediate

Airflow DAG with Python Operators

Create an Apache Airflow DAG with task dependencies, retries, and XCom data passing between tasks.

python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable


# Defaults applied to every task of the DAG below (it is passed as
# ``default_args``): task ownership, a retry policy of two retries spaced
# five minutes apart, and e-mail alerting when a task fails.
default_args = dict(
    owner="data-team",
    retries=2,
    retry_delay=timedelta(minutes=5),
    email_on_failure=True,
    email=["alerts@company.com"],
)


def extract(**context):
    """Produce the source records and publish them for the ``transform`` task.

    The ``data_api_url`` Airflow Variable is looked up first. The fetch itself
    is still a placeholder, so two hard-coded sample records stand in for the
    real response. They are pushed as the ``raw_data`` XCom.

    Args:
        **context: Airflow task context; only ``context["ti"]`` is used.

    Returns:
        The number of records extracted.
    """
    # NOTE(review): not used yet; intended for the real fetch below.
    source_url = Variable.get("data_api_url")  # noqa: F841
    # ... fetch data ...
    sample_rows = [
        {"id": 1, "value": 100},
        {"id": 2, "value": 200},
    ]
    task_instance = context["ti"]
    task_instance.xcom_push(key="raw_data", value=sample_rows)
    return len(sample_rows)


def transform(**context):
    """Transform the records produced by the ``extract`` task.

    Pulls the ``raw_data`` XCom pushed by ``extract``. Each record keeps all
    of its keys and gains a ``value_usd`` field equal to ``value * 1.1``.
    The result is pushed as the ``clean_data`` XCom.

    Args:
        **context: Airflow task context; only ``context["ti"]`` is used.

    Raises:
        ValueError: If no ``raw_data`` XCom is available from ``extract``.
    """
    ti = context["ti"]
    raw = ti.xcom_pull(task_ids="extract", key="raw_data")
    # xcom_pull yields None when no matching XCom exists. Fail with a clear
    # message rather than an opaque TypeError from iterating None.
    if raw is None:
        raise ValueError("No 'raw_data' XCom found from task 'extract'")
    # NOTE(review): 1.1 is a hard-coded conversion factor; confirm its source.
    transformed = [{**r, "value_usd": r["value"] * 1.1} for r in raw]
    ti.xcom_push(key="clean_data", value=transformed)


def load(**context):
    """Load the records produced by the ``transform`` task into the warehouse.

    Pulls the ``clean_data`` XCom pushed by ``transform`` and reports how many
    records will be loaded. The database insert itself is still a placeholder.

    Args:
        **context: Airflow task context; only ``context["ti"]`` is used.

    Raises:
        ValueError: If no ``clean_data`` XCom is available from ``transform``.
    """
    data = context["ti"].xcom_pull(task_ids="transform", key="clean_data")
    # xcom_pull yields None when no matching XCom exists. Fail with a clear
    # message rather than an opaque TypeError from len(None).
    if data is None:
        raise ValueError("No 'clean_data' XCom found from task 'transform'")
    print(f"Loading {len(data)} records to warehouse")
    # ... insert into database ...


# The ``etl_pipeline`` DAG: three PythonOperator tasks wired in a linear
# extract -> transform -> load chain, sharing the module's default_args.
with DAG(
    dag_id="etl_pipeline",
    description="Daily ETL: extract, transform, load",
    tags=["etl", "production"],
    # Scheduled "@daily" from 2024-01-01, with catchup disabled.
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args=default_args,
) as dag:

    # One task per ETL stage. These task_ids are the ones the transform and
    # load callables name in their xcom_pull calls, so they must not drift.
    t1 = PythonOperator(
        task_id="extract",
        python_callable=extract,
    )
    t2 = PythonOperator(
        task_id="transform",
        python_callable=transform,
    )
    t3 = PythonOperator(
        task_id="load",
        python_callable=load,
    )

    # Dependencies: extract runs before transform, transform before load.
    t1 >> t2
    t2 >> t3

Sponsored

Astronomer

Use Cases

  • Orchestrating daily ETL pipelines
  • Scheduling dependent data tasks with retries
  • Passing data between pipeline stages with XCom

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.