diff --git a/README.md b/README.md index 2993be3..eb541cc 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,7 @@ The repo also includes an offline conversion tool for the left ZED color stream: ```bash CUDA_VISIBLE_DEVICES=GPU-9cc7b26e-90d4-0c49-4d4c-060e528ffba6 \ ./build/bin/zed_svo_to_mp4 \ - --input /workspaces/data/kindergarten/bar/2026-03-18T11-59-41/2026-03-18T11-59-41_zed1.svo2 \ + --input <svo-file> \ --encoder-device auto \ --preset balanced \ --quality 20 \ @@ -83,11 +83,40 @@ Python dependencies for the batch wrapper are managed with `uv`: uv sync ``` +Expected multi-camera dataset layout: + +```text +<dataset-root>/ +├── svo2_segments_sorted.csv +├── bar/ +│ └── 2026-03-18T11-59-41/ +│ ├── 2026-03-18T11-59-41_zed1.svo2 +│ ├── 2026-03-18T11-59-41_zed2.svo2 +│ ├── 2026-03-18T11-59-41_zed3.svo2 +│ └── 2026-03-18T11-59-41_zed4.svo2 +└── jump/ + └── experiment/ + └── 1/ + └── 2026-03-18T11-26-23/ + ├── 2026-03-18T11-26-23_zed1.svo2 + ├── 2026-03-18T11-26-23_zed2.svo2 + ├── 2026-03-18T11-26-23_zed3.svo2 + └── 2026-03-18T11-26-23_zed4.svo2 +``` + +Placeholders used below: +- `<dataset-root>`: dataset root containing multi-camera segment directories +- `<segment-dir>`: one multi-camera segment directory containing `*_zedN.svo` or `*_zedN.svo2` +- `<segment-dir-a>`, `<segment-dir-b>`: explicit segment directories +- `<segments-csv>`: CSV file with a `segment_dir` column, for example `config/svo2_segments_sorted.sample.csv` +- `<svo-file>`: one single-camera `.svo` or `.svo2` file +- `<pose-config>`: TOML file such as `config/zed_pose_config.toml` + Use the wrapper to recurse through a folder, run `zed_svo_to_mp4` on every matched `.svo2`, and show one aggregate tqdm progress bar: ```bash uv run python scripts/zed_batch_svo_to_mp4.py \ - /workspaces/data/kindergarten/bar \ + <dataset-root>/bar \ --pattern '*.svo2' \ --recursive \ --jobs 2 \ @@ -105,7 +134,7 @@ Use the grid converter to merge four synced ZED recordings into a 2x2 CCTV-style ```bash ./build/bin/zed_svo_grid_to_mp4 \ - --segment-dir /workspaces/data/kindergarten/bar/2026-03-18T11-59-41 \ + --segment-dir <segment-dir> \ --encoder-device auto
\ --codec h265 \ --duration-seconds 2 @@ -117,7 +146,7 @@ Use the batch wrapper to run `zed_svo_grid_to_mp4` over many segment directories ```bash uv run python scripts/zed_batch_svo_grid_to_mp4.py \ - /workspaces/data/kindergarten \ + <dataset-root> \ --recursive \ --jobs 2 \ --encoder-device auto \ @@ -128,8 +157,8 @@ You can also provide the exact segments to convert: ```bash uv run python scripts/zed_batch_svo_grid_to_mp4.py \ - --segment-dir /workspaces/data/kindergarten/jump/external/recording/2026-03-18T11-23-22 \ - --segment-dir /workspaces/data/kindergarten/jump/experiment/1/2026-03-18T11-26-23 \ + --segment-dir <segment-dir-a> \ + --segment-dir <segment-dir-b> \ --jobs 2 ``` @@ -137,7 +166,7 @@ Or preserve a precomputed CSV ordering: ```bash uv run python scripts/zed_batch_svo_grid_to_mp4.py \ - --segments-csv /workspaces/data/kindergarten/svo2_segments_sorted.csv \ + --segments-csv <segments-csv> \ --jobs 2 \ --duration-seconds 2 ``` @@ -148,7 +177,7 @@ When you suspect a previous run left behind partial MP4 files, opt into `ffprobe ```bash uv run python scripts/zed_batch_svo_grid_to_mp4.py \ - /workspaces/data/kindergarten \ + <dataset-root> \ --probe-existing \ --jobs 2 ``` @@ -157,7 +186,7 @@ Use `--report-existing` to audit existing outputs without launching conversions. ```bash uv run python scripts/zed_batch_svo_grid_to_mp4.py \ - /workspaces/data/kindergarten \ + <dataset-root> \ --report-existing ``` @@ -165,7 +194,7 @@ Use `--dry-run` to preview what the batch wrapper would convert after applying s ```bash uv run python scripts/zed_batch_svo_grid_to_mp4.py \ - /workspaces/data/kindergarten \ + <dataset-root> \ --probe-existing \ --dry-run ``` @@ -174,7 +203,7 @@ uv run python scripts/zed_batch_svo_grid_to_mp4.py \ The `--segments-csv` input expects a header row with at least a `segment_dir` column. Extra columns are allowed and ignored by the batch wrapper. `segment_dir` values may be absolute paths or paths relative to the CSV file's parent directory. Use `--csv-root` to override that base directory.
-Repeated rows for the same `segment_dir` are allowed; the wrapper converts each unique segment once, preserving the first-seen CSV order. This makes `/workspaces/data/kindergarten/svo2_segments_sorted.csv` a valid input even though it stores one row per camera file: +Repeated rows for the same `segment_dir` are allowed; the wrapper converts each unique segment once, preserving the first-seen CSV order. The repo includes a small example at `config/svo2_segments_sorted.sample.csv`: ```csv timestamp,activity,group_path,segment_dir,camera,relative_path @@ -182,6 +211,67 @@ timestamp,activity,group_path,segment_dir,camera,relative_path 2026-03-18T11-23-22,jump,jump/external/recording,jump/external/recording/2026-03-18T11-23-22,zed2,jump/external/recording/2026-03-18T11-23-22/2026-03-18T11-23-22_zed2.svo2 ``` +### Batch ZED Segments To MCAP + +Use the wrapper to recurse through a dataset root, run `zed_svo_to_mcap --segment-dir` on every matched multi-camera segment, and show one aggregate tqdm progress bar: + +```bash +uv run python scripts/zed_batch_svo_to_mcap.py \ + <dataset-root> \ + --recursive \ + --jobs 2 \ + --cuda-visible-devices GPU-9cc7b26e-90d4-0c49-4d4c-060e528ffba6 \ + --end-frame 29 +``` + +You can also preserve a precomputed CSV ordering: + +```bash +uv run python scripts/zed_batch_svo_to_mcap.py \ + --segments-csv <segments-csv> \ + --jobs 2 \ + --end-frame 29 +``` + +Enable per-camera pose export when the segment has valid tracking: + +```bash +uv run python scripts/zed_batch_svo_to_mcap.py \ + --segment-dir <segment-dir> \ + --with-pose \ + --pose-config <pose-config> +``` + +The batch MCAP wrapper writes `<segment-dir>/<segment-name>.mcap` by default, skips existing outputs unless `--overwrite` is given, and returns a nonzero exit code if any segment fails. +The repo includes a minimal pose config at `config/zed_pose_config.toml` so MCAP conversion does not depend on a separate `cv-mmap` checkout. +In bundled multi-camera mode, `--end-frame` means the last emitted synced frame-group index from the common start timestamp.
+ +Use `--probe-existing` to validate existing MCAPs before skipping them. Invalid outputs are treated as missing and requeued: + +```bash +uv run python scripts/zed_batch_svo_to_mcap.py \ + <dataset-root> \ + --probe-existing \ + --jobs 2 +``` + +Use `--report-existing` to audit existing MCAPs without launching conversions: + +```bash +uv run python scripts/zed_batch_svo_to_mcap.py \ + <dataset-root> \ + --report-existing +``` + +Use `--dry-run` to preview what would be converted after applying skip or probe logic: + +```bash +uv run python scripts/zed_batch_svo_to_mcap.py \ + --segments-csv <segments-csv> \ + --probe-existing \ + --dry-run +``` + ### Mandatory Acceptance (Standalone) Run the full mandatory acceptance suite. This executes the complete protocol/codec matrix without requiring external servers. diff --git a/config/svo2_segments_sorted.sample.csv b/config/svo2_segments_sorted.sample.csv new file mode 100644 index 0000000..66ab3d0 --- /dev/null +++ b/config/svo2_segments_sorted.sample.csv @@ -0,0 +1,7 @@ +timestamp,activity,group_path,segment_dir,camera,relative_path +2026-03-18T11-23-22,jump,jump/external/recording,jump/external/recording/2026-03-18T11-23-22,zed1,jump/external/recording/2026-03-18T11-23-22/2026-03-18T11-23-22_zed1.svo2 +2026-03-18T11-23-22,jump,jump/external/recording,jump/external/recording/2026-03-18T11-23-22,zed2,jump/external/recording/2026-03-18T11-23-22/2026-03-18T11-23-22_zed2.svo2 +2026-03-18T11-23-22,jump,jump/external/recording,jump/external/recording/2026-03-18T11-23-22,zed3,jump/external/recording/2026-03-18T11-23-22/2026-03-18T11-23-22_zed3.svo2 +2026-03-18T11-23-22,jump,jump/external/recording,jump/external/recording/2026-03-18T11-23-22,zed4,jump/external/recording/2026-03-18T11-23-22/2026-03-18T11-23-22_zed4.svo2 +2026-03-18T11-26-23,jump,jump/experiment/1,jump/experiment/1/2026-03-18T11-26-23,zed1,jump/experiment/1/2026-03-18T11-26-23/2026-03-18T11-26-23_zed1.svo2
+2026-03-18T11-26-23,jump,jump/experiment/1,jump/experiment/1/2026-03-18T11-26-23,zed2,jump/experiment/1/2026-03-18T11-26-23/2026-03-18T11-26-23_zed2.svo2 diff --git a/config/zed_pose_config.toml b/config/zed_pose_config.toml new file mode 100644 index 0000000..00e3ec1 --- /dev/null +++ b/config/zed_pose_config.toml @@ -0,0 +1,18 @@ +# Minimal pose-tracking config for zed_svo_to_mcap. +# The converter currently reads only: +# - zed.coordinate_system +# - zed.body_tracking.reference_frame +# - zed.body_tracking.set_floor_as_origin + +[zed] +# Native ZED 3D/body coordinate system used when reading positional tracking. +# Supported values in this repo are IMAGE and RIGHT_HANDED_Y_UP. +coordinate_system = "IMAGE" + +[zed.body_tracking] +# Reference frame used for per-camera pose estimation. +# Supported values are CAMERA and WORLD. +reference_frame = "CAMERA" + +# When true, WORLD origin is placed on the floor during positional tracking. +set_floor_as_origin = false diff --git a/scripts/zed_batch_svo_to_mcap.py b/scripts/zed_batch_svo_to_mcap.py new file mode 100644 index 0000000..b354abc --- /dev/null +++ b/scripts/zed_batch_svo_to_mcap.py @@ -0,0 +1,789 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import concurrent.futures +import csv +import importlib +import os +import re +import subprocess +import sys +from dataclasses import dataclass +from pathlib import Path + +import click +from tqdm import tqdm + + +SCRIPT_PATH = Path(__file__).resolve() +REPO_ROOT = SCRIPT_PATH.parents[1] +WORKSPACE_ROOT = REPO_ROOT.parent +MCAP_PYTHON_ROOT = WORKSPACE_ROOT / "mcap" / "python" / "mcap" +SEGMENT_FILE_PATTERN = re.compile(r".*_zed([0-9]+)\.svo2?$", re.IGNORECASE) + + +@dataclass(slots=True, frozen=True) +class BatchConfig: + zed_bin: Path | None + probe_existing: bool + cuda_visible_devices: str | None + overwrite: bool + fail_fast: bool + codec: str + encoder_device: str + mcap_compression: str + depth_mode: str + with_pose: bool + pose_config: Path | 
None + world_frame_id: str | None + end_frame: int | None + sync_tolerance_ms: float | None + + +@dataclass(slots=True, frozen=True) +class ConversionJob: + segment_dir: Path + output_path: Path + camera_labels: tuple[str, ...] + + +@dataclass(slots=True, frozen=True) +class JobResult: + status: str + segment_dir: Path + output_path: Path + command: tuple[str, ...] + return_code: int = 0 + stdout: str = "" + stderr: str = "" + + +@dataclass(slots=True, frozen=True) +class SegmentScan: + segment_dir: Path + matched_files: int + camera_labels: tuple[str, ...] + is_valid: bool + reason: str | None = None + + +@dataclass(slots=True, frozen=True) +class SourceResolution: + mode: str + segment_dirs: tuple[Path, ...] + ignored_partial_dirs: tuple[SegmentScan, ...] + + +@dataclass(slots=True, frozen=True) +class OutputProbeResult: + output_path: Path + status: str + reason: str = "" + + +_MCAP_READER_MODULE = None + + +def locate_binary(override: Path | None) -> Path: + if override is not None: + candidate = override.expanduser().resolve() + if not candidate.is_file(): + raise click.ClickException(f"binary not found: {candidate}") + return candidate + + candidates = ( + REPO_ROOT / "build" / "bin" / "zed_svo_to_mcap", + REPO_ROOT / "build" / "zed_svo_to_mcap", + ) + for candidate in candidates: + if candidate.is_file(): + return candidate + raise click.ClickException(f"could not find zed_svo_to_mcap under {REPO_ROOT / 'build'}") + + +def sorted_camera_labels(labels: set[str]) -> tuple[str, ...]: + return tuple(sorted(labels, key=lambda label: int(label[3:]))) + + +def scan_segment_dir(segment_dir: Path) -> SegmentScan: + if not segment_dir.is_dir(): + return SegmentScan( + segment_dir=segment_dir, + matched_files=0, + camera_labels=(), + is_valid=False, + reason=f"segment directory does not exist: {segment_dir}", + ) + + matched_by_camera: dict[str, list[Path]] = {} + for child in segment_dir.iterdir(): + if not child.is_file(): + continue + match = 
SEGMENT_FILE_PATTERN.fullmatch(child.name) + if match is None: + continue + label = f"zed{int(match.group(1))}" + matched_by_camera.setdefault(label, []).append(child) + + matched_files = sum(len(paths) for paths in matched_by_camera.values()) + camera_labels = sorted_camera_labels(set(matched_by_camera)) + duplicate_cameras = [label for label, paths in sorted(matched_by_camera.items()) if len(paths) > 1] + + if duplicate_cameras: + duplicate_text = ", ".join(duplicate_cameras) + return SegmentScan( + segment_dir=segment_dir, + matched_files=matched_files, + camera_labels=camera_labels, + is_valid=False, + reason=f"duplicate camera inputs under {segment_dir}: {duplicate_text}", + ) + if len(camera_labels) < 2: + return SegmentScan( + segment_dir=segment_dir, + matched_files=matched_files, + camera_labels=camera_labels, + is_valid=False, + reason=f"expected at least 2 camera inputs under {segment_dir}, found {len(camera_labels)}", + ) + + return SegmentScan( + segment_dir=segment_dir, + matched_files=matched_files, + camera_labels=camera_labels, + is_valid=True, + ) + + +def dedupe_paths(paths: list[Path]) -> list[Path]: + ordered: list[Path] = [] + seen: set[Path] = set() + for path in paths: + resolved = path.expanduser().resolve() + if resolved in seen: + continue + seen.add(resolved) + ordered.append(resolved) + return ordered + + +def discover_segment_dirs(root: Path, recursive: bool) -> SourceResolution: + if not root.is_dir(): + raise click.ClickException(f"input directory does not exist: {root}") + + candidate_dirs = {root.resolve()} + iterator = root.rglob("*") if recursive else root.iterdir() + for path in iterator: + if path.is_dir(): + candidate_dirs.add(path.resolve()) + + valid_dirs: list[Path] = [] + ignored_partial_dirs: list[SegmentScan] = [] + for segment_dir in sorted(candidate_dirs): + scan = scan_segment_dir(segment_dir) + if scan.is_valid: + valid_dirs.append(segment_dir) + elif scan.matched_files > 0: + ignored_partial_dirs.append(scan) + + if 
not valid_dirs: + raise click.ClickException(f"no multi-camera segments found under {root}") + + return SourceResolution( + mode="discovery", + segment_dirs=tuple(valid_dirs), + ignored_partial_dirs=tuple(ignored_partial_dirs), + ) + + +def parse_segments_csv(csv_path: Path, csv_root: Path | None) -> tuple[Path, ...]: + csv_path = csv_path.expanduser().resolve() + if not csv_path.is_file(): + raise click.ClickException(f"CSV not found: {csv_path}") + + if csv_root is not None: + base_dir = csv_root.expanduser().resolve() + if not base_dir.is_dir(): + raise click.ClickException(f"CSV root is not a directory: {base_dir}") + else: + base_dir = csv_path.parent + + segment_dirs: list[Path] = [] + seen: set[Path] = set() + with csv_path.open(newline="") as stream: + reader = csv.DictReader(stream) + if reader.fieldnames is None or "segment_dir" not in reader.fieldnames: + raise click.ClickException(f"{csv_path} must contain a 'segment_dir' header") + + for row_number, row in enumerate(reader, start=2): + raw_segment_dir = (row.get("segment_dir") or "").strip() + if not raw_segment_dir: + raise click.ClickException(f"{csv_path}:{row_number} has an empty segment_dir value") + segment_dir = Path(raw_segment_dir) + resolved = segment_dir if segment_dir.is_absolute() else base_dir / segment_dir + resolved = resolved.expanduser().resolve() + if resolved in seen: + continue + seen.add(resolved) + segment_dirs.append(resolved) + + if not segment_dirs: + raise click.ClickException(f"{csv_path} did not contain any segment_dir rows") + return tuple(segment_dirs) + + +def resolve_sources( + input_dir: Path | None, + segment_dirs: tuple[Path, ...], + segments_csv: Path | None, + csv_root: Path | None, + recursive: bool, +) -> SourceResolution: + source_count = sum( + ( + 1 if input_dir is not None else 0, + 1 if segment_dirs else 0, + 1 if segments_csv is not None else 0, + ) + ) + if source_count != 1: + raise click.ClickException( + "provide exactly one source mode: INPUT_DIR, 
--segment-dir, or --segments-csv" + ) + + if input_dir is not None: + return discover_segment_dirs(input_dir.expanduser().resolve(), recursive) + + if segment_dirs: + ordered_dirs = dedupe_paths(list(segment_dirs)) + return SourceResolution(mode="segment-dir", segment_dirs=tuple(ordered_dirs), ignored_partial_dirs=()) + + return SourceResolution( + mode="segments-csv", + segment_dirs=parse_segments_csv(segments_csv, csv_root), + ignored_partial_dirs=(), + ) + + +def output_path_for(segment_dir: Path) -> Path: + return segment_dir / f"{segment_dir.name}.mcap" + + +def command_for_job(job: ConversionJob, config: BatchConfig) -> list[str]: + if config.zed_bin is None: + raise RuntimeError("zed_svo_to_mcap binary is not configured") + + command = [ + str(config.zed_bin), + "--segment-dir", + str(job.segment_dir), + "--codec", + config.codec, + "--encoder-device", + config.encoder_device, + "--mcap-compression", + config.mcap_compression, + "--depth-mode", + config.depth_mode, + ] + if config.with_pose: + command.append("--with-pose") + if config.pose_config is not None: + command.extend(["--pose-config", str(config.pose_config)]) + if config.world_frame_id is not None: + command.extend(["--world-frame-id", config.world_frame_id]) + if config.end_frame is not None: + command.extend(["--end-frame", str(config.end_frame)]) + if config.sync_tolerance_ms is not None: + command.extend(["--sync-tolerance-ms", str(config.sync_tolerance_ms)]) + return command + + +def env_for_job(config: BatchConfig) -> dict[str, str]: + env = dict(os.environ) + if config.cuda_visible_devices is not None: + env["CUDA_VISIBLE_DEVICES"] = config.cuda_visible_devices + return env + + +def load_mcap_reader(): + global _MCAP_READER_MODULE + if _MCAP_READER_MODULE is not None: + return _MCAP_READER_MODULE + + if str(MCAP_PYTHON_ROOT) not in sys.path: + sys.path.insert(0, str(MCAP_PYTHON_ROOT)) + try: + _MCAP_READER_MODULE = importlib.import_module("mcap.reader") + except ModuleNotFoundError as error: 
+ raise click.ClickException( + f"could not import mcap.reader from {MCAP_PYTHON_ROOT}" + ) from error + return _MCAP_READER_MODULE + + +def required_topics_for(camera_labels: tuple[str, ...]) -> set[str]: + topics: set[str] = set() + for label in camera_labels: + topics.add(f"/{label}/video") + topics.add(f"/{label}/depth") + topics.add(f"/{label}/calibration") + return topics + + +def probe_output(output_path: Path, camera_labels: tuple[str, ...]) -> OutputProbeResult: + if not output_path.is_file(): + return OutputProbeResult(output_path=output_path, status="missing") + + reader_module = load_mcap_reader() + expected_topics = required_topics_for(camera_labels) + found_topics: set[str] = set() + + try: + with output_path.open("rb") as stream: + reader = reader_module.make_reader(stream) + for _schema, channel, _message in reader.iter_messages(): + if channel.topic in expected_topics: + found_topics.add(channel.topic) + if found_topics == expected_topics: + return OutputProbeResult(output_path=output_path, status="valid") + except Exception as error: # noqa: BLE001 + return OutputProbeResult(output_path=output_path, status="invalid", reason=str(error)) + + missing_topics = sorted(expected_topics - found_topics) + if missing_topics: + return OutputProbeResult( + output_path=output_path, + status="invalid", + reason="missing expected topics: " + ", ".join(missing_topics), + ) + return OutputProbeResult(output_path=output_path, status="valid") + + +def run_conversion(job: ConversionJob, config: BatchConfig) -> JobResult: + command = command_for_job(job, config) + completed = subprocess.run( + command, + check=False, + capture_output=True, + text=True, + env=env_for_job(config), + ) + status = "converted" if completed.returncode == 0 else "failed" + return JobResult( + status=status, + segment_dir=job.segment_dir, + output_path=job.output_path, + command=tuple(command), + return_code=completed.returncode, + stdout=completed.stdout, + stderr=completed.stderr, + ) + + 
+def summarize_failures(results: list[JobResult]) -> None: + failed_results = [result for result in results if result.status == "failed"] + if not failed_results: + return + + click.echo("\nFailed conversions:", err=True) + for result in failed_results: + click.echo(f"- {result.segment_dir} (exit {result.return_code})", err=True) + if result.stderr.strip(): + click.echo(result.stderr.rstrip(), err=True) + elif result.stdout.strip(): + click.echo(result.stdout.rstrip(), err=True) + + +def report_invalid_existing_outputs( + invalid_existing: list[tuple[ConversionJob, OutputProbeResult]], +) -> None: + if not invalid_existing: + return + + click.echo("\nInvalid existing outputs:", err=True) + for job, probe in invalid_existing: + click.echo(f"- {job.segment_dir}", err=True) + click.echo(f" output: {probe.output_path}", err=True) + reason_lines = probe.reason.splitlines() or [probe.reason] + click.echo(f" reason: {reason_lines[0]}", err=True) + for line in reason_lines[1:]: + click.echo(f" {line}", err=True) + + +def report_dry_run_plan( + pending_jobs: list[ConversionJob], + pending_reasons: dict[Path, str], + pending_details: dict[Path, str], +) -> None: + if not pending_jobs: + click.echo("dry-run: no conversions would be launched", err=True) + return + + click.echo("\nDry-run plan:", err=True) + for job in pending_jobs: + reason = pending_reasons[job.segment_dir] + detail = pending_details.get(job.segment_dir) + line = f"- {job.segment_dir} [{reason}]" + if detail: + line = f"{line}: {detail.replace(chr(10), ' | ')}" + click.echo(line, err=True) + + +def run_batch(jobs: list[ConversionJob], config: BatchConfig, jobs_limit: int) -> tuple[list[JobResult], int]: + results: list[JobResult] = [] + aborted_count = 0 + if not jobs: + return results, aborted_count + + future_to_job: dict[concurrent.futures.Future[JobResult], ConversionJob] = {} + job_iter = iter(jobs) + stop_submitting = False + + with concurrent.futures.ThreadPoolExecutor(max_workers=jobs_limit) as
executor: + with tqdm(total=len(jobs), unit="segment", dynamic_ncols=True) as progress: + + def submit_next() -> bool: + if stop_submitting: + return False + try: + job = next(job_iter) + except StopIteration: + return False + future = executor.submit(run_conversion, job, config) + future_to_job[future] = job + return True + + for _ in range(min(jobs_limit, len(jobs))): + submit_next() + + while future_to_job: + done, _ = concurrent.futures.wait( + future_to_job, + return_when=concurrent.futures.FIRST_COMPLETED, + ) + for future in done: + job = future_to_job.pop(future) + result = future.result() + results.append(result) + progress.update(1) + + if result.status == "failed": + tqdm.write( + f"failed: {job.segment_dir} (exit {result.return_code})", + file=sys.stderr, + ) + if config.fail_fast: + stop_submitting = True + + if not stop_submitting: + submit_next() + + if stop_submitting: + remaining = sum(1 for _ in job_iter) + aborted_count += remaining  # accumulate: job_iter is drained after the first pass, so re-assigning on a later pass would reset this to 0 + progress.total = progress.n + len(future_to_job) + progress.refresh() + + return results, aborted_count + + +@click.command() +@click.argument( + "input_dir", + required=False, + type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path), +) +@click.option( + "--segment-dir", + "segment_dirs", + multiple=True, + type=click.Path(path_type=Path, file_okay=False, dir_okay=True), + help="Explicit segment directory. Repeatable. Mutually exclusive with INPUT_DIR and --segments-csv.", +) +@click.option( + "--segments-csv", + type=click.Path(path_type=Path, dir_okay=False), + help="CSV file containing a segment_dir column. Mutually exclusive with INPUT_DIR and --segment-dir.", +) +@click.option( + "--csv-root", + type=click.Path(path_type=Path, file_okay=False, dir_okay=True), + help="Base directory for relative segment_dir entries in --segments-csv.
Defaults to the CSV parent directory.", +) +@click.option("--recursive/--no-recursive", default=True, show_default=True, help="Recurse when discovering segment directories from INPUT_DIR.") +@click.option("--jobs", default=1, show_default=True, type=click.IntRange(min=1), help="Parallel conversion jobs.") +@click.option( + "--zed-bin", + type=click.Path(path_type=Path, dir_okay=False), + help="Explicit path to the zed_svo_to_mcap binary.", +) +@click.option( + "--cuda-visible-devices", + help="Optional CUDA_VISIBLE_DEVICES value exported for each conversion subprocess.", +) +@click.option("--overwrite/--skip-existing", default=False, show_default=True, help="Overwrite existing MCAP files.") +@click.option( + "--probe-existing/--trust-existing", + default=False, + show_default=True, + help="Validate existing MCAP files before skipping them. Invalid outputs are treated as missing.", +) +@click.option( + "--report-existing", + is_flag=True, + help="Probe existing MCAP files, report invalid ones, and do not launch conversions.", +) +@click.option( + "--dry-run", + is_flag=True, + help="Show which segments would be converted after applying skip/probe logic, without launching conversions.", +) +@click.option( + "--fail-fast/--continue-on-error", + default=False, + show_default=True, + help="Stop submitting new work after the first failed conversion.", +) +@click.option("--codec", type=click.Choice(("h264", "h265")), default="h265", show_default=True) +@click.option( + "--encoder-device", + type=click.Choice(("auto", "nvidia", "software")), + default="auto", + show_default=True, +) +@click.option( + "--mcap-compression", + type=click.Choice(("none", "lz4", "zstd")), + default="zstd", + show_default=True, +) +@click.option( + "--depth-mode", + type=click.Choice(("neural", "quality", "performance", "ultra")), + default="quality", + show_default=True, +) +@click.option("--with-pose", is_flag=True, help="Enable per-camera positional tracking export when available.") 
+@click.option( + "--pose-config", + type=click.Path(path_type=Path, dir_okay=False), + help="TOML config passed to zed_svo_to_mcap for pose tracking settings.", +) +@click.option( + "--world-frame-id", + default=None, + help="Optional pose reference frame id passed through to zed_svo_to_mcap.", +) +@click.option( + "--end-frame", + type=click.IntRange(min=0), + default=None, + help="Last synced frame group to export (inclusive) in bundled multi-camera mode.", +) +@click.option( + "--sync-tolerance-ms", + type=click.FloatRange(min=0.0, min_open=True), + default=None, + help="Override the maximum timestamp delta used for bundled multi-camera sync.", +) +def main( + input_dir: Path | None, + segment_dirs: tuple[Path, ...], + segments_csv: Path | None, + csv_root: Path | None, + recursive: bool, + jobs: int, + zed_bin: Path | None, + cuda_visible_devices: str | None, + overwrite: bool, + probe_existing: bool, + report_existing: bool, + dry_run: bool, + fail_fast: bool, + codec: str, + encoder_device: str, + mcap_compression: str, + depth_mode: str, + with_pose: bool, + pose_config: Path | None, + world_frame_id: str | None, + end_frame: int | None, + sync_tolerance_ms: float | None, +) -> None: + """Batch-convert multi-camera ZED segments into bundled MCAP files.""" + if report_existing and dry_run: + raise click.ClickException("--report-existing and --dry-run are mutually exclusive") + + binary_path = None if report_existing else locate_binary(zed_bin) + sources = resolve_sources(input_dir, segment_dirs, segments_csv, csv_root, recursive) + config = BatchConfig( + zed_bin=binary_path, + probe_existing=probe_existing or report_existing, + cuda_visible_devices=cuda_visible_devices, + overwrite=overwrite, + fail_fast=fail_fast, + codec=codec, + encoder_device=encoder_device, + mcap_compression=mcap_compression, + depth_mode=depth_mode, + with_pose=with_pose, + pose_config=pose_config.expanduser().resolve() if pose_config is not None else None, + 
world_frame_id=world_frame_id, + end_frame=end_frame, + sync_tolerance_ms=sync_tolerance_ms, + ) + + skipped_results: list[JobResult] = [] + failed_results: list[JobResult] = [] + pending_jobs: list[ConversionJob] = [] + pending_reasons: dict[Path, str] = {} + pending_details: dict[Path, str] = {} + valid_existing: list[OutputProbeResult] = [] + invalid_existing: list[tuple[ConversionJob, OutputProbeResult]] = [] + missing_outputs: list[ConversionJob] = [] + + for segment_dir in sources.segment_dirs: + scan = scan_segment_dir(segment_dir) + output_path = output_path_for(segment_dir) + job = ConversionJob( + segment_dir=segment_dir, + output_path=output_path, + camera_labels=scan.camera_labels, + ) + command = tuple(command_for_job(job, config)) if config.zed_bin is not None else () + if not scan.is_valid: + failed_results.append( + JobResult( + status="failed", + segment_dir=segment_dir, + output_path=output_path, + command=command, + return_code=2, + stderr=scan.reason or "", + ) + ) + continue + + if report_existing: + probe_result = probe_output(output_path, job.camera_labels) + if probe_result.status == "valid": + valid_existing.append(probe_result) + elif probe_result.status == "invalid": + invalid_existing.append((job, probe_result)) + else: + missing_outputs.append(job) + continue + + if overwrite: + pending_jobs.append(job) + pending_reasons[segment_dir] = "overwrite" + continue + + if config.probe_existing: + probe_result = probe_output(output_path, job.camera_labels) + if probe_result.status == "valid": + valid_existing.append(probe_result) + skipped_results.append( + JobResult( + status="skipped", + segment_dir=segment_dir, + output_path=output_path, + command=command, + ) + ) + continue + if probe_result.status == "invalid": + invalid_existing.append((job, probe_result)) + pending_jobs.append(job) + pending_reasons[segment_dir] = "invalid-existing-output" + pending_details[segment_dir] = probe_result.reason + continue + missing_outputs.append(job) + 
pending_jobs.append(job) + pending_reasons[segment_dir] = "missing-output" + continue + + if output_path.exists(): + skipped_results.append( + JobResult( + status="skipped", + segment_dir=segment_dir, + output_path=output_path, + command=command, + ) + ) + continue + + missing_outputs.append(job) + pending_jobs.append(job) + pending_reasons[segment_dir] = "missing-output" + + if report_existing: + click.echo( + ( + f"source={sources.mode} matched={len(sources.segment_dirs)} valid={len(valid_existing)} " + f"invalid={len(invalid_existing)} missing={len(missing_outputs)} " + f"invalid-segments={len(failed_results)}" + ), + err=True, + ) + if sources.ignored_partial_dirs: + click.echo(f"ignored_incomplete={len(sources.ignored_partial_dirs)}", err=True) + report_invalid_existing_outputs(invalid_existing) + summarize_failures(failed_results) + if failed_results or invalid_existing: + raise SystemExit(1) + return + + click.echo( + ( + f"source={sources.mode} matched={len(sources.segment_dirs)} pending={len(pending_jobs)} " + f"skipped={len(skipped_results)} invalid={len(failed_results)} jobs={jobs} " + f"dry_run={'yes' if dry_run else 'no'}" + ), + err=True, + ) + if sources.ignored_partial_dirs: + click.echo(f"ignored_incomplete={len(sources.ignored_partial_dirs)}", err=True) + if config.probe_existing: + click.echo( + ( + f"probed-existing: valid={len(valid_existing)} invalid={len(invalid_existing)} " + f"missing={len(missing_outputs)}" + ), + err=True, + ) + + if dry_run: + report_dry_run_plan(pending_jobs, pending_reasons, pending_details) + summarize_failures(failed_results) + if failed_results: + raise SystemExit(1) + return + + results = list(skipped_results) + results.extend(failed_results) + conversion_results, aborted_count = run_batch(pending_jobs, config, jobs) + results.extend(conversion_results) + + converted_count = sum(1 for result in results if result.status == "converted") + skipped_count = sum(1 for result in results if result.status == "skipped") + 
failed_count = sum(1 for result in results if result.status == "failed") + + click.echo( + ( + f"summary: matched={len(sources.segment_dirs)} converted={converted_count} " + f"skipped={skipped_count} failed={failed_count} aborted={aborted_count}" + ), + err=True, + ) + summarize_failures(results) + + if failed_count > 0 or aborted_count > 0: + raise SystemExit(1) + + +if __name__ == "__main__": + main() diff --git a/src/tools/zed_svo_to_mcap.cpp b/src/tools/zed_svo_to_mcap.cpp index f59fe8d..49d23f2 100644 --- a/src/tools/zed_svo_to_mcap.cpp +++ b/src/tools/zed_svo_to_mcap.cpp @@ -1570,8 +1570,8 @@ int run_multi_source( const cvmmap_streamer::McapCompression compression, const sl::DEPTH_MODE depth_mode, const PoseTrackingOptions &pose_tracking) { - if (options.start_frame != 0 || options.has_end_frame) { - spdlog::error("multi-camera mode does not support --start-frame or --end-frame"); + if (options.start_frame != 0) { + spdlog::error("multi-camera mode does not support --start-frame"); return exit_code(ToolExitCode::UsageError); } bool interrupted{false}; @@ -1678,6 +1678,9 @@ int run_multi_source( return exit_code(ToolExitCode::RuntimeError); } emitted_groups += 1; + if (options.has_end_frame && emitted_groups > options.end_frame) { + break; + } auto advance = advance_after_emit(streams); if (!advance) {