# Files
# OpenGait/scripts/systemd_run_opengait.py
#
# 195 lines
# 5.6 KiB
# Python

from __future__ import annotations

import re
import shlex
import subprocess
from collections.abc import Sequence
from pathlib import Path

import click
# Parent of the directory that contains this script; used as the default
# value of the ``--workdir`` option of the ``launch`` command.
REPO_ROOT = Path(__file__).resolve().parents[1]
# Directory in which ``launch`` places ``<unit>.log`` when ``--log-path`` is
# not supplied.
DEFAULT_LOG_DIR = Path("/tmp")
def _sanitize_unit_name(raw: str) -> str:
    """Return *raw* restricted to the characters ``A-Za-z0-9_.@-``.

    Every run of characters outside that set is collapsed into a single
    hyphen, then hyphens are trimmed from both ends of the result.

    Raises:
        click.ClickException: if nothing is left after trimming.
    """
    collapsed = re.sub(r"[^A-Za-z0-9_.@-]+", "-", raw)
    trimmed = collapsed.strip("-")
    if trimmed:
        return trimmed
    raise click.ClickException("Unit name cannot be empty after sanitization.")
def _split_gpu_uuids(value: str) -> list[str]:
    """Split a comma-separated string into a list of non-empty entries.

    Each entry is stripped of surrounding whitespace; entries that are empty
    after stripping are dropped. Order is preserved.

    Raises:
        click.ClickException: if no entry remains.
    """
    stripped = (piece.strip() for piece in value.split(","))
    gpu_ids = [piece for piece in stripped if piece]
    if gpu_ids:
        return gpu_ids
    raise click.ClickException("At least one GPU UUID is required.")
def _run_command(
    args: Sequence[str],
    *,
    cwd: Path | None = None,
    check: bool = True,
) -> subprocess.CompletedProcess[str]:
    """Run *args* as a subprocess and return the completed process.

    stdout and stderr are captured and decoded as text.

    Args:
        args: Program and arguments; copied into a list before execution.
        cwd: Working directory for the child, or ``None`` to inherit.
        check: Forwarded to ``subprocess.run``; when true, a non-zero exit
            status raises ``subprocess.CalledProcessError``.
    """
    working_dir = None if cwd is None else str(cwd)
    argv = list(args)
    return subprocess.run(
        argv,
        cwd=working_dir,
        check=check,
        capture_output=True,
        text=True,
    )
def _default_unit_name(cfgs: Path, phase: str) -> str:
    """Build the fallback unit name ``opengait-<config stem>-<phase>``.

    The joined name is passed through ``_sanitize_unit_name`` before being
    returned.
    """
    candidate = "-".join(("opengait", cfgs.stem, phase))
    return _sanitize_unit_name(candidate)
# Root click command group. The subcommands in this file register themselves
# on it through ``@cli.command(...)``; the docstring doubles as the group's
# help text.
@click.group()
def cli() -> None:
    """Launch and manage OpenGait runs under systemd user services."""
