refactor(zed): remove extracted offline helper tooling

Drop the offline ZED helper implementations that were moved into zed-offline-tools. This removes the standalone conversion binaries, the batch/index/inspection scripts, the related configs and tests, and the tool-specific support code that no longer belongs in cvmmap-streamer. The build files and docs are updated to point at the standalone repo while keeping the streamer runtime surface intact.
This commit is contained in:
2026-04-13 07:50:41 +00:00
parent 30cd956c5c
commit ddea6b0e3d
28 changed files with 16 additions and 12881 deletions
-362
View File
@@ -1,362 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
from collections import Counter
from dataclasses import dataclass, field
from pathlib import Path
import re
import click
import zed_batch_svo_to_mcap as batch
# Topic that carries BundleManifest messages in bundled-layout MCAPs.
BUNDLE_TOPIC = "/bundle"
# Topic prefix used by legacy single-camera MCAP exports.
CAMERA_PREFIX = "/camera/"
# Matches namespaced multi-camera topics of the form /<camera_label>/<stream_kind>.
NAMESPACED_TOPIC_PATTERN = re.compile(r"^/([^/]+)/([^/]+)$")
# Expected protobuf schema name for each known single-camera topic; any other
# schema on these topics is reported as a mismatch.
SINGLE_TOPIC_SCHEMA_NAMES = {
    "/camera/video": "foxglove.CompressedVideo",
    "/camera/depth": "cvmmap_streamer.DepthMap",
    "/camera/calibration": "foxglove.CameraCalibration",
    "/camera/depth_calibration": "foxglove.CameraCalibration",
    "/camera/pose": "foxglove.PoseInFrame",
}
@dataclass(slots=True)
class CameraSummary:
    """Per-camera message and bundle-member counters accumulated during a scan."""

    video_messages: int = 0  # compressed video messages seen
    depth_messages: int = 0  # depth map messages seen
    pose_messages: int = 0  # pose messages seen
    calibration_messages: int = 0  # RGB calibration messages seen
    depth_calibration_messages: int = 0  # depth calibration messages seen
    body_messages: int = 0  # body-tracking messages seen
    present_members: int = 0  # bundle members with a PRESENT status
    corrupted_gap_members: int = 0  # bundle members marked CORRUPTED_GAP
    unknown_members: int = 0  # bundle members with any other status
@dataclass(slots=True)
class McapSummary:
    """Whole-file summary: inferred layout, validation verdict, per-camera stats."""

    path: Path  # the MCAP file this summary describes
    layout: str = "unknown"  # "single-camera" | "copy" | "bundled" | "mixed" | "unknown"
    validation_status: str = "invalid"  # "valid" or "invalid"
    validation_reason: str = ""  # human-readable reason when invalid
    camera_labels: tuple[str, ...] = ()  # sorted camera labels discovered in the file
    bundle_count: int = 0  # number of BundleManifest messages (bundled layout only)
    policy_counts: Counter[str] = field(default_factory=Counter)  # policy name -> count
    camera_stats: dict[str, CameraSummary] = field(default_factory=dict)  # label -> counters
    schema_mismatches: list[str] = field(default_factory=list)  # schema mismatch descriptions
def iter_mcap_paths(inputs: tuple[Path, ...], recursive: bool) -> list[Path]:
    """Expand files/directories into a sorted, de-duplicated list of .mcap paths.

    Files are taken as-is; directories are globbed for *.mcap (recursively when
    requested). A nonexistent input aborts with a ClickException.
    """
    glob_pattern = "**/*.mcap" if recursive else "*.mcap"
    found: list[Path] = []
    for raw_input in inputs:
        candidate = raw_input.expanduser().resolve()
        if candidate.is_file():
            found.append(candidate)
        elif candidate.is_dir():
            found.extend(sorted(candidate.glob(glob_pattern)))
        else:
            raise click.ClickException(f"path does not exist: {candidate}")
    # dict.fromkeys keeps one copy of each path; sorted() yields stable order.
    return sorted(dict.fromkeys(found))
def policy_name_from_message(bundle_message: object) -> str:
    """Resolve the bundle's policy number to its enum name, else the raw number as text."""
    policy_value = bundle_message.policy
    enum_descriptor = bundle_message.DESCRIPTOR.enum_types_by_name.get("BundlePolicy")
    if enum_descriptor is not None:
        resolved = enum_descriptor.values_by_number.get(policy_value)
        if resolved is not None:
            return resolved.name
    # Schema without a BundlePolicy enum, or an unknown numeric value.
    return str(policy_value)
def status_name_from_member(member: object, present_value: int | None) -> str:
    """Map a bundle member to a status name, tolerating schemas without a status enum."""
    if present_value is None:
        # Legacy manifests have no status field: a set timestamp implies presence.
        return "PRESENT" if member.HasField("timestamp") else "UNKNOWN"
    status_field = member.DESCRIPTOR.fields_by_name.get("status")
    enum_descriptor = None if status_field is None else status_field.enum_type
    if enum_descriptor is None:
        # Integer status without a declared enum: compare against the known value.
        return "PRESENT" if member.status == present_value else "UNKNOWN"
    resolved = enum_descriptor.values_by_number.get(member.status)
    return str(member.status) if resolved is None else resolved.name
def record_single_camera_topic(
    summary: McapSummary,
    topic: str,
    schema_name: str | None,
) -> None:
    """Tally one message on a legacy /camera/* topic and note schema mismatches.

    All single-camera stats live under the implicit "camera" label. Unknown
    /camera/* topics still materialize the stats entry but count nothing.
    """
    counter_by_topic = {
        "/camera/video": "video_messages",
        "/camera/depth": "depth_messages",
        "/camera/pose": "pose_messages",
        "/camera/calibration": "calibration_messages",
        "/camera/depth_calibration": "depth_calibration_messages",
        "/camera/body": "body_messages",
    }
    stats = summary.camera_stats.setdefault("camera", CameraSummary())
    counter_attr = counter_by_topic.get(topic)
    if counter_attr is not None:
        setattr(stats, counter_attr, getattr(stats, counter_attr) + 1)
    expected_schema = SINGLE_TOPIC_SCHEMA_NAMES.get(topic)
    if expected_schema is not None and schema_name != expected_schema:
        summary.schema_mismatches.append(
            f"{topic}: expected schema '{expected_schema}', got '{schema_name or 'none'}'"
        )
def probe_single_camera_output(path: Path) -> batch.OutputProbeResult:
    """Validate a legacy single-camera MCAP beyond the generic probe.

    Runs batch.probe_output first, then re-scans the file to enforce the
    single-camera invariants: video and depth present and paired one-to-one,
    exactly one calibration, at most one depth calibration, pose count not
    exceeding video count, and the expected schema on each known topic.
    Returns an OutputProbeResult with status "valid" or "invalid" plus a reason.
    """
    base_probe = batch.probe_output(path, ("camera",), layout="single-camera", bundle_topic=None)
    if base_probe.status != "valid":
        # Generic probe already failed; skip the detailed re-scan.
        return base_probe
    reader_module = batch.load_mcap_reader()
    stats = CameraSummary()
    schema_mismatches: list[str] = []
    try:
        with path.open("rb") as stream:
            reader = reader_module.make_reader(stream)
            for schema, channel, _message in reader.iter_messages():
                topic = channel.topic
                schema_name = schema.name if schema is not None else None
                # Count messages per known single-camera topic.
                if topic == "/camera/video":
                    stats.video_messages += 1
                elif topic == "/camera/depth":
                    stats.depth_messages += 1
                elif topic == "/camera/pose":
                    stats.pose_messages += 1
                elif topic == "/camera/calibration":
                    stats.calibration_messages += 1
                elif topic == "/camera/depth_calibration":
                    stats.depth_calibration_messages += 1
                elif topic == "/camera/body":
                    stats.body_messages += 1
                expected_schema = SINGLE_TOPIC_SCHEMA_NAMES.get(topic)
                if expected_schema is not None and schema_name != expected_schema:
                    schema_mismatches.append(
                        f"{topic}: expected schema '{expected_schema}', got '{schema_name or 'none'}'"
                    )
    except Exception as error:  # noqa: BLE001
        # Any reader/parse failure marks the file invalid rather than crashing.
        return batch.OutputProbeResult(output_path=path, status="invalid", reason=str(error))
    if schema_mismatches:
        # Report only the first mismatch to keep the reason short.
        return batch.OutputProbeResult(
            output_path=path,
            status="invalid",
            reason=schema_mismatches[0],
        )
    if stats.video_messages == 0:
        return batch.OutputProbeResult(
            output_path=path,
            status="invalid",
            reason="single-camera MCAP has no /camera/video messages",
        )
    if stats.depth_messages == 0:
        return batch.OutputProbeResult(
            output_path=path,
            status="invalid",
            reason="single-camera MCAP has no /camera/depth messages",
        )
    if stats.video_messages != stats.depth_messages:
        # Exporter writes video/depth in lockstep, so the counts must match.
        return batch.OutputProbeResult(
            output_path=path,
            status="invalid",
            reason=(
                "single-camera video/depth count mismatch: "
                f"video_messages={stats.video_messages} depth_messages={stats.depth_messages}"
            ),
        )
    if stats.calibration_messages != 1:
        return batch.OutputProbeResult(
            output_path=path,
            status="invalid",
            reason=(
                "single-camera calibration count mismatch: "
                f"/camera/calibration={stats.calibration_messages}"
            ),
        )
    if stats.depth_calibration_messages not in (0, 1):
        # Depth calibration is optional but never repeated.
        return batch.OutputProbeResult(
            output_path=path,
            status="invalid",
            reason=(
                "single-camera depth calibration count mismatch: "
                f"/camera/depth_calibration={stats.depth_calibration_messages}"
            ),
        )
    if stats.pose_messages > stats.video_messages:
        return batch.OutputProbeResult(
            output_path=path,
            status="invalid",
            reason=(
                "single-camera pose count exceeds video count: "
                f"pose_messages={stats.pose_messages} video_messages={stats.video_messages}"
            ),
        )
    return batch.OutputProbeResult(output_path=path, status="valid")
def summarize_mcap(path: Path) -> McapSummary:
    """Scan one MCAP, infer its layout (single-camera/copy/bundled), and validate it."""
    reader_module = batch.load_mcap_reader()
    summary = McapSummary(path=path)
    camera_labels: set[str] = set()
    saw_single_camera_topic = False
    saw_namespaced_camera_topic = False
    saw_bundle_manifest = False
    with path.open("rb") as stream:
        reader = reader_module.make_reader(stream)
        for schema, channel, message in reader.iter_messages():
            topic = channel.topic
            schema_name = schema.name if schema is not None else None
            if topic == BUNDLE_TOPIC:
                # A /bundle topic marks the bundled layout; each manifest message
                # contributes policy counts and per-member presence stats.
                summary.layout = "bundled"
                saw_bundle_manifest = True
                if schema is None or schema.name != "cvmmap_streamer.BundleManifest":
                    summary.validation_status = "invalid"
                    summary.validation_reason = f"bundle topic '{BUNDLE_TOPIC}' is missing the BundleManifest schema"
                    continue
                bundle_class, present_value = batch.load_bundle_manifest_type(schema.data)
                bundle = bundle_class()
                bundle.ParseFromString(message.data)
                summary.bundle_count += 1
                summary.policy_counts[policy_name_from_message(bundle)] += 1
                for member in bundle.members:
                    label = str(member.camera_label)
                    camera_labels.add(label)
                    stats = summary.camera_stats.setdefault(label, CameraSummary())
                    status_name = status_name_from_member(member, present_value)
                    # Accept both the bare and the prefixed spelling of PRESENT.
                    if status_name == "BUNDLE_MEMBER_STATUS_PRESENT" or status_name == "PRESENT":
                        stats.present_members += 1
                    elif status_name == "BUNDLE_MEMBER_STATUS_CORRUPTED_GAP":
                        stats.corrupted_gap_members += 1
                    else:
                        stats.unknown_members += 1
                continue
            if topic.startswith(CAMERA_PREFIX):
                # Legacy /camera/* topics imply the single-camera layout.
                saw_single_camera_topic = True
                if summary.layout == "unknown":
                    summary.layout = "single-camera"
                record_single_camera_topic(summary, topic, schema_name)
                continue
            match = NAMESPACED_TOPIC_PATTERN.match(topic)
            if not match:
                continue
            label, stream_kind = match.groups()
            if label == "camera":
                # /camera/... already handled via the prefix branch above.
                continue
            # Namespaced /<label>/<kind> topics imply the multi-camera "copy"
            # layout unless a bundle manifest already marked the file bundled.
            saw_namespaced_camera_topic = True
            if summary.layout == "unknown":
                summary.layout = "copy"
            camera_labels.add(label)
            stats = summary.camera_stats.setdefault(label, CameraSummary())
            if stream_kind == "video":
                stats.video_messages += 1
            elif stream_kind == "depth":
                stats.depth_messages += 1
            elif stream_kind == "pose":
                stats.pose_messages += 1
            elif stream_kind == "calibration":
                stats.calibration_messages += 1
            elif stream_kind == "depth_calibration":
                stats.depth_calibration_messages += 1
            elif stream_kind == "body":
                stats.body_messages += 1
    if saw_single_camera_topic and saw_namespaced_camera_topic:
        # The exporter never mixes the two namespaces in one file.
        summary.layout = "mixed"
        summary.validation_status = "invalid"
        summary.validation_reason = "MCAP mixes single-camera and multi-camera topic layouts"
        return summary
    if saw_namespaced_camera_topic and not saw_bundle_manifest and summary.layout == "bundled":
        # NOTE(review): layout is only ever set to "bundled" together with
        # saw_bundle_manifest=True above, so this looks unreachable — confirm.
        summary.layout = "copy"
    if summary.layout == "single-camera":
        summary.camera_labels = ("camera",)
        probe = probe_single_camera_output(path)
        summary.validation_status = probe.status
        summary.validation_reason = probe.reason
        # Schema mismatches collected during the scan override a "valid" probe.
        if summary.schema_mismatches and summary.validation_status == "valid":
            summary.validation_status = "invalid"
            summary.validation_reason = summary.schema_mismatches[0]
        return summary
    summary.camera_labels = tuple(sorted(camera_labels))
    if summary.camera_labels:
        probe = batch.probe_output(
            path,
            summary.camera_labels,
            layout=summary.layout,
            bundle_topic=BUNDLE_TOPIC if summary.layout == "bundled" else None,
        )
        summary.validation_status = probe.status
        summary.validation_reason = probe.reason
    else:
        summary.validation_status = "invalid"
        summary.validation_reason = "could not infer a supported MCAP layout from topics"
    return summary
def print_summary(summary: McapSummary) -> None:
    """Print a one-line verdict for the MCAP plus per-camera counter lines."""
    status_text = summary.validation_status
    layout_text = summary.layout
    cameras_text = ",".join(summary.camera_labels) if summary.camera_labels else "-"
    # Policies sorted by name for deterministic output; "-" when none were seen.
    policy_text = ",".join(
        f"{policy}={count}"
        for policy, count in sorted(summary.policy_counts.items())
    ) or "-"
    click.echo(
        f"{status_text}: {summary.path} [{layout_text}] bundles={summary.bundle_count} "
        f"cameras={cameras_text} policies={policy_text}"
    )
    for label in summary.camera_labels:
        stats = summary.camera_stats[label]
        click.echo(
            " "
            f"{label}: video={stats.video_messages} depth={stats.depth_messages} pose={stats.pose_messages} "
            f"calibration={stats.calibration_messages} depth_calibration={stats.depth_calibration_messages} "
            f"body={stats.body_messages} present={stats.present_members} "
            f"corrupted_gap={stats.corrupted_gap_members} unknown={stats.unknown_members}"
        )
    if summary.validation_reason:
        click.echo(f" reason: {summary.validation_reason}")
@click.command()
@click.argument("paths", nargs=-1, type=click.Path(path_type=Path))
@click.option("--recursive", is_flag=True, help="Recursively discover *.mcap files under directory inputs.")
def main(paths: tuple[Path, ...], recursive: bool) -> None:
    """Summarize and validate legacy single-camera, bundled, or copy-layout MCAP files."""
    if not paths:
        raise click.ClickException("provide at least one MCAP file or directory")
    mcap_paths = iter_mcap_paths(paths, recursive=recursive)
    if not mcap_paths:
        raise click.ClickException("no .mcap files matched the provided inputs")
    invalid_count = 0
    for path in mcap_paths:
        summary = summarize_mcap(path)
        print_summary(summary)
        if summary.validation_status != "valid":
            invalid_count += 1
    # Non-zero exit status when any input failed validation (CI-friendly).
    if invalid_count:
        raise SystemExit(1)


if __name__ == "__main__":
    main()
-400
View File
@@ -1,400 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import BinaryIO
import click
import cv2
import numpy as np
import mcap_rgbd_viewer as viewer
@dataclass(slots=True, frozen=True)
class CameraCalibration:
    """Pinhole calibration extracted from a foxglove.CameraCalibration message."""

    width: int  # image width in pixels
    height: int  # image height in pixels
    distortion_model: str  # distortion model name as reported by the message
    distortion: tuple[float, float, float, float, float]  # first five D coefficients
    intrinsic_matrix: tuple[float, float, float, float, float, float, float, float, float]  # row-major 3x3 K
    rectification_matrix: tuple[float, float, float, float, float, float, float, float, float]  # row-major 3x3 R
    projection_matrix: tuple[float, float, float, float, float, float, float, float, float, float, float, float]  # row-major 3x4 P

    @property
    def fx(self) -> float:
        """Focal length in x: K[0][0]."""
        return self.intrinsic_matrix[0]

    @property
    def fy(self) -> float:
        """Focal length in y: K[1][1]."""
        return self.intrinsic_matrix[4]

    @property
    def cx(self) -> float:
        """Principal point x: K[0][2]."""
        return self.intrinsic_matrix[2]

    @property
    def cy(self) -> float:
        """Principal point y: K[1][2]."""
        return self.intrinsic_matrix[5]
@dataclass(slots=True, frozen=True)
class CalibrationPair:
    """The RGB (video) and depth calibrations of one camera, loaded together."""

    video: CameraCalibration  # calibration of the RGB/video stream
    depth: CameraCalibration  # calibration of the depth stream
@dataclass(slots=True, frozen=True)
class AffineMapping:
scale_x: float
scale_y: float
offset_x: float
offset_y: float
def matrix(self) -> np.ndarray:
return np.array(
[
[self.scale_x, 0.0, self.offset_x],
[0.0, self.scale_y, self.offset_y],
],
dtype=np.float32,
)
def select_camera_label(layout_info: viewer.McapLayoutInfo, camera_label: str | None) -> str:
    """Pick the camera to inspect: the requested label, or the MCAP's first camera."""
    labels = layout_info.camera_labels
    if camera_label is None:
        return labels[0]
    if camera_label in labels:
        return camera_label
    available = ", ".join(labels)
    raise click.ClickException(f"camera label '{camera_label}' not found; available: {available}")
def load_calibration(path: Path, topic: str) -> CameraCalibration:
    """Read the first foxglove.CameraCalibration message on `topic` from the MCAP.

    Raises click.ClickException when the topic is missing or carries an
    unexpected schema.
    """
    reader_module = viewer.load_mcap_reader()
    with path.open("rb") as stream:
        reader = reader_module.make_reader(stream)
        for schema, channel, message in reader.iter_messages():
            if channel.topic != topic:
                continue
            if schema is None or schema.name != "foxglove.CameraCalibration":
                raise click.ClickException(f"unexpected schema on {topic}: {schema.name if schema else 'none'}")
            message_class = viewer.load_message_class(schema.data, "foxglove.CameraCalibration")
            payload = message_class()
            payload.ParseFromString(message.data)
            # Slice the repeated fields to the fixed sizes the dataclass expects.
            return CameraCalibration(
                width=int(payload.width),
                height=int(payload.height),
                distortion_model=str(payload.distortion_model),
                distortion=tuple(float(value) for value in payload.D[:5]),
                intrinsic_matrix=tuple(float(value) for value in payload.K[:9]),
                rectification_matrix=tuple(float(value) for value in payload.R[:9]),
                projection_matrix=tuple(float(value) for value in payload.P[:12]),
            )
    raise click.ClickException(f"missing calibration topic {topic} in {path}")
