From c2e5cc561f3f7d1d655186fd0d796c77d47f6f2b Mon Sep 17 00:00:00 2001 From: crosstyan Date: Thu, 19 Mar 2026 10:41:45 +0000 Subject: [PATCH] Add batch wrapper for ZED grid MP4 conversion --- README.md | 43 ++ scripts/zed_batch_svo_grid_to_mp4.py | 582 +++++++++++++++++++++++++++ 2 files changed, 625 insertions(+) create mode 100644 scripts/zed_batch_svo_grid_to_mp4.py diff --git a/README.md b/README.md index e6d10b3..b2fdb79 100644 --- a/README.md +++ b/README.md @@ -113,6 +113,49 @@ Use the grid converter to merge four synced ZED recordings into a 2x2 CCTV-style The tool syncs the four inputs using the same common-start timestamp rule as the ZED multi-camera playback sample, defaults to a 2x2 layout ordered as `zed1 zed2 / zed3 zed4`, and writes `/_grid.mp4` unless `--output` is provided. By default each tile is scaled to `0.5x`, so a four-camera 1920x1200 segment produces a 1920x1200 composite. Use repeated `--input` flags instead of `--segment-dir` when you want explicit row-major ordering. 
+Use the batch wrapper to run `zed_svo_grid_to_mp4` over many segment directories with one aggregate progress bar: + +```bash +uv run python scripts/zed_batch_svo_grid_to_mp4.py \ + /workspaces/data/kindergarten \ + --recursive \ + --jobs 2 \ + --encoder-device auto \ + --duration-seconds 2 +``` + +You can also provide the exact segments to convert: + +```bash +uv run python scripts/zed_batch_svo_grid_to_mp4.py \ + --segment-dir /workspaces/data/kindergarten/jump/external/recording/2026-03-18T11-23-22 \ + --segment-dir /workspaces/data/kindergarten/jump/experiment/1/2026-03-18T11-26-23 \ + --jobs 2 +``` + +Or preserve a precomputed CSV ordering: + +```bash +uv run python scripts/zed_batch_svo_grid_to_mp4.py \ + --segments-csv /workspaces/data/kindergarten/svo2_segments_sorted.csv \ + --jobs 2 \ + --duration-seconds 2 +``` + +The batch grid wrapper mirrors the grid encoder options, skips existing `<segment_dir>/<segment_dir_name>_grid.mp4` outputs by default, and returns a nonzero exit code if any segment fails. + +#### Expected CSV Input Format + +The `--segments-csv` input expects a header row with at least a `segment_dir` column. Extra columns are allowed and ignored by the batch wrapper. `segment_dir` values may be absolute paths or paths relative to the CSV file's parent directory. Use `--csv-root` to override that base directory. + +Repeated rows for the same `segment_dir` are allowed; the wrapper converts each unique segment once, preserving the first-seen CSV order. 
This makes `/workspaces/data/kindergarten/svo2_segments_sorted.csv` a valid input even though it stores one row per camera file: + +```csv +timestamp,activity,group_path,segment_dir,camera,relative_path +2026-03-18T11-23-22,jump,jump/external/recording,jump/external/recording/2026-03-18T11-23-22,zed1,jump/external/recording/2026-03-18T11-23-22/2026-03-18T11-23-22_zed1.svo2 +2026-03-18T11-23-22,jump,jump/external/recording,jump/external/recording/2026-03-18T11-23-22,zed2,jump/external/recording/2026-03-18T11-23-22/2026-03-18T11-23-22_zed2.svo2 +``` + ### Mandatory Acceptance (Standalone) Run the full mandatory acceptance suite. This executes the complete protocol/codec matrix without requiring external servers. diff --git a/scripts/zed_batch_svo_grid_to_mp4.py b/scripts/zed_batch_svo_grid_to_mp4.py new file mode 100644 index 0000000..59a5e84 --- /dev/null +++ b/scripts/zed_batch_svo_grid_to_mp4.py @@ -0,0 +1,582 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import concurrent.futures +import csv +import os +import re +import subprocess +import sys +from dataclasses import dataclass +from pathlib import Path + +import click +from tqdm import tqdm + + +SCRIPT_PATH = Path(__file__).resolve() +REPO_ROOT = SCRIPT_PATH.parents[1] +SEGMENT_FILE_PATTERN = re.compile(r".*_zed([1-4])\.svo2?$", re.IGNORECASE) +EXPECTED_CAMERAS = ("zed1", "zed2", "zed3", "zed4") + + +@dataclass(slots=True, frozen=True) +class BatchConfig: + zed_bin: Path + cuda_visible_devices: str | None + overwrite: bool + fail_fast: bool + codec: str + encoder_device: str + preset: str + tune: str + quality: int + gop: int + b_frames: int + start_offset_seconds: float + duration_seconds: float | None + output_fps: float | None + tile_scale: float + + +@dataclass(slots=True, frozen=True) +class ConversionJob: + segment_dir: Path + output_path: Path + + +@dataclass(slots=True, frozen=True) +class JobResult: + status: str + segment_dir: Path + output_path: Path + command: tuple[str, ...] 
+ return_code: int = 0 + stdout: str = "" + stderr: str = "" + + +@dataclass(slots=True, frozen=True) +class SegmentScan: + segment_dir: Path + matched_files: int + is_valid: bool + reason: str | None = None + + +@dataclass(slots=True, frozen=True) +class SourceResolution: + mode: str + segment_dirs: tuple[Path, ...] + ignored_partial_dirs: tuple[SegmentScan, ...] + + +def locate_binary(override: Path | None) -> Path: + if override is not None: + candidate = override.expanduser().resolve() + if not candidate.is_file(): + raise click.ClickException(f"binary not found: {candidate}") + return candidate + + candidates = ( + REPO_ROOT / "build" / "bin" / "zed_svo_grid_to_mp4", + REPO_ROOT / "build" / "zed_svo_grid_to_mp4", + ) + for candidate in candidates: + if candidate.is_file(): + return candidate + raise click.ClickException(f"could not find zed_svo_grid_to_mp4 under {REPO_ROOT / 'build'}") + + +def scan_segment_dir(segment_dir: Path) -> SegmentScan: + if not segment_dir.is_dir(): + return SegmentScan( + segment_dir=segment_dir, + matched_files=0, + is_valid=False, + reason=f"segment directory does not exist: {segment_dir}", + ) + + matched_by_camera: dict[str, list[Path]] = {camera: [] for camera in EXPECTED_CAMERAS} + for child in segment_dir.iterdir(): + if not child.is_file(): + continue + match = SEGMENT_FILE_PATTERN.fullmatch(child.name) + if match is None: + continue + matched_by_camera[f"zed{match.group(1)}"].append(child) + + matched_files = sum(len(paths) for paths in matched_by_camera.values()) + duplicate_cameras = [camera for camera, paths in matched_by_camera.items() if len(paths) > 1] + missing_cameras = [camera for camera, paths in matched_by_camera.items() if len(paths) == 0] + + if duplicate_cameras: + duplicate_text = ", ".join(duplicate_cameras) + return SegmentScan( + segment_dir=segment_dir, + matched_files=matched_files, + is_valid=False, + reason=f"duplicate camera inputs under {segment_dir}: {duplicate_text}", + ) + if missing_cameras: + 
missing_text = ", ".join(missing_cameras) + return SegmentScan( + segment_dir=segment_dir, + matched_files=matched_files, + is_valid=False, + reason=f"missing camera inputs under {segment_dir}: {missing_text}", + ) + + return SegmentScan(segment_dir=segment_dir, matched_files=matched_files, is_valid=True) + + +def dedupe_paths(paths: list[Path]) -> list[Path]: + ordered: list[Path] = [] + seen: set[Path] = set() + for path in paths: + resolved = path.expanduser().resolve() + if resolved in seen: + continue + seen.add(resolved) + ordered.append(resolved) + return ordered + + +def discover_segment_dirs(root: Path, recursive: bool) -> SourceResolution: + if not root.is_dir(): + raise click.ClickException(f"input directory does not exist: {root}") + + candidate_dirs = {root.resolve()} + iterator = root.rglob("*") if recursive else root.iterdir() + for path in iterator: + if path.is_dir(): + candidate_dirs.add(path.resolve()) + + valid_dirs: list[Path] = [] + ignored_partial_dirs: list[SegmentScan] = [] + for segment_dir in sorted(candidate_dirs): + scan = scan_segment_dir(segment_dir) + if scan.is_valid: + valid_dirs.append(segment_dir) + elif scan.matched_files > 0: + ignored_partial_dirs.append(scan) + + if not valid_dirs: + raise click.ClickException(f"no complete four-camera segments found under {root}") + + return SourceResolution( + mode="discovery", + segment_dirs=tuple(valid_dirs), + ignored_partial_dirs=tuple(ignored_partial_dirs), + ) + + +def parse_segments_csv(csv_path: Path, csv_root: Path | None) -> tuple[Path, ...]: + csv_path = csv_path.expanduser().resolve() + if not csv_path.is_file(): + raise click.ClickException(f"CSV not found: {csv_path}") + + if csv_root is not None: + base_dir = csv_root.expanduser().resolve() + if not base_dir.is_dir(): + raise click.ClickException(f"CSV root is not a directory: {base_dir}") + else: + base_dir = csv_path.parent + + segment_dirs: list[Path] = [] + seen: set[Path] = set() + with csv_path.open(newline="") as 
stream: + reader = csv.DictReader(stream) + if reader.fieldnames is None or "segment_dir" not in reader.fieldnames: + raise click.ClickException(f"{csv_path} must contain a 'segment_dir' header") + + for row_number, row in enumerate(reader, start=2): + raw_segment_dir = (row.get("segment_dir") or "").strip() + if not raw_segment_dir: + raise click.ClickException(f"{csv_path}:{row_number} has an empty segment_dir value") + segment_dir = Path(raw_segment_dir) + resolved = segment_dir if segment_dir.is_absolute() else base_dir / segment_dir + resolved = resolved.expanduser().resolve() + if resolved in seen: + continue + seen.add(resolved) + segment_dirs.append(resolved) + + if not segment_dirs: + raise click.ClickException(f"{csv_path} did not contain any segment_dir rows") + return tuple(segment_dirs) + + +def resolve_sources( + input_dir: Path | None, + segment_dirs: tuple[Path, ...], + segments_csv: Path | None, + csv_root: Path | None, + recursive: bool, +) -> SourceResolution: + source_count = sum( + ( + 1 if input_dir is not None else 0, + 1 if segment_dirs else 0, + 1 if segments_csv is not None else 0, + ) + ) + if source_count != 1: + raise click.ClickException( + "provide exactly one source mode: INPUT_DIR, --segment-dir, or --segments-csv" + ) + + if input_dir is not None: + return discover_segment_dirs(input_dir.expanduser().resolve(), recursive) + + if segment_dirs: + ordered_dirs = dedupe_paths(list(segment_dirs)) + return SourceResolution(mode="segment-dir", segment_dirs=tuple(ordered_dirs), ignored_partial_dirs=()) + + return SourceResolution( + mode="segments-csv", + segment_dirs=parse_segments_csv(segments_csv, csv_root), + ignored_partial_dirs=(), + ) + + +def output_path_for(segment_dir: Path) -> Path: + return segment_dir / f"{segment_dir.name}_grid.mp4" + + +def command_for_job(job: ConversionJob, config: BatchConfig) -> list[str]: + command = [ + str(config.zed_bin), + "--segment-dir", + str(job.segment_dir), + "--codec", + config.codec, + 
"--encoder-device", + config.encoder_device, + "--preset", + config.preset, + "--tune", + config.tune, + "--quality", + str(config.quality), + "--gop", + str(config.gop), + "--b-frames", + str(config.b_frames), + "--start-offset-seconds", + str(config.start_offset_seconds), + "--tile-scale", + str(config.tile_scale), + ] + if config.duration_seconds is not None: + command.extend(["--duration-seconds", str(config.duration_seconds)]) + if config.output_fps is not None: + command.extend(["--output-fps", str(config.output_fps)]) + return command + + +def env_for_job(config: BatchConfig) -> dict[str, str]: + env = dict(os.environ) + if config.cuda_visible_devices is not None: + env["CUDA_VISIBLE_DEVICES"] = config.cuda_visible_devices + return env + + +def run_conversion(job: ConversionJob, config: BatchConfig) -> JobResult: + command = command_for_job(job, config) + completed = subprocess.run( + command, + check=False, + capture_output=True, + text=True, + env=env_for_job(config), + ) + status = "converted" if completed.returncode == 0 else "failed" + return JobResult( + status=status, + segment_dir=job.segment_dir, + output_path=job.output_path, + command=tuple(command), + return_code=completed.returncode, + stdout=completed.stdout, + stderr=completed.stderr, + ) + + +def summarize_failures(results: list[JobResult]) -> None: + failed_results = [result for result in results if result.status == "failed"] + if not failed_results: + return + + click.echo("\nFailed conversions:", err=True) + for result in failed_results: + click.echo(f"- {result.segment_dir} (exit {result.return_code})", err=True) + if result.stderr.strip(): + click.echo(result.stderr.rstrip(), err=True) + elif result.stdout.strip(): + click.echo(result.stdout.rstrip(), err=True) + + +def run_batch(jobs: list[ConversionJob], config: BatchConfig, jobs_limit: int) -> tuple[list[JobResult], int]: + results: list[JobResult] = [] + aborted_count = 0 + if not jobs: + return results, aborted_count + + 
future_to_job: dict[concurrent.futures.Future[JobResult], ConversionJob] = {} + job_iter = iter(jobs) + stop_submitting = False + + with concurrent.futures.ThreadPoolExecutor(max_workers=jobs_limit) as executor: + with tqdm(total=len(jobs), unit="segment", dynamic_ncols=True) as progress: + + def submit_next() -> bool: + if stop_submitting: + return False + try: + job = next(job_iter) + except StopIteration: + return False + future = executor.submit(run_conversion, job, config) + future_to_job[future] = job + return True + + for _ in range(min(jobs_limit, len(jobs))): + submit_next() + + while future_to_job: + done, _ = concurrent.futures.wait( + future_to_job, + return_when=concurrent.futures.FIRST_COMPLETED, + ) + for future in done: + job = future_to_job.pop(future) + result = future.result() + results.append(result) + progress.update(1) + + if result.status == "failed": + tqdm.write( + f"failed: {job.segment_dir} (exit {result.return_code})", + file=sys.stderr, + ) + if config.fail_fast: + stop_submitting = True + + if not stop_submitting: + submit_next() + + if stop_submitting: + remaining = sum(1 for _ in job_iter) + aborted_count = remaining + progress.total = progress.n + len(future_to_job) + progress.refresh() + + return results, aborted_count + + +@click.command() +@click.argument( + "input_dir", + required=False, + type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path), +) +@click.option( + "--segment-dir", + "segment_dirs", + multiple=True, + type=click.Path(path_type=Path, file_okay=False, dir_okay=True), + help="Explicit segment directory. Repeatable. Mutually exclusive with INPUT_DIR and --segments-csv.", +) +@click.option( + "--segments-csv", + type=click.Path(path_type=Path, dir_okay=False), + help="CSV file containing a segment_dir column. 
Mutually exclusive with INPUT_DIR and --segment-dir.", +) +@click.option( + "--csv-root", + type=click.Path(path_type=Path, file_okay=False, dir_okay=True), + help="Base directory for relative segment_dir entries in --segments-csv. Defaults to the CSV parent directory.", +) +@click.option("--recursive/--no-recursive", default=True, show_default=True, help="Recurse when discovering segment directories from INPUT_DIR.") +@click.option("--jobs", default=1, show_default=True, type=click.IntRange(min=1), help="Parallel conversion jobs.") +@click.option( + "--zed-bin", + type=click.Path(path_type=Path, dir_okay=False), + help="Explicit path to the zed_svo_grid_to_mp4 binary.", +) +@click.option( + "--cuda-visible-devices", + help="Optional CUDA_VISIBLE_DEVICES value exported for each conversion subprocess.", +) +@click.option("--overwrite/--skip-existing", default=False, show_default=True, help="Overwrite existing grid MP4 files.") +@click.option( + "--fail-fast/--continue-on-error", + default=False, + show_default=True, + help="Stop submitting new work after the first failed conversion.", +) +@click.option("--codec", type=click.Choice(("h264", "h265")), default="h265", show_default=True) +@click.option( + "--encoder-device", + type=click.Choice(("auto", "nvidia", "software")), + default="auto", + show_default=True, +) +@click.option("--preset", type=click.Choice(("fast", "balanced", "quality")), default="fast", show_default=True) +@click.option( + "--tune", + type=click.Choice(("low-latency", "balanced")), + default="low-latency", + show_default=True, +) +@click.option( + "--quality", + type=click.IntRange(min=0, max=51), + default=23, + show_default=True, + help="Lower values mean higher quality.", +) +@click.option("--gop", type=click.IntRange(min=1), default=30, show_default=True) +@click.option("--b-frames", "b_frames", type=click.IntRange(min=0), default=0, show_default=True) +@click.option( + "--start-offset-seconds", + type=click.FloatRange(min=0.0), + 
default=0.0, + show_default=True, + help="Offset applied after the synced common start time.", +) +@click.option( + "--duration-seconds", + type=click.FloatRange(min=0.0, min_open=True), + default=None, + help="Limit export duration in seconds after sync.", +) +@click.option( + "--output-fps", + type=click.FloatRange(min=0.0, min_open=True), + default=None, + help="Composite output frame rate. Defaults to the grid tool's native behavior.", +) +@click.option( + "--tile-scale", + type=click.FloatRange(min=0.1, max=1.0), + default=0.5, + show_default=True, + help="Scale each tile relative to the source resolution.", +) +def main( + input_dir: Path | None, + segment_dirs: tuple[Path, ...], + segments_csv: Path | None, + csv_root: Path | None, + recursive: bool, + jobs: int, + zed_bin: Path | None, + cuda_visible_devices: str | None, + overwrite: bool, + fail_fast: bool, + codec: str, + encoder_device: str, + preset: str, + tune: str, + quality: int, + gop: int, + b_frames: int, + start_offset_seconds: float, + duration_seconds: float | None, + output_fps: float | None, + tile_scale: float, +) -> None: + """Batch-convert synced four-camera ZED segments into grid MP4 files.""" + if b_frames > gop: + raise click.BadParameter(f"b-frames {b_frames} must be <= gop {gop}", param_hint="--b-frames") + + binary_path = locate_binary(zed_bin) + sources = resolve_sources(input_dir, segment_dirs, segments_csv, csv_root, recursive) + config = BatchConfig( + zed_bin=binary_path, + cuda_visible_devices=cuda_visible_devices, + overwrite=overwrite, + fail_fast=fail_fast, + codec=codec, + encoder_device=encoder_device, + preset=preset, + tune=tune, + quality=quality, + gop=gop, + b_frames=b_frames, + start_offset_seconds=start_offset_seconds, + duration_seconds=duration_seconds, + output_fps=output_fps, + tile_scale=tile_scale, + ) + + skipped_results: list[JobResult] = [] + failed_results: list[JobResult] = [] + pending_jobs: list[ConversionJob] = [] + + for segment_dir in 
sources.segment_dirs: + output_path = output_path_for(segment_dir) + job = ConversionJob(segment_dir=segment_dir, output_path=output_path) + command = tuple(command_for_job(job, config)) + scan = scan_segment_dir(segment_dir) + if not scan.is_valid: + failed_results.append( + JobResult( + status="failed", + segment_dir=segment_dir, + output_path=output_path, + command=command, + return_code=2, + stderr=scan.reason or "", + ) + ) + continue + if output_path.exists() and not overwrite: + skipped_results.append( + JobResult( + status="skipped", + segment_dir=segment_dir, + output_path=output_path, + command=command, + ) + ) + continue + pending_jobs.append(job) + + click.echo( + ( + f"source={sources.mode} matched={len(sources.segment_dirs)} pending={len(pending_jobs)} " + f"skipped={len(skipped_results)} invalid={len(failed_results)} jobs={jobs}" + ), + err=True, + ) + if sources.ignored_partial_dirs: + click.echo(f"ignored_incomplete={len(sources.ignored_partial_dirs)}", err=True) + + results = list(skipped_results) + results.extend(failed_results) + conversion_results, aborted_count = run_batch(pending_jobs, config, jobs) + results.extend(conversion_results) + + converted_count = sum(1 for result in results if result.status == "converted") + skipped_count = sum(1 for result in results if result.status == "skipped") + failed_count = sum(1 for result in results if result.status == "failed") + + click.echo( + ( + f"summary: matched={len(sources.segment_dirs)} converted={converted_count} " + f"skipped={skipped_count} failed={failed_count} aborted={aborted_count}" + ), + err=True, + ) + summarize_failures(results) + + if failed_count > 0 or aborted_count > 0: + raise SystemExit(1) + + +if __name__ == "__main__": + main()