
Prefect ETL Flow with Tasks

Define a Prefect 2 flow for an ETL (extract, transform, load) pipeline, using type-annotated tasks, input-based caching, retries, and Prefect's run-scoped logger.

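```python
from datetime import timedelta

import httpx
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash


# Cache results per input for one hour and retry transient HTTP failures.
@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
    retries=3,
    retry_delay_seconds=10,
)
def extract(url: str) -> list[dict]:
    response = httpx.get(url, timeout=10.0)
    response.raise_for_status()  # raise on 4xx/5xx so Prefect can retry
    return response.json()


@task
def transform(records: list[dict]) -> list[dict]:
    # Upper-case the 'name' field; all other keys are copied unchanged.
    return [{**r, 'name': r['name'].upper()} for r in records]


@task
def load(records: list[dict]) -> None:
    logger = get_run_logger()  # logger bound to the current task run
    # Placeholder: write `records` to your destination here.
    logger.info('Loading %d records', len(records))


@flow(name='my-etl')
def run_etl(url: str) -> None:
    raw = extract(url)
    transformed = transform(raw)
    load(transformed)


if __name__ == '__main__':
    run_etl(url='https://jsonplaceholder.typicode.com/users')
```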
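On `extract`, `cache_key_fn=task_input_hash` together with `cache_expiration=timedelta(hours=1)` means a repeat call with the same inputs within an hour can reuse the stored result instead of fetching again. The stdlib-only sketch below is a simplified, in-memory model of that idea, not Prefect's implementation: `input_hash` and `ExpiringCache` are hypothetical names, and Prefect's real cache key also takes the task itself into account.

```python
import hashlib
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Callable


def input_hash(fn_name: str, kwargs: dict[str, Any]) -> str:
    """Hypothetical stand-in for task_input_hash: hash the function name plus its inputs."""
    payload = json.dumps({"fn": fn_name, "inputs": kwargs}, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()


class ExpiringCache:
    """Hypothetical in-memory cache: a result is reused until `ttl` has elapsed."""

    def __init__(
        self,
        ttl: timedelta,
        now: Callable[[], datetime] = lambda: datetime.now(timezone.utc),
    ) -> None:
        self.ttl = ttl
        self.now = now  # injectable clock so expiry can be tested deterministically
        self._store: dict[str, tuple[datetime, Any]] = {}

    def call(self, fn: Callable[..., Any], **kwargs: Any) -> Any:
        key = input_hash(fn.__name__, kwargs)
        hit = self._store.get(key)
        if hit is not None and self.now() - hit[0] < self.ttl:
            return hit[1]  # cache hit: fn is not executed again
        result = fn(**kwargs)  # miss or expired entry: run fn and store the result
        self._store[key] = (self.now(), result)
        return result
```

Injecting `now` keeps the expiry logic testable without waiting an hour; a different `url` produces a different key and therefore a fresh call.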

Use Cases

  • ETL orchestration
  • scheduled pipelines
  • task retries
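For task retries, `retries=3` on `extract` means one initial attempt plus up to three retries, so at most four calls before the task fails. The stdlib-only sketch below models that behaviour; `run_with_retries` is a hypothetical helper, not Prefect's implementation, and its delay argument only mirrors the role of Prefect's `retry_delay_seconds` task option.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    fn: Callable[[], T],
    retries: int = 3,
    retry_delay_seconds: float = 0.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn once, then retry up to `retries` more times while it raises."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt >= retries:
                raise  # retries exhausted: surface the last error
            attempt += 1
            sleep(retry_delay_seconds)  # wait before the next attempt
```

Only raised exceptions trigger a retry: with httpx, a 4xx or 5xx response does not raise unless `response.raise_for_status()` is called.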
