# File: zed-playground/py_workspace/calibrate_extrinsics.py
# (912 lines, 32 KiB, Python — metadata carried over from the original file listing)
import click
import cv2
import json
import csv
import numpy as np
import pyzed.sl as sl
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from aruco.marker_geometry import (
load_marker_geometry,
validate_marker_geometry,
load_face_mapping,
)
from aruco.svo_sync import SVOReader
from aruco.detector import (
create_detector,
detect_markers,
build_camera_matrix_from_zed,
estimate_pose_from_detections,
)
from aruco.pose_math import rvec_tvec_to_matrix, invert_transform, matrix_to_rvec_tvec
from aruco.pose_averaging import PoseAccumulator
from aruco.preview import draw_detected_markers, draw_pose_axes, show_preview
from aruco.depth_verify import verify_extrinsics_with_depth
from aruco.depth_refine import refine_extrinsics_with_depth
from aruco.alignment import (
get_face_normal_from_geometry,
detect_ground_face,
rotation_align_vectors,
apply_alignment_to_pose,
Vec3,
Mat44,
)
from loguru import logger
from jaxtyping import Float
from typing import TYPE_CHECKING
# Type aliases: jaxtyping shape annotations are used only for static type
# checkers; at runtime they degrade to plain np.ndarray so no jaxtyping
# machinery is involved in hot paths.
if TYPE_CHECKING:
    Mat33 = Float[np.ndarray, "3 3"]      # 3x3 rotation matrix
    CornersNC = Float[np.ndarray, "N 3"]  # N corner points in 3D
else:
    Mat33 = np.ndarray
    CornersNC = np.ndarray
# Mapping from CLI-friendly dictionary names to OpenCV ArUco dictionary ids.
# The keys double as the choices for the --aruco-dictionary option below.
ARUCO_DICT_MAP = {
    "DICT_4X4_50": cv2.aruco.DICT_4X4_50,
    "DICT_4X4_100": cv2.aruco.DICT_4X4_100,
    "DICT_4X4_250": cv2.aruco.DICT_4X4_250,
    "DICT_4X4_1000": cv2.aruco.DICT_4X4_1000,
    "DICT_5X5_50": cv2.aruco.DICT_5X5_50,
    "DICT_5X5_100": cv2.aruco.DICT_5X5_100,
    "DICT_5X5_250": cv2.aruco.DICT_5X5_250,
    "DICT_5X5_1000": cv2.aruco.DICT_5X5_1000,
    "DICT_6X6_50": cv2.aruco.DICT_6X6_50,
    "DICT_6X6_100": cv2.aruco.DICT_6X6_100,
    "DICT_6X6_250": cv2.aruco.DICT_6X6_250,
    "DICT_6X6_1000": cv2.aruco.DICT_6X6_1000,
    "DICT_7X7_50": cv2.aruco.DICT_7X7_50,
    "DICT_7X7_100": cv2.aruco.DICT_7X7_100,
    "DICT_7X7_250": cv2.aruco.DICT_7X7_250,
    "DICT_7X7_1000": cv2.aruco.DICT_7X7_1000,
    "DICT_ARUCO_ORIGINAL": cv2.aruco.DICT_ARUCO_ORIGINAL,
    "DICT_APRILTAG_16h5": cv2.aruco.DICT_APRILTAG_16h5,
    "DICT_APRILTAG_25h9": cv2.aruco.DICT_APRILTAG_25h9,
    "DICT_APRILTAG_36h10": cv2.aruco.DICT_APRILTAG_36h10,
    "DICT_APRILTAG_36h11": cv2.aruco.DICT_APRILTAG_36h11,
}
def score_frame(
    n_markers: int,
    reproj_err: float,
    corners: np.ndarray,
    depth_map: Optional[np.ndarray],
    depth_confidence_threshold: int = 50,
    confidence_map: Optional[np.ndarray] = None,
) -> float:
    """
    Compute a quality score for a frame to select the best one for depth verification.
    Higher is better.

    Args:
        n_markers: Number of detected markers (+100 per marker — markers are
            weighted heavily as they provide more pose constraints).
        reproj_err: Reprojection error of the pose fit (subtracted from score).
        corners: Detected marker corners; any array-like reshapeable to
            (N*4, 2) pixel coordinates (OpenCV detections are (N, 4, 2)).
        depth_map: Optional depth image; when given, the fraction of in-bounds
            corners with finite, positive (and confident) depth contributes
            up to +50 so we pick a frame where depth exists where we need it.
        depth_confidence_threshold: ZED confidence cutoff; only applied when
            confidence_map is provided (lower = more confident).
        confidence_map: Optional per-pixel ZED confidence image.

    Returns:
        Scalar quality score; larger means a better frame.
    """
    # Base score: more markers is better, lower reprojection error is better.
    score = n_markers * 100.0 - reproj_err
    if depth_map is not None:
        h, w = depth_map.shape[:2]
        # Round corner coordinates to integer pixels. Vectorized replacement
        # of the per-point loop; np.rint uses round-half-to-even, matching
        # Python's round() on each coordinate.
        pts = np.rint(np.asarray(corners, dtype=float).reshape(-1, 2)).astype(int)
        in_bounds = (
            (pts[:, 0] >= 0)
            & (pts[:, 0] < w)
            & (pts[:, 1] >= 0)
            & (pts[:, 1] < h)
        )
        pts = pts[in_bounds]
        total_count = len(pts)
        if total_count > 0:
            # Gather depth at (y, x) for all in-bounds corners at once.
            d = depth_map[pts[:, 1], pts[:, 0]]
            valid = np.isfinite(d) & (d > 0)
            if confidence_map is not None:
                # ZED confidence: lower is more confident.
                conf = confidence_map[pts[:, 1], pts[:, 0]]
                valid &= conf <= depth_confidence_threshold
            depth_ratio = np.count_nonzero(valid) / total_count
            score += depth_ratio * 50.0
    return score
