#!/usr/bin/env python3
"""Interactive RGB + depth viewer for MCAP files produced by the ZED SVO batch converter.

Reads a recording, extracts per-camera video/depth/calibration streams, and drives
a DearPyGui window (or headless summary / preview-PNG export) over the result.
"""
from __future__ import annotations
import contextlib
import importlib
import subprocess
import sys
import tempfile
import time
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from typing import BinaryIO
import click
import cv2
import numpy as np
from google.protobuf import descriptor_pb2, descriptor_pool, message_factory, timestamp_pb2
import zed_batch_svo_to_mcap as batch

SCRIPT_PATH = Path(__file__).resolve()
REPO_ROOT = SCRIPT_PATH.parents[1]
WORKSPACE_ROOT = REPO_ROOT.parent
# Vendored mcap python package checked out as a sibling of this repository.
MCAP_PYTHON_ROOT = WORKSPACE_ROOT / "mcap" / "python" / "mcap"
# Make the sibling batch-converter module and the vendored mcap package importable.
if str(SCRIPT_PATH.parent) not in sys.path:
    sys.path.insert(0, str(SCRIPT_PATH.parent))
if str(MCAP_PYTHON_ROOT) not in sys.path:
    sys.path.insert(0, str(MCAP_PYTHON_ROOT))

BUNDLE_TOPIC = "/bundle"
SINGLE_CAMERA_LABEL = "camera"
# Fixed panel size used for both the RGB and the depth image in the GUI.
DISPLAY_WIDTH = 720
DISPLAY_HEIGHT = 420
TEXT_LINE_HEIGHT = 28
WINDOW_PADDING = 24
METADATA_PANEL_WIDTH = 380
VIEWPORT_WIDTH = DISPLAY_WIDTH * 2 + METADATA_PANEL_WIDTH + WINDOW_PADDING * 4
VIEWPORT_HEIGHT = DISPLAY_HEIGHT + 260
# Slider scrubbing is debounced so rapid drags only decode the final frame.
FRAME_SLIDER_DEBOUNCE_SECONDS = 0.08
# A warning is shown only when actual FPS is below BOTH thresholds (ratio and absolute).
PLAYBACK_FPS_WARNING_RATIO = 0.9
PLAYBACK_FPS_WARNING_ABS_TOLERANCE = 1.0
PLAYBACK_FPS_WINDOW_SECONDS = 2.0

# Lazily imported "mcap.reader" module (see load_mcap_reader).
_MCAP_READER_MODULE = None
# (schema bytes, fully-qualified message name) -> generated protobuf class.
_MESSAGE_CLASS_CACHE: dict[tuple[bytes, str], object] = {}

# Palette name shown in the UI -> OpenCV colormap id (None = plain grayscale).
DEPTH_PALETTE_TO_OPENCV = {
    "Gray": None,
    "Turbo": cv2.COLORMAP_TURBO,
    "Inferno": cv2.COLORMAP_INFERNO,
    "Plasma": cv2.COLORMAP_PLASMA,
    "Viridis": cv2.COLORMAP_VIRIDIS,
    "Cividis": cv2.COLORMAP_CIVIDIS,
    "Magma": cv2.COLORMAP_MAGMA,
    "Parula": cv2.COLORMAP_PARULA,
}


@dataclass(slots=True, frozen=True)
class CalibrationInfo:
    """Width/height reported by a foxglove.CameraCalibration message."""

    width: int
    height: int


@dataclass(slots=True, frozen=True)
class VideoFrameInfo:
    """One compressed video frame's timestamp (payload lives in the raw cache file)."""

    timestamp_ns: int


@dataclass(slots=True, frozen=True)
class DepthFrameInfo:
    """Location and metadata of one depth frame inside the on-disk depth cache."""

    timestamp_ns: int
    offset: int  # byte offset into the depth cache file
    length: int  # payload length in bytes
    width: int
    height: int
    encoding_name: str  # e.g. RVL_U16_LOSSLESS / RVL_F32
    source_unit_name: str
    storage_unit_name: str


@dataclass(slots=True, frozen=True)
class BundleSlot:
    """One viewable slot: a bundle entry (or synthetic pair) mapping to frame indices."""

    bundle_index: int
    bundle_timestamp_ns: int
    status_name: str
    corrupted_frames_skipped: int
    sample_timestamp_ns: int | None
    video_index: int | None  # index into CameraViewState.video_frames, None if absent
    depth_index: int | None  # index into CameraViewState.depth_frames, None if absent


@dataclass(slots=True)
class McapLayoutInfo:
    """Result of infer_layout: topic layout and discovered camera labels."""

    path: Path
    layout: str  # "single-camera", "bundled", or "copy"
    camera_labels: tuple[str, ...]
    bundled_policy_counts: dict[str, int]


@dataclass(slots=True)
class CameraViewState:
    """Everything extracted for one camera, backed by temp files owned by temp_dir."""

    mcap_path: Path
    layout: str
    camera_label: str
    video_format: str | None
    video_frames: list[VideoFrameInfo]
    depth_frames: list[DepthFrameInfo]
    slots: list[BundleSlot]
    video_calibration: CalibrationInfo | None
    depth_calibration: CalibrationInfo | None
    preview_video_path: Path | None
    depth_cache_path: Path
    temp_dir: tempfile.TemporaryDirectory[str]

    def close(self) -> None:
        """Delete the temp directory holding the preview video and depth cache."""
        self.temp_dir.cleanup()


@dataclass(slots=True)
class RenderedSlot:
    """Panel-sized BGR images plus the title/metadata text for one slot."""

    rgb_bgr: np.ndarray
    depth_bgr: np.ndarray
    title: str
    metadata_lines: list[str]


def load_mcap_reader():
    """Import and memoize the vendored ``mcap.reader`` module.

    Re-inserts MCAP_PYTHON_ROOT into sys.path first in case something pruned it.
    """
    global _MCAP_READER_MODULE
    if _MCAP_READER_MODULE is not None:
        return _MCAP_READER_MODULE
    if str(MCAP_PYTHON_ROOT) not in sys.path:
        sys.path.insert(0, str(MCAP_PYTHON_ROOT))
    _MCAP_READER_MODULE = importlib.import_module("mcap.reader")
    return _MCAP_READER_MODULE


