# calibrate_extrinsics.py
# (zed-playground/py_workspace) — ArUco-based multi-camera extrinsics calibration.
import click
import cv2
import json
import csv
import numpy as np
import pyzed.sl as sl
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from aruco.marker_geometry import (
load_marker_geometry,
validate_marker_geometry,
load_face_mapping,
)
from aruco.svo_sync import SVOReader
from aruco.detector import (
create_detector,
detect_markers,
build_camera_matrix_from_zed,
estimate_pose_from_detections,
)
from aruco.pose_math import rvec_tvec_to_matrix, invert_transform, matrix_to_rvec_tvec
from aruco.pose_averaging import PoseAccumulator
from aruco.preview import draw_detected_markers, draw_pose_axes, show_preview
from aruco.depth_verify import verify_extrinsics_with_depth
from aruco.depth_refine import refine_extrinsics_with_depth
from aruco.depth_pool import pool_depth_maps
from aruco.alignment import (
get_face_normal_from_geometry,
detect_ground_face,
rotation_align_vectors,
apply_alignment_to_pose,
Vec3,
Mat44,
)
from loguru import logger
from jaxtyping import Float
from typing import TYPE_CHECKING
# Type aliases.
# Under static type checking we use jaxtyping's shaped-array annotations so
# tools can verify matrix shapes; at runtime plain np.ndarray is used because
# jaxtyping subscripting is only needed for analysis, not execution.
if TYPE_CHECKING:
    Mat33 = Float[np.ndarray, "3 3"]
    CornersNC = Float[np.ndarray, "N 3"]
else:
    Mat33 = np.ndarray
    CornersNC = np.ndarray
# Mapping from CLI dictionary names to OpenCV ArUco predefined dictionary IDs.
# Keys are exposed as choices for the --aruco-dictionary option.
ARUCO_DICT_MAP = {
    "DICT_4X4_50": cv2.aruco.DICT_4X4_50,
    "DICT_4X4_100": cv2.aruco.DICT_4X4_100,
    "DICT_4X4_250": cv2.aruco.DICT_4X4_250,
    "DICT_4X4_1000": cv2.aruco.DICT_4X4_1000,
    "DICT_5X5_50": cv2.aruco.DICT_5X5_50,
    "DICT_5X5_100": cv2.aruco.DICT_5X5_100,
    "DICT_5X5_250": cv2.aruco.DICT_5X5_250,
    "DICT_5X5_1000": cv2.aruco.DICT_5X5_1000,
    "DICT_6X6_50": cv2.aruco.DICT_6X6_50,
    "DICT_6X6_100": cv2.aruco.DICT_6X6_100,
    "DICT_6X6_250": cv2.aruco.DICT_6X6_250,
    "DICT_6X6_1000": cv2.aruco.DICT_6X6_1000,
    "DICT_7X7_50": cv2.aruco.DICT_7X7_50,
    "DICT_7X7_100": cv2.aruco.DICT_7X7_100,
    "DICT_7X7_250": cv2.aruco.DICT_7X7_250,
    "DICT_7X7_1000": cv2.aruco.DICT_7X7_1000,
    "DICT_ARUCO_ORIGINAL": cv2.aruco.DICT_ARUCO_ORIGINAL,
    "DICT_APRILTAG_16h5": cv2.aruco.DICT_APRILTAG_16h5,
    "DICT_APRILTAG_25h9": cv2.aruco.DICT_APRILTAG_25h9,
    "DICT_APRILTAG_36h10": cv2.aruco.DICT_APRILTAG_36h10,
    "DICT_APRILTAG_36h11": cv2.aruco.DICT_APRILTAG_36h11,
}
def score_frame(
    n_markers: int,
    reproj_err: float,
    corners: np.ndarray,
    depth_map: Optional[np.ndarray],
    depth_confidence_threshold: int = 50,
    confidence_map: Optional[np.ndarray] = None,
) -> float:
    """
    Compute a quality score for a frame to select the best one for depth verification.

    Higher is better. The score rewards marker count (100 points per marker),
    penalizes reprojection error, and adds up to 50 bonus points proportional
    to the fraction of in-bounds marker corners that land on valid depth
    pixels (finite, positive, and — if a confidence map is given — at or
    below the confidence threshold).

    Args:
        n_markers: Number of markers detected in the frame.
        reproj_err: Reprojection error of the estimated pose (pixels).
        corners: Detected corner pixel coordinates; any array-like
            reshapeable to (N, 2), e.g. an (N, 4, 2) array or the tuple of
            (1, 4, 2) arrays produced by OpenCV ArUco detection.
        depth_map: (H, W) depth image, or None when depth is unavailable
            (the depth bonus is then skipped entirely).
        depth_confidence_threshold: Maximum accepted ZED confidence value
            (lower values mean MORE confident).
        confidence_map: Optional (H, W) ZED confidence image.

    Returns:
        Frame quality score as a Python float (higher is better).
    """
    # Base score: more markers is better, lower reprojection error is better.
    # Markers are weighted heavily as they provide more pose constraints.
    score = n_markers * 100.0 - reproj_err
    if depth_map is not None:
        h, w = depth_map.shape[:2]
        # Round corner coordinates to the nearest pixel (np.rint uses
        # round-half-even, matching Python's round()) and keep in-bounds ones.
        pts = np.asarray(corners, dtype=float).reshape(-1, 2)
        xs = np.rint(pts[:, 0]).astype(int)
        ys = np.rint(pts[:, 1]).astype(int)
        in_bounds = (xs >= 0) & (xs < w) & (ys >= 0) & (ys < h)
        total_count = int(in_bounds.sum())
        if total_count > 0:
            xs_in = xs[in_bounds]
            ys_in = ys[in_bounds]
            d = depth_map[ys_in, xs_in]
            valid = np.isfinite(d) & (d > 0)
            if confidence_map is not None:
                # ZED confidence: lower is more confident.
                valid &= confidence_map[ys_in, xs_in] <= depth_confidence_threshold
            # Bonus for depth being available exactly where we need it.
            depth_ratio = int(valid.sum()) / total_count
            score += depth_ratio * 50.0
    return float(score)
