Python · Intermediate
Prefect ETL Flow with Tasks
Define a Prefect 2 flow with type-annotated tasks, input-hash caching, retries, and run-scoped logging (via `get_run_logger`) for ETL pipelines.
```python
from datetime import timedelta

import httpx
from prefect import flow, get_run_logger, task
from prefect.tasks import task_input_hash


# Cache results by input hash for 1 hour; retry a failed attempt up to 3 times.
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1), retries=3)
def extract(url: str) -> list[dict]:
    response = httpx.get(url)
    # Raise on 4xx/5xx so the attempt fails and Prefect retries it.
    response.raise_for_status()
    return response.json()


@task
def transform(records: list[dict]) -> list[dict]:
    # Upper-case each record's name without mutating the input dicts.
    return [{**r, "name": r["name"].upper()} for r in records]


@task
def load(records: list[dict]) -> None:
    logger = get_run_logger()
    # Placeholder: write `records` to your destination (database, file, API) here.
    logger.info("Loading %d records", len(records))


@flow(name="my-etl")
def run_etl(url: str) -> None:
    raw = extract(url)
    transformed = transform(raw)
    load(transformed)


if __name__ == "__main__":
    run_etl(url="https://jsonplaceholder.typicode.com/users")
```
Use Cases
- ETL orchestration
- scheduled pipelines
- task retries
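The `@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1), retries=3)` decorator on `extract` combines two behaviors: a call with inputs already seen inside the expiration window reuses the earlier result, and a call that raises is re-run. The plain-Python sketch below illustrates that idea without Prefect. It is an assumption-laden approximation, not Prefect's implementation: `cached_with_retries` and `fetch` are hypothetical names, the JSON-plus-SHA-256 cache key is a simplified stand-in for `task_input_hash`, the cache lives in an in-process dict, and retries happen immediately with no delay.

```python
import functools
import hashlib
import json
import time
from datetime import timedelta
from typing import Any, Callable, Dict, Optional, Tuple


def cached_with_retries(
    retries: int = 3,
    cache_expiration: timedelta = timedelta(hours=1),
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Hypothetical helper: input-hash caching plus retries in plain Python.

    Illustrates the idea behind Prefect's cache_key_fn / cache_expiration /
    retries task options; it is not how Prefect implements them.
    """

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        # In-memory cache: input hash -> (expiry time, result).
        cache: Dict[str, Tuple[float, Any]] = {}

        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Hash the JSON-serialized inputs (an assumed, simplified key scheme).
            payload = json.dumps([args, kwargs], sort_keys=True, default=str)
            key = hashlib.sha256(payload.encode()).hexdigest()
            hit = cache.get(key)
            if hit is not None and hit[0] > time.monotonic():
                return hit[1]  # Fresh entry: skip the call entirely.

            last_error: Optional[Exception] = None
            # One initial attempt plus up to `retries` retries, with no delay.
            for _ in range(retries + 1):
                try:
                    result = fn(*args, **kwargs)
                except Exception as exc:
                    last_error = exc
                    continue
                expires_at = time.monotonic() + cache_expiration.total_seconds()
                cache[key] = (expires_at, result)
                return result
            assert last_error is not None
            raise last_error

        return wrapper

    return decorator


# Usage: a hypothetical task-like function that fails twice, then succeeds.
attempts = []


@cached_with_retries(retries=3)
def fetch(url: str) -> str:
    attempts.append(url)
    if len(attempts) < 3:
        raise ConnectionError("transient failure")
    return f"payload from {url}"


print(fetch("https://example.com/data"))  # succeeds on the 3rd attempt
print(fetch("https://example.com/data"))  # served from the cache
print(len(attempts))  # → 3
```

Only successful results are cached, so a call that exhausts its retries re-raises the last exception and leaves nothing behind to be reused.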
Related Snippets
Similar patterns you can reuse in the same workflow.
Python · Intermediate
Airflow DAG with Python Operators
Create an Apache Airflow DAG with task dependencies, retries, and XCom data passing between tasks.
Best for: Orchestrating daily ETL pipelines
#airflow #dag
Python · Advanced
Python ETL Pipeline Example
Complete extract-transform-load pipeline with error handling, logging, and incremental processing.
Best for: Automating data ingestion from CSV to warehouse
#etl #pipeline
Python · Advanced
Apache Airflow DAG Example
Airflow DAG with task dependencies, retries, an SLA, and PythonOperator for a daily data pipeline.
Best for: Orchestrating daily data pipelines
#airflow #dag
Python · Intermediate
Python Batch Processing Script
Process large files in configurable batches with progress tracking, error handling, and resume support.
Best for: Processing large CSV files that don't fit in memory
#batch-processing #python