# File: OpenGait/scripts/monitor_drf_jobs.py
# (146 lines, 4.5 KiB, Python)
from __future__ import annotations
import argparse
import os
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Final
# Case-insensitive substrings that mark a job as failed when seen in a log
# tail. detect_error() lowercases the log text and returns the FIRST pattern
# that matches, so the order here decides which pattern gets reported.
ERROR_PATTERNS: Final[tuple[str, ...]] = (
    "traceback",
    "runtimeerror",
    "error:",
    "exception",
    "failed",
    "segmentation fault",
    "killed",
)
@dataclass(frozen=True)
class JobSpec:
    """Immutable description of one monitored background job."""

    # Human-readable label used in status/alert lines (e.g. "preprocess").
    name: str
    # OS process id polled for liveness.
    pid: int
    # Log file scanned for ERROR_PATTERNS and progress markers.
    log_path: Path
def parse_args() -> argparse.Namespace:
    """Parse command-line options for the DRF job monitor.

    Returns:
        argparse.Namespace holding both job PIDs, both job log paths, the
        sentinel path, the monitor's own status-log path, and the polling
        interval in seconds.

    Raises:
        SystemExit: via argparse when required options are missing or when
            ``--poll-seconds`` is not strictly positive.
    """
    parser = argparse.ArgumentParser(description="Monitor long-running DRF preprocess/train jobs.")
    parser.add_argument("--preprocess-pid", type=int, required=True,
                        help="PID of the preprocess job to watch.")
    parser.add_argument("--preprocess-log", type=Path, required=True,
                        help="Log file written by the preprocess job.")
    parser.add_argument("--launcher-pid", type=int, required=True,
                        help="PID of the training launcher to watch.")
    parser.add_argument("--launcher-log", type=Path, required=True,
                        help="Log file written by the launcher.")
    parser.add_argument("--sentinel-path", type=Path, required=True,
                        help="File whose existence marks preprocess completion.")
    parser.add_argument("--status-log", type=Path, required=True,
                        help="File the monitor appends its own status lines to.")
    parser.add_argument("--poll-seconds", type=float, default=30.0,
                        help="Seconds to sleep between polls (default: 30).")
    args = parser.parse_args()
    # A zero/negative interval would make the main loop spin hot (or fail in
    # time.sleep); reject it up front with a normal argparse usage error.
    if args.poll_seconds <= 0:
        parser.error("--poll-seconds must be positive")
    return args
def pid_alive(pid: int) -> bool:
    """Return True if a process with *pid* currently exists.

    Uses the portable ``os.kill(pid, 0)`` probe (signal 0 performs only the
    existence/permission check) instead of peeking at ``/proc``, which exists
    only on Linux. A PermissionError means the process exists but belongs to
    another user, so it still counts as alive.
    """
    if pid <= 0:
        # 0 / negatives address process groups for os.kill; never a real job.
        return False
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True
    return True
def read_tail(path: Path, limit: int = 8192) -> str:
    """Return up to the last *limit* bytes of *path*, decoded as UTF-8.

    A missing file yields "" instead of raising, so callers can poll a log
    before the job has created it. Undecodable bytes are replaced.
    """
    # EAFP: the original exists()-then-open() had a TOCTOU race with a log
    # file that is rotated or deleted between the two calls.
    try:
        with path.open("rb") as handle:
            handle.seek(0, os.SEEK_END)
            size = handle.tell()
            # Clamp so files smaller than *limit* are read from the start.
            handle.seek(max(size - limit, 0), os.SEEK_SET)
            data = handle.read()
    except FileNotFoundError:
        return ""
    return data.decode("utf-8", errors="replace")
def detect_error(log_text: str) -> str | None:
    """Return the first ERROR_PATTERNS entry found in *log_text*, else None.

    Matching is a case-insensitive substring search; patterns are tried in
    declaration order.
    """
    haystack = log_text.lower()
    return next((needle for needle in ERROR_PATTERNS if needle in haystack), None)