def apply_depth_verify_refine_postprocess(
    results: Dict[str, Any],
    verification_frames: Dict[int, List[Dict[str, Any]]],
    marker_geometry: Dict[int, Any],
    camera_matrices: Dict[int, Any],
    verify_depth: bool,
    refine_depth: bool,
    use_confidence_weights: bool,
    depth_confidence_threshold: int,
    depth_pool_size: int = 1,
    report_csv_path: Optional[str] = None,
) -> Tuple[Dict[str, Any], List[List[Any]]]:
    """
    Apply depth verification and refinement to computed extrinsics.

    Mutates `results` in place: per camera serial it may add "depth_verify",
    "depth_pool", "refine_depth", "depth_verify_post" entries and (when
    refinement runs) overwrite the "pose" string with the refined transform.
    Optionally writes a per-corner residual CSV to `report_csv_path`.

    Args:
        results: Per-serial dict with a space-separated 4x4 "pose" string.
        verification_frames: Per-serial list of best frames (sorted by score
            descending, already truncated to depth_pool_size upstream).
        marker_geometry: Marker ID -> world-space corner geometry.
        camera_matrices: Serial -> 3x3 intrinsic matrix.
        verify_depth: Run depth verification against the final pose.
        refine_depth: Additionally refine the pose using depth residuals.
        use_confidence_weights: Weight refinement residuals by confidence.
        depth_confidence_threshold: ZED confidence cutoff (lower = better).
        depth_pool_size: Number of frames whose depth maps may be pooled;
            1 means always use the single best frame.
        report_csv_path: Optional output path for the residual CSV report.

    Returns:
        Tuple of (results, csv_rows) — `results` is the same (mutated) dict.
    """
    csv_rows: List[List[Any]] = []
    # No-op unless at least one of the depth passes was requested.
    if not (verify_depth or refine_depth):
        return results, csv_rows
    click.echo("\nRunning depth verification/refinement on computed extrinsics...")
    for serial, vfs in verification_frames.items():
        # Skip cameras for which no final pose was computed.
        if str(serial) not in results:
            continue
        # Extract depth maps and confidence maps from the top-N frames
        # vfs is already sorted by score descending and truncated to depth_pool_size
        depth_maps = []
        confidence_maps = []
        # We need at least one frame with depth
        valid_frames = []
        for vf in vfs:
            frame = vf["frame"]
            if frame.depth_map is not None:
                depth_maps.append(frame.depth_map)
                confidence_maps.append(frame.confidence_map)
                valid_frames.append(vf)
        if not valid_frames:
            click.echo(
                f"Camera {serial}: No frames with depth map available for verification."
            )
            continue
        # Use the best frame (first in the list) for marker IDs and corners
        # This ensures we use the highest quality detection for geometry
        best_vf = valid_frames[0]
        ids = best_vf["ids"]
        # Determine if we should pool or use single frame
        use_pooling = depth_pool_size > 1 and len(depth_maps) > 1
        if use_pooling:
            try:
                pooled_depth, pooled_conf = pool_depth_maps(
                    depth_maps,
                    confidence_maps,
                    confidence_thresh=depth_confidence_threshold,
                )
                # Check if pooling resulted in a valid map (enough valid pixels)
                # We'll do a quick check against the best single frame
                # If pooled map has significantly fewer valid pixels, fallback
                best_depth = depth_maps[0]
                best_conf = confidence_maps[0]
                # Simple validity check (finite and > 0)
                # We don't need to be perfect here, just catch catastrophic pooling failure
                n_valid_pooled = np.count_nonzero(
                    np.isfinite(pooled_depth) & (pooled_depth > 0)
                )
                # For best frame, we also respect confidence threshold if provided
                mask_best = np.isfinite(best_depth) & (best_depth > 0)
                if best_conf is not None:
                    mask_best &= best_conf <= depth_confidence_threshold
                n_valid_best = np.count_nonzero(mask_best)
                # If pooled result is much worse (e.g. < 50% of valid points of single frame), fallback
                # This can happen if frames are misaligned or pooling logic fails
                if n_valid_pooled < (n_valid_best * 0.5):
                    click.echo(
                        f"Camera {serial}: Pooled depth has too few valid points ({n_valid_pooled} vs {n_valid_best}). "
                        "Falling back to best single frame."
                    )
                    final_depth = best_depth
                    final_conf = best_conf
                    pool_metadata = {
                        "pool_size_requested": depth_pool_size,
                        "pool_size_actual": len(depth_maps),
                        "pooled": False,
                        "fallback_reason": "insufficient_valid_points",
                    }
                else:
                    final_depth = pooled_depth
                    final_conf = pooled_conf
                    pool_metadata = {
                        "pool_size_requested": depth_pool_size,
                        "pool_size_actual": len(depth_maps),
                        "pooled": True,
                    }
                    click.echo(
                        f"Camera {serial}: Using pooled depth from {len(depth_maps)} frames."
                    )
            except Exception as e:
                # Pooling is best-effort: any failure falls back to the single
                # best frame rather than aborting the whole post-process.
                click.echo(
                    f"Camera {serial}: Pooling failed with error: {e}. Falling back to single frame.",
                    err=True,
                )
                final_depth = depth_maps[0]
                final_conf = confidence_maps[0]
                pool_metadata = {
                    "pool_size_requested": depth_pool_size,
                    "pool_size_actual": len(depth_maps),
                    "pooled": False,
                    "fallback_reason": f"exception: {str(e)}",
                }
        else:
            # Single frame case (N=1 or only 1 available)
            final_depth = depth_maps[0]
            final_conf = confidence_maps[0]
            # Only add metadata if pooling was requested but not possible due to lack of frames
            if depth_pool_size > 1:
                pool_metadata = {
                    "pool_size_requested": depth_pool_size,
                    "pool_size_actual": len(depth_maps),
                    "pooled": False,
                    "fallback_reason": "insufficient_frames",
                }
            else:
                pool_metadata = None
        # Use the FINAL COMPUTED POSE for verification
        # NOTE: np.fromstring with sep is deprecated in recent NumPy; works
        # today but a migration to np.fromiter/np.array(split) is advisable.
        pose_str = results[str(serial)]["pose"]
        T_mean = np.fromstring(pose_str, sep=" ").reshape(4, 4)
        cam_matrix = camera_matrices[serial]
        # World-space corner geometry for only the markers seen in the best frame.
        marker_corners_world = {
            int(mid): marker_geometry[int(mid)]
            for mid in ids.flatten()
            if int(mid) in marker_geometry
        }
        if marker_corners_world and final_depth is not None:
            verify_res = verify_extrinsics_with_depth(
                T_mean,
                marker_corners_world,
                final_depth,
                cam_matrix,
                confidence_map=final_conf,
                confidence_thresh=depth_confidence_threshold,
            )
            results[str(serial)]["depth_verify"] = {
                "rmse": verify_res.rmse,
                "mean_abs": verify_res.mean_abs,
                "median": verify_res.median,
                "depth_normalized_rmse": verify_res.depth_normalized_rmse,
                "n_valid": verify_res.n_valid,
                "n_total": verify_res.n_total,
            }
            if pool_metadata:
                results[str(serial)]["depth_pool"] = pool_metadata
            click.echo(
                f"Camera {serial} verification: RMSE={verify_res.rmse:.3f}m, "
                f"Valid={verify_res.n_valid}/{verify_res.n_total}"
            )
            if refine_depth:
                # Refinement needs at least 4 valid depth points to be
                # meaningfully constrained.
                if verify_res.n_valid < 4:
                    click.echo(
                        f"Camera {serial}: Not enough valid depth points for refinement ({verify_res.n_valid}). Skipping."
                    )
                else:
                    click.echo(f"Camera {serial}: Refining extrinsics with depth...")
                    T_refined, refine_stats = refine_extrinsics_with_depth(
                        T_mean,
                        marker_corners_world,
                        final_depth,
                        cam_matrix,
                        confidence_map=(final_conf if use_confidence_weights else None),
                        confidence_thresh=depth_confidence_threshold,
                    )
                    # Re-verify with the refined pose to quantify improvement.
                    verify_res_post = verify_extrinsics_with_depth(
                        T_refined,
                        marker_corners_world,
                        final_depth,
                        cam_matrix,
                        confidence_map=final_conf,
                        confidence_thresh=depth_confidence_threshold,
                    )
                    pose_str_refined = " ".join(f"{x:.6f}" for x in T_refined.flatten())
                    results[str(serial)]["pose"] = pose_str_refined
                    results[str(serial)]["refine_depth"] = refine_stats
                    results[str(serial)]["depth_verify_post"] = {
                        "rmse": verify_res_post.rmse,
                        "mean_abs": verify_res_post.mean_abs,
                        "median": verify_res_post.median,
                        "depth_normalized_rmse": verify_res_post.depth_normalized_rmse,
                        "n_valid": verify_res_post.n_valid,
                        "n_total": verify_res_post.n_total,
                    }
                    if pool_metadata:
                        results[str(serial)]["depth_pool"] = pool_metadata
                    improvement = verify_res.rmse - verify_res_post.rmse
                    results[str(serial)]["refine_depth"]["improvement_rmse"] = (
                        improvement
                    )
                    click.echo(
                        f"Camera {serial} refined: RMSE={verify_res_post.rmse:.3f}m "
                        f"(Improved by {improvement:.3f}m). "
                        f"Delta Rot={refine_stats['delta_rotation_deg']:.2f}deg, "
                        f"Trans={refine_stats['delta_translation_norm_m']:.3f}m"
                    )
                    # Warning gates: surface suspicious optimizer behavior
                    # without failing the run.
                    if improvement < 1e-4 and refine_stats["nfev"] > 5:
                        click.echo(
                            f"  WARNING: Optimization ran for {refine_stats['nfev']} steps but improvement was negligible ({improvement:.6f}m).",
                            err=True,
                        )
                    if not refine_stats["success"] or refine_stats["nfev"] <= 1:
                        click.echo(
                            f"  WARNING: Optimization might have failed or stalled. Success: {refine_stats['success']}, Steps: {refine_stats['nfev']}. Message: {refine_stats['termination_message']}",
                            err=True,
                        )
                    # From here on, report residuals of the refined pose.
                    verify_res = verify_res_post
            if report_csv_path:
                for mid, cidx, resid in verify_res.residuals:
                    csv_rows.append([serial, mid, cidx, resid])
    if report_csv_path and csv_rows:
        with open(report_csv_path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["serial", "marker_id", "corner_idx", "residual"])
            writer.writerows(csv_rows)
        click.echo(f"Saved depth verification report to {report_csv_path}")
    return results, csv_rows