def apply_depth_verify_refine_postprocess(
    results: Dict[str, Any],
    verification_frames: Dict[str, Any],
    marker_geometry: Dict[int, Any],
    camera_matrices: Dict[str, Any],
    verify_depth: bool,
    refine_depth: bool,
    use_confidence_weights: bool,
    depth_confidence_threshold: int,
    report_csv_path: Optional[str] = None,
) -> Tuple[Dict[str, Any], List[List[Any]]]:
    """
    Apply depth verification and refinement to computed extrinsics.
    Returns updated results and list of CSV rows.

    Mutates ``results`` in place: adds a "depth_verify" entry per camera and,
    when refinement runs, "refine_depth" / "depth_verify_post" entries and an
    overwritten "pose". No-op when both flags are off. When
    ``report_csv_path`` is given, per-corner residuals are also written to CSV.
    """
    csv_rows: List[List[Any]] = []
    # Fast exit: nothing to do unless at least one depth stage is enabled.
    if not (verify_depth or refine_depth):
        return results, csv_rows
    click.echo("\nRunning depth verification/refinement on computed extrinsics...")
    for serial, vf in verification_frames.items():
        # results is keyed by string serials; skip cameras without a solution.
        if str(serial) not in results:
            continue
        frame = vf["frame"]
        ids = vf["ids"]
        # Use the FINAL COMPUTED POSE for verification
        pose_str = results[str(serial)]["pose"]
        # NOTE(review): np.fromstring on text is deprecated in NumPy; a future
        # change could parse via np.array(pose_str.split(), dtype=float).
        T_mean = np.fromstring(pose_str, sep=" ").reshape(4, 4)
        cam_matrix = camera_matrices[serial]
        # World-space corner geometry restricted to markers seen in this frame.
        marker_corners_world = {
            int(mid): marker_geometry[int(mid)]
            for mid in ids.flatten()
            if int(mid) in marker_geometry
        }
        if marker_corners_world and frame.depth_map is not None:
            # Pre-refinement check: residuals between projected geometry and
            # measured depth under the current pose.
            verify_res = verify_extrinsics_with_depth(
                T_mean,
                marker_corners_world,
                frame.depth_map,
                cam_matrix,
                confidence_map=frame.confidence_map,
                confidence_thresh=depth_confidence_threshold,
            )
            results[str(serial)]["depth_verify"] = {
                "rmse": verify_res.rmse,
                "mean_abs": verify_res.mean_abs,
                "median": verify_res.median,
                "depth_normalized_rmse": verify_res.depth_normalized_rmse,
                "n_valid": verify_res.n_valid,
                "n_total": verify_res.n_total,
            }
            click.echo(
                f"Camera {serial} verification: RMSE={verify_res.rmse:.3f}m, "
                f"Valid={verify_res.n_valid}/{verify_res.n_total}"
            )
            if refine_depth:
                # Need at least 4 depth points to constrain a 6-DoF refinement.
                if verify_res.n_valid < 4:
                    click.echo(
                        f"Camera {serial}: Not enough valid depth points for refinement ({verify_res.n_valid}). Skipping."
                    )
                else:
                    click.echo(f"Camera {serial}: Refining extrinsics with depth...")
                    # Confidence weighting in the optimizer is opt-in; the
                    # post-check below always filters by confidence.
                    T_refined, refine_stats = refine_extrinsics_with_depth(
                        T_mean,
                        marker_corners_world,
                        frame.depth_map,
                        cam_matrix,
                        confidence_map=frame.confidence_map
                        if use_confidence_weights
                        else None,
                        confidence_thresh=depth_confidence_threshold,
                    )
                    # Re-verify with the refined pose to measure improvement.
                    verify_res_post = verify_extrinsics_with_depth(
                        T_refined,
                        marker_corners_world,
                        frame.depth_map,
                        cam_matrix,
                        confidence_map=frame.confidence_map,
                        confidence_thresh=depth_confidence_threshold,
                    )
                    # The refined pose replaces the original in results.
                    pose_str_refined = " ".join(f"{x:.6f}" for x in T_refined.flatten())
                    results[str(serial)]["pose"] = pose_str_refined
                    results[str(serial)]["refine_depth"] = refine_stats
                    results[str(serial)]["depth_verify_post"] = {
                        "rmse": verify_res_post.rmse,
                        "mean_abs": verify_res_post.mean_abs,
                        "median": verify_res_post.median,
                        "depth_normalized_rmse": verify_res_post.depth_normalized_rmse,
                        "n_valid": verify_res_post.n_valid,
                        "n_total": verify_res_post.n_total,
                    }
                    improvement = verify_res.rmse - verify_res_post.rmse
                    results[str(serial)]["refine_depth"]["improvement_rmse"] = (
                        improvement
                    )
                    click.echo(
                        f"Camera {serial} refined: RMSE={verify_res_post.rmse:.3f}m "
                        f"(Improved by {improvement:.3f}m). "
                        f"Delta Rot={refine_stats['delta_rotation_deg']:.2f}deg, "
                        f"Trans={refine_stats['delta_translation_norm_m']:.3f}m"
                    )
                    # Warning gates: flag runs that iterated without gaining,
                    # or that the optimizer reports as failed/stalled.
                    if improvement < 1e-4 and refine_stats["nfev"] > 5:
                        click.echo(
                            f"  WARNING: Optimization ran for {refine_stats['nfev']} steps but improvement was negligible ({improvement:.6f}m).",
                            err=True,
                        )
                    if not refine_stats["success"] or refine_stats["nfev"] <= 1:
                        click.echo(
                            f"  WARNING: Optimization might have failed or stalled. Success: {refine_stats['success']}, Steps: {refine_stats['nfev']}. Message: {refine_stats['termination_message']}",
                            err=True,
                        )
                    # Report residuals of the refined pose from here on.
                    verify_res = verify_res_post
            if report_csv_path:
                for mid, cidx, resid in verify_res.residuals:
                    csv_rows.append([serial, mid, cidx, resid])
    if report_csv_path and csv_rows:
        with open(report_csv_path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["serial", "marker_id", "corner_idx", "residual"])
            writer.writerows(csv_rows)
        click.echo(f"Saved depth verification report to {report_csv_path}")
    return results, csv_rows
