#!/usr/bin/env python3
from __future__ import annotations

import concurrent.futures
import csv
import os
import re
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path

import click
from tqdm import tqdm

SCRIPT_PATH = Path(__file__).resolve()
REPO_ROOT = SCRIPT_PATH.parents[1]
SEGMENT_FILE_PATTERN = re.compile(r".*_zed([1-4])\.svo2?$", re.IGNORECASE)
EXPECTED_CAMERAS = ("zed1", "zed2", "zed3", "zed4")


@dataclass(slots=True, frozen=True)
class BatchConfig:
    zed_bin: Path
    cuda_visible_devices: str | None
    overwrite: bool
    fail_fast: bool
    codec: str
    encoder_device: str
    preset: str
    tune: str
    quality: int
    gop: int
    b_frames: int
    start_offset_seconds: float
    duration_seconds: float | None
    output_fps: float | None
    tile_scale: float


@dataclass(slots=True, frozen=True)
class ConversionJob:
    segment_dir: Path
    output_path: Path


@dataclass(slots=True, frozen=True)
class JobResult:
    status: str
    segment_dir: Path
    output_path: Path
    command: tuple[str, ...]
    return_code: int = 0
    stdout: str = ""
    stderr: str = ""


@dataclass(slots=True, frozen=True)
class SegmentScan:
    segment_dir: Path
    matched_files: int
    is_valid: bool
    reason: str | None = None


@dataclass(slots=True, frozen=True)
class SourceResolution:
    mode: str
    segment_dirs: tuple[Path, ...]
    ignored_partial_dirs: tuple[SegmentScan, ...]


def locate_binary(override: Path | None) -> Path:
    if override is not None:
        candidate = override.expanduser().resolve()
        if not candidate.is_file():
            raise click.ClickException(f"binary not found: {candidate}")
        return candidate
    candidates = (
        REPO_ROOT / "build" / "bin" / "zed_svo_grid_to_mp4",
        REPO_ROOT / "build" / "zed_svo_grid_to_mp4",
    )
    for candidate in candidates:
        if candidate.is_file():
            return candidate
    raise click.ClickException(f"could not find zed_svo_grid_to_mp4 under {REPO_ROOT / 'build'}")


def scan_segment_dir(segment_dir: Path) -> SegmentScan:
    if not segment_dir.is_dir():
        return SegmentScan(
            segment_dir=segment_dir,
            matched_files=0,
            is_valid=False,
            reason=f"segment directory does not exist: {segment_dir}",
        )
    matched_by_camera: dict[str, list[Path]] = {camera: [] for camera in EXPECTED_CAMERAS}
    for child in segment_dir.iterdir():
        if not child.is_file():
            continue
        match = SEGMENT_FILE_PATTERN.fullmatch(child.name)
        if match is None:
            continue
        matched_by_camera[f"zed{match.group(1)}"].append(child)
    matched_files = sum(len(paths) for paths in matched_by_camera.values())
    duplicate_cameras = [camera for camera, paths in matched_by_camera.items() if len(paths) > 1]
    missing_cameras = [camera for camera, paths in matched_by_camera.items() if len(paths) == 0]
    if duplicate_cameras:
        duplicate_text = ", ".join(duplicate_cameras)
        return SegmentScan(
            segment_dir=segment_dir,
            matched_files=matched_files,
            is_valid=False,
            reason=f"duplicate camera inputs under {segment_dir}: {duplicate_text}",
        )
    if missing_cameras:
        missing_text = ", ".join(missing_cameras)
        return SegmentScan(
            segment_dir=segment_dir,
            matched_files=matched_files,
            is_valid=False,
            reason=f"missing camera inputs under {segment_dir}: {missing_text}",
        )
    return SegmentScan(segment_dir=segment_dir, matched_files=matched_files, is_valid=True)


def dedupe_paths(paths: list[Path]) -> list[Path]:
    ordered: list[Path] = []
    seen: set[Path] = set()
    for path in paths:
        resolved = path.expanduser().resolve()
        if resolved in seen:
            continue
        seen.add(resolved)
        ordered.append(resolved)
    return ordered


def discover_segment_dirs(root: Path, recursive: bool) -> SourceResolution:
    if not root.is_dir():
        raise click.ClickException(f"input directory does not exist: {root}")
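    # The input root itself counts as a candidate segment directory; with --recursive,
    # every nested directory is scanned as well.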
    candidate_dirs = {root.resolve()}
    iterator = root.rglob("*") if recursive else root.iterdir()
    for path in iterator:
        if path.is_dir():
            candidate_dirs.add(path.resolve())
    valid_dirs: list[Path] = []
    ignored_partial_dirs: list[SegmentScan] = []
    for segment_dir in sorted(candidate_dirs):
        scan = scan_segment_dir(segment_dir)
        if scan.is_valid:
            valid_dirs.append(segment_dir)
        elif scan.matched_files > 0:
            ignored_partial_dirs.append(scan)
    if not valid_dirs:
        raise click.ClickException(f"no complete four-camera segments found under {root}")
    return SourceResolution(
        mode="discovery",
        segment_dirs=tuple(valid_dirs),
        ignored_partial_dirs=tuple(ignored_partial_dirs),
    )


def parse_segments_csv(csv_path: Path, csv_root: Path | None) -> tuple[Path, ...]:
    csv_path = csv_path.expanduser().resolve()
    if not csv_path.is_file():
        raise click.ClickException(f"CSV not found: {csv_path}")
    if csv_root is not None:
        base_dir = csv_root.expanduser().resolve()
        if not base_dir.is_dir():
            raise click.ClickException(f"CSV root is not a directory: {base_dir}")
    else:
        base_dir = csv_path.parent
    segment_dirs: list[Path] = []
    seen: set[Path] = set()
    with csv_path.open(newline="") as stream:
        reader = csv.DictReader(stream)
        if reader.fieldnames is None or "segment_dir" not in reader.fieldnames:
            raise click.ClickException(f"{csv_path} must contain a 'segment_dir' header")
        for row_number, row in enumerate(reader, start=2):
            raw_segment_dir = (row.get("segment_dir") or "").strip()
            if not raw_segment_dir:
                raise click.ClickException(f"{csv_path}:{row_number} has an empty segment_dir value")
            segment_dir = Path(raw_segment_dir)
            resolved = segment_dir if segment_dir.is_absolute() else base_dir / segment_dir
            resolved = resolved.expanduser().resolve()
            if resolved in seen:
                continue
            seen.add(resolved)
            segment_dirs.append(resolved)
    if not segment_dirs:
        raise click.ClickException(f"{csv_path} did not contain any segment_dir rows")
    return tuple(segment_dirs)


def resolve_sources(
    input_dir: Path | None,
    segment_dirs: tuple[Path, ...],
    segments_csv: Path | None,
    csv_root: Path | None,
    recursive: bool,
) -> SourceResolution:
    source_count = sum(
        (
            1 if input_dir is not None else 0,
            1 if segment_dirs else 0,
            1 if segments_csv is not None else 0,
        )
    )
    if source_count != 1:
        raise click.ClickException(
            "provide exactly one source mode: INPUT_DIR, --segment-dir, or --segments-csv"
        )
    if input_dir is not None:
        return discover_segment_dirs(input_dir.expanduser().resolve(), recursive)
    if segment_dirs:
        ordered_dirs = dedupe_paths(list(segment_dirs))
        return SourceResolution(mode="segment-dir", segment_dirs=tuple(ordered_dirs), ignored_partial_dirs=())
    return SourceResolution(
        mode="segments-csv",
        segment_dirs=parse_segments_csv(segments_csv, csv_root),
        ignored_partial_dirs=(),
    )


def output_path_for(segment_dir: Path) -> Path:
    return segment_dir / f"{segment_dir.name}_grid.mp4"


def command_for_job(job: ConversionJob, config: BatchConfig) -> list[str]:
    command = [
        str(config.zed_bin),
        "--segment-dir",
        str(job.segment_dir),
        "--codec",
        config.codec,
        "--encoder-device",
        config.encoder_device,
        "--preset",
        config.preset,
        "--tune",
        config.tune,
        "--quality",
        str(config.quality),
        "--gop",
        str(config.gop),
        "--b-frames",
        str(config.b_frames),
        "--start-offset-seconds",
        str(config.start_offset_seconds),
        "--tile-scale",
        str(config.tile_scale),
    ]
    if config.duration_seconds is not None:
        command.extend(["--duration-seconds", str(config.duration_seconds)])
    if config.output_fps is not None:
        command.extend(["--output-fps", str(config.output_fps)])
    return command


def env_for_job(config: BatchConfig) -> dict[str, str]:
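    """Return a copy of the parent environment, overriding CUDA_VISIBLE_DEVICES when one is configured."""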
    env = dict(os.environ)
    if config.cuda_visible_devices is not None:
        env["CUDA_VISIBLE_DEVICES"] = config.cuda_visible_devices
    return env


def run_conversion(job: ConversionJob, config: BatchConfig) -> JobResult:
    command = command_for_job(job, config)
    completed = subprocess.run(
        command,
        check=False,
        capture_output=True,
        text=True,
        env=env_for_job(config),
    )
    status = "converted" if completed.returncode == 0 else "failed"
    return JobResult(
        status=status,
        segment_dir=job.segment_dir,
        output_path=job.output_path,
        command=tuple(command),
        return_code=completed.returncode,
        stdout=completed.stdout,
        stderr=completed.stderr,
    )


def summarize_failures(results: list[JobResult]) -> None:
    failed_results = [result for result in results if result.status == "failed"]
    if not failed_results:
        return
    click.echo("\nFailed conversions:", err=True)
    for result in failed_results:
        click.echo(f"- {result.segment_dir} (exit {result.return_code})", err=True)
        if result.stderr.strip():
            click.echo(result.stderr.rstrip(), err=True)
        elif result.stdout.strip():
            click.echo(result.stdout.rstrip(), err=True)


def run_batch(jobs: list[ConversionJob], config: BatchConfig, jobs_limit: int) -> tuple[list[JobResult], int]:
    """Run conversions on a bounded thread pool; return (results, count of jobs never submitted)."""
    results: list[JobResult] = []
    aborted_count = 0
    if not jobs:
        return results, aborted_count
    future_to_job: dict[concurrent.futures.Future[JobResult], ConversionJob] = {}
    job_iter = iter(jobs)
    stop_submitting = False
    with concurrent.futures.ThreadPoolExecutor(max_workers=jobs_limit) as executor:
        with tqdm(total=len(jobs), unit="segment", dynamic_ncols=True) as progress:

            def submit_next() -> bool:
                if stop_submitting:
                    return False
                try:
                    job = next(job_iter)
                except StopIteration:
                    return False
                future = executor.submit(run_conversion, job, config)
                future_to_job[future] = job
                return True

            for _ in range(min(jobs_limit, len(jobs))):
                submit_next()
            while future_to_job:
                done, _ = concurrent.futures.wait(
                    future_to_job,
                    return_when=concurrent.futures.FIRST_COMPLETED,
                )
                for future in done:
                    job = future_to_job.pop(future)
                    result = future.result()
                    results.append(result)
                    progress.update(1)
                    if result.status == "failed":
                        tqdm.write(
                            f"failed: {job.segment_dir} (exit {result.return_code})",
                            file=sys.stderr,
                        )
                        if config.fail_fast:
                            stop_submitting = True
                    if not stop_submitting:
                        submit_next()
            if stop_submitting:
                # Count the jobs that were never submitted and shrink the progress total to match.
                remaining = sum(1 for _ in job_iter)
                aborted_count = remaining
                progress.total = progress.n + len(future_to_job)
                progress.refresh()
    return results, aborted_count


@click.command()
@click.argument(
    "input_dir",
    required=False,
    type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path),
)
@click.option(
    "--segment-dir",
    "segment_dirs",
    multiple=True,
    type=click.Path(path_type=Path, file_okay=False, dir_okay=True),
    help="Explicit segment directory. Repeatable. Mutually exclusive with INPUT_DIR and --segments-csv.",
)
@click.option(
    "--segments-csv",
    type=click.Path(path_type=Path, dir_okay=False),
    help="CSV file containing a segment_dir column. Mutually exclusive with INPUT_DIR and --segment-dir.",
)
@click.option(
    "--csv-root",
    type=click.Path(path_type=Path, file_okay=False, dir_okay=True),
    help="Base directory for relative segment_dir entries in --segments-csv. Defaults to the CSV parent directory.",
)
Defaults to the CSV parent directory.", ) @click.option("--recursive/--no-recursive", default=True, show_default=True, help="Recurse when discovering segment directories from INPUT_DIR.") @click.option("--jobs", default=1, show_default=True, type=click.IntRange(min=1), help="Parallel conversion jobs.") @click.option( "--zed-bin", type=click.Path(path_type=Path, dir_okay=False), help="Explicit path to the zed_svo_grid_to_mp4 binary.", ) @click.option( "--cuda-visible-devices", help="Optional CUDA_VISIBLE_DEVICES value exported for each conversion subprocess.", ) @click.option("--overwrite/--skip-existing", default=False, show_default=True, help="Overwrite existing grid MP4 files.") @click.option( "--fail-fast/--continue-on-error", default=False, show_default=True, help="Stop submitting new work after the first failed conversion.", ) @click.option("--codec", type=click.Choice(("h264", "h265")), default="h265", show_default=True) @click.option( "--encoder-device", type=click.Choice(("auto", "nvidia", "software")), default="auto", show_default=True, ) @click.option("--preset", type=click.Choice(("fast", "balanced", "quality")), default="fast", show_default=True) @click.option( "--tune", type=click.Choice(("low-latency", "balanced")), default="low-latency", show_default=True, ) @click.option( "--quality", type=click.IntRange(min=0, max=51), default=23, show_default=True, help="Lower values mean higher quality.", ) @click.option("--gop", type=click.IntRange(min=1), default=30, show_default=True) @click.option("--b-frames", "b_frames", type=click.IntRange(min=0), default=0, show_default=True) @click.option( "--start-offset-seconds", type=click.FloatRange(min=0.0), default=0.0, show_default=True, help="Offset applied after the synced common start time.", ) @click.option( "--duration-seconds", type=click.FloatRange(min=0.0, min_open=True), default=None, help="Limit export duration in seconds after sync.", ) @click.option( "--output-fps", type=click.FloatRange(min=0.0, min_open=True), default=None, help="Composite output frame rate. 
Defaults to the grid tool's native behavior.", ) @click.option( "--tile-scale", type=click.FloatRange(min=0.1, max=1.0), default=0.5, show_default=True, help="Scale each tile relative to the source resolution.", ) def main( input_dir: Path | None, segment_dirs: tuple[Path, ...], segments_csv: Path | None, csv_root: Path | None, recursive: bool, jobs: int, zed_bin: Path | None, cuda_visible_devices: str | None, overwrite: bool, fail_fast: bool, codec: str, encoder_device: str, preset: str, tune: str, quality: int, gop: int, b_frames: int, start_offset_seconds: float, duration_seconds: float | None, output_fps: float | None, tile_scale: float, ) -> None: """Batch-convert synced four-camera ZED segments into grid MP4 files.""" if b_frames > gop: raise click.BadParameter(f"b-frames {b_frames} must be <= gop {gop}", param_hint="--b-frames") binary_path = locate_binary(zed_bin) sources = resolve_sources(input_dir, segment_dirs, segments_csv, csv_root, recursive) config = BatchConfig( zed_bin=binary_path, cuda_visible_devices=cuda_visible_devices, overwrite=overwrite, fail_fast=fail_fast, codec=codec, encoder_device=encoder_device, preset=preset, tune=tune, quality=quality, gop=gop, b_frames=b_frames, start_offset_seconds=start_offset_seconds, duration_seconds=duration_seconds, output_fps=output_fps, tile_scale=tile_scale, ) skipped_results: list[JobResult] = [] failed_results: list[JobResult] = [] pending_jobs: list[ConversionJob] = [] for segment_dir in sources.segment_dirs: output_path = output_path_for(segment_dir) job = ConversionJob(segment_dir=segment_dir, output_path=output_path) command = tuple(command_for_job(job, config)) scan = scan_segment_dir(segment_dir) if not scan.is_valid: failed_results.append( JobResult( status="failed", segment_dir=segment_dir, output_path=output_path, command=command, return_code=2, stderr=scan.reason or "", ) ) continue if output_path.exists() and not overwrite: skipped_results.append( JobResult( status="skipped", segment_dir=segment_dir, output_path=output_path, command=command, ) ) continue pending_jobs.append(job) click.echo( ( f"source={sources.mode} matched={len(sources.segment_dirs)} pending={len(pending_jobs)} " f"skipped={len(skipped_results)} invalid={len(failed_results)} jobs={jobs}" ), err=True, ) if sources.ignored_partial_dirs: click.echo(f"ignored_incomplete={len(sources.ignored_partial_dirs)}", err=True) results = list(skipped_results) results.extend(failed_results) conversion_results, aborted_count = run_batch(pending_jobs, config, jobs) results.extend(conversion_results) converted_count = sum(1 for result in results if result.status == "converted") skipped_count = sum(1 for result in results if result.status == "skipped") failed_count = sum(1 for result in results if result.status == "failed") click.echo( ( f"summary: matched={len(sources.segment_dirs)} converted={converted_count} " f"skipped={skipped_count} failed={failed_count} aborted={aborted_count}" ), err=True, ) summarize_failures(results) if failed_count > 0 or aborted_count > 0: raise SystemExit(1) if __name__ == "__main__": main()