def load_calibration_pair(path: Path, layout_info: viewer.McapLayoutInfo, camera_label: str) -> CalibrationPair:
    """Load both the RGB and the depth calibration for one camera from the MCAP."""
    return CalibrationPair(
        video=load_calibration(path, viewer.topic_for(layout_info.layout, camera_label, "calibration")),
        depth=load_calibration(path, viewer.topic_for(layout_info.layout, camera_label, "depth_calibration")),
    )
def mapping_from_depth_to_rgb(pair: CalibrationPair) -> AffineMapping:
    """Affine pixel mapping from depth-image coordinates into RGB-image coordinates."""
    sx = pair.video.fx / pair.depth.fx
    sy = pair.video.fy / pair.depth.fy
    # Offsets keep the principal points aligned after scaling.
    return AffineMapping(
        scale_x=sx,
        scale_y=sy,
        offset_x=pair.video.cx - sx * pair.depth.cx,
        offset_y=pair.video.cy - sy * pair.depth.cy,
    )
def mapping_from_rgb_to_depth(pair: CalibrationPair) -> AffineMapping:
    """Affine pixel mapping from RGB-image coordinates into depth-image coordinates.

    This is the exact algebraic inverse of mapping_from_depth_to_rgb.
    """
    sx = pair.depth.fx / pair.video.fx
    sy = pair.depth.fy / pair.video.fy
    return AffineMapping(
        scale_x=sx,
        scale_y=sy,
        offset_x=pair.depth.cx - sx * pair.video.cx,
        offset_y=pair.depth.cy - sy * pair.video.cy,
    )
def describe_mapping(pair: CalibrationPair) -> str:
    """Render a human-readable description of the depth<->rgb pixel mapping."""
    depth_to_rgb = mapping_from_depth_to_rgb(pair)
    rgb_to_depth = mapping_from_rgb_to_depth(pair)
    # Classify the mapping by whether x/y scales differ and whether any offset remains.
    anisotropic = abs(depth_to_rgb.scale_x - depth_to_rgb.scale_y) > 1e-6
    has_offset = abs(depth_to_rgb.offset_x) > 1e-3 or abs(depth_to_rgb.offset_y) > 1e-3
    shape = "anisotropic stretch" if anisotropic else "uniform scale"
    if has_offset:
        shape += " with offset"
    else:
        shape += " with zero offset"
    return (
        f"mapping type: {shape}\n"
        f"depth->rgb: u_rgb = {depth_to_rgb.scale_x:.9f} * u_depth + {depth_to_rgb.offset_x:.9f}\n"
        f"depth->rgb: v_rgb = {depth_to_rgb.scale_y:.9f} * v_depth + {depth_to_rgb.offset_y:.9f}\n"
        f"rgb->depth: u_depth = {rgb_to_depth.scale_x:.9f} * u_rgb + {rgb_to_depth.offset_x:.9f}\n"
        f"rgb->depth: v_depth = {rgb_to_depth.scale_y:.9f} * v_rgb + {rgb_to_depth.offset_y:.9f}"
    )
def is_identity_rectification(calibration: CameraCalibration) -> bool:
expected = (1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0)
return max(abs(value - target) for value, target in zip(calibration.rectification_matrix, expected, strict=True)) < 1e-6
def has_zero_distortion(calibration: CameraCalibration) -> bool:
    """True when every distortion coefficient is numerically zero (|d| < 1e-9)."""
    return all(abs(coefficient) < 1e-9 for coefficient in calibration.distortion)
def depth_pixel_to_rgb(depth_u: float, depth_v: float, pair: CalibrationPair) -> tuple[float, float]:
    """Map a single depth-image pixel coordinate into RGB-image coordinates."""
    m = mapping_from_depth_to_rgb(pair)
    return m.scale_x * depth_u + m.offset_x, m.scale_y * depth_v + m.offset_y
def rgb_pixel_to_depth(rgb_u: float, rgb_v: float, pair: CalibrationPair) -> tuple[float, float]:
    """Map a single RGB-image pixel coordinate into depth-image coordinates."""
    m = mapping_from_rgb_to_depth(pair)
    return m.scale_x * rgb_u + m.offset_x, m.scale_y * rgb_v + m.offset_y
def align_depth_to_rgb(
    depth_image: np.ndarray,
    pair: CalibrationPair,
    *,
    interpolation: int = cv2.INTER_NEAREST,
) -> np.ndarray:
    """Warp a depth image onto the RGB camera's pixel grid.

    Pixels that fall outside the source image are filled with 0 (no depth).
    Nearest-neighbor interpolation is the default so depth values are not blended.
    """
    target_size = (pair.video.width, pair.video.height)
    affine = mapping_from_depth_to_rgb(pair).matrix()
    return cv2.warpAffine(
        depth_image,
        affine,
        target_size,
        flags=interpolation,
        borderMode=cv2.BORDER_CONSTANT,
        borderValue=0,
    )
def align_rgb_to_depth(
    rgb_image: np.ndarray,
    pair: CalibrationPair,
    *,
    interpolation: int = cv2.INTER_LINEAR,
) -> np.ndarray:
    """Warp an RGB image onto the depth camera's pixel grid.

    Out-of-range pixels become 0 (black); linear interpolation is the default
    since blending color values is acceptable.
    """
    target_size = (pair.depth.width, pair.depth.height)
    affine = mapping_from_rgb_to_depth(pair).matrix()
    return cv2.warpAffine(
        rgb_image,
        affine,
        target_size,
        flags=interpolation,
        borderMode=cv2.BORDER_CONSTANT,
        borderValue=0,
    )
def load_depth_array(state: viewer.CameraViewState, depth_index: int, depth_cache_stream: BinaryIO | None = None) -> np.ndarray:
    """Decode one cached depth frame into a float32 2D array.

    Reads the RVL-compressed payload referenced by `state.depth_frames[depth_index]`
    from the on-disk depth cache (or from an already-open `depth_cache_stream`)
    and decompresses it. Raises ClickException when the optional rvl binding is
    missing or the frame uses an unknown encoding.
    """
    try:
        import rvl
    except ModuleNotFoundError as error:
        raise click.ClickException(
            "image export needs the optional rvl-impl binding; run `uv sync --extra viewer`"
        ) from error
    ref = state.depth_frames[depth_index]
    if depth_cache_stream is None:
        # One-shot read: open the cache just for this frame.
        with state.depth_cache_path.open("rb") as stream:
            stream.seek(ref.offset)
            payload = stream.read(ref.length)
    else:
        # Caller manages the stream (useful when reading many frames).
        depth_cache_stream.seek(ref.offset)
        payload = depth_cache_stream.read(ref.length)
    if ref.encoding_name == "RVL_U16_LOSSLESS":
        depth = rvl.decompress_u16(payload).reshape(ref.height, ref.width)
        return depth.astype(np.float32)
    if ref.encoding_name == "RVL_F32":
        return rvl.decompress_f32(payload).reshape(ref.height, ref.width).astype(np.float32)
    raise click.ClickException(f"unsupported depth encoding '{ref.encoding_name}'")
def resolve_present_slot(state: viewer.CameraViewState, frame_index: int) -> tuple[int, viewer.BundleSlot]:
    """Find the slot nearest to frame_index that has both an RGB and a depth frame.

    The requested index is clamped into range; if that slot is incomplete, the
    search widens symmetrically (preferring the earlier slot at equal distance).
    """
    slots = state.slots
    if not slots:
        raise click.ClickException("MCAP does not contain any viewable RGB+depth pairs")

    def is_complete(candidate) -> bool:
        # A slot is viewable only when both streams contributed a frame.
        return candidate.video_index is not None and candidate.depth_index is not None

    anchor = max(0, min(frame_index, len(slots) - 1))
    if is_complete(slots[anchor]):
        return anchor, slots[anchor]
    for delta in range(1, len(slots)):
        for position in (anchor - delta, anchor + delta):
            if 0 <= position < len(slots) and is_complete(slots[position]):
                return position, slots[position]
    raise click.ClickException("could not find a present RGB+depth slot")
def colorize_depth(depth_m: np.ndarray, palette_name: str) -> np.ndarray:
    """Render a metric depth image as a BGR visualization using the named palette.

    Depth is normalized between its 5th and 95th percentile (over valid pixels),
    inverted so near is bright, then mapped through the OpenCV colormap chosen
    by `palette_name` ("Gray" means plain grayscale). Invalid pixels (NaN/inf
    or <= 0) are painted black.
    """
    valid = np.isfinite(depth_m) & (depth_m > 0.0)
    normalized = np.zeros(depth_m.shape, dtype=np.uint8)
    if valid.any():
        # Percentile clipping keeps outliers from flattening the contrast.
        lo = float(np.percentile(depth_m[valid], 5.0))
        hi = float(np.percentile(depth_m[valid], 95.0))
        span = max(hi - lo, 1e-6)
        scaled = np.clip((depth_m - lo) / span, 0.0, 1.0)
        normalized[valid] = np.round((1.0 - scaled[valid]) * 255.0).astype(np.uint8)
    colormap = viewer.DEPTH_PALETTE_TO_OPENCV[palette_name]
    if colormap is None:
        colored = cv2.cvtColor(normalized, cv2.COLOR_GRAY2BGR)
    else:
        colored = cv2.applyColorMap(normalized, colormap)
    colored[~valid] = 0
    return colored
def export_example_images(
    path: Path,
    *,
    layout_info: viewer.McapLayoutInfo,
    camera_label: str,
    pair: CalibrationPair,
    frame_index: int,
    ffmpeg_bin: str,
    output_dir: Path,
    palette_name: str,
) -> None:
    """Export example PNGs demonstrating depth/RGB alignment for one frame.

    Decodes the RGB frame nearest `frame_index` that also has depth, warps depth
    into the RGB grid (and vice versa), and writes five images into `output_dir`:
    the raw RGB frame, colorized native depth, colorized aligned depth, a
    depth-over-RGB overlay, and the RGB image resampled onto the depth grid.
    """
    state = viewer.read_camera_state(
        path,
        layout_info=layout_info,
        camera_label=camera_label,
        ffmpeg_bin=ffmpeg_bin,
        preview_width=pair.video.width,
    )
    try:
        resolved_index, slot = resolve_present_slot(state, frame_index)
        # Seek the decoded preview video to the slot's frame and grab it.
        capture = cv2.VideoCapture(str(state.preview_video_path))
        capture.set(cv2.CAP_PROP_POS_FRAMES, float(slot.video_index))
        ok, rgb_bgr = capture.read()
        capture.release()
        if not ok or rgb_bgr is None:
            raise click.ClickException(f"could not decode RGB frame {slot.video_index}")
        # Depth cache stores millimeters; convert to meters for visualization.
        depth_native = load_depth_array(state, slot.depth_index) / 1000.0
        depth_aligned = align_depth_to_rgb(depth_native, pair, interpolation=cv2.INTER_NEAREST)
        rgb_aligned = align_rgb_to_depth(rgb_bgr, pair, interpolation=cv2.INTER_LINEAR)
        output_dir.mkdir(parents=True, exist_ok=True)
        rgb_path = output_dir / "rgb_frame.png"
        depth_native_path = output_dir / "depth_native_colorized.png"
        depth_aligned_path = output_dir / "depth_aligned_to_rgb_colorized.png"
        overlay_path = output_dir / "depth_overlay_on_rgb.png"
        rgb_to_depth_path = output_dir / "rgb_aligned_to_depth.png"
        depth_native_color = colorize_depth(depth_native, palette_name)
        depth_aligned_color = colorize_depth(depth_aligned, palette_name)
        # Blend: mostly RGB with a translucent depth layer on top.
        overlay = cv2.addWeighted(rgb_bgr, 0.72, depth_aligned_color, 0.28, 0.0)
        cv2.imwrite(str(rgb_path), rgb_bgr)
        cv2.imwrite(str(depth_native_path), depth_native_color)
        cv2.imwrite(str(depth_aligned_path), depth_aligned_color)
        cv2.imwrite(str(overlay_path), overlay)
        cv2.imwrite(str(rgb_to_depth_path), rgb_aligned)
        click.echo(f"exported slot index: {resolved_index}")
        click.echo(f"rgb frame: {rgb_path}")
        click.echo(f"native depth: {depth_native_path}")
        click.echo(f"depth aligned to rgb: {depth_aligned_path}")
        click.echo(f"depth overlay on rgb: {overlay_path}")
        click.echo(f"rgb aligned to depth: {rgb_to_depth_path}")
    finally:
        # Always release the temp preview/cache resources held by the view state.
        state.close()
@click.command()
@click.argument("mcap_path", type=click.Path(path_type=Path, exists=True, dir_okay=False))
@click.option("--camera-label", type=str, help="Camera label to inspect; defaults to the first camera in the MCAP.")
@click.option("--frame-index", type=int, default=0, show_default=True, help="Frame or bundle index used for example image export.")
@click.option("--output-dir", type=click.Path(path_type=Path, file_okay=False), help="When set, export an aligned depth example and overlay PNGs here.")
@click.option("--ffmpeg-bin", default="ffmpeg", show_default=True, help="ffmpeg binary used to decode MCAP video for the example export.")
@click.option(
    "--depth-palette",
    type=click.Choice(tuple(viewer.DEPTH_PALETTE_TO_OPENCV.keys()), case_sensitive=False),
    default="Turbo",
    show_default=True,
    help="Depth palette used for exported example PNGs.",
)
def main(
    mcap_path: Path,
    camera_label: str | None,
    frame_index: int,
    output_dir: Path | None,
    ffmpeg_bin: str,
    depth_palette: str,
) -> None:
    """Explain and demonstrate how depth/rgb alignment works for an exported MCAP."""
    layout_info = viewer.infer_layout(mcap_path)
    selected_camera = select_camera_label(layout_info, camera_label)
    pair = load_calibration_pair(mcap_path, layout_info, selected_camera)
    # Report the file, calibration, and derived mapping parameters.
    click.echo(f"path: {mcap_path}")
    click.echo(f"layout: {layout_info.layout}")
    click.echo(f"camera: {selected_camera}")
    click.echo(f"video calibration: {pair.video.width}x{pair.video.height}")
    click.echo(f"depth calibration: {pair.depth.width}x{pair.depth.height}")
    click.echo(
        "video intrinsics: "
        f"fx={pair.video.fx:.6f} fy={pair.video.fy:.6f} cx={pair.video.cx:.6f} cy={pair.video.cy:.6f}"
    )
    click.echo(
        "depth intrinsics: "
        f"fx={pair.depth.fx:.6f} fy={pair.depth.fy:.6f} cx={pair.depth.cx:.6f} cy={pair.depth.cy:.6f}"
    )
    click.echo(
        "zero distortion / identity rectification: "
        f"video={has_zero_distortion(pair.video) and is_identity_rectification(pair.video)} "
        f"depth={has_zero_distortion(pair.depth) and is_identity_rectification(pair.depth)}"
    )
    click.echo(describe_mapping(pair))
    # Demonstrate the mapping in both directions using the image centers.
    sample_depth_u = pair.depth.width * 0.5
    sample_depth_v = pair.depth.height * 0.5
    mapped_rgb_u, mapped_rgb_v = depth_pixel_to_rgb(sample_depth_u, sample_depth_v, pair)
    click.echo(
        "sample center mapping: "
        f"depth({sample_depth_u:.3f}, {sample_depth_v:.3f}) -> rgb({mapped_rgb_u:.3f}, {mapped_rgb_v:.3f})"
    )
    sample_rgb_u = pair.video.width * 0.5
    sample_rgb_v = pair.video.height * 0.5
    mapped_depth_u, mapped_depth_v = rgb_pixel_to_depth(sample_rgb_u, sample_rgb_v, pair)
    click.echo(
        "sample inverse mapping: "
        f"rgb({sample_rgb_u:.3f}, {sample_rgb_v:.3f}) -> depth({mapped_depth_u:.3f}, {mapped_depth_v:.3f})"
    )
    # Optional: write the example/overlay PNGs for visual verification.
    if output_dir is not None:
        export_example_images(
            mcap_path,
            layout_info=layout_info,
            camera_label=selected_camera,
            pair=pair,
            frame_index=frame_index,
            ffmpeg_bin=ffmpeg_bin,
            output_dir=output_dir,
            palette_name=depth_palette,
        )


if __name__ == "__main__":
    main()
-630
View File
@@ -1,630 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
from dataclasses import asdict, dataclass, field
import json
from pathlib import Path
import subprocess
import tempfile
import click
import cv2
import numpy as np
from google.protobuf import descriptor_pb2, descriptor_pool, message_factory, timestamp_pb2
import mcap_bundle_validator as bundle_validator
import zed_batch_svo_to_mcap as batch
# Topic that carries BundleManifest messages in bundled-layout MCAPs.
BUNDLE_TOPIC = "/bundle"
# Depth palette name -> OpenCV colormap constant (None = plain grayscale).
DEPTH_PALETTE_TO_OPENCV = {
    "Gray": None,
    "Turbo": cv2.COLORMAP_TURBO,
    "Inferno": cv2.COLORMAP_INFERNO,
    "Plasma": cv2.COLORMAP_PLASMA,
    "Viridis": cv2.COLORMAP_VIRIDIS,
    "Cividis": cv2.COLORMAP_CIVIDIS,
    "Magma": cv2.COLORMAP_MAGMA,
    "Parula": cv2.COLORMAP_PARULA,
}
# MCAP video format name -> ffmpeg input format name.
VIDEO_INPUT_FORMATS = {"h264": "h264", "h265": "hevc"}
# Cache keyed by (serialized schema bytes, message type name) so protobuf
# message classes are built at most once per schema.
_MESSAGE_CLASS_CACHE: dict[tuple[bytes, str], object] = {}
@dataclass(slots=True)
class TimestampRange:
start_ns: int | None = None
end_ns: int | None = None
def update(self, timestamp_ns: int) -> None:
if self.start_ns is None or timestamp_ns < self.start_ns:
self.start_ns = timestamp_ns
if self.end_ns is None or timestamp_ns > self.end_ns:
self.end_ns = timestamp_ns
@dataclass(slots=True)
class CameraRanges:
    """Per-camera timestamp ranges for the video and depth streams."""

    video: TimestampRange = field(default_factory=TimestampRange)  # video message timestamps
    depth: TimestampRange = field(default_factory=TimestampRange)  # depth message timestamps
@dataclass(slots=True)
class RecipeSummary:
    """Validator summary augmented with per-stream timestamp ranges."""

    base: bundle_validator.McapSummary  # layout/validation result from the validator
    bundle_timestamps: TimestampRange = field(default_factory=TimestampRange)  # manifest timestamps
    camera_ranges: dict[str, CameraRanges] = field(default_factory=dict)  # label -> stream ranges
@dataclass(slots=True)
class VideoSample:
    """One compressed video message, as seen while indexing an MCAP."""

    timestamp_ns: int  # message timestamp in nanoseconds
    format_name: str  # codec name as stored in the message (e.g. "h264")
    stream_index: int  # position of this sample within its stream
@dataclass(slots=True)
class DepthSample:
    """One compressed depth message plus the metadata needed to decode it."""

    timestamp_ns: int  # message timestamp in nanoseconds
    payload: bytes  # compressed depth payload
    stream_index: int  # position of this sample within its stream
    width: int  # depth image width in pixels
    height: int  # depth image height in pixels
    encoding_name: str  # compression encoding (e.g. RVL variants)
    source_unit_name: str  # unit of the depth values at capture time
    storage_unit_name: str  # unit of the depth values as stored
