#!/usr/bin/env python3
"""Batch driver that renders four-camera ZED segment directories into grid MP4s.

Each segment directory is expected to hold one ``*_zed<N>.svo`` / ``.svo2``
recording per camera (``zed1`` .. ``zed4``). The rendering itself is delegated
to the ``zed_svo_grid_to_mp4`` binary; ``ffprobe`` is optionally used to
validate grid MP4 files that already exist.
"""
from __future__ import annotations

import concurrent.futures
import csv
import json
import math
import os
import re
import shutil
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path

import click
from tqdm import tqdm

# Absolute location of this script; the repository root is the parent of the
# directory containing it and anchors the default search for the grid binary.
SCRIPT_PATH = Path(__file__).resolve()
REPO_ROOT = SCRIPT_PATH.parents[1]

# Recording filenames end in "_zed<1-4>.svo" or "_zed<1-4>.svo2" (any case);
# capture group 1 is the camera index.
SEGMENT_FILE_PATTERN = re.compile(r".*_zed([1-4])\.svo2?$", re.IGNORECASE)
# A complete segment has exactly one recording for each of these cameras,
# i.e. ("zed1", "zed2", "zed3", "zed4").
EXPECTED_CAMERAS = tuple(f"zed{index}" for index in range(1, 5))


@dataclass(frozen=True, slots=True)
class BatchConfig:
    """Immutable settings shared by every conversion in one batch run."""

    # Path to zed_svo_grid_to_mp4; None when no conversion will be launched.
    zed_bin: Path | None
    # Path to ffprobe; None when existing outputs are not probed.
    ffprobe_bin: Path | None
    probe_existing: bool
    # Exported as CUDA_VISIBLE_DEVICES to each conversion subprocess when set.
    cuda_visible_devices: str | None
    overwrite: bool
    fail_fast: bool
    # Encoder settings forwarded verbatim as command-line flags.
    codec: str
    encoder_device: str
    preset: str
    tune: str
    quality: int
    gop: int
    b_frames: int
    start_offset_seconds: float
    # Optional flags; omitted from the command line when None.
    duration_seconds: float | None
    output_fps: float | None
    tile_scale: float


@dataclass(frozen=True, slots=True)
class ConversionJob:
    """One segment directory paired with the grid MP4 it should produce."""

    segment_dir: Path
    output_path: Path


@dataclass(frozen=True, slots=True)
class JobResult:
    """Outcome of handling a single segment."""

    # "converted", "failed" or "skipped".
    status: str
    segment_dir: Path
    output_path: Path
    # Argument vector that was (or would have been) executed.
    command: tuple[str, ...]
    return_code: int = 0
    stdout: str = ""
    stderr: str = ""


@dataclass(frozen=True, slots=True)
class SegmentScan:
    """Result of checking a directory for a complete set of camera recordings."""

    segment_dir: Path
    # Number of filenames that matched SEGMENT_FILE_PATTERN.
    matched_files: int
    is_valid: bool
    # Human-readable explanation when is_valid is False.
    reason: str | None = None


@dataclass(frozen=True, slots=True)
class SourceResolution:
    """Segment directories selected by whichever source mode the user chose."""

    # "discovery", "segment-dir" or "segments-csv".
    mode: str
    segment_dirs: tuple[Path, ...]
    # Directories with some, but not a complete set of, camera recordings.
    ignored_partial_dirs: tuple[SegmentScan, ...]
@dataclass(slots=True, frozen=True)
class OutputProbeResult:
    """Outcome of probing one grid MP4 with ffprobe."""

    output_path: Path
    # "missing", "invalid" or "valid".
    status: str
    # Explanation for an "invalid" status; may span several lines.
    reason: str = ""
    # Container duration reported by ffprobe for a "valid" output.
    duration_seconds: float | None = None


