Files
cvmmap-streamer/scripts/zed_batch_svo_grid_to_mp4.py
T

831 lines
27 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import concurrent.futures
import csv
import json
import math
import os
import re
import shutil
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
import click
from tqdm import tqdm
# Absolute, symlink-resolved location of this script file.
SCRIPT_PATH = Path(__file__).resolve()
# Parent of the directory that contains this script; locate_binary() searches
# its "build" subtree for the converter executable.
REPO_ROOT = SCRIPT_PATH.parents[1]
# Case-insensitive match for file names ending in "_zed<N>.svo" or
# "_zed<N>.svo2" with N in 1-4; group 1 captures the camera number.
SEGMENT_FILE_PATTERN = re.compile(r".*_zed([1-4])\.svo2?$", re.IGNORECASE)
# Camera labels that scan_segment_dir() requires exactly one input file for.
EXPECTED_CAMERAS = ("zed1", "zed2", "zed3", "zed4")
@dataclass(slots=True, frozen=True)
class BatchConfig:
zed_bin: Path | None
ffprobe_bin: Path | None
probe_existing: bool
cuda_visible_devices: str | None
overwrite: bool
fail_fast: bool
codec: str
encoder_device: str
preset: str
tune: str
quality: int
gop: int
b_frames: int
start_offset_seconds: float
duration_seconds: float | None
output_fps: float | None
tile_scale: float
@dataclass(slots=True, frozen=True)
class ConversionJob:
segment_dir: Path
output_path: Path
@dataclass(slots=True, frozen=True)
class JobResult:
status: str
segment_dir: Path
output_path: Path
command: tuple[str, ...]
return_code: int = 0
stdout: str = ""
stderr: str = ""
@dataclass(slots=True, frozen=True)
class SegmentScan:
segment_dir: Path
matched_files: int
is_valid: bool
reason: str | None = None
@dataclass(slots=True, frozen=True)
class SourceResolution:
mode: str
segment_dirs: tuple[Path, ...]
ignored_partial_dirs: tuple[SegmentScan, ...]
@dataclass(slots=True, frozen=True)
class OutputProbeResult:
output_path: Path
status: str
reason: str = ""
duration_seconds: float | None = None
def locate_binary(override: Path | None) -> Path:
    """Return the path of the zed_svo_grid_to_mp4 executable.

    An explicit *override* takes precedence and must name an existing file.
    Without one, the known build output locations under REPO_ROOT are tried
    in order and the first existing file wins.

    Raises:
        click.ClickException: if the override is not a file, or none of the
            default locations contains the binary.
    """
    if override is None:
        build_dir = REPO_ROOT / "build"
        default_locations = (
            build_dir / "bin" / "zed_svo_grid_to_mp4",
            build_dir / "zed_svo_grid_to_mp4",
        )
        for default_path in default_locations:
            if default_path.is_file():
                return default_path
        raise click.ClickException(f"could not find zed_svo_grid_to_mp4 under {REPO_ROOT / 'build'}")

    explicit_path = override.expanduser().resolve()
    if explicit_path.is_file():
        return explicit_path
    raise click.ClickException(f"binary not found: {explicit_path}")
def locate_ffprobe(override: Path | None) -> Path:
    """Return the path of the ffprobe executable.

    An explicit *override* must name an existing file; otherwise ffprobe is
    looked up on PATH.

    Raises:
        click.ClickException: if the override is not a file, or ffprobe is
            not on PATH.
    """
    if override is None:
        on_path = shutil.which("ffprobe")
        if on_path is None:
            raise click.ClickException("could not find ffprobe on PATH")
        return Path(on_path).resolve()

    explicit_path = override.expanduser().resolve()
    if explicit_path.is_file():
        return explicit_path
    raise click.ClickException(f"ffprobe binary not found: {explicit_path}")
