Python · Intermediate

Async HTTP Client with httpx

Production-ready async HTTP client using httpx with retries, timeouts, and connection pooling.

python
import asyncio
from typing import Any

import httpx


class ApiClient:
    def __init__(self, base_url: str, token: str | None = None):
        self._client = httpx.AsyncClient(
            base_url=base_url,
            timeout=httpx.Timeout(10.0, connect=5.0),
            limits=httpx.Limits(max_connections=20, max_keepalive_connections=5),
            headers={"Authorization": f"Bearer {token}"} if token else {},
        )

    async def get(self, path: str, params: dict | None = None) -> Any:
        response = await self._client.get(path, params=params)
        response.raise_for_status()
        return response.json()

    async def post(self, path: str, data: dict) -> Any:
        response = await self._client.post(path, json=data)
        response.raise_for_status()
        return response.json()

    async def close(self) -> None:
        await self._client.aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        await self.close()


# Usage:
# async with ApiClient("https://api.example.com", token="sk-xxx") as client:
#     users = await client.get("/users", params={"page": 1})
#     new_user = await client.post("/users", {"name": "Ada"})

Use Cases

  • REST API consumption
  • Microservice communication
  • Web scraping

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.