Add batch SVO to MP4 wrapper

This commit is contained in:
2026-03-19 10:25:57 +00:00
parent 2671ac7ba9
commit 58ee647c8a
4 changed files with 426 additions and 1 deletions
+361
View File
@@ -0,0 +1,361 @@
#!/usr/bin/env python3
from __future__ import annotations
import concurrent.futures
import os
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable
import click
from tqdm import tqdm
SCRIPT_PATH = Path(__file__).resolve()
REPO_ROOT = SCRIPT_PATH.parents[1]
DEFAULT_PATTERNS = ("*.svo2",)
SUPPORTED_SUFFIXES = {".svo", ".svo2"}
@dataclass(slots=True, frozen=True)
class BatchConfig:
zed_bin: Path
cuda_visible_devices: str | None
overwrite: bool
fail_fast: bool
codec: str
encoder_device: str
preset: str
tune: str
quality: int
gop: int
b_frames: int
start_frame: int
end_frame: int | None
@dataclass(slots=True, frozen=True)
class ConversionJob:
input_path: Path
output_path: Path
@dataclass(slots=True, frozen=True)
class JobResult:
status: str
input_path: Path
output_path: Path
command: tuple[str, ...]
return_code: int = 0
stdout: str = ""
stderr: str = ""
def locate_binary(override: Path | None) -> Path:
if override is not None:
candidate = override.expanduser().resolve()
if not candidate.is_file():
raise click.ClickException(f"binary not found: {candidate}")
return candidate
candidates = (
REPO_ROOT / "build" / "bin" / "zed_svo_to_mp4",
REPO_ROOT / "build" / "zed_svo_to_mp4",
)
for candidate in candidates:
if candidate.is_file():
return candidate
raise click.ClickException(f"could not find zed_svo_to_mp4 under {REPO_ROOT / 'build'}")
def discover_inputs(root: Path, patterns: Iterable[str], recursive: bool) -> list[Path]:
discovered: set[Path] = set()
for pattern in patterns:
iterator = root.rglob(pattern) if recursive else root.glob(pattern)
for path in iterator:
if path.is_file() and path.suffix.lower() in SUPPORTED_SUFFIXES:
discovered.add(path.absolute())
return sorted(discovered)
def output_path_for(input_path: Path) -> Path:
if input_path.suffix:
return input_path.with_suffix(".mp4")
return input_path.with_name(f"{input_path.name}.mp4")
def command_for_job(job: ConversionJob, config: BatchConfig) -> list[str]:
command = [
str(config.zed_bin),
"--input",
str(job.input_path),
"--codec",
config.codec,
"--encoder-device",
config.encoder_device,
"--preset",
config.preset,
"--tune",
config.tune,
"--quality",
str(config.quality),
"--gop",
str(config.gop),
"--b-frames",
str(config.b_frames),
"--start-frame",
str(config.start_frame),
]
if config.end_frame is not None:
command.extend(["--end-frame", str(config.end_frame)])
return command
def env_for_job(config: BatchConfig) -> dict[str, str]:
env = dict(os.environ)
if config.cuda_visible_devices is not None:
env["CUDA_VISIBLE_DEVICES"] = config.cuda_visible_devices
return env
def run_conversion(job: ConversionJob, config: BatchConfig) -> JobResult:
command = command_for_job(job, config)
completed = subprocess.run(
command,
check=False,
capture_output=True,
text=True,
env=env_for_job(config),
)
status = "converted" if completed.returncode == 0 else "failed"
return JobResult(
status=status,
input_path=job.input_path,
output_path=job.output_path,
command=tuple(command),
return_code=completed.returncode,
stdout=completed.stdout,
stderr=completed.stderr,
)
def summarize_failures(results: list[JobResult]) -> None:
failed_results = [result for result in results if result.status == "failed"]
if not failed_results:
return
click.echo("\nFailed conversions:", err=True)
for result in failed_results:
click.echo(f"- {result.input_path} (exit {result.return_code})", err=True)
if result.stderr.strip():
click.echo(result.stderr.rstrip(), err=True)
elif result.stdout.strip():
click.echo(result.stdout.rstrip(), err=True)
def run_batch(jobs: list[ConversionJob], config: BatchConfig, jobs_limit: int) -> tuple[list[JobResult], int]:
results: list[JobResult] = []
aborted_count = 0
if not jobs:
return results, aborted_count
future_to_job: dict[concurrent.futures.Future[JobResult], ConversionJob] = {}
job_iter = iter(jobs)
stop_submitting = False
with concurrent.futures.ThreadPoolExecutor(max_workers=jobs_limit) as executor:
with tqdm(total=len(jobs), unit="file", dynamic_ncols=True) as progress:
def submit_next() -> bool:
if stop_submitting:
return False
try:
job = next(job_iter)
except StopIteration:
return False
future = executor.submit(run_conversion, job, config)
future_to_job[future] = job
return True
for _ in range(min(jobs_limit, len(jobs))):
submit_next()
while future_to_job:
done, _ = concurrent.futures.wait(
future_to_job,
return_when=concurrent.futures.FIRST_COMPLETED,
)
for future in done:
job = future_to_job.pop(future)
result = future.result()
results.append(result)
progress.update(1)
if result.status == "failed":
tqdm.write(f"failed: {job.input_path} (exit {result.return_code})", file=sys.stderr)
if config.fail_fast:
stop_submitting = True
if not stop_submitting:
submit_next()
if stop_submitting:
remaining = sum(1 for _ in job_iter)
aborted_count = remaining
progress.total = progress.n + len(future_to_job)
progress.refresh()
return results, aborted_count
@click.command()
@click.argument("input_dir", type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path))
@click.option(
"--pattern",
"patterns",
multiple=True,
default=DEFAULT_PATTERNS,
show_default=True,
help="Glob pattern to match under the input directory. Repeatable.",
)
@click.option("--recursive/--no-recursive", default=True, show_default=True, help="Use rglob instead of glob.")
@click.option("--jobs", default=1, show_default=True, type=click.IntRange(min=1), help="Parallel conversion jobs.")
@click.option(
"--zed-bin",
type=click.Path(path_type=Path, dir_okay=False),
help="Explicit path to the zed_svo_to_mp4 binary.",
)
@click.option(
"--cuda-visible-devices",
help="Optional CUDA_VISIBLE_DEVICES value exported for each conversion subprocess.",
)
@click.option("--overwrite/--skip-existing", default=False, show_default=True, help="Overwrite existing MP4 files.")
@click.option(
"--fail-fast/--continue-on-error",
default=False,
show_default=True,
help="Stop submitting new work after the first failed conversion.",
)
@click.option("--codec", type=click.Choice(("h264", "h265")), default="h265", show_default=True)
@click.option(
"--encoder-device",
type=click.Choice(("auto", "nvidia", "software")),
default="auto",
show_default=True,
)
@click.option("--preset", type=click.Choice(("fast", "balanced", "quality")), default="fast", show_default=True)
@click.option(
"--tune",
type=click.Choice(("low-latency", "balanced")),
default="low-latency",
show_default=True,
)
@click.option(
"--quality",
type=click.IntRange(min=0, max=51),
default=23,
show_default=True,
help="Lower values mean higher quality.",
)
@click.option("--gop", type=click.IntRange(min=1), default=30, show_default=True)
@click.option("--b-frames", "b_frames", type=click.IntRange(min=0), default=0, show_default=True)
@click.option("--start-frame", type=click.IntRange(min=0), default=0, show_default=True)
@click.option("--end-frame", type=click.IntRange(min=0), default=None)
def main(
input_dir: Path,
patterns: tuple[str, ...],
recursive: bool,
jobs: int,
zed_bin: Path | None,
cuda_visible_devices: str | None,
overwrite: bool,
fail_fast: bool,
codec: str,
encoder_device: str,
preset: str,
tune: str,
quality: int,
gop: int,
b_frames: int,
start_frame: int,
end_frame: int | None,
) -> None:
"""Batch-convert ZED SVO/SVO2 recordings in a folder to MP4."""
if b_frames > gop:
raise click.BadParameter(f"b-frames {b_frames} must be <= gop {gop}", param_hint="--b-frames")
if end_frame is not None and end_frame < start_frame:
raise click.BadParameter(
f"end-frame {end_frame} must be >= start-frame {start_frame}",
param_hint="--end-frame",
)
binary_path = locate_binary(zed_bin)
inputs = discover_inputs(input_dir.absolute(), patterns, recursive)
if not inputs:
raise click.ClickException(f"no .svo/.svo2 files matched under {input_dir}")
config = BatchConfig(
zed_bin=binary_path,
cuda_visible_devices=cuda_visible_devices,
overwrite=overwrite,
fail_fast=fail_fast,
codec=codec,
encoder_device=encoder_device,
preset=preset,
tune=tune,
quality=quality,
gop=gop,
b_frames=b_frames,
start_frame=start_frame,
end_frame=end_frame,
)
skipped_results: list[JobResult] = []
pending_jobs: list[ConversionJob] = []
for input_path in inputs:
output_path = output_path_for(input_path)
command = tuple(command_for_job(ConversionJob(input_path, output_path), config))
if output_path.exists() and not overwrite:
skipped_results.append(
JobResult(
status="skipped",
input_path=input_path,
output_path=output_path,
command=command,
)
)
continue
pending_jobs.append(ConversionJob(input_path=input_path, output_path=output_path))
click.echo(
f"matched={len(inputs)} pending={len(pending_jobs)} skipped={len(skipped_results)} jobs={jobs}",
err=True,
)
results = list(skipped_results)
conversion_results, aborted_count = run_batch(pending_jobs, config, jobs)
results.extend(conversion_results)
converted_count = sum(1 for result in results if result.status == "converted")
skipped_count = sum(1 for result in results if result.status == "skipped")
failed_count = sum(1 for result in results if result.status == "failed")
click.echo(
(
f"summary: matched={len(inputs)} converted={converted_count} "
f"skipped={skipped_count} failed={failed_count} aborted={aborted_count}"
),
err=True,
)
summarize_failures(results)
if failed_count > 0:
raise SystemExit(1)
if aborted_count > 0:
raise SystemExit(1)
if __name__ == "__main__":
main()