# Retries, Timeouts, and Cleanup

## What You'll Learn

How to handle transient failures with retries, set timeouts on network calls, clean up resources reliably, and shut down gracefully.
## Why Programs Need Retries
Real-world operations fail intermittently:
- Network blips
- API rate limits (429 Too Many Requests)
- Temporary database unavailability
- Throttled cloud services
A single failure shouldn't crash the whole job. Retry the operation with a delay.
## Simple Retry Pattern

```python
import time
import logging

log = logging.getLogger(__name__)

def retry(fn, max_attempts: int = 3, delay: float = 1.0):
    """Call fn() up to max_attempts times, waiting delay seconds between tries."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as e:
            last_error = e
            if attempt < max_attempts:
                log.warning("Attempt %d/%d failed: %s — retrying in %.1fs",
                            attempt, max_attempts, e, delay)
                time.sleep(delay)
            else:
                log.error("All %d attempts failed: %s", max_attempts, e)
    raise last_error
```
Usage:

```python
result = retry(lambda: fetch_data_from_api(), max_attempts=3, delay=2.0)
```
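Catching bare `Exception` means the helper also retries programming errors like `TypeError`, which can never succeed on a second attempt. A minimal sketch of a stricter variant, where the `retry_on` whitelist is our own hypothetical parameter:

```python
import time

def retry_transient(fn, max_attempts: int = 3, delay: float = 1.0,
                    retry_on: tuple = (ConnectionError, TimeoutError)):
    """Like retry(), but only retries the exception types listed in retry_on."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on:  # anything not listed propagates immediately
            if attempt == max_attempts:
                raise
            time.sleep(delay)
```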
## Exponential Backoff
Instead of a fixed delay, wait longer after each failure:
```python
import time
import random
import logging

log = logging.getLogger(__name__)

def retry_with_backoff(
    fn,
    max_attempts: int = 4,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True,
):
    """Retry with exponential backoff and optional jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as e:
            if attempt == max_attempts:
                raise
            delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
            if jitter:
                delay *= 0.5 + random.random() * 0.5  # randomize to 50-100% of the delay
            log.warning("Attempt %d/%d failed (%s) — retrying in %.1fs",
                        attempt, max_attempts, e, delay)
            time.sleep(delay)
```
Delay schedule (base=1s):
- Attempt 1 fails → wait 1s
- Attempt 2 fails → wait 2s
- Attempt 3 fails → wait 4s
- Attempt 4 fails → raise
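You can sanity-check that schedule by computing the delays directly (jitter left out for determinism):

```python
base_delay, max_delay = 1.0, 60.0
for attempt in range(1, 4):  # attempts 1-3 fail and wait; attempt 4 raises
    print(min(base_delay * 2 ** (attempt - 1), max_delay))
# 1.0, 2.0, 4.0
```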
## Using tenacity (Third-Party)
For production code, use the tenacity library — battle-tested and flexible:
```bash
pip install tenacity
```
```python
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import requests

@retry(
    stop=stop_after_attempt(4),
    wait=wait_exponential(multiplier=1, min=1, max=60),
    retry=retry_if_exception_type(requests.exceptions.ConnectionError),
)
def fetch_data(url: str) -> dict:
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.json()
```
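tenacity can also do the warning-per-retry logging from our hand-rolled helpers. A minimal sketch using its `before_sleep_log` helper (`flaky_operation` is a placeholder):

```python
import logging
from tenacity import retry, stop_after_attempt, wait_exponential, before_sleep_log

log = logging.getLogger(__name__)

@retry(
    stop=stop_after_attempt(4),
    wait=wait_exponential(multiplier=1, min=1, max=60),
    before_sleep=before_sleep_log(log, logging.WARNING),  # log a warning before each wait
)
def flaky_operation():
    ...
```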
## Timeouts on Network Calls
Always set timeouts. Without them, your program can hang forever:
```python
import requests

# ❌ No timeout — can hang for minutes
response = requests.get("https://api.example.com/data")

# ✅ With timeout
response = requests.get(
    "https://api.example.com/data",
    timeout=(5, 30),  # (connect_timeout, read_timeout) in seconds
)
```
With the standard library:
```python
import urllib.request

with urllib.request.urlopen(url, timeout=10) as response:
    data = response.read()
```
Catch timeouts explicitly so failures are logged rather than silent:

```python
import logging

import requests
from requests.exceptions import Timeout, ConnectionError

log = logging.getLogger(__name__)

try:
    response = requests.get(url, timeout=10)
except Timeout:
    log.error("Request timed out after 10s: %s", url)
except ConnectionError:
    log.error("Connection failed: %s", url)
```
## Resource Cleanup with Context Managers

Use `with` statements to guarantee cleanup even when exceptions occur:

```python
# Files
with open("data.txt", encoding="utf-8") as f:
    data = f.read()
# f is always closed here, even if an exception occurred

# Multiple resources
with open("input.txt", encoding="utf-8") as fin, \
     open("output.txt", "w", encoding="utf-8") as fout:
    fout.write(fin.read())
```
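When the number of resources is only known at runtime, the standard library's `contextlib.ExitStack` manages them all under one `with` block. A minimal sketch, with hypothetical input files:

```python
import contextlib

filenames = ["a.txt", "b.txt", "c.txt"]  # hypothetical input files

with contextlib.ExitStack() as stack:
    files = [stack.enter_context(open(name, encoding="utf-8")) for name in filenames]
    for f in files:
        print(f.readline(), end="")
# all files are closed here, even if an open() or read failed partway through
```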
## Writing Your Own Context Manager

```python
import contextlib
import logging
import time

log = logging.getLogger(__name__)

@contextlib.contextmanager
def timer(label: str):
    """Context manager that logs how long a block took."""
    start = time.monotonic()
    try:
        yield
    finally:
        elapsed = time.monotonic() - start
        log.info("%s took %.3fs", label, elapsed)

with timer("data processing"):
    process_large_dataset()
```
```python
import contextlib
import os
import tempfile
from pathlib import Path

@contextlib.contextmanager
def temporary_file(suffix: str = ".tmp"):
    """Create a temp file, yield its path, delete it on exit."""
    # mkstemp creates the file securely; tempfile.mktemp is deprecated and racy
    fd, name = tempfile.mkstemp(suffix=suffix)
    os.close(fd)
    path = Path(name)
    try:
        yield path
    finally:
        path.unlink(missing_ok=True)

with temporary_file(".json") as tmp:
    tmp.write_text('{"ok": true}')
    process(tmp)
# File deleted here
```
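Often you don't need to write this yourself: `tempfile.NamedTemporaryFile` and `tempfile.TemporaryDirectory` are already context managers. For example:

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmpdir:
    scratch = Path(tmpdir) / "work.json"
    scratch.write_text('{"ok": true}')
# the directory and everything inside it are removed here
```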
## Graceful Shutdown with signal Handlers
Handle Ctrl+C and SIGTERM cleanly:
```python
import signal
import logging

log = logging.getLogger(__name__)

shutdown_requested = False

def handle_shutdown(signum, frame):
    global shutdown_requested
    log.info("Shutdown signal received (%d) — finishing current item", signum)
    shutdown_requested = True

signal.signal(signal.SIGINT, handle_shutdown)   # Ctrl+C
signal.signal(signal.SIGTERM, handle_shutdown)  # kill / docker stop

def process_queue(items):
    processed = 0
    for item in items:
        if shutdown_requested:
            log.info("Shutdown requested — stopping after %d items", processed)
            break
        process(item)
        processed += 1
```
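If the work happens on threads, a `threading.Event` is a safer flag than a module-level global, because workers can block on it with a timeout. A minimal sketch of the same idea:

```python
import signal
import threading

shutdown = threading.Event()

def handle_shutdown(signum, frame):
    shutdown.set()

# Handlers must be installed from the main thread
signal.signal(signal.SIGINT, handle_shutdown)
signal.signal(signal.SIGTERM, handle_shutdown)

def worker(items):
    for item in items:
        if shutdown.is_set():
            break
        process(item)

# shutdown.wait(timeout) also works as an interruptible sleep between polls
```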
## Common Mistakes

| Mistake | Fix |
|---|---|
| No timeout on network calls | Always pass `timeout=` |
| Retrying non-transient errors | Only retry errors that may recover (network, rate limits) |
| Not sleeping between retries | Give the service time to recover |
| Doing risky work in `finally:` | Keep `finally` simple: just cleanup |
| Not closing files/connections | Use `with` statements |
## Quick Reference

```python
# Simple retry
for attempt in range(1, 4):
    try:
        result = risky()
        break
    except Exception:
        if attempt == 3:
            raise
        time.sleep(2 ** attempt)

# tenacity
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
def fn(): ...

# Timeout
requests.get(url, timeout=(5, 30))

# Context manager
@contextlib.contextmanager
def managed():
    setup()
    try:
        yield resource
    finally:
        cleanup()

# Signal handlers
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
```