Idempotent Scripts
What You'll Learn
What idempotency means for scripts, why it matters for automation, and practical patterns to make any script safe to run repeatedly.
What Is Idempotency?
An idempotent operation produces the same result whether executed once or many times:
Run 1: Creates file users.csv ✅ success
Run 2: File already exists → skips ✅ success (not an error)
Run 3: File already exists → skips ✅ success
A non-idempotent script:
Run 1: Inserts 100 records into DB ✅
Run 2: Inserts 100 more records ❌ now 200 records (100 duplicates)!
Run 3: Inserts 100 more records ❌ now 300!
Idempotent scripts are safe for:
- Cron jobs (run at midnight, re-run if failed)
- CI/CD pipelines (retry on transient failure)
- Provisioning (re-run to fix drift)
- Onboarding (re-run for new team members)
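Most of the patterns below reduce to one move: guard the side effect with a check. As a minimal sketch (record_signup and the file layout are hypothetical, not from any library):

from pathlib import Path

# Non-idempotent: every call appends another copy of the line.
def record_signup(path: Path, email: str) -> None:
    with path.open("a", encoding="utf-8") as f:
        f.write(email + "\n")

# Idempotent: check whether the side effect already happened before producing it.
def record_signup_once(path: Path, email: str) -> None:
    existing = path.read_text(encoding="utf-8").splitlines() if path.exists() else []
    if email not in existing:
        with path.open("a", encoding="utf-8") as f:
            f.write(email + "\n")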
Pattern 1: Check Before Acting
from pathlib import Path
import logging

log = logging.getLogger(__name__)

def ensure_directory(path: Path) -> bool:
    """Create directory if it doesn't exist. Returns True if created."""
    if path.exists():
        log.debug("Directory already exists: %s", path)
        return False
    path.mkdir(parents=True)
    log.info("Created directory: %s", path)
    return True

def write_default_config(config_path: Path) -> bool:
    """Write default config only if it doesn't exist."""
    if config_path.exists():
        log.debug("Config already exists: %s", config_path)
        return False
    config_path.write_text('{"version": 1, "debug": false}\n', encoding="utf-8")
    log.info("Created default config: %s", config_path)
    return True
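Run either function twice and the second call hits the exists() guard: it returns False and does nothing, which is exactly the behavior we want. A quick check (the path is an arbitrary example):

ensure_directory(Path("/tmp/myapp/data"))  # True: created
ensure_directory(Path("/tmp/myapp/data"))  # False: already there, not an error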
Pattern 2: UPSERT (Update or Insert)
For database operations, use UPSERT instead of plain INSERT, so a re-run updates existing rows rather than inserting duplicates:
import sqlite3

def upsert_user(db_path: str, user: dict) -> None:
    """Insert or update a user. Safe to call multiple times."""
    with sqlite3.connect(db_path) as conn:
        conn.execute("""
            INSERT INTO users (id, name, email)
            VALUES (?, ?, ?)
            ON CONFLICT(id) DO UPDATE SET
                name = excluded.name,
                email = excluded.email
        """, (user["id"], user["name"], user["email"]))

def sync_users(db_path: str, users: list[dict]) -> dict:
    """Sync all users idempotently."""
    stats = {"synced": 0}
    for user in users:
        upsert_user(db_path, user)
        stats["synced"] += 1
    return stats
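One prerequisite the snippet glosses over: ON CONFLICT(id) only fires if id carries a PRIMARY KEY or UNIQUE constraint, and the DO UPDATE form requires SQLite 3.24 or newer. A matching table setup might look like this (the schema is an assumption to make the example self-contained), and note that CREATE TABLE IF NOT EXISTS is itself idempotent:

def ensure_schema(db_path: str) -> None:
    """Create the users table if it doesn't exist; safe to call on every run."""
    with sqlite3.connect(db_path) as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id    INTEGER PRIMARY KEY,
                name  TEXT NOT NULL,
                email TEXT NOT NULL
            )
        """)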
Pattern 3: State Files (Job Checkpointing)
Track which items have been processed so you don't re-process them:
import json
import logging
from pathlib import Path

log = logging.getLogger(__name__)

def load_processed(state_file: Path) -> set[str]:
    """Load set of already-processed IDs."""
    if not state_file.exists():
        return set()
    return set(json.loads(state_file.read_text(encoding="utf-8")))

def save_processed(state_file: Path, processed: set[str]) -> None:
    """Save the set of processed IDs."""
    state_file.write_text(json.dumps(sorted(processed)), encoding="utf-8")

def process_records(records: list[dict], state_file: Path) -> dict:
    """Process records, skipping already-processed ones."""
    processed = load_processed(state_file)
    stats = {"new": 0, "skipped": 0, "errors": 0}
    for record in records:
        record_id = str(record["id"])
        if record_id in processed:
            stats["skipped"] += 1
            continue
        try:
            process_one(record)  # your actual work function
            processed.add(record_id)
            save_processed(state_file, processed)  # save after each success
            stats["new"] += 1
        except Exception as e:
            log.error("Failed record_id=%s: %s", record_id, e)
            stats["errors"] += 1
    return stats
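Here process_one() is a stand-in for your real work (an API call, a file conversion, and so on). A sketch of two consecutive runs, with made-up records, showing the checkpoint doing its job:

def process_one(record: dict) -> None:
    print("working on", record["id"])  # hypothetical workload

records = [{"id": 1}, {"id": 2}, {"id": 3}]
state = Path("processed.json")
print(process_records(records, state))  # {'new': 3, 'skipped': 0, 'errors': 0}
print(process_records(records, state))  # {'new': 0, 'skipped': 3, 'errors': 0}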
Pattern 4: --dry-run Mode
Show what would happen without making changes:
import argparse
import logging
import time
from pathlib import Path

log = logging.getLogger(__name__)

def cleanup_old_files(directory: Path, days: int = 30, dry_run: bool = False) -> dict:
    """Delete files older than `days` days."""
    cutoff = time.time() - (days * 86400)
    stats = {"deleted": 0, "skipped": 0}
    for file in directory.rglob("*"):
        if not file.is_file():
            continue
        if file.stat().st_mtime < cutoff:
            if dry_run:
                log.info("[DRY RUN] Would delete: %s", file)
            else:
                file.unlink()
                log.info("Deleted: %s", file)
            stats["deleted"] += 1
        else:
            stats["skipped"] += 1
    return stats

def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("directory", type=Path)
    parser.add_argument("--days", type=int, default=30)
    parser.add_argument("--dry-run", action="store_true",
                        help="Show what would be deleted without deleting")
    args = parser.parse_args()
    logging.basicConfig(level=logging.INFO)
    result = cleanup_old_files(args.directory, args.days, dry_run=args.dry_run)
    log.info("Done: %s", result)
    return 0

if __name__ == "__main__":
    raise SystemExit(main())
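With the entry point in place, a cautious workflow previews first, then runs for real (cleanup.py is whatever you name the script):

python cleanup.py /var/log/myapp --days 7 --dry-run
python cleanup.py /var/log/myapp --days 7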
Pattern 5: Atomic File Writes
Prevent partial files if a script crashes mid-write:
from pathlib import Path
import json
import os

def atomic_write_json(path: Path, data: dict) -> None:
    """Write JSON atomically: readers see the old file or the new one, never a partial write."""
    tmp = path.with_name(path.name + ".tmp")  # data.json -> data.json.tmp (keeps the real suffix)
    try:
        tmp.write_text(json.dumps(data, indent=2), encoding="utf-8")
        os.replace(tmp, path)  # atomic on POSIX and Windows when both paths share a filesystem
    except Exception:
        tmp.unlink(missing_ok=True)
        raise
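This pattern composes with Pattern 3: if the process dies mid-checkpoint, a plain write_text() can leave truncated JSON that breaks the next run's load_processed(). A hedged variant of save_processed using the same trick (reuses json, os, and Path from the block above):

def save_processed_atomic(state_file: Path, processed: set[str]) -> None:
    """Checkpoint without risking a half-written state file."""
    tmp = state_file.with_name(state_file.name + ".tmp")
    tmp.write_text(json.dumps(sorted(processed)), encoding="utf-8")
    os.replace(tmp, state_file)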
Idempotency Checklist
☐ Check if the action is already done before doing it
☐ Use UPSERT instead of INSERT for database records
☐ Use --dry-run to preview changes
☐ Track progress in a state file for long jobs
☐ Use atomic writes for file output
☐ Make cleanup operations also idempotent
☐ Test by running the script twice; the second run should change nothing and still exit successfully
Quick Reference
# Check before acting
if not path.exists():
    path.mkdir(parents=True)

# Atomic write
tmp = path.with_name(path.name + ".tmp")
tmp.write_text(content)
os.replace(tmp, path)

# State file checkpoint
processed = load_state(state_file)
if item_id not in processed:
    do_work(item)
    processed.add(item_id)
    save_state(state_file, processed)

# Dry run pattern
if dry_run:
    log.info("[DRY RUN] Would do: %s", action)
else:
    do_action()

# UPSERT (SQLite)
INSERT INTO table (...) VALUES (...)
ON CONFLICT(id) DO UPDATE SET col = excluded.col