195 lines
5.6 KiB
Python
195 lines
5.6 KiB
Python
from __future__ import annotations
|
|
|
|
import re
|
|
import subprocess
|
|
from collections.abc import Sequence
|
|
from pathlib import Path
|
|
|
|
import click
|
|
|
|
|
|
# Directory one level above the folder that contains this file; used as the
# default value of the `launch --workdir` option.
REPO_ROOT = Path(__file__).resolve().parents[1]
|
|
# Directory that receives "<unit>.log" when `launch` is called without --log-path.
DEFAULT_LOG_DIR = Path("/tmp")
|
|
|
|
|
|
def _sanitize_unit_name(raw: str) -> str:
|
|
sanitized = re.sub(r"[^A-Za-z0-9_.@-]+", "-", raw).strip("-")
|
|
if not sanitized:
|
|
raise click.ClickException("Unit name cannot be empty after sanitization.")
|
|
return sanitized
|
|
|
|
|
|
def _split_gpu_uuids(value: str) -> list[str]:
|
|
uuids = [part.strip() for part in value.split(",") if part.strip()]
|
|
if not uuids:
|
|
raise click.ClickException("At least one GPU UUID is required.")
|
|
return uuids
|
|
|
|
|
|
def _run_command(
|
|
args: Sequence[str],
|
|
*,
|
|
cwd: Path | None = None,
|
|
check: bool = True,
|
|
) -> subprocess.CompletedProcess[str]:
|
|
return subprocess.run(
|
|
list(args),
|
|
cwd=str(cwd) if cwd is not None else None,
|
|
text=True,
|
|
capture_output=True,
|
|
check=check,
|
|
)
|
|
|
|
|
|
def _default_unit_name(cfgs: Path, phase: str) -> str:
    """Derive a unit name from the config file's stem and the run phase."""
    stem = cfgs.stem
    raw_name = f"opengait-{stem}-{phase}"
    # The stem may hold arbitrary characters, so pass it through the sanitizer.
    return _sanitize_unit_name(raw_name)
|
|
|
|
|
|
# Root click command group. The subcommands in this file (launch, status,
# logs, stop) attach to it through @cli.command(...). The docstring below is
# left short because click uses it as the group's help text.
@click.group()
def cli() -> None:
    """Launch and manage OpenGait runs under systemd user services."""
|
|
|
|
|
|
def _build_systemd_run_command(
    *,
    unit_name: str,
    log_path: Path,
    gpu_uuid_list: Sequence[str],
    cfgs: Path,
    phase: str,
    description: str | None,
) -> list[str]:
    """Assemble the ``systemd-run --user`` argv that wraps the torchrun call.

    One worker process is requested per GPU UUID, and the same UUIDs are
    exported to the unit as ``CUDA_VISIBLE_DEVICES``.
    """
    command = [
        "systemd-run",
        "--user",
        "--unit",
        unit_name,
        "--collect",
        "--same-dir",
        "--property",
        "KillMode=mixed",
        "--property",
        f"StandardOutput=append:{log_path}",
        "--property",
        f"StandardError=append:{log_path}",
        "--setenv",
        f"CUDA_VISIBLE_DEVICES={','.join(gpu_uuid_list)}",
    ]
    if description:
        command.extend(["--description", description])
    command.extend(
        [
            "uv",
            "run",
            "python",
            "-m",
            "torch.distributed.run",
            "--nproc_per_node",
            str(len(gpu_uuid_list)),
            "opengait/main.py",
            "--cfgs",
            str(cfgs),
            "--phase",
            phase,
        ]
    )
    return command