def locate_binary(override: Path | None) -> Path:
    """Return the path of the ``zed_svo_grid_to_mp4`` binary.

    An explicit *override* is expanded, resolved and must be an existing file.
    Otherwise ``build/bin`` and then ``build`` under the repository root are
    searched.

    Raises:
        click.ClickException: if no binary can be found.
    """
    if override is not None:
        candidate = override.expanduser().resolve()
        if not candidate.is_file():
            raise click.ClickException(f"binary not found: {candidate}")
        return candidate

    candidates = (
        REPO_ROOT / "build" / "bin" / "zed_svo_grid_to_mp4",
        REPO_ROOT / "build" / "zed_svo_grid_to_mp4",
    )
    for candidate in candidates:
        if candidate.is_file():
            return candidate
    raise click.ClickException(f"could not find zed_svo_grid_to_mp4 under {REPO_ROOT / 'build'}")


def locate_ffprobe(override: Path | None) -> Path:
    """Return the path of ``ffprobe``, from *override* or from ``PATH``.

    Raises:
        click.ClickException: if the override is not a file or ffprobe is not on PATH.
    """
    if override is not None:
        candidate = override.expanduser().resolve()
        if not candidate.is_file():
            raise click.ClickException(f"ffprobe binary not found: {candidate}")
        return candidate

    resolved = shutil.which("ffprobe")
    if resolved is None:
        raise click.ClickException("could not find ffprobe on PATH")
    return Path(resolved).resolve()


def scan_segment_dir(segment_dir: Path) -> SegmentScan:
    """Check that *segment_dir* holds exactly one recording per expected camera.

    Only regular files directly inside the directory are considered. A missing
    directory, a camera with several recordings, or a camera with none yields
    ``is_valid=False`` with a reason. Duplicates are reported before missing
    cameras.
    """
    if not segment_dir.is_dir():
        return SegmentScan(
            segment_dir=segment_dir,
            matched_files=0,
            is_valid=False,
            reason=f"segment directory does not exist: {segment_dir}",
        )

    matched_by_camera: dict[str, list[Path]] = {camera: [] for camera in EXPECTED_CAMERAS}
    for child in segment_dir.iterdir():
        if not child.is_file():
            continue
        match = SEGMENT_FILE_PATTERN.fullmatch(child.name)
        if match is None:
            continue
        matched_by_camera[f"zed{match.group(1)}"].append(child)

    matched_files = sum(len(paths) for paths in matched_by_camera.values())
    duplicate_cameras = [camera for camera, paths in matched_by_camera.items() if len(paths) > 1]
    missing_cameras = [camera for camera, paths in matched_by_camera.items() if len(paths) == 0]

    if duplicate_cameras:
        duplicate_text = ", ".join(duplicate_cameras)
        return SegmentScan(
            segment_dir=segment_dir,
            matched_files=matched_files,
            is_valid=False,
            reason=f"duplicate camera inputs under {segment_dir}: {duplicate_text}",
        )
    if missing_cameras:
        missing_text = ", ".join(missing_cameras)
        return SegmentScan(
            segment_dir=segment_dir,
            matched_files=matched_files,
            is_valid=False,
            reason=f"missing camera inputs under {segment_dir}: {missing_text}",
        )
    return SegmentScan(segment_dir=segment_dir, matched_files=matched_files, is_valid=True)


def dedupe_paths(paths: list[Path]) -> list[Path]:
    """Expand and resolve *paths*, dropping repeats while keeping first-seen order."""
    ordered: list[Path] = []
    seen: set[Path] = set()
    for path in paths:
        resolved = path.expanduser().resolve()
        if resolved in seen:
            continue
        seen.add(resolved)
        ordered.append(resolved)
    return ordered


def discover_segment_dirs(root: Path, recursive: bool) -> SourceResolution:
    """Find complete four-camera segment directories under *root*.

    *root* itself is a candidate, plus every subdirectory (all descendants when
    *recursive*, otherwise only direct children). Complete segments are returned
    in sorted path order. Candidates with at least one matching recording that
    are not complete are recorded as ignored partial directories.

    Raises:
        click.ClickException: if *root* is not a directory or no complete segment exists.
    """
    if not root.is_dir():
        raise click.ClickException(f"input directory does not exist: {root}")

    candidate_dirs = {root.resolve()}
    iterator = root.rglob("*") if recursive else root.iterdir()
    for path in iterator:
        if path.is_dir():
            candidate_dirs.add(path.resolve())

    valid_dirs: list[Path] = []
    ignored_partial_dirs: list[SegmentScan] = []
    for segment_dir in sorted(candidate_dirs):
        scan = scan_segment_dir(segment_dir)
        if scan.is_valid:
            valid_dirs.append(segment_dir)
        elif scan.matched_files > 0:
            ignored_partial_dirs.append(scan)

    if not valid_dirs:
        raise click.ClickException(f"no complete four-camera segments found under {root}")

    return SourceResolution(
        mode="discovery",
        segment_dirs=tuple(valid_dirs),
        ignored_partial_dirs=tuple(ignored_partial_dirs),
    )


