#!/usr/bin/env python3
"""Small MCAP RGBD example helper for bundled and copy-layout MCAP files.

Two click commands are exposed:

* ``summary`` prints validation status, layout and per-camera message counts
  and timestamp ranges for one MCAP file.
* ``export-sample`` writes ``rgb.png``, ``depth.npy``, ``depth_preview.png``
  and ``sample_metadata.json`` for one per-camera RGB+depth sample.
"""

from __future__ import annotations

from dataclasses import asdict, dataclass, field
import json
from pathlib import Path
import subprocess
import tempfile

import click
import cv2
import numpy as np
from google.protobuf import descriptor_pb2, descriptor_pool, message_factory, timestamp_pb2

import mcap_bundle_validator as bundle_validator
import zed_batch_svo_to_mcap as batch


BUNDLE_TOPIC = "/bundle"

# Palette names offered on the CLI mapped to OpenCV colormap ids.
# ``None`` selects a plain grayscale preview.
DEPTH_PALETTE_TO_OPENCV = {
    "Gray": None,
    "Turbo": cv2.COLORMAP_TURBO,
    "Inferno": cv2.COLORMAP_INFERNO,
    "Plasma": cv2.COLORMAP_PLASMA,
    "Viridis": cv2.COLORMAP_VIRIDIS,
    "Cividis": cv2.COLORMAP_CIVIDIS,
    "Magma": cv2.COLORMAP_MAGMA,
    "Parula": cv2.COLORMAP_PARULA,
}

# ``CompressedVideo.format`` value -> ffmpeg ``-f`` demuxer name.
VIDEO_INPUT_FORMATS = {"h264": "h264", "h265": "hevc"}

# Generated protobuf classes keyed by (schema bytes, fully qualified type name).
_MESSAGE_CLASS_CACHE: dict[tuple[bytes, str], object] = {}

_TIMESTAMP_PROTO = "google/protobuf/timestamp.proto"
_BUNDLE_SCHEMA = "cvmmap_streamer.BundleManifest"
_VIDEO_SCHEMA = "foxglove.CompressedVideo"
_DEPTH_SCHEMA = "cvmmap_streamer.DepthMap"
_SUPPORTED_LAYOUTS = frozenset({"copy", "bundled"})
_PRESENT_STATUS_NAMES = frozenset({"PRESENT", "BUNDLE_MEMBER_STATUS_PRESENT"})

# (topic suffix, expected schema name, CameraRanges attribute) per camera stream.
_CAMERA_STREAMS = (
    ("/video", _VIDEO_SCHEMA, "video"),
    ("/depth", _DEPTH_SCHEMA, "depth"),
)


@dataclass(slots=True)
class TimestampRange:
    """Running minimum/maximum over the timestamps passed to :meth:`update`."""

    start_ns: int | None = None
    end_ns: int | None = None

    def update(self, timestamp_ns: int) -> None:
        """Widen the range so that it includes ``timestamp_ns``."""
        if self.start_ns is None or timestamp_ns < self.start_ns:
            self.start_ns = timestamp_ns
        if self.end_ns is None or timestamp_ns > self.end_ns:
            self.end_ns = timestamp_ns


@dataclass(slots=True)
class CameraRanges:
    """Video and depth timestamp ranges for one camera label."""

    video: TimestampRange = field(default_factory=TimestampRange)
    depth: TimestampRange = field(default_factory=TimestampRange)


@dataclass(slots=True)
class RecipeSummary:
    """Validator summary plus the timestamp ranges gathered by ``summarize_mcap``."""

    base: bundle_validator.McapSummary
    bundle_timestamps: TimestampRange = field(default_factory=TimestampRange)
    camera_ranges: dict[str, CameraRanges] = field(default_factory=dict)


@dataclass(slots=True)
class VideoSample:
    """Metadata of the selected video message (its bytes are returned separately)."""

    timestamp_ns: int
    format_name: str
    stream_index: int


