#!/usr/bin/env python3 from __future__ import annotations import concurrent.futures import os import subprocess import sys from dataclasses import dataclass from pathlib import Path from typing import Iterable import click from tqdm import tqdm SCRIPT_PATH = Path(__file__).resolve() REPO_ROOT = SCRIPT_PATH.parents[1] DEFAULT_PATTERNS = ("*.svo2",) SUPPORTED_SUFFIXES = {".svo", ".svo2"} @dataclass(slots=True, frozen=True) class BatchConfig: zed_bin: Path cuda_visible_devices: str | None overwrite: bool fail_fast: bool codec: str encoder_device: str preset: str tune: str quality: int gop: int b_frames: int start_frame: int end_frame: int | None @dataclass(slots=True, frozen=True) class ConversionJob: input_path: Path output_path: Path @dataclass(slots=True, frozen=True) class JobResult: status: str input_path: Path output_path: Path command: tuple[str, ...] return_code: int = 0 stdout: str = "" stderr: str = "" def locate_binary(override: Path | None) -> Path: if override is not None: candidate = override.expanduser().resolve() if not candidate.is_file(): raise click.ClickException(f"binary not found: {candidate}") return candidate candidates = ( REPO_ROOT / "build" / "bin" / "zed_svo_to_mp4", REPO_ROOT / "build" / "zed_svo_to_mp4", ) for candidate in candidates: if candidate.is_file(): return candidate raise click.ClickException(f"could not find zed_svo_to_mp4 under {REPO_ROOT / 'build'}") def discover_inputs(root: Path, patterns: Iterable[str], recursive: bool) -> list[Path]: discovered: set[Path] = set() for pattern in patterns: iterator = root.rglob(pattern) if recursive else root.glob(pattern) for path in iterator: if path.is_file() and path.suffix.lower() in SUPPORTED_SUFFIXES: discovered.add(path.absolute()) return sorted(discovered) def output_path_for(input_path: Path) -> Path: if input_path.suffix: return input_path.with_suffix(".mp4") return input_path.with_name(f"{input_path.name}.mp4") def command_for_job(job: ConversionJob, config: BatchConfig) -> list[str]: command = [ str(config.zed_bin), "--input", str(job.input_path), "--codec", config.codec, "--encoder-device", config.encoder_device, "--preset", config.preset, "--tune", config.tune, "--quality", str(config.quality), "--gop", str(config.gop), "--b-frames", str(config.b_frames), "--start-frame", str(config.start_frame), ] if config.end_frame is not None: command.extend(["--end-frame", str(config.end_frame)]) return command def env_for_job(config: BatchConfig) -> dict[str, str]: env = dict(os.environ) if config.cuda_visible_devices is not None: env["CUDA_VISIBLE_DEVICES"] = config.cuda_visible_devices return env def run_conversion(job: ConversionJob, config: BatchConfig) -> JobResult: command = command_for_job(job, config) completed = subprocess.run( command, check=False, capture_output=True, text=True, env=env_for_job(config), ) status = "converted" if completed.returncode == 0 else "failed" return JobResult( status=status, input_path=job.input_path, output_path=job.output_path, command=tuple(command), return_code=completed.returncode, stdout=completed.stdout, stderr=completed.stderr, ) def summarize_failures(results: list[JobResult]) -> None: failed_results = [result for result in results if result.status == "failed"] if not failed_results: return click.echo("\nFailed conversions:", err=True) for result in failed_results: click.echo(f"- {result.input_path} (exit {result.return_code})", err=True) if result.stderr.strip(): click.echo(result.stderr.rstrip(), err=True) elif result.stdout.strip(): click.echo(result.stdout.rstrip(), err=True) def run_batch(jobs: list[ConversionJob], config: BatchConfig, jobs_limit: int) -> tuple[list[JobResult], int]: results: list[JobResult] = [] aborted_count = 0 if not jobs: return results, aborted_count future_to_job: dict[concurrent.futures.Future[JobResult], ConversionJob] = {} job_iter = iter(jobs) stop_submitting = False with concurrent.futures.ThreadPoolExecutor(max_workers=jobs_limit) as executor: with tqdm(total=len(jobs), unit="file", dynamic_ncols=True) as progress: def submit_next() -> bool: if stop_submitting: return False try: job = next(job_iter) except StopIteration: return False future = executor.submit(run_conversion, job, config) future_to_job[future] = job return True for _ in range(min(jobs_limit, len(jobs))): submit_next() while future_to_job: done, _ = concurrent.futures.wait( future_to_job, return_when=concurrent.futures.FIRST_COMPLETED, ) for future in done: job = future_to_job.pop(future) result = future.result() results.append(result) progress.update(1) if result.status == "failed": tqdm.write(f"failed: {job.input_path} (exit {result.return_code})", file=sys.stderr) if config.fail_fast: stop_submitting = True if not stop_submitting: submit_next() if stop_submitting: remaining = sum(1 for _ in job_iter) aborted_count = remaining progress.total = progress.n + len(future_to_job) progress.refresh() return results, aborted_count @click.command() @click.argument("input_dir", type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path)) @click.option( "--pattern", "patterns", multiple=True, default=DEFAULT_PATTERNS, show_default=True, help="Glob pattern to match under the input directory. Repeatable.", ) @click.option("--recursive/--no-recursive", default=True, show_default=True, help="Use rglob instead of glob.") @click.option("--jobs", default=1, show_default=True, type=click.IntRange(min=1), help="Parallel conversion jobs.") @click.option( "--zed-bin", type=click.Path(path_type=Path, dir_okay=False), help="Explicit path to the zed_svo_to_mp4 binary.", ) @click.option( "--cuda-visible-devices", help="Optional CUDA_VISIBLE_DEVICES value exported for each conversion subprocess.", ) @click.option("--overwrite/--skip-existing", default=False, show_default=True, help="Overwrite existing MP4 files.") @click.option( "--fail-fast/--continue-on-error", default=False, show_default=True, help="Stop submitting new work after the first failed conversion.", ) @click.option("--codec", type=click.Choice(("h264", "h265")), default="h265", show_default=True) @click.option( "--encoder-device", type=click.Choice(("auto", "nvidia", "software")), default="auto", show_default=True, ) @click.option("--preset", type=click.Choice(("fast", "balanced", "quality")), default="fast", show_default=True) @click.option( "--tune", type=click.Choice(("low-latency", "balanced")), default="low-latency", show_default=True, ) @click.option( "--quality", type=click.IntRange(min=0, max=51), default=23, show_default=True, help="Lower values mean higher quality.", ) @click.option("--gop", type=click.IntRange(min=1), default=30, show_default=True) @click.option("--b-frames", "b_frames", type=click.IntRange(min=0), default=0, show_default=True) @click.option("--start-frame", type=click.IntRange(min=0), default=0, show_default=True) @click.option("--end-frame", type=click.IntRange(min=0), default=None) def main( input_dir: Path, patterns: tuple[str, ...], recursive: bool, jobs: int, zed_bin: Path | None, cuda_visible_devices: str | None, overwrite: bool, fail_fast: bool, codec: str, encoder_device: str, preset: str, tune: str, quality: int, gop: int, b_frames: int, start_frame: int, end_frame: int | None, ) -> None: """Batch-convert ZED SVO/SVO2 recordings in a folder to MP4.""" if b_frames > gop: raise click.BadParameter(f"b-frames {b_frames} must be <= gop {gop}", param_hint="--b-frames") if end_frame is not None and end_frame < start_frame: raise click.BadParameter( f"end-frame {end_frame} must be >= start-frame {start_frame}", param_hint="--end-frame", ) binary_path = locate_binary(zed_bin) inputs = discover_inputs(input_dir.absolute(), patterns, recursive) if not inputs: raise click.ClickException(f"no .svo/.svo2 files matched under {input_dir}") config = BatchConfig( zed_bin=binary_path, cuda_visible_devices=cuda_visible_devices, overwrite=overwrite, fail_fast=fail_fast, codec=codec, encoder_device=encoder_device, preset=preset, tune=tune, quality=quality, gop=gop, b_frames=b_frames, start_frame=start_frame, end_frame=end_frame, ) skipped_results: list[JobResult] = [] pending_jobs: list[ConversionJob] = [] for input_path in inputs: output_path = output_path_for(input_path) command = tuple(command_for_job(ConversionJob(input_path, output_path), config)) if output_path.exists() and not overwrite: skipped_results.append( JobResult( status="skipped", input_path=input_path, output_path=output_path, command=command, ) ) continue pending_jobs.append(ConversionJob(input_path=input_path, output_path=output_path)) click.echo( f"matched={len(inputs)} pending={len(pending_jobs)} skipped={len(skipped_results)} jobs={jobs}", err=True, ) results = list(skipped_results) conversion_results, aborted_count = run_batch(pending_jobs, config, jobs) results.extend(conversion_results) converted_count = sum(1 for result in results if result.status == "converted") skipped_count = sum(1 for result in results if result.status == "skipped") failed_count = sum(1 for result in results if result.status == "failed") click.echo( ( f"summary: matched={len(inputs)} converted={converted_count} " f"skipped={skipped_count} failed={failed_count} aborted={aborted_count}" ), err=True, ) summarize_failures(results) if failed_count > 0: raise SystemExit(1) if aborted_count > 0: raise SystemExit(1) if __name__ == "__main__": main()