def parse_segments_csv(csv_path: Path, csv_root: Path | None) -> tuple[Path, ...]:
    """Read the ``segment_dir`` column of *csv_path* into resolved, de-duplicated paths.

    Relative entries are resolved against *csv_root* when given, otherwise
    against the CSV's own directory. The file is decoded as UTF-8, and a leading
    byte-order mark (as written by spreadsheet "CSV UTF-8" exports) is accepted.

    Raises:
        click.ClickException: if the CSV or root is missing, the file is not
            UTF-8, the header lacks ``segment_dir``, a row is empty, or no rows exist.
    """
    csv_path = csv_path.expanduser().resolve()
    if not csv_path.is_file():
        raise click.ClickException(f"CSV not found: {csv_path}")

    if csv_root is not None:
        base_dir = csv_root.expanduser().resolve()
        if not base_dir.is_dir():
            raise click.ClickException(f"CSV root is not a directory: {base_dir}")
    else:
        base_dir = csv_path.parent

    segment_dirs: list[Path] = []
    seen: set[Path] = set()
    try:
        # utf-8-sig strips an optional BOM. Without it the header would read
        # "\ufeffsegment_dir" and fail the check below.
        with csv_path.open(newline="", encoding="utf-8-sig") as stream:
            reader = csv.DictReader(stream)
            if reader.fieldnames is None or "segment_dir" not in reader.fieldnames:
                raise click.ClickException(f"{csv_path} must contain a 'segment_dir' header")
            for row in reader:
                # line_num tracks physical lines, so skipped blank lines and
                # multi-line quoted fields do not shift reported positions.
                row_number = reader.line_num
                raw_segment_dir = (row.get("segment_dir") or "").strip()
                if not raw_segment_dir:
                    raise click.ClickException(f"{csv_path}:{row_number} has an empty segment_dir value")
                segment_dir = Path(raw_segment_dir)
                resolved = segment_dir if segment_dir.is_absolute() else base_dir / segment_dir
                resolved = resolved.expanduser().resolve()
                if resolved in seen:
                    continue
                seen.add(resolved)
                segment_dirs.append(resolved)
    except UnicodeDecodeError as error:
        raise click.ClickException(f"{csv_path} is not valid UTF-8: {error}") from error

    if not segment_dirs:
        raise click.ClickException(f"{csv_path} did not contain any segment_dir rows")
    return tuple(segment_dirs)


def resolve_sources(
    input_dir: Path | None,
    segment_dirs: tuple[Path, ...],
    segments_csv: Path | None,
    csv_root: Path | None,
    recursive: bool,
) -> SourceResolution:
    """Select segment directories from exactly one of the three source modes.

    Explicit ``--segment-dir`` and CSV entries are not validated here. Their
    completeness is checked later, per segment.

    Raises:
        click.ClickException: unless exactly one source mode was supplied, or
            when the chosen mode itself fails.
    """
    source_count = sum(
        (
            1 if input_dir is not None else 0,
            1 if segment_dirs else 0,
            1 if segments_csv is not None else 0,
        )
    )
    if source_count != 1:
        raise click.ClickException(
            "provide exactly one source mode: INPUT_DIR, --segment-dir, or --segments-csv"
        )

    if input_dir is not None:
        return discover_segment_dirs(input_dir.expanduser().resolve(), recursive)
    if segment_dirs:
        ordered_dirs = dedupe_paths(list(segment_dirs))
        return SourceResolution(mode="segment-dir", segment_dirs=tuple(ordered_dirs), ignored_partial_dirs=())
    return SourceResolution(
        mode="segments-csv",
        segment_dirs=parse_segments_csv(segments_csv, csv_root),
        ignored_partial_dirs=(),
    )


