Add extrinsic calibration CLI tool

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
2026-02-04 11:44:49 +00:00
parent 43e736faac
commit 69a3d1e8bf
+243
View File
@@ -0,0 +1,243 @@
import click
import cv2
import json
import numpy as np
import pyzed.sl as sl
from pathlib import Path
from typing import List, Dict, Any, Optional
from aruco.marker_geometry import load_marker_geometry, validate_marker_geometry
from aruco.svo_sync import SVOReader
from aruco.detector import (
create_detector,
detect_markers,
build_camera_matrix_from_zed,
estimate_pose_from_detections,
)
from aruco.pose_math import rvec_tvec_to_matrix, invert_transform, matrix_to_rvec_tvec
from aruco.pose_averaging import PoseAccumulator
from aruco.preview import draw_detected_markers, draw_pose_axes, show_preview
@click.command()
@click.option("--svo", "-s", multiple=True, required=False, help="Path to SVO files.")
@click.option("--markers", "-m", required=True, help="Path to markers parquet file.")
@click.option("--output", "-o", default="extrinsics.json", help="Output JSON file.")
@click.option(
"--sample-interval", "-n", default=30, type=int, help="Sample every N frames."
)
@click.option(
"--max-reproj-error",
"-e",
default=2.0,
type=float,
help="Max reprojection error for pose.",
)
@click.option("--preview/--no-preview", default=True, help="Show preview window.")
@click.option(
"--validate-markers/--no-validate", default=True, help="Validate marker geometry."
)
@click.option(
"--self-check/--no-self-check", default=False, help="Perform self-check on result."
)
def main(
svo,
markers,
output,
sample_interval,
max_reproj_error,
preview,
validate_markers,
self_check,
):
"""
Calibrate camera extrinsics relative to a global coordinate system defined by ArUco markers.
"""
# 1. Load Marker Geometry
try:
marker_geometry = load_marker_geometry(markers)
if validate_markers:
validate_marker_geometry(marker_geometry)
click.echo(f"Loaded {len(marker_geometry)} markers from {markers}")
except Exception as e:
click.echo(f"Error loading markers: {e}", err=True)
raise SystemExit(1)
if not svo:
if validate_markers:
click.echo("Marker validation successful. No SVOs provided, exiting.")
return
else:
click.echo(
"Error: --svo is required unless --validate-markers is used.", err=True
)
raise click.UsageError("Missing option '--svo' / '-s'.")
# 2. Initialize SVO Reader
reader = SVOReader(svo)
if not reader.cameras:
click.echo("No SVO files could be opened.", err=True)
return
# Align SVOs
reader.sync_to_latest_start()
# Calculate max frames to process to avoid infinite loop
max_frames = 10000 # Default safety limit
if reader.cameras:
remaining = []
for i, cam in enumerate(reader.cameras):
total = reader.camera_info[i]["total_frames"]
if total > 0:
current = cam.get_svo_position()
remaining.append(total - current)
else:
# If any total_frames is unknown (<= 0), use a hard limit
remaining = [10000]
break
if remaining:
max_frames = min(remaining)
else:
click.echo(
"Warning: Could not determine SVO lengths, using safety limit of 10,000 frames."
)
serials = [info["serial"] for info in reader.camera_info]
accumulators = {serial: PoseAccumulator() for serial in serials}
camera_matrices = {
serial: build_camera_matrix_from_zed(cam)
for serial, cam in zip(serials, reader.cameras)
}
detector = create_detector()
frame_count = 0
sampled_count = 0
click.echo(f"Processing SVOs: {serials}")
try:
while frame_count < max_frames:
frames = reader.grab_synced()
if not any(frames):
break
if frame_count % sample_interval == 0:
preview_frames = {}
for i, frame in enumerate(frames):
if frame is None:
continue
serial = frame.serial_number
K = camera_matrices[serial]
# Detect markers
corners, ids = detect_markers(frame.image, detector)
if ids is None:
if preview:
preview_frames[serial] = frame.image
continue
# Estimate pose (T_cam_from_world)
pose_res = estimate_pose_from_detections(
corners, ids, marker_geometry, K, min_markers=4
)
if pose_res:
rvec, tvec, reproj_err, n_markers = pose_res
if reproj_err <= max_reproj_error:
T_cam_world = rvec_tvec_to_matrix(rvec, tvec)
# We want T_world_from_cam
T_world_cam = invert_transform(T_cam_world)
accumulators[serial].add_pose(
T_world_cam, reproj_err, frame_count
)
if preview:
img = draw_detected_markers(
frame.image.copy(), corners, ids
)
img = draw_pose_axes(img, rvec, tvec, K, length=0.2)
preview_frames[serial] = img
elif preview:
preview_frames[serial] = frame.image
if preview and preview_frames:
key = show_preview(preview_frames)
if key == 27 or key == ord("q"):
break
sampled_count += 1
frame_count += 1
if frame_count % 100 == 0:
counts = [len(acc.poses) for acc in accumulators.values()]
click.echo(
f"Frame {frame_count}, Detections: {dict(zip(serials, counts))}"
)
except KeyboardInterrupt:
click.echo("\nInterrupted by user.")
finally:
reader.close()
cv2.destroyAllWindows()
# 3. Compute Final Poses
results = {}
for serial, acc in accumulators.items():
if not acc.poses:
click.echo(f"Warning: No valid poses for camera {serial}")
continue
# Use RANSAC to find best consensus
inliers = acc.ransac_filter()
T_mean, stats = acc.compute_robust_mean(inliers)
# Flatten for JSON as space-separated string
pose_str = " ".join(f"{x:.6f}" for x in T_mean.flatten())
results[str(serial)] = {"pose": pose_str, "stats": stats}
click.echo(
f"Camera {serial}: {stats['n_inliers']}/{stats['n_total']} inliers, median error: {stats['median_reproj_error']:.3f}"
)
if not results:
click.echo("No extrinsics computed.", err=True)
return
# 4. Save to JSON
with open(output, "w") as f:
json.dump(results, f, indent=4, sort_keys=True)
click.echo(f"Saved extrinsics to {output}")
# 5. Optional Self-Check
if self_check:
# Verify reprojection error
for serial, data in results.items():
if data["stats"]["median_reproj_error"] > max_reproj_error:
click.echo(
f"Error: Camera {serial} failed self-check (median error {data['stats']['median_reproj_error']:.3f} > {max_reproj_error})",
err=True,
)
raise SystemExit(1)
# Simple check: verify distance between cameras if multiple
if len(results) >= 2:
serials_list = sorted(results.keys())
for i in range(len(serials_list)):
for j in range(i + 1, len(serials_list)):
s1 = serials_list[i]
s2 = serials_list[j]
p1 = np.fromstring(results[s1]["pose"], sep=" ").reshape(4, 4)[
:3, 3
]
p2 = np.fromstring(results[s2]["pose"], sep=" ").reshape(4, 4)[
:3, 3
]
dist = np.linalg.norm(p1 - p2)
click.echo(f"Self-check: Distance {s1} <-> {s2}: {dist:.3f}m")
if __name__ == "__main__":
main()