Python · Intermediate

Retry Logic for Data Pipelines

Configurable retry decorator with exponential backoff and jitter for resilient data pipeline tasks.

python
import time
import random
import logging
from functools import wraps
from typing import TypeVar, Callable, Any

log = logging.getLogger(__name__)
# Generic callable type so the decorator preserves the wrapped signature.
F = TypeVar("F", bound=Callable[..., Any])


def retry(
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential: bool = True,
    retryable_exceptions: tuple = (Exception,),
) -> Callable[[F], F]:
    """Decorator factory that retries a function on transient failures.

    Args:
        max_attempts: Total number of call attempts (must be >= 1).
        base_delay: Delay in seconds before the first retry.
        max_delay: Upper bound on the computed delay (applied before jitter).
        exponential: If True, double the delay after each failed attempt;
            otherwise wait ``base_delay`` between all attempts.
        retryable_exceptions: Exception types that trigger a retry; any
            other exception propagates immediately.

    Returns:
        A decorator wrapping the target function with retry logic. The last
        retryable exception is re-raised once attempts are exhausted.

    Raises:
        ValueError: If ``max_attempts`` is less than 1.
    """
    if max_attempts < 1:
        # Fail fast at decoration time. The original code would fall out of
        # the loop and execute ``raise None`` (a confusing TypeError) at
        # call time instead.
        raise ValueError("max_attempts must be >= 1")

    def decorator(func: F) -> F:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except retryable_exceptions as e:
                    if attempt == max_attempts:
                        # Lazy %-formatting: the message is only rendered if
                        # the record is actually emitted.
                        log.error(
                            "%s failed after %d attempts",
                            func.__name__,
                            max_attempts,
                        )
                        raise
                    delay = (
                        base_delay * (2 ** (attempt - 1))
                        if exponential
                        else base_delay
                    )
                    delay = min(delay, max_delay)
                    # Up to 10% jitter de-synchronizes retries from many
                    # workers failing at the same moment (thundering herd).
                    wait = delay + random.uniform(0, delay * 0.1)
                    log.warning(
                        "%s attempt %d/%d failed: %s. Retrying in %.1fs",
                        func.__name__,
                        attempt,
                        max_attempts,
                        e,
                        wait,
                    )
                    time.sleep(wait)
            # Unreachable: the final attempt either returns or re-raises.
            raise AssertionError("retry loop exited without return or raise")

        return wrapper  # type: ignore[return-value]

    return decorator


@retry(max_attempts=3, base_delay=2.0, retryable_exceptions=(ConnectionError, TimeoutError))
def fetch_data(url: str) -> dict:
    """Fetch *url* over HTTP and decode the response body as JSON.

    Connection and timeout errors are retried (up to 3 attempts with
    exponential backoff) by the ``@retry`` decorator.
    """
    import json
    import urllib.request

    response = urllib.request.urlopen(url, timeout=30)
    with response as body:
        payload = body.read()
    return json.loads(payload)


@retry(max_attempts=5, base_delay=1.0)
def write_to_warehouse(records: list[dict]) -> int:
    """Persist *records* to the warehouse (simulated) and return the row count."""
    # Placeholder for a real bulk insert; it only reports how many rows
    # would have been written.
    row_count = len(records)
    return row_count

Use Cases

  • Resilient API calls in data pipelines
  • Retrying failed database writes with backoff
  • Handling transient network failures in ETL

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.