Bulk Operations and Reporting
What You'll Learn
How to process thousands or millions of records efficiently, generate clear summary reports, and export results in useful formats.
Processing Records in Batches
For large datasets, process records in fixed-size chunks rather than in one monolithic pass; chunks give you regular progress logging and a natural unit for retries and rate limiting:
import logging
from typing import Iterator
log = logging.getLogger(__name__)
def batch(items: list, size: int) -> Iterator[list]:
"""Yield items in fixed-size chunks."""
for i in range(0, len(items), size):
yield items[i:i + size]
def process_large_dataset(records: list[dict], batch_size: int = 100) -> dict:
total = len(records)
stats = {"processed": 0, "errors": 0}
for chunk_num, chunk in enumerate(batch(records, batch_size), 1):
log.info("Processing batch %d (%d/%d)",
chunk_num, min(chunk_num * batch_size, total), total)
for record in chunk:
try:
process_one(record)
stats["processed"] += 1
except Exception as e:
log.warning("Record failed id=%s: %s", record.get("id"), e)
stats["errors"] += 1
return stats
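To see it end to end, here is a minimal usage sketch; this process_one is a stand-in for whatever real per-record work you do (the document never defines one):

import logging

logging.basicConfig(level=logging.INFO)

def process_one(record: dict) -> None:
    # Stand-in: replace with your real per-record work (API call, DB write, ...).
    if not record.get("id"):
        raise ValueError("missing id")

records = [{"id": i, "name": f"user-{i}"} for i in range(1, 251)]
print(process_large_dataset(records, batch_size=100))
# -> {'processed': 250, 'errors': 0}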
Streaming Large Files
Never load a 1GB CSV into memory. Stream it:
import csv
import logging
from pathlib import Path
from typing import Callable

log = logging.getLogger(__name__)

def stream_csv(path: Path, processor: Callable[[dict], None]) -> dict:
"""Process a large CSV file row by row."""
stats = {"ok": 0, "error": 0}
with open(path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
for i, row in enumerate(reader, 1):
try:
processor(row)
stats["ok"] += 1
except Exception as e:
log.warning("Row %d failed: %s — %s", i, dict(row), e)
stats["error"] += 1
if i % 10_000 == 0:
log.info("Progress: %d rows processed", i)
return stats
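As a usage sketch, here is a tiny validating processor; the users.csv path and the email column are invented for illustration:

def validate_row(row: dict) -> None:
    # Hypothetical rule: every row must carry a non-empty email.
    if not row.get("email", "").strip():
        raise ValueError("missing email")

stats = stream_csv(Path("users.csv"), validate_row)
log.info("Finished: %s", stats)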
Concurrent Processing
For I/O-bound work (API calls, database queries), use a thread pool to run many operations concurrently; while one thread waits on the network, the others make progress:
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
log = logging.getLogger(__name__)
def process_with_threads(items: list[dict], max_workers: int = 10) -> dict:
stats = {"ok": 0, "error": 0}
errors = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(process_one, item): item
for item in items
}
for future in as_completed(futures):
item = futures[future]
try:
                future.result()  # re-raises any exception from the worker
stats["ok"] += 1
except Exception as e:
log.warning("Failed item_id=%s: %s", item.get("id"), e)
errors.append({"item": item, "error": str(e)})
stats["error"] += 1
stats["error_details"] = errors
return stats
For CPU-bound work (number crunching), threads won't help because of Python's GIL; use ProcessPoolExecutor instead, as in the sketch below.
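A minimal sketch of the process-based variant, with an invented crunch function standing in for real CPU-bound work. Arguments and return values must be picklable, and on platforms that spawn workers the pool has to be created under a __main__ guard:

from concurrent.futures import ProcessPoolExecutor

def crunch(n: int) -> int:
    # Stand-in for genuinely CPU-bound work.
    return sum(i * i for i in range(n))

def process_with_processes(inputs: list[int], max_workers: int = 4) -> list[int]:
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # map() keeps input order and re-raises worker exceptions here.
        return list(executor.map(crunch, inputs))

if __name__ == "__main__":
    print(process_with_processes([100_000, 200_000, 300_000]))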
Collecting and Aggregating Results
import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class RunReport:
total: int = 0
processed: int = 0
skipped: int = 0
errors: int = 0
by_status: dict = field(default_factory=lambda: defaultdict(int))
error_messages: list = field(default_factory=list)
    start_time: float = field(default_factory=time.monotonic)

    def elapsed(self) -> float:
        return time.monotonic() - self.start_time
def rate(self) -> float:
elapsed = self.elapsed()
return self.processed / elapsed if elapsed > 0 else 0
def summary(self) -> str:
return (
f"total={self.total} "
f"processed={self.processed} "
f"skipped={self.skipped} "
f"errors={self.errors} "
f"rate={self.rate():.1f}/s "
f"elapsed={self.elapsed():.1f}s"
)
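Wiring the dataclass into a loop looks like this; records and a process_one that returns a dict with a status key are assumed, as in the complete pattern later in this lesson:

report = RunReport(total=len(records))
for record in records:
    try:
        result = process_one(record)
        report.by_status[result["status"]] += 1
        report.processed += 1
    except Exception as e:
        report.errors += 1
        report.error_messages.append(str(e))
log.info("%s", report.summary())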
Writing Summary Reports
Text report to stdout
def print_report(report: RunReport) -> None:
width = 60
print("=" * width)
print(f"{'RUN REPORT':^{width}}")
print("=" * width)
print(f"{'Total records:':<25} {report.total:>10,}")
print(f"{'Processed:':<25} {report.processed:>10,}")
print(f"{'Skipped:':<25} {report.skipped:>10,}")
print(f"{'Errors:':<25} {report.errors:>10,}")
print(f"{'Success rate:':<25} {report.processed/max(report.total,1):>10.1%}")
print(f"{'Throughput:':<25} {report.rate():>9.1f}/s")
print(f"{'Elapsed:':<25} {report.elapsed():>9.1f}s")
print("-" * width)
if report.by_status:
print("By status:")
for status, count in sorted(report.by_status.items()):
print(f" {status:<23} {count:>10,}")
print("=" * width)
JSON report to file
import json
from datetime import datetime, timezone
from pathlib import Path
def save_json_report(report: RunReport, output_dir: Path) -> Path:
output_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
path = output_dir / f"report_{timestamp}.json"
data = {
"run_at": datetime.utcnow().isoformat() + "Z",
"total": report.total,
"processed": report.processed,
"skipped": report.skipped,
"errors": report.errors,
"elapsed_s": round(report.elapsed(), 2),
"rate_per_s": round(report.rate(), 1),
"by_status": dict(report.by_status),
"sample_errors": report.error_messages[:10], # first 10 errors
}
path.write_text(json.dumps(data, indent=2), encoding="utf-8")
return path
CSV report of failed records
import csv
from pathlib import Path
def save_error_report(errors: list[dict], output_path: Path) -> int:
if not errors:
return 0
with open(output_path, "w", newline="", encoding="utf-8") as f:
fieldnames = list(errors[0].keys())
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(errors)
return len(errors)
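The error_details list collected by process_with_threads above feeds straight into this writer; the output filename here is arbitrary:

stats = process_with_threads(items)
failed = [{**e["item"], "error": e["error"]} for e in stats["error_details"]]
count = save_error_report(failed, Path("failed_items.csv"))
log.info("Wrote %d failed items", count)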
Complete Bulk Processing Pattern
import csv
import logging
from pathlib import Path

log = logging.getLogger(__name__)

def run(input_file: Path, output_dir: Path, dry_run: bool = False) -> int:
    log.info("Starting bulk process input=%s dry_run=%s", input_file, dry_run)
    report = RunReport()
    with open(input_file, newline="", encoding="utf-8") as f:
        records = list(csv.DictReader(f))
    report.total = len(records)
    errors = []
for record in records:
try:
if not dry_run:
result = process_one(record)
report.by_status[result["status"]] += 1
report.processed += 1
except Exception as e:
log.warning("Failed id=%s: %s", record.get("id"), e)
errors.append({**record, "error": str(e)})
report.errors += 1
    if not dry_run and errors:
        output_dir.mkdir(parents=True, exist_ok=True)
        error_path = output_dir / "errors.csv"
save_error_report(errors, error_path)
log.warning("Error report: %s", error_path)
print_report(report)
log.info("Done: %s", report.summary())
return 1 if report.errors > 0 else 0
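To run the pattern from the command line, a thin argparse wrapper is enough; the flag names here are suggestions rather than part of the pattern:

import argparse
import sys

def main() -> int:
    parser = argparse.ArgumentParser(description="Bulk-process a CSV file")
    parser.add_argument("input_file", type=Path)
    parser.add_argument("--output-dir", type=Path, default=Path("reports"))
    parser.add_argument("--dry-run", action="store_true",
                        help="parse and count records without processing them")
    args = parser.parse_args()
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(levelname)s %(message)s")
    return run(args.input_file, args.output_dir, dry_run=args.dry_run)

if __name__ == "__main__":
    sys.exit(main())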
Quick Reference
# Batch iterator
def batch(items, size):
for i in range(0, len(items), size):
yield items[i:i + size]
# Stream CSV
with open(path, newline="") as f:
for row in csv.DictReader(f):
process(row)
# Threaded processing
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor(max_workers=10) as ex:
futures = {ex.submit(fn, item): item for item in items}
for future in as_completed(futures):
result = future.result()
# Progress
if i % 1000 == 0:
log.info("Progress: %d/%d (%.1f%%)", i, total, 100*i/total)
What's Next
→ Lesson 4: Idempotent Scripts