def run_benchmark_matrix(
    results: Dict[str, Any],
    verification_frames: Dict[Any, Any],
    first_frames: Dict[Any, Any],
    marker_geometry: Dict[int, Any],
    camera_matrices: Dict[Any, Any],
    depth_confidence_threshold: int,
) -> Dict[str, Any]:
    """
    Run benchmark matrix comparing 4 configurations:
    1) baseline (linear loss, no confidence weights)
    2) robust (soft_l1, f_scale=0.1, no confidence)
    3) robust+confidence
    4) robust+confidence+best-frame

    For each camera in ``results`` the stored pose is refined under every
    configuration and pre/post depth RMSE is collected and printed. Does not
    modify ``results``; returns a dict keyed serial -> config name -> stats.
    """
    benchmark_results = {}
    configs = [
        {
            "name": "baseline",
            "loss": "linear",
            "use_confidence": False,
            "use_best_frame": False,
        },
        {
            "name": "robust",
            "loss": "soft_l1",
            "use_confidence": False,
            "use_best_frame": False,
        },
        {
            "name": "robust+confidence",
            "loss": "soft_l1",
            "use_confidence": True,
            "use_best_frame": False,
        },
        {
            "name": "robust+confidence+best-frame",
            "loss": "soft_l1",
            "use_confidence": True,
            "use_best_frame": True,
        },
    ]
    click.echo("\nRunning Benchmark Matrix...")
    for serial in results.keys():
        # results keys are strings; the frame caches are keyed by int serials.
        serial_int = int(serial)
        if serial_int not in first_frames or serial_int not in verification_frames:
            continue
        cam_matrix = camera_matrices[serial_int]
        pose_str = results[serial]["pose"]
        T_initial = np.fromstring(pose_str, sep=" ").reshape(4, 4)
        cam_bench = {}
        for config in configs:
            name = config["name"]
            use_best = config["use_best_frame"]
            # Best-scoring frame vs. first valid frame, per configuration.
            vf = (
                verification_frames[serial_int]
                if use_best
                else first_frames[serial_int]
            )
            frame = vf["frame"]
            ids = vf["ids"]
            # World geometry restricted to markers visible in this frame.
            marker_corners_world = {
                int(mid): marker_geometry[int(mid)]
                for mid in ids.flatten()
                if int(mid) in marker_geometry
            }
            if not marker_corners_world or frame.depth_map is None:
                continue
            # Pre-refinement verification
            verify_pre = verify_extrinsics_with_depth(
                T_initial,
                marker_corners_world,
                frame.depth_map,
                cam_matrix,
                confidence_map=frame.confidence_map,
                confidence_thresh=depth_confidence_threshold,
            )
            # Refinement
            T_refined, refine_stats = refine_extrinsics_with_depth(
                T_initial,
                marker_corners_world,
                frame.depth_map,
                cam_matrix,
                confidence_map=frame.confidence_map
                if config["use_confidence"]
                else None,
                confidence_thresh=depth_confidence_threshold,
                loss=str(config["loss"]),
                f_scale=0.1,
            )
            # Post-refinement verification
            verify_post = verify_extrinsics_with_depth(
                T_refined,
                marker_corners_world,
                frame.depth_map,
                cam_matrix,
                confidence_map=frame.confidence_map,
                confidence_thresh=depth_confidence_threshold,
            )
            cam_bench[name] = {
                "rmse_pre": verify_pre.rmse,
                "rmse_post": verify_post.rmse,
                "improvement": verify_pre.rmse - verify_post.rmse,
                "delta_rot_deg": refine_stats["delta_rotation_deg"],
                "delta_trans_m": refine_stats["delta_translation_norm_m"],
                "nfev": refine_stats["nfev"],
                "success": refine_stats["success"],
                "frame_index": vf["frame_index"],
            }
        benchmark_results[serial] = cam_bench
        # Print summary table for this camera
        header = f"{'Config':<30} | {'RMSE Pre':<10} | {'RMSE Post':<10} | {'Improv':<10} | {'Iter':<5}"
        click.echo(f"\nBenchmark Results for Camera {serial}:")
        click.echo(header)
        click.echo("-" * len(header))
        for name, stats in cam_bench.items():
            click.echo(
                f"{name:<30} | {stats['rmse_pre']:<10.4f} | {stats['rmse_post']:<10.4f} | "
                f"{stats['improvement']:<10.4f} | {stats['nfev']:<5}"
            )
    return benchmark_results