@cli.command("launch")
@click.option("--cfgs", type=click.Path(path_type=Path, exists=True, dir_okay=False), required=True)
@click.option("--phase", type=click.Choice(["train", "test"]), required=True)
@click.option(
    "--gpu-uuids",
    required=True,
    help="Comma-separated GPU UUID list for CUDA_VISIBLE_DEVICES.",
)
@click.option("--unit", type=str, default=None, help="systemd unit name. Defaults to a name derived from cfgs + phase.")
@click.option(
    "--log-path",
    type=click.Path(path_type=Path, dir_okay=False),
    default=None,
    help="Optional file to append stdout/stderr to. Defaults to /tmp/<unit>.log",
)
@click.option(
    "--workdir",
    type=click.Path(path_type=Path, file_okay=False),
    default=REPO_ROOT,
    show_default=True,
)
@click.option("--description", type=str, default=None, help="Optional systemd unit description.")
@click.option("--dry-run", is_flag=True, help="Print the resolved systemd-run command without launching it.")
def launch(
    cfgs: Path,
    phase: str,
    gpu_uuids: str,
    unit: str | None,
    log_path: Path | None,
    workdir: Path,
    description: str | None,
    dry_run: bool,
) -> None:
    """Launch an OpenGait run via systemd-run --user using torchrun."""
    # Parameters (all supplied by click from the options above):
    #   cfgs        config file; a relative path is re-resolved against workdir.
    #   phase       "train" or "test", forwarded to opengait/main.py.
    #   gpu_uuids   comma-separated list; its length is the process count.
    #   unit        explicit unit name (sanitized) or None for a derived name.
    #   log_path    file that receives stdout/stderr, or None for the default.
    #   workdir     directory systemd-run is started from (--same-dir).
    #   description optional unit description.
    #   dry_run     print the command and return without touching anything.
    # Raises click.ClickException when the config is missing, an input is
    # invalid, systemd-run cannot be started, or it exits non-zero.

    # NOTE: click validates --cfgs relative to the current directory
    # (exists=True), whereas a relative path is resolved against workdir
    # here, hence the second existence check.
    resolved_cfgs = cfgs if cfgs.is_absolute() else (workdir / cfgs).resolve()
    if not resolved_cfgs.exists():
        raise click.ClickException(f"Config not found: {resolved_cfgs}")

    unit_name = _sanitize_unit_name(unit) if unit is not None else _default_unit_name(resolved_cfgs, phase)
    if log_path is None:
        log_path = DEFAULT_LOG_DIR / f"{unit_name}.log"
    resolved_log_path = log_path.resolve()

    # Validate all user input before any filesystem side effect.
    gpu_uuid_list = _split_gpu_uuids(gpu_uuids)
    nproc = len(gpu_uuid_list)

    command = [
        "systemd-run",
        "--user",
        "--unit",
        unit_name,
        "--collect",
        "--same-dir",
        "--property",
        "KillMode=mixed",
        "--property",
        f"StandardOutput=append:{resolved_log_path}",
        "--property",
        f"StandardError=append:{resolved_log_path}",
        "--setenv",
        f"CUDA_VISIBLE_DEVICES={','.join(gpu_uuid_list)}",
    ]
    if description:
        command.extend(["--description", description])
    command.extend(
        [
            "uv",
            "run",
            "python",
            "-m",
            "torch.distributed.run",
            "--nproc_per_node",
            str(nproc),
            "opengait/main.py",
            "--cfgs",
            str(resolved_cfgs),
            "--phase",
            phase,
        ]
    )

    if dry_run:
        # Shell-quote each argument so the printed line can be pasted back
        # into a shell even when paths or the description contain spaces.
        click.echo(shlex.join(command))
        return

    # Only a real launch needs the log directory; a dry run leaves the
    # filesystem untouched.
    resolved_log_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        result = _run_command(command, cwd=workdir, check=False)
    except OSError as exc:
        # Raised e.g. when systemd-run is not installed or workdir is missing.
        raise click.ClickException(f"Could not start systemd-run: {exc}") from exc
    if result.returncode != 0:
        raise click.ClickException(
            f"systemd-run launch failed.\nstdout:\n{result.stdout}\nstderr:\n{result.stderr}"
        )

    click.echo(f"unit={unit_name}")
    click.echo(f"log={resolved_log_path}")
    click.echo("journal: journalctl --user -u " + unit_name + " -f")
    launcher_output = result.stdout.strip()
    if launcher_output:
        click.echo(launcher_output)
@cli.command("status")
@click.argument("unit")
def status(unit: str) -> None:
    """Show systemd user-unit status."""
    # Relay systemctl's stdout/stderr verbatim and mirror its exit status.
    argv = ("systemctl", "--user", "status", unit)
    completed = _run_command(argv, check=False)
    click.echo(completed.stdout, nl=False)
    if completed.stderr:
        click.echo(completed.stderr, nl=False, err=True)
    exit_code = completed.returncode
    if exit_code:
        raise SystemExit(exit_code)
@cli.command("logs")
@click.argument("unit")
@click.option("-n", "--lines", type=int, default=200, show_default=True)
def logs(unit: str, lines: int) -> None:
    """Show recent journal lines for a unit."""
    # Relay journalctl's stdout/stderr verbatim and mirror its exit status.
    argv = ("journalctl", "--user", "-u", unit, "-n", str(lines), "--no-pager")
    completed = _run_command(argv, check=False)
    click.echo(completed.stdout, nl=False)
    if completed.stderr:
        click.echo(completed.stderr, nl=False, err=True)
    exit_code = completed.returncode
    if exit_code:
        raise SystemExit(exit_code)
@cli.command("stop")
@click.argument("unit")
def stop(unit: str) -> None:
    """Stop a systemd user unit."""
    # Relay any systemctl output and mirror its exit status.
    argv = ("systemctl", "--user", "stop", unit)
    completed = _run_command(argv, check=False)
    if completed.stdout:
        click.echo(completed.stdout, nl=False)
    if completed.stderr:
        click.echo(completed.stderr, nl=False, err=True)
    exit_code = completed.returncode
    if exit_code:
        raise SystemExit(exit_code)
# Run the click command group only when the file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    cli()