@dataclass(slots=True)
class BundleMemberSample:
    """One member entry of a bundle manifest, flattened for inspection."""

    bundle_index: int  # index of the manifest this member belongs to
    bundle_timestamp_ns: int  # manifest timestamp in nanoseconds
    member_timestamp_ns: int | None  # member's own timestamp, if present
    status_name: str  # resolved status enum name
    corrupted_frames_skipped: int  # frames skipped due to corruption before this member
    member_stream_index: int  # member's index within its per-camera stream
def load_message_class(schema_data: bytes, message_type_name: str):
    """Build (and cache) a protobuf message class from an MCAP schema payload.

    `schema_data` is a serialized FileDescriptorSet. A fresh DescriptorPool is
    used per schema so descriptors from different files cannot collide. The
    well-known timestamp.proto must be registered before any file that imports
    it: the schema's embedded copy is preferred, otherwise the stdlib one is
    injected.
    """
    cache_key = (schema_data, message_type_name)
    cached = _MESSAGE_CLASS_CACHE.get(cache_key)
    if cached is not None:
        return cached
    descriptor_set = descriptor_pb2.FileDescriptorSet()
    descriptor_set.ParseFromString(schema_data)
    pool = descriptor_pool.DescriptorPool()
    has_embedded_timestamp = any(
        file_descriptor.name == "google/protobuf/timestamp.proto"
        for file_descriptor in descriptor_set.file
    )
    if has_embedded_timestamp:
        # Register the schema's own timestamp.proto first.
        for file_descriptor in descriptor_set.file:
            if file_descriptor.name == "google/protobuf/timestamp.proto":
                pool.Add(file_descriptor)
                break
    else:
        # Fall back to the timestamp descriptor shipped with protobuf.
        pool.AddSerializedFile(timestamp_pb2.DESCRIPTOR.serialized_pb)
    for file_descriptor in descriptor_set.file:
        if file_descriptor.name == "google/protobuf/timestamp.proto":
            continue  # already registered above
        pool.Add(file_descriptor)
    message_descriptor = pool.FindMessageTypeByName(message_type_name)
    message_class = message_factory.GetMessageClass(message_descriptor)
    _MESSAGE_CLASS_CACHE[cache_key] = message_class
    return message_class
def parse_timestamp_ns(timestamp_message: object, fallback_log_time_ns: int) -> int:
    """Convert a protobuf Timestamp to nanoseconds; an unset (0, 0) timestamp
    falls back to the MCAP log time."""
    seconds = int(getattr(timestamp_message, "seconds", 0))
    nanos = int(getattr(timestamp_message, "nanos", 0))
    if not seconds and not nanos:
        return fallback_log_time_ns
    return seconds * 1_000_000_000 + nanos
def format_timestamp_ns(timestamp_ns: int | None) -> str:
    """Render nanoseconds as 'seconds.nnnnnnnnn', or '-' when no timestamp exists."""
    if timestamp_ns is None:
        return "-"
    whole_seconds, remainder_ns = divmod(timestamp_ns, 1_000_000_000)
    return f"{whole_seconds}.{remainder_ns:09d}"
def format_range(timestamp_range: TimestampRange) -> str:
    """Render a TimestampRange as 'start .. end' via format_timestamp_ns."""
    start_text = format_timestamp_ns(timestamp_range.start_ns)
    end_text = format_timestamp_ns(timestamp_range.end_ns)
    return f"{start_text} .. {end_text}"
def enum_name(message: object, field_name: str) -> str:
    """Resolve an enum field's numeric value to its symbolic name, else the number as text."""
    numeric = int(getattr(message, field_name))
    enum_type = message.DESCRIPTOR.fields_by_name[field_name].enum_type
    match = enum_type.values_by_number.get(numeric)
    return str(numeric) if match is None else match.name
def is_present_status(status_name: str) -> bool:
    """True for either spelling of the 'present' bundle-member status."""
    return status_name in ("PRESENT", "BUNDLE_MEMBER_STATUS_PRESENT")
def topic_for(layout: str, camera_label: str, kind: str) -> str:
    """Compute the MCAP topic for a stream kind under the given layout."""
    if layout == "single-camera":
        # Legacy exports always use the fixed /camera namespace.
        return f"/camera/{kind}"
    if layout in {"copy", "bundled"}:
        return f"/{camera_label}/{kind}"
    raise click.ClickException(f"unsupported layout '{layout}'")
def selected_camera_label(base_summary: bundle_validator.McapSummary, camera_label: str | None) -> str:
    """Pick the camera to inspect: the requested label, or the summary's first camera."""
    labels = base_summary.camera_labels
    if camera_label is None:
        return labels[0]
    if camera_label in labels:
        return camera_label
    available = ", ".join(labels)
    raise click.ClickException(f"camera label '{camera_label}' not found. available: {available}")
def ensure_supported_layout(base_summary: bundle_validator.McapSummary) -> None:
    """Abort with a ClickException when the MCAP layout is not one this tool handles."""
    if base_summary.layout in {"single-camera", "copy", "bundled"}:
        return
    raise click.ClickException(base_summary.validation_reason or "unsupported MCAP layout")
def summarize_mcap(path: Path) -> RecipeSummary:
    """Run the base validator on one MCAP and collect per-stream timestamp ranges."""
    base_summary = bundle_validator.summarize_mcap(path)
    # Fall back to the implicit "camera" label for single-camera layouts.
    camera_ranges = {
        label: CameraRanges()
        for label in (base_summary.camera_labels or ("camera",))
    }
    bundle_timestamps = TimestampRange()
    reader_module = batch.load_mcap_reader()
    with path.open("rb") as stream:
        reader = reader_module.make_reader(stream)
        for schema, channel, message in reader.iter_messages():
            topic = channel.topic
            if topic == BUNDLE_TOPIC and schema is not None and schema.name == "cvmmap_streamer.BundleManifest":
                bundle_class, _present_value = batch.load_bundle_manifest_type(schema.data)
                bundle_message = bundle_class()
                bundle_message.ParseFromString(message.data)
                bundle_timestamps.update(parse_timestamp_ns(bundle_message.timestamp, int(message.log_time)))
                continue
            if topic.endswith("/video"):
                # Derive the camera label from the topic namespace.
                if topic == "/camera/video":
                    label = "camera"
                else:
                    label = topic.removeprefix("/").removesuffix("/video")
                # Skip unexpected schemas and labels the validator did not report.
                if schema is None or schema.name != "foxglove.CompressedVideo" or label not in camera_ranges:
                    continue
                message_class = load_message_class(schema.data, "foxglove.CompressedVideo")
                payload = message_class()
                payload.ParseFromString(message.data)
                camera_ranges[label].video.update(parse_timestamp_ns(payload.timestamp, int(message.log_time)))
                continue
            if topic.endswith("/depth"):
                if topic == "/camera/depth":
                    label = "camera"
                else:
                    label = topic.removeprefix("/").removesuffix("/depth")
                if schema is None or schema.name != "cvmmap_streamer.DepthMap" or label not in camera_ranges:
                    continue
                message_class = load_message_class(schema.data, "cvmmap_streamer.DepthMap")
                payload = message_class()
                payload.ParseFromString(message.data)
                camera_ranges[label].depth.update(parse_timestamp_ns(payload.timestamp, int(message.log_time)))
    return RecipeSummary(
        base=base_summary,
        bundle_timestamps=bundle_timestamps,
        camera_ranges=camera_ranges,
    )
def print_summary(summary: RecipeSummary) -> None:
    """Emit the human-readable summary lines for one summarized MCAP file."""
    echo = click.echo
    meta = summary.base
    echo(f"path: {meta.path}")
    echo(f"validation: {meta.validation_status}")
    if meta.validation_reason:
        echo(f"validation reason: {meta.validation_reason}")
    echo(f"layout: {meta.layout}")
    labels_text = ", ".join(meta.camera_labels) if meta.camera_labels else "-"
    echo(f"camera labels: {labels_text}")
    if meta.layout == "bundled":
        echo(f"bundle count: {meta.bundle_count}")
        echo(f"bundle timestamp range: {format_range(summary.bundle_timestamps)}")
        policy_parts = [
            f"{policy}={count}"
            for policy, count in sorted(meta.policy_counts.items())
        ]
        policy_text = ", ".join(policy_parts) or "-"
        echo(f"bundle policies: {policy_text}")
    for label in meta.camera_labels:
        stats = meta.camera_stats[label]
        ranges = summary.camera_ranges[label]
        echo(f"camera: {label}")
        echo(f"  video messages: {stats.video_messages}")
        echo(f"  video timestamp range: {format_range(ranges.video)}")
        echo(f"  depth messages: {stats.depth_messages}")
        echo(f"  depth timestamp range: {format_range(ranges.depth)}")
        echo(f"  pose messages: {stats.pose_messages}")
        echo(f"  calibration messages: {stats.calibration_messages}")
        echo(f"  depth calibration messages: {stats.depth_calibration_messages}")
        echo(f"  body messages: {stats.body_messages}")
        if meta.layout == "bundled":
            echo(f"  present bundle members: {stats.present_members}")
            echo(f"  corrupted gap members: {stats.corrupted_gap_members}")
            echo(f"  unknown bundle members: {stats.unknown_members}")
def decode_depth_array(depth_sample: DepthSample) -> np.ndarray:
    """Decode a DepthSample payload into a float32 depth array in meters."""
    try:
        import rvl
    except ModuleNotFoundError as error:
        raise click.ClickException(
            "depth export needs the optional rvl-impl binding; run `uv sync --extra viewer`"
        ) from error
    encoding = depth_sample.encoding_name
    if encoding == "RVL_U16_LOSSLESS":
        depth = rvl.decompress_u16(depth_sample.payload).astype(np.float32)
        stored_in_mm = (
            depth_sample.storage_unit_name == "STORAGE_UNIT_MILLIMETER"
            or depth_sample.source_unit_name == "DEPTH_UNIT_MILLIMETER"
        )
        # Millimeter-stored maps are rescaled to meters; meter maps pass through.
        return depth / 1000.0 if stored_in_mm else depth
    if encoding == "RVL_F32":
        return rvl.decompress_f32(depth_sample.payload).astype(np.float32)
    raise click.ClickException(f"unsupported depth encoding '{encoding}'")
def colorize_depth(
    depth_m: np.ndarray,
    *,
    depth_min_m: float,
    depth_max_m: float,
    depth_palette_name: str,
) -> np.ndarray:
    """Render a metric depth array as a BGR preview image.

    Nearer depths map to brighter ramp values (the normalized ramp is
    inverted); invalid pixels (non-finite or <= 0) are painted black.
    """
    finite_positive = np.isfinite(depth_m) & (depth_m > 0.0)
    denominator = max(depth_max_m - depth_min_m, 1e-6)  # guard a zero span
    ramp = np.clip((depth_m - depth_min_m) / denominator, 0.0, 1.0)
    gray = np.zeros(depth_m.shape, dtype=np.uint8)
    gray[finite_positive] = np.round((1.0 - ramp[finite_positive]) * 255.0).astype(np.uint8)
    colormap = DEPTH_PALETTE_TO_OPENCV[depth_palette_name]
    if colormap is None:
        # Grayscale palette: replicate the single channel into BGR.
        rendered = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)
    else:
        rendered = cv2.applyColorMap(gray, colormap)
    rendered[~finite_positive] = 0
    return rendered
def export_rgb_frame(
    *,
    ffmpeg_bin: str,
    raw_video_path: Path,
    video_format: str,
    frame_index: int,
    output_path: Path,
) -> None:
    """Decode frame *frame_index* of a raw elementary stream into an image.

    Raises click.ClickException when the format is unknown, ffmpeg is missing
    or fails, or the expected output file is not produced.
    """
    input_format = VIDEO_INPUT_FORMATS.get(video_format)
    if input_format is None:
        raise click.ClickException(f"unsupported video format '{video_format}'")
    command = [
        ffmpeg_bin,
        "-hide_banner",
        "-loglevel",
        "error",
        "-y",
        "-fflags",
        "+genpts",
        "-f",
        input_format,
        "-i",
        str(raw_video_path),
        "-vf",
        f"select=eq(n\\,{frame_index})",
        "-frames:v",
        "1",
        str(output_path),
    ]
    try:
        completed = subprocess.run(command, check=False, capture_output=True, text=True)
    except FileNotFoundError as error:
        raise click.ClickException(f"ffmpeg binary not found: {ffmpeg_bin}") from error
    if completed.returncode:
        fallback = "ffmpeg failed to export the RGB frame"
        reason = completed.stderr.strip() or completed.stdout.strip() or fallback
        raise click.ClickException(reason)
    if not output_path.is_file():
        raise click.ClickException(f"ffmpeg did not write {output_path}")
def collect_sample_data(
    path: Path,
    *,
    layout: str,
    camera_label: str,
    sample_index: int,
) -> tuple[VideoSample, DepthSample, BundleMemberSample | None, bytes]:
    """Scan the MCAP once and gather the sample_index-th RGB/depth pair.

    Returns the selected video sample, depth sample, the matching bundle
    member (bundled layout only, otherwise None), and the raw elementary
    video stream holding every frame up to and including the selected one
    (so inter-frame codecs can still decode it later).

    Raises click.ClickException on schema mismatches, format inconsistencies,
    or when sample_index exceeds the available samples.
    """
    reader_module = batch.load_mcap_reader()
    video_topic = topic_for(layout, camera_label, "video")
    depth_topic = topic_for(layout, camera_label, "depth")
    video_sample: VideoSample | None = None
    depth_sample: DepthSample | None = None
    bundle_sample: BundleMemberSample | None = None
    # Per-stream counters; sample_index is interpreted per camera per stream.
    video_index = 0
    depth_index = 0
    bundle_member_index = 0
    video_format: str | None = None
    with tempfile.TemporaryDirectory(prefix="mcap_rgbd_example_") as temp_dir_name:
        raw_video_path = Path(temp_dir_name) / "stream.bin"
        with raw_video_path.open("wb") as raw_video_stream:
            with path.open("rb") as stream:
                reader = reader_module.make_reader(stream)
                for schema, channel, message in reader.iter_messages():
                    topic = channel.topic
                    # Bundled layout: locate the sample_index-th *present*
                    # member for this camera so export metadata can cite it.
                    if layout == "bundled" and topic == BUNDLE_TOPIC and bundle_sample is None:
                        if schema is None or schema.name != "cvmmap_streamer.BundleManifest":
                            continue
                        bundle_class, present_value = batch.load_bundle_manifest_type(schema.data)
                        bundle_message = bundle_class()
                        bundle_message.ParseFromString(message.data)
                        for member in bundle_message.members:
                            if str(member.camera_label) != camera_label:
                                continue
                            status_name = bundle_validator.status_name_from_member(member, present_value)
                            member_timestamp_ns = None
                            if member.HasField("timestamp"):
                                member_timestamp_ns = parse_timestamp_ns(member.timestamp, int(message.log_time))
                            # Only members that actually carry data advance
                            # the per-camera member counter.
                            if is_present_status(status_name):
                                if bundle_member_index == sample_index:
                                    bundle_sample = BundleMemberSample(
                                        bundle_index=int(bundle_message.bundle_index),
                                        bundle_timestamp_ns=parse_timestamp_ns(bundle_message.timestamp, int(message.log_time)),
                                        member_timestamp_ns=member_timestamp_ns,
                                        status_name=status_name,
                                        corrupted_frames_skipped=int(getattr(member, "corrupted_frames_skipped", 0)),
                                        member_stream_index=bundle_member_index,
                                    )
                                bundle_member_index += 1
                            # At most one member per camera is processed per
                            # manifest; stop after the first label match.
                            break
                        continue
                    if topic == video_topic:
                        if schema is None or schema.name != "foxglove.CompressedVideo":
                            raise click.ClickException(f"unexpected schema on {video_topic}: {schema.name if schema else 'none'}")
                        message_class = load_message_class(schema.data, "foxglove.CompressedVideo")
                        payload = message_class()
                        payload.ParseFromString(message.data)
                        frame_format = str(payload.format)
                        if frame_format not in VIDEO_INPUT_FORMATS:
                            raise click.ClickException(f"unsupported video format '{frame_format}' on {video_topic}")
                        if video_format is None:
                            video_format = frame_format
                        elif video_format != frame_format:
                            raise click.ClickException(
                                f"inconsistent video format on {video_topic}: {video_format} then {frame_format}"
                            )
                        # Persist every frame up to the target so frames that
                        # depend on earlier ones remain decodable.
                        if video_index <= sample_index:
                            raw_video_stream.write(bytes(payload.data))
                        if video_index == sample_index:
                            video_sample = VideoSample(
                                timestamp_ns=parse_timestamp_ns(payload.timestamp, int(message.log_time)),
                                format_name=frame_format,
                                stream_index=video_index,
                            )
                        video_index += 1
                        continue
                    if topic == depth_topic:
                        if schema is None or schema.name != "cvmmap_streamer.DepthMap":
                            raise click.ClickException(f"unexpected schema on {depth_topic}: {schema.name if schema else 'none'}")
                        message_class = load_message_class(schema.data, "cvmmap_streamer.DepthMap")
                        payload = message_class()
                        payload.ParseFromString(message.data)
                        if depth_index == sample_index:
                            depth_sample = DepthSample(
                                timestamp_ns=parse_timestamp_ns(payload.timestamp, int(message.log_time)),
                                payload=bytes(payload.data),
                                stream_index=depth_index,
                                width=int(payload.width),
                                height=int(payload.height),
                                encoding_name=enum_name(payload, "encoding"),
                                source_unit_name=enum_name(payload, "source_unit"),
                                storage_unit_name=enum_name(payload, "storage_unit"),
                            )
                        depth_index += 1
                        continue
                    # Stop reading once everything requested has been found.
                    if (
                        video_sample is not None
                        and depth_sample is not None
                        and (layout != "bundled" or bundle_sample is not None)
                    ):
                        break
        # Read back before the TemporaryDirectory is removed.
        raw_video_bytes = raw_video_path.read_bytes()
    if video_sample is None:
        raise click.ClickException(f"sample index {sample_index} exceeded available video samples")
    if depth_sample is None:
        raise click.ClickException(f"sample index {sample_index} exceeded available depth samples")
    if layout == "bundled" and bundle_sample is None:
        raise click.ClickException(
            f"could not map per-camera sample index {sample_index} to a bundle member for {camera_label}"
        )
    return video_sample, depth_sample, bundle_sample, raw_video_bytes
