pythonintermediate

HTTPX Async Client with Retry

Make async HTTP requests with HTTPX featuring timeout config, retry logic, and response streaming.

python
import httpx
from typing import AsyncGenerator

async def create_client() -> httpx.AsyncClient:
    return httpx.AsyncClient(
        timeout=httpx.Timeout(10.0, connect=5.0),
        limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
        headers={"User-Agent": "SnippetsLab/1.0"},
    )

async def fetch_json(url: str) -> dict:
    async with await create_client() as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()

async def stream_download(url: str) -> AsyncGenerator[bytes, None]:
    async with await create_client() as client:
        async with client.stream("GET", url) as response:
            response.raise_for_status()
            async for chunk in response.aiter_bytes(chunk_size=8192):
                yield chunk

# Usage:
# data = await fetch_json("https://api.example.com/data")
# async for chunk in stream_download(url):
#     file.write(chunk)

Use Cases

  • API integrations
  • Web scraping
  • Microservice communication

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.