Add ffprobe checks for grid batch outputs

This commit is contained in:
2026-03-19 10:58:53 +00:00
parent c2e5cc561f
commit 0cf0b2566d
2 changed files with 279 additions and 5 deletions
+26
View File
@@ -144,6 +144,32 @@ uv run python scripts/zed_batch_svo_grid_to_mp4.py \
The batch grid wrapper mirrors the grid encoder options, skips existing `<segment>/<segment>_grid.mp4` outputs by default, and returns a nonzero exit code if any segment fails.
When you suspect a previous run left behind partial MP4 files, opt into `ffprobe` validation so broken existing outputs are treated as missing instead of skipped:
```bash
uv run python scripts/zed_batch_svo_grid_to_mp4.py \
/workspaces/data/kindergarten \
--probe-existing \
--jobs 2
```
Use `--report-existing` to audit existing outputs without launching conversions. The report prints invalid existing files only, while the summary still includes valid and missing counts. This is useful for the partial-write failure mode currently seen as `moov atom not found` in some kindergarten grid MP4s:
```bash
uv run python scripts/zed_batch_svo_grid_to_mp4.py \
/workspaces/data/kindergarten \
--report-existing
```
Use `--dry-run` to preview what the batch wrapper would convert after applying skip logic. Combine it with `--probe-existing` when you want to see which broken existing outputs would be requeued:
```bash
uv run python scripts/zed_batch_svo_grid_to_mp4.py \
/workspaces/data/kindergarten \
--probe-existing \
--dry-run
```
#### Expected CSV Input Format
The `--segments-csv` input expects a header row with at least a `segment_dir` column. Extra columns are allowed and ignored by the batch wrapper. `segment_dir` values may be absolute paths or paths relative to the CSV file's parent directory. Use `--csv-root` to override that base directory.
+254 -6
View File
@@ -4,8 +4,11 @@ from __future__ import annotations
import concurrent.futures import concurrent.futures
import csv import csv
import json
import math
import os import os
import re import re
import shutil
import subprocess import subprocess
import sys import sys
from dataclasses import dataclass from dataclasses import dataclass
@@ -23,7 +26,9 @@ EXPECTED_CAMERAS = ("zed1", "zed2", "zed3", "zed4")
@dataclass(slots=True, frozen=True) @dataclass(slots=True, frozen=True)
class BatchConfig: class BatchConfig:
zed_bin: Path zed_bin: Path | None
ffprobe_bin: Path | None
probe_existing: bool
cuda_visible_devices: str | None cuda_visible_devices: str | None
overwrite: bool overwrite: bool
fail_fast: bool fail_fast: bool
@@ -72,6 +77,14 @@ class SourceResolution:
ignored_partial_dirs: tuple[SegmentScan, ...] ignored_partial_dirs: tuple[SegmentScan, ...]
@dataclass(slots=True, frozen=True)
class OutputProbeResult:
    """Result of validating one existing grid MP4 output with ffprobe."""

    # Path of the MP4 file that was (or would have been) probed.
    output_path: Path
    # One of "missing" (file absent), "invalid" (probe failed), or "valid".
    status: str
    # Human-readable explanation; populated only when status == "invalid".
    reason: str = ""
    # Container duration reported by ffprobe; set only when status == "valid".
    duration_seconds: float | None = None
def locate_binary(override: Path | None) -> Path: def locate_binary(override: Path | None) -> Path:
if override is not None: if override is not None:
candidate = override.expanduser().resolve() candidate = override.expanduser().resolve()
@@ -89,6 +102,19 @@ def locate_binary(override: Path | None) -> Path:
raise click.ClickException(f"could not find zed_svo_grid_to_mp4 under {REPO_ROOT / 'build'}") raise click.ClickException(f"could not find zed_svo_grid_to_mp4 under {REPO_ROOT / 'build'}")
def locate_ffprobe(override: Path | None) -> Path:
    """Resolve the ffprobe executable to an absolute path.

    An explicit *override* wins and must point at an existing file;
    otherwise the PATH is searched. Raises click.ClickException when no
    usable binary can be found.
    """
    if override is None:
        found = shutil.which("ffprobe")
        if found is None:
            raise click.ClickException("could not find ffprobe on PATH")
        return Path(found).resolve()
    explicit = override.expanduser().resolve()
    if not explicit.is_file():
        raise click.ClickException(f"ffprobe binary not found: {explicit}")
    return explicit
