Skip to main content

Scheduling and Exit Codes

What You'll Learn

How to make Python scripts first-class citizens in automated environments — returning correct exit codes, writing machine-readable status, and integrating with scheduling systems.

Exit Codes — The Language of Automation

When a scheduler, CI/CD system, or another script runs your program, it only sees the exit code. Get this right:

Code | Meaning         | When to Use
-----|-----------------|-----------------------------
0    | Success         | Everything worked
1    | General failure | Something went wrong
2    | Usage error     | Wrong arguments, bad config
130  | Interrupted     | Ctrl+C / SIGINT
124  | Timeout         | Process took too long
import sys
import logging

log = logging.getLogger(__name__)


def main() -> int:
    """Entry point for the demo script.

    Returns 0 on success, 1 on failure, 2 on usage error — the codes a
    scheduler or shell inspects after the process exits.
    """
    import argparse
    from pathlib import Path

    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    args = parser.parse_args()

    source = Path(args.input)
    if not source.exists():
        # Usage problems go to stderr so they surface even when stdout
        # is captured or redirected.
        print(f"Error: file not found: {source}", file=sys.stderr)
        return 2  # usage error

    try:
        outcome = process(source)
        log.info("Done: %s", outcome)
        return 0  # success
    except Exception as exc:
        log.error("Processing failed: %s", exc)
        return 1  # general failure


if __name__ == "__main__":
    # Configure root logging once at process start, then hand main()'s
    # return value straight to the shell as the process exit code.
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(levelname)s %(message)s")
    sys.exit(main())

Checking exit codes in shell:

python3 script.py --input data.csv
EXIT=$?

if [ $EXIT -eq 0 ]; then
echo "Success"
elif [ $EXIT -eq 2 ]; then
echo "Bad arguments"
else
echo "Failed with exit $EXIT"
fi

In Make/CI:

run:
python3 script.py --input data.csv || (echo "Script failed"; exit 1)

Status Files — Persistent Run State

Write a status file after each run so other systems can check on you:

import json
from datetime import datetime
from pathlib import Path


def write_status(
    status_file: Path,
    status: str,  # "ok" | "error" | "running"
    details: dict,
) -> None:
    """Write a JSON status file for monitoring systems.

    Creates the parent directory if needed, then overwrites status_file
    with a JSON object containing a UTC timestamp (ISO-8601 with a "Z"
    suffix), the status string, and every key/value pair from details.
    """
    # Fix: datetime.utcnow() is deprecated since Python 3.12. Use an
    # aware UTC datetime and keep the conventional "Z" suffix that the
    # original format produced.
    from datetime import timezone
    timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    status_file.parent.mkdir(parents=True, exist_ok=True)
    status_file.write_text(
        json.dumps({
            "timestamp": timestamp,
            "status": status,
            **details,
        }, indent=2),
        encoding="utf-8",
    )


# Usage in a job
STATUS_FILE = Path("/var/run/myapp/status.json")

def run_job() -> int:
    """Run do_work() once, mirroring its state into STATUS_FILE.

    Returns 0 on success, 1 on failure. The status file always reflects
    the latest state: "running" while in flight, then "ok" or "error".
    """
    # Fix: datetime.utcnow() is deprecated since Python 3.12; record an
    # aware UTC timestamp instead.
    from datetime import timezone
    write_status(STATUS_FILE, "running",
                 {"started_at": datetime.now(timezone.utc).isoformat()})
    try:
        result = do_work()
        write_status(STATUS_FILE, "ok", {
            "processed": result["count"],
            "elapsed_s": result["elapsed"],
        })
        return 0
    except Exception as e:
        # Persist the failure for monitors, then signal it via exit code 1.
        write_status(STATUS_FILE, "error", {"error": str(e)})
        return 1

Notifications on Failure

For critical jobs, send an alert when something fails:

import os
import sys
import requests


def notify_on_failure(job_name: str, error: str) -> None:
    """Send Slack notification on job failure.

    Reads the webhook URL from SLACK_WEBHOOK_URL; quietly does nothing
    when the variable is unset or the HTTP call fails, so alerting can
    never mask the job's real error.
    """
    webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
    if webhook_url is None or webhook_url == "":
        return

    payload = {"text": f":red_circle: *{job_name}* failed\n```{error}```"}
    try:
        requests.post(webhook_url, json=payload, timeout=5)
    except Exception:
        # Best effort only — don't let notification failure mask the real error.
        pass


def run_with_notification(job_name: str, fn) -> int:
    """Invoke fn(); on any exception alert Slack, log it, and return 1.

    On success the callable's own return value (an exit code) is passed
    straight through to the caller.
    """
    try:
        exit_code = fn()
    except Exception as exc:
        notify_on_failure(job_name, str(exc))
        log.error("Job %s failed: %s", job_name, exc)
        return 1
    return exit_code

Integrating with systemd

Make your script behave well under systemd:

import sys
import signal
import logging

log = logging.getLogger(__name__)
# Cooperative-shutdown flag: set True by handle_signal, polled by the
# service loop at the top of each iteration.
_shutdown = False


def handle_signal(sig, frame):
    """Signal handler: request a graceful shutdown instead of dying mid-item.

    Only flips the module-level flag — all cleanup happens in the main
    loop, which is the only code safe to run real work from.
    """
    global _shutdown
    log.info("Received signal %d — shutting down gracefully", sig)
    _shutdown = True


