From 0cf0b2566db72055174ad44bd5fda7c848d39e84 Mon Sep 17 00:00:00 2001 From: crosstyan Date: Thu, 19 Mar 2026 10:58:53 +0000 Subject: [PATCH] Add ffprobe checks for grid batch outputs --- README.md | 26 +++ scripts/zed_batch_svo_grid_to_mp4.py | 258 ++++++++++++++++++++++++++- 2 files changed, 279 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index b2fdb79..2993be3 100644 --- a/README.md +++ b/README.md @@ -144,6 +144,32 @@ uv run python scripts/zed_batch_svo_grid_to_mp4.py \ The batch grid wrapper mirrors the grid encoder options, skips existing `/_grid.mp4` outputs by default, and returns a nonzero exit code if any segment fails. +When you suspect a previous run left behind partial MP4 files, opt into `ffprobe` validation so broken existing outputs are treated as missing instead of skipped: + +```bash +uv run python scripts/zed_batch_svo_grid_to_mp4.py \ + /workspaces/data/kindergarten \ + --probe-existing \ + --jobs 2 +``` + +Use `--report-existing` to audit existing outputs without launching conversions. The report prints invalid existing files only, while the summary still includes valid and missing counts. This is useful for the partial-write failure mode currently seen as `moov atom not found` in some kindergarten grid MP4s: + +```bash +uv run python scripts/zed_batch_svo_grid_to_mp4.py \ + /workspaces/data/kindergarten \ + --report-existing +``` + +Use `--dry-run` to preview what the batch wrapper would convert after applying skip logic. Combine it with `--probe-existing` when you want to see which broken existing outputs would be requeued: + +```bash +uv run python scripts/zed_batch_svo_grid_to_mp4.py \ + /workspaces/data/kindergarten \ + --probe-existing \ + --dry-run +``` + #### Expected CSV Input Format The `--segments-csv` input expects a header row with at least a `segment_dir` column. Extra columns are allowed and ignored by the batch wrapper. `segment_dir` values may be absolute paths or paths relative to the CSV file's parent directory. Use `--csv-root` to override that base directory. diff --git a/scripts/zed_batch_svo_grid_to_mp4.py b/scripts/zed_batch_svo_grid_to_mp4.py index 59a5e84..2d27800 100644 --- a/scripts/zed_batch_svo_grid_to_mp4.py +++ b/scripts/zed_batch_svo_grid_to_mp4.py @@ -4,8 +4,11 @@ from __future__ import annotations import concurrent.futures import csv +import json +import math import os import re +import shutil import subprocess import sys from dataclasses import dataclass @@ -23,7 +26,9 @@ EXPECTED_CAMERAS = ("zed1", "zed2", "zed3", "zed4") @dataclass(slots=True, frozen=True) class BatchConfig: - zed_bin: Path + zed_bin: Path | None + ffprobe_bin: Path | None + probe_existing: bool cuda_visible_devices: str | None overwrite: bool fail_fast: bool @@ -72,6 +77,14 @@ class SourceResolution: ignored_partial_dirs: tuple[SegmentScan, ...] +@dataclass(slots=True, frozen=True) +class OutputProbeResult: + output_path: Path + status: str + reason: str = "" + duration_seconds: float | None = None + + def locate_binary(override: Path | None) -> Path: if override is not None: candidate = override.expanduser().resolve() @@ -89,6 +102,19 @@ def locate_binary(override: Path | None) -> Path: raise click.ClickException(f"could not find zed_svo_grid_to_mp4 under {REPO_ROOT / 'build'}") +def locate_ffprobe(override: Path | None) -> Path: + if override is not None: + candidate = override.expanduser().resolve() + if not candidate.is_file(): + raise click.ClickException(f"ffprobe binary not found: {candidate}") + return candidate + + resolved = shutil.which("ffprobe") + if resolved is None: + raise click.ClickException("could not find ffprobe on PATH") + return Path(resolved).resolve() + + def scan_segment_dir(segment_dir: Path) -> SegmentScan: if not segment_dir.is_dir(): return SegmentScan( @@ -246,6 +272,9 @@ def output_path_for(segment_dir: Path) -> Path: def command_for_job(job: ConversionJob, config: BatchConfig) -> list[str]: + if config.zed_bin is None: + raise RuntimeError("zed_svo_grid_to_mp4 binary is not configured") + command = [ str(config.zed_bin), "--segment-dir", @@ -283,6 +312,80 @@ def env_for_job(config: BatchConfig) -> dict[str, str]: return env +def probe_output(output_path: Path, ffprobe_bin: Path | None) -> OutputProbeResult: + if not output_path.is_file(): + return OutputProbeResult(output_path=output_path, status="missing") + if ffprobe_bin is None: + raise RuntimeError("ffprobe binary is not configured") + + completed = subprocess.run( + [ + str(ffprobe_bin), + "-v", + "error", + "-print_format", + "json", + "-show_entries", + "format=duration,size:stream=codec_type,codec_name,width,height,nb_frames", + str(output_path), + ], + check=False, + capture_output=True, + text=True, + ) + if completed.returncode != 0: + reason = completed.stderr.strip() or completed.stdout.strip() or "ffprobe failed" + return OutputProbeResult(output_path=output_path, status="invalid", reason=reason) + + try: + payload = json.loads(completed.stdout) + except json.JSONDecodeError as error: + return OutputProbeResult( + output_path=output_path, + status="invalid", + reason=f"ffprobe returned invalid JSON: {error}", + ) + + streams = payload.get("streams", []) + has_video_stream = any(stream.get("codec_type") == "video" for stream in streams) + if not has_video_stream: + return OutputProbeResult( + output_path=output_path, + status="invalid", + reason="ffprobe found no video stream", + ) + + format_payload = payload.get("format", {}) + duration_text = format_payload.get("duration") + if duration_text in (None, ""): + return OutputProbeResult( + output_path=output_path, + status="invalid", + reason="ffprobe did not report a duration", + ) + + try: + duration_seconds = float(duration_text) + except (TypeError, ValueError): + return OutputProbeResult( + output_path=output_path, + status="invalid", + reason=f"ffprobe reported a non-numeric duration: {duration_text!r}", + ) + if not math.isfinite(duration_seconds) or duration_seconds <= 0.0: + return OutputProbeResult( + output_path=output_path, + status="invalid", + reason=f"ffprobe reported a non-positive duration: {duration_seconds}", + ) + + return OutputProbeResult( + output_path=output_path, + status="valid", + duration_seconds=duration_seconds, + ) + + def run_conversion(job: ConversionJob, config: BatchConfig) -> JobResult: command = command_for_job(job, config) completed = subprocess.run( @@ -318,6 +421,41 @@ def summarize_failures(results: list[JobResult]) -> None: click.echo(result.stdout.rstrip(), err=True) +def report_invalid_existing_outputs( + invalid_existing: list[tuple[ConversionJob, OutputProbeResult]], +) -> None: + if not invalid_existing: + return + + click.echo("\nInvalid existing outputs:", err=True) + for job, probe in invalid_existing: + click.echo(f"- {job.segment_dir}", err=True) + click.echo(f" output: {probe.output_path}", err=True) + reason_lines = probe.reason.splitlines() or [probe.reason] + click.echo(f" reason: {reason_lines[0]}", err=True) + for line in reason_lines[1:]: + click.echo(f" {line}", err=True) + + +def report_dry_run_plan( + pending_jobs: list[ConversionJob], + pending_reasons: dict[Path, str], + pending_details: dict[Path, str], +) -> None: + if not pending_jobs: + click.echo("dry-run: no conversions would be launched", err=True) + return + + click.echo("\nDry-run plan:", err=True) + for job in pending_jobs: + reason = pending_reasons[job.segment_dir] + detail = pending_details.get(job.segment_dir) + line = f"- {job.segment_dir} [{reason}]" + if detail: + line = f"{line}: {detail.replace(chr(10), ' | ')}" + click.echo(line, err=True) + + def run_batch(jobs: list[ConversionJob], config: BatchConfig, jobs_limit: int) -> tuple[list[JobResult], int]: results: list[JobResult] = [] aborted_count = 0 @@ -406,11 +544,32 @@ def run_batch(jobs: list[ConversionJob], config: BatchConfig, jobs_limit: int) - type=click.Path(path_type=Path, dir_okay=False), help="Explicit path to the zed_svo_grid_to_mp4 binary.", ) +@click.option( + "--ffprobe-bin", + type=click.Path(path_type=Path, dir_okay=False), + help="Explicit path to ffprobe. Required when probing existing outputs and ffprobe is not on PATH.", +) @click.option( "--cuda-visible-devices", help="Optional CUDA_VISIBLE_DEVICES value exported for each conversion subprocess.", ) @click.option("--overwrite/--skip-existing", default=False, show_default=True, help="Overwrite existing grid MP4 files.") +@click.option( + "--probe-existing/--trust-existing", + default=False, + show_default=True, + help="Validate existing grid MP4 files with ffprobe before skipping them. Invalid outputs are treated as missing.", +) +@click.option( + "--report-existing", + is_flag=True, + help="Probe existing grid MP4 files with ffprobe, report invalid ones, and do not launch conversions.", +) +@click.option( + "--dry-run", + is_flag=True, + help="Show which segments would be converted after applying skip/probe logic, without launching conversions.", +) @click.option( "--fail-fast/--continue-on-error", default=False, @@ -474,8 +633,12 @@ def main( recursive: bool, jobs: int, zed_bin: Path | None, + ffprobe_bin: Path | None, cuda_visible_devices: str | None, overwrite: bool, + probe_existing: bool, + report_existing: bool, + dry_run: bool, fail_fast: bool, codec: str, encoder_device: str, @@ -492,11 +655,16 @@ def main( """Batch-convert synced four-camera ZED segments into grid MP4 files.""" if b_frames > gop: raise click.BadParameter(f"b-frames {b_frames} must be <= gop {gop}", param_hint="--b-frames") + if report_existing and dry_run: + raise click.ClickException("--report-existing and --dry-run are mutually exclusive") - binary_path = locate_binary(zed_bin) + ffprobe_path = locate_ffprobe(ffprobe_bin) if (probe_existing or report_existing) else None + binary_path = None if report_existing else locate_binary(zed_bin) sources = resolve_sources(input_dir, segment_dirs, segments_csv, csv_root, recursive) config = BatchConfig( zed_bin=binary_path, + ffprobe_bin=ffprobe_path, + probe_existing=probe_existing or report_existing, cuda_visible_devices=cuda_visible_devices, overwrite=overwrite, fail_fast=fail_fast, @@ -516,11 +684,16 @@ def main( skipped_results: list[JobResult] = [] failed_results: list[JobResult] = [] pending_jobs: list[ConversionJob] = [] + pending_reasons: dict[Path, str] = {} + pending_details: dict[Path, str] = {} + valid_existing: list[OutputProbeResult] = [] + invalid_existing: list[tuple[ConversionJob, OutputProbeResult]] = [] + missing_outputs: list[ConversionJob] = [] for segment_dir in sources.segment_dirs: output_path = output_path_for(segment_dir) job = ConversionJob(segment_dir=segment_dir, output_path=output_path) - command = tuple(command_for_job(job, config)) + command = tuple(command_for_job(job, config)) if config.zed_bin is not None else () scan = scan_segment_dir(segment_dir) if not scan.is_valid: failed_results.append( @@ -534,7 +707,47 @@ def main( ) ) continue - if output_path.exists() and not overwrite: + + if report_existing: + probe_result = probe_output(output_path, config.ffprobe_bin) + if probe_result.status == "valid": + valid_existing.append(probe_result) + elif probe_result.status == "invalid": + invalid_existing.append((job, probe_result)) + else: + missing_outputs.append(job) + continue + + if overwrite: + pending_jobs.append(job) + pending_reasons[segment_dir] = "overwrite" + continue + + if config.probe_existing: + probe_result = probe_output(output_path, config.ffprobe_bin) + if probe_result.status == "valid": + valid_existing.append(probe_result) + skipped_results.append( + JobResult( + status="skipped", + segment_dir=segment_dir, + output_path=output_path, + command=command, + ) + ) + continue + if probe_result.status == "invalid": + invalid_existing.append((job, probe_result)) + pending_jobs.append(job) + pending_reasons[segment_dir] = "invalid-existing-output" + pending_details[segment_dir] = probe_result.reason + continue + missing_outputs.append(job) + pending_jobs.append(job) + pending_reasons[segment_dir] = "missing-output" + continue + + if output_path.exists(): skipped_results.append( JobResult( status="skipped", @@ -544,17 +757,52 @@ def main( ) ) continue + pending_jobs.append(job) + pending_reasons[segment_dir] = "missing-output" + + if report_existing: + click.echo( + ( + f"source={sources.mode} matched={len(sources.segment_dirs)} valid={len(valid_existing)} " + f"invalid={len(invalid_existing)} missing={len(missing_outputs)} " + f"invalid-segments={len(failed_results)}" + ), + err=True, + ) + if sources.ignored_partial_dirs: + click.echo(f"ignored_incomplete={len(sources.ignored_partial_dirs)}", err=True) + report_invalid_existing_outputs(invalid_existing) + summarize_failures(failed_results) + if failed_results or invalid_existing: + raise SystemExit(1) + return click.echo( ( f"source={sources.mode} matched={len(sources.segment_dirs)} pending={len(pending_jobs)} " - f"skipped={len(skipped_results)} invalid={len(failed_results)} jobs={jobs}" + f"skipped={len(skipped_results)} invalid={len(failed_results)} jobs={jobs} " + f"dry_run={'yes' if dry_run else 'no'}" ), err=True, ) if sources.ignored_partial_dirs: click.echo(f"ignored_incomplete={len(sources.ignored_partial_dirs)}", err=True) + if config.probe_existing: + click.echo( + ( + f"probed-existing: valid={len(valid_existing)} invalid={len(invalid_existing)} " + f"missing={len(missing_outputs)}" + ), + err=True, + ) + + if dry_run: + report_dry_run_plan(pending_jobs, pending_reasons, pending_details) + summarize_failures(failed_results) + if failed_results: + raise SystemExit(1) + return results = list(skipped_results) results.extend(failed_results)