# `launch` parameters (kept out of the docstring, which click prints as help):
#   cfgs        config file; a relative path is resolved against `workdir`.
#   phase       "train" or "test", forwarded to opengait/main.py.
#   gpu_uuids   comma-separated UUIDs; their count becomes --nproc_per_node.
#   unit        optional unit name (sanitized); otherwise derived from cfgs + phase.
#   log_path    file the unit appends stdout/stderr to; default /tmp/<unit>.log.
#   workdir     directory systemd-run is started from (the unit uses --same-dir).
#   description optional systemd unit description.
#   dry_run     print the shell-quoted command and return without side effects.
# Raises click.ClickException when the config is missing, an input is empty,
# systemd-run cannot be executed, or systemd-run exits non-zero.
@cli.command("launch")
# No `exists=True` here: click would test the path against the current working
# directory, while relative configs are resolved against --workdir below.
@click.option(
    "--cfgs",
    type=click.Path(path_type=Path, dir_okay=False),
    required=True,
    help="Config file. Relative paths are resolved against --workdir.",
)
@click.option("--phase", type=click.Choice(["train", "test"]), required=True)
@click.option(
    "--gpu-uuids",
    required=True,
    help="Comma-separated GPU UUID list for CUDA_VISIBLE_DEVICES.",
)
@click.option("--unit", type=str, default=None, help="systemd unit name. Defaults to a name derived from cfgs + phase.")
@click.option(
    "--log-path",
    type=click.Path(path_type=Path, dir_okay=False),
    default=None,
    help="Optional file to append stdout/stderr to. Defaults to /tmp/<unit>.log",
)
@click.option(
    "--workdir",
    type=click.Path(path_type=Path, file_okay=False),
    default=REPO_ROOT,
    show_default=True,
)
@click.option("--description", type=str, default=None, help="Optional systemd unit description.")
@click.option("--dry-run", is_flag=True, help="Print the resolved systemd-run command without launching it.")
def launch(
    cfgs: Path,
    phase: str,
    gpu_uuids: str,
    unit: str | None,
    log_path: Path | None,
    workdir: Path,
    description: str | None,
    dry_run: bool,
) -> None:
    """Launch an OpenGait run via systemd-run --user using torchrun."""
    import shlex  # local import: only needed for the dry-run rendering

    resolved_cfgs = cfgs if cfgs.is_absolute() else (workdir / cfgs).resolve()
    if not resolved_cfgs.exists():
        raise click.ClickException(f"Config not found: {resolved_cfgs}")
    if not resolved_cfgs.is_file():
        raise click.ClickException(f"Config is not a file: {resolved_cfgs}")

    unit_name = _sanitize_unit_name(unit) if unit is not None else _default_unit_name(resolved_cfgs, phase)
    resolved_log_path = (log_path if log_path is not None else DEFAULT_LOG_DIR / f"{unit_name}.log").resolve()

    # Validate every input before touching the filesystem or spawning anything.
    gpu_uuid_list = _split_gpu_uuids(gpu_uuids)

    command = _build_systemd_run_command(
        unit_name=unit_name,
        log_path=resolved_log_path,
        gpu_uuid_list=gpu_uuid_list,
        cfgs=resolved_cfgs,
        phase=phase,
        description=description,
    )

    if dry_run:
        # Quote each argument so the printed line can be pasted into a shell
        # even when an argument (e.g. the description) contains spaces.
        click.echo(" ".join(shlex.quote(part) for part in command))
        return

    # Create the log directory only for a real launch, never for a dry run.
    resolved_log_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        result = _run_command(command, cwd=workdir, check=False)
    except OSError as exc:
        # Raised when systemd-run is not installed or workdir cannot be entered.
        raise click.ClickException(f"Could not execute systemd-run from {workdir}: {exc}") from exc

    if result.returncode != 0:
        raise click.ClickException(
            f"systemd-run launch failed.\nstdout:\n{result.stdout}\nstderr:\n{result.stderr}"
        )

    click.echo(f"unit={unit_name}")
    click.echo(f"log={resolved_log_path}")
    click.echo("journal: journalctl --user -u " + unit_name + " -f")
    if result.stdout.strip():
        click.echo(result.stdout.strip())
|
|
|
|
|
|
@cli.command("status")
@click.argument("unit")
def status(unit: str) -> None:
    """Show systemd user-unit status."""
    # Relay systemctl's output and exit status unchanged: stdout to stdout,
    # stderr to stderr, and a non-zero return code as this process's exit code.
    status_argv = ["systemctl", "--user", "status", unit]
    proc = _run_command(status_argv, check=False)
    click.echo(proc.stdout, nl=False)
    if proc.stderr:
        click.echo(proc.stderr, err=True, nl=False)
    exit_code = proc.returncode
    if exit_code:
        raise SystemExit(exit_code)
|
|
|
|
|
|
@cli.command("logs")
@click.argument("unit")
@click.option("-n", "--lines", type=int, default=200, show_default=True)
def logs(unit: str, lines: int) -> None:
    """Show recent journal lines for a unit."""
    # Ask journalctl for the last `lines` entries of the user unit, unpaged,
    # and relay its output streams and exit status unchanged.
    journal_argv = ["journalctl", "--user", "-u", unit, "-n", str(lines), "--no-pager"]
    proc = _run_command(journal_argv, check=False)
    click.echo(proc.stdout, nl=False)
    if proc.stderr:
        click.echo(proc.stderr, err=True, nl=False)
    exit_code = proc.returncode
    if exit_code:
        raise SystemExit(exit_code)
|
|
|
|
|
|
@cli.command("stop")
@click.argument("unit")
def stop(unit: str) -> None:
    """Stop a systemd user unit."""
    # Relay whatever systemctl printed, then propagate a failing exit status.
    stop_argv = ["systemctl", "--user", "stop", unit]
    proc = _run_command(stop_argv, check=False)
    for text, to_stderr in ((proc.stdout, False), (proc.stderr, True)):
        if text:
            click.echo(text, err=to_stderr, nl=False)
    exit_code = proc.returncode
    if exit_code:
        raise SystemExit(exit_code)
|
|
|
|
|
|
# Invoke the click command group when this file is executed as a script.
if __name__ == "__main__":
    cli()
|