def output_path_for(segment_dir: Path) -> Path:
    """Return ``<segment_dir>/<segment_dir name>_grid.mp4``."""
    return segment_dir / f"{segment_dir.name}_grid.mp4"


def command_for_job(job: ConversionJob, config: BatchConfig) -> list[str]:
    """Build the ``zed_svo_grid_to_mp4`` argument vector for *job*.

    Optional duration and FPS flags are appended only when configured.

    Raises:
        RuntimeError: if ``config.zed_bin`` is None.
    """
    if config.zed_bin is None:
        raise RuntimeError("zed_svo_grid_to_mp4 binary is not configured")

    command = [
        str(config.zed_bin),
        "--segment-dir",
        str(job.segment_dir),
        "--codec",
        config.codec,
        "--encoder-device",
        config.encoder_device,
        "--preset",
        config.preset,
        "--tune",
        config.tune,
        "--quality",
        str(config.quality),
        "--gop",
        str(config.gop),
        "--b-frames",
        str(config.b_frames),
        "--start-offset-seconds",
        str(config.start_offset_seconds),
        "--tile-scale",
        str(config.tile_scale),
    ]
    if config.duration_seconds is not None:
        command.extend(["--duration-seconds", str(config.duration_seconds)])
    if config.output_fps is not None:
        command.extend(["--output-fps", str(config.output_fps)])
    return command


def env_for_job(config: BatchConfig) -> dict[str, str]:
    """Copy the current environment, overriding CUDA_VISIBLE_DEVICES when configured."""
    env = dict(os.environ)
    if config.cuda_visible_devices is not None:
        env["CUDA_VISIBLE_DEVICES"] = config.cuda_visible_devices
    return env


def probe_output(output_path: Path, ffprobe_bin: Path | None) -> OutputProbeResult:
    """Validate an existing grid MP4 with ffprobe.

    Returns status ``"missing"`` when *output_path* is not a regular file.
    Returns ``"invalid"`` (with a ``reason``) when ffprobe fails, emits
    unexpected JSON, finds no video stream, or reports no finite positive
    duration. Returns ``"valid"`` (with ``duration_seconds``) otherwise.

    Raises:
        RuntimeError: if the file exists but no ffprobe binary is configured.
    """
    if not output_path.is_file():
        return OutputProbeResult(output_path=output_path, status="missing")
    if ffprobe_bin is None:
        raise RuntimeError("ffprobe binary is not configured")

    completed = subprocess.run(
        [
            str(ffprobe_bin),
            "-v",
            "error",
            "-print_format",
            "json",
            "-show_entries",
            "format=duration,size:stream=codec_type,codec_name,width,height,nb_frames",
            str(output_path),
        ],
        check=False,
        capture_output=True,
        text=True,
        # Never let undecodable diagnostic bytes abort the probe.
        errors="replace",
    )
    if completed.returncode != 0:
        reason = completed.stderr.strip() or completed.stdout.strip() or "ffprobe failed"
        return OutputProbeResult(output_path=output_path, status="invalid", reason=reason)

    try:
        payload = json.loads(completed.stdout)
    except json.JSONDecodeError as error:
        return OutputProbeResult(
            output_path=output_path,
            status="invalid",
            reason=f"ffprobe returned invalid JSON: {error}",
        )
    if not isinstance(payload, dict):
        return OutputProbeResult(
            output_path=output_path,
            status="invalid",
            reason="ffprobe returned unexpected JSON: expected an object",
        )

    streams = payload.get("streams") or []
    has_video_stream = any(
        isinstance(stream, dict) and stream.get("codec_type") == "video" for stream in streams
    )
    if not has_video_stream:
        return OutputProbeResult(
            output_path=output_path,
            status="invalid",
            reason="ffprobe found no video stream",
        )

    format_payload = payload.get("format")
    duration_text = format_payload.get("duration") if isinstance(format_payload, dict) else None
    if duration_text in (None, ""):
        return OutputProbeResult(
            output_path=output_path,
            status="invalid",
            reason="ffprobe did not report a duration",
        )

    try:
        duration_seconds = float(duration_text)
    except (TypeError, ValueError):
        return OutputProbeResult(
            output_path=output_path,
            status="invalid",
            reason=f"ffprobe reported a non-numeric duration: {duration_text!r}",
        )
    if not math.isfinite(duration_seconds):
        return OutputProbeResult(
            output_path=output_path,
            status="invalid",
            reason=f"ffprobe reported a non-finite duration: {duration_seconds}",
        )
    if duration_seconds <= 0.0:
        return OutputProbeResult(
            output_path=output_path,
            status="invalid",
            reason=f"ffprobe reported a non-positive duration: {duration_seconds}",
        )

    return OutputProbeResult(
        output_path=output_path,
        status="valid",
        duration_seconds=duration_seconds,
    )