def append_status(path: Path, line: str) -> None:
    """Append *line* to *path*, prefixed with a second-resolution timestamp.

    Parent directories are created on demand; the file is opened in append
    mode so concurrent runs interleave rather than truncate.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().isoformat(timespec="seconds")
    with path.open("a", encoding="utf-8") as sink:
        sink.write(f"{stamp} {line}\n")
def monitor_job(job: JobSpec, status_log: Path) -> str | None:
    """Scan the job's log tail for error patterns.

    On a match, record an alert line in *status_log* and return a short
    human-readable failure summary; otherwise return None.
    """
    found = detect_error(read_tail(job.log_path))
    if found is None:
        return None
    append_status(status_log, f"[alert] {job.name}: detected `{found}` in {job.log_path}")
    return f"{job.name} log shows `{found}`"
def main() -> int:
    """Poll both jobs until a failure is detected or the launcher exits.

    Returns 0 when the launcher exits after training started (or after the
    sentinel appeared); returns 1 on any detected failure: an error pattern
    in either log, preprocess dying before writing its sentinel, or the
    launcher dying before training started.
    """
    args = parse_args()
    preprocess = JobSpec("preprocess", args.preprocess_pid, args.preprocess_log)
    launcher = JobSpec("launcher", args.launcher_pid, args.launcher_log)
    append_status(
        args.status_log,
        (
            "[start] monitoring "
            f"preprocess_pid={preprocess.pid} launcher_pid={launcher.pid} "
            f"sentinel={args.sentinel_path}"
        ),
    )
    # Track whether each PID was ever observed alive, so "never started yet"
    # on the first polls is not mistaken for "already exited".
    preprocess_seen_alive = False
    launcher_seen_alive = False
    while True:
        preprocess_alive = pid_alive(preprocess.pid)
        launcher_alive = pid_alive(launcher.pid)
        preprocess_seen_alive = preprocess_seen_alive or preprocess_alive
        launcher_seen_alive = launcher_seen_alive or launcher_alive
        # Error patterns in either log abort monitoring immediately.
        preprocess_error = monitor_job(preprocess, args.status_log)
        if preprocess_error is not None:
            print(preprocess_error, file=sys.stderr)
            return 1
        launcher_error = monitor_job(launcher, args.status_log)
        if launcher_error is not None:
            print(launcher_error, file=sys.stderr)
            return 1
        # The sentinel file marks successful preprocess completion.
        sentinel_ready = args.sentinel_path.exists()
        append_status(
            args.status_log,
            (
                "[ok] "
                f"preprocess_alive={preprocess_alive} "
                f"launcher_alive={launcher_alive} "
                f"sentinel_ready={sentinel_ready}"
            ),
        )
        # Preprocess was seen running but died without its sentinel: failure.
        if preprocess_seen_alive and not preprocess_alive and not sentinel_ready:
            append_status(args.status_log, "[alert] preprocess exited before sentinel was written")
            print("preprocess exited before sentinel was written", file=sys.stderr)
            return 1
        # NOTE(review): a literal "[start]" in the launcher log is taken to
        # mean training has begun — confirm the launcher emits that marker.
        launcher_tail = read_tail(launcher.log_path)
        train_started = "[start]" in launcher_tail
        if launcher_seen_alive and not launcher_alive:
            if not train_started and not sentinel_ready:
                append_status(args.status_log, "[alert] launcher exited before training started")
                print("launcher exited before training started", file=sys.stderr)
                return 1
            # Launcher finished after training started: normal completion.
            append_status(args.status_log, "[done] launcher exited; monitoring complete")
            return 0
        time.sleep(args.poll_seconds)
if __name__ == "__main__":
    # Propagate main()'s return code as the process exit status.
    # sys.exit raises SystemExit, identical to the previous form.
    sys.exit(main())