Scheduling and Exit Codes
What You'll Learn
How to make Python scripts first-class citizens in automated environments — returning correct exit codes, writing machine-readable status, and integrating with scheduling systems.
Exit Codes — The Language of Automation
When a scheduler, CI/CD system, or another script runs your program, it only sees the exit code. Get this right:
| Code | Meaning | When to Use |
|---|---|---|
0 | Success | Everything worked |
1 | General failure | Something went wrong |
2 | Usage error | Wrong arguments, bad config |
130 | Interrupted | Ctrl+C / SIGINT |
124 | Timeout | Process exceeded its time limit (the convention used by the GNU `timeout` command)
import sys
import logging
log = logging.getLogger(__name__)
def main() -> int:
    """Run the job and report the outcome as an exit code.

    Returns:
        0 on success, 1 on processing failure, 2 on usage error.
        (argparse itself also exits with status 2 on malformed arguments,
        so the "usage error" convention holds for both paths.)
    """
    import argparse
    from pathlib import Path

    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    args = parser.parse_args()

    input_path = Path(args.input)
    if not input_path.exists():
        # Usage error: caller passed a path that does not exist.
        print(f"Error: file not found: {input_path}", file=sys.stderr)
        return 2

    try:
        result = process(input_path)
        log.info("Done: %s", result)
        return 0  # success
    except Exception as e:
        # log.exception records the full traceback, not just the message,
        # which is what you want in scheduler/CI logs.
        log.exception("Processing failed: %s", e)
        return 1  # general failure
if __name__ == "__main__":
    # Configure logging only when run as a script (importers configure their
    # own), then convert main()'s return value into the process exit code
    # that schedulers and CI systems observe.
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(levelname)s %(message)s")
    sys.exit(main())
Checking exit codes in shell:
# Run the script, capture its exit status, and branch on the convention
# (0 = success, 2 = usage error, anything else = failure).
python3 script.py --input data.csv
EXIT=$?
if [ $EXIT -eq 0 ]; then
    echo "Success"
elif [ $EXIT -eq 2 ]; then
    echo "Bad arguments"
else
    echo "Failed with exit $EXIT"
fi
In Make/CI:
run:
python3 script.py --input data.csv || (echo "Script failed"; exit 1)
Status Files — Persistent Run State
Write a status file after each run so other systems can check on you:
import json
import os
from datetime import datetime, timezone
from pathlib import Path
def write_status(
    status_file: Path,
    status: str,  # "ok" | "error" | "running"
    details: dict,
) -> None:
    """Atomically write a JSON status file for monitoring systems.

    The document is written to a sibling temp file and renamed into place
    with os.replace, so a concurrent reader never sees a half-written file.

    Args:
        status_file: destination path; parent directories are created.
        status: one of "ok", "error", "running".
        details: extra keys merged into the JSON document.
    """
    status_file.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps({
        # Timezone-aware UTC; datetime.utcnow() is deprecated and naive.
        # isoformat() emits "+00:00"; normalize to the "Z" suffix.
        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "status": status,
        **details,
    }, indent=2)
    tmp = status_file.with_name(status_file.name + ".tmp")
    tmp.write_text(payload, encoding="utf-8")
    os.replace(tmp, status_file)  # atomic on POSIX and Windows
# Usage in a job
STATUS_FILE = Path("/var/run/myapp/status.json")

def run_job() -> int:
    """Run do_work(), mirroring its state into STATUS_FILE.

    Writes "running" before starting, then "ok" with result metrics or
    "error" with the failure message.

    Returns:
        0 on success, 1 on failure (the scheduler contract).
    """
    # Timezone-aware UTC; datetime.utcnow() is deprecated and naive.
    write_status(STATUS_FILE, "running",
                 {"started_at": datetime.now(timezone.utc).isoformat()})
    try:
        result = do_work()
        write_status(STATUS_FILE, "ok", {
            "processed": result["count"],
            "elapsed_s": result["elapsed"],
        })
        return 0
    except Exception as e:
        # Record the failure for monitors; the exit code reports it to
        # the scheduler.
        write_status(STATUS_FILE, "error", {"error": str(e)})
        return 1
Notifications on Failure
For critical jobs, send an alert when something fails:
import os
import sys
import requests
def notify_on_failure(job_name: str, error: str) -> None:
    """Post a Slack alert for a failed job.

    No-op when SLACK_WEBHOOK_URL is unset or empty; notification errors
    are swallowed deliberately so they never mask the job's real failure.
    """
    webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
    if not webhook_url:
        return

    payload = {"text": f":red_circle: *{job_name}* failed\n```{error}```"}
    try:
        requests.post(webhook_url, json=payload, timeout=5)
    except Exception:
        # Best-effort delivery only — do not let alerting break the caller.
        pass
def run_with_notification(job_name: str, fn) -> int:
    """Execute fn(); on any exception, alert, log, and return exit code 1.

    Args:
        job_name: label used in the alert and the log record.
        fn: zero-argument callable returning an int exit code.
    """
    try:
        exit_code = fn()
    except Exception as exc:
        notify_on_failure(job_name, str(exc))
        log.error("Job %s failed: %s", job_name, exc)
        return 1
    return exit_code
Integrating with systemd
Make your script behave well under systemd:
import sys
import signal
import logging
log = logging.getLogger(__name__)
# Cooperative-shutdown flag: set by the signal handler, polled by the
# service loop between work items.
_shutdown = False
def handle_signal(sig, frame):
    """Request a graceful stop by setting the module-level _shutdown flag.

    Args:
        sig: signal number received.
        frame: current stack frame (required by the signal API; unused).
    """
    global _shutdown
    log.info("Received signal %d — shutting down gracefully", sig)
    # Only flip a flag here; doing real work inside a signal handler is unsafe.
    _shutdown = True