def scan_segment_dir(segment_dir: Path) -> SegmentScan:
    """Check whether *segment_dir* holds exactly one input file per expected camera.

    Only regular files directly inside the directory are considered, and only
    those whose names match SEGMENT_FILE_PATTERN are counted.

    Returns:
        A SegmentScan that is valid when every camera in EXPECTED_CAMERAS has
        exactly one file. Otherwise the scan carries a reason; duplicates are
        reported in preference to missing cameras.
    """
    if not segment_dir.is_dir():
        return SegmentScan(
            segment_dir=segment_dir,
            matched_files=0,
            is_valid=False,
            reason=f"segment directory does not exist: {segment_dir}",
        )

    # Bucket matching files by camera label ("zed1" .. "zed4").
    files_per_camera: dict[str, list[Path]] = {camera: [] for camera in EXPECTED_CAMERAS}
    for entry in segment_dir.iterdir():
        if entry.is_file() and (hit := SEGMENT_FILE_PATTERN.fullmatch(entry.name)) is not None:
            files_per_camera[f"zed{hit.group(1)}"].append(entry)

    total_matched = sum(map(len, files_per_camera.values()))

    def rejected(reason: str) -> SegmentScan:
        return SegmentScan(
            segment_dir=segment_dir,
            matched_files=total_matched,
            is_valid=False,
            reason=reason,
        )

    duplicated = [camera for camera, files in files_per_camera.items() if len(files) > 1]
    if duplicated:
        return rejected(f"duplicate camera inputs under {segment_dir}: {', '.join(duplicated)}")

    absent = [camera for camera, files in files_per_camera.items() if not files]
    if absent:
        return rejected(f"missing camera inputs under {segment_dir}: {', '.join(absent)}")

    return SegmentScan(segment_dir=segment_dir, matched_files=total_matched, is_valid=True)
def dedupe_paths(paths: list[Path]) -> list[Path]:
    """Return the resolved form of *paths* with duplicates removed.

    Each path is user-expanded and resolved before comparison, so different
    spellings of the same location collapse into one entry. The first
    occurrence determines the position in the result.
    """
    # dict keys keep insertion order, which gives an ordered de-duplication.
    return list(dict.fromkeys(candidate.expanduser().resolve() for candidate in paths))
def discover_segment_dirs(root: Path, recursive: bool) -> SourceResolution:
    """Find complete four-camera segment directories at or below *root*.

    The root itself is always a candidate. Its subdirectories are added from
    the whole tree when *recursive* is true, otherwise from the first level
    only. Candidates are scanned in sorted order.

    Returns:
        A "discovery" SourceResolution listing the valid directories, plus the
        scans of directories that matched some camera files but were invalid.

    Raises:
        click.ClickException: if *root* is not a directory or no valid
            segment directory was found.
    """
    if not root.is_dir():
        raise click.ClickException(f"input directory does not exist: {root}")

    children = root.rglob("*") if recursive else root.iterdir()
    candidates = {root.resolve()} | {child.resolve() for child in children if child.is_dir()}

    complete: list[Path] = []
    partial: list[SegmentScan] = []
    for candidate in sorted(candidates):
        outcome = scan_segment_dir(candidate)
        if outcome.is_valid:
            complete.append(candidate)
        elif outcome.matched_files > 0:
            # Directories without any camera file are silently ignored.
            partial.append(outcome)

    if not complete:
        raise click.ClickException(f"no complete four-camera segments found under {root}")
    return SourceResolution(
        mode="discovery",
        segment_dirs=tuple(complete),
        ignored_partial_dirs=tuple(partial),
    )