def load_message_class(schema_data: bytes, message_type_name: str):
    """Build (and cache) a protobuf message class from a serialized FileDescriptorSet.

    The channel schema may or may not embed google/protobuf/timestamp.proto; if it
    does, that embedded copy is registered first, otherwise the stock descriptor is
    used — either way it must be in the pool before dependent files are added.
    """
    cache_key = (schema_data, message_type_name)
    cached = _MESSAGE_CLASS_CACHE.get(cache_key)
    if cached is not None:
        return cached
    descriptor_set = descriptor_pb2.FileDescriptorSet()
    descriptor_set.ParseFromString(schema_data)
    pool = descriptor_pool.DescriptorPool()
    has_embedded_timestamp = any(
        file_descriptor.name == "google/protobuf/timestamp.proto"
        for file_descriptor in descriptor_set.file
    )
    if has_embedded_timestamp:
        for file_descriptor in descriptor_set.file:
            if file_descriptor.name == "google/protobuf/timestamp.proto":
                pool.Add(file_descriptor)
                break
    else:
        pool.AddSerializedFile(timestamp_pb2.DESCRIPTOR.serialized_pb)
    for file_descriptor in descriptor_set.file:
        if file_descriptor.name == "google/protobuf/timestamp.proto":
            continue  # already registered above
        pool.Add(file_descriptor)
    message_descriptor = pool.FindMessageTypeByName(message_type_name)
    message_class = message_factory.GetMessageClass(message_descriptor)
    _MESSAGE_CLASS_CACHE[cache_key] = message_class
    return message_class
def parse_timestamp_ns(timestamp_message: object, fallback_log_time_ns: int) -> int:
    """Convert a protobuf Timestamp-like object to nanoseconds.

    An all-zero timestamp is treated as "unset" and the MCAP log time is used instead.
    """
    seconds = int(getattr(timestamp_message, "seconds", 0))
    nanos = int(getattr(timestamp_message, "nanos", 0))
    if seconds == 0 and nanos == 0:
        return fallback_log_time_ns
    return seconds * 1_000_000_000 + nanos


def format_timestamp_ns(timestamp_ns: int | None) -> str:
    """Render nanoseconds as ``seconds.nnnnnnnnn`` (or ``-`` for None)."""
    if timestamp_ns is None:
        return "-"
    seconds, nanos = divmod(timestamp_ns, 1_000_000_000)
    return f"{seconds}.{nanos:09d}"


def enum_name(message: object, field_name: str) -> str:
    """Return the symbolic enum name of a protobuf field, or the raw number if unknown."""
    field_descriptor = message.DESCRIPTOR.fields_by_name[field_name]
    value = int(getattr(message, field_name))
    resolved = field_descriptor.enum_type.values_by_number.get(value)
    return resolved.name if resolved is not None else str(value)


def is_present_status(status_name: str) -> bool:
    """True when a bundle member status means the frame is present (either enum spelling)."""
    return status_name in {"PRESENT", "BUNDLE_MEMBER_STATUS_PRESENT"}


def is_corrupted_gap_status(status_name: str) -> bool:
    """True when a bundle member status marks a corrupted-frame gap (either enum spelling)."""
    return status_name in {"CORRUPTED_GAP", "BUNDLE_MEMBER_STATUS_CORRUPTED_GAP"}


def infer_layout(path: Path) -> McapLayoutInfo:
    """Scan all messages in *path* and classify the topic layout.

    Layouts: ``single-camera`` (topics under /camera/...), ``bundled`` (a /bundle
    manifest plus /<label>/... topics), or ``copy`` (per-label topics without a
    manifest). Raises ClickException on mixed or unrecognizable layouts.
    """
    reader_module = load_mcap_reader()
    camera_labels: set[str] = set()
    bundled_policy_counts: dict[str, int] = {}
    saw_single = False
    saw_bundled = False
    saw_bundle_manifest = False
    with path.open("rb") as stream:
        reader = reader_module.make_reader(stream)
        for schema, channel, message in reader.iter_messages():
            topic = channel.topic
            if topic.startswith("/camera/"):
                saw_single = True
                continue
            if topic == BUNDLE_TOPIC:
                saw_bundled = True
                saw_bundle_manifest = True
                if schema is None or schema.name != "cvmmap_streamer.BundleManifest":
                    continue
                bundle_class, _ = batch.load_bundle_manifest_type(schema.data)
                bundle = bundle_class()
                bundle.ParseFromString(message.data)
                for member in bundle.members:
                    camera_labels.add(str(member.camera_label))
                policy_name = enum_name(bundle, "policy")
                bundled_policy_counts[policy_name] = bundled_policy_counts.get(policy_name, 0) + 1
                continue
            # Topics shaped "/<label>/<kind>" with a non-default label imply multi-camera.
            if topic.count("/") == 2 and topic.startswith("/"):
                label = topic.split("/")[1]
                if label and label != "camera":
                    saw_bundled = True
                    camera_labels.add(label)
    if saw_single and saw_bundled:
        raise click.ClickException("MCAP mixes single-camera and multi-camera topics")
    if saw_single:
        return McapLayoutInfo(
            path=path,
            layout="single-camera",
            camera_labels=(SINGLE_CAMERA_LABEL,),
            bundled_policy_counts=bundled_policy_counts,
        )
    if saw_bundled and camera_labels:
        return McapLayoutInfo(
            path=path,
            layout="bundled" if saw_bundle_manifest else "copy",
            camera_labels=tuple(sorted(camera_labels)),
            bundled_policy_counts=bundled_policy_counts,
        )
    raise click.ClickException(f"could not infer a supported MCAP layout from {path}")


def topic_for(layout: str, camera_label: str, kind: str) -> str:
    """Build the topic name for a stream kind given the layout and camera label."""
    if layout == "single-camera":
        return f"/camera/{kind}"
    return f"/{camera_label}/{kind}"


def build_preview_video(
    ffmpeg_bin: str,
    raw_video_path: Path,
    video_format: str,
    preview_width: int,
    temp_dir: Path,
) -> Path:
    """Transcode the raw H.264/H.265 elementary stream into an MJPEG preview cache.

    Raises ClickException with ffmpeg's stderr when the transcode fails.
    """
    input_format = {"h264": "h264", "h265": "hevc"}[video_format]
    # This script uses one simple RGB preview path for both sequential playback and
    # random-access scrubbing, so it builds an intra-frame MJPEG cache first.
    preview_path = temp_dir / "preview.avi"
    command = [
        ffmpeg_bin,
        "-hide_banner",
        "-loglevel",
        "error",
        "-y",
        "-fflags",
        "+genpts",
        "-f",
        input_format,
        "-i",
        str(raw_video_path),
        "-vf",
        # Downscale to at most preview_width, keep aspect, even height for codecs.
        f"scale=min(iw\\,{preview_width}):-2:flags=lanczos",
        "-c:v",
        "mjpeg",
        "-q:v",
        "3",
        str(preview_path),
    ]
    completed = subprocess.run(command, check=False, capture_output=True, text=True)
    if completed.returncode != 0:
        reason = completed.stderr.strip() or completed.stdout.strip() or "ffmpeg failed to build preview cache"
        raise click.ClickException(reason)
    return preview_path