def run_benchmark_matrix(
    results: Dict[str, Any],
    verification_frames: Dict[int, List[Dict[str, Any]]],
    first_frames: Dict[int, Dict[str, Any]],
    marker_geometry: Dict[int, Any],
    camera_matrices: Dict[int, Any],
    depth_confidence_threshold: int,
) -> Dict[str, Any]:
    """
    Run benchmark matrix comparing 4 configurations:
    1) baseline (linear loss, no confidence weights)
    2) robust (soft_l1, f_scale=0.1, no confidence)
    3) robust+confidence
    4) robust+confidence+best-frame

    For each camera in `results`, every configuration is refined from the
    same initial pose (the camera's current "pose" string) and verified
    before and after refinement; a summary table is printed per camera.
    `results` itself is not modified.

    Args:
        results: Per-serial dict containing a space-separated 4x4 "pose".
        verification_frames: Per-serial best frames sorted by score desc.
        first_frames: Per-serial first accepted frame (used when a config
            does not request the best-scoring frame).
        marker_geometry: Marker ID -> world-space corner geometry.
        camera_matrices: Serial (int) -> 3x3 intrinsic matrix.
        depth_confidence_threshold: ZED confidence cutoff (lower = better).

    Returns:
        Dict keyed by serial (str) mapping config name -> metric dict
        (rmse_pre/rmse_post/improvement/deltas/nfev/success/frame_index).
    """
    benchmark_results = {}
    # The four refinement configurations under comparison.
    configs = [
        {
            "name": "baseline",
            "loss": "linear",
            "use_confidence": False,
            "use_best_frame": False,
        },
        {
            "name": "robust",
            "loss": "soft_l1",
            "use_confidence": False,
            "use_best_frame": False,
        },
        {
            "name": "robust+confidence",
            "loss": "soft_l1",
            "use_confidence": True,
            "use_best_frame": False,
        },
        {
            "name": "robust+confidence+best-frame",
            "loss": "soft_l1",
            "use_confidence": True,
            "use_best_frame": True,
        },
    ]
    click.echo("\nRunning Benchmark Matrix...")
    for serial in results.keys():
        serial_int = int(serial)
        # Need both a first frame and at least one verification frame.
        if serial_int not in first_frames or serial_int not in verification_frames:
            continue
        cam_matrix = camera_matrices[serial_int]
        # NOTE: np.fromstring with sep is deprecated in recent NumPy.
        pose_str = results[serial]["pose"]
        T_initial = np.fromstring(pose_str, sep=" ").reshape(4, 4)
        cam_bench = {}
        for config in configs:
            name = config["name"]
            use_best = config["use_best_frame"]
            # Frame selection: highest-scoring frame vs first accepted frame.
            if use_best:
                vf = verification_frames[serial_int][0]
            else:
                vf = first_frames[serial_int]
            frame = vf["frame"]
            ids = vf["ids"]
            marker_corners_world = {
                int(mid): marker_geometry[int(mid)]
                for mid in ids.flatten()
                if int(mid) in marker_geometry
            }
            if not marker_corners_world or frame.depth_map is None:
                continue
            # Pre-refinement verification
            verify_pre = verify_extrinsics_with_depth(
                T_initial,
                marker_corners_world,
                frame.depth_map,
                cam_matrix,
                confidence_map=frame.confidence_map,
                confidence_thresh=depth_confidence_threshold,
            )
            # Refinement
            T_refined, refine_stats = refine_extrinsics_with_depth(
                T_initial,
                marker_corners_world,
                frame.depth_map,
                cam_matrix,
                confidence_map=(
                    frame.confidence_map if config["use_confidence"] else None
                ),
                confidence_thresh=depth_confidence_threshold,
                loss=str(config["loss"]),
                f_scale=0.1,
            )
            # Post-refinement verification
            verify_post = verify_extrinsics_with_depth(
                T_refined,
                marker_corners_world,
                frame.depth_map,
                cam_matrix,
                confidence_map=frame.confidence_map,
                confidence_thresh=depth_confidence_threshold,
            )
            cam_bench[name] = {
                "rmse_pre": verify_pre.rmse,
                "rmse_post": verify_post.rmse,
                "improvement": verify_pre.rmse - verify_post.rmse,
                "delta_rot_deg": refine_stats["delta_rotation_deg"],
                "delta_trans_m": refine_stats["delta_translation_norm_m"],
                "nfev": refine_stats["nfev"],
                "success": refine_stats["success"],
                "frame_index": vf["frame_index"],
            }
        benchmark_results[serial] = cam_bench
        # Print summary table for this camera
        click.echo(f"\nBenchmark Results for Camera {serial}:")
        header = f"{'Config':<30} | {'RMSE Pre':<10} | {'RMSE Post':<10} | {'Improv':<10} | {'Iter':<5}"
        click.echo(header)
        click.echo("-" * len(header))
        for name, stats in cam_bench.items():
            click.echo(
                f"{name:<30} | {stats['rmse_pre']:<10.4f} | {stats['rmse_post']:<10.4f} | "
                f"{stats['improvement']:<10.4f} | {stats['nfev']:<5}"
            )
    return benchmark_results
