Compare commits

...

3 Commits

Author SHA1 Message Date
crosstyan 4f016d9cef refactor(cli): restructure runtime parsing around CLI11 validators
Refactor runtime option parsing to use structured CLI11 option bindings,
validators, and grouped help output instead of the previous raw-string
collection and manual field-by-field parsing.

This introduces reusable canonicalization helpers, endpoint validation,
non-empty validators, and boolean-assignment checks so invalid CLI input is
rejected earlier and help/default rendering comes directly from the option
definitions.

The static help text is updated to advertise the new keep-stream-on-reset
flag added by the streaming work.
2026-04-09 12:18:12 +08:00
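The canonicalize-and-validate pattern this commit describes — a validator that rejects bad input and rewrites accepted input to its canonical spelling, so help/default rendering and parsing agree — can be sketched in Python. The names and alias table below are invented for illustration; they are not the project's CLI11 code.

```python
def make_canonicalizer(parse, render):
    """Build a validator that rejects invalid input and rewrites valid
    input to its canonical spelling. Illustrative analog of the CLI11
    canonicalization-validator pattern; not project code."""
    def validate(value: str) -> str:
        return render(parse(value))  # parse() raises ValueError on bad input
    return validate

# Hypothetical alias table for the sketch.
_CODECS = {"h264": "h264", "avc": "h264", "hevc": "hevc", "h265": "hevc"}

def parse_codec(raw: str) -> str:
    try:
        return _CODECS[raw.lower()]
    except KeyError:
        raise ValueError(f"invalid codec: {raw!r} (expected: h264|hevc)")

canonicalize_codec = make_canonicalizer(parse_codec, str)
```

With this shape, `canonicalize_codec("H264")` both validates and normalizes to `"h264"`, so the stored option value is always the canonical rendering.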
crosstyan 965b03c053 fix(stream): preserve live outputs and disable idle exit by default
Preserve RTP/RTMP session continuity across upstream stream_reset events by
forcing a keyframe on restart, remapping live timestamps, and keeping live
outputs open when the runtime requests reset continuity.

Disable idle auto-exit by default by changing ingest_idle_timeout_ms to 0,
removing validation that rejected 0, and only enforcing idle shutdown when a
positive timeout is configured in pipeline and ingest loops.

Also suppress libavformat FLV trailer header backfill attempts on RTMP sockets
and update the RTP output tester for the newer publisher create signature.

Docs are updated to state that 0 disables the idle timeout.
2026-04-09 12:17:54 +08:00
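The timestamp-remapping idea in this commit can be sketched as a small re-anchoring helper: after an upstream reset, incoming timestamps may jump backwards, so the next frame is offset to land just after the last emitted timestamp, keeping the output clock monotonic for live outputs that stay open. This is an illustrative stand-in with invented names, not the project's implementation.

```python
class LiveTimestampRemapper:
    """Keep output timestamps monotonic across upstream resets by carrying
    an offset forward. Illustrative sketch; invented names."""

    def __init__(self) -> None:
        self._offset_ns = 0
        self._last_in_ns: int | None = None
        self._last_out_ns: int | None = None

    def on_reset(self) -> None:
        # Forget the input anchor; the next frame re-derives the offset.
        self._last_in_ns = None

    def remap(self, in_ns: int) -> int:
        if self._last_in_ns is None and self._last_out_ns is not None:
            # Re-anchor so output continues just past the last emitted stamp.
            self._offset_ns = self._last_out_ns + 1 - in_ns
        out = in_ns + self._offset_ns
        self._last_in_ns = in_ns
        self._last_out_ns = out
        return out
```

Before a reset the mapping is the identity; after `on_reset()`, a source clock that restarts at a lower value still produces strictly increasing output timestamps.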
crosstyan 0a3da46f19 Redesign batch segment source selection 2026-04-08 17:23:12 +08:00
16 changed files with 1452 additions and 661 deletions
+13 -11
@@ -162,7 +162,7 @@ Use the batch wrapper to run `zed_svo_grid_to_mp4` over many segment directories
```bash
uv run python scripts/zed_batch_svo_grid_to_mp4.py \
<DATASET_ROOT> \
--dataset-root <DATASET_ROOT> \
--recursive \
--jobs 2 \
--encoder-device auto \
@@ -173,8 +173,8 @@ You can also provide the exact segments to convert:
```bash
uv run python scripts/zed_batch_svo_grid_to_mp4.py \
--segment-dir <SEGMENT_DIR_A> \
--segment-dir <SEGMENT_DIR_B> \
--segment <SEGMENT_DIR_A> \
--segment <SEGMENT_DIR_B> \
--jobs 2
```
@@ -193,7 +193,7 @@ When you suspect a previous run left behind partial MP4 files, opt into `ffprobe
```bash
uv run python scripts/zed_batch_svo_grid_to_mp4.py \
<DATASET_ROOT> \
--dataset-root <DATASET_ROOT> \
--probe-existing \
--jobs 2
```
@@ -202,7 +202,7 @@ Use `--report-existing` to audit existing outputs without launching conversions.
```bash
uv run python scripts/zed_batch_svo_grid_to_mp4.py \
<DATASET_ROOT> \
--dataset-root <DATASET_ROOT> \
--report-existing
```
@@ -236,7 +236,7 @@ Use the wrapper to recurse through a dataset root, run `zed_svo_to_mcap --segmen
```bash
uv run python scripts/zed_batch_svo_to_mcap.py \
<DATASET_ROOT> \
--dataset-root <DATASET_ROOT> \
--recursive \
--jobs 2 \
--cuda-visible-devices GPU-9cc7b26e-90d4-0c49-4d4c-060e528ffba6 \
@@ -258,7 +258,7 @@ Enable per-camera pose export when the segment has valid tracking:
```bash
uv run python scripts/zed_batch_svo_to_mcap.py \
--segment-dir <SEGMENT_DIR> \
--segment <SEGMENT_DIR> \
--with-pose \
--pose-config <POSE_CONFIG>
```
@@ -275,6 +275,8 @@ Single-source `zed_svo_to_mcap` now writes the one-camera `copy` shape by defaul
For the simple non-GUI path, use `scripts/mcap_rgbd_example.py` and [docs/mcap_recipes.md](./docs/mcap_recipes.md). That helper supports the current `bundled` and `copy` MCAP shapes, and it also accepts the legacy `/camera/*` shape by treating it as a single-camera stream with the literal label `camera`.
For calibration-based depth/RGB mapping, use `scripts/mcap_depth_alignment.py` and [docs/depth_alignment.md](./docs/depth_alignment.md). That helper explains the current affine mapping implied by the exported calibration topics and can export example aligned-depth and overlay PNGs from a chosen MCAP frame.
### MCAP RGBD Viewer
The repo includes an example RGB+depth viewer at `scripts/mcap_rgbd_viewer.py`. It supports legacy standalone `/camera/*` MCAPs, bundled `/bundle` + `/zedN/*` MCAPs, and `copy` MCAPs with namespaced `/{label}/*` topics and no `/bundle`, including the default single-source output from `zed_svo_to_mcap`.
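The three topic shapes the viewer accepts can be distinguished from a channel list alone; the heuristic below is an invented sketch of that classification, not the viewer's actual logic.

```python
def classify_topics(topics: list[str]) -> str:
    """Guess which MCAP layout a topic list comes from.
    Illustrative heuristic only; not scripts/mcap_rgbd_viewer.py code."""
    names = set(topics)
    if any(t.startswith("/camera/") for t in names):
        return "legacy-standalone"   # old single-camera /camera/* shape
    if "/bundle" in names:
        return "bundled"             # /bundle plus /zedN/* per camera
    return "copy"                    # namespaced /{label}/* with no /bundle
```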
@@ -322,7 +324,7 @@ That is why the batch wrapper supports mixed pools such as two NVENC workers plu
```bash
uv run python scripts/zed_batch_svo_to_mcap.py \
<DATASET_ROOT> \
--dataset-root <DATASET_ROOT> \
--recursive \
--overwrite \
--hardware-jobs 2 \
@@ -340,7 +342,7 @@ Use `--probe-existing` to validate existing MCAPs before skipping them. Invalid
```bash
uv run python scripts/zed_batch_svo_to_mcap.py \
<DATASET_ROOT> \
--dataset-root <DATASET_ROOT> \
--probe-existing \
--jobs 2
```
@@ -349,7 +351,7 @@ Use `--report-existing` to audit existing MCAPs without launching conversions:
```bash
uv run python scripts/zed_batch_svo_to_mcap.py \
<DATASET_ROOT> \
--dataset-root <DATASET_ROOT> \
--report-existing
```
@@ -510,7 +512,7 @@ Run the fault injection and latency validation suite.
| Flag | Description | Default |
|------|-------------|---------|
| `--ingest-max-frames N` | Process at most N frames then exit | 0 (unlimited) |
| `--ingest-idle-timeout-ms MS` | Exit if idle for MS milliseconds | 0 (disabled) |
| `--ingest-idle-timeout-ms MS` | Exit if idle for MS milliseconds; 0 disables the timeout | 0 (disabled) |
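The semantics documented in the row above (a positive timeout is enforced, `0` disables the check entirely) reduce to a single guard; a minimal sketch with invented names:

```python
def should_exit_for_idle(idle_timeout_ms: int, idle_for_ms: float) -> bool:
    """Exit only when a positive timeout is configured and exceeded;
    idle_timeout_ms == 0 (the new default) disables idle auto-exit."""
    return idle_timeout_ms > 0 and idle_for_ms >= idle_timeout_ms
```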
## Architecture
@@ -108,8 +108,9 @@ struct LatencyConfig {
std::size_t queue_size{1};
bool realtime_sync{true};
bool force_idr_on_reset{true};
bool keep_stream_on_reset{false};
std::uint32_t ingest_max_frames{0};
std::uint32_t ingest_idle_timeout_ms{1000};
std::uint32_t ingest_idle_timeout_ms{0};
std::uint32_t ingest_consumer_delay_ms{0};
std::uint32_t snapshot_copy_delay_us{0};
std::uint32_t emit_stall_ms{0};
@@ -18,6 +18,7 @@ struct RawVideoFrame {
ipc::FrameInfo info{};
std::uint64_t source_timestamp_ns{0};
std::size_t row_stride_bytes{0};
bool force_keyframe{false};
std::span<const std::uint8_t> bytes{};
};
+1
@@ -9,6 +9,7 @@ dependencies = [
"opencv-python-headless>=4.11",
"progress-table>=3.2",
"protobuf>=5.29",
"tqdm>=4.67",
"zstandard>=0.23",
]
+255
@@ -0,0 +1,255 @@
from __future__ import annotations
import csv
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Generic, Protocol, TypeVar
import click
from click.core import ParameterSource
class SegmentScanLike(Protocol):
segment_dir: Path
matched_files: int
is_valid: bool
ScanT = TypeVar("ScanT", bound=SegmentScanLike)
@dataclass(slots=True, frozen=True)
class SourceResolution(Generic[ScanT]):
mode: str
segment_dirs: tuple[Path, ...]
ignored_partial_dirs: tuple[ScanT, ...]
def dedupe_paths(paths: list[Path]) -> list[Path]:
ordered: list[Path] = []
seen: set[Path] = set()
for path in paths:
resolved = path.expanduser().resolve()
if resolved in seen:
continue
seen.add(resolved)
ordered.append(resolved)
return ordered
def parse_segments_csv(csv_path: Path, csv_root: Path | None) -> tuple[Path, ...]:
csv_path = csv_path.expanduser().resolve()
if not csv_path.is_file():
raise click.ClickException(f"CSV not found: {csv_path}")
if csv_root is not None:
base_dir = csv_root.expanduser().resolve()
if not base_dir.is_dir():
raise click.ClickException(f"CSV root is not a directory: {base_dir}")
else:
base_dir = csv_path.parent
segment_dirs: list[Path] = []
seen: set[Path] = set()
with csv_path.open(newline="") as stream:
reader = csv.DictReader(stream)
if reader.fieldnames is None or "segment_dir" not in reader.fieldnames:
raise click.ClickException(f"{csv_path} must contain a 'segment_dir' header")
for row_number, row in enumerate(reader, start=2):
raw_segment_dir = (row.get("segment_dir") or "").strip()
if not raw_segment_dir:
raise click.ClickException(f"{csv_path}:{row_number} has an empty segment_dir value")
segment_dir = Path(raw_segment_dir)
resolved = segment_dir if segment_dir.is_absolute() else base_dir / segment_dir
resolved = resolved.expanduser().resolve()
if resolved in seen:
continue
seen.add(resolved)
segment_dirs.append(resolved)
if not segment_dirs:
raise click.ClickException(f"{csv_path} did not contain any segment_dir rows")
return tuple(segment_dirs)
def discover_segment_dirs(
root: Path,
recursive: bool,
*,
scan_segment_dir: Callable[[Path], ScanT],
no_matches_message: Callable[[Path], str],
) -> SourceResolution[ScanT]:
resolved_root = root.expanduser().resolve()
if not resolved_root.is_dir():
raise click.ClickException(f"dataset root does not exist: {resolved_root}")
candidate_dirs = {resolved_root}
iterator = resolved_root.rglob("*") if recursive else resolved_root.iterdir()
for path in iterator:
if path.is_dir():
candidate_dirs.add(path.resolve())
valid_dirs: list[Path] = []
ignored_partial_dirs: list[ScanT] = []
for segment_dir in sorted(candidate_dirs):
scan = scan_segment_dir(segment_dir)
if scan.is_valid:
valid_dirs.append(segment_dir)
elif scan.matched_files > 0:
ignored_partial_dirs.append(scan)
if not valid_dirs:
raise click.ClickException(no_matches_message(resolved_root))
return SourceResolution(
mode="dataset-root",
segment_dirs=tuple(valid_dirs),
ignored_partial_dirs=tuple(ignored_partial_dirs),
)
def raise_if_recursive_flag_is_incompatible(
ctx: click.Context,
dataset_root: Path | None,
*,
dataset_root_flag: str = "--dataset-root",
) -> None:
if ctx.get_parameter_source("recursive") is ParameterSource.DEFAULT:
return
if dataset_root is None:
raise click.ClickException(f"--recursive/--no-recursive can only be used with {dataset_root_flag}")
def raise_for_legacy_source_args(
legacy_input_dir: Path | None,
legacy_segment_dirs: tuple[Path, ...],
*,
dataset_root_flag: str = "--dataset-root",
segment_flag: str = "--segment",
) -> None:
if legacy_input_dir is not None:
resolved = legacy_input_dir.expanduser().resolve()
raise click.ClickException(
f"positional dataset paths are no longer supported; use {dataset_root_flag} {resolved}"
)
if legacy_segment_dirs:
resolved = legacy_segment_dirs[0].expanduser().resolve()
raise click.ClickException(
f"--segment-dir is no longer supported in this batch wrapper; use {segment_flag} {resolved} "
f"for an explicit segment directory, or {dataset_root_flag} <DATASET_ROOT> --recursive for discovery"
)
def raise_for_legacy_extra_args(
extra_args: list[str],
*,
dataset_root_flag: str = "--dataset-root",
) -> None:
if not extra_args:
return
first = extra_args[0]
if first.startswith("-"):
extras_text = " ".join(extra_args)
raise click.ClickException(f"unexpected extra arguments: {extras_text}")
resolved = Path(first).expanduser().resolve()
raise click.ClickException(
f"positional dataset paths are no longer supported; use {dataset_root_flag} {resolved}"
)
def raise_if_segment_path_looks_like_dataset_root(
segment_dir: Path,
*,
scan_segment_dir: Callable[[Path], ScanT],
dataset_root_flag: str = "--dataset-root",
segment_flag: str = "--segment",
) -> None:
resolved = segment_dir.expanduser().resolve()
if not resolved.is_dir():
return
scan = scan_segment_dir(resolved)
if scan.is_valid or scan.matched_files > 0:
return
nested_segments = _find_nested_valid_segment_dirs(resolved, scan_segment_dir=scan_segment_dir)
if not nested_segments:
return
example = nested_segments[0]
raise click.ClickException(
f"{resolved} looks like a dataset root, not a segment directory. "
f"{segment_flag} expects a directory that directly contains *_zedN.svo or *_zedN.svo2 files. "
f"Use {dataset_root_flag} {resolved} to discover nested segments such as {example}"
)
def resolve_sources(
dataset_root: Path | None,
segment_dirs: tuple[Path, ...],
segments_csv: Path | None,
csv_root: Path | None,
recursive: bool,
*,
scan_segment_dir: Callable[[Path], ScanT],
no_matches_message: Callable[[Path], str],
) -> SourceResolution[ScanT]:
source_count = sum(
(
1 if dataset_root is not None else 0,
1 if segment_dirs else 0,
1 if segments_csv is not None else 0,
)
)
if source_count != 1:
raise click.ClickException(
"provide exactly one source mode: --dataset-root, --segment, or --segments-csv"
)
if dataset_root is not None:
return discover_segment_dirs(
dataset_root,
recursive,
scan_segment_dir=scan_segment_dir,
no_matches_message=no_matches_message,
)
if segment_dirs:
ordered_dirs = dedupe_paths(list(segment_dirs))
for segment_dir in ordered_dirs:
raise_if_segment_path_looks_like_dataset_root(
segment_dir,
scan_segment_dir=scan_segment_dir,
)
return SourceResolution(mode="segments", segment_dirs=tuple(ordered_dirs), ignored_partial_dirs=())
return SourceResolution(
mode="segments-csv",
segment_dirs=parse_segments_csv(segments_csv, csv_root),
ignored_partial_dirs=(),
)
def _find_nested_valid_segment_dirs(
root: Path,
*,
scan_segment_dir: Callable[[Path], ScanT],
limit: int = 3,
) -> tuple[Path, ...]:
matches: list[Path] = []
for path in sorted(root.rglob("*")):
if not path.is_dir():
continue
resolved = path.resolve()
if resolved == root:
continue
scan = scan_segment_dir(resolved)
if scan.is_valid:
matches.append(resolved)
if len(matches) >= limit:
break
return tuple(matches)
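The CSV contract the module above enforces — a `segment_dir` header column, relative entries resolved against the CSV's own directory (or `--csv-root`), duplicates dropped after resolution — can be exercised with nothing but the standard library. The CSV content and base directory below are made up, and `PurePosixPath` keeps the sketch deterministic regardless of host OS.

```python
import csv
import io
from pathlib import PurePosixPath

# Made-up CSV body: one relative entry, one absolute, one duplicate.
text = "segment_dir\nrun_001\n/data/captures/run_002\nrun_001\n"
base = PurePosixPath("/data/csvs")  # stand-in for the CSV's parent directory

seen: set[PurePosixPath] = set()
dirs: list[PurePosixPath] = []
for row in csv.DictReader(io.StringIO(text)):
    p = PurePosixPath(row["segment_dir"])
    p = p if p.is_absolute() else base / p  # relative entries resolve against base
    if p not in seen:                       # duplicates dropped after resolution
        seen.add(p)
        dirs.append(p)
```

After the loop, `dirs` holds the two distinct resolved segment directories in first-seen order, mirroring what `parse_segments_csv` returns.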
+45 -128
@@ -3,7 +3,6 @@
from __future__ import annotations
import concurrent.futures
import csv
import json
import math
import os
@@ -17,6 +16,11 @@ from pathlib import Path
import click
from tqdm import tqdm
try:
from scripts import zed_batch_segment_sources as segment_sources
except ModuleNotFoundError:
import zed_batch_segment_sources as segment_sources
SCRIPT_PATH = Path(__file__).resolve()
REPO_ROOT = SCRIPT_PATH.parents[1]
@@ -70,13 +74,6 @@ class SegmentScan:
reason: str | None = None
@dataclass(slots=True, frozen=True)
class SourceResolution:
mode: str
segment_dirs: tuple[Path, ...]
ignored_partial_dirs: tuple[SegmentScan, ...]
@dataclass(slots=True, frozen=True)
class OutputProbeResult:
output_path: Path
@@ -157,116 +154,6 @@ def scan_segment_dir(segment_dir: Path) -> SegmentScan:
return SegmentScan(segment_dir=segment_dir, matched_files=matched_files, is_valid=True)
def dedupe_paths(paths: list[Path]) -> list[Path]:
ordered: list[Path] = []
seen: set[Path] = set()
for path in paths:
resolved = path.expanduser().resolve()
if resolved in seen:
continue
seen.add(resolved)
ordered.append(resolved)
return ordered
def discover_segment_dirs(root: Path, recursive: bool) -> SourceResolution:
if not root.is_dir():
raise click.ClickException(f"input directory does not exist: {root}")
candidate_dirs = {root.resolve()}
iterator = root.rglob("*") if recursive else root.iterdir()
for path in iterator:
if path.is_dir():
candidate_dirs.add(path.resolve())
valid_dirs: list[Path] = []
ignored_partial_dirs: list[SegmentScan] = []
for segment_dir in sorted(candidate_dirs):
scan = scan_segment_dir(segment_dir)
if scan.is_valid:
valid_dirs.append(segment_dir)
elif scan.matched_files > 0:
ignored_partial_dirs.append(scan)
if not valid_dirs:
raise click.ClickException(f"no complete four-camera segments found under {root}")
return SourceResolution(
mode="discovery",
segment_dirs=tuple(valid_dirs),
ignored_partial_dirs=tuple(ignored_partial_dirs),
)
def parse_segments_csv(csv_path: Path, csv_root: Path | None) -> tuple[Path, ...]:
csv_path = csv_path.expanduser().resolve()
if not csv_path.is_file():
raise click.ClickException(f"CSV not found: {csv_path}")
if csv_root is not None:
base_dir = csv_root.expanduser().resolve()
if not base_dir.is_dir():
raise click.ClickException(f"CSV root is not a directory: {base_dir}")
else:
base_dir = csv_path.parent
segment_dirs: list[Path] = []
seen: set[Path] = set()
with csv_path.open(newline="") as stream:
reader = csv.DictReader(stream)
if reader.fieldnames is None or "segment_dir" not in reader.fieldnames:
raise click.ClickException(f"{csv_path} must contain a 'segment_dir' header")
for row_number, row in enumerate(reader, start=2):
raw_segment_dir = (row.get("segment_dir") or "").strip()
if not raw_segment_dir:
raise click.ClickException(f"{csv_path}:{row_number} has an empty segment_dir value")
segment_dir = Path(raw_segment_dir)
resolved = segment_dir if segment_dir.is_absolute() else base_dir / segment_dir
resolved = resolved.expanduser().resolve()
if resolved in seen:
continue
seen.add(resolved)
segment_dirs.append(resolved)
if not segment_dirs:
raise click.ClickException(f"{csv_path} did not contain any segment_dir rows")
return tuple(segment_dirs)
def resolve_sources(
input_dir: Path | None,
segment_dirs: tuple[Path, ...],
segments_csv: Path | None,
csv_root: Path | None,
recursive: bool,
) -> SourceResolution:
source_count = sum(
(
1 if input_dir is not None else 0,
1 if segment_dirs else 0,
1 if segments_csv is not None else 0,
)
)
if source_count != 1:
raise click.ClickException(
"provide exactly one source mode: INPUT_DIR, --segment-dir, or --segments-csv"
)
if input_dir is not None:
return discover_segment_dirs(input_dir.expanduser().resolve(), recursive)
if segment_dirs:
ordered_dirs = dedupe_paths(list(segment_dirs))
return SourceResolution(mode="segment-dir", segment_dirs=tuple(ordered_dirs), ignored_partial_dirs=())
return SourceResolution(
mode="segments-csv",
segment_dirs=parse_segments_csv(segments_csv, csv_root),
ignored_partial_dirs=(),
)
def output_path_for(segment_dir: Path) -> Path:
return segment_dir / f"{segment_dir.name}_grid.mp4"
@@ -514,30 +401,45 @@ def run_batch(jobs: list[ConversionJob], config: BatchConfig, jobs_limit: int) -
return results, aborted_count
@click.command()
@click.argument(
"input_dir",
required=False,
@click.command(context_settings={"allow_extra_args": True})
@click.option(
"--dataset-root",
type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path),
help="Dataset root containing segment directories. Mutually exclusive with --segment and --segments-csv.",
)
@click.option(
"--segment",
"segment_dirs",
multiple=True,
type=click.Path(exists=True, path_type=Path, file_okay=False, dir_okay=True),
help=(
"Explicit segment directory. Repeatable. The directory must directly contain "
"*_zedN.svo or *_zedN.svo2 files. Mutually exclusive with --dataset-root and --segments-csv."
),
)
@click.option(
"--segment-dir",
"segment_dirs",
"legacy_segment_dirs",
multiple=True,
type=click.Path(path_type=Path, file_okay=False, dir_okay=True),
help="Explicit segment directory. Repeatable. Mutually exclusive with INPUT_DIR and --segments-csv.",
hidden=True,
)
@click.option(
"--segments-csv",
type=click.Path(path_type=Path, dir_okay=False),
help="CSV file containing a segment_dir column. Mutually exclusive with INPUT_DIR and --segment-dir.",
help="CSV file containing a segment_dir column. Mutually exclusive with --dataset-root and --segment.",
)
@click.option(
"--csv-root",
type=click.Path(path_type=Path, file_okay=False, dir_okay=True),
help="Base directory for relative segment_dir entries in --segments-csv. Defaults to the CSV parent directory.",
)
@click.option("--recursive/--no-recursive", default=True, show_default=True, help="Recurse when discovering segment directories from INPUT_DIR.")
@click.option(
"--recursive/--no-recursive",
default=True,
show_default=True,
help="Recurse when discovering segment directories from --dataset-root.",
)
@click.option("--jobs", default=1, show_default=True, type=click.IntRange(min=1), help="Parallel conversion jobs.")
@click.option(
"--zed-bin",
@@ -625,9 +527,12 @@ def run_batch(jobs: list[ConversionJob], config: BatchConfig, jobs_limit: int) -
show_default=True,
help="Scale each tile relative to the source resolution.",
)
@click.pass_context
def main(
input_dir: Path | None,
ctx: click.Context,
dataset_root: Path | None,
segment_dirs: tuple[Path, ...],
legacy_segment_dirs: tuple[Path, ...],
segments_csv: Path | None,
csv_root: Path | None,
recursive: bool,
@@ -653,14 +558,26 @@ def main(
tile_scale: float,
) -> None:
"""Batch-convert synced four-camera ZED segments into grid MP4 files."""
segment_sources.raise_for_legacy_extra_args(ctx.args)
segment_sources.raise_for_legacy_source_args(None, legacy_segment_dirs)
segment_sources.raise_if_recursive_flag_is_incompatible(ctx, dataset_root)
if b_frames > gop:
raise click.BadParameter(f"b-frames {b_frames} must be <= gop {gop}", param_hint="--b-frames")
if report_existing and dry_run:
raise click.ClickException("--report-existing and --dry-run are mutually exclusive")
sources = segment_sources.resolve_sources(
dataset_root,
segment_dirs,
segments_csv,
csv_root,
recursive,
scan_segment_dir=scan_segment_dir,
no_matches_message=lambda root: f"no complete four-camera segments found under {root}",
)
ffprobe_path = locate_ffprobe(ffprobe_bin) if (probe_existing or report_existing) else None
binary_path = None if report_existing else locate_binary(zed_bin)
sources = resolve_sources(input_dir, segment_dirs, segments_csv, csv_root, recursive)
config = BatchConfig(
zed_bin=binary_path,
ffprobe_bin=ffprobe_path,
+47 -130
@@ -3,7 +3,6 @@
from __future__ import annotations
import concurrent.futures
import csv
import importlib
import os
import re
@@ -17,6 +16,11 @@ from pathlib import Path
import click
from progress_table import ProgressTable
try:
from scripts import zed_batch_segment_sources as segment_sources
except ModuleNotFoundError:
import zed_batch_segment_sources as segment_sources
SCRIPT_PATH = Path(__file__).resolve()
REPO_ROOT = SCRIPT_PATH.parents[1]
@@ -82,13 +86,6 @@ class SegmentScan:
reason: str | None = None
@dataclass(slots=True, frozen=True)
class SourceResolution:
mode: str
segment_dirs: tuple[Path, ...]
ignored_partial_dirs: tuple[SegmentScan, ...]
@dataclass(slots=True, frozen=True)
class OutputProbeResult:
output_path: Path
@@ -339,116 +336,6 @@ def scan_segment_dir(segment_dir: Path) -> SegmentScan:
)
def dedupe_paths(paths: list[Path]) -> list[Path]:
ordered: list[Path] = []
seen: set[Path] = set()
for path in paths:
resolved = path.expanduser().resolve()
if resolved in seen:
continue
seen.add(resolved)
ordered.append(resolved)
return ordered
def discover_segment_dirs(root: Path, recursive: bool) -> SourceResolution:
if not root.is_dir():
raise click.ClickException(f"input directory does not exist: {root}")
candidate_dirs = {root.resolve()}
iterator = root.rglob("*") if recursive else root.iterdir()
for path in iterator:
if path.is_dir():
candidate_dirs.add(path.resolve())
valid_dirs: list[Path] = []
ignored_partial_dirs: list[SegmentScan] = []
for segment_dir in sorted(candidate_dirs):
scan = scan_segment_dir(segment_dir)
if scan.is_valid:
valid_dirs.append(segment_dir)
elif scan.matched_files > 0:
ignored_partial_dirs.append(scan)
if not valid_dirs:
raise click.ClickException(f"no multi-camera segments found under {root}")
return SourceResolution(
mode="discovery",
segment_dirs=tuple(valid_dirs),
ignored_partial_dirs=tuple(ignored_partial_dirs),
)
def parse_segments_csv(csv_path: Path, csv_root: Path | None) -> tuple[Path, ...]:
csv_path = csv_path.expanduser().resolve()
if not csv_path.is_file():
raise click.ClickException(f"CSV not found: {csv_path}")
if csv_root is not None:
base_dir = csv_root.expanduser().resolve()
if not base_dir.is_dir():
raise click.ClickException(f"CSV root is not a directory: {base_dir}")
else:
base_dir = csv_path.parent
segment_dirs: list[Path] = []
seen: set[Path] = set()
with csv_path.open(newline="") as stream:
reader = csv.DictReader(stream)
if reader.fieldnames is None or "segment_dir" not in reader.fieldnames:
raise click.ClickException(f"{csv_path} must contain a 'segment_dir' header")
for row_number, row in enumerate(reader, start=2):
raw_segment_dir = (row.get("segment_dir") or "").strip()
if not raw_segment_dir:
raise click.ClickException(f"{csv_path}:{row_number} has an empty segment_dir value")
segment_dir = Path(raw_segment_dir)
resolved = segment_dir if segment_dir.is_absolute() else base_dir / segment_dir
resolved = resolved.expanduser().resolve()
if resolved in seen:
continue
seen.add(resolved)
segment_dirs.append(resolved)
if not segment_dirs:
raise click.ClickException(f"{csv_path} did not contain any segment_dir rows")
return tuple(segment_dirs)
def resolve_sources(
input_dir: Path | None,
segment_dirs: tuple[Path, ...],
segments_csv: Path | None,
csv_root: Path | None,
recursive: bool,
) -> SourceResolution:
source_count = sum(
(
1 if input_dir is not None else 0,
1 if segment_dirs else 0,
1 if segments_csv is not None else 0,
)
)
if source_count != 1:
raise click.ClickException(
"provide exactly one source mode: INPUT_DIR, --segment-dir, or --segments-csv"
)
if input_dir is not None:
return discover_segment_dirs(input_dir.expanduser().resolve(), recursive)
if segment_dirs:
ordered_dirs = dedupe_paths(list(segment_dirs))
return SourceResolution(mode="segment-dir", segment_dirs=tuple(ordered_dirs), ignored_partial_dirs=())
return SourceResolution(
mode="segments-csv",
segment_dirs=parse_segments_csv(segments_csv, csv_root),
ignored_partial_dirs=(),
)
def output_path_for(segment_dir: Path) -> Path:
return segment_dir / f"{segment_dir.name}.mcap"
@@ -469,7 +356,7 @@ def display_name_for_segment(
input_root: Path | None,
common_parent: Path | None,
) -> str:
if source_mode == "discovery" and input_root is not None:
if source_mode == "dataset-root" and input_root is not None:
try:
return str(segment_dir.relative_to(input_root))
except ValueError:
@@ -1071,30 +958,45 @@ def build_worker_slots(
return worker_slots
@click.command()
@click.argument(
"input_dir",
required=False,
@click.command(context_settings={"allow_extra_args": True})
@click.option(
"--dataset-root",
type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path),
help="Dataset root containing segment directories. Mutually exclusive with --segment and --segments-csv.",
)
@click.option(
"--segment",
"segment_dirs",
multiple=True,
type=click.Path(exists=True, path_type=Path, file_okay=False, dir_okay=True),
help=(
"Explicit segment directory. Repeatable. The directory must directly contain "
"*_zedN.svo or *_zedN.svo2 files. Mutually exclusive with --dataset-root and --segments-csv."
),
)
@click.option(
"--segment-dir",
"segment_dirs",
"legacy_segment_dirs",
multiple=True,
type=click.Path(path_type=Path, file_okay=False, dir_okay=True),
help="Explicit segment directory. Repeatable. Mutually exclusive with INPUT_DIR and --segments-csv.",
hidden=True,
)
@click.option(
"--segments-csv",
type=click.Path(path_type=Path, dir_okay=False),
help="CSV file containing a segment_dir column. Mutually exclusive with INPUT_DIR and --segment-dir.",
help="CSV file containing a segment_dir column. Mutually exclusive with --dataset-root and --segment.",
)
@click.option(
"--csv-root",
type=click.Path(path_type=Path, file_okay=False, dir_okay=True),
help="Base directory for relative segment_dir entries in --segments-csv. Defaults to the CSV parent directory.",
)
@click.option("--recursive/--no-recursive", default=True, show_default=True, help="Recurse when discovering segment directories from INPUT_DIR.")
@click.option(
"--recursive/--no-recursive",
default=True,
show_default=True,
help="Recurse when discovering segment directories from --dataset-root.",
)
@click.option("--jobs", default=1, show_default=True, type=click.IntRange(min=1), help="Parallel conversion jobs.")
@click.option(
"--hardware-jobs",
@@ -1231,9 +1133,12 @@ def build_worker_slots(
show_default=True,
help="Progress output mode. Auto uses a table on TTY and text logging otherwise.",
)
@click.pass_context
def main(
input_dir: Path | None,
ctx: click.Context,
dataset_root: Path | None,
segment_dirs: tuple[Path, ...],
legacy_segment_dirs: tuple[Path, ...],
segments_csv: Path | None,
csv_root: Path | None,
recursive: bool,
@@ -1266,6 +1171,10 @@ def main(
progress_ui: str,
) -> None:
"""Batch-convert multi-camera ZED segments into grouped MCAP files."""
segment_sources.raise_for_legacy_extra_args(ctx.args)
segment_sources.raise_for_legacy_source_args(None, legacy_segment_dirs)
segment_sources.raise_if_recursive_flag_is_incompatible(ctx, dataset_root)
if report_existing and dry_run:
raise click.ClickException("--report-existing and --dry-run are mutually exclusive")
if bundle_policy == "copy":
@@ -1276,8 +1185,16 @@ def main(
if bundle_topic != "/bundle":
raise click.ClickException("--bundle-topic cannot be customized with --bundle-policy copy")
sources = segment_sources.resolve_sources(
dataset_root,
segment_dirs,
segments_csv,
csv_root,
recursive,
scan_segment_dir=scan_segment_dir,
no_matches_message=lambda root: f"no multi-camera segments found under {root}",
)
binary_path = None if report_existing else locate_binary(zed_bin)
sources = resolve_sources(input_dir, segment_dirs, segments_csv, csv_root, recursive)
worker_slots = build_worker_slots(
jobs=jobs,
encoder_device=encoder_device,
@@ -1307,7 +1224,7 @@ def main(
sync_tolerance_ms=sync_tolerance_ms,
progress_ui=progress_ui,
)
input_root = input_dir.expanduser().resolve() if input_dir is not None else None
input_root = dataset_root.expanduser().resolve() if dataset_root is not None else None
display_parent = common_segment_parent(sources.segment_dirs)
skipped_results: list[JobResult] = []
+451 -230
@@ -48,15 +48,31 @@ std::string normalize_cli_error(std::string raw_message) {
return trim_copy(std::move(raw_message));
}
std::expected<std::uint32_t, std::string> parse_u32(std::string_view raw, std::string_view field_name) {
std::uint32_t value{0};
const auto *begin = raw.data();
const auto *end = raw.data() + raw.size();
const auto result = std::from_chars(begin, end, value, 10);
if (result.ec != std::errc{} || result.ptr != end) {
return std::unexpected("invalid value for " + std::string(field_name) + ": '" + std::string(raw) + "'");
}
return value;
template <typename Parser>
CLI::Validator canonicalize_option(Parser parser) {
return CLI::Validator(
[parser = std::move(parser)](std::string &value) {
auto canonical = parser(value);
if (!canonical) {
return canonical.error();
}
value = std::move(*canonical);
return std::string{};
},
std::string{},
std::string{});
}
CLI::Validator require_non_empty(std::string_view option_name) {
return CLI::Validator(
[label = std::string(option_name)](std::string &value) {
if (!value.empty()) {
return std::string{};
}
return "invalid value for " + label + ": must not be empty";
},
std::string{},
std::string{});
}
std::expected<std::uint16_t, std::string> parse_u16(std::string_view raw, std::string_view field_name) {
@@ -70,31 +86,6 @@ std::expected<std::uint16_t, std::string> parse_u16(std::string_view raw, std::s
return value;
}
std::expected<std::size_t, std::string> parse_size(std::string_view raw, std::string_view field_name) {
unsigned long long parsed{0};
const auto *begin = raw.data();
const auto *end = raw.data() + raw.size();
const auto result = std::from_chars(begin, end, parsed, 10);
if (result.ec != std::errc{} || result.ptr != end) {
return std::unexpected("invalid value for " + std::string(field_name) + ": '" + std::string(raw) + "'");
}
if (parsed > static_cast<unsigned long long>(std::numeric_limits<std::size_t>::max())) {
return std::unexpected("value out of range for " + std::string(field_name) + ": '" + std::string(raw) + "'");
}
return static_cast<std::size_t>(parsed);
}
std::expected<bool, std::string> parse_bool(std::string_view raw, std::string_view field_name) {
if (raw == "true" || raw == "1") {
return true;
}
if (raw == "false" || raw == "0") {
return false;
}
return std::unexpected(
"invalid value for " + std::string(field_name) + ": '" + std::string(raw) + "' (expected: true|false|1|0)");
}
std::expected<CodecType, std::string> parse_codec(std::string_view raw) {
if (raw == "h264") {
return CodecType::H264;
@@ -181,6 +172,62 @@ std::expected<McapCompression, std::string> parse_mcap_compression_impl(std::str
return std::unexpected("invalid mcap compression: '" + std::string(raw) + "' (expected: none|lz4|zstd)");
}
std::expected<std::string, std::string> canonicalize_codec(std::string_view raw) {
auto parsed = parse_codec(raw);
if (!parsed) {
return std::unexpected(parsed.error());
}
return std::string(to_string(*parsed));
}
std::expected<std::string, std::string> canonicalize_run_mode(std::string_view raw) {
auto parsed = parse_run_mode(raw);
if (!parsed) {
return std::unexpected(parsed.error());
}
return std::string(to_string(*parsed));
}
std::expected<std::string, std::string> canonicalize_rtmp_transport(std::string_view raw) {
auto parsed = parse_rtmp_transport(raw);
if (!parsed) {
return std::unexpected(parsed.error());
}
return std::string(to_string(*parsed));
}
std::expected<std::string, std::string> canonicalize_encoder_backend(std::string_view raw) {
auto parsed = parse_encoder_backend(raw);
if (!parsed) {
return std::unexpected(parsed.error());
}
return std::string(to_string(*parsed));
}
std::expected<std::string, std::string> canonicalize_encoder_device(std::string_view raw) {
auto parsed = parse_encoder_device(raw);
if (!parsed) {
return std::unexpected(parsed.error());
}
return std::string(to_string(*parsed));
}
std::expected<std::string, std::string> canonicalize_input_video_source(std::string_view raw) {
auto parsed = parse_input_video_source(raw);
if (!parsed) {
return std::unexpected(parsed.error());
}
return std::string(to_string(*parsed));
}
std::expected<std::string, std::string> canonicalize_mcap_compression(std::string_view raw) {
auto parsed = parse_mcap_compression_impl(raw);
if (!parsed) {
return std::unexpected(parsed.error());
}
return std::string(to_string(*parsed));
}
std::expected<std::pair<std::string, std::uint16_t>, std::string> parse_rtp_endpoint(std::string_view endpoint) {
if (endpoint.empty()) {
return std::unexpected("invalid RTP config: endpoint must not be empty");
@@ -203,6 +250,48 @@ std::expected<std::pair<std::string, std::uint16_t>, std::string> parse_rtp_endp
return std::pair{std::string(host), *parsed_port};
}
CLI::Validator validate_rtp_endpoint() {
return CLI::Validator(
[](std::string &value) {
auto parsed = parse_rtp_endpoint(value);
if (!parsed) {
return parsed.error();
}
return std::string{};
},
std::string{},
std::string{});
}
std::optional<std::string> find_disallowed_boolean_assignment(int argc, char **argv) {
struct FlagPair {
std::string_view positive;
std::string_view negative;
};
constexpr std::array<FlagPair, 6> kFlagPairs{{
{"--rtmp", "--no-rtmp"},
{"--rtp", "--no-rtp"},
{"--mcap", "--no-mcap"},
{"--realtime-sync", "--no-realtime-sync"},
{"--force-idr-on-reset", "--no-force-idr-on-reset"},
{"--keep-stream-on-reset", "--no-keep-stream-on-reset"},
}};
for (int i = 1; i < argc; ++i) {
const std::string_view arg{argv[i]};
for (const auto &pair : kFlagPairs) {
if (arg.rfind(std::string(pair.positive) + "=", 0) == 0 ||
arg.rfind(std::string(pair.negative) + "=", 0) == 0) {
return "invalid boolean flag syntax: " + std::string(arg) + " (use " + std::string(pair.positive) +
" or " + std::string(pair.negative) + ")";
}
}
}
return std::nullopt;
}
template <typename T>
std::optional<T> toml_value(const toml::table &table, std::string_view path) {
auto node = table.at_path(path);
@@ -428,6 +517,9 @@ std::expected<void, std::string> apply_toml_file(RuntimeConfig &config, const st
if (auto value = toml_value<bool>(table, "latency.force_idr_on_reset")) {
config.latency.force_idr_on_reset = *value;
}
if (auto value = toml_value<bool>(table, "latency.keep_stream_on_reset")) {
config.latency.keep_stream_on_reset = *value;
}
{
auto value = toml_nonnegative_integral<std::uint32_t>(
table,
@@ -599,87 +691,257 @@ std::string_view to_string(McapCompression compression) {
std::expected<RuntimeConfig, std::string> parse_runtime_config(int argc, char **argv) {
RuntimeConfig config = RuntimeConfig::defaults();
const RuntimeConfig defaults = config;
std::string config_path_raw{};
std::string input_uri_raw{};
std::string input_nats_url_raw{};
std::string input_video_source_raw{};
std::string run_mode_raw{};
std::string codec_raw{};
std::string encoder_backend_raw{};
std::string encoder_device_raw{};
std::string rtmp_transport_raw{};
std::string rtmp_ffmpeg_path_raw{};
std::vector<std::string> rtmp_urls_raw{};
std::string rtp_endpoint_raw{};
std::string rtp_payload_type_raw{};
std::string rtp_sdp_raw{};
std::string mcap_path_raw{};
std::string mcap_topic_raw{};
std::string mcap_depth_topic_raw{};
std::string mcap_calibration_topic_raw{};
std::string mcap_depth_calibration_topic_raw{};
std::string mcap_pose_topic_raw{};
std::string mcap_body_topic_raw{};
std::string mcap_frame_id_raw{};
std::string mcap_compression_raw{};
std::string queue_size_raw{};
std::string gop_raw{};
std::string b_frames_raw{};
std::string realtime_sync_raw{};
std::string force_idr_on_reset_raw{};
std::string ingest_max_frames_raw{};
std::string ingest_idle_timeout_raw{};
std::string ingest_consumer_delay_raw{};
std::string snapshot_copy_delay_raw{};
std::string emit_stall_raw{};
bool rtmp_enabled{false};
bool rtp_enabled{false};
bool mcap_enabled{false};
std::optional<std::string> config_path_override{};
std::optional<std::string> input_uri_override{};
std::optional<std::string> input_nats_url_override{};
std::optional<std::string> input_video_source_override{};
std::optional<std::string> run_mode_override{};
std::optional<std::string> codec_override{};
std::optional<std::string> encoder_backend_override{};
std::optional<std::string> encoder_device_override{};
std::optional<bool> rtmp_enabled_override{};
std::vector<std::string> rtmp_urls_override{};
std::optional<std::string> rtmp_transport_override{};
std::optional<std::string> rtmp_ffmpeg_path_override{};
std::optional<bool> rtp_enabled_override{};
std::optional<std::string> rtp_endpoint_override{};
std::optional<std::uint16_t> rtp_payload_type_override{};
std::optional<std::string> rtp_sdp_override{};
std::optional<bool> mcap_enabled_override{};
std::optional<std::string> mcap_path_override{};
std::optional<std::string> mcap_topic_override{};
std::optional<std::string> mcap_depth_topic_override{};
std::optional<std::string> mcap_calibration_topic_override{};
std::optional<std::string> mcap_depth_calibration_topic_override{};
std::optional<std::string> mcap_pose_topic_override{};
std::optional<std::string> mcap_body_topic_override{};
std::optional<std::string> mcap_frame_id_override{};
std::optional<std::string> mcap_compression_override{};
std::optional<std::size_t> queue_size_override{};
std::optional<std::uint32_t> gop_override{};
std::optional<std::uint32_t> b_frames_override{};
std::optional<bool> realtime_sync_override{};
std::optional<bool> force_idr_on_reset_override{};
std::optional<bool> keep_stream_on_reset_override{};
std::optional<std::uint32_t> ingest_max_frames_override{};
std::optional<std::uint32_t> ingest_idle_timeout_override{};
std::optional<std::uint32_t> ingest_consumer_delay_override{};
std::optional<std::uint32_t> snapshot_copy_delay_override{};
std::optional<std::uint32_t> emit_stall_override{};
bool version_requested{false};
CLI::App app{"cvmmap-streamer runtime options"};
app.allow_extras(false);
app.set_help_flag("--help,-h", "show this message");
app.set_help_flag("--help,-h", "Show this message");
app.get_formatter()->column_width(36);
app.add_option("--config", config_path_raw);
app.add_option("--input-uri", input_uri_raw);
app.add_option("--nats-url", input_nats_url_raw);
app.add_option("--input-video-source", input_video_source_raw);
app.add_option("--run-mode", run_mode_raw);
app.add_option("--codec", codec_raw);
app.add_option("--encoder-backend", encoder_backend_raw);
app.add_option("--encoder-device", encoder_device_raw);
app.add_flag("--rtmp", rtmp_enabled);
app.add_option("--rtmp-url", rtmp_urls_raw);
app.add_option("--rtmp-transport", rtmp_transport_raw);
app.add_option("--rtmp-ffmpeg", rtmp_ffmpeg_path_raw);
app.add_flag("--rtp", rtp_enabled);
app.add_option("--rtp-endpoint", rtp_endpoint_raw);
app.add_option("--rtp-payload-type", rtp_payload_type_raw);
auto *rtp_sdp = app.add_option("--rtp-sdp", rtp_sdp_raw);
app.add_option("--sdp", rtp_sdp_raw)->excludes(rtp_sdp);
app.add_flag("--mcap", mcap_enabled);
app.add_option("--mcap-path", mcap_path_raw);
app.add_option("--mcap-topic", mcap_topic_raw);
app.add_option("--mcap-depth-topic", mcap_depth_topic_raw);
app.add_option("--mcap-calibration-topic", mcap_calibration_topic_raw);
app.add_option("--mcap-depth-calibration-topic", mcap_depth_calibration_topic_raw);
app.add_option("--mcap-pose-topic", mcap_pose_topic_raw);
app.add_option("--mcap-body-topic", mcap_body_topic_raw);
app.add_option("--mcap-frame-id", mcap_frame_id_raw);
app.add_option("--mcap-compression", mcap_compression_raw);
app.add_option("--queue-size", queue_size_raw);
app.add_option("--gop", gop_raw);
app.add_option("--b-frames", b_frames_raw);
app.add_option("--realtime-sync", realtime_sync_raw);
app.add_option("--force-idr-on-reset", force_idr_on_reset_raw);
app.add_option("--ingest-max-frames", ingest_max_frames_raw);
app.add_option("--ingest-idle-timeout-ms", ingest_idle_timeout_raw);
app.add_option("--ingest-consumer-delay-ms", ingest_consumer_delay_raw);
app.add_option("--snapshot-copy-delay-us", snapshot_copy_delay_raw);
app.add_option("--emit-stall-ms", emit_stall_raw);
app.add_flag("--version", version_requested);
app.add_option("--config", config_path_override, "Load runtime config from TOML")
->group("General")
->type_name("PATH")
->check(CLI::ExistingFile);
app.add_flag("--version", version_requested, "Show version information")
->group("General")
->disable_flag_override();
app.add_option("--input-uri", input_uri_override, "cv-mmap source URI")
->group("Input")
->type_name("URI")
->check(require_non_empty("--input-uri"))
->default_str(defaults.input.uri);
app.add_option("--nats-url", input_nats_url_override, "NATS server URL for control traffic")
->group("Input")
->type_name("URL")
->check(require_non_empty("--nats-url"))
->default_str(defaults.input.nats_url);
app.add_option("--input-video-source", input_video_source_override, "Preferred upstream video source")
->group("Input")
->type_name("SOURCE")
->transform(canonicalize_option(canonicalize_input_video_source))
->default_str(std::string(to_string(defaults.input.video_source)));
app.add_option("--run-mode", run_mode_override, "Execution mode")
->group("General")
->type_name("MODE")
->transform(canonicalize_option(canonicalize_run_mode))
->default_str(std::string(to_string(defaults.run_mode)));
app.add_option("--codec", codec_override, "Output video codec")
->group("Encoder")
->type_name("CODEC")
->transform(canonicalize_option(canonicalize_codec))
->default_str(std::string(to_string(defaults.encoder.codec)));
app.add_option("--encoder-backend", encoder_backend_override, "Encoder backend implementation")
->group("Encoder")
->type_name("BACKEND")
->transform(canonicalize_option(canonicalize_encoder_backend))
->default_str(std::string(to_string(defaults.encoder.backend)));
app.add_option("--encoder-device", encoder_device_override, "Preferred encoder device")
->group("Encoder")
->type_name("DEVICE")
->transform(canonicalize_option(canonicalize_encoder_device))
->default_str(std::string(to_string(defaults.encoder.device)));
app.add_option("--gop", gop_override, "Encoder GOP length in frames")
->group("Encoder")
->type_name("FRAMES")
->check(CLI::PositiveNumber)
->default_str(std::to_string(defaults.encoder.gop));
app.add_option("--b-frames", b_frames_override, "Encoder B-frame count")
->group("Encoder")
->type_name("COUNT")
->check(CLI::NonNegativeNumber)
->default_str(std::to_string(defaults.encoder.b_frames));
app.add_flag("--rtmp,!--no-rtmp", rtmp_enabled_override, "Enable or disable RTMP output")
->group("RTMP Output")
->default_str(defaults.outputs.rtmp.enabled ? "true" : "false")
->disable_flag_override();
app.add_option("--rtmp-url", rtmp_urls_override, "RTMP destination URL; repeat to publish to multiple sinks")
->group("RTMP Output")
->type_name("URL")
->check(require_non_empty("--rtmp-url"));
app.add_option("--rtmp-transport", rtmp_transport_override, "RTMP transport backend")
->group("RTMP Output")
->type_name("MODE")
->transform(canonicalize_option(canonicalize_rtmp_transport))
->default_str(std::string(to_string(defaults.outputs.rtmp.transport)));
app.add_option("--rtmp-ffmpeg", rtmp_ffmpeg_path_override, "ffmpeg binary path for ffmpeg_process transport")
->group("RTMP Output")
->type_name("PATH")
->check(require_non_empty("--rtmp-ffmpeg"))
->default_str(defaults.outputs.rtmp.ffmpeg_path);
app.add_flag("--rtp,!--no-rtp", rtp_enabled_override, "Enable or disable RTP output")
->group("RTP Output")
->default_str(defaults.outputs.rtp.enabled ? "true" : "false")
->disable_flag_override();
app.add_option("--rtp-endpoint", rtp_endpoint_override, "RTP destination in <host>:<port> format")
->group("RTP Output")
->type_name("HOST:PORT")
->check(validate_rtp_endpoint());
app.add_option("--rtp-payload-type", rtp_payload_type_override, "Dynamic RTP payload type")
->group("RTP Output")
->type_name("PT")
->check(CLI::Range(96, 127))
->default_str(std::to_string(defaults.outputs.rtp.payload_type));
auto *rtp_sdp = app.add_option("--rtp-sdp", rtp_sdp_override, "Write an SDP sidecar file for RTP output")
->group("RTP Output")
->type_name("PATH")
->check(require_non_empty("--rtp-sdp"));
app.add_option("--sdp", rtp_sdp_override, "Alias for --rtp-sdp")
->group("RTP Output")
->type_name("PATH")
->check(require_non_empty("--sdp"))
->excludes(rtp_sdp);
app.add_flag("--mcap,!--no-mcap", mcap_enabled_override, "Enable or disable MCAP recording")
->group("MCAP Record")
->default_str(defaults.record.mcap.enabled ? "true" : "false")
->disable_flag_override();
app.add_option("--mcap-path", mcap_path_override, "MCAP output file path")
->group("MCAP Record")
->type_name("PATH")
->check(require_non_empty("--mcap-path"))
->default_str(defaults.record.mcap.path);
app.add_option("--mcap-topic", mcap_topic_override, "Foxglove compressed video topic name")
->group("MCAP Record")
->type_name("TOPIC")
->check(require_non_empty("--mcap-topic"))
->default_str(defaults.record.mcap.topic);
app.add_option("--mcap-depth-topic", mcap_depth_topic_override, "Depth image topic name")
->group("MCAP Record")
->type_name("TOPIC")
->check(require_non_empty("--mcap-depth-topic"))
->default_str(defaults.record.mcap.depth_topic);
app.add_option("--mcap-calibration-topic", mcap_calibration_topic_override, "RGB camera calibration topic name")
->group("MCAP Record")
->type_name("TOPIC")
->check(require_non_empty("--mcap-calibration-topic"))
->default_str(defaults.record.mcap.calibration_topic);
app.add_option(
"--mcap-depth-calibration-topic",
mcap_depth_calibration_topic_override,
"Depth camera calibration topic name")
->group("MCAP Record")
->type_name("TOPIC")
->check(require_non_empty("--mcap-depth-calibration-topic"))
->default_str(defaults.record.mcap.depth_calibration_topic);
app.add_option("--mcap-pose-topic", mcap_pose_topic_override, "Pose topic name")
->group("MCAP Record")
->type_name("TOPIC")
->check(require_non_empty("--mcap-pose-topic"))
->default_str(defaults.record.mcap.pose_topic);
app.add_option("--mcap-body-topic", mcap_body_topic_override, "Body tracking topic name")
->group("MCAP Record")
->type_name("TOPIC")
->check(require_non_empty("--mcap-body-topic"))
->default_str(defaults.record.mcap.body_topic);
app.add_option("--mcap-frame-id", mcap_frame_id_override, "Frame ID written into MCAP messages")
->group("MCAP Record")
->type_name("ID")
->check(require_non_empty("--mcap-frame-id"))
->default_str(defaults.record.mcap.frame_id);
app.add_option("--mcap-compression", mcap_compression_override, "MCAP chunk compression mode")
->group("MCAP Record")
->type_name("MODE")
->transform(canonicalize_option(canonicalize_mcap_compression))
->default_str(std::string(to_string(defaults.record.mcap.compression)));
app.add_option("--queue-size", queue_size_override, "Pipeline queue depth")
->group("Latency")
->type_name("FRAMES")
->check(CLI::PositiveNumber)
->default_str(std::to_string(defaults.latency.queue_size));
app.add_flag("--realtime-sync,!--no-realtime-sync", realtime_sync_override, "Enable or disable realtime pacing")
->group("Latency")
->default_str(defaults.latency.realtime_sync ? "true" : "false")
->disable_flag_override();
app.add_flag(
"--force-idr-on-reset,!--no-force-idr-on-reset",
force_idr_on_reset_override,
"Force a keyframe after upstream stream_reset")
->group("Latency")
->default_str(defaults.latency.force_idr_on_reset ? "true" : "false")
->disable_flag_override();
app.add_flag(
"--keep-stream-on-reset,!--no-keep-stream-on-reset",
keep_stream_on_reset_override,
"Keep RTMP/RTP live outputs open across upstream stream_reset")
->group("Latency")
->default_str(defaults.latency.keep_stream_on_reset ? "true" : "false")
->disable_flag_override();
app.add_option("--ingest-max-frames", ingest_max_frames_override, "Maximum frames to ingest before exit; 0 disables the limit")
->group("Latency")
->type_name("FRAMES")
->check(CLI::NonNegativeNumber)
->default_str(std::to_string(defaults.latency.ingest_max_frames));
app.add_option("--ingest-idle-timeout-ms", ingest_idle_timeout_override, "Stop ingest if no consumer activity occurs for this long; 0 disables the timeout")
->group("Latency")
->type_name("MS")
->check(CLI::NonNegativeNumber)
->default_str(std::to_string(defaults.latency.ingest_idle_timeout_ms));
app.add_option(
"--ingest-consumer-delay-ms",
ingest_consumer_delay_override,
"Artificial ingest-side consumer delay")
->group("Latency")
->type_name("MS")
->check(CLI::NonNegativeNumber)
->default_str(std::to_string(defaults.latency.ingest_consumer_delay_ms));
app.add_option("--snapshot-copy-delay-us", snapshot_copy_delay_override, "Artificial delay before snapshot copy completion")
->group("Latency")
->type_name("US")
->check(CLI::NonNegativeNumber)
->default_str(std::to_string(defaults.latency.snapshot_copy_delay_us));
app.add_option("--emit-stall-ms", emit_stall_override, "Artificial stall before emitting downstream frames")
->group("Latency")
->type_name("MS")
->check(CLI::NonNegativeNumber)
->default_str(std::to_string(defaults.latency.emit_stall_ms));
if (auto invalid_boolean_assignment = find_disallowed_boolean_assignment(argc, argv)) {
return std::unexpected(*invalid_boolean_assignment);
}
try {
app.parse(argc, argv);
@@ -691,202 +953,164 @@ std::expected<RuntimeConfig, std::string> parse_runtime_config(int argc, char **
return std::unexpected(normalize_cli_error(e.what()));
}
if (!config_path_raw.empty()) {
auto load_result = apply_toml_file(config, config_path_raw);
if (config_path_override) {
auto load_result = apply_toml_file(config, *config_path_override);
if (!load_result) {
return std::unexpected(load_result.error());
}
}
if (!input_uri_raw.empty()) {
config.input.uri = input_uri_raw;
if (input_uri_override) {
config.input.uri = *input_uri_override;
}
if (!input_nats_url_raw.empty()) {
config.input.nats_url = input_nats_url_raw;
if (input_nats_url_override) {
config.input.nats_url = *input_nats_url_override;
}
if (!input_video_source_raw.empty()) {
auto parsed = parse_input_video_source(input_video_source_raw);
if (input_video_source_override) {
auto parsed = parse_input_video_source(*input_video_source_override);
if (!parsed) {
return std::unexpected(parsed.error());
}
config.input.video_source = *parsed;
}
if (!run_mode_raw.empty()) {
auto parsed = parse_run_mode(run_mode_raw);
if (run_mode_override) {
auto parsed = parse_run_mode(*run_mode_override);
if (!parsed) {
return std::unexpected(parsed.error());
}
config.run_mode = *parsed;
}
if (!codec_raw.empty()) {
auto parsed = parse_codec(codec_raw);
if (codec_override) {
auto parsed = parse_codec(*codec_override);
if (!parsed) {
return std::unexpected(parsed.error());
}
config.encoder.codec = *parsed;
}
if (!encoder_backend_raw.empty()) {
auto parsed = parse_encoder_backend(encoder_backend_raw);
if (encoder_backend_override) {
auto parsed = parse_encoder_backend(*encoder_backend_override);
if (!parsed) {
return std::unexpected(parsed.error());
}
config.encoder.backend = *parsed;
}
if (!encoder_device_raw.empty()) {
auto parsed = parse_encoder_device(encoder_device_raw);
if (encoder_device_override) {
auto parsed = parse_encoder_device(*encoder_device_override);
if (!parsed) {
return std::unexpected(parsed.error());
}
config.encoder.device = *parsed;
}
config.outputs.rtmp.enabled = config.outputs.rtmp.enabled || rtmp_enabled;
if (!rtmp_urls_raw.empty()) {
if (!rtmp_urls_override.empty()) {
config.outputs.rtmp.enabled = true;
config.outputs.rtmp.urls = std::move(rtmp_urls_raw);
config.outputs.rtmp.urls = std::move(rtmp_urls_override);
}
if (!rtmp_transport_raw.empty()) {
auto parsed = parse_rtmp_transport(rtmp_transport_raw);
if (rtmp_transport_override) {
auto parsed = parse_rtmp_transport(*rtmp_transport_override);
if (!parsed) {
return std::unexpected(parsed.error());
}
config.outputs.rtmp.transport = *parsed;
}
if (!rtmp_ffmpeg_path_raw.empty()) {
config.outputs.rtmp.ffmpeg_path = rtmp_ffmpeg_path_raw;
if (rtmp_ffmpeg_path_override) {
config.outputs.rtmp.ffmpeg_path = *rtmp_ffmpeg_path_override;
}
config.outputs.rtp.enabled = config.outputs.rtp.enabled || rtp_enabled;
if (!rtp_endpoint_raw.empty()) {
config.outputs.rtp.enabled = true;
config.outputs.rtp.endpoint = rtp_endpoint_raw;
if (rtmp_enabled_override) {
config.outputs.rtmp.enabled = *rtmp_enabled_override;
}
if (!rtp_payload_type_raw.empty()) {
auto value = parse_u32(rtp_payload_type_raw, "--rtp-payload-type");
if (!value) {
return std::unexpected(value.error());
}
if (*value > std::numeric_limits<std::uint8_t>::max()) {
return std::unexpected("value out of range for --rtp-payload-type: '" + rtp_payload_type_raw + "'");
}
if (rtp_endpoint_override) {
config.outputs.rtp.enabled = true;
config.outputs.rtp.payload_type = static_cast<std::uint8_t>(*value);
config.outputs.rtp.endpoint = *rtp_endpoint_override;
}
if (!rtp_sdp_raw.empty()) {
if (rtp_payload_type_override) {
config.outputs.rtp.enabled = true;
config.outputs.rtp.sdp_path = rtp_sdp_raw;
config.outputs.rtp.payload_type = static_cast<std::uint8_t>(*rtp_payload_type_override);
}
if (rtp_sdp_override) {
config.outputs.rtp.enabled = true;
config.outputs.rtp.sdp_path = *rtp_sdp_override;
}
if (rtp_enabled_override) {
config.outputs.rtp.enabled = *rtp_enabled_override;
}
config.record.mcap.enabled = config.record.mcap.enabled || mcap_enabled;
if (!mcap_path_raw.empty()) {
if (mcap_path_override) {
config.record.mcap.enabled = true;
config.record.mcap.path = mcap_path_raw;
config.record.mcap.path = *mcap_path_override;
}
if (!mcap_topic_raw.empty()) {
if (mcap_topic_override) {
config.record.mcap.enabled = true;
config.record.mcap.topic = mcap_topic_raw;
config.record.mcap.topic = *mcap_topic_override;
}
if (!mcap_depth_topic_raw.empty()) {
if (mcap_depth_topic_override) {
config.record.mcap.enabled = true;
config.record.mcap.depth_topic = mcap_depth_topic_raw;
config.record.mcap.depth_topic = *mcap_depth_topic_override;
}
if (!mcap_calibration_topic_raw.empty()) {
if (mcap_calibration_topic_override) {
config.record.mcap.enabled = true;
config.record.mcap.calibration_topic = mcap_calibration_topic_raw;
config.record.mcap.calibration_topic = *mcap_calibration_topic_override;
}
if (!mcap_depth_calibration_topic_raw.empty()) {
if (mcap_depth_calibration_topic_override) {
config.record.mcap.enabled = true;
config.record.mcap.depth_calibration_topic = mcap_depth_calibration_topic_raw;
config.record.mcap.depth_calibration_topic = *mcap_depth_calibration_topic_override;
}
if (!mcap_pose_topic_raw.empty()) {
if (mcap_pose_topic_override) {
config.record.mcap.enabled = true;
config.record.mcap.pose_topic = mcap_pose_topic_raw;
config.record.mcap.pose_topic = *mcap_pose_topic_override;
}
if (!mcap_body_topic_raw.empty()) {
if (mcap_body_topic_override) {
config.record.mcap.enabled = true;
config.record.mcap.body_topic = mcap_body_topic_raw;
config.record.mcap.body_topic = *mcap_body_topic_override;
}
if (!mcap_frame_id_raw.empty()) {
if (mcap_frame_id_override) {
config.record.mcap.enabled = true;
config.record.mcap.frame_id = mcap_frame_id_raw;
config.record.mcap.frame_id = *mcap_frame_id_override;
}
if (!mcap_compression_raw.empty()) {
auto parsed = parse_mcap_compression(mcap_compression_raw);
if (mcap_compression_override) {
auto parsed = parse_mcap_compression(*mcap_compression_override);
if (!parsed) {
return std::unexpected(parsed.error());
}
config.record.mcap.enabled = true;
config.record.mcap.compression = *parsed;
}
if (mcap_enabled_override) {
config.record.mcap.enabled = *mcap_enabled_override;
}
if (!queue_size_raw.empty()) {
auto parsed = parse_size(queue_size_raw, "--queue-size");
if (!parsed) {
return std::unexpected(parsed.error());
}
config.latency.queue_size = *parsed;
if (queue_size_override) {
config.latency.queue_size = *queue_size_override;
}
if (!gop_raw.empty()) {
auto parsed = parse_u32(gop_raw, "--gop");
if (!parsed) {
return std::unexpected(parsed.error());
}
config.encoder.gop = *parsed;
if (gop_override) {
config.encoder.gop = *gop_override;
}
if (!b_frames_raw.empty()) {
auto parsed = parse_u32(b_frames_raw, "--b-frames");
if (!parsed) {
return std::unexpected(parsed.error());
}
config.encoder.b_frames = *parsed;
if (b_frames_override) {
config.encoder.b_frames = *b_frames_override;
}
if (!realtime_sync_raw.empty()) {
auto parsed = parse_bool(realtime_sync_raw, "--realtime-sync");
if (!parsed) {
return std::unexpected(parsed.error());
}
config.latency.realtime_sync = *parsed;
if (realtime_sync_override) {
config.latency.realtime_sync = *realtime_sync_override;
}
if (!force_idr_on_reset_raw.empty()) {
auto parsed = parse_bool(force_idr_on_reset_raw, "--force-idr-on-reset");
if (!parsed) {
return std::unexpected(parsed.error());
}
config.latency.force_idr_on_reset = *parsed;
if (force_idr_on_reset_override) {
config.latency.force_idr_on_reset = *force_idr_on_reset_override;
}
if (!ingest_max_frames_raw.empty()) {
auto parsed = parse_u32(ingest_max_frames_raw, "--ingest-max-frames");
if (!parsed) {
return std::unexpected(parsed.error());
}
config.latency.ingest_max_frames = *parsed;
if (keep_stream_on_reset_override) {
config.latency.keep_stream_on_reset = *keep_stream_on_reset_override;
}
if (!ingest_idle_timeout_raw.empty()) {
auto parsed = parse_u32(ingest_idle_timeout_raw, "--ingest-idle-timeout-ms");
if (!parsed) {
return std::unexpected(parsed.error());
}
config.latency.ingest_idle_timeout_ms = *parsed;
if (ingest_max_frames_override) {
config.latency.ingest_max_frames = *ingest_max_frames_override;
}
if (!ingest_consumer_delay_raw.empty()) {
auto parsed = parse_u32(ingest_consumer_delay_raw, "--ingest-consumer-delay-ms");
if (!parsed) {
return std::unexpected(parsed.error());
}
config.latency.ingest_consumer_delay_ms = *parsed;
if (ingest_idle_timeout_override) {
config.latency.ingest_idle_timeout_ms = *ingest_idle_timeout_override;
}
if (!snapshot_copy_delay_raw.empty()) {
auto parsed = parse_u32(snapshot_copy_delay_raw, "--snapshot-copy-delay-us");
if (!parsed) {
return std::unexpected(parsed.error());
}
config.latency.snapshot_copy_delay_us = *parsed;
if (ingest_consumer_delay_override) {
config.latency.ingest_consumer_delay_ms = *ingest_consumer_delay_override;
}
if (!emit_stall_raw.empty()) {
auto parsed = parse_u32(emit_stall_raw, "--emit-stall-ms");
if (!parsed) {
return std::unexpected(parsed.error());
}
config.latency.emit_stall_ms = *parsed;
if (snapshot_copy_delay_override) {
config.latency.snapshot_copy_delay_us = *snapshot_copy_delay_override;
}
if (emit_stall_override) {
config.latency.emit_stall_ms = *emit_stall_override;
}
finalize_rtp_endpoint(config);
@@ -968,10 +1192,6 @@ std::expected<void, std::string> validate_runtime_config(const RuntimeConfig &co
if (config.encoder.b_frames > config.encoder.gop) {
return std::unexpected("invalid encoder config: b_frames must be <= gop");
}
if (config.latency.ingest_idle_timeout_ms == 0) {
return std::unexpected("invalid ingest config: ingest_idle_timeout_ms must be >= 1");
}
return {};
}
@@ -1005,6 +1225,7 @@ std::string summarize_runtime_config(const RuntimeConfig &config) {
ss << ", latency.queue_size=" << config.latency.queue_size;
ss << ", latency.realtime_sync=" << (config.latency.realtime_sync ? "true" : "false");
ss << ", latency.force_idr_on_reset=" << (config.latency.force_idr_on_reset ? "true" : "false");
ss << ", latency.keep_stream_on_reset=" << (config.latency.keep_stream_on_reset ? "true" : "false");
ss << ", latency.ingest_max_frames=" << config.latency.ingest_max_frames;
ss << ", latency.ingest_idle_timeout_ms=" << config.latency.ingest_idle_timeout_ms;
ss << ", latency.ingest_consumer_delay_ms=" << config.latency.ingest_consumer_delay_ms;
+2 -1
@@ -275,6 +275,7 @@ int run_ingest_loop(const RuntimeConfig &config) {
}
});
const auto idle_timeout_enabled = config.latency.ingest_idle_timeout_ms > 0;
const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms);
auto last_event = std::chrono::steady_clock::now();
@@ -283,7 +284,7 @@ int run_ingest_loop(const RuntimeConfig &config) {
const auto recv_result = subscriber.recv(message, zmq::recv_flags::none);
if (!recv_result) {
const auto now = std::chrono::steady_clock::now();
if (now - last_event >= idle_timeout) {
if (idle_timeout_enabled && now - last_event >= idle_timeout) {
spdlog::info(
"ingest idle timeout reached ({} ms), stopping",
config.latency.ingest_idle_timeout_ms);
+6
@@ -217,6 +217,12 @@ public:
first_source_timestamp_ns_ = frame.source_timestamp_ns;
}
frame_->pict_type = frame.force_keyframe ? AV_PICTURE_TYPE_I : AV_PICTURE_TYPE_NONE;
if (frame.force_keyframe) {
frame_->flags |= AV_FRAME_FLAG_KEY;
} else {
frame_->flags &= ~AV_FRAME_FLAG_KEY;
}
frame_->pts = static_cast<std::int64_t>(frame.source_timestamp_ns - *first_source_timestamp_ns_);
const auto send_result = avcodec_send_frame(context_, frame_);
if (send_result < 0) {
+2 -1

@@ -9,7 +9,7 @@ namespace cvmmap_streamer {
namespace {
constexpr std::array<std::string_view, 33> kHelpLines{
constexpr std::array<std::string_view, 34> kHelpLines{
"Usage:",
" --help, -h\tshow this message",
"",
@@ -23,6 +23,7 @@ constexpr std::array<std::string_view, 33> kHelpLines{
" --encoder-device <device>\tauto|nvidia|software",
" --gop <frames>\tencoder GOP length",
" --b-frames <count>\tencoder B-frame count",
" --keep-stream-on-reset <bool>\tkeep RTMP/RTP sessions alive across upstream stream_reset events",
" --rtp\t\tenable RTP output",
" --rtp-endpoint <host:port>\tRTP destination",
" --rtp-payload-type <pt>\tRTP payload type (96-127)",
+331 -157
@@ -62,6 +62,10 @@ constexpr int exit_code(PipelineExitCode code) {
}
struct ResolvedInputEndpoints {
std::string instance_name;
std::string namespace_name;
std::string ipc_prefix;
std::string base_name;
std::string shm_name;
std::string zmq_endpoint;
std::string nats_target_key;
@@ -71,17 +75,21 @@ struct ResolvedInputEndpoints {
std::expected<ResolvedInputEndpoints, std::string> resolve_input_endpoints(const RuntimeConfig &config) {
try {
auto target = cvmmap::resolve_cvmmap_target_or_throw(config.input.uri);
spdlog::info(
"pipeline input resolved: uri='{}' shm_name='{}' zmq_endpoint='{}' nats_target_key='{}'",
config.input.uri,
target.shm_name,
target.zmq_addr,
target.nats_target_key);
return ResolvedInputEndpoints{
.shm_name = target.shm_name,
.zmq_endpoint = target.zmq_addr,
.nats_target_key = target.nats_target_key,
};
spdlog::info(
"pipeline input resolved: uri='{}' shm_name='{}' zmq_endpoint='{}' nats_target_key='{}'",
config.input.uri,
target.shm_name,
target.zmq_addr,
target.nats_target_key);
return ResolvedInputEndpoints{
.instance_name = target.instance,
.namespace_name = target.namespace_name,
.ipc_prefix = target.prefix,
.base_name = target.base_name,
.shm_name = target.shm_name,
.zmq_endpoint = target.zmq_addr,
.nats_target_key = target.nats_target_key,
};
} catch (const std::exception &e) {
return std::unexpected(std::string("invalid cvmmap input URI: ") + e.what());
}
@@ -327,6 +335,81 @@ bool stream_info_equal(
lhs.bitstream_format == rhs.bitstream_format;
}
struct LiveOutputContinuityState {
bool boundary_pending{false};
bool encoded_keyframe_required{false};
std::optional<std::uint64_t> last_output_pts_ns{};
std::optional<std::uint64_t> segment_input_start_pts_ns{};
std::uint64_t segment_output_start_pts_ns{0};
std::uint64_t nominal_frame_interval_ns{33'333'333ull};
void reset() {
boundary_pending = false;
encoded_keyframe_required = false;
last_output_pts_ns.reset();
segment_input_start_pts_ns.reset();
segment_output_start_pts_ns = 0;
nominal_frame_interval_ns = 33'333'333ull;
}
void note_reset_boundary() {
boundary_pending = true;
encoded_keyframe_required = true;
segment_input_start_pts_ns.reset();
}
void note_new_session(const encode::EncodedStreamInfo &stream_info) {
reset();
update_nominal_frame_interval(stream_info);
}
void update_nominal_frame_interval(const encode::EncodedStreamInfo &stream_info) {
if (stream_info.frame_rate_num == 0 || stream_info.frame_rate_den == 0) {
nominal_frame_interval_ns = 33'333'333ull;
return;
}
const auto numerator = static_cast<std::uint64_t>(stream_info.frame_rate_den) * 1'000'000'000ull;
const auto denominator = static_cast<std::uint64_t>(stream_info.frame_rate_num);
if (denominator == 0) {
nominal_frame_interval_ns = 33'333'333ull;
return;
}
const auto interval = numerator / denominator;
nominal_frame_interval_ns = interval == 0 ? 1ull : interval;
}
[[nodiscard]]
std::uint64_t remap_pts(const std::uint64_t input_pts_ns) {
if (!last_output_pts_ns) {
last_output_pts_ns = input_pts_ns;
segment_input_start_pts_ns = input_pts_ns;
segment_output_start_pts_ns = input_pts_ns;
boundary_pending = false;
return input_pts_ns;
}
if (boundary_pending || !segment_input_start_pts_ns) {
segment_input_start_pts_ns = input_pts_ns;
segment_output_start_pts_ns = *last_output_pts_ns + nominal_frame_interval_ns;
boundary_pending = false;
}
const auto input_delta =
input_pts_ns >= *segment_input_start_pts_ns
? input_pts_ns - *segment_input_start_pts_ns
: 0ull;
auto remapped_pts = segment_output_start_pts_ns + input_delta;
if (remapped_pts <= *last_output_pts_ns) {
remapped_pts = *last_output_pts_ns + nominal_frame_interval_ns;
}
last_output_pts_ns = remapped_pts;
return remapped_pts;
}
};
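The remapping logic above can be sketched in Python (a hypothetical mirror of the C++ struct, for illustration only; the real implementation uses `std::uint64_t` nanosecond timestamps): after a `stream_reset`, live output timestamps continue one nominal frame interval after the last emitted PTS instead of jumping with the restarted source clock.

```python
# Minimal sketch of the PTS remapping in LiveOutputContinuityState.
# Names and defaults mirror the C++ struct above; this is illustrative,
# not the project's actual code.

class PtsRemapper:
    def __init__(self, interval_ns: int = 33_333_333):
        self.interval_ns = interval_ns   # nominal frame interval (ns)
        self.boundary_pending = False    # armed on stream_reset
        self.last_output = None          # last emitted output PTS
        self.seg_in_start = None         # input PTS at segment start
        self.seg_out_start = 0           # output PTS at segment start

    def note_reset_boundary(self) -> None:
        self.boundary_pending = True
        self.seg_in_start = None

    def remap(self, input_pts: int) -> int:
        if self.last_output is None:
            # First packet of the session: pass through unchanged.
            self.last_output = input_pts
            self.seg_in_start = input_pts
            self.seg_out_start = input_pts
            self.boundary_pending = False
            return input_pts
        if self.boundary_pending or self.seg_in_start is None:
            # New segment: resume one nominal interval after the last output.
            self.seg_in_start = input_pts
            self.seg_out_start = self.last_output + self.interval_ns
            self.boundary_pending = False
        delta = max(input_pts - self.seg_in_start, 0)
        remapped = self.seg_out_start + delta
        if remapped <= self.last_output:
            # Keep output timestamps strictly increasing.
            remapped = self.last_output + self.interval_ns
        self.last_output = remapped
        return remapped
```

With this sketch, a source clock that restarts near zero after a reset still yields monotonically increasing output PTS values, which is what keeps an already-open RTMP/RTP session playable across the boundary.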
[[nodiscard]]
std::uint64_t body_tracking_timestamp_ns(const cvmmap::body_tracking_frame_t &frame) {
if (frame.header.timestamp_ns != 0) {
@@ -556,6 +639,7 @@ Status publish_access_units(
protocol::UdpRtpPublisher *rtp_publisher,
protocol::RtmpOutput *rtmp_output,
McapRecorderState *mcap_recorder,
LiveOutputContinuityState *live_output_continuity,
metrics::IngestEmitLatencyTracker &latency_tracker) {
for (auto &access_unit : access_units) {
if (access_unit.annexb_bytes.empty()) {
@@ -567,11 +651,29 @@ Status publish_access_units(
latency_tracker.note_emit_stall();
}
const auto should_rewrite_live_timestamps =
live_output_continuity != nullptr &&
(rtp_publisher != nullptr || rtmp_output != nullptr);
auto live_access_unit = access_unit;
const bool live_boundary_packet =
should_rewrite_live_timestamps &&
live_output_continuity->boundary_pending;
if (should_rewrite_live_timestamps) {
live_access_unit.stream_pts_ns = live_output_continuity->remap_pts(access_unit.stream_pts_ns);
}
if (live_boundary_packet) {
spdlog::info(
"PIPELINE_RESET_CONTINUITY first live packet keyframe={} source_pts_ns={} remapped_pts_ns={}",
access_unit.keyframe ? "true" : "false",
access_unit.stream_pts_ns,
live_access_unit.stream_pts_ns);
}
if (rtp_publisher != nullptr) {
rtp_publisher->publish_access_unit(access_unit.annexb_bytes, access_unit.stream_pts_ns);
rtp_publisher->publish_access_unit(live_access_unit.annexb_bytes, live_access_unit.stream_pts_ns);
}
if (rtmp_output != nullptr) {
auto publish = (*rtmp_output)->publish_access_unit(access_unit);
auto publish = (*rtmp_output)->publish_access_unit(live_access_unit);
if (!publish) {
return std::unexpected(publish.error());
}
@@ -607,6 +709,7 @@ Status drain_encoder(
protocol::UdpRtpPublisher *rtp_publisher,
protocol::RtmpOutput *rtmp_output,
McapRecorderState *mcap_recorder,
LiveOutputContinuityState *live_output_continuity,
metrics::IngestEmitLatencyTracker &latency_tracker) {
auto drained = flushing ? backend->flush() : backend->drain();
if (!drained) {
@@ -619,6 +722,7 @@ Status drain_encoder(
rtp_publisher,
rtmp_output,
mcap_recorder,
live_output_continuity,
latency_tracker);
}
@@ -678,9 +782,17 @@ int run_pipeline(const RuntimeConfig &config) {
input_endpoints->nats_target_key,
config.input.nats_url);
cvmmap::NatsControlService recorder_service(
config.input.uri,
input_endpoints->nats_target_key,
config.input.nats_url);
cvmmap::NatsControlServiceOptions{
.instance_name = input_endpoints->instance_name,
.namespace_name = input_endpoints->namespace_name,
.ipc_prefix = input_endpoints->ipc_prefix,
.base_name = input_endpoints->base_name,
.target_key = input_endpoints->nats_target_key,
.shm_name = input_endpoints->shm_name,
.zmq_addr = input_endpoints->zmq_endpoint,
.backend = std::string((*source)->backend_name()),
.nats_url = config.input.nats_url,
});
std::mutex nats_event_mutex{};
std::deque<std::vector<std::uint8_t>> pending_body_packets{};
std::deque<int32_t> pending_status_codes{};
@@ -737,6 +849,10 @@ int run_pipeline(const RuntimeConfig &config) {
PipelineStats stats{};
metrics::IngestEmitLatencyTracker latency_tracker{};
LiveOutputContinuityState live_output_continuity{};
const bool keep_live_outputs_on_reset =
config.latency.keep_stream_on_reset &&
(config.outputs.rtp.enabled || config.outputs.rtmp.enabled);
bool producer_offline{false};
bool started{false};
bool using_encoded_input{false};
@@ -766,6 +882,7 @@ int run_pipeline(const RuntimeConfig &config) {
warned_unknown_depth_unit = false;
using_encoded_input = false;
active_stream_info.reset();
live_output_continuity.reset();
rtp_publisher.reset();
rtmp_output.reset();
};
@@ -784,26 +901,55 @@ int run_pipeline(const RuntimeConfig &config) {
const auto start_outputs_from_stream_info =
[&](const encode::EncodedStreamInfo &stream_info, const ipc::FrameInfo &target_info) -> Status {
rtp_publisher.reset();
rtmp_output.reset();
if (config.outputs.rtp.enabled) {
auto created = protocol::UdpRtpPublisher::create(config, stream_info.codec);
if (!created) {
return unexpected_error(
ERR_INTERNAL,
"pipeline RTP publisher init failed: " + created.error());
const bool preserving_live_outputs =
keep_live_outputs_on_reset &&
live_output_continuity.boundary_pending &&
active_stream_info.has_value() &&
stream_info_equal(*active_stream_info, stream_info) &&
((config.outputs.rtp.enabled && rtp_publisher.has_value()) ||
(config.outputs.rtmp.enabled && rtmp_output.has_value()));
if (preserving_live_outputs) {
spdlog::info(
"PIPELINE_RESET_CONTINUITY preserving live outputs codec={} width={} height={}",
to_string(stream_info.codec),
stream_info.width,
stream_info.height);
live_output_continuity.update_nominal_frame_interval(stream_info);
} else {
if (keep_live_outputs_on_reset && live_output_continuity.boundary_pending && active_stream_info.has_value() &&
!stream_info_equal(*active_stream_info, stream_info)) {
spdlog::warn(
"PIPELINE_RESET_CONTINUITY fallback to output rebuild reason='stream_info_change' old_codec={} new_codec={} old={}x{} new={}x{}",
to_string(active_stream_info->codec),
to_string(stream_info.codec),
active_stream_info->width,
active_stream_info->height,
stream_info.width,
stream_info.height);
}
rtp_publisher.emplace(std::move(*created));
}
if (config.outputs.rtmp.enabled) {
auto created = protocol::make_rtmp_output(config, stream_info);
if (!created) {
return unexpected_error(
created.error().code,
"pipeline RTMP output init failed: " + format_error(created.error()));
rtp_publisher.reset();
rtmp_output.reset();
if (config.outputs.rtp.enabled) {
auto created = protocol::UdpRtpPublisher::create(config, stream_info.codec);
if (!created) {
return unexpected_error(
ERR_INTERNAL,
"pipeline RTP publisher init failed: " + created.error());
}
rtp_publisher.emplace(std::move(*created));
}
rtmp_output.emplace(std::move(*created));
if (config.outputs.rtmp.enabled) {
auto created = protocol::make_rtmp_output(config, stream_info);
if (!created) {
return unexpected_error(
created.error().code,
"pipeline RTMP output init failed: " + format_error(created.error()));
}
rtmp_output.emplace(std::move(*created));
}
live_output_continuity.note_new_session(stream_info);
}
update_mcap_stream_info(mcap_recorder, stream_info);
if (config.record.mcap.enabled) {
std::lock_guard lock(mcap_recorder.mutex);
@@ -865,35 +1011,49 @@ int run_pipeline(const RuntimeConfig &config) {
return start_outputs_from_stream_info(*stream_info, snapshot.metadata.info);
};
const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms);
auto last_event = std::chrono::steady_clock::now();
const auto idle_timeout_enabled = config.latency.ingest_idle_timeout_ms > 0;
const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms);
auto last_event = std::chrono::steady_clock::now();
while (true) {
std::deque<std::vector<std::uint8_t>> body_packets;
std::deque<int32_t> status_codes;
{
std::lock_guard lock(nats_event_mutex);
body_packets.swap(pending_body_packets);
status_codes.swap(pending_status_codes);
}
const bool had_nats_events = !body_packets.empty() || !status_codes.empty();
if (had_nats_events) {
last_event = std::chrono::steady_clock::now();
}
for (const auto status_code : status_codes) {
stats.status_messages += 1;
switch (static_cast<cvmmap::ModuleStatus>(status_code)) {
case cvmmap::ModuleStatus::Online:
spdlog::info("pipeline status event status=online");
producer_offline = false;
break;
case cvmmap::ModuleStatus::Offline:
spdlog::info("pipeline status event status=offline");
producer_offline = true;
break;
case cvmmap::ModuleStatus::StreamReset:
spdlog::info("pipeline status event status=stream_reset");
stats.resets += 1;
while (true) {
std::deque<std::vector<std::uint8_t>> body_packets;
std::deque<int32_t> status_codes;
{
std::lock_guard lock(nats_event_mutex);
body_packets.swap(pending_body_packets);
status_codes.swap(pending_status_codes);
}
const bool had_nats_events = !body_packets.empty() || !status_codes.empty();
if (had_nats_events) {
last_event = std::chrono::steady_clock::now();
}
for (const auto status_code : status_codes) {
stats.status_messages += 1;
switch (static_cast<cvmmap::ModuleStatus>(status_code)) {
case cvmmap::ModuleStatus::Online:
spdlog::info("pipeline status event status=online");
producer_offline = false;
break;
case cvmmap::ModuleStatus::Offline:
spdlog::info("pipeline status event status=offline");
producer_offline = true;
break;
case cvmmap::ModuleStatus::StreamReset:
spdlog::info("pipeline status event status=stream_reset");
stats.resets += 1;
if (keep_live_outputs_on_reset) {
live_output_continuity.note_reset_boundary();
if (!using_encoded_input && backend) {
(*backend)->shutdown();
started = false;
}
restart_pending = false;
restart_target_info.reset();
spdlog::info(
"PIPELINE_RESET_CONTINUITY armed outputs_active={} encoded_input={}",
(rtp_publisher.has_value() || rtmp_output.has_value()) ? "true" : "false",
using_encoded_input ? "true" : "false");
} else {
if (backend) {
(*backend)->shutdown();
}
@@ -905,86 +1065,88 @@ int run_pipeline(const RuntimeConfig &config) {
active_info.reset();
rtp_publisher.reset();
rtmp_output.reset();
break;
}
break;
}
for (const auto &body_bytes_vec : body_packets) {
const auto body_bytes = std::span<const std::uint8_t>(
body_bytes_vec.data(),
body_bytes_vec.size());
if (body_bytes.empty()) {
continue;
}
auto parsed_body = cvmmap::parse_body_tracking_message(body_bytes);
if (!parsed_body) {
spdlog::warn("pipeline body packet parse error: {}", parsed_body.error());
continue;
}
auto write_body = write_mcap_body_message(&mcap_recorder, record::RawBodyTrackingMessageView{
.timestamp_ns = body_tracking_timestamp_ns(*parsed_body),
.bytes = body_bytes,
});
if (!write_body) {
const auto reason = "pipeline body MCAP write failed: " + format_error(write_body.error());
restart_backend(reason, active_info);
break;
}
}
if (backend && !using_encoded_input) {
auto poll = (*backend)->poll();
if (!poll) {
const auto reason = format_error(poll.error());
restart_backend(reason, active_info);
}
}
std::array<zmq::pollitem_t, 1> poll_items{{
{subscriber.handle(), 0, ZMQ_POLLIN, 0},
}};
try {
zmq::poll(poll_items, std::chrono::milliseconds{20});
} catch (const zmq::error_t &e) {
spdlog::error("pipeline poll failed: {}", e.what());
return exit_code(PipelineExitCode::SubscriberError);
}
const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0;
if (!frame_socket_ready) {
const auto now = std::chrono::steady_clock::now();
if (restart_pending && restart_target_info && backend && !using_encoded_input) {
auto start_result = attempt_raw_backend_start(*restart_target_info);
if (!start_result) {
spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error()));
return exit_code(PipelineExitCode::RuntimeError);
}
}
if (now - last_event >= idle_timeout) {
spdlog::info("pipeline idle timeout reached ({} ms), stopping", config.latency.ingest_idle_timeout_ms);
break;
}
if (!producer_offline && started && backend && !using_encoded_input) {
auto drain = drain_encoder(
config,
*backend,
false,
stats,
rtp_publisher ? &*rtp_publisher : nullptr,
rtmp_output ? &*rtmp_output : nullptr,
&mcap_recorder,
latency_tracker);
if (!drain) {
const auto reason = format_error(drain.error());
restart_backend(reason, active_info);
}
}
}
for (const auto &body_bytes_vec : body_packets) {
const auto body_bytes = std::span<const std::uint8_t>(
body_bytes_vec.data(),
body_bytes_vec.size());
if (body_bytes.empty()) {
continue;
}
zmq::message_t message;
const auto recv_result = subscriber.recv(message, zmq::recv_flags::dontwait);
auto parsed_body = cvmmap::parse_body_tracking_message(body_bytes);
if (!parsed_body) {
spdlog::warn("pipeline body packet parse error: {}", parsed_body.error());
continue;
}
auto write_body = write_mcap_body_message(&mcap_recorder, record::RawBodyTrackingMessageView{
.timestamp_ns = body_tracking_timestamp_ns(*parsed_body),
.bytes = body_bytes,
});
if (!write_body) {
const auto reason = "pipeline body MCAP write failed: " + format_error(write_body.error());
restart_backend(reason, active_info);
break;
}
}
if (backend && !using_encoded_input) {
auto poll = (*backend)->poll();
if (!poll) {
const auto reason = format_error(poll.error());
restart_backend(reason, active_info);
}
}
std::array<zmq::pollitem_t, 1> poll_items{{
{subscriber.handle(), 0, ZMQ_POLLIN, 0},
}};
try {
zmq::poll(poll_items, std::chrono::milliseconds{20});
} catch (const zmq::error_t &e) {
spdlog::error("pipeline poll failed: {}", e.what());
return exit_code(PipelineExitCode::SubscriberError);
}
const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0;
if (!frame_socket_ready) {
const auto now = std::chrono::steady_clock::now();
if (restart_pending && restart_target_info && backend && !using_encoded_input) {
auto start_result = attempt_raw_backend_start(*restart_target_info);
if (!start_result) {
spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error()));
return exit_code(PipelineExitCode::RuntimeError);
}
}
if (idle_timeout_enabled && now - last_event >= idle_timeout) {
spdlog::info("pipeline idle timeout reached ({} ms), stopping", config.latency.ingest_idle_timeout_ms);
break;
}
if (!producer_offline && started && backend && !using_encoded_input) {
auto drain = drain_encoder(
config,
*backend,
false,
stats,
rtp_publisher ? &*rtp_publisher : nullptr,
rtmp_output ? &*rtmp_output : nullptr,
&mcap_recorder,
keep_live_outputs_on_reset ? &live_output_continuity : nullptr,
latency_tracker);
if (!drain) {
const auto reason = format_error(drain.error());
restart_backend(reason, active_info);
}
}
continue;
}
zmq::message_t message;
const auto recv_result = subscriber.recv(message, zmq::recv_flags::dontwait);
if (!recv_result) {
continue;
}
@@ -1075,6 +1237,14 @@ int run_pipeline(const RuntimeConfig &config) {
restart_backend(reason, snapshot->metadata.info);
continue;
}
if (keep_live_outputs_on_reset && live_output_continuity.encoded_keyframe_required && !access_unit->keyframe) {
spdlog::info(
"PIPELINE_RESET_CONTINUITY dropping encoded access unit until keyframe pts_ns={} source_timestamp_ns={}",
access_unit->stream_pts_ns,
access_unit->source_timestamp_ns);
continue;
}
live_output_continuity.encoded_keyframe_required = false;
std::vector<encode::EncodedAccessUnit> access_units{};
access_units.push_back(std::move(*access_unit));
auto publish = publish_access_units(
@@ -1084,6 +1254,7 @@ int run_pipeline(const RuntimeConfig &config) {
rtp_publisher ? &*rtp_publisher : nullptr,
rtmp_output ? &*rtmp_output : nullptr,
&mcap_recorder,
keep_live_outputs_on_reset ? &live_output_continuity : nullptr,
latency_tracker);
if (!publish) {
const auto reason = format_error(publish.error());
@@ -1094,6 +1265,7 @@ int run_pipeline(const RuntimeConfig &config) {
auto push = (*backend)->push_frame(encode::RawVideoFrame{
.info = snapshot->metadata.info,
.source_timestamp_ns = snapshot->metadata.timestamp_ns,
.force_keyframe = keep_live_outputs_on_reset && live_output_continuity.boundary_pending,
.bytes = std::span<const std::uint8_t>(snapshot_buffer.data(), snapshot->bytes_copied),
});
if (!push) {
@@ -1103,30 +1275,30 @@ int run_pipeline(const RuntimeConfig &config) {
}
}
if (!snapshot->depth.empty()) {
if (snapshot->depth_unit == ipc::DepthUnit::Unknown) {
if (!warned_unknown_depth_unit) {
spdlog::warn(
"pipeline depth plane present but depth_unit is unknown; assuming millimeters for MCAP recording, producer should upgrade to the ABI with explicit depth_unit metadata");
warned_unknown_depth_unit = true;
}
}
auto depth_map = make_depth_map_view(*snapshot);
if (!depth_map) {
const auto reason = "pipeline depth snapshot invalid: " + depth_map.error();
restart_backend(reason, active_info);
continue;
}
auto write_depth = write_mcap_depth_map(&mcap_recorder, *depth_map);
if (!write_depth) {
const auto reason = "pipeline depth MCAP write failed: " + format_error(write_depth.error());
restart_backend(reason, active_info);
continue;
if (!snapshot->depth.empty()) {
if (snapshot->depth_unit == ipc::DepthUnit::Unknown) {
if (!warned_unknown_depth_unit) {
spdlog::warn(
"pipeline depth plane present but depth_unit is unknown; assuming millimeters for MCAP recording, producer should upgrade to the ABI with explicit depth_unit metadata");
warned_unknown_depth_unit = true;
}
}
auto depth_map = make_depth_map_view(*snapshot);
if (!depth_map) {
const auto reason = "pipeline depth snapshot invalid: " + depth_map.error();
restart_backend(reason, active_info);
continue;
}
auto write_depth = write_mcap_depth_map(&mcap_recorder, *depth_map);
if (!write_depth) {
const auto reason = "pipeline depth MCAP write failed: " + format_error(write_depth.error());
restart_backend(reason, active_info);
continue;
}
}
stats.pushed_frames += 1;
if (!want_encoded_input) {
auto drain = drain_encoder(
@@ -1137,6 +1309,7 @@ int run_pipeline(const RuntimeConfig &config) {
rtp_publisher ? &*rtp_publisher : nullptr,
rtmp_output ? &*rtmp_output : nullptr,
&mcap_recorder,
keep_live_outputs_on_reset ? &live_output_continuity : nullptr,
latency_tracker);
if (!drain) {
const auto reason = format_error(drain.error());
@@ -1166,6 +1339,7 @@ int run_pipeline(const RuntimeConfig &config) {
rtp_publisher ? &*rtp_publisher : nullptr,
rtmp_output ? &*rtmp_output : nullptr,
&mcap_recorder,
keep_live_outputs_on_reset ? &live_output_continuity : nullptr,
latency_tracker);
if (!drain) {
spdlog::error("pipeline publish failed during flush: {}", format_error(drain.error()));
+13 -1
@@ -325,6 +325,7 @@ private:
const encode::EncodedStreamInfo &stream_info) {
Session session{};
session.url = url;
AVDictionary *muxer_options{nullptr};
const auto alloc_result = avformat_alloc_output_context2(&session.format_context, nullptr, "flv", url.c_str());
if (alloc_result < 0 || session.format_context == nullptr) {
@@ -369,7 +370,18 @@ private:
}
}
const auto header_result = avformat_write_header(session.format_context, nullptr);
// RTMP sockets are non-seekable, so the FLV muxer must not try to backfill
// duration/filesize metadata during trailer write.
const auto options_result = av_dict_set(&muxer_options, "flvflags", "no_duration_filesize", 0);
if (options_result < 0) {
av_dict_free(&muxer_options);
close_session(session);
return unexpected_error(
ERR_ALLOCATION_FAILED,
"failed to configure FLV muxer flags for '" + url + "': " + av_error_string(options_result));
}
const auto header_result = avformat_write_header(session.format_context, &muxer_options);
av_dict_free(&muxer_options);
if (header_result < 0) {
close_session(session);
return unexpected_error(
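The same muxer behavior is reachable from the ffmpeg CLI via `-flvflags no_duration_filesize`, which is the command-line spelling of the `av_dict_set` call in this hunk. A small sketch that only assembles such a command (the input path and RTMP URL are hypothetical placeholders, not values from this project):

```python
# Sketch: build an ffmpeg command that disables the FLV muxer's
# duration/filesize trailer backfill, matching the av_dict_set() above.
# Paths and URLs are illustrative placeholders.

def rtmp_publish_command(input_path: str, rtmp_url: str) -> list[str]:
    return [
        "ffmpeg",
        "-re", "-i", input_path,  # read input at native frame rate
        "-c", "copy",             # pass the encoded stream through untouched
        "-f", "flv",
        # Non-seekable RTMP sockets cannot take the trailer seek-back.
        "-flvflags", "no_duration_filesize",
        rtmp_url,
    ]

cmd = rtmp_publish_command("sample.h264", "rtmp://localhost/live/stream")
```

This is useful when reproducing the "failed to update header" warnings outside the pipeline: with the flag set, libavformat skips the metadata rewrite entirely instead of attempting a seek on the socket.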
+1 -1
@@ -163,7 +163,7 @@ int main(int argc, char **argv) {
return exit_code(TesterExitCode::BackendInitError);
}
auto publisher = cvmmap_streamer::protocol::UdpRtpPublisher::create(config);
auto publisher = cvmmap_streamer::protocol::UdpRtpPublisher::create(config, *codec);
if (!publisher) {
spdlog::error("failed to initialize RTP publisher: {}", publisher.error());
return exit_code(TesterExitCode::PublisherInitError);
+268
@@ -0,0 +1,268 @@
from __future__ import annotations
import dataclasses
import tempfile
import unittest
from pathlib import Path
import click
from click.testing import CliRunner
from scripts import zed_batch_segment_sources as segment_sources
from scripts.zed_batch_svo_grid_to_mp4 import main as grid_main
from scripts.zed_batch_svo_to_mcap import main as mcap_main
@dataclasses.dataclass(slots=True, frozen=True)
class FakeScan:
segment_dir: Path
matched_files: int
is_valid: bool
reason: str | None = None
def fake_scan(segment_dir: Path) -> FakeScan:
if not segment_dir.is_dir():
return FakeScan(segment_dir=segment_dir, matched_files=0, is_valid=False, reason="missing directory")
if (segment_dir / "valid.segment").is_file():
return FakeScan(segment_dir=segment_dir, matched_files=2, is_valid=True)
if (segment_dir / "partial.segment").is_file():
return FakeScan(segment_dir=segment_dir, matched_files=1, is_valid=False, reason="partial segment")
return FakeScan(segment_dir=segment_dir, matched_files=0, is_valid=False, reason="no camera files")
def create_multicamera_segment(parent: Path, segment_name: str) -> Path:
segment_dir = parent / segment_name
segment_dir.mkdir(parents=True)
for camera_index in range(1, 5):
(segment_dir / f"{segment_name}_zed{camera_index}.svo2").write_bytes(b"")
return segment_dir
class SharedSourceResolutionTests(unittest.TestCase):
def test_dataset_root_recursive_discovers_nested_segments(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
dataset_root = Path(tmp) / "dataset"
segment_dir = dataset_root / "run" / "2026-04-08T11-50-32"
segment_dir.mkdir(parents=True)
(segment_dir / "valid.segment").write_text("", encoding="utf-8")
sources = segment_sources.resolve_sources(
dataset_root,
(),
None,
None,
True,
scan_segment_dir=fake_scan,
no_matches_message=lambda root: f"no segments under {root}",
)
self.assertEqual(sources.mode, "dataset-root")
self.assertEqual(sources.segment_dirs, (segment_dir.resolve(),))
def test_dataset_root_without_recursive_does_not_descend(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
dataset_root = Path(tmp) / "dataset"
segment_dir = dataset_root / "run" / "2026-04-08T11-50-32"
segment_dir.mkdir(parents=True)
(segment_dir / "valid.segment").write_text("", encoding="utf-8")
with self.assertRaises(click.ClickException) as error:
segment_sources.resolve_sources(
dataset_root,
(),
None,
None,
False,
scan_segment_dir=fake_scan,
no_matches_message=lambda root: f"no segments under {root}",
)
self.assertIn("no segments under", str(error.exception))
def test_explicit_segments_are_deduped(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
segment_dir = Path(tmp) / "2026-04-08T11-50-32"
segment_dir.mkdir()
(segment_dir / "valid.segment").write_text("", encoding="utf-8")
sources = segment_sources.resolve_sources(
None,
(segment_dir, segment_dir),
None,
None,
True,
scan_segment_dir=fake_scan,
no_matches_message=lambda root: f"no segments under {root}",
)
self.assertEqual(sources.mode, "segments")
self.assertEqual(sources.segment_dirs, (segment_dir.resolve(),))
def test_segments_csv_uses_segment_dir_column(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
segment_dir = tmp_path / "segments" / "2026-04-08T11-50-32"
segment_dir.mkdir(parents=True)
(segment_dir / "valid.segment").write_text("", encoding="utf-8")
csv_path = tmp_path / "segments.csv"
csv_path.write_text("segment_dir\nsegments/2026-04-08T11-50-32\n", encoding="utf-8")
sources = segment_sources.resolve_sources(
None,
(),
csv_path,
None,
True,
scan_segment_dir=fake_scan,
no_matches_message=lambda root: f"no segments under {root}",
)
self.assertEqual(sources.mode, "segments-csv")
self.assertEqual(sources.segment_dirs, (segment_dir.resolve(),))
def test_segment_path_like_dataset_root_has_hint(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
dataset_root = Path(tmp) / "dataset"
segment_dir = dataset_root / "run" / "2026-04-08T11-50-32"
segment_dir.mkdir(parents=True)
(segment_dir / "valid.segment").write_text("", encoding="utf-8")
with self.assertRaises(click.ClickException) as error:
segment_sources.resolve_sources(
None,
(dataset_root,),
None,
None,
True,
scan_segment_dir=fake_scan,
no_matches_message=lambda root: f"no segments under {root}",
)
message = str(error.exception)
self.assertIn("looks like a dataset root", message)
self.assertIn("--dataset-root", message)
class BatchCliSmokeTests(unittest.TestCase):
def setUp(self) -> None:
self.runner = CliRunner()
def test_mcap_dataset_root_flag_discovers_segments(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
dataset_root = Path(tmp) / "dataset"
create_multicamera_segment(dataset_root / "run", "2026-04-08T11-50-32")
result = self.runner.invoke(
mcap_main,
[
"--dataset-root",
str(dataset_root),
"--recursive",
"--dry-run",
"--zed-bin",
"/bin/true",
],
)
self.assertEqual(result.exit_code, 0, result.output)
self.assertIn("source=dataset-root matched=1 pending=1", result.output)
def test_mcap_segment_flag_rejects_dataset_root_with_hint(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
dataset_root = Path(tmp) / "dataset"
create_multicamera_segment(dataset_root / "run", "2026-04-08T11-50-32")
result = self.runner.invoke(
mcap_main,
[
"--segment",
str(dataset_root),
"--dry-run",
"--zed-bin",
"/bin/true",
],
)
self.assertNotEqual(result.exit_code, 0)
self.assertIn("looks like a dataset root", result.output)
self.assertIn("--dataset-root", result.output)
def test_mcap_rejects_legacy_positional_dataset_root(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
dataset_root = Path(tmp) / "dataset"
create_multicamera_segment(dataset_root / "run", "2026-04-08T11-50-32")
result = self.runner.invoke(
mcap_main,
[
str(dataset_root),
"--dry-run",
"--zed-bin",
"/bin/true",
],
)
self.assertNotEqual(result.exit_code, 0)
self.assertIn("positional dataset paths are no longer supported", result.output)
self.assertIn("--dataset-root", result.output)
def test_mcap_rejects_recursive_without_dataset_root(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
segment_dir = create_multicamera_segment(Path(tmp), "2026-04-08T11-50-32")
result = self.runner.invoke(
mcap_main,
[
"--segment",
str(segment_dir),
"--no-recursive",
"--dry-run",
"--zed-bin",
"/bin/true",
],
)
self.assertNotEqual(result.exit_code, 0)
self.assertIn("--recursive/--no-recursive can only be used with --dataset-root", result.output)
def test_grid_segment_flag_discovers_one_segment(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
segment_dir = create_multicamera_segment(Path(tmp), "2026-04-08T11-50-32")
result = self.runner.invoke(
grid_main,
[
"--segment",
str(segment_dir),
"--dry-run",
"--zed-bin",
"/bin/true",
],
)
self.assertEqual(result.exit_code, 0, result.output)
self.assertIn("source=segments matched=1 pending=1", result.output)
def test_grid_rejects_legacy_segment_dir_flag(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
segment_dir = create_multicamera_segment(Path(tmp), "2026-04-08T11-50-32")
result = self.runner.invoke(
grid_main,
[
"--segment-dir",
str(segment_dir),
"--dry-run",
"--zed-bin",
"/bin/true",
],
)
self.assertNotEqual(result.exit_code, 0)
self.assertIn("--segment-dir is no longer supported", result.output)
self.assertIn("--segment", result.output)
if __name__ == "__main__":
unittest.main()
+14
@@ -40,6 +40,7 @@ dependencies = [
{ name = "opencv-python-headless" },
{ name = "progress-table" },
{ name = "protobuf" },
{ name = "tqdm" },
{ name = "zstandard" },
]
@@ -59,6 +60,7 @@ requires-dist = [
{ name = "progress-table", specifier = ">=3.2" },
{ name = "protobuf", specifier = ">=5.29" },
{ name = "rvl-impl", marker = "python_full_version >= '3.12' and extra == 'viewer'", git = "https://github.com/crosstyan/rvl-impl.git?rev=74308bcaf184cb39428237e8f4f99a67a6de22d9" },
{ name = "tqdm", specifier = ">=4.67" },
{ name = "zstandard", specifier = ">=0.23" },
]
provides-extras = ["viewer"]
@@ -327,6 +329,18 @@ name = "rvl-impl"
version = "0.1.0"
source = { git = "https://github.com/crosstyan/rvl-impl.git?rev=74308bcaf184cb39428237e8f4f99a67a6de22d9#74308bcaf184cb39428237e8f4f99a67a6de22d9" }
[[package]]
name = "tqdm"
version = "4.67.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/09/a9/6ba95a270c6f1fbcd8dac228323f2777d886cb206987444e4bce66338dd4/tqdm-4.67.3.tar.gz", hash = "sha256:7d825f03f89244ef73f1d4ce193cb1774a8179fd96f31d7e1dcde62092b960bb", size = 169598, upload-time = "2026-02-03T17:35:53.048Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/16/e1/3079a9ff9b8e11b846c6ac5c8b5bfb7ff225eee721825310c91b3b50304f/tqdm-4.67.3-py3-none-any.whl", hash = "sha256:ee1e4c0e59148062281c49d80b25b67771a127c85fc9676d3be5f243206826bf", size = 78374, upload-time = "2026-02-03T17:35:50.982Z" },
]
[[package]]
name = "wcwidth"
version = "0.6.0"