def read_camera_state(
    path: Path,
    *,
    layout_info: McapLayoutInfo,
    camera_label: str,
    ffmpeg_bin: str,
    preview_width: int,
) -> CameraViewState:
    """Extract one camera's streams from the MCAP into temp caches and index them.

    Single pass over all messages: video payloads are concatenated into a raw
    elementary-stream file (later transcoded to an MJPEG preview), depth payloads
    are appended to a flat cache file indexed by (offset, length), and calibration
    plus bundle-manifest metadata are collected. Raises ClickException when no
    viewable RGB+depth pairs exist for *camera_label*.
    """
    reader_module = load_mcap_reader()
    temp_dir = tempfile.TemporaryDirectory(prefix="mcap_rgbd_viewer_")
    temp_root = Path(temp_dir.name)
    raw_video_path = temp_root / "stream.bin"
    depth_cache_path = temp_root / "depth.cache"
    video_topic = topic_for(layout_info.layout, camera_label, "video")
    depth_topic = topic_for(layout_info.layout, camera_label, "depth")
    calibration_topic = topic_for(layout_info.layout, camera_label, "calibration")
    depth_calibration_topic = topic_for(layout_info.layout, camera_label, "depth_calibration")
    video_frames: list[VideoFrameInfo] = []
    depth_frames: list[DepthFrameInfo] = []
    # (bundle_index, bundle_ts_ns, status_name, corrupted_frames_skipped, sample_ts_ns)
    bundle_members: list[tuple[int, int, str, int, int | None]] = []
    video_format: str | None = None
    video_calibration: CalibrationInfo | None = None
    depth_calibration: CalibrationInfo | None = None
    with raw_video_path.open("wb") as video_stream, depth_cache_path.open("wb") as depth_stream:
        with path.open("rb") as stream:
            reader = reader_module.make_reader(stream)
            for schema, channel, message in reader.iter_messages():
                topic = channel.topic
                if topic == video_topic:
                    if schema is None or schema.name != "foxglove.CompressedVideo":
                        raise click.ClickException(f"unexpected schema on {video_topic}: {schema.name if schema else 'none'}")
                    message_class = load_message_class(schema.data, "foxglove.CompressedVideo")
                    payload = message_class()
                    payload.ParseFromString(message.data)
                    frame_format = str(payload.format)
                    if frame_format not in {"h264", "h265"}:
                        raise click.ClickException(f"unsupported video format '{frame_format}' on {video_topic}")
                    # The format must be uniform — all payloads go into one raw stream.
                    if video_format is None:
                        video_format = frame_format
                    elif frame_format != video_format:
                        raise click.ClickException(
                            f"inconsistent video format on {video_topic}: {video_format} then {frame_format}"
                        )
                    video_stream.write(payload.data)
                    video_frames.append(
                        VideoFrameInfo(
                            timestamp_ns=parse_timestamp_ns(payload.timestamp, int(message.log_time)),
                        )
                    )
                    continue
                if topic == depth_topic:
                    if schema is None or schema.name != "cvmmap_streamer.DepthMap":
                        raise click.ClickException(f"unexpected schema on {depth_topic}: {schema.name if schema else 'none'}")
                    message_class = load_message_class(schema.data, "cvmmap_streamer.DepthMap")
                    payload = message_class()
                    payload.ParseFromString(message.data)
                    # Record where this frame lands in the append-only depth cache.
                    offset = depth_stream.tell()
                    depth_bytes = bytes(payload.data)
                    depth_stream.write(depth_bytes)
                    depth_frames.append(
                        DepthFrameInfo(
                            timestamp_ns=parse_timestamp_ns(payload.timestamp, int(message.log_time)),
                            offset=offset,
                            length=len(depth_bytes),
                            width=int(payload.width),
                            height=int(payload.height),
                            encoding_name=enum_name(payload, "encoding"),
                            source_unit_name=enum_name(payload, "source_unit"),
                            storage_unit_name=enum_name(payload, "storage_unit"),
                        )
                    )
                    continue
                if topic == calibration_topic:
                    if schema is None or schema.name != "foxglove.CameraCalibration":
                        continue
                    message_class = load_message_class(schema.data, "foxglove.CameraCalibration")
                    payload = message_class()
                    payload.ParseFromString(message.data)
                    # Only the first calibration message is kept.
                    if video_calibration is None:
                        video_calibration = CalibrationInfo(width=int(payload.width), height=int(payload.height))
                    continue
                if topic == depth_calibration_topic:
                    if schema is None or schema.name != "foxglove.CameraCalibration":
                        continue
                    message_class = load_message_class(schema.data, "foxglove.CameraCalibration")
                    payload = message_class()
                    payload.ParseFromString(message.data)
                    if depth_calibration is None:
                        depth_calibration = CalibrationInfo(width=int(payload.width), height=int(payload.height))
                    continue
                if layout_info.layout == "bundled" and topic == BUNDLE_TOPIC:
                    if schema is None or schema.name != "cvmmap_streamer.BundleManifest":
                        continue
                    bundle_class, present_value = batch.load_bundle_manifest_type(schema.data)
                    bundle = bundle_class()
                    bundle.ParseFromString(message.data)
                    for member in bundle.members:
                        if str(member.camera_label) != camera_label:
                            continue
                        # Older manifests lack a status enum (present_value is None);
                        # infer presence from whether the member carries a timestamp.
                        status_name = "PRESENT" if member.HasField("timestamp") else "UNKNOWN"
                        if present_value is not None:
                            field_descriptor = member.DESCRIPTOR.fields_by_name["status"]
                            enum_value = field_descriptor.enum_type.values_by_number.get(int(member.status))
                            status_name = enum_value.name if enum_value is not None else str(member.status)
                        sample_timestamp_ns = None
                        if member.HasField("timestamp"):
                            sample_timestamp_ns = parse_timestamp_ns(member.timestamp, int(message.log_time))
                        bundle_members.append(
                            (
                                int(bundle.bundle_index),
                                parse_timestamp_ns(bundle.timestamp, int(message.log_time)),
                                status_name,
                                int(getattr(member, "corrupted_frames_skipped", 0)),
                                sample_timestamp_ns,
                            )
                        )
                        break
    preview_video_path: Path | None = None
    if video_format is not None and video_frames:
        # ffmpeg infers the demuxer more reliably from a format-specific suffix.
        suffix = ".h265" if video_format == "h265" else ".h264"
        renamed_raw_video_path = raw_video_path.with_suffix(suffix)
        raw_video_path.replace(renamed_raw_video_path)
        preview_video_path = build_preview_video(
            ffmpeg_bin=ffmpeg_bin,
            raw_video_path=renamed_raw_video_path,
            video_format=video_format,
            preview_width=preview_width,
            temp_dir=temp_root,
        )
    slots: list[BundleSlot] = []
    if layout_info.layout == "bundled" and bundle_members:
        # Present bundle members consume video/depth frames in arrival order;
        # gaps/absent members keep their slot but map to no frame indices.
        present_cursor = 0
        usable_present_frames = min(len(video_frames), len(depth_frames))
        for bundle_index, bundle_timestamp_ns, status_name, corrupted_frames_skipped, sample_timestamp_ns in bundle_members:
            video_index: int | None = None
            depth_index: int | None = None
            if is_present_status(status_name) and present_cursor < usable_present_frames:
                video_index = present_cursor
                depth_index = present_cursor
                present_cursor += 1
            slots.append(
                BundleSlot(
                    bundle_index=bundle_index,
                    bundle_timestamp_ns=bundle_timestamp_ns,
                    status_name=status_name,
                    corrupted_frames_skipped=corrupted_frames_skipped,
                    sample_timestamp_ns=sample_timestamp_ns,
                    video_index=video_index,
                    depth_index=depth_index,
                )
            )
    else:
        # No manifest: pair the i-th video frame with the i-th depth frame.
        paired_frames = min(len(video_frames), len(depth_frames))
        slots = [
            BundleSlot(
                bundle_index=index,
                bundle_timestamp_ns=video_frames[index].timestamp_ns,
                status_name="PRESENT",
                corrupted_frames_skipped=0,
                sample_timestamp_ns=video_frames[index].timestamp_ns,
                video_index=index,
                depth_index=index,
            )
            for index in range(paired_frames)
        ]
    if not slots:
        raise click.ClickException(f"no viewable RGB+depth pairs found for camera '{camera_label}'")
    return CameraViewState(
        mcap_path=path,
        layout=layout_info.layout,
        camera_label=camera_label,
        video_format=video_format,
        video_frames=video_frames,
        depth_frames=depth_frames,
        slots=slots,
        video_calibration=video_calibration,
        depth_calibration=depth_calibration,
        preview_video_path=preview_video_path,
        depth_cache_path=depth_cache_path,
        temp_dir=temp_dir,
    )


