146 lines
4.5 KiB
Python
146 lines
4.5 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import os
|
|
import sys
|
|
import time
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Final
|
|
|
|
|
|
# Lowercase substrings searched for in the lower-cased tail of each job log.
# They are checked in this order and the first one present is the one
# reported, so the ordering decides which marker shows up in alerts.
ERROR_PATTERNS: Final[tuple[str, ...]] = (
    # Python-level failures.
    "traceback",
    "runtimeerror",
    # Generic error / failure wording.
    "error:",
    "exception",
    "failed",
    # Abnormal process termination.
    "segmentation fault",
    "killed",
)
|
|
|
|
|
|
@dataclass(frozen=True)
class JobSpec:
    """Immutable description of one monitored job.

    Attributes:
        name: Label used in status-log alerts and stderr messages.
        pid: Process id polled for liveness.
        log_path: Log file whose tail is scanned for error markers.
    """

    name: str  # human-readable job label
    pid: int  # process id to watch
    log_path: Path  # log file to scan
|
|
|
|
|
|
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the monitor's command-line options.

    Args:
        argv: Arguments to parse, without the program name. When None
            (the default), argparse reads ``sys.argv[1:]``.

    Returns:
        Namespace with ``preprocess_pid``, ``preprocess_log``,
        ``launcher_pid``, ``launcher_log``, ``sentinel_path``, ``status_log``
        and ``poll_seconds``.

    Exits with status 2 (via ``parser.error``) when ``--poll-seconds`` is not
    a positive finite number: a negative interval makes ``time.sleep`` raise,
    zero would spin the poll loop, and NaN/infinity are not usable intervals.
    """
    parser = argparse.ArgumentParser(description="Monitor long-running DRF preprocess/train jobs.")
    parser.add_argument("--preprocess-pid", type=int, required=True)
    parser.add_argument("--preprocess-log", type=Path, required=True)
    parser.add_argument("--launcher-pid", type=int, required=True)
    parser.add_argument("--launcher-log", type=Path, required=True)
    parser.add_argument("--sentinel-path", type=Path, required=True)
    parser.add_argument("--status-log", type=Path, required=True)
    parser.add_argument(
        "--poll-seconds",
        type=float,
        default=30.0,
        help="Seconds between polls (must be a positive finite number).",
    )
    args = parser.parse_args(argv)
    # The chained comparison is False for NaN, so NaN is rejected as well.
    if not 0.0 < args.poll_seconds < float("inf"):
        parser.error(f"--poll-seconds must be a positive finite number, got {args.poll_seconds}")
    return args
|
|
|
|
|
|
def pid_alive(pid: int) -> bool:
    """Return True if process ``pid`` exists and has not terminated.

    When procfs is mounted (Linux), ``/proc/<pid>/stat`` is read. A missing
    entry means the process is gone. A zombie (state ``Z``) or dead
    (state ``X``) process is also reported as not alive, because an exited
    process keeps its ``/proc`` entry until its parent reaps it.

    Without procfs, POSIX systems fall back to ``os.kill(pid, 0)``, which
    probes for existence without delivering a signal. Other platforms return
    False, as the original procfs-only check did there.

    Args:
        pid: Process id to probe. Non-positive ids are never considered alive.
    """
    if pid <= 0:
        # In kill(2), 0 and negative values address process groups, not a process.
        return False
    if Path("/proc/self").exists():
        try:
            stat = Path(f"/proc/{pid}/stat").read_text(encoding="utf-8", errors="replace")
        except FileNotFoundError:
            return False
        except OSError:
            # The entry exists but could not be read; fall back to its presence.
            return Path(f"/proc/{pid}").exists()
        # Layout is "pid (comm) state ...". comm may itself contain spaces or
        # ')', so the state is the first field after the LAST ')'.
        fields = stat.rpartition(")")[2].split()
        return not fields or fields[0] not in ("Z", "X")
    if os.name != "posix":
        return False
    try:
        os.kill(pid, 0)
    except (ProcessLookupError, OverflowError):
        return False
    except PermissionError:
        # The process exists but belongs to another user.
        return True
    return True
|
|
|
|
|
|
def read_tail(path: Path, limit: int = 8192) -> str:
    """Return the last ``limit`` bytes of ``path`` decoded as UTF-8.

    The window may begin in the middle of a multi-byte character, so invalid
    bytes are replaced with U+FFFD rather than raising.

    Args:
        path: File to read.
        limit: Maximum number of trailing bytes to return. If the file is
            shorter than this, the whole file is returned.

    Returns:
        The decoded tail, or ``""`` if the file does not exist.
    """
    try:
        handle = path.open("rb")
    except FileNotFoundError:
        # Opening directly (rather than checking exists() first) avoids a
        # crash if the log is removed or rotated between the check and the open.
        return ""
    with handle:
        size = handle.seek(0, os.SEEK_END)
        handle.seek(max(size - limit, 0), os.SEEK_SET)
        data = handle.read()
    return data.decode("utf-8", errors="replace")
|
|
|
|
|
|
def detect_error(log_text: str, patterns: tuple[str, ...] | None = None) -> str | None:
    """Return the first error marker that occurs in ``log_text``, or None.

    Matching is a case-insensitive substring search. Patterns are tried in
    the order given, so the result is the first matching *pattern*, not the
    earliest occurrence in the text.

    Args:
        log_text: Text to scan (typically the tail of a job log).
        patterns: Markers to look for. Defaults to the module-level
            ``ERROR_PATTERNS``, resolved at call time.

    Returns:
        The matching pattern exactly as supplied, or None if none match.
    """
    if patterns is None:
        patterns = ERROR_PATTERNS
    lowered = log_text.lower()
    return next((pattern for pattern in patterns if pattern.lower() in lowered), None)
|
|
|
|
|
|
def append_status(path: Path, line: str) -> None:
    """Append one timestamped record to the status log at ``path``.

    Missing parent directories are created. Each record has the form
    ``YYYY-MM-DDTHH:MM:SS <line>`` (local time, whole seconds) followed by a
    newline.
    """
    stamp = datetime.now().isoformat(timespec="seconds")
    record = f"{stamp} {line}\n"
    path.parent.mkdir(exist_ok=True, parents=True)
    with open(path, mode="a", encoding="utf-8") as log_file:
        log_file.write(record)
|
|
|
|
|
|
def monitor_job(job: JobSpec, status_log: Path) -> str | None:
    """Check the tail of ``job``'s log for an error marker.

    When a marker is found, an ``[alert]`` record naming the job, the marker
    and the log path is appended to ``status_log``, and a short summary
    string is returned for the caller to report. Otherwise returns None and
    writes nothing.
    """
    marker = detect_error(read_tail(job.log_path))
    if marker is None:
        return None
    append_status(status_log, f"[alert] {job.name}: detected `{marker}` in {job.log_path}")
    return f"{job.name} log shows `{marker}`"
|
|
|
|
|
|
def _fail(status_log: Path, message: str) -> int:
    """Record ``message`` as an ``[alert]`` status line, echo it to stderr, and return 1."""
    append_status(status_log, f"[alert] {message}")
    print(message, file=sys.stderr)
    return 1


def main() -> int:
    """Poll the preprocess and launcher jobs until the launcher exits or a job fails.

    Each iteration (every ``--poll-seconds``):

    * scans both job logs for error markers and fails on the first hit;
    * appends an ``[ok]`` heartbeat with liveness and sentinel status;
    * fails if preprocess has exited without the sentinel file existing;
    * once the launcher has exited, fails if training never started and the
      sentinel is missing; otherwise records ``[done]`` and succeeds.

    A job only counts as "exited" after it has been observed alive at least
    once. NOTE: if a PID is never seen alive (for example, it had already
    exited before monitoring began), no exit check fires for it. If that PID
    is the launcher, the loop keeps polling indefinitely; the ``[ok]``
    heartbeat shows ``launcher_alive=False``.

    Returns:
        Process exit code: 0 when the launcher exits normally, 1 on any alert.
    """
    args = parse_args()
    preprocess = JobSpec("preprocess", args.preprocess_pid, args.preprocess_log)
    launcher = JobSpec("launcher", args.launcher_pid, args.launcher_log)

    append_status(
        args.status_log,
        (
            "[start] monitoring "
            f"preprocess_pid={preprocess.pid} launcher_pid={launcher.pid} "
            f"sentinel={args.sentinel_path}"
        ),
    )

    preprocess_seen_alive = False
    launcher_seen_alive = False
    # Latched: once training output pushes the launcher's "[start]" marker
    # out of the fixed-size tail window, re-reading the tail would no longer
    # find it, so remember that it was seen.
    train_started = False

    while True:
        preprocess_alive = pid_alive(preprocess.pid)
        launcher_alive = pid_alive(launcher.pid)
        preprocess_seen_alive = preprocess_seen_alive or preprocess_alive
        launcher_seen_alive = launcher_seen_alive or launcher_alive

        # monitor_job has already written the [alert] record; only stderr remains.
        for job in (preprocess, launcher):
            job_error = monitor_job(job, args.status_log)
            if job_error is not None:
                print(job_error, file=sys.stderr)
                return 1

        sentinel_ready = args.sentinel_path.exists()
        append_status(
            args.status_log,
            (
                "[ok] "
                f"preprocess_alive={preprocess_alive} "
                f"launcher_alive={launcher_alive} "
                f"sentinel_ready={sentinel_ready}"
            ),
        )

        if preprocess_seen_alive and not preprocess_alive and not sentinel_ready:
            return _fail(args.status_log, "preprocess exited before sentinel was written")

        if not train_started:
            train_started = "[start]" in read_tail(launcher.log_path)

        if launcher_seen_alive and not launcher_alive:
            if not train_started and not sentinel_ready:
                return _fail(args.status_log, "launcher exited before training started")
            append_status(args.status_log, "[done] launcher exited; monitoring complete")
            return 0

        time.sleep(args.poll_seconds)
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s integer result as the process exit status.
    exit_code = main()
    raise SystemExit(exit_code)
|