Skip to main content

Bulk Operations and Reporting

What You'll Learn

How to process thousands or millions of records efficiently, generate clear summary reports, and export results in useful formats.

Processing Records in Batches

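For large datasets, work through the records in fixed-size chunks rather than in one undifferentiated pass, so that progress can be logged per chunk. (This example takes a list that is already in memory; the next section shows how to avoid loading the whole file.)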

import logging
from typing import Iterator

log = logging.getLogger(__name__)


def batch(items: list, size: int) -> Iterator[list]:
    """Return an iterator over consecutive slices of ``items`` of length ``size``.

    The final slice is shorter when ``len(items)`` is not a multiple of
    ``size``. An empty ``items`` produces no slices.

    Args:
        items: Sequence to split; it is sliced, not copied up front.
        size: Maximum number of elements per chunk; must be positive.

    Raises:
        ValueError: If ``size`` is zero or negative.
    """
    # Validate eagerly (before any iteration): a negative step would make
    # range() empty and the caller would silently process nothing.
    if size <= 0:
        raise ValueError(f"size must be a positive integer, got {size!r}")
    return (items[start:start + size] for start in range(0, len(items), size))


def process_large_dataset(records: list[dict], batch_size: int = 100) -> dict:
    """Run ``process_one`` over every record, one batch at a time.

    A progress line is logged as each batch starts. A record whose processing
    raises is logged at warning level and counted; it does not stop the run.

    Returns:
        ``{"processed": <count>, "errors": <count>}``.
    """
    record_count = len(records)
    succeeded = 0
    failed = 0

    for batch_index, group in enumerate(batch(records, batch_size), start=1):
        # Cap at the record count so the last (short) batch reports N/N.
        reached = min(batch_index * batch_size, record_count)
        log.info("Processing batch %d (%d/%d)",
                 batch_index, reached, record_count)

        for entry in group:
            try:
                process_one(entry)
            except Exception as exc:
                log.warning("Record failed id=%s: %s", entry.get("id"), exc)
                failed += 1
            else:
                succeeded += 1

    return {"processed": succeeded, "errors": failed}

Streaming Large Files

Never load a 1GB CSV into memory. Stream it:

import csv
from pathlib import Path


def stream_csv(path: Path, processor) -> dict:
    """Process a large CSV file row by row.

    Each data row is passed to ``processor`` as a dict keyed by the header
    names. A row whose processing raises is logged and counted, and the loop
    continues with the next row.

    Args:
        path: CSV file to read; the first line is taken as the header.
        processor: Callable invoked with one row dict at a time.

    Returns:
        ``{"ok": <rows processed>, "error": <rows that raised>}``.
    """
    stats = {"ok": 0, "error": 0}

    # utf-8-sig reads plain UTF-8 unchanged but also strips a leading BOM,
    # which would otherwise end up glued to the first header name.
    with open(path, newline="", encoding="utf-8-sig") as f:
        reader = csv.DictReader(f)
        # i counts data rows from 1; the header line is not included.
        for i, row in enumerate(reader, 1):
            try:
                processor(row)
                stats["ok"] += 1
            except Exception as e:
                log.warning("Row %d failed: %s — %s", i, dict(row), e)
                stats["error"] += 1

            if i % 10_000 == 0:
                log.info("Progress: %d rows processed", i)

    return stats

Concurrent Processing

For I/O-bound work (API calls, database queries), use a thread pool so that several operations can be in flight at once while each one waits on the network or database:

from concurrent.futures import ThreadPoolExecutor, as_completed
import logging

log = logging.getLogger(__name__)


def process_with_threads(items: list[dict], max_workers: int = 10) -> dict:
    """Run ``process_one`` on every item using a thread pool.

    All items are submitted up front and results are collected in completion
    order. A failed item is logged and recorded alongside its error text.

    Returns:
        A dict with ``"ok"`` and ``"error"`` counts plus ``"error_details"``,
        a list of ``{"item": ..., "error": ...}`` entries for the failures.
    """
    succeeded = 0
    failures = []

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to its input so failures can be attributed.
        pending = {}
        for entry in items:
            pending[pool.submit(process_one, entry)] = entry

        for finished in as_completed(pending):
            source = pending[finished]
            try:
                # result() re-raises whatever the worker raised.
                finished.result()
            except Exception as exc:
                log.warning("Failed item_id=%s: %s", source.get("id"), exc)
                failures.append({"item": source, "error": str(exc)})
            else:
                succeeded += 1

    return {"ok": succeeded, "error": len(failures), "error_details": failures}

For CPU-bound work (number crunching), use ProcessPoolExecutor instead.

Collecting and Aggregating Results

import time
from collections import defaultdict, Counter
from dataclasses import dataclass, field


@dataclass
class RunReport:
    """Counters and timing for one bulk-processing run.

    Callers update the counter fields directly. ``start_time`` is read from
    the monotonic clock when the report is created, and the methods below
    derive elapsed time and throughput from it.
    """

    total: int = 0
    processed: int = 0
    skipped: int = 0
    errors: int = 0
    # defaultdict(int) so callers can `+= 1` a status that has not been seen yet
    by_status: dict = field(default_factory=lambda: defaultdict(int))
    error_messages: list = field(default_factory=list)
    # monotonic (not wall-clock) so elapsed() is unaffected by clock adjustments
    start_time: float = field(default_factory=time.monotonic)

    def elapsed(self) -> float:
        """Return the seconds elapsed since ``start_time``."""
        return time.monotonic() - self.start_time

    def rate(self) -> float:
        """Return processed records per second, or 0.0 if no time has elapsed."""
        seconds = self.elapsed()
        # Always a float, matching the annotation, even in the fallback case.
        return self.processed / seconds if seconds > 0 else 0.0

    def summary(self) -> str:
        """Return a one-line ``key=value`` summary suitable for a log message."""
        return (
            f"total={self.total} "
            f"processed={self.processed} "
            f"skipped={self.skipped} "
            f"errors={self.errors} "
            f"rate={self.rate():.1f}/s "
            f"elapsed={self.elapsed():.1f}s"
        )

Writing Summary Reports

Text report to stdout

def print_report(report: RunReport) -> None:
    """Print a fixed-width, human-readable run summary to stdout.

    The per-status breakdown is printed only when ``report.by_status`` is
    non-empty.
    """
    width = 60
    heavy_rule = "=" * width

    def emit(label: str, rendered: str) -> None:
        # Labels are left-aligned in a 25-character column.
        print(f"{label:<25} {rendered}")

    print(heavy_rule)
    print(format("RUN REPORT", f"^{width}"))
    print(heavy_rule)

    counters = (
        ("Total records:", report.total),
        ("Processed:", report.processed),
        ("Skipped:", report.skipped),
        ("Errors:", report.errors),
    )
    for label, value in counters:
        emit(label, f"{value:>10,}")

    # max(..., 1) avoids dividing by zero for an empty run.
    success_ratio = report.processed / max(report.total, 1)
    emit("Success rate:", f"{success_ratio:>10.1%}")
    emit("Throughput:", f"{report.rate():>9.1f}/s")
    emit("Elapsed:", f"{report.elapsed():>9.1f}s")
    print("-" * width)

    if report.by_status:
        print("By status:")
        for status, count in sorted(report.by_status.items()):
            print(f" {status:<23} {count:>10,}")

    print(heavy_rule)

JSON report to file

import json
from datetime import datetime
from pathlib import Path


def save_json_report(report: RunReport, output_dir: Path) -> Path:
    """Write ``report`` as a timestamped JSON file and return its path.

    ``output_dir`` is created if missing. The file is named
    ``report_<YYYYmmdd_HHMMSS>.json`` using local time, so two reports saved
    within the same second share a name and the later one overwrites.

    Args:
        report: Run statistics to serialise.
        output_dir: Directory that receives the report file.

    Returns:
        Path of the file that was written.
    """
    # Function-scope import keeps this snippet self-contained.
    from datetime import timezone

    output_dir.mkdir(parents=True, exist_ok=True)

    # One timezone-aware clock reading feeds both the filename and the
    # payload, so the two can never disagree. datetime.utcnow() is deprecated.
    now_utc = datetime.now(timezone.utc)
    timestamp = now_utc.astimezone().strftime("%Y%m%d_%H%M%S")  # local time
    path = output_dir / f"report_{timestamp}.json"

    data = {
        # Drop tzinfo before isoformat() to keep the "...Z" suffix format
        # rather than "+00:00".
        "run_at": now_utc.replace(tzinfo=None).isoformat() + "Z",
        "total": report.total,
        "processed": report.processed,
        "skipped": report.skipped,
        "errors": report.errors,
        "elapsed_s": round(report.elapsed(), 2),
        "rate_per_s": round(report.rate(), 1),
        "by_status": dict(report.by_status),
        "sample_errors": report.error_messages[:10],  # first 10 errors
    }

    path.write_text(json.dumps(data, indent=2), encoding="utf-8")
    return path

CSV report of failed records

import csv
from pathlib import Path


def save_error_report(errors: list[dict], output_path: Path) -> int:
    """Write failed records to a CSV file and return how many were written.

    Nothing is written when ``errors`` is empty. The header is the union of
    the keys of all rows, in first-seen order; a row lacking a column gets an
    empty cell there.

    Args:
        errors: One dict per failed record.
        output_path: Destination file; missing parent directories are created.

    Returns:
        Number of rows written (0 when ``errors`` is empty).
    """
    if not errors:
        return 0

    # Collect columns from every row, not just the first: DictWriter raises
    # ValueError on a row that has a key missing from fieldnames.
    columns = {}
    for row in errors:
        for key in row:
            columns.setdefault(key, None)
    fieldnames = list(columns)

    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    with open(output_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(errors)
    return len(errors)

Complete Bulk Processing Pattern

import sys
import logging
import time
from pathlib import Path

log = logging.getLogger(__name__)


def run(input_file: Path, output_dir: Path, dry_run: bool = False) -> int:
    """Process every row of a CSV file and report the outcome.

    Rows are streamed from ``input_file`` one at a time rather than loaded
    into a list first. Each row is passed to ``process_one`` unless
    ``dry_run`` is set, in which case rows are only counted. Failed rows are
    logged and collected; if there are any, they are written to
    ``output_dir / "errors.csv"``. A summary is printed and logged at the end.

    Args:
        input_file: CSV file whose first line is the header.
        output_dir: Directory for ``errors.csv``; created when needed.
        dry_run: When true, read and count rows without processing them.

    Returns:
        Process exit code: 1 if any record failed, otherwise 0.
    """
    import csv

    log.info("Starting bulk process input=%s dry_run=%s", input_file, dry_run)
    report = RunReport()  # starts its own monotonic timer on creation
    errors = []

    # utf-8-sig also accepts plain UTF-8 and strips a leading BOM, which
    # would otherwise be glued to the first header name.
    with open(input_file, newline="", encoding="utf-8-sig") as f:
        for record in csv.DictReader(f):
            report.total += 1
            try:
                if not dry_run:
                    result = process_one(record)
                    # Tallied only here: there is no result in a dry run.
                    report.by_status[result["status"]] += 1
                report.processed += 1
            except Exception as e:
                log.warning("Failed id=%s: %s", record.get("id"), e)
                errors.append({**record, "error": str(e)})
                report.errors += 1

    if not dry_run and errors:
        # The directory may not exist yet; create it before writing.
        output_dir.mkdir(parents=True, exist_ok=True)
        error_path = output_dir / "errors.csv"
        save_error_report(errors, error_path)
        log.warning("Error report: %s", error_path)

    print_report(report)
    log.info("Done: %s", report.summary())
    return 1 if report.errors > 0 else 0

Quick Reference

# Batch iterator
def batch(items, size):
    for i in range(0, len(items), size):
        yield items[i:i + size]

# Stream CSV (explicit encoding so decoding does not depend on the locale)
with open(path, newline="", encoding="utf-8") as f:
    for row in csv.DictReader(f):
        process(row)

# Threaded processing
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor(max_workers=10) as ex:
    futures = {ex.submit(fn, item): item for item in items}
    for future in as_completed(futures):
        result = future.result()  # re-raises the worker's exception, if any

# Progress
if i % 1000 == 0:
    log.info("Progress: %d/%d (%.1f%%)", i, total, 100*i/total)

What's Next

Lesson 4: Idempotent Scripts (duplicate in module — see 09-04)