# Route SIGTERM (systemd stop) and SIGINT (Ctrl+C) to the same
# cooperative-shutdown request.
signal.signal(signal.SIGTERM, handle_signal)
signal.signal(signal.SIGINT, handle_signal)


def main() -> int:
    """Process items until a shutdown signal flips _shutdown; always returns 0."""
    log.info("Service started")

    while True:
        # Check the flag first so a signal received mid-item stops us
        # before the next unit of work begins.
        if _shutdown:
            break
        try:
            process_next_item()
        except Exception as exc:
            # One bad item must not bring the whole service down.
            log.error("Item failed: %s", exc)

    log.info("Service stopped gracefully")
    return 0

systemd detects the service stopped cleanly (exit 0) vs crashed (non-zero).

Making Scripts CI/CD Friendly

import sys
import os
import logging


def setup_for_ci() -> None:
    """Configure logging for CI/CD environments."""
    # CI runners usually capture stdout and stderr separately, so log to
    # stderr with a minimal format; enable DEBUG whenever CI is set.
    on_ci = bool(os.environ.get("CI"))
    logging.basicConfig(
        level=logging.DEBUG if on_ci else logging.INFO,
        format="%(levelname)s: %(message)s",  # simpler format for CI logs
        stream=sys.stderr,
    )


def announce_step(msg: str) -> None:
    """Print a visible step announcement (useful in CI logs)."""
    bar = "=" * 60
    # flush=True so the banner appears immediately even under buffering.
    print(f"\n{bar}\n{msg}\n{bar}", flush=True)

In GitHub Actions:

- name: Run data sync
run: python3 scripts/sync.py --input data.csv
env:
DATABASE_URL: ${{ secrets.DATABASE_URL }}
LOG_LEVEL: INFO

If sync.py exits 0, the step passes. If it exits non-zero, the workflow fails.

Lockfiles to Prevent Overlapping Runs

import fcntl
import sys
from pathlib import Path


def with_lock(lock_path: Path, fn) -> int:
    """Run fn() under an exclusive flock on lock_path.

    If another instance already holds the lock, print a notice to stderr
    and return 0 — skipping a run is not a failure for cron-style jobs.
    Otherwise return fn()'s exit code, releasing the lock and removing
    the lock file afterwards.
    """
    lock_file = open(lock_path, "w")
    try:
        fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        # Fix: the original leaked the file descriptor on this path —
        # close it before bailing out.
        lock_file.close()
        print(f"Another instance is running ({lock_path})", file=sys.stderr)
        return 0  # exit 0 — not a failure, just skip this run

    try:
        return fn()
    finally:
        fcntl.flock(lock_file, fcntl.LOCK_UN)
        lock_file.close()
        # NOTE(review): unlinking after releasing leaves a small window
        # where another process can lock the old inode while a third
        # creates a fresh file — acceptable for cron jobs; confirm if
        # stronger mutual exclusion is required.
        lock_path.unlink(missing_ok=True)

Complete Production Script Template

#!/usr/bin/env python3
"""
Long-running batch job with full operational support.
"""
import argparse
import logging
import os
import sys
from pathlib import Path


log = logging.getLogger(__name__)


def setup_logging() -> None:
    """Configure root logging to stderr; level comes from LOG_LEVEL.

    Fix: an unrecognized LOG_LEVEL value (e.g. "VERBOSE") used to raise
    AttributeError via getattr(logging, ...); it now falls back to INFO
    so a typo'd environment variable can't crash the job at startup.
    """
    level_name = os.environ.get("LOG_LEVEL", "INFO").upper()
    level = getattr(logging, level_name, None)
    if not isinstance(level, int):
        level = logging.INFO  # unknown names degrade gracefully
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(levelname)-8s %(name)s: %(message)s",
        stream=sys.stderr,
    )


def parse_args() -> argparse.Namespace:
    """Define and parse this job's command-line interface."""
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("--input", type=Path, required=True)
    cli.add_argument("--dry-run", action="store_true")
    cli.add_argument("--verbose", "-v", action="store_true")
    return cli.parse_args()


def main() -> int:
    """Run the batch job and return a shell-friendly exit code.

    0 = success, 1 = failure, 2 = usage error, 130 = interrupted.
    """
    setup_logging()
    args = parse_args()

    # --verbose overrides whatever level LOG_LEVEL configured.
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    if not args.input.exists():
        log.error("Input not found: %s", args.input)
        return 2

    log.info("Starting job input=%s dry_run=%s", args.input, args.dry_run)

    try:
        result = process(args.input, dry_run=args.dry_run)
        log.info("Job complete: %s", result)
        return 0
    except KeyboardInterrupt:
        # 128 + SIGINT(2) — the conventional "interrupted" exit code.
        log.info("Interrupted by user")
        return 130
    except Exception as e:
        # log.exception records the full traceback for post-mortems.
        log.exception("Job failed: %s", e)
        return 1


if __name__ == "__main__":
    # Propagate main()'s return value as the process exit code.
    sys.exit(main())

Quick Reference

# Exit codes
sys.exit(0) # success
sys.exit(1) # failure
sys.exit(2) # usage error
sys.exit(130) # interrupted

# Always via main()
def main() -> int:
...
return 0

if __name__ == "__main__":
sys.exit(main())

# Status file
json.dumps({"status": "ok", "timestamp": datetime.now(timezone.utc).isoformat()})

# Lock to prevent overlapping runs
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)

# Graceful shutdown
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGINT, handler)

What's Next

Module 10: APIs and Web Integration