def write_sample_outputs(
    *,
    path: Path,
    layout: str,
    output_dir: Path,
    camera_label: str,
    sample_index: int,
    video_sample: VideoSample,
    depth_sample: DepthSample,
    bundle_sample: BundleMemberSample | None,
    raw_video_bytes: bytes,
    ffmpeg_bin: str,
    depth_min_m: float,
    depth_max_m: float,
    depth_palette_name: str,
) -> None:
    """Write rgb.png, depth.npy, depth_preview.png, and sample_metadata.json.

    The raw video bytes are replayed through ffmpeg to extract the selected
    RGB frame; the depth payload is decoded to meters and saved both as a
    NumPy array and a colorized preview image.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    rgb_output_path = output_dir / "rgb.png"
    depth_output_path = output_dir / "depth.npy"
    depth_preview_path = output_dir / "depth_preview.png"
    metadata_path = output_dir / "sample_metadata.json"
    with tempfile.TemporaryDirectory(prefix="mcap_rgbd_example_export_") as temp_dir_name:
        # ffmpeg infers nothing from the name; the format is passed explicitly.
        raw_video_path = Path(temp_dir_name) / f"sample.{video_sample.format_name}"
        raw_video_path.write_bytes(raw_video_bytes)
        export_rgb_frame(
            ffmpeg_bin=ffmpeg_bin,
            raw_video_path=raw_video_path,
            video_format=video_sample.format_name,
            frame_index=sample_index,
            output_path=rgb_output_path,
        )
    depth_m = decode_depth_array(depth_sample)
    np.save(depth_output_path, depth_m)
    depth_preview = colorize_depth(
        depth_m,
        depth_min_m=depth_min_m,
        depth_max_m=depth_max_m,
        depth_palette_name=depth_palette_name,
    )
    if not cv2.imwrite(str(depth_preview_path), depth_preview):
        raise click.ClickException(f"failed to write depth preview to {depth_preview_path}")
    metadata = {
        "mcap_path": str(path),
        "layout": layout,
    }
    metadata.update(
        {
            "camera_label": camera_label,
            "sample_index": sample_index,
            "video_stream_index": video_sample.stream_index,
            "video_timestamp_ns": video_sample.timestamp_ns,
            "video_timestamp": format_timestamp_ns(video_sample.timestamp_ns),
            "video_format": video_sample.format_name,
            "depth_stream_index": depth_sample.stream_index,
            "depth_timestamp_ns": depth_sample.timestamp_ns,
            "depth_timestamp": format_timestamp_ns(depth_sample.timestamp_ns),
            "depth_width": depth_sample.width,
            "depth_height": depth_sample.height,
            "depth_encoding": depth_sample.encoding_name,
            "depth_source_unit": depth_sample.source_unit_name,
            "depth_storage_unit": depth_sample.storage_unit_name,
            "depth_palette": depth_palette_name,
            "depth_min_m": depth_min_m,
            "depth_max_m": depth_max_m,
            "rgb_output_path": str(rgb_output_path),
            "depth_output_path": str(depth_output_path),
            "depth_preview_path": str(depth_preview_path),
        }
    )
    if bundle_sample is not None:
        metadata["bundle"] = asdict(bundle_sample)
        metadata["bundle"]["bundle_timestamp"] = format_timestamp_ns(bundle_sample.bundle_timestamp_ns)
        # NOTE(review): member_timestamp_ns can be None when the manifest
        # member carried no timestamp — assumes format_timestamp_ns accepts
        # None; confirm before relying on this path.
        metadata["bundle"]["member_timestamp"] = format_timestamp_ns(bundle_sample.member_timestamp_ns)
    metadata_path.write_text(json.dumps(metadata, indent=2, sort_keys=True) + "\n", encoding="utf-8")
# Root Click group; subcommands attach themselves via @main.command(...).
@click.group()
def main() -> None:
    """Small MCAP RGBD example helper for bundled, copy, and legacy single-camera MCAP files."""
@main.command("summary")
@click.argument("mcap_path", type=click.Path(path_type=Path, exists=True))
def summary_command(mcap_path: Path) -> None:
    """Print a compact metadata summary for a single MCAP file."""
    summary = summarize_mcap(mcap_path.resolve())
    # Rejects unsupported layouts with a ClickException before printing.
    ensure_supported_layout(summary.base)
    print_summary(summary)
@main.command("export-sample")
@click.argument("mcap_path", type=click.Path(path_type=Path, exists=True))
@click.option("--camera-label", help="Camera label to export. Defaults to `camera` for legacy files or the first sorted namespaced label.")
@click.option("--sample-index", default=0, show_default=True, type=click.IntRange(min=0), help="Zero-based per-camera RGB+depth sample index.")
@click.option("--output-dir", required=True, type=click.Path(path_type=Path), help="Directory to write rgb.png, depth.npy, depth_preview.png, and sample_metadata.json.")
@click.option("--ffmpeg-bin", default="ffmpeg", show_default=True, help="ffmpeg binary used to decode the selected RGB frame.")
@click.option("--depth-min-m", default=0.2, show_default=True, type=float, help="Minimum displayed depth in meters for depth_preview.png.")
@click.option("--depth-max-m", default=5.0, show_default=True, type=float, help="Maximum displayed depth in meters for depth_preview.png.")
@click.option(
    "--depth-palette",
    default="Turbo",
    show_default=True,
    type=click.Choice(tuple(DEPTH_PALETTE_TO_OPENCV.keys()), case_sensitive=False),
    help="Depth color palette for depth_preview.png.",
)
def export_sample_command(
    mcap_path: Path,
    camera_label: str | None,
    sample_index: int,
    output_dir: Path,
    ffmpeg_bin: str,
    depth_min_m: float,
    depth_max_m: float,
    depth_palette: str,
) -> None:
    """Export one per-camera RGB/depth sample from a bundled, copy, or legacy single-camera MCAP file."""
    summary = summarize_mcap(mcap_path.resolve())
    ensure_supported_layout(summary.base)
    # Only fully valid recordings may be exported from.
    if summary.base.validation_status != "valid":
        raise click.ClickException(
            f"refusing to export from invalid MCAP: {summary.base.validation_reason or summary.base.validation_status}"
        )
    label = selected_camera_label(summary.base, camera_label)
    stats = summary.base.camera_stats[label]
    # Samples are RGB+depth pairs; the shorter stream bounds the pair count.
    pair_count = min(stats.video_messages, stats.depth_messages)
    if pair_count <= 0:
        raise click.ClickException(f"camera '{label}' has no paired RGB+depth samples")
    if sample_index >= pair_count:
        raise click.ClickException(
            f"--sample-index {sample_index} is outside 0..{pair_count - 1} for camera '{label}'"
        )
    # click.Choice is case-insensitive here; map back to the canonical
    # palette key used by DEPTH_PALETTE_TO_OPENCV.
    selected_palette = next(
        palette_name
        for palette_name in DEPTH_PALETTE_TO_OPENCV
        if palette_name.lower() == depth_palette.lower()
    )
    video_sample, depth_sample, bundle_sample, raw_video_bytes = collect_sample_data(
        mcap_path.resolve(),
        layout=summary.base.layout,
        camera_label=label,
        sample_index=sample_index,
    )
    write_sample_outputs(
        path=mcap_path.resolve(),
        layout=summary.base.layout,
        output_dir=output_dir.expanduser().resolve(),
        camera_label=label,
        sample_index=sample_index,
        video_sample=video_sample,
        depth_sample=depth_sample,
        bundle_sample=bundle_sample,
        raw_video_bytes=raw_video_bytes,
        ffmpeg_bin=ffmpeg_bin,
        depth_min_m=depth_min_m,
        depth_max_m=depth_max_m,
        depth_palette_name=selected_palette,
    )
    click.echo(f"wrote sample export: {output_dir.expanduser().resolve()}")
# CLI entry point when executed directly as a script.
if __name__ == "__main__":
    main()
File diff suppressed because it is too large Load Diff
-255
View File
@@ -1,255 +0,0 @@
from __future__ import annotations
import csv
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Generic, Protocol, TypeVar
import click
from click.core import ParameterSource
class SegmentScanLike(Protocol):
    """Structural type for a per-directory scan result.

    The shared source-resolution helpers only rely on these three attributes
    of whatever scan object the concrete batch wrapper produces.
    """

    segment_dir: Path    # the directory that was scanned
    matched_files: int   # how many candidate input files matched
    is_valid: bool       # whether the directory is a complete, usable segment

# Generic scan type used by the helper functions below.
ScanT = TypeVar("ScanT", bound=SegmentScanLike)
@dataclass(slots=True, frozen=True)
class SourceResolution(Generic[ScanT]):
    """Outcome of resolving CLI source options into segment directories."""

    mode: str                                # "dataset-root", "segments", or "segments-csv"
    segment_dirs: tuple[Path, ...]           # resolved directories to process, in order
    ignored_partial_dirs: tuple[ScanT, ...]  # partially-matching dirs skipped during discovery
def dedupe_paths(paths: list[Path]) -> list[Path]:
    """Resolve every path and drop later duplicates, keeping first-seen order."""
    # dict preserves insertion order, so it doubles as an ordered set.
    unique: dict[Path, None] = {}
    for raw in paths:
        unique.setdefault(raw.expanduser().resolve(), None)
    return list(unique)
def parse_segments_csv(csv_path: Path, csv_root: Path | None) -> tuple[Path, ...]:
    """Read the segment_dir column of *csv_path* into resolved, deduped paths.

    Relative entries are resolved against *csv_root* when given, otherwise
    against the CSV file's own parent directory. Raises click.ClickException
    for a missing file, missing header, empty cell, or an empty CSV.
    """
    csv_path = csv_path.expanduser().resolve()
    if not csv_path.is_file():
        raise click.ClickException(f"CSV not found: {csv_path}")
    if csv_root is None:
        base_dir = csv_path.parent
    else:
        base_dir = csv_root.expanduser().resolve()
        if not base_dir.is_dir():
            raise click.ClickException(f"CSV root is not a directory: {base_dir}")
    # Ordered-set via dict: keeps first occurrence, drops later duplicates.
    ordered: dict[Path, None] = {}
    with csv_path.open(newline="") as stream:
        reader = csv.DictReader(stream)
        if reader.fieldnames is None or "segment_dir" not in reader.fieldnames:
            raise click.ClickException(f"{csv_path} must contain a 'segment_dir' header")
        # Row numbering starts at 2 because row 1 is the header line.
        for row_number, row in enumerate(reader, start=2):
            raw_segment_dir = (row.get("segment_dir") or "").strip()
            if not raw_segment_dir:
                raise click.ClickException(f"{csv_path}:{row_number} has an empty segment_dir value")
            entry = Path(raw_segment_dir)
            if not entry.is_absolute():
                entry = base_dir / entry
            ordered.setdefault(entry.expanduser().resolve(), None)
    if not ordered:
        raise click.ClickException(f"{csv_path} did not contain any segment_dir rows")
    return tuple(ordered)
def discover_segment_dirs(
    root: Path,
    recursive: bool,
    *,
    scan_segment_dir: Callable[[Path], ScanT],
    no_matches_message: Callable[[Path], str],
) -> SourceResolution[ScanT]:
    """Scan *root* (optionally recursively) for valid segment directories.

    Directories with some but not all expected inputs are collected as
    ignored partials; an empty result raises with *no_matches_message*.
    """
    resolved_root = root.expanduser().resolve()
    if not resolved_root.is_dir():
        raise click.ClickException(f"dataset root does not exist: {resolved_root}")
    # The root itself is always a candidate alongside its subdirectories.
    candidate_dirs = {resolved_root}
    children = resolved_root.rglob("*") if recursive else resolved_root.iterdir()
    candidate_dirs.update(child.resolve() for child in children if child.is_dir())
    valid_dirs: list[Path] = []
    ignored_partial_dirs: list[ScanT] = []
    for segment_dir in sorted(candidate_dirs):
        scan = scan_segment_dir(segment_dir)
        if scan.is_valid:
            valid_dirs.append(segment_dir)
        elif scan.matched_files > 0:
            ignored_partial_dirs.append(scan)
    if not valid_dirs:
        raise click.ClickException(no_matches_message(resolved_root))
    return SourceResolution(
        mode="dataset-root",
        segment_dirs=tuple(valid_dirs),
        ignored_partial_dirs=tuple(ignored_partial_dirs),
    )
def raise_if_recursive_flag_is_incompatible(
    ctx: click.Context,
    dataset_root: Path | None,
    *,
    dataset_root_flag: str = "--dataset-root",
) -> None:
    """Reject an explicit --recursive/--no-recursive used without a dataset root."""
    explicitly_given = ctx.get_parameter_source("recursive") is not ParameterSource.DEFAULT
    if explicitly_given and dataset_root is None:
        raise click.ClickException(f"--recursive/--no-recursive can only be used with {dataset_root_flag}")
def raise_for_legacy_source_args(
    legacy_input_dir: Path | None,
    legacy_segment_dirs: tuple[Path, ...],
    *,
    dataset_root_flag: str = "--dataset-root",
    segment_flag: str = "--segment",
) -> None:
    """Translate removed legacy source options into actionable error messages."""
    if legacy_input_dir is not None:
        resolved = legacy_input_dir.expanduser().resolve()
        raise click.ClickException(
            f"positional dataset paths are no longer supported; use {dataset_root_flag} {resolved}"
        )
    if not legacy_segment_dirs:
        return
    resolved = legacy_segment_dirs[0].expanduser().resolve()
    raise click.ClickException(
        f"--segment-dir is no longer supported in this batch wrapper; use {segment_flag} {resolved} "
        f"for an explicit segment directory, or {dataset_root_flag} <DATASET_ROOT> --recursive for discovery"
    )
def raise_for_legacy_extra_args(
    extra_args: list[str],
    *,
    dataset_root_flag: str = "--dataset-root",
) -> None:
    """Reject leftover positional arguments kept only for legacy-CLI diagnostics."""
    if not extra_args:
        return
    first = extra_args[0]
    if first.startswith("-"):
        # Unknown flags get a generic error listing everything that was left.
        raise click.ClickException(f"unexpected extra arguments: {' '.join(extra_args)}")
    # A bare path almost certainly came from the old positional-root CLI.
    resolved = Path(first).expanduser().resolve()
    raise click.ClickException(
        f"positional dataset paths are no longer supported; use {dataset_root_flag} {resolved}"
    )
def raise_if_segment_path_looks_like_dataset_root(
    segment_dir: Path,
    *,
    scan_segment_dir: Callable[[Path], ScanT],
    dataset_root_flag: str = "--dataset-root",
    segment_flag: str = "--segment",
) -> None:
    """Fail loudly when --segment points at what is actually a dataset root."""
    resolved = segment_dir.expanduser().resolve()
    if not resolved.is_dir():
        # Nonexistent paths are reported by later validation, not here.
        return
    scan = scan_segment_dir(resolved)
    if scan.is_valid or scan.matched_files > 0:
        # The directory itself holds segment files, so it is a legitimate target.
        return
    nested_segments = _find_nested_valid_segment_dirs(resolved, scan_segment_dir=scan_segment_dir)
    if not nested_segments:
        return
    example = nested_segments[0]
    raise click.ClickException(
        f"{resolved} looks like a dataset root, not a segment directory. "
        f"{segment_flag} expects a directory that directly contains *_zedN.svo or *_zedN.svo2 files. "
        f"Use {dataset_root_flag} {resolved} to discover nested segments such as {example}"
    )
def resolve_sources(
    dataset_root: Path | None,
    segment_dirs: tuple[Path, ...],
    segments_csv: Path | None,
    csv_root: Path | None,
    recursive: bool,
    *,
    scan_segment_dir: Callable[[Path], ScanT],
    no_matches_message: Callable[[Path], str],
) -> SourceResolution[ScanT]:
    """Resolve exactly one of the three source modes into a SourceResolution."""
    selected_modes = [
        dataset_root is not None,
        bool(segment_dirs),
        segments_csv is not None,
    ]
    if sum(selected_modes) != 1:
        raise click.ClickException(
            "provide exactly one source mode: --dataset-root, --segment, or --segments-csv"
        )
    if dataset_root is not None:
        return discover_segment_dirs(
            dataset_root,
            recursive,
            scan_segment_dir=scan_segment_dir,
            no_matches_message=no_matches_message,
        )
    if segment_dirs:
        ordered_dirs = dedupe_paths(list(segment_dirs))
        # Guard against passing a dataset root where a segment dir is expected.
        for segment_dir in ordered_dirs:
            raise_if_segment_path_looks_like_dataset_root(
                segment_dir,
                scan_segment_dir=scan_segment_dir,
            )
        return SourceResolution(mode="segments", segment_dirs=tuple(ordered_dirs), ignored_partial_dirs=())
    return SourceResolution(
        mode="segments-csv",
        segment_dirs=parse_segments_csv(segments_csv, csv_root),
        ignored_partial_dirs=(),
    )
def _find_nested_valid_segment_dirs(
root: Path,
*,
scan_segment_dir: Callable[[Path], ScanT],
limit: int = 3,
) -> tuple[Path, ...]:
matches: list[Path] = []
for path in sorted(root.rglob("*")):
if not path.is_dir():
continue
resolved = path.resolve()
if resolved == root:
continue
scan = scan_segment_dir(resolved)
if scan.is_valid:
matches.append(resolved)
if len(matches) >= limit:
break
return tuple(matches)
-747
View File
@@ -1,747 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import concurrent.futures
import json
import math
import os
import re
import shutil
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
import click
from tqdm import tqdm
try:
from scripts import zed_batch_segment_sources as segment_sources
except ModuleNotFoundError:
import zed_batch_segment_sources as segment_sources
# Location of this script and the repository checkout it lives in.
SCRIPT_PATH = Path(__file__).resolve()
REPO_ROOT = SCRIPT_PATH.parents[1]
# Input naming convention: one SVO/SVO2 recording per camera, suffixed _zed1.._zed4.
SEGMENT_FILE_PATTERN = re.compile(r".*_zed([1-4])\.svo2?$", re.IGNORECASE)
EXPECTED_CAMERAS = ("zed1", "zed2", "zed3", "zed4")
@dataclass(slots=True, frozen=True)
class BatchConfig:
    """Immutable settings shared by every conversion job in one batch run."""

    zed_bin: Path | None              # resolved zed_svo_grid_to_mp4 binary
    ffprobe_bin: Path | None          # resolved ffprobe binary for output probing
    probe_existing: bool              # validate existing outputs before skipping them
    cuda_visible_devices: str | None  # exported to each subprocess when set
    overwrite: bool                   # re-convert even when an output already exists
    fail_fast: bool                   # stop submitting new work after the first failure
    # Encoder settings forwarded verbatim to the converter CLI.
    codec: str
    encoder_device: str
    preset: str
    tune: str
    quality: int
    gop: int
    b_frames: int
    start_offset_seconds: float
    duration_seconds: float | None    # omitted from the command when None
    output_fps: float | None          # omitted from the command when None
    tile_scale: float
@dataclass(slots=True, frozen=True)
class ConversionJob:
    """One pending segment-to-grid-MP4 conversion."""

    segment_dir: Path  # directory holding the per-camera SVO inputs
    output_path: Path  # destination grid MP4 path
@dataclass(slots=True, frozen=True)
class JobResult:
    """Outcome of one conversion subprocess."""

    status: str               # "converted" or "failed"
    segment_dir: Path
    output_path: Path
    command: tuple[str, ...]  # the exact argv that was executed
    return_code: int = 0
    stdout: str = ""
    stderr: str = ""
@dataclass(slots=True, frozen=True)
class SegmentScan:
    """Scan result for one candidate segment directory."""

    segment_dir: Path
    matched_files: int        # count of files matching *_zedN.svo/svo2
    is_valid: bool            # True when each expected camera matched exactly once
    reason: str | None = None # human-readable explanation when invalid
@dataclass(slots=True, frozen=True)
class OutputProbeResult:
    """ffprobe validation result for an existing grid MP4."""

    output_path: Path
    status: str                            # "missing", "invalid", or "valid"
    reason: str = ""                       # cause when status == "invalid"
    duration_seconds: float | None = None  # reported duration when valid
def locate_binary(override: Path | None) -> Path:
    """Resolve the zed_svo_grid_to_mp4 binary, preferring an explicit override."""
    if override is not None:
        candidate = override.expanduser().resolve()
        if not candidate.is_file():
            raise click.ClickException(f"binary not found: {candidate}")
        return candidate
    # Fall back to the conventional in-repo build output locations.
    for candidate in (
        REPO_ROOT / "build" / "bin" / "zed_svo_grid_to_mp4",
        REPO_ROOT / "build" / "zed_svo_grid_to_mp4",
    ):
        if candidate.is_file():
            return candidate
    raise click.ClickException(f"could not find zed_svo_grid_to_mp4 under {REPO_ROOT / 'build'}")
def locate_ffprobe(override: Path | None) -> Path:
    """Resolve the ffprobe binary from an explicit override or PATH."""
    if override is not None:
        candidate = override.expanduser().resolve()
        if not candidate.is_file():
            raise click.ClickException(f"ffprobe binary not found: {candidate}")
        return candidate
    found = shutil.which("ffprobe")
    if found is None:
        raise click.ClickException("could not find ffprobe on PATH")
    return Path(found).resolve()
def scan_segment_dir(segment_dir: Path) -> SegmentScan:
    """Check that *segment_dir* holds exactly one SVO file per expected camera."""
    if not segment_dir.is_dir():
        return SegmentScan(
            segment_dir=segment_dir,
            matched_files=0,
            is_valid=False,
            reason=f"segment directory does not exist: {segment_dir}",
        )
    matched_by_camera: dict[str, list[Path]] = {camera: [] for camera in EXPECTED_CAMERAS}
    for entry in segment_dir.iterdir():
        if not entry.is_file():
            continue
        match = SEGMENT_FILE_PATTERN.fullmatch(entry.name)
        if match is not None:
            matched_by_camera[f"zed{match.group(1)}"].append(entry)
    matched_files = sum(len(paths) for paths in matched_by_camera.values())
    duplicate_cameras = [camera for camera, paths in matched_by_camera.items() if len(paths) > 1]
    missing_cameras = [camera for camera, paths in matched_by_camera.items() if not paths]
    if duplicate_cameras:
        duplicate_text = ", ".join(duplicate_cameras)
        return SegmentScan(
            segment_dir=segment_dir,
            matched_files=matched_files,
            is_valid=False,
            reason=f"duplicate camera inputs under {segment_dir}: {duplicate_text}",
        )
    if missing_cameras:
        missing_text = ", ".join(missing_cameras)
        return SegmentScan(
            segment_dir=segment_dir,
            matched_files=matched_files,
            is_valid=False,
            reason=f"missing camera inputs under {segment_dir}: {missing_text}",
        )
    return SegmentScan(segment_dir=segment_dir, matched_files=matched_files, is_valid=True)
def output_path_for(segment_dir: Path) -> Path:
    """Return the grid MP4 path placed next to the segment's inputs."""
    return segment_dir.joinpath(f"{segment_dir.name}_grid.mp4")