# Handle both systemd's stop signal (SIGTERM) and Ctrl+C (SIGINT) the same way.
signal.signal(signal.SIGTERM, handle_signal)
signal.signal(signal.SIGINT, handle_signal)
def main() -> int:
    """Service loop: process items until a signal sets the _shutdown flag.

    Always returns 0 so systemd records a clean stop after a graceful
    shutdown; individual item failures are logged and survived.
    """
    log.info("Service started")
    while True:
        if _shutdown:
            break
        try:
            process_next_item()
        except Exception as e:
            # One bad item must not take the whole service down.
            log.error("Item failed: %s", e)
    log.info("Service stopped gracefully")
    return 0
systemd detects the service stopped cleanly (exit 0) vs crashed (non-zero).
Making Scripts CI/CD Friendly
import sys
import os
import logging
def setup_for_ci() -> None:
    """Configure logging for CI/CD environments."""
    # CI systems capture stdout and stderr separately; keep logs on stderr
    # and enable DEBUG automatically when the CI env var is set.
    in_ci = bool(os.environ.get("CI"))
    logging.basicConfig(
        stream=sys.stderr,
        format="%(levelname)s: %(message)s",  # simpler format for CI logs
        level=logging.DEBUG if in_ci else logging.INFO,
    )
def announce_step(msg: str) -> None:
    """Print a visible step announcement (useful in CI logs)."""
    bar = "=" * 60
    # flush=True so the banner appears immediately even under CI buffering.
    print(f"\n{bar}\n{msg}\n{bar}", flush=True)
In GitHub Actions:
- name: Run data sync
run: python3 scripts/sync.py --input data.csv
env:
DATABASE_URL: ${{ secrets.DATABASE_URL }}
LOG_LEVEL: INFO
If sync.py exits 0, the step passes. If it exits non-zero, the workflow fails.
Lockfiles to Prevent Overlapping Runs
import fcntl
import sys
from pathlib import Path
def with_lock(lock_path: Path, fn) -> int:
    """Run fn() under an exclusive, non-blocking file lock.

    Returns fn()'s exit code, or 0 if another instance already holds the
    lock — skipping an overlapping run is not a failure for schedulers.

    Fixes over the naive version: the file handle is closed on every path
    (the original leaked it when the lock was contended), and the lockfile
    is unlinked *while still holding the lock* — unlinking after unlock
    lets two processes hold "the" lock on different inodes.
    """
    lock_file = open(lock_path, "w")
    try:
        try:
            fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            print(f"Another instance is running ({lock_path})", file=sys.stderr)
            return 0  # exit 0 — not a failure, just skip this run
        try:
            return fn()
        finally:
            lock_path.unlink(missing_ok=True)
            fcntl.flock(lock_file, fcntl.LOCK_UN)
    finally:
        lock_file.close()
Complete Production Script Template
#!/usr/bin/env python3
"""
Long-running batch job with full operational support.
"""
import argparse
import logging
import os
import sys
from pathlib import Path
log = logging.getLogger(__name__)
def setup_logging() -> None:
    """Configure root logging from the LOG_LEVEL env var (default INFO).

    Unknown or non-level LOG_LEVEL values fall back to INFO instead of
    raising AttributeError — a typo in the environment must not crash the
    job before it even starts. Level names are matched case-insensitively.
    """
    level_name = os.environ.get("LOG_LEVEL", "INFO").upper()
    level = getattr(logging, level_name, logging.INFO)
    if not isinstance(level, int):
        # getattr could have fetched a non-level attribute (e.g. "getLogger").
        level = logging.INFO
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(levelname)-8s %(name)s: %(message)s",
        stream=sys.stderr,
    )
def parse_args() -> argparse.Namespace:
    """Parse the job's command-line arguments.

    --input is required and converted to a Path; --dry-run and
    --verbose/-v are boolean flags defaulting to False.
    """
    p = argparse.ArgumentParser(description=__doc__)
    p.add_argument("--input", type=Path, required=True)
    p.add_argument("--dry-run", action="store_true")
    p.add_argument("--verbose", "-v", action="store_true")
    return p.parse_args()
def main() -> int:
    """Entry point. Exit codes: 0 success, 1 failure, 2 usage, 130 interrupt."""
    setup_logging()
    args = parse_args()

    # -v on the command line wins over whatever LOG_LEVEL configured.
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    input_path = args.input
    if not input_path.exists():
        log.error("Input not found: %s", input_path)
        return 2

    log.info("Starting job input=%s dry_run=%s", input_path, args.dry_run)
    try:
        result = process(input_path, dry_run=args.dry_run)
        log.info("Job complete: %s", result)
        return 0
    except KeyboardInterrupt:
        # Conventional 128 + SIGINT(2) = 130.
        log.info("Interrupted by user")
        return 130
    except Exception as e:
        # log.exception includes the traceback in the failure record.
        log.exception("Job failed: %s", e)
        return 1
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit code.
    sys.exit(main())
Quick Reference
# Exit codes
sys.exit(0) # success
sys.exit(1) # failure
sys.exit(2) # usage error
sys.exit(130) # interrupted
# Always via main()
def main() -> int:
...
return 0
if __name__ == "__main__":
sys.exit(main())
# Status file
json.dumps({"status": "ok", "timestamp": datetime.now(timezone.utc).isoformat()})
# Lock to prevent overlapping runs
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
# Graceful shutdown
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGINT, handler)