Python · advanced
Apache Airflow DAG Example
An Airflow DAG for a daily data pipeline, with task dependencies, retries, an SLA, and PythonOperator tasks.
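The retry and timeout values in the DAG's `default_args` below (`retries=2`, a 5-minute `retry_delay`, a 1-hour `execution_timeout`) put an upper bound on how long each task can keep trying. The following is a rough sketch of that arithmetic. It assumes a fixed delay between attempts (Airflow's `retry_exponential_backoff` left at its default of off) and ignores scheduler and queueing latency. `worst_case_task_duration` is a hypothetical helper, not an Airflow API.

```python
from datetime import timedelta


def worst_case_task_duration(
    retries: int, retry_delay: timedelta, execution_timeout: timedelta
) -> timedelta:
    """Upper bound on one task's wall-clock time across all attempts.

    Assumes every attempt runs until execution_timeout kills it and that the
    delay between attempts is fixed (no exponential backoff). Scheduler and
    queueing latency are ignored, so real runs can take somewhat longer.
    """
    attempts = retries + 1  # the first try plus each retry
    return attempts * execution_timeout + retries * retry_delay


# Values taken from the DAG's default_args
per_task = worst_case_task_duration(
    retries=2,
    retry_delay=timedelta(minutes=5),
    execution_timeout=timedelta(hours=1),
)
# extract >> transform >> load >> validate run one after another
whole_chain = 4 * per_task

print(per_task)     # 3:10:00
print(whole_chain)  # 12:40:00
```

Even in this worst case, the four-task chain finishes well inside the 24 hours between 06:00 runs. With `max_active_runs=1`, a run that did overrun would hold back the next one instead of overlapping it.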
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Applied to every task in the DAG unless a task overrides them.
default_args = {
    "owner": "data-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["alerts@company.com"],
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(hours=1),
    # Airflow 2.x SLA: report tasks that have not succeeded within 2 hours
    # of the run's scheduled time (example value).
    "sla": timedelta(hours=2),
}


def extract_data(**context):
    run_date = context["ds"]  # logical date as "YYYY-MM-DD"
    # Pull data for the logical date (placeholder record)
    records = [{"id": 1, "date": run_date, "value": 100}]
    context["ti"].xcom_push(key="raw_count", value=len(records))
    return records  # the return value is also pushed to XCom as "return_value"


def transform_data(**context):
    raw_count = context["ti"].xcom_pull(key="raw_count", task_ids="extract")
    # Transform logic goes here
    return {"processed": raw_count, "status": "success"}


def load_data(**context):
    # With no key, xcom_pull returns the "transform" task's return value
    result = context["ti"].xcom_pull(task_ids="transform")
    # Load to warehouse
    print(f"Loading {result['processed']} records")


with DAG(
    dag_id="daily_etl_pipeline",
    default_args=default_args,
    description="Daily ETL pipeline for sales data",
    schedule="0 6 * * *",  # daily at 06:00 (UTC by default); `schedule` needs Airflow 2.4+
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=["etl", "production"],
) as dag:
    extract = PythonOperator(task_id="extract", python_callable=extract_data)
    transform = PythonOperator(task_id="transform", python_callable=transform_data)
    load = PythonOperator(task_id="load", python_callable=load_data)
    validate = BashOperator(
        task_id="validate",
        bash_command='echo "Validation complete for {{ ds }}"',
    )

    # Run strictly in sequence: extract -> transform -> load -> validate
    extract >> transform >> load >> validate
```

Use Cases
- Orchestrating daily data pipelines
- Scheduling ETL jobs with dependency management
- Building production workflow automation
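When scheduling with `schedule="0 6 * * *"`, one detail is easy to miss. Under the data-interval timetable that Airflow 2.x uses by default for cron strings, a run is triggered at the end of the day it covers. As a result, the `ds` that `extract_data` receives is the previous day's date. The following is a minimal, Airflow-free sketch of that rule, assuming UTC and a fixed daily boundary. `latest_closed_interval` is a hypothetical helper, and other timetables (for example `CronTriggerTimetable`) set the logical date differently.

```python
from datetime import datetime, timedelta


def latest_closed_interval(now: datetime, hour: int = 6) -> tuple[datetime, datetime]:
    """Return (start, end) of the most recent daily interval that has closed by `now`.

    Models a "0 <hour> * * *" cron schedule with data-interval semantics: the run
    for [start, end) is triggered at `end`, and its logical date (`ds`) is `start`.
    Naive datetimes are treated as UTC.
    """
    end = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    if end > now:
        # Today's boundary has not been reached yet, so the last one was yesterday
        end -= timedelta(days=1)
    start = end - timedelta(days=1)
    return start, end


# The run triggered at 06:00 on 15 March processes the previous day's interval
start, end = latest_closed_interval(datetime(2024, 3, 15, 6, 0))
ds = start.strftime("%Y-%m-%d")
print(start, end, ds)  # 2024-03-14 06:00:00 2024-03-15 06:00:00 2024-03-14
```

By the same rule, with `start_date=datetime(2024, 1, 1)` the first interval should run from 2024-01-01 06:00 to 2024-01-02 06:00, so the earliest run fires on 2 January. With `catchup=False`, enabling the DAG later schedules only the most recent interval instead of backfilling every missed day.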
Related Snippets
Similar patterns you can reuse in the same workflow.
Python · intermediate
Airflow DAG with Python Operators
Create an Apache Airflow DAG with task dependencies, retries, and XCom data passing between tasks.
Best for: Orchestrating daily ETL pipelines
#airflow #dag

Python · intermediate
Prefect ETL Flow with Tasks
Define a Prefect 2 flow with typed tasks, retries, and structured logging for ETL pipelines.
Best for: ETL orchestration
#prefect #etl

Python · advanced
Detect Overlapping Date Intervals
Identify overlapping time periods in a DataFrame (e.g., booking conflicts or subscription overlaps).
Best for: Scheduling conflicts
#pandas #intervals

Bash · advanced
Kubernetes Deployment Configuration
Production-ready Kubernetes deployment with replicas, resource limits, health checks, and rolling updates.
Best for: Production container orchestration
#kubernetes #deployment