@click.command()
@click.option("--svo", "-s", multiple=True, required=False, help="Path to SVO files.")
@click.option("--markers", "-m", required=True, help="Path to markers parquet file.")
@click.option("--output", "-o", default="extrinsics.json", help="Output JSON file.")
@click.option(
    "--sample-interval", "-n", default=30, type=int, help="Sample every N frames."
)
@click.option(
    "--max-reproj-error",
    "-e",
    default=2.0,
    type=float,
    help="Max reprojection error for pose.",
)
@click.option("--preview/--no-preview", default=True, help="Show preview window.")
@click.option(
    "--validate-markers/--no-validate", default=True, help="Validate marker geometry."
)
@click.option(
    "--self-check/--no-self-check", default=False, help="Perform self-check on result."
)
@click.option(
    "--verify-depth/--no-verify-depth", default=False, help="Enable depth verification."
)
@click.option(
    "--refine-depth/--no-refine-depth", default=False, help="Enable depth refinement."
)
@click.option(
    "--use-confidence-weights/--no-confidence-weights",
    default=False,
    help="Use confidence-weighted residuals in depth refinement.",
)
@click.option(
    "--depth-mode",
    default="NEURAL",
    type=click.Choice(["NEURAL", "ULTRA", "PERFORMANCE", "NONE"]),
    help="Depth computation mode.",
)
@click.option(
    "--depth-confidence-threshold",
    default=50,
    type=int,
    help="Confidence threshold for depth filtering (lower = more confident).",
)
@click.option(
    "--report-csv", type=click.Path(), help="Optional path for per-frame CSV report."
)
@click.option(
    "--auto-align/--no-auto-align",
    default=False,
    help="Automatically align ground plane.",
)
@click.option(
    "--ground-face", type=str, help="Explicit face name for ground alignment."
)
@click.option(
    "--ground-marker-id", type=int, help="Explicit marker ID to define ground face."
)
@click.option(
    "--aruco-dictionary",
    default="DICT_4X4_50",
    type=click.Choice(list(ARUCO_DICT_MAP.keys())),
    help="ArUco dictionary to use.",
)
@click.option(
    "--min-markers",
    default=1,
    type=int,
    help="Minimum markers required for pose estimation.",
)
@click.option(
    "--debug/--no-debug",
    default=False,
    help="Enable verbose debug logging.",
)
@click.option(
    "--max-samples",
    default=None,
    type=int,
    help="Maximum number of samples to process before stopping.",
)
@click.option(
    "--benchmark-matrix/--no-benchmark-matrix",
    default=False,
    help="Run benchmark matrix comparing different refinement configurations.",
)
def main(
    svo: tuple[str, ...],
    markers: str,
    output: str,
    sample_interval: int,
    max_reproj_error: float,
    preview: bool,
    validate_markers: bool,
    self_check: bool,
    verify_depth: bool,
    refine_depth: bool,
    use_confidence_weights: bool,
    depth_mode: str,
    depth_confidence_threshold: int,
    report_csv: str | None,
    auto_align: bool,
    ground_face: str | None,
    ground_marker_id: int | None,
    aruco_dictionary: str,
    min_markers: int,
    debug: bool,
    max_samples: int | None,
    benchmark_matrix: bool,
):
    """
    Calibrate camera extrinsics relative to a global coordinate system defined by ArUco markers.
    """
    # Configure logging level; route loguru through click.echo so output
    # interleaves correctly with the script's own echo calls.
    logger.remove()
    logger.add(
        lambda msg: click.echo(msg, nl=False),
        level="DEBUG" if debug else "INFO",
        format="{message}",
    )
    # Translate the CLI depth-mode string into the ZED SDK enum.
    depth_mode_map = {
        "NEURAL": sl.DEPTH_MODE.NEURAL,
        "ULTRA": sl.DEPTH_MODE.ULTRA,
        "PERFORMANCE": sl.DEPTH_MODE.PERFORMANCE,
        "NONE": sl.DEPTH_MODE.NONE,
    }
    sl_depth_mode = depth_mode_map.get(depth_mode, sl.DEPTH_MODE.NONE)
    # Depth computation is expensive; skip it entirely when nothing needs it.
    if not (verify_depth or refine_depth or benchmark_matrix):
        sl_depth_mode = sl.DEPTH_MODE.NONE
    # Expand SVO paths (files or directories)
    expanded_svo = []
    for path_str in svo:
        path = Path(path_str)
        if path.is_dir():
            click.echo(f"Searching for SVO files in {path}...")
            found = sorted(
                [
                    str(p)
                    for p in path.iterdir()
                    if p.is_file() and p.suffix.lower() in (".svo", ".svo2")
                ]
            )
            if found:
                click.echo(f"Found {len(found)} SVO files in {path}")
                expanded_svo.extend(found)
            else:
                click.echo(f"Warning: No .svo/.svo2 files found in {path}", err=True)
        elif path.is_file():
            expanded_svo.append(str(path))
        else:
            click.echo(f"Warning: Path not found: {path}", err=True)
    if not expanded_svo:
        # With no SVOs, the command can still act as a marker-file validator.
        if validate_markers:
            click.echo("Marker validation successful. No SVOs provided, exiting.")
            return
        else:
            click.echo(
                "Error: --svo is required unless --validate-markers is used.", err=True
            )
            raise click.UsageError("Missing option '--svo' / '-s'.")
    # 1. Load Marker Geometry
    try:
        marker_geometry = load_marker_geometry(markers)
        if validate_markers:
            validate_marker_geometry(marker_geometry)
        click.echo(f"Loaded {len(marker_geometry)} markers from {markers}")
        # Load face mapping if available (optional; used for ground alignment)
        face_marker_map = load_face_mapping(markers)
        if face_marker_map:
            click.echo(f"Loaded face mapping for {len(face_marker_map)} faces.")
        else:
            click.echo("No face mapping found in parquet (missing 'name'/'ids').")
            face_marker_map = None
    except Exception as e:
        click.echo(f"Error loading markers: {e}", err=True)
        raise SystemExit(1)
    # 2. Initialize SVO Reader
    reader = SVOReader(expanded_svo, depth_mode=sl_depth_mode)
    if not reader.cameras:
        click.echo("No SVO files could be opened.", err=True)
        return
    # Align SVOs so all recordings start from a common timestamp.
    reader.sync_to_latest_start()
    # Calculate max frames to process to avoid infinite loop
    max_frames = 10000  # Default safety limit
    if reader.cameras:
        remaining = []
        for i, cam in enumerate(reader.cameras):
            total = reader.camera_info[i]["total_frames"]
            if total > 0:
                current = cam.get_svo_position()
                remaining.append(total - current)
            else:
                # If any total_frames is unknown (<= 0), use a hard limit
                remaining = [10000]
                break
        if remaining:
            max_frames = min(remaining)
        else:
            click.echo(
                "Warning: Could not determine SVO lengths, using safety limit of 10,000 frames."
            )
    serials = [info["serial"] for info in reader.camera_info]
    # One pose accumulator and intrinsics matrix per camera, keyed by serial.
    accumulators = {serial: PoseAccumulator() for serial in serials}
    camera_matrices = {
        serial: build_camera_matrix_from_zed(cam)
        for serial, cam in zip(serials, reader.cameras)
    }
    # Store verification frames for post-process check (best-scoring frame)
    verification_frames = {}
    # Store first valid frame for benchmarking
    first_frames = {}
    # Track all visible marker IDs for heuristic ground detection
    all_visible_ids = set()
    detector = create_detector(dictionary_id=ARUCO_DICT_MAP[aruco_dictionary])
    frame_count = 0
    sampled_count = 0
    click.echo(f"Processing SVOs: {serials}")
    try:
        while frame_count < max_frames:
            frames = reader.grab_synced()
            if not any(frames):
                break
            # Only every sample_interval-th frame is processed for detection.
            if frame_count % sample_interval == 0:
                preview_frames = {}
                for i, frame in enumerate(frames):
                    if frame is None:
                        continue
                    serial = frame.serial_number
                    K = camera_matrices[serial]
                    # Detect markers
                    corners, ids = detect_markers(frame.image, detector)
                    if ids is not None:
                        all_visible_ids.update(ids.flatten().tolist())
                        logger.debug(
                            f"Cam {serial}: Detected {len(ids)} markers: {ids.flatten()}"
                        )
                    else:
                        logger.debug(f"Cam {serial}: No markers detected")
                    if ids is None:
                        if preview:
                            preview_frames[serial] = frame.image
                        continue
                    # Estimate pose (T_cam_from_world)
                    pose_res = estimate_pose_from_detections(
                        corners, ids, marker_geometry, K, min_markers=min_markers
                    )
                    if pose_res:
                        rvec, tvec, reproj_err, n_markers = pose_res
                        if reproj_err <= max_reproj_error:
                            T_cam_world = rvec_tvec_to_matrix(rvec, tvec)
                            # We want T_world_from_cam
                            T_world_cam = invert_transform(T_cam_world)
                            # Save best frame for verification based on scoring
                            if (
                                verify_depth or refine_depth or benchmark_matrix
                            ) and frame.depth_map is not None:
                                current_score = score_frame(
                                    n_markers,
                                    reproj_err,
                                    corners,
                                    frame.depth_map,
                                    depth_confidence_threshold,
                                    frame.confidence_map,
                                )
                                # Remember the first accepted frame separately
                                # for the benchmark matrix's non-best configs.
                                if serial not in first_frames:
                                    first_frames[serial] = {
                                        "frame": frame,
                                        "ids": ids,
                                        "corners": corners,
                                        "score": current_score,
                                        "frame_index": frame_count,
                                    }
                                best_so_far = verification_frames.get(serial)
                                if (
                                    best_so_far is None
                                    or current_score > best_so_far["score"]
                                ):
                                    verification_frames[serial] = {
                                        "frame": frame,
                                        "ids": ids,
                                        "corners": corners,
                                        "score": current_score,
                                        "frame_index": frame_count,
                                    }
                                    logger.debug(
                                        f"Cam {serial}: New best frame {frame_count} with score {current_score:.2f}"
                                    )
                            accumulators[serial].add_pose(
                                T_world_cam, reproj_err, frame_count
                            )
                            logger.debug(
                                f"Cam {serial}: Pose accepted. Reproj={reproj_err:.3f}, Markers={n_markers}"
                            )
                        else:
                            logger.debug(
                                f"Cam {serial}: Pose rejected. Reproj {reproj_err:.3f} > {max_reproj_error}"
                            )
                        if preview:
                            img = draw_detected_markers(
                                frame.image.copy(), corners, ids
                            )
                            img = draw_pose_axes(img, rvec, tvec, K, length=0.2)
                            preview_frames[serial] = img
                    else:
                        # NOTE: ids is always non-None here (the None case
                        # `continue`s above), so the elif branch cannot run.
                        if ids is not None:
                            logger.debug(
                                f"Cam {serial}: Pose estimation failed (insufficient markers < {min_markers} or solver failure)"
                            )
                        elif preview:
                            preview_frames[serial] = frame.image
                if preview and preview_frames:
                    key = show_preview(preview_frames)
                    # ESC or 'q' aborts processing.
                    if key == 27 or key == ord("q"):
                        break
                sampled_count += 1
                if max_samples is not None and sampled_count >= max_samples:
                    click.echo(f"\nReached max samples ({max_samples}). Stopping.")
                    break
            frame_count += 1
            if frame_count % 100 == 0:
                counts = [len(acc.poses) for acc in accumulators.values()]
                click.echo(
                    f"Frame {frame_count}, Accepted Poses: {dict(zip(serials, counts))}"
                )
    except KeyboardInterrupt:
        click.echo("\nInterrupted by user.")
    finally:
        reader.close()
        cv2.destroyAllWindows()
    # 3. Compute Final Poses
    results = {}
    for serial, acc in accumulators.items():
        if not acc.poses:
            click.echo(f"Warning: No valid poses for camera {serial}")
            continue
        # Use RANSAC to find best consensus
        inliers = acc.ransac_filter()
        T_mean, stats = acc.compute_robust_mean(inliers)
        # Flatten for JSON as space-separated string
        pose_str = " ".join(f"{x:.6f}" for x in T_mean.flatten())
        results[str(serial)] = {"pose": pose_str, "stats": stats}
        click.echo(
            f"Camera {serial}: {stats['n_inliers']}/{stats['n_total']} inliers, median error: {stats['median_reproj_error']:.3f}"
        )
    if not results:
        click.echo("No extrinsics computed.", err=True)
        return
    # 4. Run Depth Verification if requested (mutates results in place)
    apply_depth_verify_refine_postprocess(
        results,
        verification_frames,
        marker_geometry,
        camera_matrices,
        verify_depth,
        refine_depth,
        use_confidence_weights,
        depth_confidence_threshold,
        report_csv,
    )
    # 5. Run Benchmark Matrix if requested
    if benchmark_matrix:
        benchmark_results = run_benchmark_matrix(
            results,
            verification_frames,
            first_frames,
            marker_geometry,
            camera_matrices,
            depth_confidence_threshold,
        )
        # Add to results for saving
        for serial, bench in benchmark_results.items():
            if serial in results:
                results[serial]["benchmark"] = bench
    # 6. Optional Ground Plane Alignment
    if auto_align:
        click.echo("\nPerforming ground plane alignment...")
        target_face = ground_face
        # Use loaded map or skip if None
        if face_marker_map is None:
            click.echo(
                "Warning: No face mapping available (missing 'name'/'ids' in parquet). Skipping alignment.",
                err=True,
            )
            # Skip alignment logic by ensuring loop below doesn't run and heuristic fails gracefully
            mapping_to_use = {}
        else:
            mapping_to_use = face_marker_map
        if not target_face and ground_marker_id is not None:
            # Map marker ID to face
            for face, ids in mapping_to_use.items():
                if ground_marker_id in ids:
                    target_face = face
                    logger.info(
                        f"Mapped ground-marker-id {ground_marker_id} to face '{face}' (markers={ids})"
                    )
                    break
        ground_normal = None
        if target_face:
            # Explicit face takes precedence over heuristic detection.
            ground_normal = get_face_normal_from_geometry(
                target_face, marker_geometry, face_marker_map=face_marker_map
            )
            if ground_normal is not None:
                ids = mapping_to_use.get(target_face, [])
                logger.info(
                    f"Using explicit ground face '{target_face}' (markers={ids})"
                )
        else:
            # Heuristic detection
            heuristic_res = detect_ground_face(
                all_visible_ids, marker_geometry, face_marker_map=face_marker_map
            )
            if heuristic_res:
                target_face, ground_normal = heuristic_res
                ids = mapping_to_use.get(target_face, [])
                logger.info(
                    f"Heuristically detected ground face '{target_face}' (markers={ids})"
                )
        if ground_normal is not None:
            # Rotate the ground-face normal onto +Y and apply to every pose.
            R_align: Mat33 = rotation_align_vectors(ground_normal, np.array([0, 1, 0]))
            logger.info(f"Computed alignment rotation for face '{target_face}'")
            for serial, data in results.items():
                T_mean: Mat44 = np.fromstring(data["pose"], sep=" ").reshape(4, 4)
                T_aligned = apply_alignment_to_pose(T_mean, R_align)
                data["pose"] = " ".join(f"{x:.6f}" for x in T_aligned.flatten())
                logger.debug(f"Applied alignment to camera {serial}")
        else:
            click.echo(
                "Warning: Could not determine ground normal. Skipping alignment."
            )
    # 7. Save to JSON
    with open(output, "w") as f:
        json.dump(results, f, indent=4, sort_keys=True)
    click.echo(f"Saved extrinsics to {output}")
    # 8. Optional Self-Check
    if self_check:
        # Verify reprojection error
        for serial, data in results.items():
            if data["stats"]["median_reproj_error"] > max_reproj_error:
                click.echo(
                    f"Error: Camera {serial} failed self-check (median error {data['stats']['median_reproj_error']:.3f} > {max_reproj_error})",
                    err=True,
                )
                raise SystemExit(1)
        # Simple check: verify distance between cameras if multiple
        if len(results) >= 2:
            serials_list = sorted(results.keys())
            for i in range(len(serials_list)):
                for j in range(i + 1, len(serials_list)):
                    s1 = serials_list[i]
                    s2 = serials_list[j]
                    p1 = np.fromstring(results[s1]["pose"], sep=" ").reshape(4, 4)[
                        :3, 3
                    ]
                    p2 = np.fromstring(results[s2]["pose"], sep=" ").reshape(4, 4)[
                        :3, 3
                    ]
                    dist = np.linalg.norm(p1 - p2)
                    click.echo(f"Self-check: Distance {s1} <-> {s2}: {dist:.3f}m")
if __name__ == "__main__":
    # Entry point: Click parses CLI arguments and supplies all option values.
    main()  # pylint: disable=no-value-for-parameter