def command_for_job(job: ConversionJob, config: BatchConfig) -> list[str]:
    """Assemble the zed_svo_grid_to_mp4 argv for one segment conversion."""
    if config.zed_bin is None:
        raise RuntimeError("zed_svo_grid_to_mp4 binary is not configured")
    command = [str(config.zed_bin), "--segment-dir", str(job.segment_dir)]
    command += ["--codec", config.codec]
    command += ["--encoder-device", config.encoder_device]
    command += ["--preset", config.preset]
    command += ["--tune", config.tune]
    command += ["--quality", str(config.quality)]
    command += ["--gop", str(config.gop)]
    command += ["--b-frames", str(config.b_frames)]
    command += ["--start-offset-seconds", str(config.start_offset_seconds)]
    command += ["--tile-scale", str(config.tile_scale)]
    # Optional trims: only passed through when explicitly configured.
    if config.duration_seconds is not None:
        command += ["--duration-seconds", str(config.duration_seconds)]
    if config.output_fps is not None:
        command += ["--output-fps", str(config.output_fps)]
    return command
def env_for_job(config: BatchConfig) -> dict[str, str]:
    """Copy the current environment, optionally pinning CUDA_VISIBLE_DEVICES."""
    env = os.environ.copy()
    if config.cuda_visible_devices is not None:
        env["CUDA_VISIBLE_DEVICES"] = config.cuda_visible_devices
    return env
def probe_output(output_path: Path, ffprobe_bin: Path | None) -> OutputProbeResult:
    """Validate an existing grid MP4 with ffprobe.

    Returns status "missing" when the file does not exist, "invalid" with a
    reason when ffprobe fails or reports no usable video stream/duration,
    and "valid" with the parsed duration otherwise.

    Raises RuntimeError when a file exists but no ffprobe binary was
    configured (a programming error in the caller, not a user error).
    """
    if not output_path.is_file():
        return OutputProbeResult(output_path=output_path, status="missing")
    if ffprobe_bin is None:
        raise RuntimeError("ffprobe binary is not configured")
    completed = subprocess.run(
        [
            str(ffprobe_bin),
            "-v",
            "error",
            "-print_format",
            "json",
            "-show_entries",
            "format=duration,size:stream=codec_type,codec_name,width,height,nb_frames",
            str(output_path),
        ],
        check=False,
        capture_output=True,
        text=True,
    )
    if completed.returncode != 0:
        reason = completed.stderr.strip() or completed.stdout.strip() or "ffprobe failed"
        return OutputProbeResult(output_path=output_path, status="invalid", reason=reason)
    try:
        payload = json.loads(completed.stdout)
    except json.JSONDecodeError as error:
        return OutputProbeResult(
            output_path=output_path,
            status="invalid",
            reason=f"ffprobe returned invalid JSON: {error}",
        )
    # A playable output must contain at least one video stream.
    streams = payload.get("streams", [])
    has_video_stream = any(stream.get("codec_type") == "video" for stream in streams)
    if not has_video_stream:
        return OutputProbeResult(
            output_path=output_path,
            status="invalid",
            reason="ffprobe found no video stream",
        )
    # The container duration must parse as a finite, positive number.
    format_payload = payload.get("format", {})
    duration_text = format_payload.get("duration")
    if duration_text in (None, ""):
        return OutputProbeResult(
            output_path=output_path,
            status="invalid",
            reason="ffprobe did not report a duration",
        )
    try:
        duration_seconds = float(duration_text)
    except (TypeError, ValueError):
        return OutputProbeResult(
            output_path=output_path,
            status="invalid",
            reason=f"ffprobe reported a non-numeric duration: {duration_text!r}",
        )
    if not math.isfinite(duration_seconds) or duration_seconds <= 0.0:
        return OutputProbeResult(
            output_path=output_path,
            status="invalid",
            reason=f"ffprobe reported a non-positive duration: {duration_seconds}",
        )
    return OutputProbeResult(
        output_path=output_path,
        status="valid",
        duration_seconds=duration_seconds,
    )
def run_conversion(job: ConversionJob, config: BatchConfig) -> JobResult:
    """Run one conversion subprocess and capture its full outcome."""
    command = command_for_job(job, config)
    completed = subprocess.run(
        command,
        check=False,
        capture_output=True,
        text=True,
        env=env_for_job(config),
    )
    return JobResult(
        status="converted" if completed.returncode == 0 else "failed",
        segment_dir=job.segment_dir,
        output_path=job.output_path,
        command=tuple(command),
        return_code=completed.returncode,
        stdout=completed.stdout,
        stderr=completed.stderr,
    )
def summarize_failures(results: list[JobResult]) -> None:
    """Print the captured output of every failed conversion to stderr."""
    failures = [result for result in results if result.status == "failed"]
    if not failures:
        return
    click.echo("\nFailed conversions:", err=True)
    for result in failures:
        click.echo(f"- {result.segment_dir} (exit {result.return_code})", err=True)
        # Prefer stderr; fall back to stdout when stderr is blank.
        if result.stderr.strip():
            click.echo(result.stderr.rstrip(), err=True)
        elif result.stdout.strip():
            click.echo(result.stdout.rstrip(), err=True)
def report_invalid_existing_outputs(
    invalid_existing: list[tuple[ConversionJob, OutputProbeResult]],
) -> None:
    """Print the ffprobe failure reason for each invalid existing output."""
    if not invalid_existing:
        return
    click.echo("\nInvalid existing outputs:", err=True)
    for job, probe in invalid_existing:
        click.echo(f"- {job.segment_dir}", err=True)
        click.echo(f"  output: {probe.output_path}", err=True)
        # An empty reason still produces one "reason:" line.
        reason_lines = probe.reason.splitlines() or [probe.reason]
        click.echo(f"  reason: {reason_lines[0]}", err=True)
        for line in reason_lines[1:]:
            click.echo(f"  {line}", err=True)
def report_dry_run_plan(
    pending_jobs: list[ConversionJob],
    pending_reasons: dict[Path, str],
    pending_details: dict[Path, str],
) -> None:
    """Print, without converting, the jobs a real run would launch.

    pending_reasons maps segment_dir -> short launch reason; pending_details
    may add a longer explanation that is flattened onto a single line.
    """
    if not pending_jobs:
        click.echo("dry-run: no conversions would be launched", err=True)
        return
    click.echo("\nDry-run plan:", err=True)
    for job in pending_jobs:
        reason = pending_reasons[job.segment_dir]
        detail = pending_details.get(job.segment_dir)
        line = f"- {job.segment_dir} [{reason}]"
        if detail:
            # chr(10) == "\n": collapse multi-line details onto one line.
            line = f"{line}: {detail.replace(chr(10), ' | ')}"
        click.echo(line, err=True)
def run_batch(jobs: list[ConversionJob], config: BatchConfig, jobs_limit: int) -> tuple[list[JobResult], int]:
    """Run conversions through a bounded thread pool with a progress bar.

    Returns (results, aborted_count) where aborted_count is the number of
    jobs never submitted because --fail-fast stopped the batch early.
    """
    results: list[JobResult] = []
    aborted_count = 0
    if not jobs:
        return results, aborted_count
    future_to_job: dict[concurrent.futures.Future[JobResult], ConversionJob] = {}
    job_iter = iter(jobs)
    stop_submitting = False
    with concurrent.futures.ThreadPoolExecutor(max_workers=jobs_limit) as executor:
        with tqdm(total=len(jobs), unit="segment", dynamic_ncols=True) as progress:
            def submit_next() -> bool:
                # Only reads stop_submitting from the enclosing scope, so no
                # `nonlocal` declaration is needed here.
                if stop_submitting:
                    return False
                try:
                    job = next(job_iter)
                except StopIteration:
                    return False
                future = executor.submit(run_conversion, job, config)
                future_to_job[future] = job
                return True
            # Prime the pool with at most jobs_limit concurrent jobs; each
            # completion below refills one slot.
            for _ in range(min(jobs_limit, len(jobs))):
                submit_next()
            while future_to_job:
                done, _ = concurrent.futures.wait(
                    future_to_job,
                    return_when=concurrent.futures.FIRST_COMPLETED,
                )
                for future in done:
                    job = future_to_job.pop(future)
                    result = future.result()
                    results.append(result)
                    progress.update(1)
                    if result.status == "failed":
                        # tqdm.write keeps the progress bar rendering intact.
                        tqdm.write(
                            f"failed: {job.segment_dir} (exit {result.return_code})",
                            file=sys.stderr,
                        )
                        if config.fail_fast:
                            stop_submitting = True
                    if not stop_submitting:
                        submit_next()
            if stop_submitting:
                # Jobs never submitted count as aborted; shrink the bar's
                # total so it can complete at the reduced job count.
                remaining = sum(1 for _ in job_iter)
                aborted_count = remaining
                progress.total = progress.n + len(future_to_job)
                progress.refresh()
    return results, aborted_count
@click.command(context_settings={"allow_extra_args": True})
@click.option(
    "--dataset-root",
    type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path),
    help="Dataset root containing segment directories. Mutually exclusive with --segment and --segments-csv.",
)
@click.option(
    "--segment",
    "segment_dirs",
    multiple=True,
    type=click.Path(exists=True, path_type=Path, file_okay=False, dir_okay=True),
    help=(
        "Explicit segment directory. Repeatable. The directory must directly contain "
        "*_zedN.svo or *_zedN.svo2 files. Mutually exclusive with --dataset-root and --segments-csv."
    ),
)
@click.option(
    "--segment-dir",
    "legacy_segment_dirs",
    multiple=True,
    type=click.Path(path_type=Path, file_okay=False, dir_okay=True),
    hidden=True,
)
@click.option(
    "--segments-csv",
    type=click.Path(path_type=Path, dir_okay=False),
    help="CSV file containing a segment_dir column. Mutually exclusive with --dataset-root and --segment.",
)
@click.option(
    "--csv-root",
    type=click.Path(path_type=Path, file_okay=False, dir_okay=True),
    help="Base directory for relative segment_dir entries in --segments-csv. Defaults to the CSV parent directory.",
)
@click.option(
    "--recursive/--no-recursive",
    default=True,
    show_default=True,
    help="Recurse when discovering segment directories from --dataset-root.",
)
@click.option("--jobs", default=1, show_default=True, type=click.IntRange(min=1), help="Parallel conversion jobs.")
@click.option(
    "--zed-bin",
    type=click.Path(path_type=Path, dir_okay=False),
    help="Explicit path to the zed_svo_grid_to_mp4 binary.",
)
@click.option(
    "--ffprobe-bin",
    type=click.Path(path_type=Path, dir_okay=False),
    help="Explicit path to ffprobe. Required when probing existing outputs and ffprobe is not on PATH.",
)
@click.option(
    "--cuda-visible-devices",
    help="Optional CUDA_VISIBLE_DEVICES value exported for each conversion subprocess.",
)
@click.option("--overwrite/--skip-existing", default=False, show_default=True, help="Overwrite existing grid MP4 files.")
@click.option(
    "--probe-existing/--trust-existing",
    default=False,
    show_default=True,
    help="Validate existing grid MP4 files with ffprobe before skipping them. Invalid outputs are treated as missing.",
)
@click.option(
    "--report-existing",
    is_flag=True,
    help="Probe existing grid MP4 files with ffprobe, report invalid ones, and do not launch conversions.",
)
@click.option(
    "--dry-run",
    is_flag=True,
    help="Show which segments would be converted after applying skip/probe logic, without launching conversions.",
)
@click.option(
    "--fail-fast/--continue-on-error",
    default=False,
    show_default=True,
    help="Stop submitting new work after the first failed conversion.",
)
@click.option("--codec", type=click.Choice(("h264", "h265")), default="h265", show_default=True)
@click.option(
    "--encoder-device",
    type=click.Choice(("auto", "nvidia", "software")),
    default="auto",
    show_default=True,
)
@click.option("--preset", type=click.Choice(("fast", "balanced", "quality")), default="fast", show_default=True)
@click.option(
    "--tune",
    type=click.Choice(("low-latency", "balanced")),
    default="low-latency",
    show_default=True,
)
@click.option(
    "--quality",
    type=click.IntRange(min=0, max=51),
    default=23,
    show_default=True,
    help="Lower values mean higher quality.",
)
@click.option("--gop", type=click.IntRange(min=1), default=30, show_default=True)
@click.option("--b-frames", "b_frames", type=click.IntRange(min=0), default=0, show_default=True)
@click.option(
    "--start-offset-seconds",
    type=click.FloatRange(min=0.0),
    default=0.0,
    show_default=True,
    help="Offset applied after the synced common start time.",
)
@click.option(
    "--duration-seconds",
    type=click.FloatRange(min=0.0, min_open=True),
    default=None,
    help="Limit export duration in seconds after sync.",
)
@click.option(
    "--output-fps",
    type=click.FloatRange(min=0.0, min_open=True),
    default=None,
    help="Composite output frame rate. Defaults to the grid tool's native behavior.",
)
@click.option(
    "--tile-scale",
    type=click.FloatRange(min=0.1, max=1.0),
    default=0.5,
    show_default=True,
    help="Scale each tile relative to the source resolution.",
)
@click.pass_context
def main(
    ctx: click.Context,
    dataset_root: Path | None,
    segment_dirs: tuple[Path, ...],
    legacy_segment_dirs: tuple[Path, ...],
    segments_csv: Path | None,
    csv_root: Path | None,
    recursive: bool,
    jobs: int,
    zed_bin: Path | None,
    ffprobe_bin: Path | None,
    cuda_visible_devices: str | None,
    overwrite: bool,
    probe_existing: bool,
    report_existing: bool,
    dry_run: bool,
    fail_fast: bool,
    codec: str,
    encoder_device: str,
    preset: str,
    tune: str,
    quality: int,
    gop: int,
    b_frames: int,
    start_offset_seconds: float,
    duration_seconds: float | None,
    output_fps: float | None,
    tile_scale: float,
) -> None:
    """Batch-convert synced four-camera ZED segments into grid MP4 files."""
    # Reject legacy invocation styles before doing any work.
    segment_sources.raise_for_legacy_extra_args(ctx.args)
    segment_sources.raise_for_legacy_source_args(None, legacy_segment_dirs)
    segment_sources.raise_if_recursive_flag_is_incompatible(ctx, dataset_root)
    if b_frames > gop:
        raise click.BadParameter(f"b-frames {b_frames} must be <= gop {gop}", param_hint="--b-frames")
    if report_existing and dry_run:
        raise click.ClickException("--report-existing and --dry-run are mutually exclusive")
    # Resolve segment directories from exactly one of the three source modes.
    sources = segment_sources.resolve_sources(
        dataset_root,
        segment_dirs,
        segments_csv,
        csv_root,
        recursive,
        scan_segment_dir=scan_segment_dir,
        no_matches_message=lambda root: f"no complete four-camera segments found under {root}",
    )
    # ffprobe is only needed when existing outputs will be validated; the
    # converter binary is not needed at all in --report-existing mode.
    ffprobe_path = locate_ffprobe(ffprobe_bin) if (probe_existing or report_existing) else None
    binary_path = None if report_existing else locate_binary(zed_bin)
    config = BatchConfig(
        zed_bin=binary_path,
        ffprobe_bin=ffprobe_path,
        probe_existing=probe_existing or report_existing,
        cuda_visible_devices=cuda_visible_devices,
        overwrite=overwrite,
        fail_fast=fail_fast,
        codec=codec,
        encoder_device=encoder_device,
        preset=preset,
        tune=tune,
        quality=quality,
        gop=gop,
        b_frames=b_frames,
        start_offset_seconds=start_offset_seconds,
        duration_seconds=duration_seconds,
        output_fps=output_fps,
        tile_scale=tile_scale,
    )
    # Classification buckets for each matched segment.
    skipped_results: list[JobResult] = []
    failed_results: list[JobResult] = []
    pending_jobs: list[ConversionJob] = []
    pending_reasons: dict[Path, str] = {}
    pending_details: dict[Path, str] = {}
    valid_existing: list[OutputProbeResult] = []
    invalid_existing: list[tuple[ConversionJob, OutputProbeResult]] = []
    missing_outputs: list[ConversionJob] = []
    for segment_dir in sources.segment_dirs:
        output_path = output_path_for(segment_dir)
        job = ConversionJob(segment_dir=segment_dir, output_path=output_path)
        command = tuple(command_for_job(job, config)) if config.zed_bin is not None else ()
        scan = scan_segment_dir(segment_dir)
        # An invalid segment layout is recorded as a synthetic failure (exit 2).
        if not scan.is_valid:
            failed_results.append(
                JobResult(
                    status="failed",
                    segment_dir=segment_dir,
                    output_path=output_path,
                    command=command,
                    return_code=2,
                    stderr=scan.reason or "",
                )
            )
            continue
        # Report-only mode: classify the existing output, never convert.
        if report_existing:
            probe_result = probe_output(output_path, config.ffprobe_bin)
            if probe_result.status == "valid":
                valid_existing.append(probe_result)
            elif probe_result.status == "invalid":
                invalid_existing.append((job, probe_result))
            else:
                missing_outputs.append(job)
            continue
        if overwrite:
            pending_jobs.append(job)
            pending_reasons[segment_dir] = "overwrite"
            continue
        # Optional ffprobe validation of an existing output before skipping it.
        if config.probe_existing:
            probe_result = probe_output(output_path, config.ffprobe_bin)
            if probe_result.status == "valid":
                valid_existing.append(probe_result)
                skipped_results.append(
                    JobResult(
                        status="skipped",
                        segment_dir=segment_dir,
                        output_path=output_path,
                        command=command,
                    )
                )
                continue
            if probe_result.status == "invalid":
                # Broken output: re-convert it.
                invalid_existing.append((job, probe_result))
                pending_jobs.append(job)
                pending_reasons[segment_dir] = "invalid-existing-output"
                pending_details[segment_dir] = probe_result.reason
                continue
            missing_outputs.append(job)
            pending_jobs.append(job)
            pending_reasons[segment_dir] = "missing-output"
            continue
        # Default skip-existing behavior without probing.
        if output_path.exists():
            skipped_results.append(
                JobResult(
                    status="skipped",
                    segment_dir=segment_dir,
                    output_path=output_path,
                    command=command,
                )
            )
            continue
        pending_jobs.append(job)
        pending_reasons[segment_dir] = "missing-output"
    # Report-only summary, then exit without converting.
    if report_existing:
        click.echo(
            (
                f"source={sources.mode} matched={len(sources.segment_dirs)} valid={len(valid_existing)} "
                f"invalid={len(invalid_existing)} missing={len(missing_outputs)} "
                f"invalid-segments={len(failed_results)}"
            ),
            err=True,
        )
        if sources.ignored_partial_dirs:
            click.echo(f"ignored_incomplete={len(sources.ignored_partial_dirs)}", err=True)
        report_invalid_existing_outputs(invalid_existing)
        summarize_failures(failed_results)
        if failed_results or invalid_existing:
            raise SystemExit(1)
        return
    click.echo(
        (
            f"source={sources.mode} matched={len(sources.segment_dirs)} pending={len(pending_jobs)} "
            f"skipped={len(skipped_results)} invalid={len(failed_results)} jobs={jobs} "
            f"dry_run={'yes' if dry_run else 'no'}"
        ),
        err=True,
    )
    if sources.ignored_partial_dirs:
        click.echo(f"ignored_incomplete={len(sources.ignored_partial_dirs)}", err=True)
    if config.probe_existing:
        click.echo(
            (
                f"probed-existing: valid={len(valid_existing)} invalid={len(invalid_existing)} "
                f"missing={len(missing_outputs)}"
            ),
            err=True,
        )
    # Dry run: show the plan and exit with the validation status only.
    if dry_run:
        report_dry_run_plan(pending_jobs, pending_reasons, pending_details)
        summarize_failures(failed_results)
        if failed_results:
            raise SystemExit(1)
        return
    results = list(skipped_results)
    results.extend(failed_results)
    conversion_results, aborted_count = run_batch(pending_jobs, config, jobs)
    results.extend(conversion_results)
    converted_count = sum(1 for result in results if result.status == "converted")
    skipped_count = sum(1 for result in results if result.status == "skipped")
    failed_count = sum(1 for result in results if result.status == "failed")
    click.echo(
        (
            f"summary: matched={len(sources.segment_dirs)} converted={converted_count} "
            f"skipped={skipped_count} failed={failed_count} aborted={aborted_count}"
        ),
        err=True,
    )
    summarize_failures(results)
    # Nonzero exit when anything failed or work was aborted by --fail-fast.
    if failed_count > 0 or aborted_count > 0:
        raise SystemExit(1)