@dataclass(slots=True)
class DepthSample:
    """The selected depth message: still-encoded payload plus its descriptive fields."""

    timestamp_ns: int
    payload: bytes
    stream_index: int
    width: int
    height: int
    encoding_name: str
    source_unit_name: str
    storage_unit_name: str


@dataclass(slots=True)
class BundleMemberSample:
    """The bundle-manifest member matched to the selected per-camera sample."""

    bundle_index: int
    bundle_timestamp_ns: int
    member_timestamp_ns: int | None
    status_name: str
    corrupted_frames_skipped: int
    member_stream_index: int


def load_message_class(schema_data: bytes, message_type_name: str):
    """Build (or fetch from cache) the protobuf class for ``message_type_name``.

    ``schema_data`` is parsed as a serialized ``FileDescriptorSet``. If the set
    embeds ``google/protobuf/timestamp.proto`` that copy is registered first;
    otherwise the timestamp descriptor bundled with the protobuf runtime is
    used, so schemas referencing ``Timestamp`` resolve either way.
    """
    cache_key = (schema_data, message_type_name)
    cached = _MESSAGE_CLASS_CACHE.get(cache_key)
    if cached is not None:
        return cached

    descriptor_set = descriptor_pb2.FileDescriptorSet()
    descriptor_set.ParseFromString(schema_data)

    pool = descriptor_pool.DescriptorPool()
    embedded_timestamp = next(
        (
            file_descriptor
            for file_descriptor in descriptor_set.file
            if file_descriptor.name == _TIMESTAMP_PROTO
        ),
        None,
    )
    if embedded_timestamp is not None:
        pool.Add(embedded_timestamp)
    else:
        pool.AddSerializedFile(timestamp_pb2.DESCRIPTOR.serialized_pb)

    # The timestamp file is already registered above; add everything else in
    # the order the descriptor set lists it.
    for file_descriptor in descriptor_set.file:
        if file_descriptor.name != _TIMESTAMP_PROTO:
            pool.Add(file_descriptor)

    message_descriptor = pool.FindMessageTypeByName(message_type_name)
    message_class = message_factory.GetMessageClass(message_descriptor)
    _MESSAGE_CLASS_CACHE[cache_key] = message_class
    return message_class


def parse_timestamp_ns(timestamp_message: object, fallback_log_time_ns: int) -> int:
    """Convert a ``seconds``/``nanos`` message to integer nanoseconds.

    An all-zero (or attribute-less) timestamp is treated as unset and
    ``fallback_log_time_ns`` is returned instead.
    """
    seconds = int(getattr(timestamp_message, "seconds", 0))
    nanos = int(getattr(timestamp_message, "nanos", 0))
    if seconds == 0 and nanos == 0:
        return fallback_log_time_ns
    return seconds * 1_000_000_000 + nanos


def format_timestamp_ns(timestamp_ns: int | None) -> str:
    """Render nanoseconds as ``<seconds>.<9-digit nanos>``, or ``-`` for ``None``."""
    if timestamp_ns is None:
        return "-"
    seconds, nanos = divmod(timestamp_ns, 1_000_000_000)
    return f"{seconds}.{nanos:09d}"


def format_range(timestamp_range: TimestampRange) -> str:
    """Render a range as ``<start> .. <end>`` using ``format_timestamp_ns``."""
    start_text = format_timestamp_ns(timestamp_range.start_ns)
    end_text = format_timestamp_ns(timestamp_range.end_ns)
    return f"{start_text} .. {end_text}"


def enum_name(message: object, field_name: str) -> str:
    """Return the symbolic name of an enum field, or its number if unknown."""
    field_descriptor = message.DESCRIPTOR.fields_by_name[field_name]
    value = int(getattr(message, field_name))
    resolved = field_descriptor.enum_type.values_by_number.get(value)
    return resolved.name if resolved is not None else str(value)