def parse_segments_csv(csv_path: Path, csv_root: Path | None) -> tuple[Path, ...]:
    """Read the ``segment_dir`` column of a CSV file into resolved directory paths.

    Args:
        csv_path: CSV file with a header row containing ``segment_dir``.
        csv_root: Base directory for relative entries. Defaults to the
            directory that contains the CSV file.

    Returns:
        The resolved directories in file order, with repeated entries dropped.

    Raises:
        click.ClickException: if the CSV or the root directory is missing,
            the header lacks ``segment_dir``, a row has an empty value, a
            ``~user`` prefix cannot be expanded, or there are no data rows.
    """
    csv_path = csv_path.expanduser().resolve()
    if not csv_path.is_file():
        raise click.ClickException(f"CSV not found: {csv_path}")
    if csv_root is not None:
        base_dir = csv_root.expanduser().resolve()
        if not base_dir.is_dir():
            raise click.ClickException(f"CSV root is not a directory: {base_dir}")
    else:
        base_dir = csv_path.parent

    # dict keys give an insertion-ordered de-duplication of resolved paths.
    segment_dirs: dict[Path, None] = {}
    with csv_path.open(newline="") as stream:
        reader = csv.DictReader(stream)
        if reader.fieldnames is None or "segment_dir" not in reader.fieldnames:
            raise click.ClickException(f"{csv_path} must contain a 'segment_dir' header")
        for row in reader:
            # DictReader skips blank lines and quoted fields may span lines, so
            # a running row counter drifts away from the real position;
            # line_num is the physical line on which this row ended.
            line_number = reader.line_num
            raw_segment_dir = (row.get("segment_dir") or "").strip()
            if not raw_segment_dir:
                raise click.ClickException(f"{csv_path}:{line_number} has an empty segment_dir value")
            # Expand "~" BEFORE the absolute/relative decision: once a "~/x"
            # entry has been joined onto base_dir the tilde is no longer the
            # leading component and would never be expanded.
            try:
                segment_dir = Path(raw_segment_dir).expanduser()
            except RuntimeError as error:
                raise click.ClickException(
                    f"{csv_path}:{line_number} cannot expand home directory in {raw_segment_dir!r}: {error}"
                ) from error
            if not segment_dir.is_absolute():
                segment_dir = base_dir / segment_dir
            segment_dirs.setdefault(segment_dir.resolve(), None)

    if not segment_dirs:
        raise click.ClickException(f"{csv_path} did not contain any segment_dir rows")
    return tuple(segment_dirs)
def resolve_sources(
    input_dir: Path | None,
    segment_dirs: tuple[Path, ...],
    segments_csv: Path | None,
    csv_root: Path | None,
    recursive: bool,
) -> SourceResolution:
    """Turn the mutually exclusive source options into a SourceResolution.

    Exactly one of *input_dir*, *segment_dirs* and *segments_csv* must be
    supplied. *recursive* only affects discovery from *input_dir*, and
    *csv_root* only affects CSV parsing.

    Raises:
        click.ClickException: if zero or several source modes were given, or
            the selected mode fails to resolve.
    """
    modes_selected = (
        input_dir is not None,
        bool(segment_dirs),
        segments_csv is not None,
    )
    if modes_selected.count(True) != 1:
        raise click.ClickException(
            "provide exactly one source mode: INPUT_DIR, --segment-dir, or --segments-csv"
        )

    if input_dir is not None:
        return discover_segment_dirs(input_dir.expanduser().resolve(), recursive)

    if segment_dirs:
        explicit_dirs = tuple(dedupe_paths(list(segment_dirs)))
        return SourceResolution(mode="segment-dir", segment_dirs=explicit_dirs, ignored_partial_dirs=())

    csv_dirs = parse_segments_csv(segments_csv, csv_root)
    return SourceResolution(mode="segments-csv", segment_dirs=csv_dirs, ignored_partial_dirs=())
def output_path_for(segment_dir: Path) -> Path:
    """Return ``<segment_dir>/<segment_dir name>_grid.mp4``."""
    grid_name = f"{segment_dir.name}_grid.mp4"
    return segment_dir.joinpath(grid_name)
def command_for_job(job: ConversionJob, config: BatchConfig) -> list[str]:
    """Build the converter argument vector for *job*.

    Flags with a value of None (duration and output fps) are left off the
    command line entirely.

    Raises:
        RuntimeError: if *config* carries no converter binary.
    """
    if config.zed_bin is None:
        raise RuntimeError("zed_svo_grid_to_mp4 binary is not configured")

    always_present = (
        ("--segment-dir", job.segment_dir),
        ("--codec", config.codec),
        ("--encoder-device", config.encoder_device),
        ("--preset", config.preset),
        ("--tune", config.tune),
        ("--quality", config.quality),
        ("--gop", config.gop),
        ("--b-frames", config.b_frames),
        ("--start-offset-seconds", config.start_offset_seconds),
        ("--tile-scale", config.tile_scale),
    )
    only_when_set = (
        ("--duration-seconds", config.duration_seconds),
        ("--output-fps", config.output_fps),
    )

    argv = [str(config.zed_bin)]
    for flag, value in always_present:
        argv += [flag, str(value)]
    for flag, value in only_when_set:
        if value is not None:
            argv += [flag, str(value)]
    return argv
