Airflow DAG with Python Operators
Create an Apache Airflow DAG with task dependencies, retries, and XCom data passing between tasks.
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator

# Applied to every task in the DAG unless a task overrides them.
default_args = {
    "owner": "data-team",
    "retries": 2,                         # up to 3 attempts per task in total
    "retry_delay": timedelta(minutes=5),  # wait between attempts
    "email_on_failure": True,             # needs an email backend (e.g. SMTP) configured
    "email": ["alerts@company.com"],
}


def extract(**context):
    """Extract data from source."""
    # Raises KeyError if the Airflow Variable "data_api_url" is not defined.
    api_url = Variable.get("data_api_url")
    # ... fetch data ...
    records = [{"id": 1, "value": 100}, {"id": 2, "value": 200}]
    context["ti"].xcom_push(key="raw_data", value=records)
    # PythonOperator also pushes the return value to XCom (key "return_value").
    return len(records)


def transform(**context):
    """Transform extracted data."""
    raw = context["ti"].xcom_pull(task_ids="extract", key="raw_data")
    transformed = [{**r, "value_usd": r["value"] * 1.1} for r in raw]
    context["ti"].xcom_push(key="clean_data", value=transformed)


def load(**context):
    """Load transformed data to warehouse."""
    data = context["ti"].xcom_pull(task_ids="transform", key="clean_data")
    print(f"Loading {len(data)} records to warehouse")
    # ... insert into database ...


with DAG(
    dag_id="etl_pipeline",
    default_args=default_args,
    description="Daily ETL: extract, transform, load",
    schedule="@daily",  # the `schedule` argument requires Airflow 2.4+
    start_date=datetime(2024, 1, 1),
    catchup=False,      # do not backfill runs between start_date and now
    tags=["etl", "production"],
) as dag:
    t1 = PythonOperator(task_id="extract", python_callable=extract)
    t2 = PythonOperator(task_id="transform", python_callable=transform)
    t3 = PythonOperator(task_id="load", python_callable=load)

    # extract runs first, then transform, then load.
    t1 >> t2 >> t3
```
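Because the callables only interact with Airflow through `context["ti"]`, their XCom hand-off can be checked without a running Airflow deployment. This is a minimal sketch: `FakeTaskInstance` and `run_pipeline` are hypothetical helpers (not part of Airflow) that keep pushed values in a dict keyed by `(task_id, key)` and call the three functions in `extract >> transform >> load` order. The `Variable.get` call and the warehouse write are left out, and `load` returns the record count so there is something to assert on.

```python
class FakeTaskInstance:
    """Hypothetical in-memory stand-in for the TaskInstance XCom methods.

    Values live in a shared dict keyed by (task_id, key), so one task can
    pull what another task pushed, similar to xcom_pull(task_ids=..., key=...).
    """

    def __init__(self, task_id, store):
        self.task_id = task_id
        self.store = store

    def xcom_push(self, key, value):
        self.store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key):
        return self.store.get((task_ids, key))


def extract(**context):
    # Variable.get and the real fetch are omitted in this sketch.
    records = [{"id": 1, "value": 100}, {"id": 2, "value": 200}]
    context["ti"].xcom_push(key="raw_data", value=records)
    return len(records)


def transform(**context):
    raw = context["ti"].xcom_pull(task_ids="extract", key="raw_data")
    transformed = [{**r, "value_usd": r["value"] * 1.1} for r in raw]
    context["ti"].xcom_push(key="clean_data", value=transformed)


def load(**context):
    data = context["ti"].xcom_pull(task_ids="transform", key="clean_data")
    # Return the count instead of writing to a warehouse.
    return len(data)


def run_pipeline():
    """Run the callables in dependency order, sharing one XCom store."""
    store = {}
    results = {}
    for task_id, fn in [("extract", extract), ("transform", transform), ("load", load)]:
        results[task_id] = fn(ti=FakeTaskInstance(task_id, store))
    return store, results


if __name__ == "__main__":
    store, results = run_pipeline()
    print(results)
    print(store[("transform", "clean_data")])
```

Real Airflow persists XCom values in its metadata database (JSON-serialized by default), so this pattern suits small payloads like the two records here; for large datasets it is common to write the data to external storage and pass only a path or key through XCom.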
Use Cases
- Orchestrating daily ETL pipelines
- Scheduling dependent data tasks with retries
- Passing data between pipeline stages with XCom
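With `"retries": 2` and `"retry_delay": timedelta(minutes=5)` in `default_args`, a failing task gets up to three attempts in total, with a five-minute wait after each failure. The helper below, `run_with_retries`, is a hypothetical simplified model of that behaviour, not how the Airflow scheduler actually reschedules task instances; the `sleep` parameter is injectable so the delays can be observed without waiting.

```python
import time
from datetime import timedelta


def run_with_retries(fn, retries=2, retry_delay=timedelta(minutes=5), sleep=time.sleep):
    """Call fn up to retries + 1 times, waiting retry_delay after each failure."""
    attempts = retries + 1
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == attempts:
                raise  # out of retries: the task would be marked failed
            sleep(retry_delay.total_seconds())


def make_flaky(failures):
    """Hypothetical task body that fails `failures` times, then succeeds."""
    calls = {"n": 0}

    def fn():
        calls["n"] += 1
        if calls["n"] <= failures:
            raise RuntimeError("transient error")
        return "ok"

    return fn, calls


if __name__ == "__main__":
    waits = []
    fn, calls = make_flaky(2)
    print(run_with_retries(fn, sleep=waits.append), calls["n"], waits)
```

Airflow can also lengthen the delay between attempts via `retry_exponential_backoff`, which this sketch does not model.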
Related Snippets
Similar patterns you can reuse in the same workflow.
Apache Airflow DAG Example
Airflow DAG with task dependencies, retries, SLA, and PythonOperator for daily data pipeline.
Best for: Orchestrating daily data pipelines
Python ETL Pipeline Example
Complete extract-transform-load pipeline with error handling, logging, and incremental processing.
Best for: Automating data ingestion from CSV to warehouse
Read Large CSV in Chunks with Pandas
Process CSV files larger than RAM by reading in chunks — memory-efficient ETL pattern for data pipelines.
Best for: Processing multi-GB CSV files without running out of memory
Bash ETL Pipeline Script
Build a complete ETL script in Bash with logging, error handling, notifications, and idempotent runs.
Best for: Automating daily data extract and load jobs