def is_present_status(status_name: str) -> bool:
    """Return True when ``status_name`` is one of the accepted "present" spellings."""
    return status_name in _PRESENT_STATUS_NAMES


def topic_for(layout: str, camera_label: str, kind: str) -> str:
    """Return the per-camera topic ``/<camera_label>/<kind>``.

    Raises:
        click.ClickException: if ``layout`` is neither ``copy`` nor ``bundled``.
    """
    if layout not in _SUPPORTED_LAYOUTS:
        raise click.ClickException(f"unsupported layout '{layout}'")
    return f"/{camera_label}/{kind}"


def selected_camera_label(base_summary: bundle_validator.McapSummary, camera_label: str | None) -> str:
    """Resolve the camera label to use, defaulting to the first available one.

    Raises:
        click.ClickException: if ``camera_label`` is not among the summary's
            labels, or if no label was requested and the summary has none.
    """
    labels = base_summary.camera_labels
    if camera_label is None:
        if not labels:
            # Report an empty label list as a CLI error instead of letting a
            # bare IndexError escape.
            raise click.ClickException("MCAP file has no camera labels to select from")
        return labels[0]
    if camera_label not in labels:
        available = ", ".join(labels)
        raise click.ClickException(f"camera label '{camera_label}' not found. available: {available}")
    return camera_label


def ensure_supported_layout(base_summary: bundle_validator.McapSummary) -> None:
    """Reject layouts this helper cannot process.

    Raises:
        click.ClickException: for the ``single-camera`` layout, or for any
            layout other than ``copy``/``bundled`` (using the summary's
            validation reason when one is set).
    """
    if base_summary.layout == "single-camera":
        raise click.ClickException(
            "legacy /camera/* MCAP files are not supported by this helper; regenerate them into copy-layout MCAPs"
        )
    if base_summary.layout not in _SUPPORTED_LAYOUTS:
        reason = base_summary.validation_reason or "unsupported MCAP layout"
        raise click.ClickException(reason)


def summarize_mcap(path: Path) -> RecipeSummary:
    """Combine the validator's summary with bundle and per-camera timestamp ranges.

    The file is scanned once. Bundle manifests on ``/bundle`` feed the bundle
    range; ``/<label>/video`` and ``/<label>/depth`` messages with the expected
    schema feed the range of the matching camera label. Messages whose label
    is not in the validator's label list (or ``camera`` when that list is
    empty) are ignored.
    """
    base_summary = bundle_validator.summarize_mcap(path)
    camera_ranges = {
        label: CameraRanges()
        for label in (base_summary.camera_labels or ("camera",))
    }
    bundle_timestamps = TimestampRange()

    reader_module = batch.load_mcap_reader()
    with path.open("rb") as stream:
        reader = reader_module.make_reader(stream)
        for schema, channel, message in reader.iter_messages():
            topic = channel.topic

            if topic == BUNDLE_TOPIC and schema is not None and schema.name == _BUNDLE_SCHEMA:
                bundle_class, _present_value = batch.load_bundle_manifest_type(schema.data)
                bundle_message = bundle_class()
                bundle_message.ParseFromString(message.data)
                bundle_timestamps.update(
                    parse_timestamp_ns(bundle_message.timestamp, int(message.log_time))
                )
                continue

            for suffix, schema_name, range_attr in _CAMERA_STREAMS:
                if not topic.endswith(suffix):
                    continue
                # "/camera/video" -> "camera"; the same expression covers the
                # legacy single-camera topics without a special case.
                label = topic.removeprefix("/").removesuffix(suffix)
                if schema is not None and schema.name == schema_name and label in camera_ranges:
                    payload = load_message_class(schema.data, schema_name)()
                    payload.ParseFromString(message.data)
                    getattr(camera_ranges[label], range_attr).update(
                        parse_timestamp_ns(payload.timestamp, int(message.log_time))
                    )
                break  # a topic matches at most one stream suffix

    return RecipeSummary(
        base=base_summary,
        bundle_timestamps=bundle_timestamps,
        camera_ranges=camera_ranges,
    )