def run_conversion(job: ConversionJob, config: BatchConfig) -> JobResult:
    """Run the grid converter for *job* and capture its exit code and output.

    A non-zero exit, or a failure to start the process at all (OSError, e.g.
    a binary without execute permission), yields a ``"failed"`` result instead
    of raising. That keeps one bad launch from tearing down the whole batch.
    """
    command = command_for_job(job, config)
    try:
        completed = subprocess.run(
            command,
            check=False,
            capture_output=True,
            text=True,
            errors="replace",
            env=env_for_job(config),
        )
    except OSError as error:
        # No process ran, so there is no real exit status; -1 marks a launch failure.
        return JobResult(
            status="failed",
            segment_dir=job.segment_dir,
            output_path=job.output_path,
            command=tuple(command),
            return_code=-1,
            stderr=f"failed to launch {command[0]}: {error}",
        )

    status = "converted" if completed.returncode == 0 else "failed"
    return JobResult(
        status=status,
        segment_dir=job.segment_dir,
        output_path=job.output_path,
        command=tuple(command),
        return_code=completed.returncode,
        stdout=completed.stdout,
        stderr=completed.stderr,
    )


def summarize_failures(results: list[JobResult]) -> None:
    """Print each failed result to stderr with its stderr (or, failing that, stdout)."""
    failed_results = [result for result in results if result.status == "failed"]
    if not failed_results:
        return
    click.echo("\nFailed conversions:", err=True)
    for result in failed_results:
        click.echo(f"- {result.segment_dir} (exit {result.return_code})", err=True)
        if result.stderr.strip():
            click.echo(result.stderr.rstrip(), err=True)
        elif result.stdout.strip():
            click.echo(result.stdout.rstrip(), err=True)


def report_invalid_existing_outputs(
    invalid_existing: list[tuple[ConversionJob, OutputProbeResult]],
) -> None:
    """Print each invalid existing output and its (possibly multi-line) reason to stderr."""
    if not invalid_existing:
        return
    click.echo("\nInvalid existing outputs:", err=True)
    for job, probe in invalid_existing:
        click.echo(f"- {job.segment_dir}", err=True)
        click.echo(f" output: {probe.output_path}", err=True)
        # splitlines() of "" is [], so fall back to a single empty reason line.
        reason_lines = probe.reason.splitlines() or [probe.reason]
        click.echo(f" reason: {reason_lines[0]}", err=True)
        for line in reason_lines[1:]:
            click.echo(f" {line}", err=True)


def report_dry_run_plan(
    pending_jobs: list[ConversionJob],
    pending_reasons: dict[Path, str],
    pending_details: dict[Path, str],
) -> None:
    """Print the segments a real run would convert, with the reason for each, to stderr."""
    if not pending_jobs:
        click.echo("dry-run: no conversions would be launched", err=True)
        return
    click.echo("\nDry-run plan:", err=True)
    for job in pending_jobs:
        reason = pending_reasons[job.segment_dir]
        detail = pending_details.get(job.segment_dir)
        line = f"- {job.segment_dir} [{reason}]"
        if detail:
            # Keep multi-line probe reasons on one plan line.
            line = f"{line}: {detail.replace(chr(10), ' | ')}"
        click.echo(line, err=True)


