python · intermediate

Python Concurrent Futures for Parallel Work

Run tasks in parallel using ThreadPoolExecutor and ProcessPoolExecutor with error handling.

python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time
import urllib.request


# Thread pool for I/O-bound work
def fetch_url(url: str) -> tuple[str, int]:
    """Open *url* and report the status of the response.

    The request is made with a 10-second timeout. Any exception raised
    while opening the URL propagates to the caller.

    Returns:
        A ``(url, status)`` pair: the URL that was passed in and the
        ``status`` attribute of the response object.
    """
    with urllib.request.urlopen(url, timeout=10) as reply:
        # Read the status while the response is still open.
        status_code = reply.status
    return url, status_code


urls = [
    "https://httpbin.org/get",
    "https://httpbin.org/ip",
    "https://httpbin.org/headers",
    "https://httpbin.org/user-agent",
]

# Run the demo only when the file is executed as a script. This file also
# starts a ProcessPoolExecutor, and with the "spawn" and "forkserver" start
# methods each worker process imports the main module again; unguarded,
# every worker would repeat these HTTP requests on import.
if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Map each future back to its URL so a failure can be attributed.
        futures = {pool.submit(fetch_url, url): url for url in urls}

        # as_completed yields futures as they finish, not in submit order.
        for future in as_completed(futures):
            url = futures[future]
            try:
                # The future is already done here, so result() returns (or
                # re-raises the worker's exception) without waiting; the
                # network timeout is the one set inside fetch_url.
                result_url, status = future.result()
            except Exception as e:
                # Report the failed URL and keep processing the others.
                print(f"{url}: Error - {e}")
            else:
                print(f"{result_url}: {status}")


# Process pool for CPU-bound work
def heavy_compute(n: int) -> int:
    """Return the sum of the squares of every integer in ``range(n)``.

    For ``n <= 0`` the range is empty and the result is ``0``.
    """
    total = 0
    for value in range(n):
        total += value * value
    return total


# ProcessPoolExecutor requires that the __main__ module be importable by
# its worker processes. With the "spawn" start method (the default on
# Windows and macOS) each worker re-imports this file, so pool creation
# must sit behind this guard or the workers would try to start pools of
# their own during import and the run would fail.
if __name__ == "__main__":
    with ProcessPoolExecutor() as pool:
        inputs = [10_000_000, 20_000_000, 30_000_000]
        # map() yields results in the same order as `inputs`.
        results = list(pool.map(heavy_compute, inputs))
        print(f"Results: {results}")


# Batch processing with progress
def process_item(item: dict) -> dict:
    """Return a copy of *item* with ``"processed"`` set to ``True``.

    The input mapping is not modified. A 0.1-second sleep stands in for
    real work.
    """
    time.sleep(0.1)  # simulate work
    updated = {}
    updated.update(item)
    updated["processed"] = True
    return updated


items = [{"id": i} for i in range(100)]
results = []

# Run the batch only when the file is executed as a script. This file also
# starts a ProcessPoolExecutor, whose workers import the main module again
# under the "spawn" and "forkserver" start methods; unguarded, each worker
# would re-run this whole batch on import.
if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=10) as pool:
        futures = [pool.submit(process_item, item) for item in items]
        # Count completions from 1 so the counter reads as "items done".
        # Results are collected in completion order, not submission order.
        for done, future in enumerate(as_completed(futures), start=1):
            results.append(future.result())
            if done % 25 == 0:
                print(f"Progress: {done}/{len(items)}")

    print(f"Processed {len(results)} items")

Use Cases

  • Parallel HTTP requests for web scraping
  • CPU-bound computation across multiple cores
  • Batch data processing with progress tracking

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.