def print_summary(summary: RecipeSummary) -> None:
    """Echo the summary as human-readable text; bundle details only for bundled layout."""
    base = summary.base
    is_bundled = base.layout == "bundled"

    click.echo(f"path: {base.path}")
    click.echo(f"validation: {base.validation_status}")
    if base.validation_reason:
        click.echo(f"validation reason: {base.validation_reason}")
    click.echo(f"layout: {base.layout}")
    click.echo(f"camera labels: {', '.join(base.camera_labels) if base.camera_labels else '-'}")

    if is_bundled:
        click.echo(f"bundle count: {base.bundle_count}")
        click.echo(f"bundle timestamp range: {format_range(summary.bundle_timestamps)}")
        policy_text = ", ".join(
            f"{policy}={count}" for policy, count in sorted(base.policy_counts.items())
        ) or "-"
        click.echo(f"bundle policies: {policy_text}")

    for label in base.camera_labels:
        stats = base.camera_stats[label]
        ranges = summary.camera_ranges[label]
        click.echo(f"camera: {label}")
        click.echo(f"  video messages: {stats.video_messages}")
        click.echo(f"  video timestamp range: {format_range(ranges.video)}")
        click.echo(f"  depth messages: {stats.depth_messages}")
        click.echo(f"  depth timestamp range: {format_range(ranges.depth)}")
        click.echo(f"  pose messages: {stats.pose_messages}")
        click.echo(f"  calibration messages: {stats.calibration_messages}")
        click.echo(f"  depth calibration messages: {stats.depth_calibration_messages}")
        click.echo(f"  body messages: {stats.body_messages}")
        if is_bundled:
            click.echo(f"  present bundle members: {stats.present_members}")
            click.echo(f"  corrupted gap members: {stats.corrupted_gap_members}")
            click.echo(f"  unknown bundle members: {stats.unknown_members}")


def decode_depth_array(depth_sample: DepthSample) -> np.ndarray:
    """Decompress a depth payload into a ``float32`` array.

    ``RVL_U16_LOSSLESS`` data is divided by 1000 when either unit field says
    millimeters (presumably yielding meters, as the callers' ``*_m`` naming
    suggests). ``RVL_F32`` data is returned as decoded.

    Raises:
        click.ClickException: if the optional ``rvl`` module is missing or the
            encoding is not one of the two handled here.
    """
    try:
        import rvl
    except ModuleNotFoundError as error:
        raise click.ClickException(
            "depth export needs the optional rvl-impl binding; run `uv sync --extra viewer`"
        ) from error

    encoding = depth_sample.encoding_name
    if encoding == "RVL_U16_LOSSLESS":
        depth = rvl.decompress_u16(depth_sample.payload).astype(np.float32)
        stored_in_millimeters = (
            depth_sample.storage_unit_name == "STORAGE_UNIT_MILLIMETER"
            or depth_sample.source_unit_name == "DEPTH_UNIT_MILLIMETER"
        )
        return depth / 1000.0 if stored_in_millimeters else depth
    if encoding == "RVL_F32":
        return rvl.decompress_f32(depth_sample.payload).astype(np.float32)
    raise click.ClickException(f"unsupported depth encoding '{encoding}'")


def colorize_depth(
    depth_m: np.ndarray,
    *,
    depth_min_m: float,
    depth_max_m: float,
    depth_palette_name: str,
) -> np.ndarray:
    """Render a depth array as an 8-bit BGR preview image.

    Depths are clipped to ``[depth_min_m, depth_max_m]`` and inverted, so the
    minimum maps to palette index 255 and the maximum to 0. Non-finite or
    non-positive pixels are painted black.
    """
    valid = np.isfinite(depth_m) & (depth_m > 0.0)
    # Guard against a zero or negative span so the division stays well defined.
    span = max(depth_max_m - depth_min_m, 1e-6)
    clipped = np.clip((depth_m - depth_min_m) / span, 0.0, 1.0)

    normalized = np.zeros(depth_m.shape, dtype=np.uint8)
    normalized[valid] = np.round((1.0 - clipped[valid]) * 255.0).astype(np.uint8)

    colormap = DEPTH_PALETTE_TO_OPENCV[depth_palette_name]
    if colormap is None:
        colored = cv2.cvtColor(normalized, cv2.COLOR_GRAY2BGR)
    else:
        colored = cv2.applyColorMap(normalized, colormap)
    # Colormaps give index 0 a colour; force invalid pixels to black.
    colored[~valid] = 0
    return colored