def env_for_job(config: BatchConfig) -> dict[str, str]:
    """Return a copy of the current environment for a conversion subprocess.

    CUDA_VISIBLE_DEVICES is overridden when the config specifies a value;
    otherwise the inherited environment is passed through unchanged.
    """
    overrides: dict[str, str] = {}
    if config.cuda_visible_devices is not None:
        overrides["CUDA_VISIBLE_DEVICES"] = config.cuda_visible_devices
    return {**os.environ, **overrides}
def probe_output(output_path: Path, ffprobe_bin: Path | None) -> OutputProbeResult:
    """Classify an existing output file as "missing", "invalid" or "valid".

    A file is valid when ffprobe exits successfully, prints parseable JSON,
    reports at least one video stream, and reports a finite, positive
    container duration.

    Raises:
        RuntimeError: if the file exists but no ffprobe binary is configured.
    """
    if not output_path.is_file():
        return OutputProbeResult(output_path=output_path, status="missing")
    if ffprobe_bin is None:
        raise RuntimeError("ffprobe binary is not configured")

    def invalid(reason: str) -> OutputProbeResult:
        return OutputProbeResult(output_path=output_path, status="invalid", reason=reason)

    ffprobe_argv = [
        str(ffprobe_bin),
        "-v",
        "error",
        "-print_format",
        "json",
        "-show_entries",
        "format=duration,size:stream=codec_type,codec_name,width,height,nb_frames",
        str(output_path),
    ]
    probe = subprocess.run(ffprobe_argv, check=False, capture_output=True, text=True)
    if probe.returncode != 0:
        # Prefer ffprobe's own diagnostics; fall back to a generic message.
        return invalid(probe.stderr.strip() or probe.stdout.strip() or "ffprobe failed")

    try:
        report = json.loads(probe.stdout)
    except json.JSONDecodeError as error:
        return invalid(f"ffprobe returned invalid JSON: {error}")

    if all(stream.get("codec_type") != "video" for stream in report.get("streams", [])):
        return invalid("ffprobe found no video stream")

    raw_duration = report.get("format", {}).get("duration")
    if raw_duration is None or raw_duration == "":
        return invalid("ffprobe did not report a duration")

    try:
        seconds = float(raw_duration)
    except (TypeError, ValueError):
        return invalid(f"ffprobe reported a non-numeric duration: {raw_duration!r}")

    # NaN and infinities are rejected together with zero and negative values.
    if seconds <= 0.0 or not math.isfinite(seconds):
        return invalid(f"ffprobe reported a non-positive duration: {seconds}")

    return OutputProbeResult(output_path=output_path, status="valid", duration_seconds=seconds)
def run_conversion(job: ConversionJob, config: BatchConfig) -> JobResult:
    """Run the converter for one job and capture its outcome.

    Returns:
        A JobResult with status "converted" when the process exits with 0 and
        "failed" otherwise, carrying the command line, exit status and the
        captured stdout/stderr. A process that cannot be started at all is
        also reported as "failed" instead of raising.

    Raises:
        RuntimeError: propagated from command_for_job when no converter
            binary is configured.
    """
    command = command_for_job(job, config)
    try:
        completed = subprocess.run(
            command,
            check=False,
            capture_output=True,
            text=True,
            # Undecodable bytes in the tool's output must not abort the job.
            errors="replace",
            env=env_for_job(config),
        )
    except OSError as error:
        # A launch failure (e.g. missing or non-executable binary) would
        # otherwise escape the worker and abort the whole batch, bypassing
        # the fail-fast / continue-on-error handling. 127 is the shell's
        # "command not found" status, used here as the launch-failure code.
        return JobResult(
            status="failed",
            segment_dir=job.segment_dir,
            output_path=job.output_path,
            command=tuple(command),
            return_code=127,
            stderr=f"failed to launch {command[0]}: {error}",
        )
    status = "converted" if completed.returncode == 0 else "failed"
    return JobResult(
        status=status,
        segment_dir=job.segment_dir,
        output_path=job.output_path,
        command=tuple(command),
        return_code=completed.returncode,
        stdout=completed.stdout,
        stderr=completed.stderr,
    )
