#!/usr/bin/env python3
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import BinaryIO

import click
import cv2
import numpy as np

import mcap_rgbd_viewer as viewer


@dataclass(slots=True, frozen=True)
class CameraCalibration:
    width: int
    height: int
    distortion_model: str
    # First five distortion coefficients (D), row-major 3x3 K and R, and row-major
    # 3x4 P, as read from the foxglove.CameraCalibration message.
    distortion: tuple[float, float, float, float, float]
    intrinsic_matrix: tuple[float, float, float, float, float, float, float, float, float]
    rectification_matrix: tuple[float, float, float, float, float, float, float, float, float]
    projection_matrix: tuple[float, float, float, float, float, float, float, float, float, float, float, float]

    @property
    def fx(self) -> float:
        return self.intrinsic_matrix[0]

    @property
    def fy(self) -> float:
        return self.intrinsic_matrix[4]

    @property
    def cx(self) -> float:
        return self.intrinsic_matrix[2]

    @property
    def cy(self) -> float:
        return self.intrinsic_matrix[5]


@dataclass(slots=True, frozen=True)
class CalibrationPair:
    video: CameraCalibration
    depth: CameraCalibration


@dataclass(slots=True, frozen=True)
class AffineMapping:
    scale_x: float
    scale_y: float
    offset_x: float
    offset_y: float

    def matrix(self) -> np.ndarray:
        # 2x3 affine matrix in the form cv2.warpAffine expects.
        return np.array(
            [
                [self.scale_x, 0.0, self.offset_x],
                [0.0, self.scale_y, self.offset_y],
            ],
            dtype=np.float32,
        )


def select_camera_label(layout_info: viewer.McapLayoutInfo, camera_label: str | None) -> str:
    if camera_label is None:
        return layout_info.camera_labels[0]
    if camera_label not in layout_info.camera_labels:
        available = ", ".join(layout_info.camera_labels)
        raise click.ClickException(f"camera label '{camera_label}' not found; available: {available}")
    return camera_label


def load_calibration(path: Path, topic: str) -> CameraCalibration:
    """Read the first foxglove.CameraCalibration message published on `topic`."""
    reader_module = viewer.load_mcap_reader()
    with path.open("rb") as stream:
        reader = reader_module.make_reader(stream)
        for schema, channel, message in reader.iter_messages():
            if channel.topic != topic:
                continue
            if schema is None or schema.name != "foxglove.CameraCalibration":
                raise click.ClickException(f"unexpected schema on {topic}: {schema.name if schema else 'none'}")
            message_class = viewer.load_message_class(schema.data, "foxglove.CameraCalibration")
            payload = message_class()
            payload.ParseFromString(message.data)
            return CameraCalibration(
                width=int(payload.width),
                height=int(payload.height),
                distortion_model=str(payload.distortion_model),
                distortion=tuple(float(value) for value in payload.D[:5]),
                intrinsic_matrix=tuple(float(value) for value in payload.K[:9]),
                rectification_matrix=tuple(float(value) for value in payload.R[:9]),
                projection_matrix=tuple(float(value) for value in payload.P[:12]),
            )
    raise click.ClickException(f"missing calibration topic {topic} in {path}")


def load_calibration_pair(path: Path, layout_info: viewer.McapLayoutInfo, camera_label: str) -> CalibrationPair:
    video_topic = viewer.topic_for(layout_info.layout, camera_label, "calibration")
    depth_topic = viewer.topic_for(layout_info.layout, camera_label, "depth_calibration")
    return CalibrationPair(
        video=load_calibration(path, video_topic),
        depth=load_calibration(path, depth_topic),
    )


def mapping_from_depth_to_rgb(pair: CalibrationPair) -> AffineMapping:
    scale_x = pair.video.fx / pair.depth.fx
    scale_y = pair.video.fy / pair.depth.fy
    offset_x = pair.video.cx - (scale_x * pair.depth.cx)
    offset_y = pair.video.cy - (scale_y * pair.depth.cy)
    return AffineMapping(scale_x=scale_x, scale_y=scale_y, offset_x=offset_x, offset_y=offset_y)

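# Why a plain scale + offset is enough here (explanatory note; the pipeline only
# relies on the helpers above and below): with zero distortion, identity
# rectification, and both sensors sharing the same optical center, a pinhole
# camera projects a point at depth Z as
#     u = fx * (X / Z) + cx        v = fy * (Y / Z) + cy
# Writing this once with the depth intrinsics and once with the RGB intrinsics
# and eliminating X/Z (and Y/Z) gives
#     u_rgb = (fx_rgb / fx_depth) * u_depth + (cx_rgb - (fx_rgb / fx_depth) * cx_depth)
# which is exactly the scale_x / offset_x computed by mapping_from_depth_to_rgb
# (likewise for v and for the inverse direction). Hypothetical numbers for
# intuition: fx_depth=400, cx_depth=320, fx_rgb=600, cx_rgb=480 give
# scale_x=1.5 and offset_x = 480 - 1.5 * 320 = 0, so depth column 320 maps to
# RGB column 480. If the two sensors were separated by a real baseline, the
# mapping would depend on Z and a single 2D affine warp would no longer be exact.
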
def mapping_from_rgb_to_depth(pair: CalibrationPair) -> AffineMapping:
    scale_x = pair.depth.fx / pair.video.fx
    scale_y = pair.depth.fy / pair.video.fy
    offset_x = pair.depth.cx - (scale_x * pair.video.cx)
    offset_y = pair.depth.cy - (scale_y * pair.video.cy)
    return AffineMapping(scale_x=scale_x, scale_y=scale_y, offset_x=offset_x, offset_y=offset_y)


def describe_mapping(pair: CalibrationPair) -> str:
    depth_to_rgb = mapping_from_depth_to_rgb(pair)
    rgb_to_depth = mapping_from_rgb_to_depth(pair)
    anisotropic = abs(depth_to_rgb.scale_x - depth_to_rgb.scale_y) > 1e-6
    has_offset = abs(depth_to_rgb.offset_x) > 1e-3 or abs(depth_to_rgb.offset_y) > 1e-3
    shape = "anisotropic stretch" if anisotropic else "uniform scale"
    if has_offset:
        shape += " with offset"
    else:
        shape += " with zero offset"
    return (
        f"mapping type: {shape}\n"
        f"depth->rgb: u_rgb = {depth_to_rgb.scale_x:.9f} * u_depth + {depth_to_rgb.offset_x:.9f}\n"
        f"depth->rgb: v_rgb = {depth_to_rgb.scale_y:.9f} * v_depth + {depth_to_rgb.offset_y:.9f}\n"
        f"rgb->depth: u_depth = {rgb_to_depth.scale_x:.9f} * u_rgb + {rgb_to_depth.offset_x:.9f}\n"
        f"rgb->depth: v_depth = {rgb_to_depth.scale_y:.9f} * v_rgb + {rgb_to_depth.offset_y:.9f}"
    )


def is_identity_rectification(calibration: CameraCalibration) -> bool:
    expected = (1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0)
    return max(abs(value - target) for value, target in zip(calibration.rectification_matrix, expected, strict=True)) < 1e-6


def has_zero_distortion(calibration: CameraCalibration) -> bool:
    return max(abs(value) for value in calibration.distortion) < 1e-9


def depth_pixel_to_rgb(depth_u: float, depth_v: float, pair: CalibrationPair) -> tuple[float, float]:
    mapping = mapping_from_depth_to_rgb(pair)
    return (
        (mapping.scale_x * depth_u) + mapping.offset_x,
        (mapping.scale_y * depth_v) + mapping.offset_y,
    )


def rgb_pixel_to_depth(rgb_u: float, rgb_v: float, pair: CalibrationPair) -> tuple[float, float]:
    mapping = mapping_from_rgb_to_depth(pair)
    return (
        (mapping.scale_x * rgb_u) + mapping.offset_x,
        (mapping.scale_y * rgb_v) + mapping.offset_y,
    )


def align_depth_to_rgb(
    depth_image: np.ndarray,
    pair: CalibrationPair,
    *,
    interpolation: int = cv2.INTER_NEAREST,
) -> np.ndarray:
    mapping = mapping_from_depth_to_rgb(pair)
    return cv2.warpAffine(
        depth_image,
        mapping.matrix(),
        (pair.video.width, pair.video.height),
        flags=interpolation,
        borderMode=cv2.BORDER_CONSTANT,
        borderValue=0,
    )


def align_rgb_to_depth(
    rgb_image: np.ndarray,
    pair: CalibrationPair,
    *,
    interpolation: int = cv2.INTER_LINEAR,
) -> np.ndarray:
    mapping = mapping_from_rgb_to_depth(pair)
    return cv2.warpAffine(
        rgb_image,
        mapping.matrix(),
        (pair.depth.width, pair.depth.height),
        flags=interpolation,
        borderMode=cv2.BORDER_CONSTANT,
        borderValue=0,
    )

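# Note on the interpolation defaults above: depth is warped with
# cv2.INTER_NEAREST so that invalid (zero) pixels and depth discontinuities are
# never averaged into plausible-looking but wrong range values, while colour is
# warped with cv2.INTER_LINEAR because blending neighbouring RGB samples is
# harmless. Both warps fill anything falling outside the source image with 0
# (BORDER_CONSTANT), which colorize_depth() below treats as invalid.
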
def load_depth_array(state: viewer.CameraViewState, depth_index: int, depth_cache_stream: BinaryIO | None = None) -> np.ndarray:
    try:
        import rvl
    except ModuleNotFoundError as error:
        raise click.ClickException(
            "image export needs the optional rvl-impl binding; run `uv sync --extra viewer`"
        ) from error
    ref = state.depth_frames[depth_index]
    if depth_cache_stream is None:
        with state.depth_cache_path.open("rb") as stream:
            stream.seek(ref.offset)
            payload = stream.read(ref.length)
    else:
        depth_cache_stream.seek(ref.offset)
        payload = depth_cache_stream.read(ref.length)
    if ref.encoding_name == "RVL_U16_LOSSLESS":
        depth = rvl.decompress_u16(payload).reshape(ref.height, ref.width)
        return depth.astype(np.float32)
    if ref.encoding_name == "RVL_F32":
        return rvl.decompress_f32(payload).reshape(ref.height, ref.width).astype(np.float32)
    raise click.ClickException(f"unsupported depth encoding '{ref.encoding_name}'")


def resolve_present_slot(state: viewer.CameraViewState, frame_index: int) -> tuple[int, viewer.BundleSlot]:
    if not state.slots:
        raise click.ClickException("MCAP does not contain any viewable RGB+depth pairs")
    clamped = max(0, min(frame_index, len(state.slots) - 1))
    slot = state.slots[clamped]
    if slot.video_index is not None and slot.depth_index is not None:
        return clamped, slot
    for delta in range(1, len(state.slots)):
        left = clamped - delta
        if left >= 0:
            candidate = state.slots[left]
            if candidate.video_index is not None and candidate.depth_index is not None:
                return left, candidate
        right = clamped + delta
        if right < len(state.slots):
            candidate = state.slots[right]
            if candidate.video_index is not None and candidate.depth_index is not None:
                return right, candidate
    raise click.ClickException("could not find a present RGB+depth slot")


def colorize_depth(depth_m: np.ndarray, palette_name: str) -> np.ndarray:
    valid = np.isfinite(depth_m) & (depth_m > 0.0)
    normalized = np.zeros(depth_m.shape, dtype=np.uint8)
    if valid.any():
        lo = float(np.percentile(depth_m[valid], 5.0))
        hi = float(np.percentile(depth_m[valid], 95.0))
        span = max(hi - lo, 1e-6)
        scaled = np.clip((depth_m - lo) / span, 0.0, 1.0)
        normalized[valid] = np.round((1.0 - scaled[valid]) * 255.0).astype(np.uint8)
    colormap = viewer.DEPTH_PALETTE_TO_OPENCV[palette_name]
    if colormap is None:
        colored = cv2.cvtColor(normalized, cv2.COLOR_GRAY2BGR)
    else:
        colored = cv2.applyColorMap(normalized, colormap)
    colored[~valid] = 0
    return colored


def export_example_images(
    path: Path,
    *,
    layout_info: viewer.McapLayoutInfo,
    camera_label: str,
    pair: CalibrationPair,
    frame_index: int,
    ffmpeg_bin: str,
    output_dir: Path,
    palette_name: str,
) -> None:
    state = viewer.read_camera_state(
        path,
        layout_info=layout_info,
        camera_label=camera_label,
        ffmpeg_bin=ffmpeg_bin,
        preview_width=pair.video.width,
    )
    try:
        resolved_index, slot = resolve_present_slot(state, frame_index)
        capture = cv2.VideoCapture(str(state.preview_video_path))
        capture.set(cv2.CAP_PROP_POS_FRAMES, float(slot.video_index))
        ok, rgb_bgr = capture.read()
        capture.release()
        if not ok or rgb_bgr is None:
            raise click.ClickException(f"could not decode RGB frame {slot.video_index}")
        depth_native = load_depth_array(state, slot.depth_index) / 1000.0
        depth_aligned = align_depth_to_rgb(depth_native, pair, interpolation=cv2.INTER_NEAREST)
        rgb_aligned = align_rgb_to_depth(rgb_bgr, pair, interpolation=cv2.INTER_LINEAR)
        output_dir.mkdir(parents=True, exist_ok=True)
        rgb_path = output_dir / "rgb_frame.png"
        depth_native_path = output_dir / "depth_native_colorized.png"
        depth_aligned_path = output_dir / "depth_aligned_to_rgb_colorized.png"
        overlay_path = output_dir / "depth_overlay_on_rgb.png"
        rgb_to_depth_path = output_dir / "rgb_aligned_to_depth.png"
        depth_native_color = colorize_depth(depth_native, palette_name)
        depth_aligned_color = colorize_depth(depth_aligned, palette_name)
        overlay = cv2.addWeighted(rgb_bgr, 0.72, depth_aligned_color, 0.28, 0.0)
        cv2.imwrite(str(rgb_path), rgb_bgr)
        cv2.imwrite(str(depth_native_path), depth_native_color)
        cv2.imwrite(str(depth_aligned_path), depth_aligned_color)
        cv2.imwrite(str(overlay_path), overlay)
        cv2.imwrite(str(rgb_to_depth_path), rgb_aligned)
        click.echo(f"exported slot index: {resolved_index}")
        click.echo(f"rgb frame: {rgb_path}")
        click.echo(f"native depth: {depth_native_path}")
        click.echo(f"depth aligned to rgb: {depth_aligned_path}")
        click.echo(f"depth overlay on rgb: {overlay_path}")
        click.echo(f"rgb aligned to depth: {rgb_to_depth_path}")
    finally:
        state.close()

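# Unit note for the export path above: load_depth_array() returns the decoded
# depth as float32 and export_example_images() divides it by 1000.0, which
# implies the stored depth values are expected in millimetres and are converted
# to metres before colorizing. colorize_depth() then stretches the 5th..95th
# percentile of the valid range and inverts it, so nearer surfaces map to the
# top of the palette (brightest in the grayscale case).
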
@click.command()
@click.argument("mcap_path", type=click.Path(path_type=Path, exists=True, dir_okay=False))
@click.option("--camera-label", type=str, help="Camera label to inspect; defaults to the first camera in the MCAP.")
@click.option("--frame-index", type=int, default=0, show_default=True, help="Frame or bundle index used for example image export.")
@click.option("--output-dir", type=click.Path(path_type=Path, file_okay=False), help="When set, export an aligned depth example and overlay PNGs here.")
@click.option("--ffmpeg-bin", default="ffmpeg", show_default=True, help="ffmpeg binary used to decode MCAP video for the example export.")
@click.option(
    "--depth-palette",
    type=click.Choice(tuple(viewer.DEPTH_PALETTE_TO_OPENCV.keys()), case_sensitive=False),
    default="Turbo",
    show_default=True,
    help="Depth palette used for exported example PNGs.",
)
def main(
    mcap_path: Path,
    camera_label: str | None,
    frame_index: int,
    output_dir: Path | None,
    ffmpeg_bin: str,
    depth_palette: str,
) -> None:
    """Explain and demonstrate how depth/rgb alignment works for an exported MCAP."""
    layout_info = viewer.infer_layout(mcap_path)
    selected_camera = select_camera_label(layout_info, camera_label)
    pair = load_calibration_pair(mcap_path, layout_info, selected_camera)
    click.echo(f"path: {mcap_path}")
    click.echo(f"layout: {layout_info.layout}")
    click.echo(f"camera: {selected_camera}")
    click.echo(f"video calibration: {pair.video.width}x{pair.video.height}")
    click.echo(f"depth calibration: {pair.depth.width}x{pair.depth.height}")
    click.echo(
        "video intrinsics: "
        f"fx={pair.video.fx:.6f} fy={pair.video.fy:.6f} cx={pair.video.cx:.6f} cy={pair.video.cy:.6f}"
    )
    click.echo(
        "depth intrinsics: "
        f"fx={pair.depth.fx:.6f} fy={pair.depth.fy:.6f} cx={pair.depth.cx:.6f} cy={pair.depth.cy:.6f}"
    )
    click.echo(
        "zero distortion / identity rectification: "
        f"video={has_zero_distortion(pair.video) and is_identity_rectification(pair.video)} "
        f"depth={has_zero_distortion(pair.depth) and is_identity_rectification(pair.depth)}"
    )
    click.echo(describe_mapping(pair))
    sample_depth_u = pair.depth.width * 0.5
    sample_depth_v = pair.depth.height * 0.5
    mapped_rgb_u, mapped_rgb_v = depth_pixel_to_rgb(sample_depth_u, sample_depth_v, pair)
    click.echo(
        "sample center mapping: "
        f"depth({sample_depth_u:.3f}, {sample_depth_v:.3f}) -> rgb({mapped_rgb_u:.3f}, {mapped_rgb_v:.3f})"
    )
    sample_rgb_u = pair.video.width * 0.5
    sample_rgb_v = pair.video.height * 0.5
    mapped_depth_u, mapped_depth_v = rgb_pixel_to_depth(sample_rgb_u, sample_rgb_v, pair)
    click.echo(
        "sample inverse mapping: "
        f"rgb({sample_rgb_u:.3f}, {sample_rgb_v:.3f}) -> depth({mapped_depth_u:.3f}, {mapped_depth_v:.3f})"
    )
    if output_dir is not None:
        export_example_images(
            mcap_path,
            layout_info=layout_info,
            camera_label=selected_camera,
            pair=pair,
            frame_index=frame_index,
            ffmpeg_bin=ffmpeg_bin,
            output_dir=output_dir,
            palette_name=depth_palette,
        )


if __name__ == "__main__":
    main()
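# Example invocations (script and file names, camera label, and frame index are
# illustrative):
#
#   # Print calibration, the depth<->rgb affine mapping, and sample pixel mappings:
#   python align_explainer.py recording.mcap --camera-label front
#
#   # Additionally export example PNGs (RGB frame, colorized native/aligned depth,
#   # overlay, and RGB warped into the depth frame) for bundle index 42:
#   python align_explainer.py recording.mcap --camera-label front \
#       --frame-index 42 --output-dir ./alignment_example --depth-palette Turbo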