def export_rgb_frame(
    *,
    ffmpeg_bin: str,
    raw_video_path: Path,
    video_format: str,
    frame_index: int,
    output_path: Path,
) -> None:
    """Decode frame ``frame_index`` of a raw elementary stream into ``output_path``.

    Raises:
        click.ClickException: for an unsupported ``video_format``, a missing
            ffmpeg binary, a non-zero ffmpeg exit code, or a missing output file.
    """
    input_format = VIDEO_INPUT_FORMATS.get(video_format)
    if input_format is None:
        raise click.ClickException(f"unsupported video format '{video_format}'")

    command = [
        ffmpeg_bin,
        "-hide_banner",
        "-loglevel",
        "error",
        "-y",
        "-fflags",
        "+genpts",
        "-f",
        input_format,
        "-i",
        str(raw_video_path),
        "-vf",
        # The comma is escaped so ffmpeg does not read it as a filter separator.
        f"select=eq(n\\,{frame_index})",
        "-frames:v",
        "1",
        str(output_path),
    ]
    try:
        completed = subprocess.run(command, check=False, capture_output=True, text=True)
    except FileNotFoundError as error:
        raise click.ClickException(f"ffmpeg binary not found: {ffmpeg_bin}") from error

    if completed.returncode != 0:
        reason = completed.stderr.strip() or completed.stdout.strip() or "ffmpeg failed to export the RGB frame"
        raise click.ClickException(reason)
    if not output_path.is_file():
        raise click.ClickException(f"ffmpeg did not write {output_path}")


def _find_bundle_member(bundle_message: object, camera_label: str) -> object | None:
    """Return the first manifest member whose camera label matches, else ``None``."""
    for member in bundle_message.members:
        if str(member.camera_label) == camera_label:
            return member
    return None