def render_placeholder(text: str) -> np.ndarray:
    """Return a black panel-sized BGR image with *text* drawn on it."""
    canvas = np.zeros((DISPLAY_HEIGHT, DISPLAY_WIDTH, 3), dtype=np.uint8)
    cv2.putText(
        canvas,
        text,
        (40, DISPLAY_HEIGHT // 2),
        cv2.FONT_HERSHEY_SIMPLEX,
        1.0,
        (255, 255, 255),
        2,
        cv2.LINE_AA,
    )
    return canvas


def fit_to_panel(frame_bgr: np.ndarray) -> np.ndarray:
    """Letterbox *frame_bgr* into a DISPLAY_WIDTH x DISPLAY_HEIGHT black canvas."""
    src_height, src_width = frame_bgr.shape[:2]
    scale = min(DISPLAY_WIDTH / src_width, DISPLAY_HEIGHT / src_height)
    resized_width = max(1, int(round(src_width * scale)))
    resized_height = max(1, int(round(src_height * scale)))
    resized = cv2.resize(frame_bgr, (resized_width, resized_height), interpolation=cv2.INTER_AREA)
    canvas = np.zeros((DISPLAY_HEIGHT, DISPLAY_WIDTH, 3), dtype=np.uint8)
    x_offset = (DISPLAY_WIDTH - resized_width) // 2
    y_offset = (DISPLAY_HEIGHT - resized_height) // 2
    canvas[y_offset : y_offset + resized_height, x_offset : x_offset + resized_width] = resized
    return canvas
def read_video_frame(
    preview_video_path: Path | None,
    video_index: int | None,
    *,
    capture: cv2.VideoCapture | None = None,
) -> np.ndarray:
    """Seek to frame *video_index* in the MJPEG preview and return it panel-fitted.

    A persistent *capture* may be passed to avoid reopening the file per frame;
    otherwise a throwaway VideoCapture is created and released here. Returns a
    placeholder panel when there is no video or the seek/read fails.
    """
    if preview_video_path is None or video_index is None:
        return render_placeholder("NO VIDEO")
    created_capture = False
    if capture is None:
        capture = cv2.VideoCapture(str(preview_video_path))
        created_capture = True
    if not capture.isOpened():
        raise click.ClickException(f"OpenCV could not open preview cache {preview_video_path}")
    try:
        # MJPEG is intra-only, so CAP_PROP_POS_FRAMES seeking is frame-exact.
        capture.set(cv2.CAP_PROP_POS_FRAMES, float(video_index))
        ok, frame = capture.read()
    finally:
        if created_capture:
            capture.release()
    if not ok or frame is None:
        return render_placeholder("VIDEO SEEK FAILED")
    return fit_to_panel(frame)


def decode_depth_frame(
    state: CameraViewState,
    depth_index: int | None,
    *,
    depth_min_m: float,
    depth_max_m: float,
    depth_palette_name: str,
    depth_cache_stream: BinaryIO | None = None,
) -> np.ndarray:
    """Decode one cached depth frame and colorize it for display.

    Requires the optional ``rvl`` binding. Depth is normalized into
    [depth_min_m, depth_max_m] and inverted (near = bright), invalid pixels are
    painted black, then the palette is applied. A persistent *depth_cache_stream*
    may be passed to avoid reopening the cache file per frame.
    """
    if depth_index is None:
        return render_placeholder("NO DEPTH")
    try:
        import rvl
    except ModuleNotFoundError as error:
        raise click.ClickException(
            "the viewer needs the optional rvl-impl binding; run `uv sync --extra viewer`"
        ) from error
    ref = state.depth_frames[depth_index]
    if depth_cache_stream is None:
        with state.depth_cache_path.open("rb") as stream:
            stream.seek(ref.offset)
            payload = stream.read(ref.length)
    else:
        depth_cache_stream.seek(ref.offset)
        payload = depth_cache_stream.read(ref.length)
    if ref.encoding_name == "RVL_U16_LOSSLESS":
        depth = rvl.decompress_u16(payload).astype(np.float32)
        # NOTE(review): assumes rvl returns a 2-D array; verify against rvl-impl.
        if ref.storage_unit_name == "STORAGE_UNIT_MILLIMETER" or ref.source_unit_name == "DEPTH_UNIT_MILLIMETER":
            depth_m = depth / 1000.0
        else:
            depth_m = depth
    elif ref.encoding_name == "RVL_F32":
        depth_m = rvl.decompress_f32(payload).astype(np.float32)
    else:
        # Unknown encoding: show its name instead of failing hard.
        return render_placeholder(ref.encoding_name)
    valid = np.isfinite(depth_m) & (depth_m > 0.0)
    span = max(depth_max_m - depth_min_m, 1e-6)
    clipped = np.clip((depth_m - depth_min_m) / span, 0.0, 1.0)
    normalized = np.zeros(depth_m.shape, dtype=np.uint8)
    # Invert so that near depths map to high (bright) palette values.
    normalized[valid] = np.round((1.0 - clipped[valid]) * 255.0).astype(np.uint8)
    colormap = DEPTH_PALETTE_TO_OPENCV[depth_palette_name]
    if colormap is None:
        colored = cv2.cvtColor(normalized, cv2.COLOR_GRAY2BGR)
    else:
        colored = cv2.applyColorMap(normalized, colormap)
    colored[~valid] = 0
    return fit_to_panel(colored)


def build_metadata_lines(
    state: CameraViewState,
    slot: BundleSlot,
    *,
    depth_min_m: float,
    depth_max_m: float,
    depth_palette_name: str,
) -> list[str]:
    """Assemble the text lines shown in the metadata side panel for one slot."""
    lines = [
        f"file: {state.mcap_path.name}",
        f"layout: {state.layout}",
        f"camera: {state.camera_label}",
        f"slots: {len(state.slots)}",
        f"video frames: {len(state.video_frames)}",
        f"depth frames: {len(state.depth_frames)}",
        f"bundle/frame index: {slot.bundle_index}",
        f"bundle ts: {format_timestamp_ns(slot.bundle_timestamp_ns)}",
        f"sample ts: {format_timestamp_ns(slot.sample_timestamp_ns)}",
        f"status: {slot.status_name}",
    ]
    if slot.corrupted_frames_skipped:
        lines.append(f"corrupted frames skipped: {slot.corrupted_frames_skipped}")
    if state.video_format is not None:
        lines.append(f"video format: {state.video_format}")
    if state.video_calibration is not None:
        lines.append(f"video calib: {state.video_calibration.width}x{state.video_calibration.height}")
    # Prefer the calibration dimensions; fall back to the frame's own metadata.
    if state.depth_calibration is not None:
        lines.append(f"depth calib: {state.depth_calibration.width}x{state.depth_calibration.height}")
    elif slot.depth_index is not None:
        ref = state.depth_frames[slot.depth_index]
        lines.append(f"depth frame: {ref.width}x{ref.height} {ref.encoding_name}")
    lines.append(f"depth range: {depth_min_m:.2f}m .. {depth_max_m:.2f}m")
    lines.append(f"depth palette: {depth_palette_name}")
    return lines
def render_slot(
    state: CameraViewState,
    slot_index: int,
    *,
    depth_min_m: float,
    depth_max_m: float,
    depth_palette_name: str,
    preview_capture: cv2.VideoCapture | None = None,
    depth_cache_stream: BinaryIO | None = None,
) -> RenderedSlot:
    """Render one slot: RGB panel, depth panel, title, and metadata lines.

    Corrupted-gap slots render placeholder panels instead of decoding. Optional
    persistent capture/stream handles are forwarded to avoid per-frame reopens.
    """
    slot = state.slots[slot_index]
    if is_corrupted_gap_status(slot.status_name):
        rgb_frame = render_placeholder("CORRUPTED GAP")
        depth_frame = render_placeholder("CORRUPTED GAP")
    else:
        rgb_frame = read_video_frame(state.preview_video_path, slot.video_index, capture=preview_capture)
        depth_frame = decode_depth_frame(
            state,
            slot.depth_index,
            depth_min_m=depth_min_m,
            depth_max_m=depth_max_m,
            depth_palette_name=depth_palette_name,
            depth_cache_stream=depth_cache_stream,
        )
    title = (
        f"{state.camera_label} "
        f"{'bundle' if state.layout == 'bundled' else 'frame'}={slot.bundle_index} "
        f"status={slot.status_name}"
    )
    return RenderedSlot(
        rgb_bgr=rgb_frame,
        depth_bgr=depth_frame,
        title=title,
        metadata_lines=build_metadata_lines(
            state,
            slot,
            depth_min_m=depth_min_m,
            depth_max_m=depth_max_m,
            depth_palette_name=depth_palette_name,
        ),
    )


def compose_preview_image(rendered: RenderedSlot) -> np.ndarray:
    """Stack a title banner over side-by-side RGB and depth panels (for PNG export)."""
    banner_height = 64
    banner = np.zeros((banner_height, DISPLAY_WIDTH * 2, 3), dtype=np.uint8)
    cv2.putText(
        banner,
        rendered.title,
        (16, 40),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.8,
        (255, 255, 255),
        2,
        cv2.LINE_AA,
    )
    pair = np.concatenate([rendered.rgb_bgr, rendered.depth_bgr], axis=1)
    return np.concatenate([banner, pair], axis=0)


def print_summary(layout_info: McapLayoutInfo, state: CameraViewState) -> None:
    """Echo layout, per-camera frame counts, calibration, and timestamp range."""
    click.echo(f"path: {state.mcap_path}")
    click.echo(f"layout: {layout_info.layout}")
    click.echo(f"camera labels: {', '.join(layout_info.camera_labels)}")
    if layout_info.bundled_policy_counts:
        policy_text = ", ".join(
            f"{policy}={count}" for policy, count in sorted(layout_info.bundled_policy_counts.items())
        )
        click.echo(f"bundle policies: {policy_text}")
    click.echo(f"selected camera: {state.camera_label}")
    click.echo(f"viewable slots: {len(state.slots)}")
    click.echo(f"video frames: {len(state.video_frames)}")
    click.echo(f"depth frames: {len(state.depth_frames)}")
    if state.video_format is not None:
        click.echo(f"video format: {state.video_format}")
    if state.video_calibration is not None:
        click.echo(f"video calibration: {state.video_calibration.width}x{state.video_calibration.height}")
    if state.depth_calibration is not None:
        click.echo(f"depth calibration: {state.depth_calibration.width}x{state.depth_calibration.height}")
    if state.slots:
        click.echo(f"first slot timestamp: {format_timestamp_ns(state.slots[0].bundle_timestamp_ns)}")
        click.echo(f"last slot timestamp: {format_timestamp_ns(state.slots[-1].bundle_timestamp_ns)}")
    corrupted_gap_count = sum(1 for slot in state.slots if is_corrupted_gap_status(slot.status_name))
    if corrupted_gap_count:
        click.echo(f"corrupted gap slots: {corrupted_gap_count}")


def write_preview_image(
    state: CameraViewState,
    output_path: Path,
    frame_index: int,
    depth_min_m: float,
    depth_max_m: float,
    depth_palette_name: str,
) -> None:
    """Render slot *frame_index* and write the composed preview PNG to *output_path*."""
    if frame_index < 0 or frame_index >= len(state.slots):
        raise click.ClickException(f"--frame-index {frame_index} is outside 0..{len(state.slots) - 1}")
    rendered = render_slot(
        state,
        frame_index,
        depth_min_m=depth_min_m,
        depth_max_m=depth_max_m,
        depth_palette_name=depth_palette_name,
    )
    preview = compose_preview_image(rendered)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    if not cv2.imwrite(str(output_path), preview):
        raise click.ClickException(f"failed to write preview image to {output_path}")
    click.echo(f"wrote preview: {output_path}")


def slot_target_timestamp_ns(slot: BundleSlot) -> int:
    """Timestamp used for cross-camera alignment: sample ts if set, else bundle ts."""
    return slot.sample_timestamp_ns if slot.sample_timestamp_ns is not None else slot.bundle_timestamp_ns


def nearest_slot_index(slots: list[BundleSlot], target_timestamp_ns: int) -> int:
    """Linear search for the slot whose target timestamp is closest to the given one."""
    if not slots:
        return 0
    return min(
        range(len(slots)),
        key=lambda index: abs(slot_target_timestamp_ns(slots[index]) - target_timestamp_ns),
    )
def bgr_to_texture(frame_bgr: np.ndarray) -> np.ndarray:
    """Convert a BGR uint8 image to the flat float32 RGBA [0,1] layout DearPyGui expects."""
    rgba = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGBA)
    return np.ascontiguousarray(rgba, dtype=np.float32) / 255.0


