python · intermediate

Async ETL Pipeline with asyncio

Fetch many JSON endpoints concurrently with asyncio.gather and a shared httpx.AsyncClient, then filter the decoded records. This serves as the extract stage of a high-throughput ETL pipeline.

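```python
import asyncio

import httpx  # third-party HTTP client with async support


async def fetch(client: httpx.AsyncClient, url: str) -> dict:
    """GET one URL and return its decoded JSON body."""
    r = await client.get(url)
    r.raise_for_status()  # raises httpx.HTTPStatusError for error responses (e.g. 404, 500)
    return r.json()


async def run_pipeline(urls: list[str]) -> list[dict]:
    # One shared client reuses pooled connections across all requests.
    async with httpx.AsyncClient(timeout=10) as client:
        # Run every fetch concurrently; results keep the order of `urls`.
        # Without return_exceptions=True, the first failure propagates here.
        results = await asyncio.gather(*(fetch(client, u) for u in urls))
    # Drop empty payloads before handing records to the next stage.
    return [r for r in results if r]


if __name__ == '__main__':
    urls = [f'https://jsonplaceholder.typicode.com/posts/{i}' for i in range(1, 11)]
    records = asyncio.run(run_pipeline(urls))
    print(f'Fetched {len(records)} records')
```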
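The pipeline above launches every request at once. Its only transformation is dropping empty records. The following minimal sketch fills in the remaining steps. It assumes you want to cap the number of in-flight requests and reshape each record. An asyncio.Semaphore limits concurrency, and a per-record transform runs as soon as that record's fetch completes. The names fake_fetch, transform, and bounded_pipeline are hypothetical illustrations, as are the record schema and the default limit of 5. fake_fetch stands in for fetch(client, url) so the example runs offline.

```python
import asyncio


# Hypothetical offline stand-in for fetch(client, url).
async def fake_fetch(i: int) -> dict:
    await asyncio.sleep(0.01)  # simulate network latency
    return {"id": i, "title": f"post {i}"}


def transform(record: dict) -> dict:
    # Hypothetical transformation on an assumed schema: keep id, upper-case title.
    return {"id": record["id"], "title": record["title"].upper()}


async def bounded_pipeline(ids: list[int], max_concurrency: int = 5) -> list[dict]:
    sem = asyncio.Semaphore(max_concurrency)  # at most max_concurrency fetches in flight

    async def worker(i: int) -> dict:
        async with sem:  # hold a slot only while fetching
            raw = await fake_fetch(i)
        return transform(raw)  # transform outside the semaphore

    # gather still preserves input order in the returned list.
    return await asyncio.gather(*(worker(i) for i in ids))


if __name__ == "__main__":
    out = asyncio.run(bounded_pipeline(list(range(1, 11))))
    print(out[0])  # {'id': 1, 'title': 'POST 1'}
```

Releasing the semaphore before transform() keeps local work from occupying a network slot. Heavy CPU-bound transforms would still block the event loop, so they are usually offloaded to a process pool via loop.run_in_executor.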

Use Cases

  • concurrent API ingestion
  • high-throughput ETL
  • async data fetch
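In concurrent API ingestion, one failing endpoint often should not abort the whole batch. For that case, asyncio.gather also accepts return_exceptions=True, which places each exception in the result list instead of raising the first one. The minimal sketch below assumes a hypothetical flaky_fetch that fails for one id. In the snippet above, fetch(client, u) would take its place.

```python
import asyncio


# Hypothetical fetcher that fails for id 3 to simulate a bad endpoint.
async def flaky_fetch(i: int) -> dict:
    await asyncio.sleep(0)
    if i == 3:
        raise ValueError(f"bad record {i}")
    return {"id": i}


async def tolerant_pipeline(ids: list[int]) -> tuple[list[dict], list[BaseException]]:
    # Exceptions are returned in place of results rather than raised.
    results = await asyncio.gather(*(flaky_fetch(i) for i in ids), return_exceptions=True)
    ok = [r for r in results if not isinstance(r, BaseException)]
    errors = [r for r in results if isinstance(r, BaseException)]
    return ok, errors


if __name__ == "__main__":
    ok, errors = asyncio.run(tolerant_pipeline([1, 2, 3, 4]))
    print(len(ok), len(errors))  # 3 1
```

Whether to skip, retry, or log the failed records is a pipeline-specific choice. This sketch only separates them from the successful ones.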