def collect_sample_data(
    path: Path,
    *,
    layout: str,
    camera_label: str,
    sample_index: int,
) -> tuple[VideoSample, DepthSample, BundleMemberSample | None, bytes]:
    """Scan the MCAP for the ``sample_index``-th video and depth message of a camera.

    Returns the video metadata, the depth sample, the matching bundle member
    (``None`` unless ``layout`` is ``bundled``) and the concatenated video
    payloads of frames ``0..sample_index``. The earlier frames are included
    because the stream is decoded from its beginning to reach the selected one.

    Scanning stops as soon as everything required has been found.

    Raises:
        click.ClickException: on an unexpected schema or unsupported or
            inconsistent video format on the camera topics, or when the video,
            depth or bundle sample cannot be found.
    """
    reader_module = batch.load_mcap_reader()
    video_topic = topic_for(layout, camera_label, "video")
    depth_topic = topic_for(layout, camera_label, "depth")
    needs_bundle = layout == "bundled"

    video_sample: VideoSample | None = None
    depth_sample: DepthSample | None = None
    bundle_sample: BundleMemberSample | None = None
    video_index = 0
    depth_index = 0
    bundle_member_index = 0
    video_format: str | None = None
    video_chunks: list[bytes] = []

    with path.open("rb") as stream:
        reader = reader_module.make_reader(stream)
        for schema, channel, message in reader.iter_messages():
            topic = channel.topic

            if needs_bundle and topic == BUNDLE_TOPIC and bundle_sample is None:
                # Manifests with an unexpected schema are skipped, not fatal.
                if schema is not None and schema.name == _BUNDLE_SCHEMA:
                    bundle_class, present_value = batch.load_bundle_manifest_type(schema.data)
                    bundle_message = bundle_class()
                    bundle_message.ParseFromString(message.data)
                    member = _find_bundle_member(bundle_message, camera_label)
                    if member is not None:
                        status_name = bundle_validator.status_name_from_member(member, present_value)
                        # Only present members advance the per-camera index, so it
                        # lines up with the per-camera video/depth message index.
                        if is_present_status(status_name):
                            if bundle_member_index == sample_index:
                                log_time_ns = int(message.log_time)
                                member_timestamp_ns = None
                                if member.HasField("timestamp"):
                                    member_timestamp_ns = parse_timestamp_ns(member.timestamp, log_time_ns)
                                bundle_sample = BundleMemberSample(
                                    bundle_index=int(bundle_message.bundle_index),
                                    bundle_timestamp_ns=parse_timestamp_ns(bundle_message.timestamp, log_time_ns),
                                    member_timestamp_ns=member_timestamp_ns,
                                    status_name=status_name,
                                    corrupted_frames_skipped=int(getattr(member, "corrupted_frames_skipped", 0)),
                                    member_stream_index=bundle_member_index,
                                )
                            bundle_member_index += 1

            elif topic == video_topic:
                if schema is None or schema.name != _VIDEO_SCHEMA:
                    raise click.ClickException(
                        f"unexpected schema on {video_topic}: {schema.name if schema else 'none'}"
                    )
                payload = load_message_class(schema.data, _VIDEO_SCHEMA)()
                payload.ParseFromString(message.data)
                frame_format = str(payload.format)
                if frame_format not in VIDEO_INPUT_FORMATS:
                    raise click.ClickException(f"unsupported video format '{frame_format}' on {video_topic}")
                if video_format is None:
                    video_format = frame_format
                elif video_format != frame_format:
                    raise click.ClickException(
                        f"inconsistent video format on {video_topic}: {video_format} then {frame_format}"
                    )
                if video_index <= sample_index:
                    video_chunks.append(bytes(payload.data))
                if video_index == sample_index:
                    video_sample = VideoSample(
                        timestamp_ns=parse_timestamp_ns(payload.timestamp, int(message.log_time)),
                        format_name=frame_format,
                        stream_index=video_index,
                    )
                video_index += 1

            elif topic == depth_topic:
                if schema is None or schema.name != _DEPTH_SCHEMA:
                    raise click.ClickException(
                        f"unexpected schema on {depth_topic}: {schema.name if schema else 'none'}"
                    )
                payload = load_message_class(schema.data, _DEPTH_SCHEMA)()
                payload.ParseFromString(message.data)
                if depth_index == sample_index:
                    depth_sample = DepthSample(
                        timestamp_ns=parse_timestamp_ns(payload.timestamp, int(message.log_time)),
                        payload=bytes(payload.data),
                        stream_index=depth_index,
                        width=int(payload.width),
                        height=int(payload.height),
                        encoding_name=enum_name(payload, "encoding"),
                        source_unit_name=enum_name(payload, "source_unit"),
                        storage_unit_name=enum_name(payload, "storage_unit"),
                    )
                depth_index += 1

            # Checked after every message (not only after unrelated topics) so
            # the scan really stops once the sample is complete.
            if (
                video_sample is not None
                and depth_sample is not None
                and (not needs_bundle or bundle_sample is not None)
            ):
                break

    if video_sample is None:
        raise click.ClickException(f"sample index {sample_index} exceeded available video samples")
    if depth_sample is None:
        raise click.ClickException(f"sample index {sample_index} exceeded available depth samples")
    if needs_bundle and bundle_sample is None:
        raise click.ClickException(
            f"could not map per-camera sample index {sample_index} to a bundle member for {camera_label}"
        )
    return video_sample, depth_sample, bundle_sample, b"".join(video_chunks)