class ViewerApp:
    """DearPyGui application: builds the UI, drives playback, and owns per-camera resources."""

    def __init__(
        self,
        *,
        mcap_path: Path,
        layout_info: McapLayoutInfo,
        initial_state: CameraViewState,
        ffmpeg_bin: str,
        preview_width: int,
        depth_min_m: float,
        depth_max_m: float,
    ) -> None:
        # DearPyGui is an optional extra; fail with an actionable message if missing.
        try:
            import dearpygui.dearpygui as dpg
        except ModuleNotFoundError as error:
            raise click.ClickException(
                "the GUI viewer needs DearPyGui; run `uv sync --extra viewer` first"
            ) from error
        self.dpg = dpg
        self.mcap_path = mcap_path
        self.layout_info = layout_info
        self.state = initial_state
        self.ffmpeg_bin = ffmpeg_bin
        self.preview_width = preview_width
        self.depth_min_m = depth_min_m
        self.depth_max_m = depth_max_m
        self.depth_palette_name = "Turbo"
        self.current_index = 0
        # Debounced slider target: applied by flush_pending_index after a quiet period.
        self.pending_index: int | None = None
        self.pending_index_requested_at = 0.0
        self.playing = False
        self.playback_fps = 15.0
        self.next_playback_deadline = time.monotonic()
        # (monotonic time, cumulative frames advanced) samples for measured-FPS.
        self.playback_progress_samples: deque[tuple[float, int]] = deque()
        self.playback_advanced_frames = 0
        self.actual_playback_fps = 0.0
        # Persistent handles so refresh() does not reopen files every frame.
        self.preview_capture: cv2.VideoCapture | None = None
        self.depth_cache_stream: BinaryIO | None = None
        self.open_state_resources()

    def open_state_resources(self) -> None:
        """Open the persistent preview VideoCapture and depth cache stream for self.state."""
        if self.state.preview_video_path is not None:
            capture = cv2.VideoCapture(str(self.state.preview_video_path))
            if not capture.isOpened():
                capture.release()
                raise click.ClickException(f"OpenCV could not open preview cache {self.state.preview_video_path}")
            self.preview_capture = capture
        else:
            self.preview_capture = None
        self.depth_cache_stream = self.state.depth_cache_path.open("rb")

    def close_state_resources(self) -> None:
        """Release the VideoCapture and close the depth cache stream (idempotent)."""
        if self.preview_capture is not None:
            self.preview_capture.release()
            self.preview_capture = None
        if self.depth_cache_stream is not None:
            self.depth_cache_stream.close()
            self.depth_cache_stream = None

    def replace_state(self, camera_label: str) -> None:
        """Switch to another camera, keeping the closest-by-timestamp position."""
        source_index = self.pending_index if self.pending_index is not None else self.current_index
        source_index = max(0, min(source_index, len(self.state.slots) - 1))
        target_timestamp_ns = slot_target_timestamp_ns(self.state.slots[source_index])
        # Build the new state fully before tearing down the old one.
        new_state = read_camera_state(
            self.mcap_path,
            layout_info=self.layout_info,
            camera_label=camera_label,
            ffmpeg_bin=self.ffmpeg_bin,
            preview_width=self.preview_width,
        )
        self.close_state_resources()
        self.state.close()
        self.state = new_state
        self.open_state_resources()
        self.current_index = nearest_slot_index(self.state.slots, target_timestamp_ns)
        self.pending_index = None
        self.next_playback_deadline = time.monotonic() + 1.0 / max(self.playback_fps, 1.0)
        self.reset_playback_metrics()
        self.dpg.configure_item("frame_slider", max_value=max(0, len(self.state.slots) - 1))
        self.refresh()

    def refresh(self) -> None:
        """Re-render the current slot and push textures and text to the UI."""
        rendered = render_slot(
            self.state,
            self.current_index,
            depth_min_m=self.depth_min_m,
            depth_max_m=self.depth_max_m,
            depth_palette_name=self.depth_palette_name,
            preview_capture=self.preview_capture,
            depth_cache_stream=self.depth_cache_stream,
        )
        self.dpg.set_value("rgb_texture", bgr_to_texture(rendered.rgb_bgr).ravel())
        self.dpg.set_value("depth_texture", bgr_to_texture(rendered.depth_bgr).ravel())
        self.dpg.set_value("viewer_title", rendered.title)
        self.dpg.set_value("frame_slider", self.current_index)
        self.dpg.set_value("frame_text", f"{self.current_index + 1}/{len(self.state.slots)}")
        self.dpg.set_value("metadata_text", "\n".join(rendered.metadata_lines))

    def set_frame_index(self, index: int, *, debounce: bool = False) -> None:
        """Jump to a (clamped) slot; with debounce=True only record the target and update the counter."""
        clamped = max(0, min(index, len(self.state.slots) - 1))
        if debounce:
            self.pending_index = clamped
            self.pending_index_requested_at = time.monotonic()
            self.dpg.set_value("frame_text", f"{clamped + 1}/{len(self.state.slots)}")
            return
        self.pending_index = None
        self.current_index = clamped
        self.refresh()

    def set_playing(self, playing: bool) -> None:
        """Start/stop playback and reset the pacing deadline and FPS metrics."""
        self.playing = playing
        self.next_playback_deadline = time.monotonic() + 1.0 / max(self.playback_fps, 1.0)
        self.reset_playback_metrics()

    def toggle_playing(self) -> None:
        """Flip the play/pause state."""
        self.set_playing(not self.playing)

    def set_playback_fps(self, playback_fps: float) -> None:
        """Apply a new target FPS and reschedule the next frame deadline."""
        self.playback_fps = playback_fps
        self.next_playback_deadline = time.monotonic() + 1.0 / max(self.playback_fps, 1.0)
        self.update_playback_status()

    def reset_playback_metrics(self) -> None:
        """Clear the measured-FPS window and status text."""
        self.playback_progress_samples.clear()
        self.playback_advanced_frames = 0
        self.actual_playback_fps = 0.0
        self.update_playback_status()

    def record_playback_step(self, timestamp_s: float, frames_advanced: int) -> None:
        """Add a progress sample and recompute actual FPS over the sliding window."""
        self.playback_advanced_frames += frames_advanced
        self.playback_progress_samples.append((timestamp_s, self.playback_advanced_frames))
        cutoff = timestamp_s - PLAYBACK_FPS_WINDOW_SECONDS
        while self.playback_progress_samples and self.playback_progress_samples[0][0] < cutoff:
            self.playback_progress_samples.popleft()
        if len(self.playback_progress_samples) >= 2:
            elapsed_s = self.playback_progress_samples[-1][0] - self.playback_progress_samples[0][0]
            if elapsed_s > 0.0:
                advanced_frames = self.playback_progress_samples[-1][1] - self.playback_progress_samples[0][1]
                self.actual_playback_fps = advanced_frames / elapsed_s
            else:
                self.actual_playback_fps = 0.0
        else:
            self.actual_playback_fps = 0.0
        self.update_playback_status()

    def update_playback_status(self) -> None:
        """Show a warning when measured FPS lags the target by both thresholds; else clear it."""
        if not self.dpg.does_item_exist("playback_status_text"):
            return
        if (
            self.playing
            and self.actual_playback_fps > 0.0
            and (
                self.actual_playback_fps < self.playback_fps * PLAYBACK_FPS_WARNING_RATIO
                and self.actual_playback_fps < self.playback_fps - PLAYBACK_FPS_WARNING_ABS_TOLERANCE
            )
        ):
            self.dpg.set_value(
                "playback_status_text",
                f"Warning: actual playback {self.actual_playback_fps:.1f} FPS is below target {self.playback_fps:.1f} FPS",
            )
            return
        self.dpg.set_value("playback_status_text", "")

    def flush_pending_index(self) -> None:
        """Apply a debounced slider target once the debounce period has elapsed."""
        if self.pending_index is None:
            return
        if time.monotonic() - self.pending_index_requested_at < FRAME_SLIDER_DEBOUNCE_SECONDS:
            return
        target_index = self.pending_index
        self.pending_index = None
        self.current_index = target_index
        self.refresh()

    def maybe_advance(self) -> None:
        """Per-render-loop tick: flush scrubbing, then advance playback if due.

        Skips multiple frames at once when rendering falls behind the target rate,
        and auto-pauses at the last slot.
        """
        self.flush_pending_index()
        if not self.playing:
            return
        interval_s = 1.0 / max(self.playback_fps, 1.0)
        now = time.monotonic()
        if now < self.next_playback_deadline:
            return
        # Catch up: advance as many frames as deadlines were missed.
        frames_due = max(1, int((now - self.next_playback_deadline) / interval_s) + 1)
        self.next_playback_deadline += frames_due * interval_s
        next_index = self.current_index + frames_due
        if next_index >= len(self.state.slots):
            frames_due = len(self.state.slots) - 1 - self.current_index
            if frames_due <= 0:
                self.set_playing(False)
                return
            next_index = len(self.state.slots) - 1
        self.set_frame_index(next_index)
        self.record_playback_step(time.monotonic(), frames_due)
        if next_index >= len(self.state.slots) - 1:
            self.set_playing(False)

    def process_ui_callbacks(self) -> None:
        """Drain and run queued DearPyGui callbacks (manual callback management mode)."""
        jobs = self.dpg.get_callback_queue()
        if jobs:
            self.dpg.run_callbacks(jobs)

    def run(self) -> None:
        """Build the UI, run the render loop, and tear everything down on exit."""
        dpg = self.dpg
        dpg.create_context()
        # Callbacks are queued and executed on the main loop via process_ui_callbacks.
        dpg.configure_app(manual_callback_management=True)
        with dpg.texture_registry(show=False):
            dpg.add_dynamic_texture(DISPLAY_WIDTH, DISPLAY_HEIGHT, [0.0] * (DISPLAY_WIDTH * DISPLAY_HEIGHT * 4), tag="rgb_texture")
            dpg.add_dynamic_texture(DISPLAY_WIDTH, DISPLAY_HEIGHT, [0.0] * (DISPLAY_WIDTH * DISPLAY_HEIGHT * 4), tag="depth_texture")
        with dpg.window(tag="main_window", label="MCAP RGBD Viewer", no_close=True):
            dpg.add_text(default_value="", tag="viewer_title")
            with dpg.group(horizontal=True):
                if self.layout_info.layout != "single-camera":
                    dpg.add_combo(
                        items=list(self.layout_info.camera_labels),
                        default_value=self.state.camera_label,
                        label="Camera",
                        width=160,
                        callback=lambda _s, app_data: self.replace_state(str(app_data)),
                    )
                dpg.add_button(
                    label="Play/Pause",
                    callback=lambda *_args: self.toggle_playing(),
                )
                dpg.add_button(label="Prev", callback=lambda *_args: self.set_frame_index(self.current_index - 1))
                dpg.add_button(label="Next", callback=lambda *_args: self.set_frame_index(self.current_index + 1))
                dpg.add_slider_float(
                    label="Playback FPS",
                    min_value=1.0,
                    max_value=60.0,
                    default_value=self.playback_fps,
                    width=240,
                    callback=lambda _s, app_data: self.set_playback_fps(float(app_data)),
                )
                dpg.add_text(default_value="", tag="playback_status_text", color=(255, 96, 96))
            with dpg.group(horizontal=True):
                dpg.add_slider_float(
                    label="Depth Min (m)",
                    min_value=0.0,
                    max_value=10.0,
                    default_value=self.depth_min_m,
                    width=260,
                    callback=lambda _s, app_data: self._set_depth_min(float(app_data)),
                )
                dpg.add_slider_float(
                    label="Depth Max (m)",
                    min_value=0.1,
                    max_value=20.0,
                    default_value=self.depth_max_m,
                    width=260,
                    callback=lambda _s, app_data: self._set_depth_max(float(app_data)),
                )
                dpg.add_combo(
                    items=list(DEPTH_PALETTE_TO_OPENCV.keys()),
                    default_value=self.depth_palette_name,
                    label="Palette",
                    width=160,
                    callback=lambda _s, app_data: self._set_depth_palette(str(app_data)),
                )
            with dpg.group(horizontal=True):
                with dpg.child_window(width=DISPLAY_WIDTH + 16, height=DISPLAY_HEIGHT + 20):
                    dpg.add_text("RGB")
                    dpg.add_image("rgb_texture")
                with dpg.child_window(width=DISPLAY_WIDTH + 16, height=DISPLAY_HEIGHT + 20):
                    dpg.add_text("Depth")
                    dpg.add_image("depth_texture")
                with dpg.child_window(width=METADATA_PANEL_WIDTH, height=DISPLAY_HEIGHT + 20):
                    dpg.add_text("Metadata")
                    dpg.add_text(default_value="", wrap=METADATA_PANEL_WIDTH - 24, tag="metadata_text")
            with dpg.group(horizontal=True):
                dpg.add_slider_int(
                    label="Frame/Bundle",
                    min_value=0,
                    max_value=max(0, len(self.state.slots) - 1),
                    default_value=0,
                    tag="frame_slider",
                    width=DISPLAY_WIDTH * 2 + 80,
                    callback=lambda _s, app_data: self.set_frame_index(int(app_data), debounce=True),
                )
                dpg.add_text(default_value="", tag="frame_text")
        with dpg.handler_registry():
            dpg.add_key_press_handler(dpg.mvKey_Left, callback=lambda *_args: self.set_frame_index(self.current_index - 1))
            dpg.add_key_press_handler(dpg.mvKey_Right, callback=lambda *_args: self.set_frame_index(self.current_index + 1))
            dpg.add_key_press_handler(dpg.mvKey_Home, callback=lambda *_args: self.set_frame_index(0))
            dpg.add_key_press_handler(dpg.mvKey_End, callback=lambda *_args: self.set_frame_index(len(self.state.slots) - 1))
            dpg.add_key_press_handler(dpg.mvKey_Spacebar, callback=lambda *_args: self.toggle_playing())
        dpg.create_viewport(
            title=f"MCAP RGBD Viewer - {self.mcap_path.name}",
            width=VIEWPORT_WIDTH,
            height=VIEWPORT_HEIGHT,
        )
        dpg.setup_dearpygui()
        dpg.show_viewport()
        self.refresh()
        try:
            while dpg.is_dearpygui_running():
                dpg.render_dearpygui_frame()
                self.process_ui_callbacks()
                self.maybe_advance()
        finally:
            self.close_state_resources()
            dpg.destroy_context()
            self.state.close()

    def _set_depth_min(self, value: float) -> None:
        """Set depth range minimum, keeping it at least 0.01 m below the maximum."""
        self.depth_min_m = min(value, self.depth_max_m - 0.01)
        self.refresh()

    def _set_depth_max(self, value: float) -> None:
        """Set depth range maximum, keeping it at least 0.01 m above the minimum."""
        self.depth_max_m = max(value, self.depth_min_m + 0.01)
        self.refresh()

    def _set_depth_palette(self, value: str) -> None:
        """Switch the depth palette when the name is one of the known palettes."""
        if value in DEPTH_PALETTE_TO_OPENCV:
            self.depth_palette_name = value
            self.refresh()