def summarize_failures(results: list[JobResult]) -> None:
    """Print every failed result to stderr, with its captured output.

    For each failure the segment directory and exit status are shown,
    followed by stderr, or by stdout when stderr is blank. Nothing is
    printed when no result has status "failed".
    """
    failures = [entry for entry in results if entry.status == "failed"]
    if failures:
        click.echo("\nFailed conversions:", err=True)
    for failure in failures:
        click.echo(f"- {failure.segment_dir} (exit {failure.return_code})", err=True)
        captured = failure.stderr if failure.stderr.strip() else failure.stdout
        if captured.strip():
            click.echo(captured.rstrip(), err=True)
def report_invalid_existing_outputs(
    invalid_existing: list[tuple[ConversionJob, OutputProbeResult]],
) -> None:
    """Print each invalid existing output and the probe's reason to stderr.

    The first line of the reason is labelled; any further lines are echoed
    below it. Nothing is printed for an empty list.
    """
    if invalid_existing:
        click.echo("\nInvalid existing outputs:", err=True)
    for job, probe in invalid_existing:
        click.echo(f"- {job.segment_dir}", err=True)
        click.echo(f" output: {probe.output_path}", err=True)
        # An empty reason has no lines; fall back to the (empty) reason itself.
        first_line, *continuation = probe.reason.splitlines() or [probe.reason]
        click.echo(f" reason: {first_line}", err=True)
        for extra_line in continuation:
            click.echo(f" {extra_line}", err=True)
def report_dry_run_plan(
    pending_jobs: list[ConversionJob],
    pending_reasons: dict[Path, str],
    pending_details: dict[Path, str],
) -> None:
    """Print to stderr which segments a real run would convert, and why.

    Every pending job must have an entry in *pending_reasons*. A matching
    entry in *pending_details* is optional and is flattened onto one line.
    """
    if not pending_jobs:
        click.echo("dry-run: no conversions would be launched", err=True)
        return

    click.echo("\nDry-run plan:", err=True)
    for job in pending_jobs:
        entry = f"- {job.segment_dir} [{pending_reasons[job.segment_dir]}]"
        extra = pending_details.get(job.segment_dir)
        if extra:
            # Keep multi-line probe output on a single plan line.
            entry += ": " + extra.replace("\n", " | ")
        click.echo(entry, err=True)
def run_batch(jobs: list[ConversionJob], config: BatchConfig, jobs_limit: int) -> tuple[list[JobResult], int]:
    """Run *jobs* with at most *jobs_limit* conversions in flight at once.

    A new job is submitted each time one finishes, so the pool stays full
    without queueing everything up front. With ``config.fail_fast`` the first
    failure stops further submissions; jobs already running are still awaited.

    Returns:
        ``(results, aborted_count)``: the results of every job that ran, in
        completion order, and the number of jobs that were never started
        because fail-fast tripped.
    """
    results: list[JobResult] = []
    aborted_count = 0
    if not jobs:
        return results, aborted_count

    future_to_job: dict[concurrent.futures.Future[JobResult], ConversionJob] = {}
    job_iter = iter(jobs)
    stop_submitting = False

    with concurrent.futures.ThreadPoolExecutor(max_workers=jobs_limit) as executor:
        with tqdm(total=len(jobs), unit="segment", dynamic_ncols=True) as progress:

            def submit_next() -> bool:
                # stop_submitting is read from the enclosing scope at call
                # time, so it reflects a failure seen earlier in the loop.
                if stop_submitting:
                    return False
                try:
                    job = next(job_iter)
                except StopIteration:
                    return False
                future = executor.submit(run_conversion, job, config)
                future_to_job[future] = job
                return True

            # Prime the pool with the first wave of jobs.
            for _ in range(min(jobs_limit, len(jobs))):
                submit_next()

            while future_to_job:
                done, _ = concurrent.futures.wait(
                    future_to_job,
                    return_when=concurrent.futures.FIRST_COMPLETED,
                )
                for future in done:
                    job = future_to_job.pop(future)
                    result = future.result()
                    results.append(result)
                    progress.update(1)
                    if result.status == "failed":
                        tqdm.write(
                            f"failed: {job.segment_dir} (exit {result.return_code})",
                            file=sys.stderr,
                        )
                        if config.fail_fast:
                            stop_submitting = True
                    if not stop_submitting:
                        submit_next()
                if stop_submitting:
                    # Count the jobs that will never start. This runs again on
                    # every later pass while in-flight jobs finish, when the
                    # iterator is already empty; accumulating (rather than
                    # assigning) keeps the first count from being reset to 0.
                    aborted_count += sum(1 for _ in job_iter)
                    # Shrink the bar to what can still complete.
                    progress.total = progress.n + len(future_to_job)
                    progress.refresh()
    return results, aborted_count
