"""CLI tool that calibrates camera extrinsics against a world frame defined by ArUco markers.

Reads one or more ZED SVO recordings, detects ArUco markers with known world-frame
geometry, estimates per-frame camera poses, robustly averages them, and writes the
resulting world-from-camera transforms to JSON.
"""

import json

import click
import cv2
import numpy as np
import pyzed.sl as sl
from pathlib import Path
from typing import List, Dict, Any, Optional

from aruco.marker_geometry import load_marker_geometry, validate_marker_geometry
from aruco.svo_sync import SVOReader
from aruco.detector import (
    create_detector,
    detect_markers,
    build_camera_matrix_from_zed,
    estimate_pose_from_detections,
)
from aruco.pose_math import rvec_tvec_to_matrix, invert_transform, matrix_to_rvec_tvec
from aruco.pose_averaging import PoseAccumulator
from aruco.preview import draw_detected_markers, draw_pose_axes, show_preview
from aruco.depth_verify import verify_extrinsics_with_depth
from aruco.depth_refine import refine_extrinsics_with_depth


def _marker_world_corners(ids, marker_geometry):
    """Map each detected marker ID to its known world-frame corner coordinates.

    IDs with no entry in ``marker_geometry`` are silently dropped, so the result
    may be empty. Shared by the depth-refinement and depth-verification paths,
    which previously duplicated this comprehension inline.
    """
    return {
        int(mid): marker_geometry[int(mid)]
        for mid in ids.flatten()
        if int(mid) in marker_geometry
    }


def _pose_matrix_from_string(pose_str):
    """Parse a space-separated string of 16 floats back into a 4x4 pose matrix.

    Replaces the deprecated ``np.fromstring(..., sep=" ")`` with an equivalent
    ``str.split`` + ``np.array`` round-trip.
    """
    return np.array(pose_str.split(), dtype=float).reshape(4, 4)