if __name__ == "__main__":
    main()
File diff suppressed because it is too large Load Diff
-361
View File
@@ -1,361 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import concurrent.futures
import os
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable
import click
from tqdm import tqdm
SCRIPT_PATH = Path(__file__).resolve()
REPO_ROOT = SCRIPT_PATH.parents[1]
DEFAULT_PATTERNS = ("*.svo2",)
SUPPORTED_SUFFIXES = {".svo", ".svo2"}
@dataclass(slots=True, frozen=True)
class BatchConfig:
    """Immutable settings shared by every conversion job in a batch run."""

    zed_bin: Path  # resolved path to the zed_svo_to_mp4 binary
    cuda_visible_devices: str | None  # exported as CUDA_VISIBLE_DEVICES when set
    overwrite: bool  # overwrite existing MP4 outputs instead of skipping them
    fail_fast: bool  # stop submitting new jobs after the first failure
    codec: str  # "h264" or "h265"
    encoder_device: str  # "auto" | "nvidia" | "software"
    preset: str  # encoder preset forwarded as --preset
    tune: str  # encoder tuning forwarded as --tune
    quality: int  # 0-51; lower means higher quality
    gop: int  # GOP length forwarded as --gop
    b_frames: int  # B-frame count forwarded as --b-frames
    start_frame: int  # first SVO frame, forwarded as --start-frame
    end_frame: int | None  # optional last frame; omitted from the command when None
@dataclass(slots=True, frozen=True)
class ConversionJob:
    """One input SVO recording paired with its target MP4 path."""

    input_path: Path  # source .svo/.svo2 file
    output_path: Path  # destination .mp4 file
@dataclass(slots=True, frozen=True)
class JobResult:
    """Outcome of a single conversion attempt (or a skip decision)."""

    status: str  # "converted" | "failed" | "skipped"
    input_path: Path
    output_path: Path
    command: tuple[str, ...]  # argv that was (or would have been) executed
    return_code: int = 0  # subprocess exit code; stays 0 for skipped jobs
    stdout: str = ""  # captured subprocess stdout
    stderr: str = ""  # captured subprocess stderr
def locate_binary(override: Path | None) -> Path:
    """Resolve the zed_svo_to_mp4 executable.

    An explicit ``override`` wins and must point at an existing file;
    otherwise the usual build output locations are searched.
    """
    if override is not None:
        resolved = override.expanduser().resolve()
        if resolved.is_file():
            return resolved
        raise click.ClickException(f"binary not found: {resolved}")
    for build_dir in (REPO_ROOT / "build" / "bin", REPO_ROOT / "build"):
        candidate = build_dir / "zed_svo_to_mp4"
        if candidate.is_file():
            return candidate
    raise click.ClickException(f"could not find zed_svo_to_mp4 under {REPO_ROOT / 'build'}")
def discover_inputs(root: Path, patterns: Iterable[str], recursive: bool) -> list[Path]:
    """Collect unique SVO/SVO2 files under *root* matching any glob pattern.

    Returns absolute paths in sorted order; non-files and files with other
    suffixes are ignored.
    """
    supported = {".svo", ".svo2"}  # mirrors module-level SUPPORTED_SUFFIXES
    found: set[Path] = set()
    for pattern in patterns:
        matches = root.rglob(pattern) if recursive else root.glob(pattern)
        found.update(
            match.absolute()
            for match in matches
            if match.is_file() and match.suffix.lower() in supported
        )
    return sorted(found)
def output_path_for(input_path: Path) -> Path:
    """Return the sibling ``.mp4`` path for an input recording."""
    if not input_path.suffix:
        # No extension to replace: append one instead.
        return input_path.with_name(f"{input_path.name}.mp4")
    return input_path.with_suffix(".mp4")
def command_for_job(job: ConversionJob, config: BatchConfig) -> list[str]:
    """Build the zed_svo_to_mp4 argv for one conversion job."""
    argv = [str(config.zed_bin), "--input", str(job.input_path)]
    argv.extend(("--codec", config.codec))
    argv.extend(("--encoder-device", config.encoder_device))
    argv.extend(("--preset", config.preset))
    argv.extend(("--tune", config.tune))
    argv.extend(("--quality", str(config.quality)))
    argv.extend(("--gop", str(config.gop)))
    argv.extend(("--b-frames", str(config.b_frames)))
    argv.extend(("--start-frame", str(config.start_frame)))
    # --end-frame is optional; the tool converts to the end when omitted.
    if config.end_frame is not None:
        argv.extend(("--end-frame", str(config.end_frame)))
    return argv
def env_for_job(config: BatchConfig) -> dict[str, str]:
    """Copy the current environment, optionally pinning CUDA devices."""
    env = os.environ.copy()
    if config.cuda_visible_devices is not None:
        env["CUDA_VISIBLE_DEVICES"] = config.cuda_visible_devices
    return env
def run_conversion(job: ConversionJob, config: BatchConfig) -> JobResult:
    """Run one conversion subprocess and capture its full outcome."""
    argv = command_for_job(job, config)
    completed = subprocess.run(
        argv,
        check=False,  # failures are reported via JobResult, not exceptions
        capture_output=True,
        text=True,
        env=env_for_job(config),
    )
    return JobResult(
        status="converted" if completed.returncode == 0 else "failed",
        input_path=job.input_path,
        output_path=job.output_path,
        command=tuple(argv),
        return_code=completed.returncode,
        stdout=completed.stdout,
        stderr=completed.stderr,
    )
def summarize_failures(results: list[JobResult]) -> None:
    """Print a stderr/stdout excerpt for every failed conversion."""
    failures = [result for result in results if result.status == "failed"]
    if not failures:
        return
    click.echo("\nFailed conversions:", err=True)
    for failure in failures:
        click.echo(f"- {failure.input_path} (exit {failure.return_code})", err=True)
        # Prefer stderr; fall back to stdout when stderr is blank.
        detail = failure.stderr if failure.stderr.strip() else failure.stdout
        if detail.strip():
            click.echo(detail.rstrip(), err=True)
def run_batch(jobs: list[ConversionJob], config: BatchConfig, jobs_limit: int) -> tuple[list[JobResult], int]:
    """Run conversions on a bounded thread pool with a tqdm progress bar.

    Jobs are submitted lazily (at most ``jobs_limit`` in flight) so that
    --fail-fast can stop queueing new work while letting in-flight jobs
    finish. Returns (results, aborted_count) where aborted_count is the
    number of jobs never submitted after an abort.
    """
    results: list[JobResult] = []
    aborted_count = 0
    if not jobs:
        return results, aborted_count
    future_to_job: dict[concurrent.futures.Future[JobResult], ConversionJob] = {}
    job_iter = iter(jobs)
    stop_submitting = False
    with concurrent.futures.ThreadPoolExecutor(max_workers=jobs_limit) as executor:
        with tqdm(total=len(jobs), unit="file", dynamic_ncols=True) as progress:
            def submit_next() -> bool:
                # Pull one job off the shared iterator and submit it.
                # Returns False when aborted or the iterator is exhausted.
                if stop_submitting:
                    return False
                try:
                    job = next(job_iter)
                except StopIteration:
                    return False
                future = executor.submit(run_conversion, job, config)
                future_to_job[future] = job
                return True
            # Prime the pool up to the concurrency limit.
            for _ in range(min(jobs_limit, len(jobs))):
                submit_next()
            # Drain completions, topping the pool back up after each one.
            while future_to_job:
                done, _ = concurrent.futures.wait(
                    future_to_job,
                    return_when=concurrent.futures.FIRST_COMPLETED,
                )
                for future in done:
                    job = future_to_job.pop(future)
                    result = future.result()
                    results.append(result)
                    progress.update(1)
                    if result.status == "failed":
                        tqdm.write(f"failed: {job.input_path} (exit {result.return_code})", file=sys.stderr)
                        if config.fail_fast:
                            stop_submitting = True
                    if not stop_submitting:
                        submit_next()
            if stop_submitting:
                # Count the jobs that were never submitted; consuming the
                # iterator here is safe because this runs once, after the
                # pool has drained.
                remaining = sum(1 for _ in job_iter)
                aborted_count = remaining
                # Clamp the bar's total to the work actually processed.
                progress.total = progress.n + len(future_to_job)
                progress.refresh()
    return results, aborted_count
@click.command()
@click.argument("input_dir", type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path))
@click.option(
    "--pattern",
    "patterns",
    multiple=True,
    default=DEFAULT_PATTERNS,
    show_default=True,
    help="Glob pattern to match under the input directory. Repeatable.",
)
@click.option("--recursive/--no-recursive", default=True, show_default=True, help="Use rglob instead of glob.")
@click.option("--jobs", default=1, show_default=True, type=click.IntRange(min=1), help="Parallel conversion jobs.")
@click.option(
    "--zed-bin",
    type=click.Path(path_type=Path, dir_okay=False),
    help="Explicit path to the zed_svo_to_mp4 binary.",
)
@click.option(
    "--cuda-visible-devices",
    help="Optional CUDA_VISIBLE_DEVICES value exported for each conversion subprocess.",
)
@click.option("--overwrite/--skip-existing", default=False, show_default=True, help="Overwrite existing MP4 files.")
@click.option(
    "--fail-fast/--continue-on-error",
    default=False,
    show_default=True,
    help="Stop submitting new work after the first failed conversion.",
)
@click.option("--codec", type=click.Choice(("h264", "h265")), default="h265", show_default=True)
@click.option(
    "--encoder-device",
    type=click.Choice(("auto", "nvidia", "software")),
    default="auto",
    show_default=True,
)
@click.option("--preset", type=click.Choice(("fast", "balanced", "quality")), default="fast", show_default=True)
@click.option(
    "--tune",
    type=click.Choice(("low-latency", "balanced")),
    default="low-latency",
    show_default=True,
)
@click.option(
    "--quality",
    type=click.IntRange(min=0, max=51),
    default=23,
    show_default=True,
    help="Lower values mean higher quality.",
)
@click.option("--gop", type=click.IntRange(min=1), default=30, show_default=True)
@click.option("--b-frames", "b_frames", type=click.IntRange(min=0), default=0, show_default=True)
@click.option("--start-frame", type=click.IntRange(min=0), default=0, show_default=True)
@click.option("--end-frame", type=click.IntRange(min=0), default=None)
def main(
    input_dir: Path,
    patterns: tuple[str, ...],
    recursive: bool,
    jobs: int,
    zed_bin: Path | None,
    cuda_visible_devices: str | None,
    overwrite: bool,
    fail_fast: bool,
    codec: str,
    encoder_device: str,
    preset: str,
    tune: str,
    quality: int,
    gop: int,
    b_frames: int,
    start_frame: int,
    end_frame: int | None,
) -> None:
    """Batch-convert ZED SVO/SVO2 recordings in a folder to MP4."""
    # Cross-option validation that click cannot express declaratively.
    if b_frames > gop:
        raise click.BadParameter(f"b-frames {b_frames} must be <= gop {gop}", param_hint="--b-frames")
    if end_frame is not None and end_frame < start_frame:
        raise click.BadParameter(
            f"end-frame {end_frame} must be >= start-frame {start_frame}",
            param_hint="--end-frame",
        )
    binary_path = locate_binary(zed_bin)
    inputs = discover_inputs(input_dir.absolute(), patterns, recursive)
    if not inputs:
        raise click.ClickException(f"no .svo/.svo2 files matched under {input_dir}")
    config = BatchConfig(
        zed_bin=binary_path,
        cuda_visible_devices=cuda_visible_devices,
        overwrite=overwrite,
        fail_fast=fail_fast,
        codec=codec,
        encoder_device=encoder_device,
        preset=preset,
        tune=tune,
        quality=quality,
        gop=gop,
        b_frames=b_frames,
        start_frame=start_frame,
        end_frame=end_frame,
    )
    # Split inputs into jobs to run versus outputs already present.
    skipped_results: list[JobResult] = []
    pending_jobs: list[ConversionJob] = []
    for input_path in inputs:
        output_path = output_path_for(input_path)
        # Build the job once and reuse it for both the command preview and
        # the pending queue (previously constructed twice).
        job = ConversionJob(input_path=input_path, output_path=output_path)
        command = tuple(command_for_job(job, config))
        if output_path.exists() and not overwrite:
            skipped_results.append(
                JobResult(
                    status="skipped",
                    input_path=input_path,
                    output_path=output_path,
                    command=command,
                )
            )
            continue
        pending_jobs.append(job)
    click.echo(
        f"matched={len(inputs)} pending={len(pending_jobs)} skipped={len(skipped_results)} jobs={jobs}",
        err=True,
    )
    results = list(skipped_results)
    conversion_results, aborted_count = run_batch(pending_jobs, config, jobs)
    results.extend(conversion_results)
    converted_count = sum(1 for result in results if result.status == "converted")
    skipped_count = sum(1 for result in results if result.status == "skipped")
    failed_count = sum(1 for result in results if result.status == "failed")
    click.echo(
        (
            f"summary: matched={len(inputs)} converted={converted_count} "
            f"skipped={skipped_count} failed={failed_count} aborted={aborted_count}"
        ),
        err=True,
    )
    summarize_failures(results)
    # Single exit check: consistent with the grid-batch script's behavior
    # (previously two back-to-back ifs raising the same SystemExit(1)).
    if failed_count > 0 or aborted_count > 0:
        raise SystemExit(1)
if __name__ == "__main__":
    main()
-374
View File
@@ -1,374 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import math
import os
import shlex
import subprocess
import sys
import tempfile
from collections import Counter
from pathlib import Path
from typing import Iterable
import cv2
import numpy as np
SCRIPT_PATH = Path(__file__).resolve()
REPO_ROOT = SCRIPT_PATH.parents[1]
WORKSPACE_ROOT = REPO_ROOT.parent
MCAP_PYTHON_ROOT = WORKSPACE_ROOT / "mcap" / "python" / "mcap"
if str(MCAP_PYTHON_ROOT) not in sys.path:
sys.path.insert(0, str(MCAP_PYTHON_ROOT))
from mcap.reader import make_reader # noqa: E402
VIDEO_FORMATS = ("h264", "h265")
def parse_args() -> argparse.Namespace:
    """Define and parse the CLI for the SVO-to-MCAP conversion/preview tool."""
    parser = argparse.ArgumentParser(
        description=(
            "Convert ZED SVO/SVO2 recordings to MCAP and generate a lightweight preview. "
            "If the input is already an MCAP file, conversion is skipped."
        )
    )
    parser.add_argument("input", help="Input .svo/.svo2 file, .mcap file, or a directory containing SVO files")
    parser.add_argument("--output-dir", help="Directory for generated MCAP files and previews")
    parser.add_argument(
        "--preview-all",
        action="store_true",
        help="When the input is a directory, generate a preview for every converted MCAP instead of just the first one",
    )
    parser.add_argument("--no-preview", action="store_true", help="Convert only, do not generate preview images")
    # Preview-side options (consumed by preview_mcap / make_contact_sheet).
    parser.add_argument(
        "--format",
        choices=("auto", "h264", "h265"),
        default="auto",
        help="CompressedVideo format to extract from MCAP during preview",
    )
    # Conversion-side options (forwarded to zed_svo_to_mcap).
    parser.add_argument("--codec", choices=VIDEO_FORMATS, default="h264", help="Video codec for SVO to MCAP conversion")
    parser.add_argument(
        "--encoder-device",
        choices=("auto", "nvidia", "software"),
        default="software",
        help="Encoder device passed to zed_svo_to_mcap",
    )
    parser.add_argument(
        "--mcap-compression",
        choices=("none", "lz4", "zstd"),
        default="none",
        help="MCAP chunk compression passed to zed_svo_to_mcap",
    )
    parser.add_argument(
        "--depth-mode",
        choices=("neural_light", "neural", "neural_plus"),
        default="neural_plus",
        help="Depth mode passed to zed_svo_to_mcap",
    )
    parser.add_argument(
        "--depth-size",
        default="optimal",
        help="Depth size passed to zed_svo_to_mcap (optimal|native|<width>x<height>)",
    )
    parser.add_argument("--start-frame", type=int, default=0, help="First SVO frame to convert")
    parser.add_argument("--end-frame", type=int, help="Last SVO frame to convert")
    parser.add_argument(
        "--sample-count",
        type=int,
        default=9,
        help="Number of decoded frames to place in the preview contact sheet",
    )
    parser.add_argument(
        "--frame-step",
        type=int,
        default=15,
        help="Decode every Nth frame for the contact sheet",
    )
    parser.add_argument(
        "--contact-sheet-width",
        type=int,
        default=480,
        help="Width of each preview tile in pixels",
    )
    parser.add_argument(
        "--cuda-visible-devices",
        help=(
            "Optional CUDA_VISIBLE_DEVICES value to export while running zed_svo_to_mcap. "
            "Useful when the ZED SDK must be pinned to a specific GPU UUID."
        ),
    )
    # Binary overrides for non-standard build layouts.
    parser.add_argument("--zed-bin", help="Explicit path to zed_svo_to_mcap")
    parser.add_argument("--reader-bin", help="Explicit path to mcap_reader_tester")
    return parser.parse_args()
