Files
cvmmap-streamer/scripts/mcap_rgbd_viewer.py
T
crosstyan ffd246e508 feat(mcap): switch single-source exports to copy layout
Namespace single-source zed_svo_to_mcap outputs with a derived camera label so one-camera exports use the same copy-layout topic shape as current multi-camera copy files.

Use the per-camera frame and pose reference identifiers for those single-source outputs, document copy as the current one-camera wire shape, and clarify that legacy /camera/* files are retained only as a legacy contract.

Add a small mcap_rgbd_example helper plus mcap_recipes guide for summarizing bundled/copy MCAPs and exporting one RGB/depth sample, and update the validator and viewer wording/behavior to match the bundled-vs-copy semantics.
2026-03-24 18:44:44 +08:00

1169 lines
45 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import contextlib
import importlib
import subprocess
import sys
import tempfile
import time
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from typing import BinaryIO
import click
import cv2
import numpy as np
from google.protobuf import descriptor_pb2, descriptor_pool, message_factory, timestamp_pb2
import zed_batch_svo_to_mcap as batch
SCRIPT_PATH = Path(__file__).resolve()
REPO_ROOT = SCRIPT_PATH.parents[1]
WORKSPACE_ROOT = REPO_ROOT.parent
MCAP_PYTHON_ROOT = WORKSPACE_ROOT / "mcap" / "python" / "mcap"
if str(SCRIPT_PATH.parent) not in sys.path:
sys.path.insert(0, str(SCRIPT_PATH.parent))
if str(MCAP_PYTHON_ROOT) not in sys.path:
sys.path.insert(0, str(MCAP_PYTHON_ROOT))
BUNDLE_TOPIC = "/bundle"
SINGLE_CAMERA_LABEL = "camera"
DISPLAY_WIDTH = 720
DISPLAY_HEIGHT = 420
TEXT_LINE_HEIGHT = 28
WINDOW_PADDING = 24
METADATA_PANEL_WIDTH = 380
VIEWPORT_WIDTH = DISPLAY_WIDTH * 2 + METADATA_PANEL_WIDTH + WINDOW_PADDING * 4
VIEWPORT_HEIGHT = DISPLAY_HEIGHT + 260
FRAME_SLIDER_DEBOUNCE_SECONDS = 0.08
PLAYBACK_FPS_WARNING_RATIO = 0.9
PLAYBACK_FPS_WARNING_ABS_TOLERANCE = 1.0
PLAYBACK_FPS_WINDOW_SECONDS = 2.0
_MCAP_READER_MODULE = None
_MESSAGE_CLASS_CACHE: dict[tuple[bytes, str], object] = {}
DEPTH_PALETTE_TO_OPENCV = {
"Gray": None,
"Turbo": cv2.COLORMAP_TURBO,
"Inferno": cv2.COLORMAP_INFERNO,
"Plasma": cv2.COLORMAP_PLASMA,
"Viridis": cv2.COLORMAP_VIRIDIS,
"Cividis": cv2.COLORMAP_CIVIDIS,
"Magma": cv2.COLORMAP_MAGMA,
"Parula": cv2.COLORMAP_PARULA,
}
@dataclass(slots=True, frozen=True)
class CalibrationInfo:
width: int
height: int
@dataclass(slots=True, frozen=True)
class VideoFrameInfo:
timestamp_ns: int
@dataclass(slots=True, frozen=True)
class DepthFrameInfo:
timestamp_ns: int
offset: int
length: int
width: int
height: int
encoding_name: str
source_unit_name: str
storage_unit_name: str
@dataclass(slots=True, frozen=True)
class BundleSlot:
bundle_index: int
bundle_timestamp_ns: int
status_name: str
corrupted_frames_skipped: int
sample_timestamp_ns: int | None
video_index: int | None
depth_index: int | None
@dataclass(slots=True)
class McapLayoutInfo:
path: Path
layout: str
camera_labels: tuple[str, ...]
bundled_policy_counts: dict[str, int]
@dataclass(slots=True)
class CameraViewState:
mcap_path: Path
layout: str
camera_label: str
video_format: str | None
video_frames: list[VideoFrameInfo]
depth_frames: list[DepthFrameInfo]
slots: list[BundleSlot]
video_calibration: CalibrationInfo | None
depth_calibration: CalibrationInfo | None
preview_video_path: Path | None
depth_cache_path: Path
temp_dir: tempfile.TemporaryDirectory[str]
def close(self) -> None:
self.temp_dir.cleanup()
@dataclass(slots=True)
class RenderedSlot:
rgb_bgr: np.ndarray
depth_bgr: np.ndarray
title: str
metadata_lines: list[str]
def load_mcap_reader():
global _MCAP_READER_MODULE
if _MCAP_READER_MODULE is not None:
return _MCAP_READER_MODULE
if str(MCAP_PYTHON_ROOT) not in sys.path:
sys.path.insert(0, str(MCAP_PYTHON_ROOT))
_MCAP_READER_MODULE = importlib.import_module("mcap.reader")
return _MCAP_READER_MODULE
def load_message_class(schema_data: bytes, message_type_name: str):
cache_key = (schema_data, message_type_name)
cached = _MESSAGE_CLASS_CACHE.get(cache_key)
if cached is not None:
return cached
descriptor_set = descriptor_pb2.FileDescriptorSet()
descriptor_set.ParseFromString(schema_data)
pool = descriptor_pool.DescriptorPool()
has_embedded_timestamp = any(
file_descriptor.name == "google/protobuf/timestamp.proto"
for file_descriptor in descriptor_set.file
)
if has_embedded_timestamp:
for file_descriptor in descriptor_set.file:
if file_descriptor.name == "google/protobuf/timestamp.proto":
pool.Add(file_descriptor)
break
else:
pool.AddSerializedFile(timestamp_pb2.DESCRIPTOR.serialized_pb)
for file_descriptor in descriptor_set.file:
if file_descriptor.name == "google/protobuf/timestamp.proto":
continue
pool.Add(file_descriptor)
message_descriptor = pool.FindMessageTypeByName(message_type_name)
message_class = message_factory.GetMessageClass(message_descriptor)
_MESSAGE_CLASS_CACHE[cache_key] = message_class
return message_class
def parse_timestamp_ns(timestamp_message: object, fallback_log_time_ns: int) -> int:
seconds = int(getattr(timestamp_message, "seconds", 0))
nanos = int(getattr(timestamp_message, "nanos", 0))
if seconds == 0 and nanos == 0:
return fallback_log_time_ns
return seconds * 1_000_000_000 + nanos
def format_timestamp_ns(timestamp_ns: int | None) -> str:
if timestamp_ns is None:
return "-"
seconds, nanos = divmod(timestamp_ns, 1_000_000_000)
return f"{seconds}.{nanos:09d}"
def enum_name(message: object, field_name: str) -> str:
field_descriptor = message.DESCRIPTOR.fields_by_name[field_name]
value = int(getattr(message, field_name))
resolved = field_descriptor.enum_type.values_by_number.get(value)
return resolved.name if resolved is not None else str(value)
def is_present_status(status_name: str) -> bool:
return status_name in {"PRESENT", "BUNDLE_MEMBER_STATUS_PRESENT"}
def is_corrupted_gap_status(status_name: str) -> bool:
return status_name in {"CORRUPTED_GAP", "BUNDLE_MEMBER_STATUS_CORRUPTED_GAP"}
def infer_layout(path: Path) -> McapLayoutInfo:
reader_module = load_mcap_reader()
camera_labels: set[str] = set()
bundled_policy_counts: dict[str, int] = {}
saw_single = False
saw_bundled = False
saw_bundle_manifest = False
with path.open("rb") as stream:
reader = reader_module.make_reader(stream)
for schema, channel, message in reader.iter_messages():
topic = channel.topic
if topic.startswith("/camera/"):
saw_single = True
continue
if topic == BUNDLE_TOPIC:
saw_bundled = True
saw_bundle_manifest = True
if schema is None or schema.name != "cvmmap_streamer.BundleManifest":
continue
bundle_class, _ = batch.load_bundle_manifest_type(schema.data)
bundle = bundle_class()
bundle.ParseFromString(message.data)
for member in bundle.members:
camera_labels.add(str(member.camera_label))
policy_name = enum_name(bundle, "policy")
bundled_policy_counts[policy_name] = bundled_policy_counts.get(policy_name, 0) + 1
continue
if topic.count("/") == 2 and topic.startswith("/"):
label = topic.split("/")[1]
if label and label != "camera":
saw_bundled = True
camera_labels.add(label)
if saw_single and saw_bundled:
raise click.ClickException("MCAP mixes single-camera and multi-camera topics")
if saw_single:
return McapLayoutInfo(
path=path,
layout="single-camera",
camera_labels=(SINGLE_CAMERA_LABEL,),
bundled_policy_counts=bundled_policy_counts,
)
if saw_bundled and camera_labels:
return McapLayoutInfo(
path=path,
layout="bundled" if saw_bundle_manifest else "copy",
camera_labels=tuple(sorted(camera_labels)),
bundled_policy_counts=bundled_policy_counts,
)
raise click.ClickException(f"could not infer a supported MCAP layout from {path}")
def topic_for(layout: str, camera_label: str, kind: str) -> str:
if layout == "single-camera":
return f"/camera/{kind}"
return f"/{camera_label}/{kind}"
def build_preview_video(
ffmpeg_bin: str,
raw_video_path: Path,
video_format: str,
preview_width: int,
temp_dir: Path,
) -> Path:
input_format = {"h264": "h264", "h265": "hevc"}[video_format]
# This script uses one simple RGB preview path for both sequential playback and
# random-access scrubbing, so it builds an intra-frame MJPEG cache first.
preview_path = temp_dir / "preview.avi"
command = [
ffmpeg_bin,
"-hide_banner",
"-loglevel",
"error",
"-y",
"-fflags",
"+genpts",
"-f",
input_format,
"-i",
str(raw_video_path),
"-vf",
f"scale=min(iw\\,{preview_width}):-2:flags=lanczos",
"-c:v",
"mjpeg",
"-q:v",
"3",
str(preview_path),
]
completed = subprocess.run(command, check=False, capture_output=True, text=True)
if completed.returncode != 0:
reason = completed.stderr.strip() or completed.stdout.strip() or "ffmpeg failed to build preview cache"
raise click.ClickException(reason)
return preview_path
def read_camera_state(
path: Path,
*,
layout_info: McapLayoutInfo,
camera_label: str,
ffmpeg_bin: str,
preview_width: int,
) -> CameraViewState:
reader_module = load_mcap_reader()
temp_dir = tempfile.TemporaryDirectory(prefix="mcap_rgbd_viewer_")
temp_root = Path(temp_dir.name)
raw_video_path = temp_root / "stream.bin"
depth_cache_path = temp_root / "depth.cache"
video_topic = topic_for(layout_info.layout, camera_label, "video")
depth_topic = topic_for(layout_info.layout, camera_label, "depth")
calibration_topic = topic_for(layout_info.layout, camera_label, "calibration")
depth_calibration_topic = topic_for(layout_info.layout, camera_label, "depth_calibration")
video_frames: list[VideoFrameInfo] = []
depth_frames: list[DepthFrameInfo] = []
bundle_members: list[tuple[int, int, str, int, int | None]] = []
video_format: str | None = None
video_calibration: CalibrationInfo | None = None
depth_calibration: CalibrationInfo | None = None
with raw_video_path.open("wb") as video_stream, depth_cache_path.open("wb") as depth_stream:
with path.open("rb") as stream:
reader = reader_module.make_reader(stream)
for schema, channel, message in reader.iter_messages():
topic = channel.topic
if topic == video_topic:
if schema is None or schema.name != "foxglove.CompressedVideo":
raise click.ClickException(f"unexpected schema on {video_topic}: {schema.name if schema else 'none'}")
message_class = load_message_class(schema.data, "foxglove.CompressedVideo")
payload = message_class()
payload.ParseFromString(message.data)
frame_format = str(payload.format)
if frame_format not in {"h264", "h265"}:
raise click.ClickException(f"unsupported video format '{frame_format}' on {video_topic}")
if video_format is None:
video_format = frame_format
elif frame_format != video_format:
raise click.ClickException(
f"inconsistent video format on {video_topic}: {video_format} then {frame_format}"
)
video_stream.write(payload.data)
video_frames.append(
VideoFrameInfo(
timestamp_ns=parse_timestamp_ns(payload.timestamp, int(message.log_time)),
)
)
continue
if topic == depth_topic:
if schema is None or schema.name != "cvmmap_streamer.DepthMap":
raise click.ClickException(f"unexpected schema on {depth_topic}: {schema.name if schema else 'none'}")
message_class = load_message_class(schema.data, "cvmmap_streamer.DepthMap")
payload = message_class()
payload.ParseFromString(message.data)
offset = depth_stream.tell()
depth_bytes = bytes(payload.data)
depth_stream.write(depth_bytes)
depth_frames.append(
DepthFrameInfo(
timestamp_ns=parse_timestamp_ns(payload.timestamp, int(message.log_time)),
offset=offset,
length=len(depth_bytes),
width=int(payload.width),
height=int(payload.height),
encoding_name=enum_name(payload, "encoding"),
source_unit_name=enum_name(payload, "source_unit"),
storage_unit_name=enum_name(payload, "storage_unit"),
)
)
continue
if topic == calibration_topic:
if schema is None or schema.name != "foxglove.CameraCalibration":
continue
message_class = load_message_class(schema.data, "foxglove.CameraCalibration")
payload = message_class()
payload.ParseFromString(message.data)
if video_calibration is None:
video_calibration = CalibrationInfo(width=int(payload.width), height=int(payload.height))
continue
if topic == depth_calibration_topic:
if schema is None or schema.name != "foxglove.CameraCalibration":
continue
message_class = load_message_class(schema.data, "foxglove.CameraCalibration")
payload = message_class()
payload.ParseFromString(message.data)
if depth_calibration is None:
depth_calibration = CalibrationInfo(width=int(payload.width), height=int(payload.height))
continue
if layout_info.layout == "bundled" and topic == BUNDLE_TOPIC:
if schema is None or schema.name != "cvmmap_streamer.BundleManifest":
continue
bundle_class, present_value = batch.load_bundle_manifest_type(schema.data)
bundle = bundle_class()
bundle.ParseFromString(message.data)
for member in bundle.members:
if str(member.camera_label) != camera_label:
continue
status_name = "PRESENT" if member.HasField("timestamp") else "UNKNOWN"
if present_value is not None:
field_descriptor = member.DESCRIPTOR.fields_by_name["status"]
enum_value = field_descriptor.enum_type.values_by_number.get(int(member.status))
status_name = enum_value.name if enum_value is not None else str(member.status)
sample_timestamp_ns = None
if member.HasField("timestamp"):
sample_timestamp_ns = parse_timestamp_ns(member.timestamp, int(message.log_time))
bundle_members.append(
(
int(bundle.bundle_index),
parse_timestamp_ns(bundle.timestamp, int(message.log_time)),
status_name,
int(getattr(member, "corrupted_frames_skipped", 0)),
sample_timestamp_ns,
)
)
break
preview_video_path: Path | None = None
if video_format is not None and video_frames:
suffix = ".h265" if video_format == "h265" else ".h264"
renamed_raw_video_path = raw_video_path.with_suffix(suffix)
raw_video_path.replace(renamed_raw_video_path)
preview_video_path = build_preview_video(
ffmpeg_bin=ffmpeg_bin,
raw_video_path=renamed_raw_video_path,
video_format=video_format,
preview_width=preview_width,
temp_dir=temp_root,
)
slots: list[BundleSlot] = []
if layout_info.layout == "bundled" and bundle_members:
present_cursor = 0
usable_present_frames = min(len(video_frames), len(depth_frames))
for bundle_index, bundle_timestamp_ns, status_name, corrupted_frames_skipped, sample_timestamp_ns in bundle_members:
video_index: int | None = None
depth_index: int | None = None
if is_present_status(status_name) and present_cursor < usable_present_frames:
video_index = present_cursor
depth_index = present_cursor
present_cursor += 1
slots.append(
BundleSlot(
bundle_index=bundle_index,
bundle_timestamp_ns=bundle_timestamp_ns,
status_name=status_name,
corrupted_frames_skipped=corrupted_frames_skipped,
sample_timestamp_ns=sample_timestamp_ns,
video_index=video_index,
depth_index=depth_index,
)
)
else:
paired_frames = min(len(video_frames), len(depth_frames))
slots = [
BundleSlot(
bundle_index=index,
bundle_timestamp_ns=video_frames[index].timestamp_ns,
status_name="PRESENT",
corrupted_frames_skipped=0,
sample_timestamp_ns=video_frames[index].timestamp_ns,
video_index=index,
depth_index=index,
)
for index in range(paired_frames)
]
if not slots:
raise click.ClickException(f"no viewable RGB+depth pairs found for camera '{camera_label}'")
return CameraViewState(
mcap_path=path,
layout=layout_info.layout,
camera_label=camera_label,
video_format=video_format,
video_frames=video_frames,
depth_frames=depth_frames,
slots=slots,
video_calibration=video_calibration,
depth_calibration=depth_calibration,
preview_video_path=preview_video_path,
depth_cache_path=depth_cache_path,
temp_dir=temp_dir,
)
def render_placeholder(text: str) -> np.ndarray:
canvas = np.zeros((DISPLAY_HEIGHT, DISPLAY_WIDTH, 3), dtype=np.uint8)
cv2.putText(
canvas,
text,
(40, DISPLAY_HEIGHT // 2),
cv2.FONT_HERSHEY_SIMPLEX,
1.0,
(255, 255, 255),
2,
cv2.LINE_AA,
)
return canvas
def fit_to_panel(frame_bgr: np.ndarray) -> np.ndarray:
src_height, src_width = frame_bgr.shape[:2]
scale = min(DISPLAY_WIDTH / src_width, DISPLAY_HEIGHT / src_height)
resized_width = max(1, int(round(src_width * scale)))
resized_height = max(1, int(round(src_height * scale)))
resized = cv2.resize(frame_bgr, (resized_width, resized_height), interpolation=cv2.INTER_AREA)
canvas = np.zeros((DISPLAY_HEIGHT, DISPLAY_WIDTH, 3), dtype=np.uint8)
x_offset = (DISPLAY_WIDTH - resized_width) // 2
y_offset = (DISPLAY_HEIGHT - resized_height) // 2
canvas[y_offset : y_offset + resized_height, x_offset : x_offset + resized_width] = resized
return canvas
def read_video_frame(
preview_video_path: Path | None,
video_index: int | None,
*,
capture: cv2.VideoCapture | None = None,
) -> np.ndarray:
if preview_video_path is None or video_index is None:
return render_placeholder("NO VIDEO")
created_capture = False
if capture is None:
capture = cv2.VideoCapture(str(preview_video_path))
created_capture = True
if not capture.isOpened():
raise click.ClickException(f"OpenCV could not open preview cache {preview_video_path}")
try:
capture.set(cv2.CAP_PROP_POS_FRAMES, float(video_index))
ok, frame = capture.read()
finally:
if created_capture:
capture.release()
if not ok or frame is None:
return render_placeholder("VIDEO SEEK FAILED")
return fit_to_panel(frame)
def decode_depth_frame(
state: CameraViewState,
depth_index: int | None,
*,
depth_min_m: float,
depth_max_m: float,
depth_palette_name: str,
depth_cache_stream: BinaryIO | None = None,
) -> np.ndarray:
if depth_index is None:
return render_placeholder("NO DEPTH")
try:
import rvl
except ModuleNotFoundError as error:
raise click.ClickException(
"the viewer needs the optional rvl-impl binding; run `uv sync --extra viewer`"
) from error
ref = state.depth_frames[depth_index]
if depth_cache_stream is None:
with state.depth_cache_path.open("rb") as stream:
stream.seek(ref.offset)
payload = stream.read(ref.length)
else:
depth_cache_stream.seek(ref.offset)
payload = depth_cache_stream.read(ref.length)
if ref.encoding_name == "RVL_U16_LOSSLESS":
depth = rvl.decompress_u16(payload).astype(np.float32)
if ref.storage_unit_name == "STORAGE_UNIT_MILLIMETER" or ref.source_unit_name == "DEPTH_UNIT_MILLIMETER":
depth_m = depth / 1000.0
else:
depth_m = depth
elif ref.encoding_name == "RVL_F32":
depth_m = rvl.decompress_f32(payload).astype(np.float32)
else:
return render_placeholder(ref.encoding_name)
valid = np.isfinite(depth_m) & (depth_m > 0.0)
span = max(depth_max_m - depth_min_m, 1e-6)
clipped = np.clip((depth_m - depth_min_m) / span, 0.0, 1.0)
normalized = np.zeros(depth_m.shape, dtype=np.uint8)
normalized[valid] = np.round((1.0 - clipped[valid]) * 255.0).astype(np.uint8)
colormap = DEPTH_PALETTE_TO_OPENCV[depth_palette_name]
if colormap is None:
colored = cv2.cvtColor(normalized, cv2.COLOR_GRAY2BGR)
else:
colored = cv2.applyColorMap(normalized, colormap)
colored[~valid] = 0
return fit_to_panel(colored)
def build_metadata_lines(
state: CameraViewState,
slot: BundleSlot,
*,
depth_min_m: float,
depth_max_m: float,
depth_palette_name: str,
) -> list[str]:
lines = [
f"file: {state.mcap_path.name}",
f"layout: {state.layout}",
f"camera: {state.camera_label}",
f"slots: {len(state.slots)}",
f"video frames: {len(state.video_frames)}",
f"depth frames: {len(state.depth_frames)}",
f"bundle/frame index: {slot.bundle_index}",
f"bundle ts: {format_timestamp_ns(slot.bundle_timestamp_ns)}",
f"sample ts: {format_timestamp_ns(slot.sample_timestamp_ns)}",
f"status: {slot.status_name}",
]
if slot.corrupted_frames_skipped:
lines.append(f"corrupted frames skipped: {slot.corrupted_frames_skipped}")
if state.video_format is not None:
lines.append(f"video format: {state.video_format}")
if state.video_calibration is not None:
lines.append(f"video calib: {state.video_calibration.width}x{state.video_calibration.height}")
if state.depth_calibration is not None:
lines.append(f"depth calib: {state.depth_calibration.width}x{state.depth_calibration.height}")
elif slot.depth_index is not None:
ref = state.depth_frames[slot.depth_index]
lines.append(f"depth frame: {ref.width}x{ref.height} {ref.encoding_name}")
lines.append(f"depth range: {depth_min_m:.2f}m .. {depth_max_m:.2f}m")
lines.append(f"depth palette: {depth_palette_name}")
return lines
def render_slot(
state: CameraViewState,
slot_index: int,
*,
depth_min_m: float,
depth_max_m: float,
depth_palette_name: str,
preview_capture: cv2.VideoCapture | None = None,
depth_cache_stream: BinaryIO | None = None,
) -> RenderedSlot:
slot = state.slots[slot_index]
if is_corrupted_gap_status(slot.status_name):
rgb_frame = render_placeholder("CORRUPTED GAP")
depth_frame = render_placeholder("CORRUPTED GAP")
else:
rgb_frame = read_video_frame(state.preview_video_path, slot.video_index, capture=preview_capture)
depth_frame = decode_depth_frame(
state,
slot.depth_index,
depth_min_m=depth_min_m,
depth_max_m=depth_max_m,
depth_palette_name=depth_palette_name,
depth_cache_stream=depth_cache_stream,
)
title = (
f"{state.camera_label} "
f"{'bundle' if state.layout == 'bundled' else 'frame'}={slot.bundle_index} "
f"status={slot.status_name}"
)
return RenderedSlot(
rgb_bgr=rgb_frame,
depth_bgr=depth_frame,
title=title,
metadata_lines=build_metadata_lines(
state,
slot,
depth_min_m=depth_min_m,
depth_max_m=depth_max_m,
depth_palette_name=depth_palette_name,
),
)
def compose_preview_image(rendered: RenderedSlot) -> np.ndarray:
banner_height = 64
banner = np.zeros((banner_height, DISPLAY_WIDTH * 2, 3), dtype=np.uint8)
cv2.putText(
banner,
rendered.title,
(16, 40),
cv2.FONT_HERSHEY_SIMPLEX,
0.8,
(255, 255, 255),
2,
cv2.LINE_AA,
)
pair = np.concatenate([rendered.rgb_bgr, rendered.depth_bgr], axis=1)
return np.concatenate([banner, pair], axis=0)
def print_summary(layout_info: McapLayoutInfo, state: CameraViewState) -> None:
click.echo(f"path: {state.mcap_path}")
click.echo(f"layout: {layout_info.layout}")
click.echo(f"camera labels: {', '.join(layout_info.camera_labels)}")
if layout_info.bundled_policy_counts:
policy_text = ", ".join(
f"{policy}={count}"
for policy, count in sorted(layout_info.bundled_policy_counts.items())
)
click.echo(f"bundle policies: {policy_text}")
click.echo(f"selected camera: {state.camera_label}")
click.echo(f"viewable slots: {len(state.slots)}")
click.echo(f"video frames: {len(state.video_frames)}")
click.echo(f"depth frames: {len(state.depth_frames)}")
if state.video_format is not None:
click.echo(f"video format: {state.video_format}")
if state.video_calibration is not None:
click.echo(f"video calibration: {state.video_calibration.width}x{state.video_calibration.height}")
if state.depth_calibration is not None:
click.echo(f"depth calibration: {state.depth_calibration.width}x{state.depth_calibration.height}")
if state.slots:
click.echo(f"first slot timestamp: {format_timestamp_ns(state.slots[0].bundle_timestamp_ns)}")
click.echo(f"last slot timestamp: {format_timestamp_ns(state.slots[-1].bundle_timestamp_ns)}")
corrupted_gap_count = sum(1 for slot in state.slots if is_corrupted_gap_status(slot.status_name))
if corrupted_gap_count:
click.echo(f"corrupted gap slots: {corrupted_gap_count}")
def write_preview_image(
state: CameraViewState,
output_path: Path,
frame_index: int,
depth_min_m: float,
depth_max_m: float,
depth_palette_name: str,
) -> None:
if frame_index < 0 or frame_index >= len(state.slots):
raise click.ClickException(f"--frame-index {frame_index} is outside 0..{len(state.slots) - 1}")
rendered = render_slot(
state,
frame_index,
depth_min_m=depth_min_m,
depth_max_m=depth_max_m,
depth_palette_name=depth_palette_name,
)
preview = compose_preview_image(rendered)
output_path.parent.mkdir(parents=True, exist_ok=True)
if not cv2.imwrite(str(output_path), preview):
raise click.ClickException(f"failed to write preview image to {output_path}")
click.echo(f"wrote preview: {output_path}")
def slot_target_timestamp_ns(slot: BundleSlot) -> int:
return slot.sample_timestamp_ns if slot.sample_timestamp_ns is not None else slot.bundle_timestamp_ns
def nearest_slot_index(slots: list[BundleSlot], target_timestamp_ns: int) -> int:
if not slots:
return 0
return min(
range(len(slots)),
key=lambda index: abs(slot_target_timestamp_ns(slots[index]) - target_timestamp_ns),
)
def bgr_to_texture(frame_bgr: np.ndarray) -> np.ndarray:
rgba = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGBA)
return np.ascontiguousarray(rgba, dtype=np.float32) / 255.0
class ViewerApp:
def __init__(
self,
*,
mcap_path: Path,
layout_info: McapLayoutInfo,
initial_state: CameraViewState,
ffmpeg_bin: str,
preview_width: int,
depth_min_m: float,
depth_max_m: float,
) -> None:
try:
import dearpygui.dearpygui as dpg
except ModuleNotFoundError as error:
raise click.ClickException(
"the GUI viewer needs DearPyGui; run `uv sync --extra viewer` first"
) from error
self.dpg = dpg
self.mcap_path = mcap_path
self.layout_info = layout_info
self.state = initial_state
self.ffmpeg_bin = ffmpeg_bin
self.preview_width = preview_width
self.depth_min_m = depth_min_m
self.depth_max_m = depth_max_m
self.depth_palette_name = "Turbo"
self.current_index = 0
self.pending_index: int | None = None
self.pending_index_requested_at = 0.0
self.playing = False
self.playback_fps = 15.0
self.next_playback_deadline = time.monotonic()
self.playback_progress_samples: deque[tuple[float, int]] = deque()
self.playback_advanced_frames = 0
self.actual_playback_fps = 0.0
self.preview_capture: cv2.VideoCapture | None = None
self.depth_cache_stream: BinaryIO | None = None
self.open_state_resources()
def open_state_resources(self) -> None:
if self.state.preview_video_path is not None:
capture = cv2.VideoCapture(str(self.state.preview_video_path))
if not capture.isOpened():
capture.release()
raise click.ClickException(f"OpenCV could not open preview cache {self.state.preview_video_path}")
self.preview_capture = capture
else:
self.preview_capture = None
self.depth_cache_stream = self.state.depth_cache_path.open("rb")
def close_state_resources(self) -> None:
if self.preview_capture is not None:
self.preview_capture.release()
self.preview_capture = None
if self.depth_cache_stream is not None:
self.depth_cache_stream.close()
self.depth_cache_stream = None
def replace_state(self, camera_label: str) -> None:
source_index = self.pending_index if self.pending_index is not None else self.current_index
source_index = max(0, min(source_index, len(self.state.slots) - 1))
target_timestamp_ns = slot_target_timestamp_ns(self.state.slots[source_index])
new_state = read_camera_state(
self.mcap_path,
layout_info=self.layout_info,
camera_label=camera_label,
ffmpeg_bin=self.ffmpeg_bin,
preview_width=self.preview_width,
)
self.close_state_resources()
self.state.close()
self.state = new_state
self.open_state_resources()
self.current_index = nearest_slot_index(self.state.slots, target_timestamp_ns)
self.pending_index = None
self.next_playback_deadline = time.monotonic() + 1.0 / max(self.playback_fps, 1.0)
self.reset_playback_metrics()
self.dpg.configure_item("frame_slider", max_value=max(0, len(self.state.slots) - 1))
self.refresh()
def refresh(self) -> None:
rendered = render_slot(
self.state,
self.current_index,
depth_min_m=self.depth_min_m,
depth_max_m=self.depth_max_m,
depth_palette_name=self.depth_palette_name,
preview_capture=self.preview_capture,
depth_cache_stream=self.depth_cache_stream,
)
self.dpg.set_value("rgb_texture", bgr_to_texture(rendered.rgb_bgr).ravel())
self.dpg.set_value("depth_texture", bgr_to_texture(rendered.depth_bgr).ravel())
self.dpg.set_value("viewer_title", rendered.title)
self.dpg.set_value("frame_slider", self.current_index)
self.dpg.set_value("frame_text", f"{self.current_index + 1}/{len(self.state.slots)}")
self.dpg.set_value("metadata_text", "\n".join(rendered.metadata_lines))
def set_frame_index(self, index: int, *, debounce: bool = False) -> None:
clamped = max(0, min(index, len(self.state.slots) - 1))
if debounce:
self.pending_index = clamped
self.pending_index_requested_at = time.monotonic()
self.dpg.set_value("frame_text", f"{clamped + 1}/{len(self.state.slots)}")
return
self.pending_index = None
self.current_index = clamped
self.refresh()
def set_playing(self, playing: bool) -> None:
self.playing = playing
self.next_playback_deadline = time.monotonic() + 1.0 / max(self.playback_fps, 1.0)
self.reset_playback_metrics()
def toggle_playing(self) -> None:
self.set_playing(not self.playing)
def set_playback_fps(self, playback_fps: float) -> None:
self.playback_fps = playback_fps
self.next_playback_deadline = time.monotonic() + 1.0 / max(self.playback_fps, 1.0)
self.update_playback_status()
def reset_playback_metrics(self) -> None:
self.playback_progress_samples.clear()
self.playback_advanced_frames = 0
self.actual_playback_fps = 0.0
self.update_playback_status()
def record_playback_step(self, timestamp_s: float, frames_advanced: int) -> None:
self.playback_advanced_frames += frames_advanced
self.playback_progress_samples.append((timestamp_s, self.playback_advanced_frames))
cutoff = timestamp_s - PLAYBACK_FPS_WINDOW_SECONDS
while self.playback_progress_samples and self.playback_progress_samples[0][0] < cutoff:
self.playback_progress_samples.popleft()
if len(self.playback_progress_samples) >= 2:
elapsed_s = self.playback_progress_samples[-1][0] - self.playback_progress_samples[0][0]
if elapsed_s > 0.0:
advanced_frames = self.playback_progress_samples[-1][1] - self.playback_progress_samples[0][1]
self.actual_playback_fps = advanced_frames / elapsed_s
else:
self.actual_playback_fps = 0.0
else:
self.actual_playback_fps = 0.0
self.update_playback_status()
def update_playback_status(self) -> None:
if not self.dpg.does_item_exist("playback_status_text"):
return
if (
self.playing
and self.actual_playback_fps > 0.0
and (
self.actual_playback_fps < self.playback_fps * PLAYBACK_FPS_WARNING_RATIO
and self.actual_playback_fps < self.playback_fps - PLAYBACK_FPS_WARNING_ABS_TOLERANCE
)
):
self.dpg.set_value(
"playback_status_text",
f"Warning: actual playback {self.actual_playback_fps:.1f} FPS is below target {self.playback_fps:.1f} FPS",
)
return
self.dpg.set_value("playback_status_text", "")
def flush_pending_index(self) -> None:
if self.pending_index is None:
return
if time.monotonic() - self.pending_index_requested_at < FRAME_SLIDER_DEBOUNCE_SECONDS:
return
target_index = self.pending_index
self.pending_index = None
self.current_index = target_index
self.refresh()
def maybe_advance(self) -> None:
self.flush_pending_index()
if not self.playing:
return
interval_s = 1.0 / max(self.playback_fps, 1.0)
now = time.monotonic()
if now < self.next_playback_deadline:
return
frames_due = max(1, int((now - self.next_playback_deadline) / interval_s) + 1)
self.next_playback_deadline += frames_due * interval_s
next_index = self.current_index + frames_due
if next_index >= len(self.state.slots):
frames_due = len(self.state.slots) - 1 - self.current_index
if frames_due <= 0:
self.set_playing(False)
return
next_index = len(self.state.slots) - 1
self.set_frame_index(next_index)
self.record_playback_step(time.monotonic(), frames_due)
if next_index >= len(self.state.slots) - 1:
self.set_playing(False)
def process_ui_callbacks(self) -> None:
jobs = self.dpg.get_callback_queue()
if jobs:
self.dpg.run_callbacks(jobs)
def run(self) -> None:
dpg = self.dpg
dpg.create_context()
dpg.configure_app(manual_callback_management=True)
with dpg.texture_registry(show=False):
dpg.add_dynamic_texture(DISPLAY_WIDTH, DISPLAY_HEIGHT, [0.0] * (DISPLAY_WIDTH * DISPLAY_HEIGHT * 4), tag="rgb_texture")
dpg.add_dynamic_texture(DISPLAY_WIDTH, DISPLAY_HEIGHT, [0.0] * (DISPLAY_WIDTH * DISPLAY_HEIGHT * 4), tag="depth_texture")
with dpg.window(tag="main_window", label="MCAP RGBD Viewer", no_close=True):
dpg.add_text(default_value="", tag="viewer_title")
with dpg.group(horizontal=True):
if len(self.layout_info.camera_labels) > 1:
dpg.add_combo(
items=list(self.layout_info.camera_labels),
default_value=self.state.camera_label,
label="Camera",
width=160,
callback=lambda _s, app_data: self.replace_state(str(app_data)),
)
dpg.add_button(
label="Play/Pause",
callback=lambda *_args: self.toggle_playing(),
)
dpg.add_button(label="Prev", callback=lambda *_args: self.set_frame_index(self.current_index - 1))
dpg.add_button(label="Next", callback=lambda *_args: self.set_frame_index(self.current_index + 1))
dpg.add_slider_float(
label="Playback FPS",
min_value=1.0,
max_value=60.0,
default_value=self.playback_fps,
width=240,
callback=lambda _s, app_data: self.set_playback_fps(float(app_data)),
)
dpg.add_text(default_value="", tag="playback_status_text", color=(255, 96, 96))
with dpg.group(horizontal=True):
dpg.add_slider_float(
label="Depth Min (m)",
min_value=0.0,
max_value=10.0,
default_value=self.depth_min_m,
width=260,
callback=lambda _s, app_data: self._set_depth_min(float(app_data)),
)
dpg.add_slider_float(
label="Depth Max (m)",
min_value=0.1,
max_value=20.0,
default_value=self.depth_max_m,
width=260,
callback=lambda _s, app_data: self._set_depth_max(float(app_data)),
)
dpg.add_combo(
items=list(DEPTH_PALETTE_TO_OPENCV.keys()),
default_value=self.depth_palette_name,
label="Palette",
width=160,
callback=lambda _s, app_data: self._set_depth_palette(str(app_data)),
)
with dpg.group(horizontal=True):
with dpg.child_window(width=DISPLAY_WIDTH + 16, height=DISPLAY_HEIGHT + 20):
dpg.add_text("RGB")
dpg.add_image("rgb_texture")
with dpg.child_window(width=DISPLAY_WIDTH + 16, height=DISPLAY_HEIGHT + 20):
dpg.add_text("Depth")
dpg.add_image("depth_texture")
with dpg.child_window(width=METADATA_PANEL_WIDTH, height=DISPLAY_HEIGHT + 20):
dpg.add_text("Metadata")
dpg.add_text(default_value="", wrap=METADATA_PANEL_WIDTH - 24, tag="metadata_text")
with dpg.group(horizontal=True):
dpg.add_slider_int(
label="Frame/Bundle",
min_value=0,
max_value=max(0, len(self.state.slots) - 1),
default_value=0,
tag="frame_slider",
width=DISPLAY_WIDTH * 2 + 80,
callback=lambda _s, app_data: self.set_frame_index(int(app_data), debounce=True),
)
dpg.add_text(default_value="", tag="frame_text")
with dpg.handler_registry():
dpg.add_key_press_handler(dpg.mvKey_Left, callback=lambda *_args: self.set_frame_index(self.current_index - 1))
dpg.add_key_press_handler(dpg.mvKey_Right, callback=lambda *_args: self.set_frame_index(self.current_index + 1))
dpg.add_key_press_handler(dpg.mvKey_Home, callback=lambda *_args: self.set_frame_index(0))
dpg.add_key_press_handler(dpg.mvKey_End, callback=lambda *_args: self.set_frame_index(len(self.state.slots) - 1))
dpg.add_key_press_handler(dpg.mvKey_Spacebar, callback=lambda *_args: self.toggle_playing())
dpg.create_viewport(
title=f"MCAP RGBD Viewer - {self.mcap_path.name}",
width=VIEWPORT_WIDTH,
height=VIEWPORT_HEIGHT,
)
dpg.setup_dearpygui()
dpg.show_viewport()
self.refresh()
try:
while dpg.is_dearpygui_running():
dpg.render_dearpygui_frame()
self.process_ui_callbacks()
self.maybe_advance()
finally:
self.close_state_resources()
dpg.destroy_context()
self.state.close()
def _set_depth_min(self, value: float) -> None:
self.depth_min_m = min(value, self.depth_max_m - 0.01)
self.refresh()
def _set_depth_max(self, value: float) -> None:
self.depth_max_m = max(value, self.depth_min_m + 0.01)
self.refresh()
def _set_depth_palette(self, value: str) -> None:
if value in DEPTH_PALETTE_TO_OPENCV:
self.depth_palette_name = value
self.refresh()
@click.command()
@click.argument("mcap_path", type=click.Path(path_type=Path, exists=True))
@click.option("--camera-label", help="Camera label to view. Defaults to 'camera' or the first sorted multi-camera label.")
@click.option("--ffmpeg-bin", default="ffmpeg", show_default=True, help="ffmpeg binary used to build the preview cache.")
@click.option("--preview-width", default=1280, show_default=True, type=click.IntRange(min=64), help="Maximum width of the temporary decoded preview cache.")
@click.option("--frame-index", default=0, show_default=True, type=click.IntRange(min=0), help="Frame or bundle index used for --export-preview.")
@click.option("--depth-min-m", default=0.2, show_default=True, type=float, help="Minimum displayed depth in meters.")
@click.option("--depth-max-m", default=5.0, show_default=True, type=float, help="Maximum displayed depth in meters.")
@click.option(
"--depth-palette",
default="Turbo",
show_default=True,
type=click.Choice(tuple(DEPTH_PALETTE_TO_OPENCV.keys()), case_sensitive=False),
help="Depth color palette.",
)
@click.option("--summary-only", is_flag=True, help="Print layout/camera/frame metadata and exit without opening a GUI.")
@click.option("--export-preview", type=click.Path(path_type=Path), help="Write a side-by-side RGB/depth preview PNG and exit.")
def main(
mcap_path: Path,
camera_label: str | None,
ffmpeg_bin: str,
preview_width: int,
frame_index: int,
depth_min_m: float,
depth_max_m: float,
depth_palette: str,
summary_only: bool,
export_preview: Path | None,
) -> None:
layout_info = infer_layout(mcap_path.resolve())
if camera_label is None:
selected_camera = layout_info.camera_labels[0]
else:
selected_camera = camera_label
if selected_camera not in layout_info.camera_labels:
raise click.ClickException(
f"camera label '{selected_camera}' not found. available: {', '.join(layout_info.camera_labels)}"
)
selected_depth_palette = next(
palette_name for palette_name in DEPTH_PALETTE_TO_OPENCV if palette_name.lower() == depth_palette.lower()
)
state = read_camera_state(
mcap_path.resolve(),
layout_info=layout_info,
camera_label=selected_camera,
ffmpeg_bin=ffmpeg_bin,
preview_width=preview_width,
)
try:
if summary_only:
print_summary(layout_info, state)
return
if export_preview is not None:
export_preview_path = export_preview.expanduser().resolve()
write_preview_image(
state,
export_preview_path,
frame_index=frame_index,
depth_min_m=depth_min_m,
depth_max_m=depth_max_m,
depth_palette_name=selected_depth_palette,
)
return
viewer = ViewerApp(
mcap_path=mcap_path.resolve(),
layout_info=layout_info,
initial_state=state,
ffmpeg_bin=ffmpeg_bin,
preview_width=preview_width,
depth_min_m=depth_min_m,
depth_max_m=depth_max_m,
)
viewer.depth_palette_name = selected_depth_palette
viewer.run()
finally:
with contextlib.suppress(Exception):
state.close()
if __name__ == "__main__":
main()