@click.command()
@click.option("--svo", "-s", multiple=True, required=False, help="Path to SVO files.")
@click.option("--markers", "-m", required=True, help="Path to markers parquet file.")
@click.option("--output", "-o", default="extrinsics.json", help="Output JSON file.")
@click.option(
    "--sample-interval", "-n", default=30, type=int, help="Sample every N frames."
)
@click.option(
    "--max-reproj-error",
    "-e",
    default=2.0,
    type=float,
    help="Max reprojection error for pose.",
)
@click.option("--preview/--no-preview", default=True, help="Show preview window.")
@click.option(
    "--validate-markers/--no-validate", default=True, help="Validate marker geometry."
)
@click.option(
    "--self-check/--no-self-check", default=False, help="Perform self-check on result."
)
@click.option(
    "--verify-depth/--no-verify-depth", default=False, help="Enable depth verification."
)
@click.option(
    "--refine-depth/--no-refine-depth", default=False, help="Enable depth refinement."
)
@click.option(
    "--depth-mode",
    default="NEURAL",
    type=click.Choice(["NEURAL", "ULTRA", "PERFORMANCE", "NONE"]),
    help="Depth computation mode.",
)
@click.option(
    "--depth-confidence-threshold",
    default=50,
    type=int,
    help="Confidence threshold for depth filtering (lower = more confident).",
)
@click.option(
    "--report-csv", type=click.Path(), help="Optional path for per-frame CSV report."
)
def main(
    svo,
    markers,
    output,
    sample_interval,
    max_reproj_error,
    preview,
    validate_markers,
    self_check,
    verify_depth,
    refine_depth,
    depth_mode,
    depth_confidence_threshold,
    report_csv,
):
    """
    Calibrate camera extrinsics relative to a global coordinate system
    defined by ArUco markers.
    """
    # TODO(review): --report-csv is accepted but never consumed anywhere in this
    # function — implement the per-frame CSV report or drop the option.
    depth_mode_map = {
        "NEURAL": sl.DEPTH_MODE.NEURAL,
        "ULTRA": sl.DEPTH_MODE.ULTRA,
        "PERFORMANCE": sl.DEPTH_MODE.PERFORMANCE,
        "NONE": sl.DEPTH_MODE.NONE,
    }
    sl_depth_mode = depth_mode_map.get(depth_mode, sl.DEPTH_MODE.NONE)

    # 1. Load Marker Geometry
    try:
        marker_geometry = load_marker_geometry(markers)
        if validate_markers:
            validate_marker_geometry(marker_geometry)
        click.echo(f"Loaded {len(marker_geometry)} markers from {markers}")
    except Exception as e:
        # Broad catch is deliberate at this CLI boundary: report and exit non-zero.
        click.echo(f"Error loading markers: {e}", err=True)
        raise SystemExit(1)

    if not svo:
        # Validation-only invocation is a supported mode; otherwise SVOs are mandatory.
        if validate_markers:
            click.echo("Marker validation successful. No SVOs provided, exiting.")
            return
        else:
            click.echo(
                "Error: --svo is required unless --validate-markers is used.", err=True
            )
            raise click.UsageError("Missing option '--svo' / '-s'.")

    # 2. Initialize SVO Reader
    reader = SVOReader(svo, depth_mode=sl_depth_mode)
    if not reader.cameras:
        click.echo("No SVO files could be opened.", err=True)
        return

    # Align SVOs
    reader.sync_to_latest_start()

    # Calculate max frames to process to avoid infinite loop
    max_frames = 10000  # Default safety limit
    if reader.cameras:
        remaining = []
        for i, cam in enumerate(reader.cameras):
            total = reader.camera_info[i]["total_frames"]
            if total > 0:
                current = cam.get_svo_position()
                remaining.append(total - current)
            else:
                # If any total_frames is unknown (<= 0), use a hard limit
                remaining = [10000]
                break
        if remaining:
            max_frames = min(remaining)
        else:
            click.echo(
                "Warning: Could not determine SVO lengths, using safety limit of 10,000 frames."
            )

    serials = [info["serial"] for info in reader.camera_info]
    accumulators = {serial: PoseAccumulator() for serial in serials}
    camera_matrices = {
        serial: build_camera_matrix_from_zed(cam)
        for serial, cam in zip(serials, reader.cameras)
    }
    detector = create_detector()

    frame_count = 0
    sampled_count = 0
    click.echo(f"Processing SVOs: {serials}")

    try:
        while frame_count < max_frames:
            frames = reader.grab_synced()
            if not any(frames):
                # All cameras exhausted (or failed to grab): stop processing.
                break

            if frame_count % sample_interval == 0:
                preview_frames = {}
                for frame in frames:
                    if frame is None:
                        continue
                    serial = frame.serial_number
                    K = camera_matrices[serial]

                    # Detect markers
                    corners, ids = detect_markers(frame.image, detector)
                    if ids is None:
                        if preview:
                            preview_frames[serial] = frame.image
                        continue

                    # Estimate pose (T_cam_from_world)
                    pose_res = estimate_pose_from_detections(
                        corners, ids, marker_geometry, K, min_markers=4
                    )
                    if pose_res:
                        rvec, tvec, reproj_err, _n_markers = pose_res
                        if reproj_err <= max_reproj_error:
                            T_cam_world = rvec_tvec_to_matrix(rvec, tvec)
                            # We want T_world_from_cam
                            T_world_cam = invert_transform(T_cam_world)

                            if refine_depth and frame.depth_map is not None:
                                marker_corners_world = _marker_world_corners(
                                    ids, marker_geometry
                                )
                                if marker_corners_world:
                                    T_world_cam_refined, _refine_stats = (
                                        refine_extrinsics_with_depth(
                                            T_world_cam,
                                            marker_corners_world,
                                            frame.depth_map,
                                            K,
                                        )
                                    )
                                    T_world_cam = T_world_cam_refined

                            accumulators[serial].add_pose(
                                T_world_cam, reproj_err, frame_count
                            )

                            if verify_depth and frame.depth_map is not None:
                                marker_corners_world = _marker_world_corners(
                                    ids, marker_geometry
                                )
                                if marker_corners_world:
                                    verify_extrinsics_with_depth(
                                        T_world_cam,
                                        marker_corners_world,
                                        frame.depth_map,
                                        K,
                                        confidence_thresh=depth_confidence_threshold,
                                    )

                            if preview:
                                img = draw_detected_markers(
                                    frame.image.copy(), corners, ids
                                )
                                img = draw_pose_axes(img, rvec, tvec, K, length=0.2)
                                preview_frames[serial] = img
                    elif preview:
                        preview_frames[serial] = frame.image

                if preview and preview_frames:
                    key = show_preview(preview_frames)
                    # ESC or 'q' aborts processing; accumulated poses are still used.
                    if key == 27 or key == ord("q"):
                        break
                sampled_count += 1

            frame_count += 1
            if frame_count % 100 == 0:
                counts = [len(acc.poses) for acc in accumulators.values()]
                click.echo(
                    f"Frame {frame_count}, Detections: {dict(zip(serials, counts))}"
                )
    except KeyboardInterrupt:
        click.echo("\nInterrupted by user.")
    finally:
        # Always release camera handles and any preview windows.
        reader.close()
        cv2.destroyAllWindows()

    # 3. Compute Final Poses
    results = {}
    for serial, acc in accumulators.items():
        if not acc.poses:
            click.echo(f"Warning: No valid poses for camera {serial}")
            continue
        # Use RANSAC to find best consensus
        inliers = acc.ransac_filter()
        T_mean, stats = acc.compute_robust_mean(inliers)
        # Flatten for JSON as space-separated string
        pose_str = " ".join(f"{x:.6f}" for x in T_mean.flatten())
        results[str(serial)] = {"pose": pose_str, "stats": stats}
        click.echo(
            f"Camera {serial}: {stats['n_inliers']}/{stats['n_total']} inliers, median error: {stats['median_reproj_error']:.3f}"
        )

    if not results:
        click.echo("No extrinsics computed.", err=True)
        return

    # 4. Save to JSON
    with open(output, "w") as f:
        json.dump(results, f, indent=4, sort_keys=True)
    click.echo(f"Saved extrinsics to {output}")

    # 5. Optional Self-Check
    if self_check:
        # Verify reprojection error
        for serial, data in results.items():
            if data["stats"]["median_reproj_error"] > max_reproj_error:
                click.echo(
                    f"Error: Camera {serial} failed self-check (median error {data['stats']['median_reproj_error']:.3f} > {max_reproj_error})",
                    err=True,
                )
                raise SystemExit(1)

        # Simple check: verify distance between cameras if multiple
        if len(results) >= 2:
            serials_list = sorted(results.keys())
            for i in range(len(serials_list)):
                for j in range(i + 1, len(serials_list)):
                    s1 = serials_list[i]
                    s2 = serials_list[j]
                    # Translation column of each 4x4 world-from-camera matrix.
                    p1 = _pose_matrix_from_string(results[s1]["pose"])[:3, 3]
                    p2 = _pose_matrix_from_string(results[s2]["pose"])[:3, 3]
                    dist = np.linalg.norm(p1 - p2)
                    click.echo(f"Self-check: Distance {s1} <-> {s2}: {dist:.3f}m")


if __name__ == "__main__":
    main()  # pylint: disable=no-value-for-parameter