def locate_binary(name: str, override: str | None) -> Path:
    """Find helper binary *name*, honoring an explicit override path.

    Raises FileNotFoundError when neither the override nor the standard
    build locations yield an existing file.
    """
    if override:
        explicit = Path(override).expanduser().resolve()
        if explicit.is_file():
            return explicit
        raise FileNotFoundError(f"binary not found: {explicit}")
    for candidate in (REPO_ROOT / "build" / "bin" / name, REPO_ROOT / "build" / name):
        if candidate.is_file():
            return candidate
    raise FileNotFoundError(f"could not find {name} under {REPO_ROOT / 'build'}")
def quote_command(args: Iterable[str]) -> str:
    """Render *args* as a copy-pastable, shell-quoted command string."""
    quoted = [shlex.quote(token) for token in args]
    return " ".join(quoted)
def run(args: list[str], env: dict[str, str] | None = None) -> None:
    """Echo a command to stdout, then execute it.

    Raises subprocess.CalledProcessError on a nonzero exit code.
    """
    banner = f"$ {quote_command(args)}"
    print(banner, flush=True)
    subprocess.run(args, check=True, env=env)
def summarize_mcap(mcap_path: Path) -> list[tuple[str, str, str, int]]:
    """Count messages per (topic, encoding, schema) and print a summary table.

    Returns the sorted rows as (topic, encoding, schema_name, count) tuples.
    """
    tally: Counter[tuple[str, str, str]] = Counter()
    with mcap_path.open("rb") as stream:
        reader = make_reader(stream)
        for schema, channel, _message in reader.iter_messages():
            name = "<none>" if schema is None else schema.name
            tally[(channel.topic, channel.message_encoding, name)] += 1
    rows = [
        (topic, encoding, schema_name, count)
        for (topic, encoding, schema_name), count in sorted(tally.items())
    ]
    print(f"MCAP summary: {mcap_path}")
    for topic, encoding, schema_name, count in rows:
        print(f" {count:6d} topic={topic} encoding={encoding} schema={schema_name}")
    return rows
def infer_video_format(reader_bin: Path, mcap_path: Path, requested: str) -> str:
    """Return the requested format, probing the MCAP for h264/h265 when 'auto'.

    Probing asks mcap_reader_tester whether at least one message of each
    candidate format exists; the first format that validates wins.
    """
    if requested != "auto":
        return requested
    for fmt in VIDEO_FORMATS:
        probe = subprocess.run(
            [str(reader_bin), str(mcap_path), "--expect-format", fmt, "--min-messages", "1"],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            check=False,
        )
        if probe.returncode == 0:
            return fmt
    raise RuntimeError(f"could not infer video format from {mcap_path}")
def dump_annexb(reader_bin: Path, mcap_path: Path, video_format: str, output_path: Path) -> None:
    """Extract the raw Annex-B elementary stream from an MCAP via mcap_reader_tester."""
    command = [str(reader_bin), str(mcap_path)]
    command += ["--expect-format", video_format]
    command += ["--min-messages", "1"]
    command += ["--dump-annexb-output", str(output_path)]
    run(command)
def make_contact_sheet(stream_path: Path, image_path: Path, sample_count: int, frame_step: int, tile_width: int) -> int:
    """Decode sampled frames from a raw video stream into a tiled preview PNG.

    Every ``frame_step``-th frame (up to ``sample_count`` frames) is
    annotated with its frame index, scaled to ``tile_width``, and laid out
    on a roughly square grid. Returns the number of frames placed on the
    sheet; raises RuntimeError when the stream cannot be opened, yields no
    frames, or the image cannot be written.
    """
    capture = cv2.VideoCapture(str(stream_path))
    if not capture.isOpened():
        raise RuntimeError(f"OpenCV could not open decoded stream {stream_path}")
    frames: list[np.ndarray] = []
    frame_index = 0
    # Sequentially decode, keeping every frame_step-th frame until enough
    # samples are collected or the stream ends.
    while len(frames) < sample_count:
        ok, frame = capture.read()
        if not ok:
            break
        if frame_index % frame_step == 0:
            annotated = frame.copy()
            # Burn the source frame index into the tile for orientation.
            cv2.putText(
                annotated,
                f"frame {frame_index}",
                (20, 40),
                cv2.FONT_HERSHEY_SIMPLEX,
                1.0,
                (0, 255, 0),
                2,
                cv2.LINE_AA,
            )
            frames.append(annotated)
        frame_index += 1
    capture.release()
    if not frames:
        raise RuntimeError(f"no frames decoded from {stream_path}")
    # Scale each tile to a common width (floor of 64 px), preserving aspect.
    tile_width = max(64, tile_width)
    resized: list[np.ndarray] = []
    for frame in frames:
        scale = tile_width / frame.shape[1]
        tile_height = max(1, int(round(frame.shape[0] * scale)))
        resized.append(cv2.resize(frame, (tile_width, tile_height), interpolation=cv2.INTER_AREA))
    # Pad shorter tiles with black so every tile shares the tallest height.
    max_height = max(frame.shape[0] for frame in resized)
    padded: list[np.ndarray] = []
    for frame in resized:
        if frame.shape[0] == max_height:
            padded.append(frame)
            continue
        canvas = np.zeros((max_height, frame.shape[1], 3), dtype=np.uint8)
        canvas[: frame.shape[0], :, :] = frame
        padded.append(canvas)
    # Arrange tiles on a near-square grid, filling the last row with blanks.
    columns = max(1, math.ceil(math.sqrt(len(padded))))
    rows = math.ceil(len(padded) / columns)
    blank = np.zeros_like(padded[0])
    row_images: list[np.ndarray] = []
    for row_index in range(rows):
        row_frames = padded[row_index * columns : (row_index + 1) * columns]
        while len(row_frames) < columns:
            row_frames.append(blank)
        row_images.append(np.concatenate(row_frames, axis=1))
    sheet = np.concatenate(row_images, axis=0)
    image_path.parent.mkdir(parents=True, exist_ok=True)
    if not cv2.imwrite(str(image_path), sheet):
        raise RuntimeError(f"failed to write preview image {image_path}")
    print(f"Preview contact sheet: {image_path}")
    return len(frames)
def collect_svo_inputs(input_path: Path) -> list[Path]:
    """List the SVO/SVO2 inputs designated by *input_path*.

    A single SVO file yields itself; an MCAP file yields an empty list
    (nothing to convert); a directory yields every SVO file in its subtree,
    sorted. Raises ValueError for unsupported files and FileNotFoundError
    for missing paths.
    """
    svo_suffixes = {".svo", ".svo2"}
    if input_path.is_file():
        suffix = input_path.suffix.lower()
        if suffix in svo_suffixes:
            return [input_path]
        if suffix == ".mcap":
            return []
        raise ValueError(f"unsupported input file: {input_path}")
    if input_path.is_dir():
        return sorted(
            child for child in input_path.rglob("*") if child.suffix.lower() in svo_suffixes
        )
    raise FileNotFoundError(f"input not found: {input_path}")
def default_output_dir(input_path: Path) -> Path:
    """Default output directory: 'mcap_preview' next to (or inside) the input."""
    base = input_path if input_path.is_dir() else input_path.parent
    return base / "mcap_preview"
def convert_svo(
    zed_bin: Path,
    svo_path: Path,
    mcap_path: Path,
    args: argparse.Namespace,
) -> None:
    """Invoke zed_svo_to_mcap to convert one SVO recording to MCAP."""
    env = os.environ.copy()
    if args.cuda_visible_devices:
        env["CUDA_VISIBLE_DEVICES"] = args.cuda_visible_devices
    # Flag/value pairs forwarded to the converter binary.
    options = [
        ("--input", str(svo_path)),
        ("--output", str(mcap_path)),
        ("--codec", args.codec),
        ("--encoder-device", args.encoder_device),
        ("--mcap-compression", args.mcap_compression),
        ("--depth-mode", args.depth_mode),
        ("--depth-size", args.depth_size),
        ("--start-frame", str(args.start_frame)),
    ]
    if args.end_frame is not None:
        options.append(("--end-frame", str(args.end_frame)))
    command = [str(zed_bin)]
    for flag, value in options:
        command.extend((flag, value))
    mcap_path.parent.mkdir(parents=True, exist_ok=True)
    run(command, env=env)
def preview_mcap(reader_bin: Path, mcap_path: Path, args: argparse.Namespace) -> None:
    """Summarize an MCAP, dump its video stream, and render a contact sheet."""
    summarize_mcap(mcap_path)
    video_format = infer_video_format(reader_bin, mcap_path, args.format)
    print(f"Detected video format: {video_format}")
    extension = ".h265" if video_format == "h265" else ".h264"
    # Dump the elementary stream into a throwaway directory; only the
    # generated PNG survives.
    with tempfile.TemporaryDirectory(prefix="zed_mcap_preview_") as temp_dir:
        stream_path = Path(temp_dir) / f"preview{extension}"
        dump_annexb(reader_bin, mcap_path, video_format, stream_path)
        preview_path = mcap_path.with_suffix(".preview.png")
        decoded = make_contact_sheet(
            stream_path,
            preview_path,
            sample_count=args.sample_count,
            frame_step=args.frame_step,
            tile_width=args.contact_sheet_width,
        )
        print(f"Decoded {decoded} preview frame(s)")
def main() -> int:
    """Entry point: convert SVO inputs to MCAP and optionally preview them."""
    args = parse_args()
    input_path = Path(args.input).expanduser().resolve()
    output_dir = Path(args.output_dir).expanduser().resolve() if args.output_dir else default_output_dir(input_path)
    output_dir.mkdir(parents=True, exist_ok=True)
    reader_bin = locate_binary("mcap_reader_tester", args.reader_bin)
    # The converter binary is only needed when the input is not already
    # a single MCAP file.
    zed_bin = locate_binary("zed_svo_to_mcap", args.zed_bin) if input_path.suffix.lower() != ".mcap" or input_path.is_dir() else None
    # Existing MCAP input: preview (unless suppressed) and stop.
    if input_path.is_file() and input_path.suffix.lower() == ".mcap":
        if not args.no_preview:
            preview_mcap(reader_bin, input_path, args)
        return 0
    svo_inputs = collect_svo_inputs(input_path)
    if not svo_inputs:
        raise RuntimeError(f"no .svo/.svo2 files found under {input_path}")
    converted_paths: list[Path] = []
    for svo_path in svo_inputs:
        output_name = f"{svo_path.stem}.mcap"
        mcap_path = output_dir / output_name
        convert_svo(zed_bin, svo_path, mcap_path, args)
        converted_paths.append(mcap_path)
    if args.no_preview:
        return 0
    # Preview every conversion only when --preview-all; else just the first.
    preview_targets = converted_paths if args.preview_all else converted_paths[:1]
    for mcap_path in preview_targets:
        preview_mcap(reader_bin, mcap_path, args)
    print("Generated MCAP files:")
    for mcap_path in converted_paths:
        print(f" {mcap_path}")
    return 0
if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except KeyboardInterrupt:
        # Conventional 128+SIGINT exit code for Ctrl-C.
        raise SystemExit(130)
-658
View File
@@ -1,658 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import concurrent.futures
import datetime as dt
import json
import os
import re
import subprocess
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo
import click
import duckdb
SCRIPT_PATH = Path(__file__).resolve()
REPO_ROOT = SCRIPT_PATH.parents[1]
DEFAULT_INDEX_NAME = "segment_time_index.duckdb"
INDEX_SCHEMA_VERSION = "1"
SEGMENT_FILE_PATTERN = re.compile(r".*_zed([0-9]+)\.svo2?$", re.IGNORECASE)
FOLDER_TIMESTAMP_PATTERN = re.compile(
r"^(?P<date>\d{4}-\d{2}-\d{2})[T ](?P<hour>\d{2})-(?P<minute>\d{2})-(?P<second>\d{2})(?P<fraction>\.\d+)?(?P<timezone>Z|[+-]\d{2}:\d{2})?$"
)
@dataclass(slots=True, frozen=True)
class SegmentScan:
    """Result of validating one candidate segment directory."""

    segment_dir: Path  # directory that was scanned
    matched_files: int  # number of *_zedN.svo/.svo2 files found
    camera_labels: tuple[str, ...]  # e.g. ("zed0", "zed1"), numerically sorted
    is_valid: bool  # True when the directory is a usable multi-camera segment
    reason: str | None = None  # human-readable explanation when is_valid is False
@dataclass(slots=True, frozen=True)
class BoundsRow:
    """One row of the segment time index describing a segment's MCAP bounds."""

    segment_dir: Path  # absolute segment directory
    relative_segment_dir: str  # segment_dir relative to the dataset root -- presumably; confirm against writer
    group_path: str  # grouping path component within the dataset
    activity: str  # activity label for the segment
    segment_name: str  # leaf segment directory name
    mcap_path: Path  # MCAP file whose time bounds were measured
    start_ns: int  # segment start (epoch nanoseconds)
    end_ns: int  # segment end (epoch nanoseconds)
    duration_ns: int  # span of the segment in nanoseconds
    start_iso_utc: str  # start_ns rendered as ISO-8601 UTC
    end_iso_utc: str  # end_ns rendered as ISO-8601 UTC
    camera_count: int  # number of cameras in the segment
    camera_labels: str  # serialized camera label list
    video_message_count: int  # total video messages observed
    index_source: str  # provenance tag for how the bounds were derived
def sorted_camera_labels(labels: set[str]) -> tuple[str, ...]:
    """Sort 'zedN' labels numerically by N and freeze them into a tuple."""
    def camera_index(label: str) -> int:
        # Strip the fixed "zed" prefix and compare the numeric suffix.
        return int(label[3:])
    return tuple(sorted(labels, key=camera_index))
def scan_segment_dir(segment_dir: Path) -> SegmentScan:
    """Validate one candidate segment directory.

    A valid segment directly contains *_zedN.svo/.svo2 recordings for at
    least two cameras, with exactly one recording per camera.
    """
    if not segment_dir.is_dir():
        return SegmentScan(
            segment_dir=segment_dir,
            matched_files=0,
            camera_labels=(),
            is_valid=False,
            reason=f"segment directory does not exist: {segment_dir}",
        )
    files_by_camera: dict[str, list[Path]] = {}
    for entry in segment_dir.iterdir():
        if not entry.is_file():
            continue
        match = SEGMENT_FILE_PATTERN.fullmatch(entry.name)
        if match is not None:
            # Normalize e.g. "zed007" -> "zed7" via int() round-trip.
            label = f"zed{int(match.group(1))}"
            files_by_camera.setdefault(label, []).append(entry)
    total_matched = sum(len(files) for files in files_by_camera.values())
    labels = sorted_camera_labels(set(files_by_camera))
    duplicates = [label for label, files in sorted(files_by_camera.items()) if len(files) > 1]
    if duplicates:
        return SegmentScan(
            segment_dir=segment_dir,
            matched_files=total_matched,
            camera_labels=labels,
            is_valid=False,
            reason=f"duplicate camera inputs under {segment_dir}: {', '.join(duplicates)}",
        )
    if len(labels) < 2:
        return SegmentScan(
            segment_dir=segment_dir,
            matched_files=total_matched,
            camera_labels=labels,
            is_valid=False,
            reason=f"expected at least 2 camera inputs under {segment_dir}, found {len(labels)}",
        )
    return SegmentScan(
        segment_dir=segment_dir,
        matched_files=total_matched,
        camera_labels=labels,
        is_valid=True,
    )
def discover_segment_dirs(root: Path, recursive: bool) -> tuple[list[SegmentScan], list[SegmentScan]]:
    """Scan *root* (optionally recursively) and split results into valid and partial scans.

    Partial scans are directories with some matching files that failed
    validation; directories with no matching files are silently dropped.
    """
    if not root.is_dir():
        raise click.ClickException(f"input directory does not exist: {root}")
    seen: set[Path] = {root.resolve()}
    walker = root.rglob("*") if recursive else root.iterdir()
    for entry in walker:
        if entry.is_dir():
            seen.add(entry.resolve())
    valid: list[SegmentScan] = []
    partial: list[SegmentScan] = []
    for directory in sorted(seen):
        result = scan_segment_dir(directory)
        if result.is_valid:
            valid.append(result)
        elif result.matched_files:
            partial.append(result)
    if not valid:
        raise click.ClickException(f"no multi-camera segments found under {root}")
    return valid, partial
def locate_binary(name: str, override: Path | None) -> Path:
    """Resolve the helper binary *name*, honoring an explicit path override.

    Without an override, checks the conventional build output locations under
    the repository's ``build`` directory.
    """
    if override is not None:
        resolved = override.expanduser().resolve()
        if not resolved.is_file():
            raise click.ClickException(f"binary not found: {resolved}")
        return resolved
    for location in (REPO_ROOT / "build" / "bin" / name, REPO_ROOT / "build" / name):
        if location.is_file():
            return location
    raise click.ClickException(f"could not find {name} under {REPO_ROOT / 'build'}")
def default_index_path(dataset_root: Path) -> Path:
    """Return the conventional index file location inside *dataset_root*."""
    return dataset_root.joinpath(DEFAULT_INDEX_NAME)
def find_unique_mcap(segment_dir: Path) -> Path | None:
matches = sorted(path for path in segment_dir.iterdir() if path.is_file() and path.suffix.lower() == ".mcap")
if len(matches) == 1:
return matches[0]
return None
def format_ns_iso(ns: int, tzinfo: dt.tzinfo) -> str:
    """Render a nanosecond epoch timestamp as ISO-8601 with 9 fractional digits.

    The timestamp is converted into *tzinfo*; the UTC offset suffix is emitted
    as ``+HH:MM`` (empty for naive-looking zero-offset cases).
    """
    whole_seconds, frac_ns = divmod(ns, 1_000_000_000)
    local = dt.datetime.fromtimestamp(whole_seconds, tz=dt.timezone.utc).astimezone(tzinfo)
    raw_offset = local.strftime("%z")
    if raw_offset:
        # "%z" yields "+0530"; insert the ":" for ISO-8601 form.
        raw_offset = raw_offset[:3] + ":" + raw_offset[3:]
    body = local.strftime("%Y-%m-%dT%H:%M:%S")
    return f"{body}.{frac_ns:09d}{raw_offset}"
def format_ns_utc(ns: int) -> str:
    """Render a nanosecond epoch timestamp as ISO-8601 UTC with a ``Z`` suffix."""
    text = format_ns_iso(ns, dt.timezone.utc)
    return text.replace("+00:00", "Z")
def resolve_timezone(name: str) -> dt.tzinfo:
    """Resolve a timezone name to a ``tzinfo``.

    Accepts "local" (the system timezone), "UTC", a fixed offset of the exact
    form "UTC+HH:MM" / "UTC-HH:MM", or any IANA name understood by zoneinfo.

    Raises:
        click.ClickException: when the name cannot be resolved.
    """
    if name == "local":
        local = dt.datetime.now().astimezone().tzinfo
        if local is None:
            raise click.ClickException("could not resolve local timezone")
        return local
    if name == "UTC":
        return dt.timezone.utc
    if name.startswith("UTC") and len(name) == len("UTC+00:00"):
        # Strictly validate sign and ":" separator. The previous slice-based
        # parse silently accepted malformed strings such as "UTC+05x30" or
        # "UTCX05:00" and misread them as valid offsets.
        match = re.fullmatch(r"UTC([+-])(\d{2}):(\d{2})", name)
        if match is None:
            raise click.ClickException(f"invalid fixed UTC offset '{name}'")
        sign = 1 if match.group(1) == "+" else -1
        hours = int(match.group(2))
        minutes = int(match.group(3))
        return dt.timezone(sign * dt.timedelta(hours=hours, minutes=minutes))
    try:
        return ZoneInfo(name)
    except Exception as exc:  # pragma: no cover - defensive wrapper around system tzdb
        raise click.ClickException(f"unknown timezone '{name}': {exc}") from exc
