#!/usr/bin/env python3 from __future__ import annotations import argparse import math import os import shlex import subprocess import sys import tempfile from collections import Counter from pathlib import Path from typing import Iterable import cv2 import numpy as np SCRIPT_PATH = Path(__file__).resolve() REPO_ROOT = SCRIPT_PATH.parents[1] WORKSPACE_ROOT = REPO_ROOT.parent MCAP_PYTHON_ROOT = WORKSPACE_ROOT / "mcap" / "python" / "mcap" if str(MCAP_PYTHON_ROOT) not in sys.path: sys.path.insert(0, str(MCAP_PYTHON_ROOT)) from mcap.reader import make_reader # noqa: E402 VIDEO_FORMATS = ("h264", "h265") def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description=( "Convert ZED SVO/SVO2 recordings to MCAP and generate a lightweight preview. " "If the input is already an MCAP file, conversion is skipped." ) ) parser.add_argument("input", help="Input .svo/.svo2 file, .mcap file, or a directory containing SVO files") parser.add_argument("--output-dir", help="Directory for generated MCAP files and previews") parser.add_argument( "--preview-all", action="store_true", help="When the input is a directory, generate a preview for every converted MCAP instead of just the first one", ) parser.add_argument("--no-preview", action="store_true", help="Convert only, do not generate preview images") parser.add_argument( "--format", choices=("auto", "h264", "h265"), default="auto", help="CompressedVideo format to extract from MCAP during preview", ) parser.add_argument("--codec", choices=VIDEO_FORMATS, default="h264", help="Video codec for SVO to MCAP conversion") parser.add_argument( "--encoder-device", choices=("auto", "nvidia", "software"), default="software", help="Encoder device passed to zed_svo_to_mcap", ) parser.add_argument( "--mcap-compression", choices=("none", "lz4", "zstd"), default="none", help="MCAP chunk compression passed to zed_svo_to_mcap", ) parser.add_argument( "--depth-mode", choices=("neural_light", "neural", "neural_plus"), default="neural", help="Depth mode passed to zed_svo_to_mcap", ) parser.add_argument( "--depth-size", default="optimal", help="Depth size passed to zed_svo_to_mcap (optimal|native|x)", ) parser.add_argument("--start-frame", type=int, default=0, help="First SVO frame to convert") parser.add_argument("--end-frame", type=int, help="Last SVO frame to convert") parser.add_argument( "--sample-count", type=int, default=9, help="Number of decoded frames to place in the preview contact sheet", ) parser.add_argument( "--frame-step", type=int, default=15, help="Decode every Nth frame for the contact sheet", ) parser.add_argument( "--contact-sheet-width", type=int, default=480, help="Width of each preview tile in pixels", ) parser.add_argument( "--cuda-visible-devices", help=( "Optional CUDA_VISIBLE_DEVICES value to export while running zed_svo_to_mcap. " "Useful when the ZED SDK must be pinned to a specific GPU UUID." ), ) parser.add_argument("--zed-bin", help="Explicit path to zed_svo_to_mcap") parser.add_argument("--reader-bin", help="Explicit path to mcap_reader_tester") return parser.parse_args() def locate_binary(name: str, override: str | None) -> Path: if override: path = Path(override).expanduser().resolve() if not path.is_file(): raise FileNotFoundError(f"binary not found: {path}") return path candidates = ( REPO_ROOT / "build" / "bin" / name, REPO_ROOT / "build" / name, ) for candidate in candidates: if candidate.is_file(): return candidate raise FileNotFoundError(f"could not find {name} under {REPO_ROOT / 'build'}") def quote_command(args: Iterable[str]) -> str: return " ".join(shlex.quote(arg) for arg in args) def run(args: list[str], env: dict[str, str] | None = None) -> None: print(f"$ {quote_command(args)}", flush=True) subprocess.run(args, check=True, env=env) def summarize_mcap(mcap_path: Path) -> list[tuple[str, str, str, int]]: counts: Counter[tuple[str, str, str]] = Counter() with mcap_path.open("rb") as stream: reader = make_reader(stream) for schema, channel, _message in reader.iter_messages(): schema_name = schema.name if schema is not None else "" counts[(channel.topic, channel.message_encoding, schema_name)] += 1 summary_rows = [ (topic, encoding, schema_name, count) for (topic, encoding, schema_name), count in sorted(counts.items()) ] print(f"MCAP summary: {mcap_path}") for topic, encoding, schema_name, count in summary_rows: print(f" {count:6d} topic={topic} encoding={encoding} schema={schema_name}") return summary_rows def infer_video_format(reader_bin: Path, mcap_path: Path, requested: str) -> str: if requested != "auto": return requested for candidate in VIDEO_FORMATS: result = subprocess.run( [str(reader_bin), str(mcap_path), "--expect-format", candidate, "--min-messages", "1"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, check=False, ) if result.returncode == 0: return candidate raise RuntimeError(f"could not infer video format from {mcap_path}") def dump_annexb(reader_bin: Path, mcap_path: Path, video_format: str, output_path: Path) -> None: run( [ str(reader_bin), str(mcap_path), "--expect-format", video_format, "--min-messages", "1", "--dump-annexb-output", str(output_path), ] ) def make_contact_sheet(stream_path: Path, image_path: Path, sample_count: int, frame_step: int, tile_width: int) -> int: capture = cv2.VideoCapture(str(stream_path)) if not capture.isOpened(): raise RuntimeError(f"OpenCV could not open decoded stream {stream_path}") frames: list[np.ndarray] = [] frame_index = 0 while len(frames) < sample_count: ok, frame = capture.read() if not ok: break if frame_index % frame_step == 0: annotated = frame.copy() cv2.putText( annotated, f"frame {frame_index}", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 0), 2, cv2.LINE_AA, ) frames.append(annotated) frame_index += 1 capture.release() if not frames: raise RuntimeError(f"no frames decoded from {stream_path}") tile_width = max(64, tile_width) resized: list[np.ndarray] = [] for frame in frames: scale = tile_width / frame.shape[1] tile_height = max(1, int(round(frame.shape[0] * scale))) resized.append(cv2.resize(frame, (tile_width, tile_height), interpolation=cv2.INTER_AREA)) max_height = max(frame.shape[0] for frame in resized) padded: list[np.ndarray] = [] for frame in resized: if frame.shape[0] == max_height: padded.append(frame) continue canvas = np.zeros((max_height, frame.shape[1], 3), dtype=np.uint8) canvas[: frame.shape[0], :, :] = frame padded.append(canvas) columns = max(1, math.ceil(math.sqrt(len(padded)))) rows = math.ceil(len(padded) / columns) blank = np.zeros_like(padded[0]) row_images: list[np.ndarray] = [] for row_index in range(rows): row_frames = padded[row_index * columns : (row_index + 1) * columns] while len(row_frames) < columns: row_frames.append(blank) row_images.append(np.concatenate(row_frames, axis=1)) sheet = np.concatenate(row_images, axis=0) image_path.parent.mkdir(parents=True, exist_ok=True) if not cv2.imwrite(str(image_path), sheet): raise RuntimeError(f"failed to write preview image {image_path}") print(f"Preview contact sheet: {image_path}") return len(frames) def collect_svo_inputs(input_path: Path) -> list[Path]: if input_path.is_file(): if input_path.suffix.lower() in {".svo", ".svo2"}: return [input_path] if input_path.suffix.lower() == ".mcap": return [] raise ValueError(f"unsupported input file: {input_path}") if input_path.is_dir(): return sorted( path for path in input_path.rglob("*") if path.suffix.lower() in {".svo", ".svo2"} ) raise FileNotFoundError(f"input not found: {input_path}") def default_output_dir(input_path: Path) -> Path: if input_path.is_dir(): return input_path / "mcap_preview" return input_path.parent / "mcap_preview" def convert_svo( zed_bin: Path, svo_path: Path, mcap_path: Path, args: argparse.Namespace, ) -> None: env = os.environ.copy() if args.cuda_visible_devices: env["CUDA_VISIBLE_DEVICES"] = args.cuda_visible_devices command = [ str(zed_bin), "--input", str(svo_path), "--output", str(mcap_path), "--codec", args.codec, "--encoder-device", args.encoder_device, "--mcap-compression", args.mcap_compression, "--depth-mode", args.depth_mode, "--depth-size", args.depth_size, "--start-frame", str(args.start_frame), ] if args.end_frame is not None: command.extend(["--end-frame", str(args.end_frame)]) mcap_path.parent.mkdir(parents=True, exist_ok=True) run(command, env=env) def preview_mcap(reader_bin: Path, mcap_path: Path, args: argparse.Namespace) -> None: summarize_mcap(mcap_path) video_format = infer_video_format(reader_bin, mcap_path, args.format) print(f"Detected video format: {video_format}") stream_extension = ".h265" if video_format == "h265" else ".h264" with tempfile.TemporaryDirectory(prefix="zed_mcap_preview_") as temp_dir: temp_root = Path(temp_dir) stream_path = temp_root / f"preview{stream_extension}" dump_annexb(reader_bin, mcap_path, video_format, stream_path) preview_path = mcap_path.with_suffix(".preview.png") decoded = make_contact_sheet( stream_path, preview_path, sample_count=args.sample_count, frame_step=args.frame_step, tile_width=args.contact_sheet_width, ) print(f"Decoded {decoded} preview frame(s)") def main() -> int: args = parse_args() input_path = Path(args.input).expanduser().resolve() output_dir = Path(args.output_dir).expanduser().resolve() if args.output_dir else default_output_dir(input_path) output_dir.mkdir(parents=True, exist_ok=True) reader_bin = locate_binary("mcap_reader_tester", args.reader_bin) zed_bin = locate_binary("zed_svo_to_mcap", args.zed_bin) if input_path.suffix.lower() != ".mcap" or input_path.is_dir() else None if input_path.is_file() and input_path.suffix.lower() == ".mcap": if not args.no_preview: preview_mcap(reader_bin, input_path, args) return 0 svo_inputs = collect_svo_inputs(input_path) if not svo_inputs: raise RuntimeError(f"no .svo/.svo2 files found under {input_path}") converted_paths: list[Path] = [] for svo_path in svo_inputs: output_name = f"{svo_path.stem}.mcap" mcap_path = output_dir / output_name convert_svo(zed_bin, svo_path, mcap_path, args) converted_paths.append(mcap_path) if args.no_preview: return 0 preview_targets = converted_paths if args.preview_all else converted_paths[:1] for mcap_path in preview_targets: preview_mcap(reader_bin, mcap_path, args) print("Generated MCAP files:") for mcap_path in converted_paths: print(f" {mcap_path}") return 0 if __name__ == "__main__": try: raise SystemExit(main()) except KeyboardInterrupt: raise SystemExit(130)