@click.command()
@click.argument("mcap_path", type=click.Path(path_type=Path, exists=True))
@click.option("--camera-label", help="Camera label to view. 
Defaults to 'camera' or the first sorted multi-camera label.") @click.option("--ffmpeg-bin", default="ffmpeg", show_default=True, help="ffmpeg binary used to build the preview cache.") @click.option("--preview-width", default=1280, show_default=True, type=click.IntRange(min=64), help="Maximum width of the temporary decoded preview cache.") @click.option("--frame-index", default=0, show_default=True, type=click.IntRange(min=0), help="Frame or bundle index used for --export-preview.") @click.option("--depth-min-m", default=0.2, show_default=True, type=float, help="Minimum displayed depth in meters.") @click.option("--depth-max-m", default=5.0, show_default=True, type=float, help="Maximum displayed depth in meters.") @click.option( "--depth-palette", default="Turbo", show_default=True, type=click.Choice(tuple(DEPTH_PALETTE_TO_OPENCV.keys()), case_sensitive=False), help="Depth color palette.", ) @click.option("--summary-only", is_flag=True, help="Print layout/camera/frame metadata and exit without opening a GUI.") @click.option("--export-preview", type=click.Path(path_type=Path), help="Write a side-by-side RGB/depth preview PNG and exit.") def main( mcap_path: Path, camera_label: str | None, ffmpeg_bin: str, preview_width: int, frame_index: int, depth_min_m: float, depth_max_m: float, depth_palette: str, summary_only: bool, export_preview: Path | None, ) -> None: layout_info = infer_layout(mcap_path.resolve()) if camera_label is None: selected_camera = layout_info.camera_labels[0] else: selected_camera = camera_label if selected_camera not in layout_info.camera_labels: raise click.ClickException( f"camera label '{selected_camera}' not found. 
available: {', '.join(layout_info.camera_labels)}" ) selected_depth_palette = next( palette_name for palette_name in DEPTH_PALETTE_TO_OPENCV if palette_name.lower() == depth_palette.lower() ) state = read_camera_state( mcap_path.resolve(), layout_info=layout_info, camera_label=selected_camera, ffmpeg_bin=ffmpeg_bin, preview_width=preview_width, ) try: if summary_only: print_summary(layout_info, state) return if export_preview is not None: export_preview_path = export_preview.expanduser().resolve() write_preview_image( state, export_preview_path, frame_index=frame_index, depth_min_m=depth_min_m, depth_max_m=depth_max_m, depth_palette_name=selected_depth_palette, ) return viewer = ViewerApp( mcap_path=mcap_path.resolve(), layout_info=layout_info, initial_state=state, ffmpeg_bin=ffmpeg_bin, preview_width=preview_width, depth_min_m=depth_min_m, depth_max_m=depth_max_m, ) viewer.depth_palette_name = selected_depth_palette viewer.run() finally: with contextlib.suppress(Exception): state.close() if __name__ == "__main__": main()