From 69a3d1e8bfbdda7281147c201c6be4d12d6e533b Mon Sep 17 00:00:00 2001 From: crosstyan Date: Wed, 4 Feb 2026 11:44:49 +0000 Subject: [PATCH] Add extrinsic calibration CLI tool Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus --- py_workspace/calibrate_extrinsics.py | 243 +++++++++++++++++++++++++++ 1 file changed, 243 insertions(+) create mode 100644 py_workspace/calibrate_extrinsics.py diff --git a/py_workspace/calibrate_extrinsics.py b/py_workspace/calibrate_extrinsics.py new file mode 100644 index 0000000..e81e3ac --- /dev/null +++ b/py_workspace/calibrate_extrinsics.py @@ -0,0 +1,243 @@ +import click +import cv2 +import json +import numpy as np +import pyzed.sl as sl +from pathlib import Path +from typing import List, Dict, Any, Optional + +from aruco.marker_geometry import load_marker_geometry, validate_marker_geometry +from aruco.svo_sync import SVOReader +from aruco.detector import ( + create_detector, + detect_markers, + build_camera_matrix_from_zed, + estimate_pose_from_detections, +) +from aruco.pose_math import rvec_tvec_to_matrix, invert_transform, matrix_to_rvec_tvec +from aruco.pose_averaging import PoseAccumulator +from aruco.preview import draw_detected_markers, draw_pose_axes, show_preview + + +@click.command() +@click.option("--svo", "-s", multiple=True, required=False, help="Path to SVO files.") +@click.option("--markers", "-m", required=True, help="Path to markers parquet file.") +@click.option("--output", "-o", default="extrinsics.json", help="Output JSON file.") +@click.option( + "--sample-interval", "-n", default=30, type=int, help="Sample every N frames." +) +@click.option( + "--max-reproj-error", + "-e", + default=2.0, + type=float, + help="Max reprojection error for pose.", +) +@click.option("--preview/--no-preview", default=True, help="Show preview window.") +@click.option( + "--validate-markers/--no-validate", default=True, help="Validate marker geometry." +) +@click.option( + "--self-check/--no-self-check", default=False, help="Perform self-check on result." +) +def main( + svo, + markers, + output, + sample_interval, + max_reproj_error, + preview, + validate_markers, + self_check, +): + """ + Calibrate camera extrinsics relative to a global coordinate system defined by ArUco markers. + """ + # 1. Load Marker Geometry + try: + marker_geometry = load_marker_geometry(markers) + if validate_markers: + validate_marker_geometry(marker_geometry) + click.echo(f"Loaded {len(marker_geometry)} markers from {markers}") + except Exception as e: + click.echo(f"Error loading markers: {e}", err=True) + raise SystemExit(1) + + if not svo: + if validate_markers: + click.echo("Marker validation successful. No SVOs provided, exiting.") + return + else: + click.echo( + "Error: --svo is required unless --validate-markers is used.", err=True + ) + raise click.UsageError("Missing option '--svo' / '-s'.") + + # 2. Initialize SVO Reader + reader = SVOReader(svo) + if not reader.cameras: + click.echo("No SVO files could be opened.", err=True) + return + + # Align SVOs + reader.sync_to_latest_start() + + # Calculate max frames to process to avoid infinite loop + max_frames = 10000 # Default safety limit + if reader.cameras: + remaining = [] + for i, cam in enumerate(reader.cameras): + total = reader.camera_info[i]["total_frames"] + if total > 0: + current = cam.get_svo_position() + remaining.append(total - current) + else: + # If any total_frames is unknown (<= 0), use a hard limit + remaining = [10000] + break + if remaining: + max_frames = min(remaining) + else: + click.echo( + "Warning: Could not determine SVO lengths, using safety limit of 10,000 frames." + ) + + serials = [info["serial"] for info in reader.camera_info] + accumulators = {serial: PoseAccumulator() for serial in serials} + camera_matrices = { + serial: build_camera_matrix_from_zed(cam) + for serial, cam in zip(serials, reader.cameras) + } + + detector = create_detector() + + frame_count = 0 + sampled_count = 0 + + click.echo(f"Processing SVOs: {serials}") + + try: + while frame_count < max_frames: + frames = reader.grab_synced() + if not any(frames): + break + + if frame_count % sample_interval == 0: + preview_frames = {} + for i, frame in enumerate(frames): + if frame is None: + continue + + serial = frame.serial_number + K = camera_matrices[serial] + + # Detect markers + corners, ids = detect_markers(frame.image, detector) + + if ids is None: + if preview: + preview_frames[serial] = frame.image + continue + + # Estimate pose (T_cam_from_world) + pose_res = estimate_pose_from_detections( + corners, ids, marker_geometry, K, min_markers=4 + ) + + if pose_res: + rvec, tvec, reproj_err, n_markers = pose_res + if reproj_err <= max_reproj_error: + T_cam_world = rvec_tvec_to_matrix(rvec, tvec) + # We want T_world_from_cam + T_world_cam = invert_transform(T_cam_world) + accumulators[serial].add_pose( + T_world_cam, reproj_err, frame_count + ) + + if preview: + img = draw_detected_markers( + frame.image.copy(), corners, ids + ) + img = draw_pose_axes(img, rvec, tvec, K, length=0.2) + preview_frames[serial] = img + elif preview: + preview_frames[serial] = frame.image + + if preview and preview_frames: + key = show_preview(preview_frames) + if key == 27 or key == ord("q"): + break + + sampled_count += 1 + + frame_count += 1 + if frame_count % 100 == 0: + counts = [len(acc.poses) for acc in accumulators.values()] + click.echo( + f"Frame {frame_count}, Detections: {dict(zip(serials, counts))}" + ) + + except KeyboardInterrupt: + click.echo("\nInterrupted by user.") + finally: + reader.close() + cv2.destroyAllWindows() + + # 3. Compute Final Poses + results = {} + for serial, acc in accumulators.items(): + if not acc.poses: + click.echo(f"Warning: No valid poses for camera {serial}") + continue + + # Use RANSAC to find best consensus + inliers = acc.ransac_filter() + T_mean, stats = acc.compute_robust_mean(inliers) + + # Flatten for JSON as space-separated string + pose_str = " ".join(f"{x:.6f}" for x in T_mean.flatten()) + + results[str(serial)] = {"pose": pose_str, "stats": stats} + click.echo( + f"Camera {serial}: {stats['n_inliers']}/{stats['n_total']} inliers, median error: {stats['median_reproj_error']:.3f}" + ) + + if not results: + click.echo("No extrinsics computed.", err=True) + return + + # 4. Save to JSON + with open(output, "w") as f: + json.dump(results, f, indent=4, sort_keys=True) + click.echo(f"Saved extrinsics to {output}") + + # 5. Optional Self-Check + if self_check: + # Verify reprojection error + for serial, data in results.items(): + if data["stats"]["median_reproj_error"] > max_reproj_error: + click.echo( + f"Error: Camera {serial} failed self-check (median error {data['stats']['median_reproj_error']:.3f} > {max_reproj_error})", + err=True, + ) + raise SystemExit(1) + + # Simple check: verify distance between cameras if multiple + if len(results) >= 2: + serials_list = sorted(results.keys()) + for i in range(len(serials_list)): + for j in range(i + 1, len(serials_list)): + s1 = serials_list[i] + s2 = serials_list[j] + p1 = np.fromstring(results[s1]["pose"], sep=" ").reshape(4, 4)[ + :3, 3 + ] + p2 = np.fromstring(results[s2]["pose"], sep=" ").reshape(4, 4)[ + :3, 3 + ] + dist = np.linalg.norm(p1 - p2) + click.echo(f"Self-check: Distance {s1} <-> {s2}: {dist:.3f}m") + + +if __name__ == "__main__": + main()