Retry Logic for Data Pipelines
Configurable retry decorator with exponential backoff and jitter for resilient data pipeline tasks.
import time
import random
import logging
from functools import wraps
from typing import TypeVar, Callable, Any
log = logging.getLogger(__name__)
F = TypeVar("F", bound=Callable[..., Any])


def retry(
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential: bool = True,
    retryable_exceptions: tuple = (Exception,),
) -> Callable[[F], F]:
    """Decorator factory that retries a function on transient failures.

    Args:
        max_attempts: Total number of calls before giving up (must be >= 1).
        base_delay: Initial delay between attempts, in seconds.
        max_delay: Upper bound on the wait between attempts (jitter included).
        exponential: If True, double the delay after each failed attempt;
            otherwise wait ``base_delay`` every time.
        retryable_exceptions: Exception types that trigger a retry; any other
            exception propagates immediately.

    Returns:
        A decorator wrapping the target function with retry logic.

    Raises:
        ValueError: If ``max_attempts`` is less than 1.
    """
    if max_attempts < 1:
        # Fail fast at decoration time; otherwise the attempt loop would
        # never run and the wrapper would fall through with nothing to raise.
        raise ValueError("max_attempts must be >= 1")

    def decorator(func: F) -> F:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except retryable_exceptions as e:
                    if attempt == max_attempts:
                        # Out of attempts: log and let the last error propagate.
                        log.error(
                            "%s failed after %d attempts",
                            func.__name__, max_attempts,
                        )
                        raise
                    # Exponential backoff: base * 2^(attempt-1), capped.
                    delay = base_delay * (2 ** (attempt - 1)) if exponential else base_delay
                    delay = min(delay, max_delay)
                    # Up to 10% jitter avoids thundering-herd retries; clamp
                    # again so the total wait never exceeds max_delay.
                    wait = min(delay + random.uniform(0, delay * 0.1), max_delay)
                    # Lazy %-style args: formatting is skipped if the level
                    # is disabled.
                    log.warning(
                        "%s attempt %d/%d failed: %s. Retrying in %.1fs",
                        func.__name__, attempt, max_attempts, e, wait,
                    )
                    time.sleep(wait)
        return wrapper  # type: ignore
    return decorator
@retry(max_attempts=3, base_delay=2.0, retryable_exceptions=(ConnectionError, TimeoutError))
def fetch_data(url: str) -> dict:
    """Fetch *url* and parse the response body as JSON.

    Retries up to 3 times (with backoff) on connection/timeout errors.

    Args:
        url: The URL to request.

    Returns:
        The decoded JSON payload (assumed to be a JSON object — confirm
        against the API being called).

    Raises:
        ConnectionError, TimeoutError: After all retry attempts are exhausted.
    """
    # Imports kept function-local (matching the original) but hoisted out of
    # the `with` block so they are not re-executed mid-request.
    import json
    import urllib.request

    with urllib.request.urlopen(url, timeout=30) as resp:
        return json.loads(resp.read())
@retry(max_attempts=5, base_delay=1.0)
def write_to_warehouse(records: list[dict]) -> int:
# Simulated write operation
return len(records)Use Cases
- Resilient API calls in data pipelines
- Retrying failed database writes with backoff
- Handling transient network failures in ETL
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
Retry Decorator with Exponential Backoff
Generic retry decorator with configurable attempts, exponential backoff, and exception filtering.
Tenacity Retry with Backoff
Add robust retry logic with exponential backoff, jitter, and conditional retry using the tenacity library.
Python ETL Pipeline Example
Complete extract-transform-load pipeline with error handling, logging, and incremental processing.
Databricks Notebook Data Pipeline
Databricks notebook with Delta Lake reads, transformations, merge operations, and table optimization.