def run_batch(jobs: list[ConversionJob], config: BatchConfig, jobs_limit: int) -> tuple[list[JobResult], int]:
    """Run *jobs* on a thread pool with at most *jobs_limit* in flight.

    Work is submitted lazily: each completion triggers the next submission.
    With ``config.fail_fast`` the first failure stops new submissions, and the
    jobs that were never started are counted as aborted. Already-running jobs
    are still awaited.

    Returns:
        The results in completion order, and the number of aborted jobs.
    """
    results: list[JobResult] = []
    aborted_count = 0
    if not jobs:
        return results, aborted_count

    future_to_job: dict[concurrent.futures.Future[JobResult], ConversionJob] = {}
    job_iter = iter(jobs)
    stop_submitting = False

    with concurrent.futures.ThreadPoolExecutor(max_workers=jobs_limit) as executor:
        with tqdm(total=len(jobs), unit="segment", dynamic_ncols=True) as progress:

            def submit_next() -> bool:
                """Submit the next queued job; return False if none was submitted."""
                if stop_submitting:
                    return False
                try:
                    job = next(job_iter)
                except StopIteration:
                    return False
                future = executor.submit(run_conversion, job, config)
                future_to_job[future] = job
                return True

            for _ in range(min(jobs_limit, len(jobs))):
                submit_next()

            while future_to_job:
                done, _ = concurrent.futures.wait(
                    future_to_job,
                    return_when=concurrent.futures.FIRST_COMPLETED,
                )
                for future in done:
                    job = future_to_job.pop(future)
                    result = future.result()
                    results.append(result)
                    progress.update(1)
                    if result.status == "failed":
                        tqdm.write(
                            f"failed: {job.segment_dir} (exit {result.return_code})",
                            file=sys.stderr,
                        )
                        if config.fail_fast:
                            stop_submitting = True
                    if not stop_submitting:
                        submit_next()

                if stop_submitting:
                    # Accumulate rather than assign. With jobs_limit > 1 this
                    # branch runs again while other jobs drain, and by then the
                    # iterator is exhausted (sum == 0). Assigning would erase
                    # the count taken on the first pass.
                    aborted_count += sum(1 for _ in job_iter)
                    progress.total = progress.n + len(future_to_job)
                    progress.refresh()

    return results, aborted_count


