feat: add batch MCAP export tooling for ZED segments

Add a Python batch wrapper around zed_svo_to_mcap for multi-camera
segment exports. The new script supports dataset discovery, repeated
segment-dir inputs, CSV-driven ordering, skip/probe/report flows, dry-run,
and CUDA environment passthrough so kindergarten-style datasets can be
converted into one bundled MCAP per segment.

Extend zed_svo_to_mcap so bundled multi-camera mode accepts --end-frame
with synced-group semantics. In this mode the value is interpreted as the
last emitted synced frame-group index from the common synced start, while
--start-frame remains unsupported.

Vendor a minimal pose-config TOML and a sample segments CSV into this repo
so the MCAP workflow is self-contained. Update the README to document the
batch MCAP flow, use portable placeholders instead of machine-specific
absolute paths, and describe the expected dataset layout explicitly.
2026-03-20 09:19:56 +00:00
parent 8d9bd1b815
commit 1691274e85
5 changed files with 920 additions and 13 deletions
+101 -11
@@ -65,7 +65,7 @@ The repo also includes an offline conversion tool for the left ZED color stream:
```bash
CUDA_VISIBLE_DEVICES=GPU-9cc7b26e-90d4-0c49-4d4c-060e528ffba6 \
./build/bin/zed_svo_to_mp4 \
-  --input /workspaces/data/kindergarten/bar/2026-03-18T11-59-41/2026-03-18T11-59-41_zed1.svo2 \
+  --input <SVO_INPUT> \
--encoder-device auto \
--preset balanced \
--quality 20 \
@@ -83,11 +83,40 @@ Python dependencies for the batch wrapper are managed with `uv`:
uv sync
```
Expected multi-camera dataset layout:
```text
<DATASET_ROOT>/
├── svo2_segments_sorted.csv
├── bar/
│ └── 2026-03-18T11-59-41/
│ ├── 2026-03-18T11-59-41_zed1.svo2
│ ├── 2026-03-18T11-59-41_zed2.svo2
│ ├── 2026-03-18T11-59-41_zed3.svo2
│ └── 2026-03-18T11-59-41_zed4.svo2
└── jump/
└── experiment/
└── 1/
└── 2026-03-18T11-26-23/
├── 2026-03-18T11-26-23_zed1.svo2
├── 2026-03-18T11-26-23_zed2.svo2
├── 2026-03-18T11-26-23_zed3.svo2
└── 2026-03-18T11-26-23_zed4.svo2
```
Placeholders used below:
- `<DATASET_ROOT>`: dataset root containing multi-camera segment directories
- `<SEGMENT_DIR>`: one multi-camera segment directory containing `*_zedN.svo` or `*_zedN.svo2`
- `<SEGMENT_DIR_A>`, `<SEGMENT_DIR_B>`: explicit segment directories
- `<SEGMENTS_CSV>`: CSV file with a `segment_dir` column, for example `config/svo2_segments_sorted.sample.csv`
- `<SVO_INPUT>`: one single-camera `.svo` or `.svo2` file
- `<POSE_CONFIG>`: TOML file such as `config/zed_pose_config.toml`
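The `_zedN.svo`/`_zedN.svo2` matching behind `<SEGMENT_DIR>` can be sketched in Python. This mirrors the filename pattern used by the batch wrapper in this commit (case-insensitive match, at least two distinct cameras per segment); the sample filenames come from the layout above:

```python
# Sketch of segment-directory recognition, mirroring the batch wrapper's
# filename pattern. A directory qualifies as a multi-camera segment when
# at least two distinct zedN camera labels are found.
import re

SEGMENT_FILE_PATTERN = re.compile(r".*_zed([0-9]+)\.svo2?$", re.IGNORECASE)

def camera_labels(filenames: list[str]) -> list[str]:
    """Return sorted camera labels (zed1, zed2, ...) found in a directory listing."""
    labels: set[str] = set()
    for name in filenames:
        match = SEGMENT_FILE_PATTERN.fullmatch(name)
        if match is not None:
            labels.add(f"zed{int(match.group(1))}")
    # Sort numerically so zed10 follows zed2 rather than zed1.
    return sorted(labels, key=lambda label: int(label[3:]))

names = [
    "2026-03-18T11-59-41_zed1.svo2",
    "2026-03-18T11-59-41_zed2.svo2",
    "notes.txt",
]
labels = camera_labels(names)
print(labels)            # ['zed1', 'zed2']
print(len(labels) >= 2)  # True: this listing qualifies as a multi-camera segment
```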
Use the wrapper to recurse through a folder, run `zed_svo_to_mp4` on every matched `.svo2`, and show one aggregate tqdm progress bar:
```bash
uv run python scripts/zed_batch_svo_to_mp4.py \
-  /workspaces/data/kindergarten/bar \
+  <DATASET_ROOT>/bar \
--pattern '*.svo2' \
--recursive \
--jobs 2 \
@@ -105,7 +134,7 @@ Use the grid converter to merge four synced ZED recordings into a 2x2 CCTV-style
```bash
./build/bin/zed_svo_grid_to_mp4 \
-  --segment-dir /workspaces/data/kindergarten/bar/2026-03-18T11-59-41 \
+  --segment-dir <SEGMENT_DIR> \
--encoder-device auto \
--codec h265 \
--duration-seconds 2
@@ -117,7 +146,7 @@ Use the batch wrapper to run `zed_svo_grid_to_mp4` over many segment directories
```bash
uv run python scripts/zed_batch_svo_grid_to_mp4.py \
-  /workspaces/data/kindergarten \
+  <DATASET_ROOT> \
--recursive \
--jobs 2 \
--encoder-device auto \
@@ -128,8 +157,8 @@ You can also provide the exact segments to convert:
```bash
uv run python scripts/zed_batch_svo_grid_to_mp4.py \
-  --segment-dir /workspaces/data/kindergarten/jump/external/recording/2026-03-18T11-23-22 \
-  --segment-dir /workspaces/data/kindergarten/jump/experiment/1/2026-03-18T11-26-23 \
+  --segment-dir <SEGMENT_DIR_A> \
+  --segment-dir <SEGMENT_DIR_B> \
--jobs 2
```
@@ -137,7 +166,7 @@ Or preserve a precomputed CSV ordering:
```bash
uv run python scripts/zed_batch_svo_grid_to_mp4.py \
-  --segments-csv /workspaces/data/kindergarten/svo2_segments_sorted.csv \
+  --segments-csv <SEGMENTS_CSV> \
--jobs 2 \
--duration-seconds 2
```
@@ -148,7 +177,7 @@ When you suspect a previous run left behind partial MP4 files, opt into `ffprobe
```bash
uv run python scripts/zed_batch_svo_grid_to_mp4.py \
-  /workspaces/data/kindergarten \
+  <DATASET_ROOT> \
--probe-existing \
--jobs 2
```
@@ -157,7 +186,7 @@ Use `--report-existing` to audit existing outputs without launching conversions.
```bash
uv run python scripts/zed_batch_svo_grid_to_mp4.py \
-  /workspaces/data/kindergarten \
+  <DATASET_ROOT> \
--report-existing
```
@@ -165,7 +194,7 @@ Use `--dry-run` to preview what the batch wrapper would convert after applying s
```bash
uv run python scripts/zed_batch_svo_grid_to_mp4.py \
-  /workspaces/data/kindergarten \
+  <DATASET_ROOT> \
--probe-existing \
--dry-run
```
@@ -174,7 +203,7 @@ uv run python scripts/zed_batch_svo_grid_to_mp4.py \
The `--segments-csv` input expects a header row with at least a `segment_dir` column. Extra columns are allowed and ignored by the batch wrapper. `segment_dir` values may be absolute paths or paths relative to the CSV file's parent directory. Use `--csv-root` to override that base directory.
-Repeated rows for the same `segment_dir` are allowed; the wrapper converts each unique segment once, preserving the first-seen CSV order. This makes `/workspaces/data/kindergarten/svo2_segments_sorted.csv` a valid input even though it stores one row per camera file:
+Repeated rows for the same `segment_dir` are allowed; the wrapper converts each unique segment once, preserving the first-seen CSV order. The repo includes a small example at `config/svo2_segments_sorted.sample.csv`:
```csv
timestamp,activity,group_path,segment_dir,camera,relative_path
@@ -182,6 +211,67 @@ timestamp,activity,group_path,segment_dir,camera,relative_path
2026-03-18T11-23-22,jump,jump/external/recording,jump/external/recording/2026-03-18T11-23-22,zed2,jump/external/recording/2026-03-18T11-23-22/2026-03-18T11-23-22_zed2.svo2
```
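That dedupe behavior can be sketched as follows. This assumes the sample CSV shape above; relative `segment_dir` values resolve against a caller-supplied base directory, standing in for the CSV file's parent (or `--csv-root`):

```python
# Sketch of the CSV handling described above: rows are keyed by segment_dir,
# relative values resolve against a base directory, and repeated rows
# collapse to one conversion in first-seen order.
import csv
import io
from pathlib import Path

CSV_TEXT = """\
timestamp,activity,group_path,segment_dir,camera,relative_path
2026-03-18T11-23-22,jump,jump/external/recording,jump/external/recording/2026-03-18T11-23-22,zed1,jump/external/recording/2026-03-18T11-23-22/2026-03-18T11-23-22_zed1.svo2
2026-03-18T11-23-22,jump,jump/external/recording,jump/external/recording/2026-03-18T11-23-22,zed2,jump/external/recording/2026-03-18T11-23-22/2026-03-18T11-23-22_zed2.svo2
"""

def unique_segment_dirs(stream, base_dir: Path) -> list[Path]:
    """Collapse repeated segment_dir rows while preserving first-seen order."""
    ordered: list[Path] = []
    seen: set[Path] = set()
    for row in csv.DictReader(stream):
        raw = Path(row["segment_dir"])
        resolved = raw if raw.is_absolute() else base_dir / raw
        if resolved not in seen:
            seen.add(resolved)
            ordered.append(resolved)
    return ordered

dirs = unique_segment_dirs(io.StringIO(CSV_TEXT), Path("<DATASET_ROOT>"))
print(len(dirs))  # 1: both camera rows collapse into one segment conversion
```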
### Batch ZED Segments To MCAP
Use the wrapper to recurse through a dataset root, run `zed_svo_to_mcap --segment-dir` on every matched multi-camera segment, and show one aggregate tqdm progress bar:
```bash
uv run python scripts/zed_batch_svo_to_mcap.py \
<DATASET_ROOT> \
--recursive \
--jobs 2 \
--cuda-visible-devices GPU-9cc7b26e-90d4-0c49-4d4c-060e528ffba6 \
--end-frame 29
```
You can also preserve the precomputed kindergarten CSV ordering:
```bash
uv run python scripts/zed_batch_svo_to_mcap.py \
--segments-csv <SEGMENTS_CSV> \
--jobs 2 \
--end-frame 29
```
Enable per-camera pose export when the segment has valid tracking:
```bash
uv run python scripts/zed_batch_svo_to_mcap.py \
--segment-dir <SEGMENT_DIR> \
--with-pose \
--pose-config <POSE_CONFIG>
```
The batch MCAP wrapper writes `<segment>/<segment>.mcap` by default, skips existing outputs unless told otherwise, and returns a nonzero exit code if any segment fails.
The repo includes a minimal pose config at `config/zed_pose_config.toml` so MCAP conversion does not depend on a separate `cv-mmap` checkout.
In bundled multi-camera mode, `--end-frame` means the last emitted synced frame-group index from the common start timestamp.
Use `--probe-existing` to validate existing MCAPs before skipping them. Invalid outputs are treated as missing and requeued:
```bash
uv run python scripts/zed_batch_svo_to_mcap.py \
<DATASET_ROOT> \
--probe-existing \
--jobs 2
```
Use `--report-existing` to audit existing MCAPs without launching conversions:
```bash
uv run python scripts/zed_batch_svo_to_mcap.py \
<DATASET_ROOT> \
--report-existing
```
Use `--dry-run` to preview what would be converted after applying skip or probe logic:
```bash
uv run python scripts/zed_batch_svo_to_mcap.py \
--segments-csv <SEGMENTS_CSV> \
--probe-existing \
--dry-run
```
### Mandatory Acceptance (Standalone)
Run the full mandatory acceptance suite. This executes the complete protocol/codec matrix without requiring external servers.
+7
@@ -0,0 +1,7 @@
timestamp,activity,group_path,segment_dir,camera,relative_path
2026-03-18T11-23-22,jump,jump/external/recording,jump/external/recording/2026-03-18T11-23-22,zed1,jump/external/recording/2026-03-18T11-23-22/2026-03-18T11-23-22_zed1.svo2
2026-03-18T11-23-22,jump,jump/external/recording,jump/external/recording/2026-03-18T11-23-22,zed2,jump/external/recording/2026-03-18T11-23-22/2026-03-18T11-23-22_zed2.svo2
2026-03-18T11-23-22,jump,jump/external/recording,jump/external/recording/2026-03-18T11-23-22,zed3,jump/external/recording/2026-03-18T11-23-22/2026-03-18T11-23-22_zed3.svo2
2026-03-18T11-23-22,jump,jump/external/recording,jump/external/recording/2026-03-18T11-23-22,zed4,jump/external/recording/2026-03-18T11-23-22/2026-03-18T11-23-22_zed4.svo2
2026-03-18T11-26-23,jump,jump/experiment/1,jump/experiment/1/2026-03-18T11-26-23,zed1,jump/experiment/1/2026-03-18T11-26-23/2026-03-18T11-26-23_zed1.svo2
2026-03-18T11-26-23,jump,jump/experiment/1,jump/experiment/1/2026-03-18T11-26-23,zed2,jump/experiment/1/2026-03-18T11-26-23/2026-03-18T11-26-23_zed2.svo2
+18
@@ -0,0 +1,18 @@
# Minimal pose-tracking config for zed_svo_to_mcap.
# The converter currently reads only:
# - zed.coordinate_system
# - zed.body_tracking.reference_frame
# - zed.body_tracking.set_floor_as_origin
[zed]
# Native ZED 3D/body coordinate system used when reading positional tracking.
# Supported values in this repo are IMAGE and RIGHT_HANDED_Y_UP.
coordinate_system = "IMAGE"
[zed.body_tracking]
# Reference frame used for per-camera pose estimation.
# Supported values are CAMERA and WORLD.
reference_frame = "CAMERA"
# When true, WORLD origin is placed on the floor during positional tracking.
set_floor_as_origin = false
+789
@@ -0,0 +1,789 @@
#!/usr/bin/env python3
from __future__ import annotations
import concurrent.futures
import csv
import importlib
import os
import re
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
import click
from tqdm import tqdm
SCRIPT_PATH = Path(__file__).resolve()
REPO_ROOT = SCRIPT_PATH.parents[1]
WORKSPACE_ROOT = REPO_ROOT.parent
MCAP_PYTHON_ROOT = WORKSPACE_ROOT / "mcap" / "python" / "mcap"
SEGMENT_FILE_PATTERN = re.compile(r".*_zed([0-9]+)\.svo2?$", re.IGNORECASE)
@dataclass(slots=True, frozen=True)
class BatchConfig:
zed_bin: Path | None
probe_existing: bool
cuda_visible_devices: str | None
overwrite: bool
fail_fast: bool
codec: str
encoder_device: str
mcap_compression: str
depth_mode: str
with_pose: bool
pose_config: Path | None
world_frame_id: str | None
end_frame: int | None
sync_tolerance_ms: float | None
@dataclass(slots=True, frozen=True)
class ConversionJob:
segment_dir: Path
output_path: Path
camera_labels: tuple[str, ...]
@dataclass(slots=True, frozen=True)
class JobResult:
status: str
segment_dir: Path
output_path: Path
command: tuple[str, ...]
return_code: int = 0
stdout: str = ""
stderr: str = ""
@dataclass(slots=True, frozen=True)
class SegmentScan:
segment_dir: Path
matched_files: int
camera_labels: tuple[str, ...]
is_valid: bool
reason: str | None = None
@dataclass(slots=True, frozen=True)
class SourceResolution:
mode: str
segment_dirs: tuple[Path, ...]
ignored_partial_dirs: tuple[SegmentScan, ...]
@dataclass(slots=True, frozen=True)
class OutputProbeResult:
output_path: Path
status: str
reason: str = ""
_MCAP_READER_MODULE = None
def locate_binary(override: Path | None) -> Path:
if override is not None:
candidate = override.expanduser().resolve()
if not candidate.is_file():
raise click.ClickException(f"binary not found: {candidate}")
return candidate
candidates = (
REPO_ROOT / "build" / "bin" / "zed_svo_to_mcap",
REPO_ROOT / "build" / "zed_svo_to_mcap",
)
for candidate in candidates:
if candidate.is_file():
return candidate
raise click.ClickException(f"could not find zed_svo_to_mcap under {REPO_ROOT / 'build'}")
def sorted_camera_labels(labels: set[str]) -> tuple[str, ...]:
return tuple(sorted(labels, key=lambda label: int(label[3:])))
def scan_segment_dir(segment_dir: Path) -> SegmentScan:
if not segment_dir.is_dir():
return SegmentScan(
segment_dir=segment_dir,
matched_files=0,
camera_labels=(),
is_valid=False,
reason=f"segment directory does not exist: {segment_dir}",
)
matched_by_camera: dict[str, list[Path]] = {}
for child in segment_dir.iterdir():
if not child.is_file():
continue
match = SEGMENT_FILE_PATTERN.fullmatch(child.name)
if match is None:
continue
label = f"zed{int(match.group(1))}"
matched_by_camera.setdefault(label, []).append(child)
matched_files = sum(len(paths) for paths in matched_by_camera.values())
camera_labels = sorted_camera_labels(set(matched_by_camera))
duplicate_cameras = [label for label, paths in sorted(matched_by_camera.items()) if len(paths) > 1]
if duplicate_cameras:
duplicate_text = ", ".join(duplicate_cameras)
return SegmentScan(
segment_dir=segment_dir,
matched_files=matched_files,
camera_labels=camera_labels,
is_valid=False,
reason=f"duplicate camera inputs under {segment_dir}: {duplicate_text}",
)
if len(camera_labels) < 2:
return SegmentScan(
segment_dir=segment_dir,
matched_files=matched_files,
camera_labels=camera_labels,
is_valid=False,
reason=f"expected at least 2 camera inputs under {segment_dir}, found {len(camera_labels)}",
)
return SegmentScan(
segment_dir=segment_dir,
matched_files=matched_files,
camera_labels=camera_labels,
is_valid=True,
)
def dedupe_paths(paths: list[Path]) -> list[Path]:
ordered: list[Path] = []
seen: set[Path] = set()
for path in paths:
resolved = path.expanduser().resolve()
if resolved in seen:
continue
seen.add(resolved)
ordered.append(resolved)
return ordered
def discover_segment_dirs(root: Path, recursive: bool) -> SourceResolution:
if not root.is_dir():
raise click.ClickException(f"input directory does not exist: {root}")
candidate_dirs = {root.resolve()}
iterator = root.rglob("*") if recursive else root.iterdir()
for path in iterator:
if path.is_dir():
candidate_dirs.add(path.resolve())
valid_dirs: list[Path] = []
ignored_partial_dirs: list[SegmentScan] = []
for segment_dir in sorted(candidate_dirs):
scan = scan_segment_dir(segment_dir)
if scan.is_valid:
valid_dirs.append(segment_dir)
elif scan.matched_files > 0:
ignored_partial_dirs.append(scan)
if not valid_dirs:
raise click.ClickException(f"no multi-camera segments found under {root}")
return SourceResolution(
mode="discovery",
segment_dirs=tuple(valid_dirs),
ignored_partial_dirs=tuple(ignored_partial_dirs),
)
def parse_segments_csv(csv_path: Path, csv_root: Path | None) -> tuple[Path, ...]:
csv_path = csv_path.expanduser().resolve()
if not csv_path.is_file():
raise click.ClickException(f"CSV not found: {csv_path}")
if csv_root is not None:
base_dir = csv_root.expanduser().resolve()
if not base_dir.is_dir():
raise click.ClickException(f"CSV root is not a directory: {base_dir}")
else:
base_dir = csv_path.parent
segment_dirs: list[Path] = []
seen: set[Path] = set()
with csv_path.open(newline="") as stream:
reader = csv.DictReader(stream)
if reader.fieldnames is None or "segment_dir" not in reader.fieldnames:
raise click.ClickException(f"{csv_path} must contain a 'segment_dir' header")
for row_number, row in enumerate(reader, start=2):
raw_segment_dir = (row.get("segment_dir") or "").strip()
if not raw_segment_dir:
raise click.ClickException(f"{csv_path}:{row_number} has an empty segment_dir value")
segment_dir = Path(raw_segment_dir)
resolved = segment_dir if segment_dir.is_absolute() else base_dir / segment_dir
resolved = resolved.expanduser().resolve()
if resolved in seen:
continue
seen.add(resolved)
segment_dirs.append(resolved)
if not segment_dirs:
raise click.ClickException(f"{csv_path} did not contain any segment_dir rows")
return tuple(segment_dirs)
def resolve_sources(
input_dir: Path | None,
segment_dirs: tuple[Path, ...],
segments_csv: Path | None,
csv_root: Path | None,
recursive: bool,
) -> SourceResolution:
source_count = sum(
(
1 if input_dir is not None else 0,
1 if segment_dirs else 0,
1 if segments_csv is not None else 0,
)
)
if source_count != 1:
raise click.ClickException(
"provide exactly one source mode: INPUT_DIR, --segment-dir, or --segments-csv"
)
if input_dir is not None:
return discover_segment_dirs(input_dir.expanduser().resolve(), recursive)
if segment_dirs:
ordered_dirs = dedupe_paths(list(segment_dirs))
return SourceResolution(mode="segment-dir", segment_dirs=tuple(ordered_dirs), ignored_partial_dirs=())
return SourceResolution(
mode="segments-csv",
segment_dirs=parse_segments_csv(segments_csv, csv_root),
ignored_partial_dirs=(),
)
def output_path_for(segment_dir: Path) -> Path:
return segment_dir / f"{segment_dir.name}.mcap"
def command_for_job(job: ConversionJob, config: BatchConfig) -> list[str]:
if config.zed_bin is None:
raise RuntimeError("zed_svo_to_mcap binary is not configured")
command = [
str(config.zed_bin),
"--segment-dir",
str(job.segment_dir),
"--codec",
config.codec,
"--encoder-device",
config.encoder_device,
"--mcap-compression",
config.mcap_compression,
"--depth-mode",
config.depth_mode,
]
if config.with_pose:
command.append("--with-pose")
if config.pose_config is not None:
command.extend(["--pose-config", str(config.pose_config)])
if config.world_frame_id is not None:
command.extend(["--world-frame-id", config.world_frame_id])
if config.end_frame is not None:
command.extend(["--end-frame", str(config.end_frame)])
if config.sync_tolerance_ms is not None:
command.extend(["--sync-tolerance-ms", str(config.sync_tolerance_ms)])
return command
def env_for_job(config: BatchConfig) -> dict[str, str]:
env = dict(os.environ)
if config.cuda_visible_devices is not None:
env["CUDA_VISIBLE_DEVICES"] = config.cuda_visible_devices
return env
def load_mcap_reader():
global _MCAP_READER_MODULE
if _MCAP_READER_MODULE is not None:
return _MCAP_READER_MODULE
if str(MCAP_PYTHON_ROOT) not in sys.path:
sys.path.insert(0, str(MCAP_PYTHON_ROOT))
try:
_MCAP_READER_MODULE = importlib.import_module("mcap.reader")
except ModuleNotFoundError as error:
raise click.ClickException(
f"could not import mcap.reader from {MCAP_PYTHON_ROOT}"
) from error
return _MCAP_READER_MODULE
def required_topics_for(camera_labels: tuple[str, ...]) -> set[str]:
topics: set[str] = set()
for label in camera_labels:
topics.add(f"/{label}/video")
topics.add(f"/{label}/depth")
topics.add(f"/{label}/calibration")
return topics
def probe_output(output_path: Path, camera_labels: tuple[str, ...]) -> OutputProbeResult:
if not output_path.is_file():
return OutputProbeResult(output_path=output_path, status="missing")
reader_module = load_mcap_reader()
expected_topics = required_topics_for(camera_labels)
found_topics: set[str] = set()
try:
with output_path.open("rb") as stream:
reader = reader_module.make_reader(stream)
for _schema, channel, _message in reader.iter_messages():
if channel.topic in expected_topics:
found_topics.add(channel.topic)
if found_topics == expected_topics:
return OutputProbeResult(output_path=output_path, status="valid")
except Exception as error: # noqa: BLE001
return OutputProbeResult(output_path=output_path, status="invalid", reason=str(error))
missing_topics = sorted(expected_topics - found_topics)
if missing_topics:
return OutputProbeResult(
output_path=output_path,
status="invalid",
reason="missing expected topics: " + ", ".join(missing_topics),
)
return OutputProbeResult(output_path=output_path, status="valid")
def run_conversion(job: ConversionJob, config: BatchConfig) -> JobResult:
command = command_for_job(job, config)
completed = subprocess.run(
command,
check=False,
capture_output=True,
text=True,
env=env_for_job(config),
)
status = "converted" if completed.returncode == 0 else "failed"
return JobResult(
status=status,
segment_dir=job.segment_dir,
output_path=job.output_path,
command=tuple(command),
return_code=completed.returncode,
stdout=completed.stdout,
stderr=completed.stderr,
)
def summarize_failures(results: list[JobResult]) -> None:
failed_results = [result for result in results if result.status == "failed"]
if not failed_results:
return
click.echo("\nFailed conversions:", err=True)
for result in failed_results:
click.echo(f"- {result.segment_dir} (exit {result.return_code})", err=True)
if result.stderr.strip():
click.echo(result.stderr.rstrip(), err=True)
elif result.stdout.strip():
click.echo(result.stdout.rstrip(), err=True)
def report_invalid_existing_outputs(
invalid_existing: list[tuple[ConversionJob, OutputProbeResult]],
) -> None:
if not invalid_existing:
return
click.echo("\nInvalid existing outputs:", err=True)
for job, probe in invalid_existing:
click.echo(f"- {job.segment_dir}", err=True)
click.echo(f" output: {probe.output_path}", err=True)
reason_lines = probe.reason.splitlines() or [probe.reason]
click.echo(f" reason: {reason_lines[0]}", err=True)
for line in reason_lines[1:]:
click.echo(f" {line}", err=True)
def report_dry_run_plan(
pending_jobs: list[ConversionJob],
pending_reasons: dict[Path, str],
pending_details: dict[Path, str],
) -> None:
if not pending_jobs:
click.echo("dry-run: no conversions would be launched", err=True)
return
click.echo("\nDry-run plan:", err=True)
for job in pending_jobs:
reason = pending_reasons[job.segment_dir]
detail = pending_details.get(job.segment_dir)
line = f"- {job.segment_dir} [{reason}]"
if detail:
line = f"{line}: {detail.replace(chr(10), ' | ')}"
click.echo(line, err=True)
def run_batch(jobs: list[ConversionJob], config: BatchConfig, jobs_limit: int) -> tuple[list[JobResult], int]:
results: list[JobResult] = []
aborted_count = 0
if not jobs:
return results, aborted_count
future_to_job: dict[concurrent.futures.Future[JobResult], ConversionJob] = {}
job_iter = iter(jobs)
stop_submitting = False
with concurrent.futures.ThreadPoolExecutor(max_workers=jobs_limit) as executor:
with tqdm(total=len(jobs), unit="segment", dynamic_ncols=True) as progress:
def submit_next() -> bool:
if stop_submitting:
return False
try:
job = next(job_iter)
except StopIteration:
return False
future = executor.submit(run_conversion, job, config)
future_to_job[future] = job
return True
for _ in range(min(jobs_limit, len(jobs))):
submit_next()
while future_to_job:
done, _ = concurrent.futures.wait(
future_to_job,
return_when=concurrent.futures.FIRST_COMPLETED,
)
for future in done:
job = future_to_job.pop(future)
result = future.result()
results.append(result)
progress.update(1)
if result.status == "failed":
tqdm.write(
f"failed: {job.segment_dir} (exit {result.return_code})",
file=sys.stderr,
)
if config.fail_fast:
stop_submitting = True
if not stop_submitting:
submit_next()
if stop_submitting:
remaining = sum(1 for _ in job_iter)
aborted_count = remaining
progress.total = progress.n + len(future_to_job)
progress.refresh()
return results, aborted_count
@click.command()
@click.argument(
"input_dir",
required=False,
type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path),
)
@click.option(
"--segment-dir",
"segment_dirs",
multiple=True,
type=click.Path(path_type=Path, file_okay=False, dir_okay=True),
help="Explicit segment directory. Repeatable. Mutually exclusive with INPUT_DIR and --segments-csv.",
)
@click.option(
"--segments-csv",
type=click.Path(path_type=Path, dir_okay=False),
help="CSV file containing a segment_dir column. Mutually exclusive with INPUT_DIR and --segment-dir.",
)
@click.option(
"--csv-root",
type=click.Path(path_type=Path, file_okay=False, dir_okay=True),
help="Base directory for relative segment_dir entries in --segments-csv. Defaults to the CSV parent directory.",
)
@click.option("--recursive/--no-recursive", default=True, show_default=True, help="Recurse when discovering segment directories from INPUT_DIR.")
@click.option("--jobs", default=1, show_default=True, type=click.IntRange(min=1), help="Parallel conversion jobs.")
@click.option(
"--zed-bin",
type=click.Path(path_type=Path, dir_okay=False),
help="Explicit path to the zed_svo_to_mcap binary.",
)
@click.option(
"--cuda-visible-devices",
help="Optional CUDA_VISIBLE_DEVICES value exported for each conversion subprocess.",
)
@click.option("--overwrite/--skip-existing", default=False, show_default=True, help="Overwrite existing MCAP files.")
@click.option(
"--probe-existing/--trust-existing",
default=False,
show_default=True,
help="Validate existing MCAP files before skipping them. Invalid outputs are treated as missing.",
)
@click.option(
"--report-existing",
is_flag=True,
help="Probe existing MCAP files, report invalid ones, and do not launch conversions.",
)
@click.option(
"--dry-run",
is_flag=True,
help="Show which segments would be converted after applying skip/probe logic, without launching conversions.",
)
@click.option(
"--fail-fast/--continue-on-error",
default=False,
show_default=True,
help="Stop submitting new work after the first failed conversion.",
)
@click.option("--codec", type=click.Choice(("h264", "h265")), default="h265", show_default=True)
@click.option(
"--encoder-device",
type=click.Choice(("auto", "nvidia", "software")),
default="auto",
show_default=True,
)
@click.option(
"--mcap-compression",
type=click.Choice(("none", "lz4", "zstd")),
default="zstd",
show_default=True,
)
@click.option(
"--depth-mode",
type=click.Choice(("neural", "quality", "performance", "ultra")),
default="quality",
show_default=True,
)
@click.option("--with-pose", is_flag=True, help="Enable per-camera positional tracking export when available.")
@click.option(
"--pose-config",
type=click.Path(path_type=Path, dir_okay=False),
help="TOML config passed to zed_svo_to_mcap for pose tracking settings.",
)
@click.option(
"--world-frame-id",
default=None,
help="Optional pose reference frame id passed through to zed_svo_to_mcap.",
)
@click.option(
"--end-frame",
type=click.IntRange(min=0),
default=None,
help="Last synced frame group to export (inclusive) in bundled multi-camera mode.",
)
@click.option(
"--sync-tolerance-ms",
type=click.FloatRange(min=0.0, min_open=True),
default=None,
help="Override the maximum timestamp delta used for bundled multi-camera sync.",
)
def main(
input_dir: Path | None,
segment_dirs: tuple[Path, ...],
segments_csv: Path | None,
csv_root: Path | None,
recursive: bool,
jobs: int,
zed_bin: Path | None,
cuda_visible_devices: str | None,
overwrite: bool,
probe_existing: bool,
report_existing: bool,
dry_run: bool,
fail_fast: bool,
codec: str,
encoder_device: str,
mcap_compression: str,
depth_mode: str,
with_pose: bool,
pose_config: Path | None,
world_frame_id: str | None,
end_frame: int | None,
sync_tolerance_ms: float | None,
) -> None:
"""Batch-convert multi-camera ZED segments into bundled MCAP files."""
if report_existing and dry_run:
raise click.ClickException("--report-existing and --dry-run are mutually exclusive")
binary_path = None if report_existing else locate_binary(zed_bin)
sources = resolve_sources(input_dir, segment_dirs, segments_csv, csv_root, recursive)
config = BatchConfig(
zed_bin=binary_path,
probe_existing=probe_existing or report_existing,
cuda_visible_devices=cuda_visible_devices,
overwrite=overwrite,
fail_fast=fail_fast,
codec=codec,
encoder_device=encoder_device,
mcap_compression=mcap_compression,
depth_mode=depth_mode,
with_pose=with_pose,
pose_config=pose_config.expanduser().resolve() if pose_config is not None else None,
world_frame_id=world_frame_id,
end_frame=end_frame,
sync_tolerance_ms=sync_tolerance_ms,
)
skipped_results: list[JobResult] = []
failed_results: list[JobResult] = []
pending_jobs: list[ConversionJob] = []
pending_reasons: dict[Path, str] = {}
pending_details: dict[Path, str] = {}
valid_existing: list[OutputProbeResult] = []
invalid_existing: list[tuple[ConversionJob, OutputProbeResult]] = []
missing_outputs: list[ConversionJob] = []
for segment_dir in sources.segment_dirs:
scan = scan_segment_dir(segment_dir)
output_path = output_path_for(segment_dir)
job = ConversionJob(
segment_dir=segment_dir,
output_path=output_path,
camera_labels=scan.camera_labels,
)
command = tuple(command_for_job(job, config)) if config.zed_bin is not None else ()
if not scan.is_valid:
failed_results.append(
JobResult(
status="failed",
segment_dir=segment_dir,
output_path=output_path,
command=command,
return_code=2,
stderr=scan.reason or "",
)
)
continue
if report_existing:
probe_result = probe_output(output_path, job.camera_labels)
if probe_result.status == "valid":
valid_existing.append(probe_result)
elif probe_result.status == "invalid":
invalid_existing.append((job, probe_result))
else:
missing_outputs.append(job)
continue
if overwrite:
pending_jobs.append(job)
pending_reasons[segment_dir] = "overwrite"
continue
if config.probe_existing:
probe_result = probe_output(output_path, job.camera_labels)
if probe_result.status == "valid":
valid_existing.append(probe_result)
skipped_results.append(
JobResult(
status="skipped",
segment_dir=segment_dir,
output_path=output_path,
command=command,
)
)
continue
if probe_result.status == "invalid":
invalid_existing.append((job, probe_result))
pending_jobs.append(job)
pending_reasons[segment_dir] = "invalid-existing-output"
pending_details[segment_dir] = probe_result.reason
continue
missing_outputs.append(job)
pending_jobs.append(job)
pending_reasons[segment_dir] = "missing-output"
continue
if output_path.exists():
skipped_results.append(
JobResult(
status="skipped",
segment_dir=segment_dir,
output_path=output_path,
command=command,
)
)
continue
missing_outputs.append(job)
pending_jobs.append(job)
pending_reasons[segment_dir] = "missing-output"
if report_existing:
click.echo(
(
f"source={sources.mode} matched={len(sources.segment_dirs)} valid={len(valid_existing)} "
f"invalid={len(invalid_existing)} missing={len(missing_outputs)} "
f"invalid-segments={len(failed_results)}"
),
err=True,
)
if sources.ignored_partial_dirs:
click.echo(f"ignored_incomplete={len(sources.ignored_partial_dirs)}", err=True)
report_invalid_existing_outputs(invalid_existing)
summarize_failures(failed_results)
if failed_results or invalid_existing:
raise SystemExit(1)
return
click.echo(
(
f"source={sources.mode} matched={len(sources.segment_dirs)} pending={len(pending_jobs)} "
f"skipped={len(skipped_results)} invalid={len(failed_results)} jobs={jobs} "
f"dry_run={'yes' if dry_run else 'no'}"
),
err=True,
)
if sources.ignored_partial_dirs:
click.echo(f"ignored_incomplete={len(sources.ignored_partial_dirs)}", err=True)
if config.probe_existing:
click.echo(
(
f"probed-existing: valid={len(valid_existing)} invalid={len(invalid_existing)} "
f"missing={len(missing_outputs)}"
),
err=True,
)
if dry_run:
report_dry_run_plan(pending_jobs, pending_reasons, pending_details)
summarize_failures(failed_results)
if failed_results:
raise SystemExit(1)
return
results = list(skipped_results)
results.extend(failed_results)
conversion_results, aborted_count = run_batch(pending_jobs, config, jobs)
results.extend(conversion_results)
converted_count = sum(1 for result in results if result.status == "converted")
skipped_count = sum(1 for result in results if result.status == "skipped")
failed_count = sum(1 for result in results if result.status == "failed")
click.echo(
(
f"summary: matched={len(sources.segment_dirs)} converted={converted_count} "
f"skipped={skipped_count} failed={failed_count} aborted={aborted_count}"
),
err=True,
)
summarize_failures(results)
if failed_count > 0 or aborted_count > 0:
raise SystemExit(1)
if __name__ == "__main__":
main()
+5 -2
@@ -1570,8 +1570,8 @@ int run_multi_source(
const cvmmap_streamer::McapCompression compression,
const sl::DEPTH_MODE depth_mode,
const PoseTrackingOptions &pose_tracking) {
-  if (options.start_frame != 0 || options.has_end_frame) {
-    spdlog::error("multi-camera mode does not support --start-frame or --end-frame");
+  if (options.start_frame != 0) {
+    spdlog::error("multi-camera mode does not support --start-frame");
return exit_code(ToolExitCode::UsageError);
}
bool interrupted{false};
@@ -1678,6 +1678,9 @@ int run_multi_source(
return exit_code(ToolExitCode::RuntimeError);
}
emitted_groups += 1;
+      if (options.has_end_frame && emitted_groups > options.end_frame) {
+        break;
+      }
auto advance = advance_after_emit(streams);
if (!advance) {