def write_sample_outputs(
    *,
    path: Path,
    layout: str,
    output_dir: Path,
    camera_label: str,
    sample_index: int,
    video_sample: VideoSample,
    depth_sample: DepthSample,
    bundle_sample: BundleMemberSample | None,
    raw_video_bytes: bytes,
    ffmpeg_bin: str,
    depth_min_m: float,
    depth_max_m: float,
    depth_palette_name: str,
) -> None:
    """Write ``rgb.png``, ``depth.npy``, ``depth_preview.png`` and ``sample_metadata.json``.

    ``output_dir`` is created if needed. The RGB frame is decoded by ffmpeg
    from ``raw_video_bytes`` staged in a temporary file.

    Raises:
        click.ClickException: if frame export, depth decoding or writing the
            preview image fails.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    rgb_output_path = output_dir / "rgb.png"
    depth_output_path = output_dir / "depth.npy"
    depth_preview_path = output_dir / "depth_preview.png"
    metadata_path = output_dir / "sample_metadata.json"

    with tempfile.TemporaryDirectory(prefix="mcap_rgbd_example_export_") as temp_dir_name:
        raw_video_path = Path(temp_dir_name) / f"sample.{video_sample.format_name}"
        raw_video_path.write_bytes(raw_video_bytes)
        export_rgb_frame(
            ffmpeg_bin=ffmpeg_bin,
            raw_video_path=raw_video_path,
            video_format=video_sample.format_name,
            frame_index=sample_index,
            output_path=rgb_output_path,
        )

    depth_m = decode_depth_array(depth_sample)
    np.save(depth_output_path, depth_m)

    depth_preview = colorize_depth(
        depth_m,
        depth_min_m=depth_min_m,
        depth_max_m=depth_max_m,
        depth_palette_name=depth_palette_name,
    )
    if not cv2.imwrite(str(depth_preview_path), depth_preview):
        raise click.ClickException(f"failed to write depth preview to {depth_preview_path}")

    # Key order here is irrelevant: the JSON is written with sort_keys=True.
    metadata: dict[str, object] = {
        "mcap_path": str(path),
        "layout": layout,
        "camera_label": camera_label,
        "sample_index": sample_index,
        "video_stream_index": video_sample.stream_index,
        "video_timestamp_ns": video_sample.timestamp_ns,
        "video_timestamp": format_timestamp_ns(video_sample.timestamp_ns),
        "video_format": video_sample.format_name,
        "depth_stream_index": depth_sample.stream_index,
        "depth_timestamp_ns": depth_sample.timestamp_ns,
        "depth_timestamp": format_timestamp_ns(depth_sample.timestamp_ns),
        "depth_width": depth_sample.width,
        "depth_height": depth_sample.height,
        "depth_encoding": depth_sample.encoding_name,
        "depth_source_unit": depth_sample.source_unit_name,
        "depth_storage_unit": depth_sample.storage_unit_name,
        "depth_palette": depth_palette_name,
        "depth_min_m": depth_min_m,
        "depth_max_m": depth_max_m,
        "rgb_output_path": str(rgb_output_path),
        "depth_output_path": str(depth_output_path),
        "depth_preview_path": str(depth_preview_path),
    }
    if bundle_sample is not None:
        bundle_metadata = asdict(bundle_sample)
        bundle_metadata["bundle_timestamp"] = format_timestamp_ns(bundle_sample.bundle_timestamp_ns)
        bundle_metadata["member_timestamp"] = format_timestamp_ns(bundle_sample.member_timestamp_ns)
        metadata["bundle"] = bundle_metadata

    metadata_path.write_text(json.dumps(metadata, indent=2, sort_keys=True) + "\n", encoding="utf-8")


@click.group()
def main() -> None:
    """Small MCAP RGBD example helper for bundled and copy-layout MCAP files."""


@main.command("summary")
@click.argument("mcap_path", type=click.Path(path_type=Path, exists=True))
def summary_command(mcap_path: Path) -> None:
    """Print a compact metadata summary for a single MCAP file."""
    summary = summarize_mcap(mcap_path.resolve())
    ensure_supported_layout(summary.base)
    print_summary(summary)


@main.command("export-sample")
@click.argument("mcap_path", type=click.Path(path_type=Path, exists=True))
@click.option("--camera-label", help="Camera label to export. Defaults to the first sorted namespaced label.")
@click.option(
    "--sample-index",
    default=0,
    show_default=True,
    type=click.IntRange(min=0),
    help="Zero-based per-camera RGB+depth sample index.",
)
@click.option(
    "--output-dir",
    required=True,
    type=click.Path(path_type=Path),
    help="Directory to write rgb.png, depth.npy, depth_preview.png, and sample_metadata.json.",
)
@click.option(
    "--ffmpeg-bin",
    default="ffmpeg",
    show_default=True,
    help="ffmpeg binary used to decode the selected RGB frame.",
)
@click.option(
    "--depth-min-m",
    default=0.2,
    show_default=True,
    type=float,
    help="Minimum displayed depth in meters for depth_preview.png.",
)
@click.option(
    "--depth-max-m",
    default=5.0,
    show_default=True,
    type=float,
    help="Maximum displayed depth in meters for depth_preview.png.",
)
@click.option(
    "--depth-palette",
    default="Turbo",
    show_default=True,
    type=click.Choice(tuple(DEPTH_PALETTE_TO_OPENCV.keys()), case_sensitive=False),
    help="Depth color palette for depth_preview.png.",
)
def export_sample_command(
    mcap_path: Path,
    camera_label: str | None,
    sample_index: int,
    output_dir: Path,
    ffmpeg_bin: str,
    depth_min_m: float,
    depth_max_m: float,
    depth_palette: str,
) -> None:
    """Export one per-camera RGB/depth sample from a bundled or copy-layout MCAP file."""
    resolved_mcap_path = mcap_path.resolve()
    resolved_output_dir = output_dir.expanduser().resolve()

    summary = summarize_mcap(resolved_mcap_path)
    base = summary.base
    ensure_supported_layout(base)
    if base.validation_status != "valid":
        raise click.ClickException(
            f"refusing to export from invalid MCAP: {base.validation_reason or base.validation_status}"
        )

    label = selected_camera_label(base, camera_label)
    stats = base.camera_stats[label]
    # A sample needs both a video and a depth message at the same index.
    pair_count = min(stats.video_messages, stats.depth_messages)
    if pair_count <= 0:
        raise click.ClickException(f"camera '{label}' has no paired RGB+depth samples")
    if sample_index >= pair_count:
        raise click.ClickException(
            f"--sample-index {sample_index} is outside 0..{pair_count - 1} for camera '{label}'"
        )

    # The choice is case-insensitive; map it back to the canonical table key.
    selected_palette = next(
        palette_name
        for palette_name in DEPTH_PALETTE_TO_OPENCV
        if palette_name.lower() == depth_palette.lower()
    )

    video_sample, depth_sample, bundle_sample, raw_video_bytes = collect_sample_data(
        resolved_mcap_path,
        layout=base.layout,
        camera_label=label,
        sample_index=sample_index,
    )
    write_sample_outputs(
        path=resolved_mcap_path,
        layout=base.layout,
        output_dir=resolved_output_dir,
        camera_label=label,
        sample_index=sample_index,
        video_sample=video_sample,
        depth_sample=depth_sample,
        bundle_sample=bundle_sample,
        raw_video_bytes=raw_video_bytes,
        ffmpeg_bin=ffmpeg_bin,
        depth_min_m=depth_min_m,
        depth_max_m=depth_max_m,
        depth_palette_name=selected_palette,
    )
    click.echo(f"wrote sample export: {resolved_output_dir}")


if __name__ == "__main__":
    main()