@click.command()
@click.argument(
    "input_dir",
    required=False,
    type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path),
)
@click.option(
    "--segment-dir",
    "segment_dirs",
    multiple=True,
    type=click.Path(path_type=Path, file_okay=False, dir_okay=True),
    help="Explicit segment directory. Repeatable. Mutually exclusive with INPUT_DIR and --segments-csv.",
)
@click.option(
    "--segments-csv",
    type=click.Path(path_type=Path, dir_okay=False),
    help="CSV file containing a segment_dir column. Mutually exclusive with INPUT_DIR and --segment-dir.",
)
@click.option(
    "--csv-root",
    type=click.Path(path_type=Path, file_okay=False, dir_okay=True),
    help="Base directory for relative segment_dir entries in --segments-csv. Defaults to the CSV parent directory.",
)
@click.option("--recursive/--no-recursive", default=True, show_default=True, help="Recurse when discovering segment directories from INPUT_DIR.")
@click.option("--jobs", default=1, show_default=True, type=click.IntRange(min=1), help="Parallel conversion jobs.")
@click.option(
    "--zed-bin",
    type=click.Path(path_type=Path, dir_okay=False),
    help="Explicit path to the zed_svo_grid_to_mp4 binary.",
)
@click.option(
    "--ffprobe-bin",
    type=click.Path(path_type=Path, dir_okay=False),
    help="Explicit path to ffprobe. Required when probing existing outputs and ffprobe is not on PATH.",
)
@click.option(
    "--cuda-visible-devices",
    help="Optional CUDA_VISIBLE_DEVICES value exported for each conversion subprocess.",
)
@click.option("--overwrite/--skip-existing", default=False, show_default=True, help="Overwrite existing grid MP4 files.")
@click.option(
    "--probe-existing/--trust-existing",
    default=False,
    show_default=True,
    help="Validate existing grid MP4 files with ffprobe before skipping them. Invalid outputs are treated as missing.",
)
@click.option(
    "--report-existing",
    is_flag=True,
    help="Probe existing grid MP4 files with ffprobe, report invalid ones, and do not launch conversions.",
)
@click.option(
    "--dry-run",
    is_flag=True,
    help="Show which segments would be converted after applying skip/probe logic, without launching conversions.",
)
@click.option(
    "--fail-fast/--continue-on-error",
    default=False,
    show_default=True,
    help="Stop submitting new work after the first failed conversion.",
)
@click.option("--codec", type=click.Choice(("h264", "h265")), default="h265", show_default=True)
@click.option(
    "--encoder-device",
    type=click.Choice(("auto", "nvidia", "software")),
    default="auto",
    show_default=True,
)
@click.option("--preset", type=click.Choice(("fast", "balanced", "quality")), default="fast", show_default=True)
@click.option(
    "--tune",
    type=click.Choice(("low-latency", "balanced")),
    default="low-latency",
    show_default=True,
)
@click.option(
    "--quality",
    type=click.IntRange(min=0, max=51),
    default=23,
    show_default=True,
    help="Lower values mean higher quality.",
)
@click.option("--gop", type=click.IntRange(min=1), default=30, show_default=True)
@click.option("--b-frames", "b_frames", type=click.IntRange(min=0), default=0, show_default=True)
@click.option(
    "--start-offset-seconds",
    type=click.FloatRange(min=0.0),
    default=0.0,
    show_default=True,
    help="Offset applied after the synced common start time.",
)
@click.option(
    "--duration-seconds",
    type=click.FloatRange(min=0.0, min_open=True),
    default=None,
    help="Limit export duration in seconds after sync.",
)
@click.option(
    "--output-fps",
    type=click.FloatRange(min=0.0, min_open=True),
    default=None,
    help="Composite output frame rate. Defaults to the grid tool's native behavior.",
)
@click.option(
    "--tile-scale",
    type=click.FloatRange(min=0.1, max=1.0),
    default=0.5,
    show_default=True,
    help="Scale each tile relative to the source resolution.",
)
def main(
    input_dir: Path | None,
    segment_dirs: tuple[Path, ...],
    segments_csv: Path | None,
    csv_root: Path | None,
    recursive: bool,
    jobs: int,
    zed_bin: Path | None,
    ffprobe_bin: Path | None,
    cuda_visible_devices: str | None,
    overwrite: bool,
    probe_existing: bool,
    report_existing: bool,
    dry_run: bool,
    fail_fast: bool,
    codec: str,
    encoder_device: str,
    preset: str,
    tune: str,
    quality: int,
    gop: int,
    b_frames: int,
    start_offset_seconds: float,
    duration_seconds: float | None,
    output_fps: float | None,
    tile_scale: float,
) -> None:
    """Batch-convert synced four-camera ZED segments into grid MP4 files."""
    # NOTE: the docstring above is click's --help text; keep it unchanged.
    if b_frames > gop:
        raise click.BadParameter(f"b-frames {b_frames} must be <= gop {gop}", param_hint="--b-frames")
    if report_existing and dry_run:
        raise click.ClickException("--report-existing and --dry-run are mutually exclusive")

    # Only look up tools that this mode will actually use.
    ffprobe_path = locate_ffprobe(ffprobe_bin) if (probe_existing or report_existing) else None
    binary_path = None if report_existing else locate_binary(zed_bin)
    sources = resolve_sources(input_dir, segment_dirs, segments_csv, csv_root, recursive)

    config = BatchConfig(
        zed_bin=binary_path,
        ffprobe_bin=ffprobe_path,
        probe_existing=probe_existing or report_existing,
        cuda_visible_devices=cuda_visible_devices,
        overwrite=overwrite,
        fail_fast=fail_fast,
        codec=codec,
        encoder_device=encoder_device,
        preset=preset,
        tune=tune,
        quality=quality,
        gop=gop,
        b_frames=b_frames,
        start_offset_seconds=start_offset_seconds,
        duration_seconds=duration_seconds,
        output_fps=output_fps,
        tile_scale=tile_scale,
    )

    skipped_results: list[JobResult] = []
    failed_results: list[JobResult] = []
    pending_jobs: list[ConversionJob] = []
    pending_reasons: dict[Path, str] = {}
    pending_details: dict[Path, str] = {}
    valid_existing: list[OutputProbeResult] = []
    invalid_existing: list[tuple[ConversionJob, OutputProbeResult]] = []
    missing_outputs: list[ConversionJob] = []

    # Classify every segment as invalid, skipped, or pending (with a reason).
    for segment_dir in sources.segment_dirs:
        output_path = output_path_for(segment_dir)
        job = ConversionJob(segment_dir=segment_dir, output_path=output_path)
        command = tuple(command_for_job(job, config)) if config.zed_bin is not None else ()

        # Explicit/CSV sources are unvalidated until here; incomplete segments fail with code 2.
        scan = scan_segment_dir(segment_dir)
        if not scan.is_valid:
            failed_results.append(
                JobResult(
                    status="failed",
                    segment_dir=segment_dir,
                    output_path=output_path,
                    command=command,
                    return_code=2,
                    stderr=scan.reason or "",
                )
            )
            continue

        if report_existing:
            probe_result = probe_output(output_path, config.ffprobe_bin)
            if probe_result.status == "valid":
                valid_existing.append(probe_result)
            elif probe_result.status == "invalid":
                invalid_existing.append((job, probe_result))
            else:
                missing_outputs.append(job)
            continue

        if overwrite:
            pending_jobs.append(job)
            pending_reasons[segment_dir] = "overwrite"
            continue

        if config.probe_existing:
            probe_result = probe_output(output_path, config.ffprobe_bin)
            if probe_result.status == "valid":
                valid_existing.append(probe_result)
                skipped_results.append(
                    JobResult(
                        status="skipped",
                        segment_dir=segment_dir,
                        output_path=output_path,
                        command=command,
                    )
                )
                continue
            if probe_result.status == "invalid":
                invalid_existing.append((job, probe_result))
                pending_jobs.append(job)
                pending_reasons[segment_dir] = "invalid-existing-output"
                pending_details[segment_dir] = probe_result.reason
                continue
            missing_outputs.append(job)
            pending_jobs.append(job)
            pending_reasons[segment_dir] = "missing-output"
            continue

        # Trust mode: any existing path counts as a finished output.
        if output_path.exists():
            skipped_results.append(
                JobResult(
                    status="skipped",
                    segment_dir=segment_dir,
                    output_path=output_path,
                    command=command,
                )
            )
            continue

        pending_jobs.append(job)
        pending_reasons[segment_dir] = "missing-output"

    if report_existing:
        click.echo(
            (
                f"source={sources.mode} matched={len(sources.segment_dirs)} valid={len(valid_existing)} "
                f"invalid={len(invalid_existing)} missing={len(missing_outputs)} "
                f"invalid-segments={len(failed_results)}"
            ),
            err=True,
        )
        if sources.ignored_partial_dirs:
            click.echo(f"ignored_incomplete={len(sources.ignored_partial_dirs)}", err=True)
        report_invalid_existing_outputs(invalid_existing)
        summarize_failures(failed_results)
        if failed_results or invalid_existing:
            raise SystemExit(1)
        return

    click.echo(
        (
            f"source={sources.mode} matched={len(sources.segment_dirs)} pending={len(pending_jobs)} "
            f"skipped={len(skipped_results)} invalid={len(failed_results)} jobs={jobs} "
            f"dry_run={'yes' if dry_run else 'no'}"
        ),
        err=True,
    )
    if sources.ignored_partial_dirs:
        click.echo(f"ignored_incomplete={len(sources.ignored_partial_dirs)}", err=True)
    if config.probe_existing:
        click.echo(
            (
                f"probed-existing: valid={len(valid_existing)} invalid={len(invalid_existing)} "
                f"missing={len(missing_outputs)}"
            ),
            err=True,
        )

    if dry_run:
        report_dry_run_plan(pending_jobs, pending_reasons, pending_details)
        summarize_failures(failed_results)
        if failed_results:
            raise SystemExit(1)
        return

    results = list(skipped_results)
    results.extend(failed_results)
    conversion_results, aborted_count = run_batch(pending_jobs, config, jobs)
    results.extend(conversion_results)

    converted_count = sum(1 for result in results if result.status == "converted")
    skipped_count = sum(1 for result in results if result.status == "skipped")
    failed_count = sum(1 for result in results if result.status == "failed")
    click.echo(
        (
            f"summary: matched={len(sources.segment_dirs)} converted={converted_count} "
            f"skipped={skipped_count} failed={failed_count} aborted={aborted_count}"
        ),
        err=True,
    )
    summarize_failures(results)
    if failed_count > 0 or aborted_count > 0:
        raise SystemExit(1)


if __name__ == "__main__":
    main()