def normalize_timestamp_text(value: str) -> str:
    """Convert a "YYYY-MM-DD HH-MM-SS"-style folder stamp into ISO-8601 text.

    Values that do not match the folder timestamp pattern pass through unchanged.
    """
    hit = FOLDER_TIMESTAMP_PATTERN.fullmatch(value)
    if hit is None:
        return value
    groups = hit.groupdict()
    suffix = (groups["fraction"] or "") + (groups["timezone"] or "")
    return f"{groups['date']}T{groups['hour']}:{groups['minute']}:{groups['second']}{suffix}"
def parse_folder_name_naive(value: str) -> dt.datetime | None:
    """Parse a folder name as a timezone-naive datetime, or None when it isn't one.

    Names that parse but carry an explicit offset are rejected as well — only
    naive stamps are comparable against MCAP-derived UTC times downstream.
    """
    try:
        stamp = dt.datetime.fromisoformat(normalize_timestamp_text(value))
    except ValueError:
        return None
    return None if stamp.tzinfo is not None else stamp
def datetime_to_ns(value: dt.datetime) -> int:
    """Convert an aware datetime to integer nanoseconds since the Unix epoch.

    Uses exact timedelta arithmetic instead of the previous float-based
    ``int(value.timestamp())``: the float path truncates toward zero for
    pre-epoch times (making -0.5s come out as +0.5s) and can round across an
    integer-second boundary for large epochs with microsecond 999999, yielding
    results off by up to a second.
    """
    utc_value = value.astimezone(dt.timezone.utc)
    delta = utc_value - dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    whole_seconds = delta.days * 86_400 + delta.seconds
    return whole_seconds * 1_000_000_000 + delta.microseconds * 1_000
def parse_timestamp_to_ns(value: str, timezone_name: str) -> int:
    """Parse a raw epoch number or ISO-8601 text into nanoseconds since the epoch.

    Bare integers are scaled by digit count (<=10 seconds, <=13 milliseconds,
    <=16 microseconds, otherwise nanoseconds). ISO text without an explicit
    offset is interpreted in *timezone_name*.

    Raises:
        click.ClickException: on empty or unparseable input.
    """
    stripped = value.strip()
    if not stripped:
        raise click.ClickException("timestamp value is empty")
    digit_text = stripped.lstrip("+-")
    if digit_text.isdigit():
        raw_value = int(stripped)
        digits = len(digit_text)
        if digits <= 10:
            scale = 1_000_000_000
        elif digits <= 13:
            scale = 1_000_000
        elif digits <= 16:
            scale = 1_000
        else:
            scale = 1
        return raw_value * scale
    normalized = normalize_timestamp_text(stripped)
    if normalized.endswith("Z"):
        normalized = f"{normalized[:-1]}+00:00"
    try:
        parsed = dt.datetime.fromisoformat(normalized)
    except ValueError as exc:
        raise click.ClickException(f"invalid timestamp '{value}': {exc}") from exc
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=resolve_timezone(timezone_name))
    return datetime_to_ns(parsed)
def parse_timestamp_window(value: str, timezone_name: str) -> tuple[int, int]:
    """Parse a point-in-time query into an inclusive ``(start_ns, end_ns)`` window.

    The window width equals the precision of the input: a 10-digit epoch covers
    one full second, an ISO stamp with 3 fractional digits covers one millisecond.
    """
    stripped = value.strip()
    if not stripped:
        raise click.ClickException("timestamp value is empty")
    digit_text = stripped.lstrip("+-")
    if digit_text.isdigit():
        base_ns = parse_timestamp_to_ns(stripped, timezone_name)
        digits = len(digit_text)
        precision_ns = 1
        for max_digits, step_ns in ((10, 1_000_000_000), (13, 1_000_000), (16, 1_000)):
            if digits <= max_digits:
                precision_ns = step_ns
                break
        return base_ns, base_ns + precision_ns - 1
    normalized = normalize_timestamp_text(stripped)
    base_ns = parse_timestamp_to_ns(stripped, timezone_name)
    fraction = re.search(r"\.(\d+)", normalized)
    if fraction is None:
        precision_ns = 1_000_000_000
    else:
        precision_ns = 10 ** (9 - min(len(fraction.group(1)), 9))
    return base_ns, base_ns + precision_ns - 1
def probe_mcap_bounds(bounds_bin: Path, mcap_path: Path) -> dict[str, Any]:
    """Run the bounds helper binary on *mcap_path* and return its JSON payload.

    Raises:
        RuntimeError: when the helper exits non-zero or prints invalid JSON.
    """
    completed = subprocess.run(
        [str(bounds_bin), str(mcap_path), "--json"],
        check=False,
        capture_output=True,
        text=True,
    )
    if completed.returncode != 0:
        detail = completed.stderr.strip() or completed.stdout.strip() or f"exit {completed.returncode}"
        raise RuntimeError(f"{mcap_path}: {detail}")
    try:
        return json.loads(completed.stdout)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"{mcap_path}: failed to parse helper JSON: {exc}") from exc
def build_row(dataset_root: Path, scan: SegmentScan, bounds_bin: Path) -> BoundsRow | None:
    """Probe one valid scan and produce its index row.

    Returns None when the segment directory does not contain exactly one
    ``.mcap`` file; helper failures propagate as RuntimeError from
    probe_mcap_bounds.
    """
    mcap_path = find_unique_mcap(scan.segment_dir)
    if mcap_path is None:
        return None
    bounds = probe_mcap_bounds(bounds_bin, mcap_path)
    relative = scan.segment_dir.relative_to(dataset_root)
    parent = relative.parent
    components = relative.parts
    first_start_ns = int(bounds["start_ns"])
    last_end_ns = int(bounds["end_ns"])
    return BoundsRow(
        segment_dir=scan.segment_dir,
        relative_segment_dir=relative.as_posix(),
        group_path="" if str(parent) == "." else parent.as_posix(),
        activity=components[0] if components else scan.segment_dir.name,
        segment_name=scan.segment_dir.name,
        mcap_path=mcap_path,
        start_ns=first_start_ns,
        end_ns=last_end_ns,
        duration_ns=max(0, last_end_ns - first_start_ns),
        start_iso_utc=str(bounds["start_iso_utc"]),
        end_iso_utc=str(bounds["end_iso_utc"]),
        camera_count=len(scan.camera_labels),
        camera_labels=",".join(scan.camera_labels),
        video_message_count=int(bounds["video_message_count"]),
        index_source="mcap_video_bounds",
    )
def init_db(conn: duckdb.DuckDBPyConnection) -> None:
    """Create the empty index schema: meta and segments tables plus time indexes."""
    # Key/value table for index-level metadata (schema version, dataset root,
    # build time, inferred default timezone) — read back by load_meta().
    conn.execute(
        """
        CREATE TABLE meta (
            key VARCHAR PRIMARY KEY,
            value VARCHAR NOT NULL
        );
        """
    )
    # One row per segment directory; columns mirror the BoundsRow dataclass.
    conn.execute(
        """
        CREATE TABLE segments (
            segment_dir VARCHAR PRIMARY KEY,
            relative_segment_dir VARCHAR NOT NULL,
            group_path VARCHAR NOT NULL,
            activity VARCHAR NOT NULL,
            segment_name VARCHAR NOT NULL,
            mcap_path VARCHAR NOT NULL,
            start_ns BIGINT NOT NULL,
            end_ns BIGINT NOT NULL,
            duration_ns BIGINT NOT NULL,
            start_iso_utc VARCHAR NOT NULL,
            end_iso_utc VARCHAR NOT NULL,
            camera_count INTEGER NOT NULL,
            camera_labels VARCHAR NOT NULL,
            video_message_count BIGINT NOT NULL,
            index_source VARCHAR NOT NULL
        );
        """
    )
    # Both bounds are indexed to serve the interval-overlap predicate used by
    # the `query` command (start_ns <= ? AND end_ns >= ?).
    conn.execute("CREATE INDEX segments_start_ns_idx ON segments(start_ns);")
    conn.execute("CREATE INDEX segments_end_ns_idx ON segments(end_ns);")
def write_index(index_path: Path, dataset_root: Path, rows: list[BoundsRow]) -> None:
    """Atomically (re)write the DuckDB index file with *rows*.

    The database is built in a temporary sibling file and moved into place
    with ``Path.replace`` so readers never observe a half-written index; on
    any failure the temporary file is removed and the error re-raised.
    """
    index_path.parent.mkdir(parents=True, exist_ok=True)
    # NamedTemporaryFile only reserves a unique sibling path here; the file is
    # unlinked immediately so duckdb can create a fresh database at that path.
    with tempfile.NamedTemporaryFile(prefix=f"{index_path.name}.", suffix=".tmp", dir=index_path.parent, delete=False) as handle:
        temp_path = Path(handle.name)
    temp_path.unlink(missing_ok=True)
    inferred_timezone = infer_dataset_timezone(rows)
    try:
        conn = duckdb.connect(str(temp_path))
        try:
            init_db(conn)
            conn.executemany(
                "INSERT INTO meta (key, value) VALUES (?, ?)",
                [
                    ("schema_version", INDEX_SCHEMA_VERSION),
                    ("dataset_root", str(dataset_root)),
                    ("built_at_utc", dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")),
                    ("default_timezone", inferred_timezone),
                ],
            )
            conn.executemany(
                """
                INSERT INTO segments (
                    segment_dir,
                    relative_segment_dir,
                    group_path,
                    activity,
                    segment_name,
                    mcap_path,
                    start_ns,
                    end_ns,
                    duration_ns,
                    start_iso_utc,
                    end_iso_utc,
                    camera_count,
                    camera_labels,
                    video_message_count,
                    index_source
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                [
                    (
                        str(row.segment_dir),
                        row.relative_segment_dir,
                        row.group_path,
                        row.activity,
                        row.segment_name,
                        str(row.mcap_path),
                        row.start_ns,
                        row.end_ns,
                        row.duration_ns,
                        row.start_iso_utc,
                        row.end_iso_utc,
                        row.camera_count,
                        row.camera_labels,
                        row.video_message_count,
                        row.index_source,
                    )
                    for row in rows
                ],
            )
        finally:
            conn.close()
        # Atomic publish: replace the live index only after a clean close.
        temp_path.replace(index_path)
    except Exception:
        # Best-effort cleanup of the partially written database, then re-raise.
        temp_path.unlink(missing_ok=True)
        raise
def infer_dataset_timezone(rows: list[BoundsRow]) -> str:
offset_counts: dict[int, int] = {}
for row in rows:
folder_time = parse_folder_name_naive(row.segment_name)
if folder_time is None:
continue
actual_utc = dt.datetime.fromtimestamp(row.start_ns / 1_000_000_000, tz=dt.timezone.utc).replace(tzinfo=None)
offset_minutes = round((folder_time - actual_utc).total_seconds() / 60.0)
offset_counts[offset_minutes] = offset_counts.get(offset_minutes, 0) + 1
if not offset_counts:
return "local"
minutes = max(offset_counts.items(), key=lambda item: item[1])[0]
if minutes == 0:
return "UTC"
sign = "+" if minutes >= 0 else "-"
absolute_minutes = abs(minutes)
hours, mins = divmod(absolute_minutes, 60)
return f"UTC{sign}{hours:02d}:{mins:02d}"
def require_query_window(at: str | None, start: str | None, end: str | None, timezone_name: str) -> tuple[int, int]:
    """Derive the inclusive query window from --at or the --start/--end pair.

    Raises:
        click.ClickException: on conflicting, missing, or inverted arguments.
    """
    if at is not None:
        if start is not None or end is not None:
            raise click.ClickException("use either --at or --start/--end, not both")
        return parse_timestamp_window(at, timezone_name)
    if start is None or end is None:
        raise click.ClickException("provide --at or both --start and --end")
    window = (parse_timestamp_to_ns(start, timezone_name), parse_timestamp_to_ns(end, timezone_name))
    if window[0] > window[1]:
        raise click.ClickException("query start must be before or equal to query end")
    return window
def load_meta(conn: duckdb.DuckDBPyConnection) -> dict[str, str]:
    """Read the meta key/value table into a plain dict of strings."""
    pairs = conn.execute("SELECT key, value FROM meta").fetchall()
    meta: dict[str, str] = {}
    for key, value in pairs:
        meta[str(key)] = str(value)
    return meta
def format_duration(duration_ns: int) -> str:
    """Format a nanosecond duration as seconds with millisecond precision."""
    seconds = duration_ns / 1_000_000_000
    return f"{seconds:.3f}s"
@click.group()
def cli() -> None:
    """Build and query a DuckDB index of bundled ZED segment timestamps."""
    # Click group entry point; the `build` and `query` subcommands attach
    # themselves via @cli.command() below.
@cli.command()
@click.argument("dataset_root", type=click.Path(path_type=Path, file_okay=False))
@click.option("--index", "index_path", type=click.Path(path_type=Path, dir_okay=False))
@click.option("--recursive/--no-recursive", default=True, show_default=True)
@click.option("--jobs", type=click.IntRange(min=1), default=min(8, os.cpu_count() or 1), show_default=True)
@click.option("--bounds-bin", type=click.Path(path_type=Path, dir_okay=False))
def build(dataset_root: Path, index_path: Path | None, recursive: bool, jobs: int, bounds_bin: Path | None) -> None:
    """Build or replace the embedded DuckDB time index for DATASET_ROOT.

    Discovers multi-camera segment directories, probes each segment's single
    MCAP file with the external bounds helper (in parallel across --jobs
    threads), and writes all resulting rows to the index file. Fails with a
    non-zero exit when any probe errors or nothing is indexable.
    """
    dataset_root = dataset_root.expanduser().resolve()
    # Default index location lives inside the dataset root itself.
    index_path = (index_path or default_index_path(dataset_root)).expanduser().resolve()
    bounds_binary = locate_binary("mcap_video_bounds", bounds_bin)
    valid_scans, ignored_partial_scans = discover_segment_dirs(dataset_root, recursive)
    click.echo(
        f"discovered {len(valid_scans)} valid segment directories under {dataset_root}",
        err=True,
    )
    if ignored_partial_scans:
        click.echo(f"ignored {len(ignored_partial_scans)} partial segment directories", err=True)
    rows: list[BoundsRow] = []
    skipped_missing_mcap: list[Path] = []
    errors: list[str] = []
    # Probe segments concurrently; the work is dominated by waiting on the
    # external helper process, so threads are sufficient to overlap it.
    with concurrent.futures.ThreadPoolExecutor(max_workers=jobs) as executor:
        future_to_scan: dict[concurrent.futures.Future[BoundsRow | None], SegmentScan] = {
            executor.submit(build_row, dataset_root, scan, bounds_binary): scan for scan in valid_scans
        }
        for future in concurrent.futures.as_completed(future_to_scan):
            scan = future_to_scan[future]
            try:
                row = future.result()
            except Exception as exc:
                # Collect failures and keep probing; reported together below.
                errors.append(f"{scan.segment_dir}: {exc}")
                continue
            if row is None:
                # build_row returns None when the segment lacks exactly one MCAP file.
                skipped_missing_mcap.append(scan.segment_dir)
                continue
            rows.append(row)
    # Deterministic output order: by start time, then by path on ties.
    rows.sort(key=lambda row: (row.start_ns, row.segment_dir.as_posix()))
    if skipped_missing_mcap:
        click.echo(f"skipped {len(skipped_missing_mcap)} segments with missing or ambiguous MCAP files", err=True)
    if errors:
        for error in errors:
            click.echo(f"error: {error}", err=True)
        raise click.ClickException(f"failed to probe {len(errors)} segment(s)")
    if not rows:
        raise click.ClickException("no indexable MCAP segments were found")
    write_index(index_path, dataset_root, rows)
    click.echo(
        f"wrote {len(rows)} segments to {index_path} (skipped_missing_mcap={len(skipped_missing_mcap)})",
        err=True,
    )
@cli.command()
@click.argument("dataset_root", type=click.Path(path_type=Path, file_okay=False))
@click.option("--index", "index_path", type=click.Path(path_type=Path, dir_okay=False))
@click.option("--at")
@click.option("--start")
@click.option("--end")
@click.option("--json", "as_json", is_flag=True)
@click.option("--timezone", "timezone_name", default="dataset", show_default=True)
def query(
    dataset_root: Path,
    index_path: Path | None,
    at: str | None,
    start: str | None,
    end: str | None,
    as_json: bool,
    timezone_name: str,
) -> None:
    """Query the embedded time index for matching segment folders.

    Matches segments whose [start_ns, end_ns] range overlaps the requested
    window (--at with precision-derived width, or explicit --start/--end).
    With --timezone=dataset (the default), timestamps are interpreted and
    displayed in the timezone recorded in the index metadata at build time.
    """
    dataset_root = dataset_root.expanduser().resolve()
    index_path = (index_path or default_index_path(dataset_root)).expanduser().resolve()
    if not index_path.is_file():
        raise click.ClickException(f"index not found: {index_path}")
    conn = duckdb.connect(str(index_path), read_only=True)
    try:
        meta = load_meta(conn)
        # Refuse to answer from an index built for a different dataset root.
        indexed_root = Path(meta.get("dataset_root", "")).expanduser().resolve()
        if indexed_root != dataset_root:
            raise click.ClickException(
                f"index root mismatch: index was built for {indexed_root}, not {dataset_root}"
            )
        # "dataset" defers to the timezone inferred when the index was built.
        effective_timezone_name = meta.get("default_timezone", "local") if timezone_name == "dataset" else timezone_name
        query_start_ns, query_end_ns = require_query_window(at, start, end, effective_timezone_name)
        display_timezone = resolve_timezone(effective_timezone_name)
        # Interval-overlap predicate: segment.start <= window.end AND segment.end >= window.start.
        result_rows = conn.execute(
            """
            SELECT
                segment_dir,
                relative_segment_dir,
                group_path,
                activity,
                segment_name,
                mcap_path,
                start_ns,
                end_ns,
                duration_ns,
                start_iso_utc,
                end_iso_utc,
                camera_count,
                camera_labels,
                video_message_count,
                index_source
            FROM segments
            WHERE start_ns <= ? AND end_ns >= ?
            ORDER BY start_ns, segment_dir
            """,
            [query_end_ns, query_start_ns],
        ).fetchall()
    finally:
        conn.close()
    # Re-shape result tuples into JSON-friendly dicts, adding renderings in the
    # display timezone alongside the stored UTC strings.
    payload = [
        {
            "segment_dir": row[0],
            "relative_segment_dir": row[1],
            "group_path": row[2],
            "activity": row[3],
            "segment_name": row[4],
            "mcap_path": row[5],
            "start_ns": row[6],
            "end_ns": row[7],
            "duration_ns": row[8],
            "start_iso_utc": row[9],
            "end_iso_utc": row[10],
            "camera_count": row[11],
            "camera_labels": row[12].split(",") if row[12] else [],
            "video_message_count": row[13],
            "index_source": row[14],
            "start_display": format_ns_iso(row[6], display_timezone),
            "end_display": format_ns_iso(row[7], display_timezone),
        }
        for row in result_rows
    ]
    if as_json:
        click.echo(json.dumps(payload, indent=2, ensure_ascii=False))
        return
    if not payload:
        click.echo("no matching segments")
        return
    click.echo(f"matched {len(payload)} segment(s)")
    # Human-readable table: start | end | duration | segment dir | mcap path.
    for row in payload:
        click.echo(
            " | ".join(
                (
                    row["start_display"],
                    row["end_display"],
                    format_duration(int(row["duration_ns"])),
                    row["segment_dir"],
                    row["mcap_path"],
                )
            )
        )
if __name__ == "__main__":
    # Script entry point: dispatch to the `build` / `query` click subcommands.
    cli()