@click.command()
@click.option("--svo", "-s", multiple=True, required=False, help="Path to SVO files.")
@click.option("--markers", "-m", required=True, help="Path to markers parquet file.")
@click.option("--output", "-o", default="extrinsics.json", help="Output JSON file.")
@click.option(
    "--sample-interval", "-n", default=30, type=int, help="Sample every N frames."
)
@click.option(
    "--max-reproj-error",
    "-e",
    default=2.0,
    type=float,
    help="Max reprojection error for pose.",
)
@click.option("--preview/--no-preview", default=True, help="Show preview window.")
@click.option(
    "--validate-markers/--no-validate", default=True, help="Validate marker geometry."
)
@click.option(
    "--self-check/--no-self-check", default=False, help="Perform self-check on result."
)
@click.option(
    "--verify-depth/--no-verify-depth", default=False, help="Enable depth verification."
)
@click.option(
    "--refine-depth/--no-refine-depth", default=False, help="Enable depth refinement."
)
@click.option(
    "--use-confidence-weights/--no-confidence-weights",
    default=False,
    help="Use confidence-weighted residuals in depth refinement.",
)
@click.option(
    "--depth-mode",
    default=None,
    type=click.Choice(["NEURAL", "NEURAL_PLUS", "NEURAL_LIGHT", "NONE"]),
    help="Depth computation mode. Defaults to NEURAL_PLUS if depth verification/refinement is enabled, otherwise NONE.",
)
@click.option(
    "--depth-confidence-threshold",
    default=50,
    type=int,
    help="Confidence threshold for depth filtering (lower = more confident).",
)
@click.option(
    "--depth-pool-size",
    default=1,
    type=click.IntRange(min=1, max=10),
    help="Number of best frames to pool for depth verification/refinement (1=single best frame).",
)
@click.option(
    "--report-csv", type=click.Path(), help="Optional path for per-frame CSV report."
)
@click.option(
    "--auto-align/--no-auto-align",
    default=False,
    help="Automatically align ground plane.",
)
@click.option(
    "--ground-face", type=str, help="Explicit face name for ground alignment."
)
@click.option(
    "--ground-marker-id", type=int, help="Explicit marker ID to define ground face."
)
@click.option(
    "--aruco-dictionary",
    default="DICT_4X4_50",
    type=click.Choice(list(ARUCO_DICT_MAP.keys())),
    help="ArUco dictionary to use.",
)
@click.option(
    "--min-markers",
    default=1,
    type=int,
    help="Minimum markers required for pose estimation.",
)
@click.option(
    "--debug/--no-debug",
    default=False,
    help="Enable verbose debug logging.",
)
@click.option(
    "--max-samples",
    default=None,
    type=int,
    help="Maximum number of samples to process before stopping.",
)
@click.option(
    "--benchmark-matrix/--no-benchmark-matrix",
    default=False,
    help="Run benchmark matrix comparing different refinement configurations.",
)
def main(
    svo: tuple[str, ...],
    markers: str,
    output: str,
    sample_interval: int,
    max_reproj_error: float,
    preview: bool,
    validate_markers: bool,
    self_check: bool,
    verify_depth: bool,
    refine_depth: bool,
    use_confidence_weights: bool,
    depth_mode: str | None,
    depth_confidence_threshold: int,
    depth_pool_size: int,
    report_csv: str | None,
    auto_align: bool,
    ground_face: str | None,
    ground_marker_id: int | None,
    aruco_dictionary: str,
    min_markers: int,
    debug: bool,
    max_samples: int | None,
    benchmark_matrix: bool,
):
    """
    Calibrate camera extrinsics relative to a global coordinate system defined by ArUco markers.

    Pipeline: sample synced frames from the SVO files, detect markers,
    estimate and accumulate per-camera poses, compute a robust mean pose per
    camera, then optionally depth-verify/refine, benchmark, ground-align,
    save JSON, and self-check.
    """
    # Configure logging level
    logger.remove()
    logger.add(
        lambda msg: click.echo(msg, nl=False),
        level="DEBUG" if debug else "INFO",
        format="{message}",
    )
    # Resolve the ZED depth mode; when unspecified, enable depth only if a
    # depth-consuming feature was requested.
    depth_mode_map = {
        "NEURAL": sl.DEPTH_MODE.NEURAL,
        "NEURAL_PLUS": sl.DEPTH_MODE.NEURAL_PLUS,
        "NEURAL_LIGHT": sl.DEPTH_MODE.NEURAL_LIGHT,
        "NONE": sl.DEPTH_MODE.NONE,
    }
    if depth_mode is None:
        if verify_depth or refine_depth or benchmark_matrix:
            sl_depth_mode = sl.DEPTH_MODE.NEURAL_PLUS
        else:
            sl_depth_mode = sl.DEPTH_MODE.NONE
    else:
        sl_depth_mode = depth_mode_map.get(depth_mode, sl.DEPTH_MODE.NONE)
    # Expand SVO paths (files or directories)
    expanded_svo = []
    for path_str in svo:
        path = Path(path_str)
        if path.is_dir():
            click.echo(f"Searching for SVO files in {path}...")
            found = sorted(
                [
                    str(p)
                    for p in path.iterdir()
                    if p.is_file() and p.suffix.lower() in (".svo", ".svo2")
                ]
            )
            if found:
                click.echo(f"Found {len(found)} SVO files in {path}")
                expanded_svo.extend(found)
            else:
                click.echo(f"Warning: No .svo/.svo2 files found in {path}", err=True)
        elif path.is_file():
            expanded_svo.append(str(path))
        else:
            click.echo(f"Warning: Path not found: {path}", err=True)
    if not expanded_svo:
        if validate_markers:
            # NOTE(review): this reports validation success and returns BEFORE
            # the markers are actually loaded/validated below — in the
            # no-SVO path validation never runs. Confirm intended behavior.
            click.echo("Marker validation successful. No SVOs provided, exiting.")
            return
        else:
            click.echo(
                "Error: --svo is required unless --validate-markers is used.", err=True
            )
            raise click.UsageError("Missing option '--svo' / '-s'.")
    # 1. Load Marker Geometry
    try:
        marker_geometry = load_marker_geometry(markers)
        if validate_markers:
            validate_marker_geometry(marker_geometry)
        click.echo(f"Loaded {len(marker_geometry)} markers from {markers}")
        # Load face mapping if available
        face_marker_map = load_face_mapping(markers)
        if face_marker_map:
            click.echo(f"Loaded face mapping for {len(face_marker_map)} faces.")
        else:
            click.echo("No face mapping found in parquet (missing 'name'/'ids').")
            face_marker_map = None
    except Exception as e:
        click.echo(f"Error loading markers: {e}", err=True)
        raise SystemExit(1)
    # 2. Initialize SVO Reader
    reader = SVOReader(expanded_svo, depth_mode=sl_depth_mode)
    if not reader.cameras:
        click.echo("No SVO files could be opened.", err=True)
        return
    # Align SVOs
    reader.sync_to_latest_start()
    # Calculate max frames to process to avoid infinite loop
    max_frames = 10000  # Default safety limit
    if reader.cameras:
        remaining = []
        for i, cam in enumerate(reader.cameras):
            total = reader.camera_info[i]["total_frames"]
            if total > 0:
                current = cam.get_svo_position()
                remaining.append(total - current)
            else:
                # If any total_frames is unknown (<= 0), use a hard limit
                remaining = [10000]
                break
        if remaining:
            max_frames = min(remaining)
        else:
            # NOTE(review): `remaining` is never empty here (the loop above
            # always appends or sets [10000]), so this warning branch looks
            # unreachable — confirm intent.
            click.echo(
                "Warning: Could not determine SVO lengths, using safety limit of 10,000 frames."
            )
    serials = [info["serial"] for info in reader.camera_info]
    accumulators = {serial: PoseAccumulator() for serial in serials}
    camera_matrices = {
        serial: build_camera_matrix_from_zed(cam)
        for serial, cam in zip(serials, reader.cameras)
    }
    # Store verification frames for post-process check
    verification_frames: Dict[int, List[Dict[str, Any]]] = {}
    # Store first valid frame for benchmarking
    first_frames: Dict[int, Dict[str, Any]] = {}
    # Track all visible marker IDs for heuristic ground detection
    all_visible_ids = set()
    detector = create_detector(dictionary_id=ARUCO_DICT_MAP[aruco_dictionary])
    frame_count = 0
    sampled_count = 0
    click.echo(f"Processing SVOs: {serials}")
    try:
        while frame_count < max_frames:
            frames = reader.grab_synced()
            if not any(frames):
                break
            # Only process every `sample_interval`-th frame.
            if frame_count % sample_interval == 0:
                preview_frames = {}
                for i, frame in enumerate(frames):
                    if frame is None:
                        continue
                    serial = frame.serial_number
                    K = camera_matrices[serial]
                    # Detect markers
                    corners, ids = detect_markers(frame.image, detector)
                    if ids is not None:
                        all_visible_ids.update(ids.flatten().tolist())
                        logger.debug(
                            f"Cam {serial}: Detected {len(ids)} markers: {ids.flatten()}"
                        )
                    else:
                        logger.debug(f"Cam {serial}: No markers detected")
                    if ids is None:
                        if preview:
                            preview_frames[serial] = frame.image
                        continue
                    # Estimate pose (T_cam_from_world)
                    pose_res = estimate_pose_from_detections(
                        corners, ids, marker_geometry, K, min_markers=min_markers
                    )
                    if pose_res:
                        rvec, tvec, reproj_err, n_markers = pose_res
                        if reproj_err <= max_reproj_error:
                            T_cam_world = rvec_tvec_to_matrix(rvec, tvec)
                            # We want T_world_from_cam
                            T_world_cam = invert_transform(T_cam_world)
                            # Save best frame for verification based on scoring
                            if (
                                verify_depth or refine_depth or benchmark_matrix
                            ) and frame.depth_map is not None:
                                current_score = score_frame(
                                    n_markers,
                                    reproj_err,
                                    corners,
                                    frame.depth_map,
                                    depth_confidence_threshold,
                                    frame.confidence_map,
                                )
                                # Remember the first accepted frame per camera
                                # for the benchmark matrix.
                                if serial not in first_frames:
                                    first_frames[serial] = {
                                        "frame": frame,
                                        "ids": ids,
                                        "corners": corners,
                                        "score": current_score,
                                        "frame_index": frame_count,
                                    }
                                if serial not in verification_frames:
                                    verification_frames[serial] = []
                                verification_frames[serial].append(
                                    {
                                        "frame": frame,
                                        "ids": ids,
                                        "corners": corners,
                                        "score": current_score,
                                        "frame_index": frame_count,
                                    }
                                )
                                # Sort by score descending and truncate to pool size
                                verification_frames[serial].sort(
                                    key=lambda x: x["score"], reverse=True
                                )
                                verification_frames[serial] = verification_frames[
                                    serial
                                ][:depth_pool_size]
                                logger.debug(
                                    f"Cam {serial}: Updated verification pool (size {len(verification_frames[serial])}), top score {verification_frames[serial][0]['score']:.2f}"
                                )
                            accumulators[serial].add_pose(
                                T_world_cam, reproj_err, frame_count
                            )
                            logger.debug(
                                f"Cam {serial}: Pose accepted. Reproj={reproj_err:.3f}, Markers={n_markers}"
                            )
                        else:
                            logger.debug(
                                f"Cam {serial}: Pose rejected. Reproj {reproj_err:.3f} > {max_reproj_error}"
                            )
                        if preview:
                            img = draw_detected_markers(
                                frame.image.copy(), corners, ids
                            )
                            img = draw_pose_axes(img, rvec, tvec, K, length=0.2)
                            preview_frames[serial] = img
                    else:
                        # NOTE(review): ids cannot be None here (handled by the
                        # `continue` above), so the `elif preview` branch looks
                        # unreachable — confirm whether a plain `if preview`
                        # was intended.
                        if ids is not None:
                            logger.debug(
                                f"Cam {serial}: Pose estimation failed (insufficient markers < {min_markers} or solver failure)"
                            )
                        elif preview:
                            preview_frames[serial] = frame.image
                if preview and preview_frames:
                    key = show_preview(preview_frames)
                    # ESC or 'q' aborts sampling early.
                    if key == 27 or key == ord("q"):
                        break
                sampled_count += 1
                if max_samples is not None and sampled_count >= max_samples:
                    click.echo(f"\nReached max samples ({max_samples}). Stopping.")
                    break
            frame_count += 1
            if frame_count % 100 == 0:
                counts = [len(acc.poses) for acc in accumulators.values()]
                click.echo(
                    f"Frame {frame_count}, Accepted Poses: {dict(zip(serials, counts))}"
                )
    except KeyboardInterrupt:
        click.echo("\nInterrupted by user.")
    finally:
        # Always release camera handles and preview windows.
        reader.close()
        cv2.destroyAllWindows()
    # 3. Compute Final Poses
    results = {}
    for serial, acc in accumulators.items():
        if not acc.poses:
            click.echo(f"Warning: No valid poses for camera {serial}")
            continue
        # Use RANSAC to find best consensus
        inliers = acc.ransac_filter()
        T_mean, stats = acc.compute_robust_mean(inliers)
        # Flatten for JSON as space-separated string
        pose_str = " ".join(f"{x:.6f}" for x in T_mean.flatten())
        results[str(serial)] = {"pose": pose_str, "stats": stats}
        click.echo(
            f"Camera {serial}: {stats['n_inliers']}/{stats['n_total']} inliers, median error: {stats['median_reproj_error']:.3f}"
        )
    if not results:
        click.echo("No extrinsics computed.", err=True)
        return
    # 4. Run Depth Verification if requested
    apply_depth_verify_refine_postprocess(
        results,
        verification_frames,
        marker_geometry,
        camera_matrices,
        verify_depth,
        refine_depth,
        use_confidence_weights,
        depth_confidence_threshold,
        depth_pool_size,
        report_csv,
    )
    # 5. Run Benchmark Matrix if requested
    if benchmark_matrix:
        benchmark_results = run_benchmark_matrix(
            results,
            verification_frames,
            first_frames,
            marker_geometry,
            camera_matrices,
            depth_confidence_threshold,
        )
        # Add to results for saving
        for serial, bench in benchmark_results.items():
            if serial in results:
                results[serial]["benchmark"] = bench
    # 6. Optional Ground Plane Alignment
    if auto_align:
        click.echo("\nPerforming ground plane alignment...")
        target_face = ground_face
        # Use loaded map or skip if None
        if face_marker_map is None:
            click.echo(
                "Warning: No face mapping available (missing 'name'/'ids' in parquet). Skipping alignment.",
                err=True,
            )
            # Skip alignment logic by ensuring loop below doesn't run and heuristic fails gracefully
            mapping_to_use = {}
        else:
            mapping_to_use = face_marker_map
        if not target_face and ground_marker_id is not None:
            # Map marker ID to face
            for face, ids in mapping_to_use.items():
                if ground_marker_id in ids:
                    target_face = face
                    logger.info(
                        f"Mapped ground-marker-id {ground_marker_id} to face '{face}' (markers={ids})"
                    )
                    break
        ground_normal = None
        if target_face:
            ground_normal = get_face_normal_from_geometry(
                target_face, marker_geometry, face_marker_map=face_marker_map
            )
            if ground_normal is not None:
                ids = mapping_to_use.get(target_face, [])
                logger.info(
                    f"Using explicit ground face '{target_face}' (markers={ids})"
                )
        else:
            # Heuristic detection
            heuristic_res = detect_ground_face(
                all_visible_ids, marker_geometry, face_marker_map=face_marker_map
            )
            if heuristic_res:
                target_face, ground_normal = heuristic_res
                ids = mapping_to_use.get(target_face, [])
                logger.info(
                    f"Heuristically detected ground face '{target_face}' (markers={ids})"
                )
        if ground_normal is not None:
            # Rotate every camera pose so the ground normal maps to +Y.
            R_align: Mat33 = rotation_align_vectors(ground_normal, np.array([0, 1, 0]))
            logger.info(f"Computed alignment rotation for face '{target_face}'")
            for serial, data in results.items():
                # NOTE: np.fromstring with sep is deprecated in recent NumPy.
                T_mean: Mat44 = np.fromstring(data["pose"], sep=" ").reshape(4, 4)
                T_aligned = apply_alignment_to_pose(T_mean, R_align)
                data["pose"] = " ".join(f"{x:.6f}" for x in T_aligned.flatten())
                logger.debug(f"Applied alignment to camera {serial}")
        else:
            click.echo(
                "Warning: Could not determine ground normal. Skipping alignment."
            )
    # 6. Save to JSON
    with open(output, "w") as f:
        json.dump(results, f, indent=4, sort_keys=True)
    click.echo(f"Saved extrinsics to {output}")
    # 7. Optional Self-Check
    if self_check:
        # Verify reprojection error
        for serial, data in results.items():
            if data["stats"]["median_reproj_error"] > max_reproj_error:
                click.echo(
                    f"Error: Camera {serial} failed self-check (median error {data['stats']['median_reproj_error']:.3f} > {max_reproj_error})",
                    err=True,
                )
                raise SystemExit(1)
        # Verify depth-quality outliers if depth verification ran
        depth_rmse_by_cam = {}
        for serial, data in results.items():
            depth_metrics = data.get("depth_verify_post") or data.get("depth_verify")
            if depth_metrics and "rmse" in depth_metrics:
                depth_rmse_by_cam[serial] = float(depth_metrics["rmse"])
        if len(depth_rmse_by_cam) >= 2:
            rmse_values = sorted(depth_rmse_by_cam.values())
            median_rmse = float(np.median(np.array(rmse_values)))
            # A camera fails if its RMSE exceeds both an absolute floor and a
            # multiple of the fleet median.
            outlier_factor = 2.5
            min_outlier_rmse_m = 0.08
            failed_depth_cams = []
            for serial, rmse in depth_rmse_by_cam.items():
                if rmse > max(min_outlier_rmse_m, outlier_factor * median_rmse):
                    failed_depth_cams.append((serial, rmse))
            if failed_depth_cams:
                failed_str = ", ".join(
                    f"{serial}:{rmse:.3f}m"
                    for serial, rmse in sorted(failed_depth_cams)
                )
                click.echo(
                    "Error: Calibration failed depth outlier self-check "
                    f"(median RMSE={median_rmse:.3f}m, outliers={failed_str}).",
                    err=True,
                )
                raise SystemExit(1)
        # Simple check: verify distance between cameras if multiple
        if len(results) >= 2:
            serials_list = sorted(results.keys())
            for i in range(len(serials_list)):
                for j in range(i + 1, len(serials_list)):
                    s1 = serials_list[i]
                    s2 = serials_list[j]
                    p1 = np.fromstring(results[s1]["pose"], sep=" ").reshape(4, 4)[
                        :3, 3
                    ]
                    p2 = np.fromstring(results[s2]["pose"], sep=" ").reshape(4, 4)[
                        :3, 3
                    ]
                    dist = np.linalg.norm(p1 - p2)
                    click.echo(f"Self-check: Distance {s1} <-> {s2}: {dist:.3f}m")
if __name__ == "__main__":
main() # pylint: disable=no-value-for-parameter