@click.command()
@click.argument(
    "input_dir",
    required=False,
    type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path),
)
@click.option(
    "--segment-dir",
    "segment_dirs",
    multiple=True,
    type=click.Path(path_type=Path, file_okay=False, dir_okay=True),
    help="Explicit segment directory. Repeatable. Mutually exclusive with INPUT_DIR and --segments-csv.",
)
@click.option(
    "--segments-csv",
    type=click.Path(path_type=Path, dir_okay=False),
    help="CSV file containing a segment_dir column. Mutually exclusive with INPUT_DIR and --segment-dir.",
)
@click.option(
    "--csv-root",
    type=click.Path(path_type=Path, file_okay=False, dir_okay=True),
    help="Base directory for relative segment_dir entries in --segments-csv. Defaults to the CSV parent directory.",
)
@click.option("--recursive/--no-recursive", default=True, show_default=True, help="Recurse when discovering segment directories from INPUT_DIR.")
@click.option("--jobs", default=1, show_default=True, type=click.IntRange(min=1), help="Parallel conversion jobs.")
@click.option(
    "--zed-bin",
    type=click.Path(path_type=Path, dir_okay=False),
    help="Explicit path to the zed_svo_grid_to_mp4 binary.",
)
@click.option(
    "--ffprobe-bin",
    type=click.Path(path_type=Path, dir_okay=False),
    help="Explicit path to ffprobe. Required when probing existing outputs and ffprobe is not on PATH.",
)
@click.option(
    "--cuda-visible-devices",
    help="Optional CUDA_VISIBLE_DEVICES value exported for each conversion subprocess.",
)
@click.option("--overwrite/--skip-existing", default=False, show_default=True, help="Overwrite existing grid MP4 files.")
@click.option(
    "--probe-existing/--trust-existing",
    default=False,
    show_default=True,
    help="Validate existing grid MP4 files with ffprobe before skipping them. Invalid outputs are treated as missing.",
)
@click.option(
    "--report-existing",
    is_flag=True,
    help="Probe existing grid MP4 files with ffprobe, report invalid ones, and do not launch conversions.",
)
@click.option(
    "--dry-run",
    is_flag=True,
    help="Show which segments would be converted after applying skip/probe logic, without launching conversions.",
)
@click.option(
    "--fail-fast/--continue-on-error",
    default=False,
    show_default=True,
    help="Stop submitting new work after the first failed conversion.",
)
@click.option("--codec", type=click.Choice(("h264", "h265")), default="h265", show_default=True)
@click.option(
    "--encoder-device",
    type=click.Choice(("auto", "nvidia", "software")),
    default="auto",
    show_default=True,
)
@click.option("--preset", type=click.Choice(("fast", "balanced", "quality")), default="fast", show_default=True)
@click.option(
    "--tune",
    type=click.Choice(("low-latency", "balanced")),
    default="low-latency",
    show_default=True,
)
@click.option(
    "--quality",
    type=click.IntRange(min=0, max=51),
    default=23,
    show_default=True,
    help="Lower values mean higher quality.",
)
@click.option("--gop", type=click.IntRange(min=1), default=30, show_default=True)
@click.option("--b-frames", "b_frames", type=click.IntRange(min=0), default=0, show_default=True)
@click.option(
    "--start-offset-seconds",
    type=click.FloatRange(min=0.0),
    default=0.0,
    show_default=True,
    help="Offset applied after the synced common start time.",
)
@click.option(
    "--duration-seconds",
    type=click.FloatRange(min=0.0, min_open=True),
    default=None,
    help="Limit export duration in seconds after sync.",
)
@click.option(
    "--output-fps",
    type=click.FloatRange(min=0.0, min_open=True),
    default=None,
    help="Composite output frame rate. Defaults to the grid tool's native behavior.",
)
@click.option(
    "--tile-scale",
    type=click.FloatRange(min=0.1, max=1.0),
    default=0.5,
    show_default=True,
    help="Scale each tile relative to the source resolution.",
)
def main(
    input_dir: Path | None,
    segment_dirs: tuple[Path, ...],
    segments_csv: Path | None,
    csv_root: Path | None,
    recursive: bool,
    jobs: int,
    zed_bin: Path | None,
    ffprobe_bin: Path | None,
    cuda_visible_devices: str | None,
    overwrite: bool,
    probe_existing: bool,
    report_existing: bool,
    dry_run: bool,
    fail_fast: bool,
    codec: str,
    encoder_device: str,
    preset: str,
    tune: str,
    quality: int,
    gop: int,
    b_frames: int,
    start_offset_seconds: float,
    duration_seconds: float | None,
    output_fps: float | None,
    tile_scale: float,
) -> None:
    """Batch-convert synced four-camera ZED segments into grid MP4 files."""
    # NOTE: the docstring above is the text click shows for --help.
    # Cross-option checks that the per-option click types cannot express.
    if b_frames > gop:
        raise click.BadParameter(f"b-frames {b_frames} must be <= gop {gop}", param_hint="--b-frames")
    if report_existing and dry_run:
        raise click.ClickException("--report-existing and --dry-run are mutually exclusive")
    # ffprobe is only located when outputs will be probed; the converter
    # binary is not needed in report-only mode.
    ffprobe_path = locate_ffprobe(ffprobe_bin) if (probe_existing or report_existing) else None
    binary_path = None if report_existing else locate_binary(zed_bin)
    sources = resolve_sources(input_dir, segment_dirs, segments_csv, csv_root, recursive)
    config = BatchConfig(
        zed_bin=binary_path,
        ffprobe_bin=ffprobe_path,
        # Report mode always probes, regardless of --probe-existing.
        probe_existing=probe_existing or report_existing,
        cuda_visible_devices=cuda_visible_devices,
        overwrite=overwrite,
        fail_fast=fail_fast,
        codec=codec,
        encoder_device=encoder_device,
        preset=preset,
        tune=tune,
        quality=quality,
        gop=gop,
        b_frames=b_frames,
        start_offset_seconds=start_offset_seconds,
        duration_seconds=duration_seconds,
        output_fps=output_fps,
        tile_scale=tile_scale,
    )
    # Buckets filled by the classification loop below.
    skipped_results: list[JobResult] = []
    failed_results: list[JobResult] = []
    pending_jobs: list[ConversionJob] = []
    pending_reasons: dict[Path, str] = {}
    pending_details: dict[Path, str] = {}
    valid_existing: list[OutputProbeResult] = []
    invalid_existing: list[tuple[ConversionJob, OutputProbeResult]] = []
    missing_outputs: list[ConversionJob] = []
    # Classify every segment. The order of the checks matters: input
    # validation first, then report mode, then overwrite, then probing, and
    # finally the plain "does the output exist" test.
    for segment_dir in sources.segment_dirs:
        output_path = output_path_for(segment_dir)
        job = ConversionJob(segment_dir=segment_dir, output_path=output_path)
        # No command line can be built when the binary is not configured.
        command = tuple(command_for_job(job, config)) if config.zed_bin is not None else ()
        scan = scan_segment_dir(segment_dir)
        if not scan.is_valid:
            # Incomplete input is recorded as a failure (exit code 2) without
            # launching anything.
            failed_results.append(
                JobResult(
                    status="failed",
                    segment_dir=segment_dir,
                    output_path=output_path,
                    command=command,
                    return_code=2,
                    stderr=scan.reason or "",
                )
            )
            continue
        if report_existing:
            # Report mode only sorts outputs into valid / invalid / missing.
            probe_result = probe_output(output_path, config.ffprobe_bin)
            if probe_result.status == "valid":
                valid_existing.append(probe_result)
            elif probe_result.status == "invalid":
                invalid_existing.append((job, probe_result))
            else:
                missing_outputs.append(job)
            continue
        if overwrite:
            # Overwrite wins over probing: the output is not inspected at all.
            pending_jobs.append(job)
            pending_reasons[segment_dir] = "overwrite"
            continue
        if config.probe_existing:
            probe_result = probe_output(output_path, config.ffprobe_bin)
            if probe_result.status == "valid":
                valid_existing.append(probe_result)
                skipped_results.append(
                    JobResult(
                        status="skipped",
                        segment_dir=segment_dir,
                        output_path=output_path,
                        command=command,
                    )
                )
                continue
            if probe_result.status == "invalid":
                # Invalid outputs are re-converted; the probe reason is kept
                # for the dry-run plan.
                invalid_existing.append((job, probe_result))
                pending_jobs.append(job)
                pending_reasons[segment_dir] = "invalid-existing-output"
                pending_details[segment_dir] = probe_result.reason
                continue
            missing_outputs.append(job)
            pending_jobs.append(job)
            pending_reasons[segment_dir] = "missing-output"
            continue
        # Trust mode: any existing path at the output location counts as done.
        if output_path.exists():
            skipped_results.append(
                JobResult(
                    status="skipped",
                    segment_dir=segment_dir,
                    output_path=output_path,
                    command=command,
                )
            )
            continue
        pending_jobs.append(job)
        pending_reasons[segment_dir] = "missing-output"
    if report_existing:
        # Report mode: print the tallies and exit non-zero when anything is
        # wrong; no conversion is launched.
        click.echo(
            (
                f"source={sources.mode} matched={len(sources.segment_dirs)} valid={len(valid_existing)} "
                f"invalid={len(invalid_existing)} missing={len(missing_outputs)} "
                f"invalid-segments={len(failed_results)}"
            ),
            err=True,
        )
        if sources.ignored_partial_dirs:
            click.echo(f"ignored_incomplete={len(sources.ignored_partial_dirs)}", err=True)
        report_invalid_existing_outputs(invalid_existing)
        summarize_failures(failed_results)
        if failed_results or invalid_existing:
            raise SystemExit(1)
        return
    # Plan summary, printed before any conversion starts.
    click.echo(
        (
            f"source={sources.mode} matched={len(sources.segment_dirs)} pending={len(pending_jobs)} "
            f"skipped={len(skipped_results)} invalid={len(failed_results)} jobs={jobs} "
            f"dry_run={'yes' if dry_run else 'no'}"
        ),
        err=True,
    )
    if sources.ignored_partial_dirs:
        click.echo(f"ignored_incomplete={len(sources.ignored_partial_dirs)}", err=True)
    if config.probe_existing:
        click.echo(
            (
                f"probed-existing: valid={len(valid_existing)} invalid={len(invalid_existing)} "
                f"missing={len(missing_outputs)}"
            ),
            err=True,
        )
    if dry_run:
        # Dry run: show the plan and input failures, then stop.
        report_dry_run_plan(pending_jobs, pending_reasons, pending_details)
        summarize_failures(failed_results)
        if failed_results:
            raise SystemExit(1)
        return
    # Real run: merge the pre-classified results with the conversion results.
    results = list(skipped_results)
    results.extend(failed_results)
    conversion_results, aborted_count = run_batch(pending_jobs, config, jobs)
    results.extend(conversion_results)
    converted_count = sum(1 for result in results if result.status == "converted")
    skipped_count = sum(1 for result in results if result.status == "skipped")
    failed_count = sum(1 for result in results if result.status == "failed")
    click.echo(
        (
            f"summary: matched={len(sources.segment_dirs)} converted={converted_count} "
            f"skipped={skipped_count} failed={failed_count} aborted={aborted_count}"
        ),
        err=True,
    )
    summarize_failures(results)
    # Failed or never-started jobs make the whole run exit with status 1.
    if failed_count > 0 or aborted_count > 0:
        raise SystemExit(1)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()