pythonadvanced
Asyncio Semaphore and Timeout Patterns
Control concurrency with asyncio semaphores, timeouts, and task groups for robust async code.
pythonPress ⌘/Ctrl + Shift + C to copy
import asyncio
async def fetch_url(url: str, sem: asyncio.Semaphore) -> str:
async with sem:
print(f"Fetching {url}...")
await asyncio.sleep(1)
return f"Result from {url}"
async def fetch_all(urls: list[str], max_concurrent: int = 5) -> list[str]:
    """Fetch every URL concurrently, at most *max_concurrent* at a time.

    Args:
        urls: Addresses to pass to ``fetch_url``.
        max_concurrent: Number of semaphore slots shared by all fetches.

    Returns:
        One result per URL, in the same order as *urls*
        (``asyncio.gather`` preserves argument order).

    Raises:
        ValueError: If *max_concurrent* is less than 1. A zero-slot
            semaphore can never be acquired, so every fetch would block
            forever instead of failing.
    """
    if max_concurrent < 1:
        raise ValueError(f"max_concurrent must be >= 1, got {max_concurrent}")
    sem = asyncio.Semaphore(max_concurrent)
    # All coroutines share one semaphore; gather schedules them together and
    # the semaphore limits how many run their body at once.
    pending = [fetch_url(url, sem) for url in urls]
    return await asyncio.gather(*pending)
# Timeout (asyncio.timeout requires Python 3.11+)
async def fetch_with_timeout(url: str, timeout_secs: float = 5.0) -> str:
try:
async with asyncio.timeout(timeout_secs):
await asyncio.sleep(10)
return f"Result from {url}"
except TimeoutError:
return f"Timeout fetching {url}"
# TaskGroup (Python 3.11+)
async def process_batch(items: list[str]) -> list[str]:
results: list[str] = []
async with asyncio.TaskGroup() as tg:
async def process(item: str) -> None:
await asyncio.sleep(0.5)
results.append(f"processed: {item}")
for item in items:
tg.create_task(process(item))
return results
if __name__ == "__main__":
urls = [f"https://api.example.com/item/{i}" for i in range(20)]
results = asyncio.run(fetch_all(urls, max_concurrent=3))
print(f"Fetched {len(results)} results")
Use Cases
- rate limiting API calls
- parallel processing
- timeout handling
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
pythonadvanced
Run Async Tasks Concurrently with gather
Execute multiple async operations concurrently using asyncio.gather.
Best for: Parallel API calls
#python#asyncio
pythonintermediate
asyncio.gather Concurrent Tasks
Run multiple async operations concurrently with asyncio.gather and proper error handling.
Best for: Parallel API calls
#asyncio#concurrency
pythonintermediate
Python Concurrent Futures for Parallel Work
Run tasks in parallel using ThreadPoolExecutor and ProcessPoolExecutor with error handling.
Best for: Parallel HTTP requests for web scraping
#python#concurrency
pythonintermediate
Asyncio Concurrent
Advanced Python pattern: asyncio-concurrent
Best for: advanced programming
#python#advanced