diff --git a/README.md b/README.md index 49f0cd1..e8d0bcb 100644 --- a/README.md +++ b/README.md @@ -269,6 +269,7 @@ In bundled multi-camera mode, `--start-frame` and `--end-frame` mean the first a When stderr is attached to a TTY, `zed_batch_svo_to_mcap.py` uses a `progress-table` view by default; otherwise it emits line-oriented start/completion/failure logs plus periodic heartbeat summaries. Use `--progress-ui table` or `--progress-ui text` to override the automatic mode selection. Bundled MCAP export now defaults to `--bundle-policy nearest`. That mode emits one `/bundle` manifest message per bundle timestamp on the common timeline and keeps the original per-camera timestamps on `/zedN/video`, `/zedN/depth`, and optional `/zedN/pose`. Consumers that care about grouping should follow `/bundle` instead of inferring bundle membership from identical message timestamps. Use `--bundle-policy strict` when you want the older thresholded sync behavior, and `--sync-tolerance-ms` only applies in that strict mode. +See [docs/mcap_layout.md](./docs/mcap_layout.md) for the full bundled and single-camera MCAP topic contract. ### Why Mixed Hardware/Software Mode Exists diff --git a/docs/mcap_layout.md b/docs/mcap_layout.md new file mode 100644 index 0000000..2596eb9 --- /dev/null +++ b/docs/mcap_layout.md @@ -0,0 +1,114 @@ +# MCAP Layout + +`cvmmap-streamer` writes two related MCAP layouts: + +- single-camera MCAP export +- bundled multi-camera MCAP export + +This document covers the topic layout, schema types, and timestamp semantics for both. + +## Single-Camera Layout + +Default topics: + +| Topic | Schema / Encoding | Notes | +|------|------|------| +| `/camera/video` | `foxglove.CompressedVideo` | H.264 or H.265 Annex B access units | +| `/camera/depth` | `cvmmap_streamer.DepthMap` | RVL-compressed depth payload | +| `/camera/calibration` | `foxglove.CameraCalibration` | Video intrinsics | +| `/camera/depth_calibration` | `foxglove.CameraCalibration` | Written only when depth resolution differs from video | +| `/camera/pose` | `foxglove.PoseInFrame` | Optional; only when pose export is enabled and tracking is valid | +| `/camera/body` | raw `cvmmap.body_tracking.v1` | Optional raw body-tracking packets; see [mcap_body_tracking.md](./mcap_body_tracking.md) | + +Single-camera export preserves the original per-frame source timestamp on video, depth, and pose messages. + +## Bundled Multi-Camera Layout + +Bundled export namespaces each camera stream and adds one bundle manifest topic: + +| Topic | Schema / Encoding | Notes | +|------|------|------| +| `/bundle` | `cvmmap_streamer.BundleManifest` | One message per emitted bundle | +| `/zedN/video` | `foxglove.CompressedVideo` | Per-camera encoded video | +| `/zedN/depth` | `cvmmap_streamer.DepthMap` | Per-camera depth | +| `/zedN/calibration` | `foxglove.CameraCalibration` | Per-camera video intrinsics | +| `/zedN/depth_calibration` | `foxglove.CameraCalibration` | Written only when exported depth resolution differs from video | +| `/zedN/pose` | `foxglove.PoseInFrame` | Optional per-camera pose | +| `/zedN/body` | raw `cvmmap.body_tracking.v1` | Optional raw body packets; see [mcap_body_tracking.md](./mcap_body_tracking.md) | + +`nearest` is the default bundled policy. It emits one bundle on a common timeline, but it keeps the original per-camera timestamps on `/zedN/video`, `/zedN/depth`, and optional `/zedN/pose`. + +`strict` is still available. In strict mode, bundle membership is thresholded by timestamp skew and `--sync-tolerance-ms` applies. In `nearest` mode, `--sync-tolerance-ms` is not used. + +## Bundle Manifest Contract + +`/bundle` is the authoritative grouping contract for bundled MCAP files. Consumers should not infer grouping from identical MCAP `logTime` values or from matching per-camera timestamps. + +`cvmmap_streamer.BundleManifest` contains: + +- `timestamp`: the nominal bundle timestamp on the common timeline +- `bundle_index`: zero-based emitted bundle index +- `policy`: `BUNDLE_POLICY_NEAREST` or `BUNDLE_POLICY_STRICT` +- `members`: one entry per camera label in the bundle + +Each bundle member contains: + +- `camera_label` +- `timestamp`: the original camera sample timestamp when a payload is present +- `delta_ns`: `member.timestamp - bundle.timestamp` when a payload is present +- `status` +- `corrupted_frames_skipped` + +Current member statuses: + +- `BUNDLE_MEMBER_STATUS_PRESENT` +- `BUNDLE_MEMBER_STATUS_CORRUPTED_GAP` + +`CORRUPTED_GAP` means the exporter skipped one or more unreadable ZED frames and intentionally emitted no video, depth, or pose payload for that camera for this bundle. `corrupted_frames_skipped` reports the size of the skipped run that led to the next recovered readable frame. + +## Timestamp Semantics + +The bundled MCAP contract intentionally separates bundle time from per-camera sample time: + +- `/bundle.logTime` and `/bundle.publishTime` use the nominal bundle timestamp +- `/zedN/video`, `/zedN/depth`, and `/zedN/pose` use the original camera sample timestamp +- calibration and depth-calibration messages use the timestamp of the first emitted sample on that camera + +This means a single bundle can legitimately contain different per-camera timestamps, especially in `nearest` mode. + +## Corruption And Partial Bundles + +Bundled `nearest` export is resilient to ZED `CORRUPTED FRAME` runs: + +- unreadable tail frames are treated as end-of-stream +- mid-stream corruption is skipped until a readable frame is found +- bundles inside the unreadable gap still exist +- the affected camera is marked `CORRUPTED_GAP` in `/bundle` +- no `/zedN/video`, `/zedN/depth`, or `/zedN/pose` message is written for that camera for those bundles + +Bundled `strict` export stays strict: + +- corruption is skipped internally until recovery +- only fully present, threshold-qualified groups are emitted + +## Validation Expectations + +For single-camera MCAP files, the current validation contract is: + +- `/camera/video` must exist and contain at least one message +- `/camera/depth` must exist and contain at least one message +- `/camera/calibration` must exist exactly once +- `/camera/video` and `/camera/depth` message counts must match +- `/camera/depth_calibration` may appear zero or one time +- `/camera/pose` is optional, but it may not outnumber `/camera/video` + +For bundled MCAP files, the current validation contract is: + +- `/bundle` must exist +- every bundle must contain one member per camera label +- for each camera, the count of `BUNDLE_MEMBER_STATUS_PRESENT` members must match the number of `/zedN/video` messages +- for each camera, the count of `BUNDLE_MEMBER_STATUS_PRESENT` members must match the number of `/zedN/depth` messages + +That is why a partially written MCAP with topic presence but mismatched counts is treated as invalid. + +The repository-level Python helper [scripts/mcap_bundle_validator.py](../scripts/mcap_bundle_validator.py) now understands both layouts and reports which one it found before applying the corresponding validation rules. diff --git a/scripts/mcap_bundle_validator.py b/scripts/mcap_bundle_validator.py new file mode 100644 index 0000000..40624ec --- /dev/null +++ b/scripts/mcap_bundle_validator.py @@ -0,0 +1,356 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +from collections import Counter +from dataclasses import dataclass, field +from pathlib import Path +import re + +import click + +import zed_batch_svo_to_mcap as batch + + +BUNDLE_TOPIC = "/bundle" +CAMERA_PREFIX = "/camera/" +NAMESPACED_TOPIC_PATTERN = re.compile(r"^/([^/]+)/([^/]+)$") + +SINGLE_TOPIC_SCHEMA_NAMES = { + "/camera/video": "foxglove.CompressedVideo", + "/camera/depth": "cvmmap_streamer.DepthMap", + "/camera/calibration": "foxglove.CameraCalibration", + "/camera/depth_calibration": "foxglove.CameraCalibration", + "/camera/pose": "foxglove.PoseInFrame", +} + + +@dataclass(slots=True) +class CameraSummary: + video_messages: int = 0 + depth_messages: int = 0 + pose_messages: int = 0 + calibration_messages: int = 0 + depth_calibration_messages: int = 0 + body_messages: int = 0 + present_members: int = 0 + corrupted_gap_members: int = 0 + unknown_members: int = 0 + + +@dataclass(slots=True) +class McapSummary: + path: Path + layout: str = "unknown" + validation_status: str = "invalid" + validation_reason: str = "" + camera_labels: tuple[str, ...] = () + bundle_count: int = 0 + policy_counts: Counter[str] = field(default_factory=Counter) + camera_stats: dict[str, CameraSummary] = field(default_factory=dict) + schema_mismatches: list[str] = field(default_factory=list) + + +def iter_mcap_paths(inputs: tuple[Path, ...], recursive: bool) -> list[Path]: + discovered: list[Path] = [] + for input_path in inputs: + resolved = input_path.expanduser().resolve() + if resolved.is_file(): + discovered.append(resolved) + continue + if resolved.is_dir(): + pattern = "*.mcap" if not recursive else "**/*.mcap" + discovered.extend(sorted(resolved.glob(pattern))) + continue + raise click.ClickException(f"path does not exist: {resolved}") + return sorted(dict.fromkeys(discovered)) + + +def policy_name_from_message(bundle_message: object) -> str: + descriptor = bundle_message.DESCRIPTOR.enum_types_by_name.get("BundlePolicy") + if descriptor is None: + return str(bundle_message.policy) + value = descriptor.values_by_number.get(bundle_message.policy) + return value.name if value is not None else str(bundle_message.policy) + + +def status_name_from_member(member: object, present_value: int | None) -> str: + if present_value is None: + return "PRESENT" if member.HasField("timestamp") else "UNKNOWN" + field_descriptor = member.DESCRIPTOR.fields_by_name.get("status") + descriptor = field_descriptor.enum_type if field_descriptor is not None else None + if descriptor is None: + return "PRESENT" if member.status == present_value else "UNKNOWN" + value = descriptor.values_by_number.get(member.status) + return value.name if value is not None else str(member.status) + + +def record_single_camera_topic( + summary: McapSummary, + topic: str, + schema_name: str | None, +) -> None: + stats = summary.camera_stats.setdefault("camera", CameraSummary()) + if topic == "/camera/video": + stats.video_messages += 1 + elif topic == "/camera/depth": + stats.depth_messages += 1 + elif topic == "/camera/pose": + stats.pose_messages += 1 + elif topic == "/camera/calibration": + stats.calibration_messages += 1 + elif topic == "/camera/depth_calibration": + stats.depth_calibration_messages += 1 + elif topic == "/camera/body": + stats.body_messages += 1 + + expected_schema = SINGLE_TOPIC_SCHEMA_NAMES.get(topic) + if expected_schema is not None and schema_name != expected_schema: + summary.schema_mismatches.append( + f"{topic}: expected schema '{expected_schema}', got '{schema_name or 'none'}'" + ) + + +def probe_single_camera_output(path: Path) -> batch.OutputProbeResult: + base_probe = batch.probe_output(path, ("camera",), bundle_topic=None) + if base_probe.status != "valid": + return base_probe + + reader_module = batch.load_mcap_reader() + stats = CameraSummary() + schema_mismatches: list[str] = [] + + try: + with path.open("rb") as stream: + reader = reader_module.make_reader(stream) + for schema, channel, _message in reader.iter_messages(): + topic = channel.topic + schema_name = schema.name if schema is not None else None + if topic == "/camera/video": + stats.video_messages += 1 + elif topic == "/camera/depth": + stats.depth_messages += 1 + elif topic == "/camera/pose": + stats.pose_messages += 1 + elif topic == "/camera/calibration": + stats.calibration_messages += 1 + elif topic == "/camera/depth_calibration": + stats.depth_calibration_messages += 1 + elif topic == "/camera/body": + stats.body_messages += 1 + + expected_schema = SINGLE_TOPIC_SCHEMA_NAMES.get(topic) + if expected_schema is not None and schema_name != expected_schema: + schema_mismatches.append( + f"{topic}: expected schema '{expected_schema}', got '{schema_name or 'none'}'" + ) + except Exception as error: # noqa: BLE001 + return batch.OutputProbeResult(output_path=path, status="invalid", reason=str(error)) + + if schema_mismatches: + return batch.OutputProbeResult( + output_path=path, + status="invalid", + reason=schema_mismatches[0], + ) + if stats.video_messages == 0: + return batch.OutputProbeResult( + output_path=path, + status="invalid", + reason="single-camera MCAP has no /camera/video messages", + ) + if stats.depth_messages == 0: + return batch.OutputProbeResult( + output_path=path, + status="invalid", + reason="single-camera MCAP has no /camera/depth messages", + ) + if stats.video_messages != stats.depth_messages: + return batch.OutputProbeResult( + output_path=path, + status="invalid", + reason=( + "single-camera video/depth count mismatch: " + f"video_messages={stats.video_messages} depth_messages={stats.depth_messages}" + ), + ) + if stats.calibration_messages != 1: + return batch.OutputProbeResult( + output_path=path, + status="invalid", + reason=( + "single-camera calibration count mismatch: " + f"/camera/calibration={stats.calibration_messages}" + ), + ) + if stats.depth_calibration_messages not in (0, 1): + return batch.OutputProbeResult( + output_path=path, + status="invalid", + reason=( + "single-camera depth calibration count mismatch: " + f"/camera/depth_calibration={stats.depth_calibration_messages}" + ), + ) + if stats.pose_messages > stats.video_messages: + return batch.OutputProbeResult( + output_path=path, + status="invalid", + reason=( + "single-camera pose count exceeds video count: " + f"pose_messages={stats.pose_messages} video_messages={stats.video_messages}" + ), + ) + return batch.OutputProbeResult(output_path=path, status="valid") + + +def summarize_mcap(path: Path) -> McapSummary: + reader_module = batch.load_mcap_reader() + summary = McapSummary(path=path) + camera_labels: set[str] = set() + saw_single_camera_topic = False + saw_namespaced_camera_topic = False + + with path.open("rb") as stream: + reader = reader_module.make_reader(stream) + for schema, channel, message in reader.iter_messages(): + topic = channel.topic + schema_name = schema.name if schema is not None else None + if topic == BUNDLE_TOPIC: + summary.layout = "bundled" + if schema is None or schema.name != "cvmmap_streamer.BundleManifest": + summary.validation_status = "invalid" + summary.validation_reason = f"bundle topic '{BUNDLE_TOPIC}' is missing the BundleManifest schema" + continue + + bundle_class, present_value = batch.load_bundle_manifest_type(schema.data) + bundle = bundle_class() + bundle.ParseFromString(message.data) + summary.bundle_count += 1 + summary.policy_counts[policy_name_from_message(bundle)] += 1 + + for member in bundle.members: + label = str(member.camera_label) + camera_labels.add(label) + stats = summary.camera_stats.setdefault(label, CameraSummary()) + status_name = status_name_from_member(member, present_value) + if status_name == "BUNDLE_MEMBER_STATUS_PRESENT" or status_name == "PRESENT": + stats.present_members += 1 + elif status_name == "BUNDLE_MEMBER_STATUS_CORRUPTED_GAP": + stats.corrupted_gap_members += 1 + else: + stats.unknown_members += 1 + continue + + if topic.startswith(CAMERA_PREFIX): + saw_single_camera_topic = True + if summary.layout == "unknown": + summary.layout = "single-camera" + record_single_camera_topic(summary, topic, schema_name) + continue + + match = NAMESPACED_TOPIC_PATTERN.match(topic) + if not match: + continue + label, stream_kind = match.groups() + if label == "camera": + continue + saw_namespaced_camera_topic = True + if summary.layout == "unknown": + summary.layout = "bundled" + camera_labels.add(label) + stats = summary.camera_stats.setdefault(label, CameraSummary()) + if stream_kind == "video": + stats.video_messages += 1 + elif stream_kind == "depth": + stats.depth_messages += 1 + elif stream_kind == "pose": + stats.pose_messages += 1 + elif stream_kind == "calibration": + stats.calibration_messages += 1 + elif stream_kind == "depth_calibration": + stats.depth_calibration_messages += 1 + elif stream_kind == "body": + stats.body_messages += 1 + + if saw_single_camera_topic and saw_namespaced_camera_topic: + summary.layout = "mixed" + summary.validation_status = "invalid" + summary.validation_reason = "MCAP mixes single-camera and bundled topic layouts" + return summary + + if summary.layout == "single-camera": + summary.camera_labels = ("camera",) + probe = probe_single_camera_output(path) + summary.validation_status = probe.status + summary.validation_reason = probe.reason + if summary.schema_mismatches and summary.validation_status == "valid": + summary.validation_status = "invalid" + summary.validation_reason = summary.schema_mismatches[0] + return summary + + summary.camera_labels = tuple(sorted(camera_labels)) + if summary.camera_labels: + probe = batch.probe_output( + path, + summary.camera_labels, + bundle_topic=BUNDLE_TOPIC if summary.layout == "bundled" else None, + ) + summary.validation_status = probe.status + summary.validation_reason = probe.reason + else: + summary.validation_status = "invalid" + summary.validation_reason = "could not infer a supported MCAP layout from topics" + return summary + + +def print_summary(summary: McapSummary) -> None: + status_text = summary.validation_status + layout_text = summary.layout + cameras_text = ",".join(summary.camera_labels) if summary.camera_labels else "-" + policy_text = ",".join( + f"{policy}={count}" + for policy, count in sorted(summary.policy_counts.items()) + ) or "-" + click.echo( + f"{status_text}: {summary.path} [{layout_text}] bundles={summary.bundle_count} " + f"cameras={cameras_text} policies={policy_text}" + ) + for label in summary.camera_labels: + stats = summary.camera_stats[label] + click.echo( + " " + f"{label}: video={stats.video_messages} depth={stats.depth_messages} pose={stats.pose_messages} " + f"calibration={stats.calibration_messages} depth_calibration={stats.depth_calibration_messages} " + f"body={stats.body_messages} present={stats.present_members} " + f"corrupted_gap={stats.corrupted_gap_members} unknown={stats.unknown_members}" + ) + if summary.validation_reason: + click.echo(f" reason: {summary.validation_reason}") + + +@click.command() +@click.argument("paths", nargs=-1, type=click.Path(path_type=Path)) +@click.option("--recursive", is_flag=True, help="Recursively discover *.mcap files under directory inputs.") +def main(paths: tuple[Path, ...], recursive: bool) -> None: + """Summarize and validate single-camera or bundled MCAP files.""" + if not paths: + raise click.ClickException("provide at least one MCAP file or directory") + + mcap_paths = iter_mcap_paths(paths, recursive=recursive) + if not mcap_paths: + raise click.ClickException("no .mcap files matched the provided inputs") + + invalid_count = 0 + for path in mcap_paths: + summary = summarize_mcap(path) + print_summary(summary) + if summary.validation_status != "valid": + invalid_count += 1 + + if invalid_count: + raise SystemExit(1) + + +if __name__ == "__main__": + main()