python, intermediate
Async ETL Pipeline with asyncio
Fetch many URLs concurrently with asyncio.gather over a single shared httpx.AsyncClient, then filter the results, as the basis for a high-throughput ETL pipeline.
import asyncio

import httpx


async def fetch(client: httpx.AsyncClient, url: str) -> dict:
    # Fetch one URL and decode its JSON body; raise on HTTP 4xx/5xx.
    r = await client.get(url)
    r.raise_for_status()
    return r.json()


async def run_pipeline(urls: list[str]) -> list[dict]:
    # Share one client so every request reuses the same connection pool.
    async with httpx.AsyncClient(timeout=10) as client:
        # gather keeps results in input order; the first failed fetch raises here.
        results = await asyncio.gather(*[fetch(client, u) for u in urls])
    # Drop empty payloads.
    return [r for r in results if r]


if __name__ == '__main__':
    urls = [f'https://jsonplaceholder.typicode.com/posts/{i}' for i in range(1, 11)]
    records = asyncio.run(run_pipeline(urls))
    print(f'Fetched {len(records)} records')

Use Cases
- concurrent API ingestion
- high-throughput ETL
- async data fetch
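
For larger ingestion jobs, an unbounded gather can open more requests than an API tolerates, and a single failed request aborts the whole batch. The sketch below is one possible variation, not part of the original snippet: it caps concurrency with asyncio.Semaphore and passes return_exceptions=True so failures are collected instead of raised. The names gather_bounded and demo_worker and the example.invalid URLs are hypothetical; demo_worker simulates a fetch without touching the network.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any


async def gather_bounded(
    items: list[str],
    worker: Callable[[str], Awaitable[Any]],
    limit: int = 5,
) -> tuple[list[Any], list[tuple[str, BaseException]]]:
    """Run worker(item) for every item, at most `limit` at a time.

    Returns (successes, failures); each failure pairs an item with its exception.
    """
    sem = asyncio.Semaphore(limit)

    async def guarded(item: str) -> Any:
        # At most `limit` coroutines can be inside this block at once.
        async with sem:
            return await worker(item)

    # return_exceptions=True makes gather return exceptions as values
    # instead of raising the first one, so one bad item does not sink the batch.
    results = await asyncio.gather(
        *(guarded(i) for i in items), return_exceptions=True
    )

    ok: list[Any] = []
    failed: list[tuple[str, BaseException]] = []
    # gather preserves input order, so results line up with items.
    for item, res in zip(items, results):
        if isinstance(res, BaseException):
            failed.append((item, res))
        else:
            ok.append(res)
    return ok, failed


async def demo_worker(url: str) -> dict:
    # Hypothetical stand-in for fetch(client, url): no network involved.
    await asyncio.sleep(0.01)
    if url.endswith("/3"):
        raise ValueError(f"simulated failure for {url}")
    return {"url": url}


if __name__ == "__main__":
    urls = [f"https://example.invalid/posts/{i}" for i in range(1, 6)]
    ok, failed = asyncio.run(gather_bounded(urls, demo_worker, limit=2))
    print(f"{len(ok)} succeeded, {len(failed)} failed")
```

To combine this with the snippet above, the worker could be `lambda u: fetch(client, u)`, called inside the `async with httpx.AsyncClient(...)` block so the client stays open until every request finishes. Failed URLs come back in the second list and can be logged or retried.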