#!/usr/bin/env python3 from __future__ import annotations from collections import Counter from dataclasses import dataclass, field from pathlib import Path import re import click import zed_batch_svo_to_mcap as batch BUNDLE_TOPIC = "/bundle" CAMERA_PREFIX = "/camera/" NAMESPACED_TOPIC_PATTERN = re.compile(r"^/([^/]+)/([^/]+)$") SINGLE_TOPIC_SCHEMA_NAMES = { "/camera/video": "foxglove.CompressedVideo", "/camera/depth": "cvmmap_streamer.DepthMap", "/camera/calibration": "foxglove.CameraCalibration", "/camera/depth_calibration": "foxglove.CameraCalibration", "/camera/pose": "foxglove.PoseInFrame", } @dataclass(slots=True) class CameraSummary: video_messages: int = 0 depth_messages: int = 0 pose_messages: int = 0 calibration_messages: int = 0 depth_calibration_messages: int = 0 body_messages: int = 0 present_members: int = 0 corrupted_gap_members: int = 0 unknown_members: int = 0 @dataclass(slots=True) class McapSummary: path: Path layout: str = "unknown" validation_status: str = "invalid" validation_reason: str = "" camera_labels: tuple[str, ...] = () bundle_count: int = 0 policy_counts: Counter[str] = field(default_factory=Counter) camera_stats: dict[str, CameraSummary] = field(default_factory=dict) schema_mismatches: list[str] = field(default_factory=list) def iter_mcap_paths(inputs: tuple[Path, ...], recursive: bool) -> list[Path]: discovered: list[Path] = [] for input_path in inputs: resolved = input_path.expanduser().resolve() if resolved.is_file(): discovered.append(resolved) continue if resolved.is_dir(): pattern = "*.mcap" if not recursive else "**/*.mcap" discovered.extend(sorted(resolved.glob(pattern))) continue raise click.ClickException(f"path does not exist: {resolved}") return sorted(dict.fromkeys(discovered)) def policy_name_from_message(bundle_message: object) -> str: descriptor = bundle_message.DESCRIPTOR.enum_types_by_name.get("BundlePolicy") if descriptor is None: return str(bundle_message.policy) value = descriptor.values_by_number.get(bundle_message.policy) return value.name if value is not None else str(bundle_message.policy) def status_name_from_member(member: object, present_value: int | None) -> str: if present_value is None: return "PRESENT" if member.HasField("timestamp") else "UNKNOWN" field_descriptor = member.DESCRIPTOR.fields_by_name.get("status") descriptor = field_descriptor.enum_type if field_descriptor is not None else None if descriptor is None: return "PRESENT" if member.status == present_value else "UNKNOWN" value = descriptor.values_by_number.get(member.status) return value.name if value is not None else str(member.status) def record_single_camera_topic( summary: McapSummary, topic: str, schema_name: str | None, ) -> None: stats = summary.camera_stats.setdefault("camera", CameraSummary()) if topic == "/camera/video": stats.video_messages += 1 elif topic == "/camera/depth": stats.depth_messages += 1 elif topic == "/camera/pose": stats.pose_messages += 1 elif topic == "/camera/calibration": stats.calibration_messages += 1 elif topic == "/camera/depth_calibration": stats.depth_calibration_messages += 1 elif topic == "/camera/body": stats.body_messages += 1 expected_schema = SINGLE_TOPIC_SCHEMA_NAMES.get(topic) if expected_schema is not None and schema_name != expected_schema: summary.schema_mismatches.append( f"{topic}: expected schema '{expected_schema}', got '{schema_name or 'none'}'" ) def probe_single_camera_output(path: Path) -> batch.OutputProbeResult: base_probe = batch.probe_output(path, ("camera",), bundle_topic=None) if base_probe.status != "valid": return base_probe reader_module = batch.load_mcap_reader() stats = CameraSummary() schema_mismatches: list[str] = [] try: with path.open("rb") as stream: reader = reader_module.make_reader(stream) for schema, channel, _message in reader.iter_messages(): topic = channel.topic schema_name = schema.name if schema is not None else None if topic == "/camera/video": stats.video_messages += 1 elif topic == "/camera/depth": stats.depth_messages += 1 elif topic == "/camera/pose": stats.pose_messages += 1 elif topic == "/camera/calibration": stats.calibration_messages += 1 elif topic == "/camera/depth_calibration": stats.depth_calibration_messages += 1 elif topic == "/camera/body": stats.body_messages += 1 expected_schema = SINGLE_TOPIC_SCHEMA_NAMES.get(topic) if expected_schema is not None and schema_name != expected_schema: schema_mismatches.append( f"{topic}: expected schema '{expected_schema}', got '{schema_name or 'none'}'" ) except Exception as error: # noqa: BLE001 return batch.OutputProbeResult(output_path=path, status="invalid", reason=str(error)) if schema_mismatches: return batch.OutputProbeResult( output_path=path, status="invalid", reason=schema_mismatches[0], ) if stats.video_messages == 0: return batch.OutputProbeResult( output_path=path, status="invalid", reason="single-camera MCAP has no /camera/video messages", ) if stats.depth_messages == 0: return batch.OutputProbeResult( output_path=path, status="invalid", reason="single-camera MCAP has no /camera/depth messages", ) if stats.video_messages != stats.depth_messages: return batch.OutputProbeResult( output_path=path, status="invalid", reason=( "single-camera video/depth count mismatch: " f"video_messages={stats.video_messages} depth_messages={stats.depth_messages}" ), ) if stats.calibration_messages != 1: return batch.OutputProbeResult( output_path=path, status="invalid", reason=( "single-camera calibration count mismatch: " f"/camera/calibration={stats.calibration_messages}" ), ) if stats.depth_calibration_messages not in (0, 1): return batch.OutputProbeResult( output_path=path, status="invalid", reason=( "single-camera depth calibration count mismatch: " f"/camera/depth_calibration={stats.depth_calibration_messages}" ), ) if stats.pose_messages > stats.video_messages: return batch.OutputProbeResult( output_path=path, status="invalid", reason=( "single-camera pose count exceeds video count: " f"pose_messages={stats.pose_messages} video_messages={stats.video_messages}" ), ) return batch.OutputProbeResult(output_path=path, status="valid") def summarize_mcap(path: Path) -> McapSummary: reader_module = batch.load_mcap_reader() summary = McapSummary(path=path) camera_labels: set[str] = set() saw_single_camera_topic = False saw_namespaced_camera_topic = False with path.open("rb") as stream: reader = reader_module.make_reader(stream) for schema, channel, message in reader.iter_messages(): topic = channel.topic schema_name = schema.name if schema is not None else None if topic == BUNDLE_TOPIC: summary.layout = "bundled" if schema is None or schema.name != "cvmmap_streamer.BundleManifest": summary.validation_status = "invalid" summary.validation_reason = f"bundle topic '{BUNDLE_TOPIC}' is missing the BundleManifest schema" continue bundle_class, present_value = batch.load_bundle_manifest_type(schema.data) bundle = bundle_class() bundle.ParseFromString(message.data) summary.bundle_count += 1 summary.policy_counts[policy_name_from_message(bundle)] += 1 for member in bundle.members: label = str(member.camera_label) camera_labels.add(label) stats = summary.camera_stats.setdefault(label, CameraSummary()) status_name = status_name_from_member(member, present_value) if status_name == "BUNDLE_MEMBER_STATUS_PRESENT" or status_name == "PRESENT": stats.present_members += 1 elif status_name == "BUNDLE_MEMBER_STATUS_CORRUPTED_GAP": stats.corrupted_gap_members += 1 else: stats.unknown_members += 1 continue if topic.startswith(CAMERA_PREFIX): saw_single_camera_topic = True if summary.layout == "unknown": summary.layout = "single-camera" record_single_camera_topic(summary, topic, schema_name) continue match = NAMESPACED_TOPIC_PATTERN.match(topic) if not match: continue label, stream_kind = match.groups() if label == "camera": continue saw_namespaced_camera_topic = True if summary.layout == "unknown": summary.layout = "bundled" camera_labels.add(label) stats = summary.camera_stats.setdefault(label, CameraSummary()) if stream_kind == "video": stats.video_messages += 1 elif stream_kind == "depth": stats.depth_messages += 1 elif stream_kind == "pose": stats.pose_messages += 1 elif stream_kind == "calibration": stats.calibration_messages += 1 elif stream_kind == "depth_calibration": stats.depth_calibration_messages += 1 elif stream_kind == "body": stats.body_messages += 1 if saw_single_camera_topic and saw_namespaced_camera_topic: summary.layout = "mixed" summary.validation_status = "invalid" summary.validation_reason = "MCAP mixes single-camera and bundled topic layouts" return summary if summary.layout == "single-camera": summary.camera_labels = ("camera",) probe = probe_single_camera_output(path) summary.validation_status = probe.status summary.validation_reason = probe.reason if summary.schema_mismatches and summary.validation_status == "valid": summary.validation_status = "invalid" summary.validation_reason = summary.schema_mismatches[0] return summary summary.camera_labels = tuple(sorted(camera_labels)) if summary.camera_labels: probe = batch.probe_output( path, summary.camera_labels, bundle_topic=BUNDLE_TOPIC if summary.layout == "bundled" else None, ) summary.validation_status = probe.status summary.validation_reason = probe.reason else: summary.validation_status = "invalid" summary.validation_reason = "could not infer a supported MCAP layout from topics" return summary def print_summary(summary: McapSummary) -> None: status_text = summary.validation_status layout_text = summary.layout cameras_text = ",".join(summary.camera_labels) if summary.camera_labels else "-" policy_text = ",".join( f"{policy}={count}" for policy, count in sorted(summary.policy_counts.items()) ) or "-" click.echo( f"{status_text}: {summary.path} [{layout_text}] bundles={summary.bundle_count} " f"cameras={cameras_text} policies={policy_text}" ) for label in summary.camera_labels: stats = summary.camera_stats[label] click.echo( " " f"{label}: video={stats.video_messages} depth={stats.depth_messages} pose={stats.pose_messages} " f"calibration={stats.calibration_messages} depth_calibration={stats.depth_calibration_messages} " f"body={stats.body_messages} present={stats.present_members} " f"corrupted_gap={stats.corrupted_gap_members} unknown={stats.unknown_members}" ) if summary.validation_reason: click.echo(f" reason: {summary.validation_reason}") @click.command() @click.argument("paths", nargs=-1, type=click.Path(path_type=Path)) @click.option("--recursive", is_flag=True, help="Recursively discover *.mcap files under directory inputs.") def main(paths: tuple[Path, ...], recursive: bool) -> None: """Summarize and validate single-camera or bundled MCAP files.""" if not paths: raise click.ClickException("provide at least one MCAP file or directory") mcap_paths = iter_mcap_paths(paths, recursive=recursive) if not mcap_paths: raise click.ClickException("no .mcap files matched the provided inputs") invalid_count = 0 for path in mcap_paths: summary = summarize_mcap(path) print_summary(summary) if summary.validation_status != "valid": invalid_count += 1 if invalid_count: raise SystemExit(1) if __name__ == "__main__": main()