def scan_segment_dir(segment_dir: Path) -> SegmentScan: def scan_segment_dir(segment_dir: Path) -> SegmentScan:
if not segment_dir.is_dir(): if not segment_dir.is_dir():
return SegmentScan( return SegmentScan(
@@ -246,6 +272,9 @@ def output_path_for(segment_dir: Path) -> Path:
def command_for_job(job: ConversionJob, config: BatchConfig) -> list[str]: def command_for_job(job: ConversionJob, config: BatchConfig) -> list[str]:
if config.zed_bin is None:
raise RuntimeError("zed_svo_grid_to_mp4 binary is not configured")
command = [ command = [
str(config.zed_bin), str(config.zed_bin),
"--segment-dir", "--segment-dir",
@@ -283,6 +312,80 @@ def env_for_job(config: BatchConfig) -> dict[str, str]:
return env return env
def probe_output(output_path: Path, ffprobe_bin: Path | None) -> OutputProbeResult:
    """Validate an existing grid MP4 with ffprobe.

    Returns a result whose status is "missing" (file absent), "invalid"
    (ffprobe failed, emitted unparseable JSON, found no video stream, or
    reported an unusable duration), or "valid" (with the parsed duration).
    Raises RuntimeError if probing is needed but no ffprobe is configured.
    """

    def invalid(why: str) -> OutputProbeResult:
        # Single construction point shared by every rejection path.
        return OutputProbeResult(output_path=output_path, status="invalid", reason=why)

    if not output_path.is_file():
        return OutputProbeResult(output_path=output_path, status="missing")
    if ffprobe_bin is None:
        raise RuntimeError("ffprobe binary is not configured")

    command = [
        str(ffprobe_bin),
        "-v", "error",
        "-print_format", "json",
        "-show_entries",
        "format=duration,size:stream=codec_type,codec_name,width,height,nb_frames",
        str(output_path),
    ]
    proc = subprocess.run(command, check=False, capture_output=True, text=True)
    if proc.returncode != 0:
        # Prefer stderr, then stdout, then a generic marker for the reason.
        return invalid(proc.stderr.strip() or proc.stdout.strip() or "ffprobe failed")

    try:
        payload = json.loads(proc.stdout)
    except json.JSONDecodeError as error:
        return invalid(f"ffprobe returned invalid JSON: {error}")

    if not any(s.get("codec_type") == "video" for s in payload.get("streams", [])):
        return invalid("ffprobe found no video stream")

    duration_text = payload.get("format", {}).get("duration")
    if duration_text in (None, ""):
        # Typical partial-write symptom: container metadata never landed.
        return invalid("ffprobe did not report a duration")
    try:
        duration_seconds = float(duration_text)
    except (TypeError, ValueError):
        return invalid(f"ffprobe reported a non-numeric duration: {duration_text!r}")
    if not math.isfinite(duration_seconds) or duration_seconds <= 0.0:
        return invalid(f"ffprobe reported a non-positive duration: {duration_seconds}")

    return OutputProbeResult(
        output_path=output_path,
        status="valid",
        duration_seconds=duration_seconds,
    )
def run_conversion(job: ConversionJob, config: BatchConfig) -> JobResult: def run_conversion(job: ConversionJob, config: BatchConfig) -> JobResult:
command = command_for_job(job, config) command = command_for_job(job, config)
completed = subprocess.run( completed = subprocess.run(
@@ -318,6 +421,41 @@ def summarize_failures(results: list[JobResult]) -> None:
click.echo(result.stdout.rstrip(), err=True) click.echo(result.stdout.rstrip(), err=True)
def report_invalid_existing_outputs(
    invalid_existing: list[tuple[ConversionJob, OutputProbeResult]],
) -> None:
    """Print a per-segment breakdown of broken existing outputs to stderr.

    No-op when every probed output was valid or missing.
    """
    if not invalid_existing:
        return
    click.echo("\nInvalid existing outputs:", err=True)
    for job, probe in invalid_existing:
        click.echo(f"- {job.segment_dir}", err=True)
        click.echo(f" output: {probe.output_path}", err=True)
        # An empty reason still produces exactly one (blank) reason line.
        first, *rest = probe.reason.splitlines() or [probe.reason]
        click.echo(f" reason: {first}", err=True)
        for extra in rest:
            click.echo(f" {extra}", err=True)
def report_dry_run_plan(
    pending_jobs: list[ConversionJob],
    pending_reasons: dict[Path, str],
    pending_details: dict[Path, str],
) -> None:
    """Describe, on stderr, each conversion a dry run would launch.

    Every pending segment is listed with its requeue reason; a detail
    (e.g. the ffprobe failure text) is appended when one was recorded.
    """
    if not pending_jobs:
        click.echo("dry-run: no conversions would be launched", err=True)
        return
    click.echo("\nDry-run plan:", err=True)
    for job in pending_jobs:
        entry = f"- {job.segment_dir} [{pending_reasons[job.segment_dir]}]"
        detail = pending_details.get(job.segment_dir)
        if detail:
            # Fold multi-line ffprobe reasons onto a single plan line.
            entry = f"{entry}: {detail.replace(chr(10), ' | ')}"
        click.echo(entry, err=True)
def run_batch(jobs: list[ConversionJob], config: BatchConfig, jobs_limit: int) -> tuple[list[JobResult], int]: def run_batch(jobs: list[ConversionJob], config: BatchConfig, jobs_limit: int) -> tuple[list[JobResult], int]:
results: list[JobResult] = [] results: list[JobResult] = []
aborted_count = 0 aborted_count = 0
@@ -406,11 +544,32 @@ def run_batch(jobs: list[ConversionJob], config: BatchConfig, jobs_limit: int) -
type=click.Path(path_type=Path, dir_okay=False), type=click.Path(path_type=Path, dir_okay=False),
help="Explicit path to the zed_svo_grid_to_mp4 binary.", help="Explicit path to the zed_svo_grid_to_mp4 binary.",
) )
@click.option(
"--ffprobe-bin",
type=click.Path(path_type=Path, dir_okay=False),
help="Explicit path to ffprobe. Required when probing existing outputs and ffprobe is not on PATH.",
)
@click.option( @click.option(
"--cuda-visible-devices", "--cuda-visible-devices",
help="Optional CUDA_VISIBLE_DEVICES value exported for each conversion subprocess.", help="Optional CUDA_VISIBLE_DEVICES value exported for each conversion subprocess.",
) )
@click.option("--overwrite/--skip-existing", default=False, show_default=True, help="Overwrite existing grid MP4 files.") @click.option("--overwrite/--skip-existing", default=False, show_default=True, help="Overwrite existing grid MP4 files.")
@click.option(
"--probe-existing/--trust-existing",
default=False,
show_default=True,
help="Validate existing grid MP4 files with ffprobe before skipping them. Invalid outputs are treated as missing.",
)
@click.option(
"--report-existing",
is_flag=True,
help="Probe existing grid MP4 files with ffprobe, report invalid ones, and do not launch conversions.",
)
@click.option(
"--dry-run",
is_flag=True,
help="Show which segments would be converted after applying skip/probe logic, without launching conversions.",
)
@click.option( @click.option(
"--fail-fast/--continue-on-error", "--fail-fast/--continue-on-error",
default=False, default=False,
@@ -474,8 +633,12 @@ def main(
recursive: bool, recursive: bool,
jobs: int, jobs: int,
zed_bin: Path | None, zed_bin: Path | None,
ffprobe_bin: Path | None,
cuda_visible_devices: str | None, cuda_visible_devices: str | None,
overwrite: bool, overwrite: bool,
probe_existing: bool,
report_existing: bool,
dry_run: bool,
fail_fast: bool, fail_fast: bool,
codec: str, codec: str,
encoder_device: str, encoder_device: str,
@@ -492,11 +655,16 @@ def main(
"""Batch-convert synced four-camera ZED segments into grid MP4 files.""" """Batch-convert synced four-camera ZED segments into grid MP4 files."""
if b_frames > gop: if b_frames > gop:
raise click.BadParameter(f"b-frames {b_frames} must be <= gop {gop}", param_hint="--b-frames") raise click.BadParameter(f"b-frames {b_frames} must be <= gop {gop}", param_hint="--b-frames")
if report_existing and dry_run:
raise click.ClickException("--report-existing and --dry-run are mutually exclusive")
binary_path = locate_binary(zed_bin) ffprobe_path = locate_ffprobe(ffprobe_bin) if (probe_existing or report_existing) else None
binary_path = None if report_existing else locate_binary(zed_bin)
sources = resolve_sources(input_dir, segment_dirs, segments_csv, csv_root, recursive) sources = resolve_sources(input_dir, segment_dirs, segments_csv, csv_root, recursive)
config = BatchConfig( config = BatchConfig(
zed_bin=binary_path, zed_bin=binary_path,
ffprobe_bin=ffprobe_path,
probe_existing=probe_existing or report_existing,
cuda_visible_devices=cuda_visible_devices, cuda_visible_devices=cuda_visible_devices,
overwrite=overwrite, overwrite=overwrite,
fail_fast=fail_fast, fail_fast=fail_fast,
@@ -516,11 +684,16 @@ def main(
skipped_results: list[JobResult] = [] skipped_results: list[JobResult] = []
failed_results: list[JobResult] = [] failed_results: list[JobResult] = []
pending_jobs: list[ConversionJob] = [] pending_jobs: list[ConversionJob] = []
pending_reasons: dict[Path, str] = {}
pending_details: dict[Path, str] = {}
valid_existing: list[OutputProbeResult] = []
invalid_existing: list[tuple[ConversionJob, OutputProbeResult]] = []
missing_outputs: list[ConversionJob] = []
for segment_dir in sources.segment_dirs: for segment_dir in sources.segment_dirs:
output_path = output_path_for(segment_dir) output_path = output_path_for(segment_dir)
job = ConversionJob(segment_dir=segment_dir, output_path=output_path) job = ConversionJob(segment_dir=segment_dir, output_path=output_path)
command = tuple(command_for_job(job, config)) command = tuple(command_for_job(job, config)) if config.zed_bin is not None else ()
scan = scan_segment_dir(segment_dir) scan = scan_segment_dir(segment_dir)
if not scan.is_valid: if not scan.is_valid:
failed_results.append( failed_results.append(
@@ -534,7 +707,26 @@ def main(
) )
) )
continue continue
if output_path.exists() and not overwrite:
if report_existing:
probe_result = probe_output(output_path, config.ffprobe_bin)
if probe_result.status == "valid":
valid_existing.append(probe_result)
elif probe_result.status == "invalid":
invalid_existing.append((job, probe_result))
else:
missing_outputs.append(job)
continue
if overwrite:
pending_jobs.append(job)
pending_reasons[segment_dir] = "overwrite"
continue
if config.probe_existing:
probe_result = probe_output(output_path, config.ffprobe_bin)
if probe_result.status == "valid":
valid_existing.append(probe_result)
skipped_results.append( skipped_results.append(
JobResult( JobResult(
status="skipped", status="skipped",
@@ -544,17 +736,73 @@ def main(
) )
) )
continue continue
if probe_result.status == "invalid":
invalid_existing.append((job, probe_result))
pending_jobs.append(job) pending_jobs.append(job)
pending_reasons[segment_dir] = "invalid-existing-output"
pending_details[segment_dir] = probe_result.reason
continue
missing_outputs.append(job)
pending_jobs.append(job)
pending_reasons[segment_dir] = "missing-output"
continue
if output_path.exists():
skipped_results.append(
JobResult(
status="skipped",
segment_dir=segment_dir,
output_path=output_path,
command=command,
)
)
continue
pending_jobs.append(job)
pending_reasons[segment_dir] = "missing-output"
if report_existing:
click.echo( click.echo(
( (
f"source={sources.mode} matched={len(sources.segment_dirs)} pending={len(pending_jobs)} " f"source={sources.mode} matched={len(sources.segment_dirs)} valid={len(valid_existing)} "
f"skipped={len(skipped_results)} invalid={len(failed_results)} jobs={jobs}" f"invalid={len(invalid_existing)} missing={len(missing_outputs)} "
f"invalid-segments={len(failed_results)}"
), ),
err=True, err=True,
) )
if sources.ignored_partial_dirs: if sources.ignored_partial_dirs:
click.echo(f"ignored_incomplete={len(sources.ignored_partial_dirs)}", err=True) click.echo(f"ignored_incomplete={len(sources.ignored_partial_dirs)}", err=True)
report_invalid_existing_outputs(invalid_existing)
summarize_failures(failed_results)
if failed_results or invalid_existing:
raise SystemExit(1)
return
click.echo(
(
f"source={sources.mode} matched={len(sources.segment_dirs)} pending={len(pending_jobs)} "
f"skipped={len(skipped_results)} invalid={len(failed_results)} jobs={jobs} "
f"dry_run={'yes' if dry_run else 'no'}"
),
err=True,
)
if sources.ignored_partial_dirs:
click.echo(f"ignored_incomplete={len(sources.ignored_partial_dirs)}", err=True)
if config.probe_existing:
click.echo(
(
f"probed-existing: valid={len(valid_existing)} invalid={len(invalid_existing)} "
f"missing={len(missing_outputs)}"
),
err=True,
)
if dry_run:
report_dry_run_plan(pending_jobs, pending_reasons, pending_details)
summarize_failures(failed_results)
if failed_results:
raise SystemExit(1)
return
results = list(skipped_results) results = list(skipped_results)
results.extend(failed_results) results.extend(failed_results)