# ArUco-based multi-camera extrinsics calibration CLI (ZED SVO input).
import click
|
|
import cv2
|
|
import json
|
|
import csv
|
|
import numpy as np
|
|
import pyzed.sl as sl
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
|
|
from aruco.marker_geometry import (
|
|
load_marker_geometry,
|
|
validate_marker_geometry,
|
|
load_face_mapping,
|
|
)
|
|
from aruco.svo_sync import SVOReader
|
|
from aruco.detector import (
|
|
create_detector,
|
|
detect_markers,
|
|
build_camera_matrix_from_zed,
|
|
estimate_pose_from_detections,
|
|
)
|
|
from aruco.pose_math import rvec_tvec_to_matrix, invert_transform, matrix_to_rvec_tvec
|
|
from aruco.pose_averaging import PoseAccumulator
|
|
from aruco.preview import draw_detected_markers, draw_pose_axes, show_preview
|
|
from aruco.depth_verify import verify_extrinsics_with_depth
|
|
from aruco.depth_refine import refine_extrinsics_with_depth
|
|
from aruco.depth_pool import pool_depth_maps
|
|
from aruco.alignment import (
|
|
get_face_normal_from_geometry,
|
|
detect_ground_face,
|
|
rotation_align_vectors,
|
|
apply_alignment_to_pose,
|
|
Vec3,
|
|
Mat44,
|
|
)
|
|
from loguru import logger
|
|
from jaxtyping import Float
|
|
from typing import TYPE_CHECKING
|
|
|
|
# Type aliases.
# jaxtyping shape annotations (Float[np.ndarray, "3 3"]) are only useful to
# static type checkers; at runtime we alias to plain ndarray so no jaxtyping
# validation machinery is involved.
if TYPE_CHECKING:
    Mat33 = Float[np.ndarray, "3 3"]
    CornersNC = Float[np.ndarray, "N 3"]
else:
    Mat33 = np.ndarray
    CornersNC = np.ndarray
|
|
|
|
|
|
# Names of the OpenCV ArUco / AprilTag dictionary constants supported by this
# tool; ARUCO_DICT_MAP's keys are exposed to the CLI as string choices.
_ARUCO_DICT_NAMES = (
    "DICT_4X4_50",
    "DICT_4X4_100",
    "DICT_4X4_250",
    "DICT_4X4_1000",
    "DICT_5X5_50",
    "DICT_5X5_100",
    "DICT_5X5_250",
    "DICT_5X5_1000",
    "DICT_6X6_50",
    "DICT_6X6_100",
    "DICT_6X6_250",
    "DICT_6X6_1000",
    "DICT_7X7_50",
    "DICT_7X7_100",
    "DICT_7X7_250",
    "DICT_7X7_1000",
    "DICT_ARUCO_ORIGINAL",
    "DICT_APRILTAG_16h5",
    "DICT_APRILTAG_25h9",
    "DICT_APRILTAG_36h10",
    "DICT_APRILTAG_36h11",
)

# Map each dictionary name to its cv2.aruco constant (names match exactly).
ARUCO_DICT_MAP = {name: getattr(cv2.aruco, name) for name in _ARUCO_DICT_NAMES}
|
|
|
|
|
|
def score_frame(
    n_markers: int,
    reproj_err: float,
    corners: np.ndarray,
    depth_map: Optional[np.ndarray],
    depth_confidence_threshold: int = 50,
    confidence_map: Optional[np.ndarray] = None,
) -> float:
    """
    Compute a quality score for a frame to select the best one for depth
    verification. Higher is better.

    Args:
        n_markers: Number of markers detected in the frame.
        reproj_err: Reprojection error (pixels) of the estimated pose.
        corners: Detected marker corners, shape (N, 4, 2), pixel coordinates.
        depth_map: Optional (H, W) depth map; when given, frames whose depth
            is valid at the marker corners get a bonus.
        depth_confidence_threshold: ZED confidence cutoff (lower = more
            confident); only applied when confidence_map is given.
        confidence_map: Optional ZED confidence map aligned with depth_map.

    Returns:
        Scalar score: 100 points per marker (markers give the pose solver
        more constraints, so they are weighted heavily), minus the
        reprojection error, plus up to 50 points for the fraction of
        in-bounds corners with valid depth.
    """
    # Base score: more markers is better, lower reprojection error is better.
    score = n_markers * 100.0 - reproj_err

    if depth_map is None:
        return score

    h, w = depth_map.shape[:2]

    # Depth validity ratio at marker corners, computed vectorized instead of
    # a per-point Python loop. np.rint matches the original round() behavior
    # (banker's rounding to nearest even).
    pts = np.rint(np.asarray(corners, dtype=float).reshape(-1, 2)).astype(int)
    xs, ys = pts[:, 0], pts[:, 1]
    in_bounds = (xs >= 0) & (xs < w) & (ys >= 0) & (ys < h)
    total_count = int(np.count_nonzero(in_bounds))

    if total_count > 0:
        xs_ib, ys_ib = xs[in_bounds], ys[in_bounds]
        d = depth_map[ys_ib, xs_ib]
        # A corner is "valid" when depth is finite and positive, and (when a
        # confidence map is supplied) sufficiently confident.
        valid = np.isfinite(d) & (d > 0)
        if confidence_map is not None:
            # ZED confidence: lower is more confident.
            valid &= confidence_map[ys_ib, xs_ib] <= depth_confidence_threshold
        depth_ratio = np.count_nonzero(valid) / total_count
        score += depth_ratio * 50.0

    return score
|
|
|
|
|
|
def apply_depth_verify_refine_postprocess(
    results: Dict[str, Any],
    verification_frames: Dict[int, List[Dict[str, Any]]],
    marker_geometry: Dict[int, Any],
    camera_matrices: Dict[int, Any],
    verify_depth: bool,
    refine_depth: bool,
    use_confidence_weights: bool,
    depth_confidence_threshold: int,
    depth_pool_size: int = 1,
    report_csv_path: Optional[str] = None,
) -> Tuple[Dict[str, Any], List[List[Any]]]:
    """
    Apply depth verification and refinement to computed extrinsics.
    Returns updated results and list of CSV rows.

    For each camera serial present in both `verification_frames` and
    `results`, this:
      1. collects the frames that actually carry a depth map;
      2. optionally pools multiple depth maps (with A/B fallbacks to the
         best single frame if pooling yields fewer valid points or a worse
         verification RMSE);
      3. verifies the camera's current pose against depth
         (`verify_extrinsics_with_depth`), storing metrics under
         "depth_verify";
      4. optionally refines the pose with depth
         (`refine_extrinsics_with_depth`), replacing results["pose"] and
         storing "refine_depth" / "depth_verify_post" metrics.

    Mutates `results` in place (and also returns it). `csv_rows` holds
    per-corner residuals [serial, marker_id, corner_idx, residual] when
    `report_csv_path` is given; they are also written to that CSV file.
    """
    csv_rows: List[List[Any]] = []

    # Nothing to do unless at least one depth stage was requested.
    if not (verify_depth or refine_depth):
        return results, csv_rows

    click.echo("\nRunning depth verification/refinement on computed extrinsics...")

    for serial, vfs in verification_frames.items():
        # results is keyed by string serials; skip cameras with no pose.
        if str(serial) not in results:
            continue

        # Extract depth maps and confidence maps from the top-N frames.
        # vfs is already sorted by score descending and truncated to
        # depth_pool_size by the caller.
        depth_maps = []
        confidence_maps = []

        # We need at least one frame with depth.
        valid_frames = []
        for vf in vfs:
            frame = vf["frame"]
            if frame.depth_map is not None:
                depth_maps.append(frame.depth_map)
                confidence_maps.append(frame.confidence_map)
                valid_frames.append(vf)

        if not valid_frames:
            click.echo(
                f"Camera {serial}: No frames with depth map available for verification."
            )
            continue

        # Use the best frame (first in the list) for marker IDs and corners.
        # This ensures we use the highest quality detection for geometry.
        best_vf = valid_frames[0]
        ids = best_vf["ids"]

        # Determine if we should pool or use a single frame.
        use_pooling = depth_pool_size > 1 and len(depth_maps) > 1

        if use_pooling:
            try:
                pooled_depth, pooled_conf = pool_depth_maps(
                    depth_maps,
                    confidence_maps,
                    confidence_thresh=depth_confidence_threshold,
                )

                # Check if pooling resulted in a valid map (enough valid
                # pixels) by comparing against the best single frame; if the
                # pooled map has fewer valid pixels, fall back.
                best_depth = depth_maps[0]
                best_conf = confidence_maps[0]

                # Simple validity check (finite and > 0). We don't need to be
                # perfect here, just catch catastrophic pooling failure.
                n_valid_pooled = np.count_nonzero(
                    np.isfinite(pooled_depth) & (pooled_depth > 0)
                )

                # For the best frame, we also respect the confidence
                # threshold if a confidence map is available.
                mask_best = np.isfinite(best_depth) & (best_depth > 0)
                if best_conf is not None:
                    mask_best &= best_conf <= depth_confidence_threshold
                n_valid_best = np.count_nonzero(mask_best)

                # Fallback gate 1: fewer valid points than the single best
                # frame (can happen if frames are misaligned or pooling
                # logic fails).
                if n_valid_pooled < n_valid_best:
                    click.echo(
                        f"Camera {serial}: Pooled depth has fewer valid points ({n_valid_pooled} vs {n_valid_best}). "
                        "Falling back to best single frame."
                    )
                    final_depth = best_depth
                    final_conf = best_conf
                    pool_metadata = {
                        "pool_size_requested": depth_pool_size,
                        "pool_size_actual": len(depth_maps),
                        "pooled": False,
                        "fallback_reason": "insufficient_valid_points",
                    }
                else:
                    # A/B test: compare verification RMSE of pooled vs best
                    # single frame, using the current mean pose T_mean.
                    pose_str = results[str(serial)]["pose"]
                    # NOTE(review): np.fromstring(sep=...) is deprecated in
                    # NumPy; consider np.array(pose_str.split(), dtype=float).
                    T_mean = np.fromstring(pose_str, sep=" ").reshape(4, 4)
                    cam_matrix = camera_matrices[serial]

                    # World-frame corner geometry for the markers seen in the
                    # best frame (unknown IDs are skipped).
                    marker_corners_world = {
                        int(mid): marker_geometry[int(mid)]
                        for mid in ids.flatten()
                        if int(mid) in marker_geometry
                    }

                    # Verify pooled depth.
                    verify_pooled = verify_extrinsics_with_depth(
                        T_mean,
                        marker_corners_world,
                        pooled_depth,
                        cam_matrix,
                        confidence_map=pooled_conf,
                        confidence_thresh=depth_confidence_threshold,
                    )

                    # Verify best single frame.
                    verify_best = verify_extrinsics_with_depth(
                        T_mean,
                        marker_corners_world,
                        best_depth,
                        cam_matrix,
                        confidence_map=best_conf,
                        confidence_thresh=depth_confidence_threshold,
                    )

                    # Fallback gate 2: pooled RMSE strictly worse than the
                    # best single frame.
                    if verify_pooled.rmse > verify_best.rmse:
                        click.echo(
                            f"Camera {serial}: Pooled depth RMSE ({verify_pooled.rmse:.4f}m) worse than single frame ({verify_best.rmse:.4f}m). "
                            "Falling back to best single frame."
                        )
                        final_depth = best_depth
                        final_conf = best_conf
                        pool_metadata = {
                            "pool_size_requested": depth_pool_size,
                            "pool_size_actual": len(depth_maps),
                            "pooled": False,
                            "fallback_reason": "worse_verify_rmse",
                            "pooled_rmse": verify_pooled.rmse,
                            "single_rmse": verify_best.rmse,
                        }
                    else:
                        # Pooled depth wins: use it and record both RMSEs.
                        final_depth = pooled_depth
                        final_conf = pooled_conf
                        pool_metadata = {
                            "pool_size_requested": depth_pool_size,
                            "pool_size_actual": len(depth_maps),
                            "pooled": True,
                            "pooled_rmse": verify_pooled.rmse,
                            "single_rmse": verify_best.rmse,
                        }
                        click.echo(
                            f"Camera {serial}: Using pooled depth from {len(depth_maps)} frames (RMSE {verify_pooled.rmse:.4f}m vs {verify_best.rmse:.4f}m)."
                        )
            except Exception as e:
                # Fallback gate 3: any pooling error degrades gracefully to
                # the best single frame rather than aborting the camera.
                click.echo(
                    f"Camera {serial}: Pooling failed with error: {e}. Falling back to single frame.",
                    err=True,
                )
                final_depth = depth_maps[0]
                final_conf = confidence_maps[0]
                pool_metadata = {
                    "pool_size_requested": depth_pool_size,
                    "pool_size_actual": len(depth_maps),
                    "pooled": False,
                    "fallback_reason": f"exception: {str(e)}",
                }
        else:
            # Single frame case (N=1 requested, or only 1 frame available).
            final_depth = depth_maps[0]
            final_conf = confidence_maps[0]
            # Only add metadata if pooling was requested but not possible
            # due to lack of frames.
            if depth_pool_size > 1:
                pool_metadata = {
                    "pool_size_requested": depth_pool_size,
                    "pool_size_actual": len(depth_maps),
                    "pooled": False,
                    "fallback_reason": "insufficient_frames",
                }
            else:
                pool_metadata = None

        # Use the FINAL COMPUTED POSE for verification.
        pose_str = results[str(serial)]["pose"]
        T_mean = np.fromstring(pose_str, sep=" ").reshape(4, 4)
        cam_matrix = camera_matrices[serial]

        marker_corners_world = {
            int(mid): marker_geometry[int(mid)]
            for mid in ids.flatten()
            if int(mid) in marker_geometry
        }

        if marker_corners_world and final_depth is not None:
            verify_res = verify_extrinsics_with_depth(
                T_mean,
                marker_corners_world,
                final_depth,
                cam_matrix,
                confidence_map=final_conf,
                confidence_thresh=depth_confidence_threshold,
            )

            # Record pre-refinement verification metrics.
            results[str(serial)]["depth_verify"] = {
                "rmse": verify_res.rmse,
                "mean_abs": verify_res.mean_abs,
                "median": verify_res.median,
                "depth_normalized_rmse": verify_res.depth_normalized_rmse,
                "n_valid": verify_res.n_valid,
                "n_total": verify_res.n_total,
            }

            if pool_metadata:
                results[str(serial)]["depth_pool"] = pool_metadata

            click.echo(
                f"Camera {serial} verification: RMSE={verify_res.rmse:.3f}m, "
                f"Valid={verify_res.n_valid}/{verify_res.n_total}"
            )

            if refine_depth:
                # Refinement needs a minimum number of constraints; skip
                # cameras with too few valid depth points.
                if verify_res.n_valid < 4:
                    click.echo(
                        f"Camera {serial}: Not enough valid depth points for refinement ({verify_res.n_valid}). Skipping."
                    )
                else:
                    click.echo(f"Camera {serial}: Refining extrinsics with depth...")
                    T_refined, refine_stats = refine_extrinsics_with_depth(
                        T_mean,
                        marker_corners_world,
                        final_depth,
                        cam_matrix,
                        confidence_map=(final_conf if use_confidence_weights else None),
                        confidence_thresh=depth_confidence_threshold,
                    )

                    # Re-verify with the refined pose to measure improvement.
                    verify_res_post = verify_extrinsics_with_depth(
                        T_refined,
                        marker_corners_world,
                        final_depth,
                        cam_matrix,
                        confidence_map=final_conf,
                        confidence_thresh=depth_confidence_threshold,
                    )

                    # Overwrite the stored pose with the refined one.
                    pose_str_refined = " ".join(f"{x:.6f}" for x in T_refined.flatten())
                    results[str(serial)]["pose"] = pose_str_refined
                    results[str(serial)]["refine_depth"] = refine_stats
                    results[str(serial)]["depth_verify_post"] = {
                        "rmse": verify_res_post.rmse,
                        "mean_abs": verify_res_post.mean_abs,
                        "median": verify_res_post.median,
                        "depth_normalized_rmse": verify_res_post.depth_normalized_rmse,
                        "n_valid": verify_res_post.n_valid,
                        "n_total": verify_res_post.n_total,
                    }

                    if pool_metadata:
                        results[str(serial)]["depth_pool"] = pool_metadata

                    # Positive improvement means refinement reduced the RMSE.
                    improvement = verify_res.rmse - verify_res_post.rmse
                    results[str(serial)]["refine_depth"]["improvement_rmse"] = (
                        improvement
                    )

                    click.echo(
                        f"Camera {serial} refined: RMSE={verify_res_post.rmse:.3f}m "
                        f"(Improved by {improvement:.3f}m). "
                        f"Delta Rot={refine_stats['delta_rotation_deg']:.2f}deg, "
                        f"Trans={refine_stats['delta_translation_norm_m']:.3f}m"
                    )

                    # Warning gates: flag suspicious optimizer behavior.
                    if improvement < 1e-4 and refine_stats["nfev"] > 5:
                        click.echo(
                            f"  WARNING: Optimization ran for {refine_stats['nfev']} steps but improvement was negligible ({improvement:.6f}m).",
                            err=True,
                        )
                    if not refine_stats["success"] or refine_stats["nfev"] <= 1:
                        click.echo(
                            f"  WARNING: Optimization might have failed or stalled. Success: {refine_stats['success']}, Steps: {refine_stats['nfev']}. Message: {refine_stats['termination_message']}",
                            err=True,
                        )

                    # Downstream reporting uses the post-refinement result.
                    verify_res = verify_res_post

            if report_csv_path:
                # Per-corner residuals for the CSV report.
                for mid, cidx, resid in verify_res.residuals:
                    csv_rows.append([serial, mid, cidx, resid])

    if report_csv_path and csv_rows:
        with open(report_csv_path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["serial", "marker_id", "corner_idx", "residual"])
            writer.writerows(csv_rows)
        click.echo(f"Saved depth verification report to {report_csv_path}")

    return results, csv_rows
|
|
|
|
|
|
def run_benchmark_matrix(
    results: Dict[str, Any],
    verification_frames: Dict[int, List[Dict[str, Any]]],
    first_frames: Dict[int, Dict[str, Any]],
    marker_geometry: Dict[int, Any],
    camera_matrices: Dict[int, Any],
    depth_confidence_threshold: int,
) -> Dict[str, Any]:
    """
    Run benchmark matrix comparing 4 configurations:
    1) baseline (linear loss, no confidence weights)
    2) robust (soft_l1, f_scale=0.1, no confidence)
    3) robust+confidence
    4) robust+confidence+best-frame

    For every camera that has both a first frame and a verification frame,
    each configuration is evaluated as: verify pre -> refine -> verify post,
    and the per-config metrics (RMSE before/after, improvement, pose deltas,
    optimizer stats, frame index) are collected and printed as a table.

    Returns:
        Mapping of serial (string, as in `results`) to a per-config-name
        dict of benchmark statistics. `results` itself is not modified.
    """
    benchmark_results = {}

    configs = [
        {
            "name": "baseline",
            "loss": "linear",
            "use_confidence": False,
            "use_best_frame": False,
        },
        {
            "name": "robust",
            "loss": "soft_l1",
            "use_confidence": False,
            "use_best_frame": False,
        },
        {
            "name": "robust+confidence",
            "loss": "soft_l1",
            "use_confidence": True,
            "use_best_frame": False,
        },
        {
            "name": "robust+confidence+best-frame",
            "loss": "soft_l1",
            "use_confidence": True,
            "use_best_frame": True,
        },
    ]

    click.echo("\nRunning Benchmark Matrix...")

    for serial in results.keys():
        # results keys are strings; frame dicts are keyed by int serials.
        serial_int = int(serial)
        if serial_int not in first_frames or serial_int not in verification_frames:
            continue

        cam_matrix = camera_matrices[serial_int]
        pose_str = results[serial]["pose"]
        # NOTE(review): np.fromstring(sep=...) is deprecated in NumPy;
        # consider np.array(pose_str.split(), dtype=float).
        T_initial = np.fromstring(pose_str, sep=" ").reshape(4, 4)

        cam_bench = {}

        for config in configs:
            name = config["name"]
            use_best = config["use_best_frame"]
            # Either the best-scoring verification frame or the first valid
            # frame, depending on the configuration under test.
            if use_best:
                vf = verification_frames[serial_int][0]
            else:
                vf = first_frames[serial_int]

            frame = vf["frame"]
            ids = vf["ids"]
            # World-frame corner geometry for the markers seen in this frame.
            marker_corners_world = {
                int(mid): marker_geometry[int(mid)]
                for mid in ids.flatten()
                if int(mid) in marker_geometry
            }

            # Skip configs whose chosen frame lacks depth or known markers.
            if not marker_corners_world or frame.depth_map is None:
                continue

            # Pre-refinement verification.
            verify_pre = verify_extrinsics_with_depth(
                T_initial,
                marker_corners_world,
                frame.depth_map,
                cam_matrix,
                confidence_map=frame.confidence_map,
                confidence_thresh=depth_confidence_threshold,
            )

            # Refinement with the configuration's loss/confidence settings.
            T_refined, refine_stats = refine_extrinsics_with_depth(
                T_initial,
                marker_corners_world,
                frame.depth_map,
                cam_matrix,
                confidence_map=(
                    frame.confidence_map if config["use_confidence"] else None
                ),
                confidence_thresh=depth_confidence_threshold,
                loss=str(config["loss"]),
                f_scale=0.1,
            )

            # Post-refinement verification (always confidence-filtered, so
            # configs are compared on the same metric).
            verify_post = verify_extrinsics_with_depth(
                T_refined,
                marker_corners_world,
                frame.depth_map,
                cam_matrix,
                confidence_map=frame.confidence_map,
                confidence_thresh=depth_confidence_threshold,
            )

            cam_bench[name] = {
                "rmse_pre": verify_pre.rmse,
                "rmse_post": verify_post.rmse,
                "improvement": verify_pre.rmse - verify_post.rmse,
                "delta_rot_deg": refine_stats["delta_rotation_deg"],
                "delta_trans_m": refine_stats["delta_translation_norm_m"],
                "nfev": refine_stats["nfev"],
                "success": refine_stats["success"],
                "frame_index": vf["frame_index"],
            }

        benchmark_results[serial] = cam_bench

        # Print summary table for this camera.
        click.echo(f"\nBenchmark Results for Camera {serial}:")
        header = f"{'Config':<30} | {'RMSE Pre':<10} | {'RMSE Post':<10} | {'Improv':<10} | {'Iter':<5}"
        click.echo(header)
        click.echo("-" * len(header))
        for name, stats in cam_bench.items():
            click.echo(
                f"{name:<30} | {stats['rmse_pre']:<10.4f} | {stats['rmse_post']:<10.4f} | "
                f"{stats['improvement']:<10.4f} | {stats['nfev']:<5}"
            )

    return benchmark_results
|
|
|
|
|
|
@click.command()
@click.option("--svo", "-s", multiple=True, required=False, help="Path to SVO files.")
@click.option("--markers", "-m", required=True, help="Path to markers parquet file.")
@click.option("--output", "-o", default="extrinsics.json", help="Output JSON file.")
@click.option(
    "--sample-interval", "-n", default=30, type=int, help="Sample every N frames."
)
@click.option(
    "--max-reproj-error",
    "-e",
    default=2.0,
    type=float,
    help="Max reprojection error for pose.",
)
@click.option("--preview/--no-preview", default=True, help="Show preview window.")
@click.option(
    "--validate-markers/--no-validate", default=True, help="Validate marker geometry."
)
@click.option(
    "--self-check/--no-self-check", default=False, help="Perform self-check on result."
)
@click.option(
    "--verify-depth/--no-verify-depth", default=False, help="Enable depth verification."
)
@click.option(
    "--refine-depth/--no-refine-depth", default=False, help="Enable depth refinement."
)
@click.option(
    "--use-confidence-weights/--no-confidence-weights",
    default=False,
    help="Use confidence-weighted residuals in depth refinement.",
)
@click.option(
    "--depth-mode",
    default=None,
    type=click.Choice(["NEURAL", "NEURAL_PLUS", "NEURAL_LIGHT", "NONE"]),
    help="Depth computation mode. Defaults to NEURAL_PLUS if depth verification/refinement is enabled, otherwise NONE.",
)
@click.option(
    "--depth-confidence-threshold",
    default=50,
    type=int,
    help="Confidence threshold for depth filtering (lower = more confident).",
)
@click.option(
    "--depth-pool-size",
    default=1,
    type=click.IntRange(min=1, max=10),
    help="Number of best frames to pool for depth verification/refinement (1=single best frame).",
)
@click.option(
    "--report-csv", type=click.Path(), help="Optional path for per-frame CSV report."
)
@click.option(
    "--auto-align/--no-auto-align",
    default=False,
    help="Automatically align ground plane.",
)
@click.option(
    "--ground-face", type=str, help="Explicit face name for ground alignment."
)
@click.option(
    "--ground-marker-id", type=int, help="Explicit marker ID to define ground face."
)
@click.option(
    "--aruco-dictionary",
    default="DICT_4X4_50",
    type=click.Choice(list(ARUCO_DICT_MAP.keys())),
    help="ArUco dictionary to use.",
)
@click.option(
    "--min-markers",
    default=1,
    type=int,
    help="Minimum markers required for pose estimation.",
)
@click.option(
    "--debug/--no-debug",
    default=False,
    help="Enable verbose debug logging.",
)
@click.option(
    "--max-samples",
    default=None,
    type=int,
    help="Maximum number of samples to process before stopping.",
)
@click.option(
    "--benchmark-matrix/--no-benchmark-matrix",
    default=False,
    help="Run benchmark matrix comparing different refinement configurations.",
)
def main(
    svo: tuple[str, ...],
    markers: str,
    output: str,
    sample_interval: int,
    max_reproj_error: float,
    preview: bool,
    validate_markers: bool,
    self_check: bool,
    verify_depth: bool,
    refine_depth: bool,
    use_confidence_weights: bool,
    depth_mode: str | None,
    depth_confidence_threshold: int,
    depth_pool_size: int,
    report_csv: str | None,
    auto_align: bool,
    ground_face: str | None,
    ground_marker_id: int | None,
    aruco_dictionary: str,
    min_markers: int,
    debug: bool,
    max_samples: int | None,
    benchmark_matrix: bool,
):
    """
    Calibrate camera extrinsics relative to a global coordinate system defined by ArUco markers.

    Pipeline: load marker geometry -> read synced SVO frames -> detect
    markers and accumulate per-camera poses -> robust-average the poses ->
    optional depth verification/refinement, benchmark matrix, and ground
    alignment -> write results JSON -> optional self-checks.
    """
    # Configure logging level: route loguru through click so output goes to
    # the same stream as the rest of the CLI messages.
    logger.remove()
    logger.add(
        lambda msg: click.echo(msg, nl=False),
        level="DEBUG" if debug else "INFO",
        format="{message}",
    )

    depth_mode_map = {
        "NEURAL": sl.DEPTH_MODE.NEURAL,
        "NEURAL_PLUS": sl.DEPTH_MODE.NEURAL_PLUS,
        "NEURAL_LIGHT": sl.DEPTH_MODE.NEURAL_LIGHT,
        "NONE": sl.DEPTH_MODE.NONE,
    }

    # Depth is expensive; only enable it by default when a depth-consuming
    # stage was requested.
    if depth_mode is None:
        if verify_depth or refine_depth or benchmark_matrix:
            sl_depth_mode = sl.DEPTH_MODE.NEURAL_PLUS
        else:
            sl_depth_mode = sl.DEPTH_MODE.NONE
    else:
        sl_depth_mode = depth_mode_map.get(depth_mode, sl.DEPTH_MODE.NONE)

    # Expand SVO paths (files or directories).
    expanded_svo = []
    for path_str in svo:
        path = Path(path_str)
        if path.is_dir():
            click.echo(f"Searching for SVO files in {path}...")
            found = sorted(
                [
                    str(p)
                    for p in path.iterdir()
                    if p.is_file() and p.suffix.lower() in (".svo", ".svo2")
                ]
            )
            if found:
                click.echo(f"Found {len(found)} SVO files in {path}")
                expanded_svo.extend(found)
            else:
                click.echo(f"Warning: No .svo/.svo2 files found in {path}", err=True)
        elif path.is_file():
            expanded_svo.append(str(path))
        else:
            click.echo(f"Warning: Path not found: {path}", err=True)

    if not expanded_svo:
        # With no SVOs, the tool can still be used as a marker-file check.
        # NOTE(review): this success message is printed before the markers
        # are actually loaded/validated below — confirm intended ordering.
        if validate_markers:
            click.echo("Marker validation successful. No SVOs provided, exiting.")
            return
        else:
            click.echo(
                "Error: --svo is required unless --validate-markers is used.", err=True
            )
            raise click.UsageError("Missing option '--svo' / '-s'.")

    # 1. Load Marker Geometry
    try:
        marker_geometry = load_marker_geometry(markers)
        if validate_markers:
            validate_marker_geometry(marker_geometry)
        click.echo(f"Loaded {len(marker_geometry)} markers from {markers}")

        # Load face mapping if available (used for ground-plane alignment).
        face_marker_map = load_face_mapping(markers)
        if face_marker_map:
            click.echo(f"Loaded face mapping for {len(face_marker_map)} faces.")
        else:
            click.echo("No face mapping found in parquet (missing 'name'/'ids').")
            face_marker_map = None

    except Exception as e:
        click.echo(f"Error loading markers: {e}", err=True)
        raise SystemExit(1)

    # 2. Initialize SVO Reader
    reader = SVOReader(expanded_svo, depth_mode=sl_depth_mode)
    if not reader.cameras:
        click.echo("No SVO files could be opened.", err=True)
        return

    # Align SVOs so all recordings start from a common timestamp.
    reader.sync_to_latest_start()

    # Calculate max frames to process to avoid an infinite loop.
    max_frames = 10000  # Default safety limit
    if reader.cameras:
        remaining = []
        for i, cam in enumerate(reader.cameras):
            total = reader.camera_info[i]["total_frames"]
            if total > 0:
                current = cam.get_svo_position()
                remaining.append(total - current)
            else:
                # If any total_frames is unknown (<= 0), use a hard limit.
                remaining = [10000]
                break
        if remaining:
            max_frames = min(remaining)
        else:
            click.echo(
                "Warning: Could not determine SVO lengths, using safety limit of 10,000 frames."
            )

    serials = [info["serial"] for info in reader.camera_info]
    accumulators = {serial: PoseAccumulator() for serial in serials}
    camera_matrices = {
        serial: build_camera_matrix_from_zed(cam)
        for serial, cam in zip(serials, reader.cameras)
    }

    # Store verification frames for post-process check.
    verification_frames: Dict[int, List[Dict[str, Any]]] = {}
    # Store first valid frame for benchmarking.
    first_frames: Dict[int, Dict[str, Any]] = {}

    # Track all visible marker IDs for heuristic ground detection.
    all_visible_ids = set()

    detector = create_detector(dictionary_id=ARUCO_DICT_MAP[aruco_dictionary])

    frame_count = 0
    sampled_count = 0

    click.echo(f"Processing SVOs: {serials}")

    try:
        while frame_count < max_frames:
            frames = reader.grab_synced()
            # Stop when no camera yields a frame (end of all recordings).
            if not any(frames):
                break

            # Only process every sample_interval-th frame.
            if frame_count % sample_interval == 0:
                preview_frames = {}
                for i, frame in enumerate(frames):
                    if frame is None:
                        continue

                    serial = frame.serial_number
                    K = camera_matrices[serial]

                    # Detect markers.
                    corners, ids = detect_markers(frame.image, detector)

                    if ids is not None:
                        all_visible_ids.update(ids.flatten().tolist())
                        logger.debug(
                            f"Cam {serial}: Detected {len(ids)} markers: {ids.flatten()}"
                        )
                    else:
                        logger.debug(f"Cam {serial}: No markers detected")

                    if ids is None:
                        if preview:
                            preview_frames[serial] = frame.image
                        continue

                    # Estimate pose (T_cam_from_world).
                    pose_res = estimate_pose_from_detections(
                        corners, ids, marker_geometry, K, min_markers=min_markers
                    )

                    if pose_res:
                        rvec, tvec, reproj_err, n_markers = pose_res
                        if reproj_err <= max_reproj_error:
                            T_cam_world = rvec_tvec_to_matrix(rvec, tvec)
                            # We want T_world_from_cam.
                            T_world_cam = invert_transform(T_cam_world)

                            # Save best frame for verification based on scoring.
                            if (
                                verify_depth or refine_depth or benchmark_matrix
                            ) and frame.depth_map is not None:
                                current_score = score_frame(
                                    n_markers,
                                    reproj_err,
                                    corners,
                                    frame.depth_map,
                                    depth_confidence_threshold,
                                    frame.confidence_map,
                                )

                                # First valid frame is kept separately for the
                                # benchmark matrix.
                                if serial not in first_frames:
                                    first_frames[serial] = {
                                        "frame": frame,
                                        "ids": ids,
                                        "corners": corners,
                                        "score": current_score,
                                        "frame_index": frame_count,
                                    }

                                if serial not in verification_frames:
                                    verification_frames[serial] = []

                                verification_frames[serial].append(
                                    {
                                        "frame": frame,
                                        "ids": ids,
                                        "corners": corners,
                                        "score": current_score,
                                        "frame_index": frame_count,
                                    }
                                )
                                # Sort by score descending and truncate to
                                # pool size so only the best frames survive.
                                verification_frames[serial].sort(
                                    key=lambda x: x["score"], reverse=True
                                )
                                verification_frames[serial] = verification_frames[
                                    serial
                                ][:depth_pool_size]

                                logger.debug(
                                    f"Cam {serial}: Updated verification pool (size {len(verification_frames[serial])}), top score {verification_frames[serial][0]['score']:.2f}"
                                )

                            accumulators[serial].add_pose(
                                T_world_cam, reproj_err, frame_count
                            )
                            logger.debug(
                                f"Cam {serial}: Pose accepted. Reproj={reproj_err:.3f}, Markers={n_markers}"
                            )

                        else:
                            logger.debug(
                                f"Cam {serial}: Pose rejected. Reproj {reproj_err:.3f} > {max_reproj_error}"
                            )

                        if preview:
                            img = draw_detected_markers(
                                frame.image.copy(), corners, ids
                            )
                            img = draw_pose_axes(img, rvec, tvec, K, length=0.2)
                            preview_frames[serial] = img
                    else:
                        if ids is not None:
                            logger.debug(
                                f"Cam {serial}: Pose estimation failed (insufficient markers < {min_markers} or solver failure)"
                            )
                        elif preview:
                            preview_frames[serial] = frame.image

                if preview and preview_frames:
                    key = show_preview(preview_frames)
                    # ESC or 'q' quits the acquisition loop early.
                    if key == 27 or key == ord("q"):
                        break

                sampled_count += 1
                if max_samples is not None and sampled_count >= max_samples:
                    click.echo(f"\nReached max samples ({max_samples}). Stopping.")
                    break

            frame_count += 1
            if frame_count % 100 == 0:
                counts = [len(acc.poses) for acc in accumulators.values()]
                click.echo(
                    f"Frame {frame_count}, Accepted Poses: {dict(zip(serials, counts))}"
                )

    except KeyboardInterrupt:
        click.echo("\nInterrupted by user.")
    finally:
        reader.close()
        cv2.destroyAllWindows()

    # 3. Compute Final Poses
    results = {}
    for serial, acc in accumulators.items():
        if not acc.poses:
            click.echo(f"Warning: No valid poses for camera {serial}")
            continue

        # Use RANSAC to find the best consensus among accumulated poses.
        inliers = acc.ransac_filter()
        T_mean, stats = acc.compute_robust_mean(inliers)

        # Flatten for JSON as a space-separated string of 16 values.
        pose_str = " ".join(f"{x:.6f}" for x in T_mean.flatten())

        results[str(serial)] = {"pose": pose_str, "stats": stats}
        click.echo(
            f"Camera {serial}: {stats['n_inliers']}/{stats['n_total']} inliers, median error: {stats['median_reproj_error']:.3f}"
        )

    if not results:
        click.echo("No extrinsics computed.", err=True)
        return

    # 4. Run Depth Verification if requested (mutates results in place).
    apply_depth_verify_refine_postprocess(
        results,
        verification_frames,
        marker_geometry,
        camera_matrices,
        verify_depth,
        refine_depth,
        use_confidence_weights,
        depth_confidence_threshold,
        depth_pool_size,
        report_csv,
    )

    # 5. Run Benchmark Matrix if requested
    if benchmark_matrix:
        benchmark_results = run_benchmark_matrix(
            results,
            verification_frames,
            first_frames,
            marker_geometry,
            camera_matrices,
            depth_confidence_threshold,
        )
        # Add to results for saving.
        for serial, bench in benchmark_results.items():
            if serial in results:
                results[serial]["benchmark"] = bench

    # 6. Optional Ground Plane Alignment
    if auto_align:
        click.echo("\nPerforming ground plane alignment...")
        target_face = ground_face

        # Use the loaded map or skip alignment gracefully if none exists.
        if face_marker_map is None:
            click.echo(
                "Warning: No face mapping available (missing 'name'/'ids' in parquet). Skipping alignment.",
                err=True,
            )
            # Empty mapping means the lookup loop below does nothing and the
            # heuristic fails gracefully.
            mapping_to_use = {}
        else:
            mapping_to_use = face_marker_map

        if not target_face and ground_marker_id is not None:
            # Map marker ID to its face name.
            for face, ids in mapping_to_use.items():
                if ground_marker_id in ids:
                    target_face = face
                    logger.info(
                        f"Mapped ground-marker-id {ground_marker_id} to face '{face}' (markers={ids})"
                    )
                    break

        ground_normal = None
        if target_face:
            # Explicit face given (directly or via marker ID).
            ground_normal = get_face_normal_from_geometry(
                target_face, marker_geometry, face_marker_map=face_marker_map
            )
            if ground_normal is not None:
                ids = mapping_to_use.get(target_face, [])
                logger.info(
                    f"Using explicit ground face '{target_face}' (markers={ids})"
                )
        else:
            # Heuristic detection from the set of all markers seen.
            heuristic_res = detect_ground_face(
                all_visible_ids, marker_geometry, face_marker_map=face_marker_map
            )
            if heuristic_res:
                target_face, ground_normal = heuristic_res
                ids = mapping_to_use.get(target_face, [])
                logger.info(
                    f"Heuristically detected ground face '{target_face}' (markers={ids})"
                )

        if ground_normal is not None:
            # Rotate the world so the ground normal becomes +Y, and apply
            # the same rotation to every camera pose.
            R_align: Mat33 = rotation_align_vectors(ground_normal, np.array([0, 1, 0]))
            logger.info(f"Computed alignment rotation for face '{target_face}'")

            for serial, data in results.items():
                T_mean: Mat44 = np.fromstring(data["pose"], sep=" ").reshape(4, 4)
                T_aligned = apply_alignment_to_pose(T_mean, R_align)
                data["pose"] = " ".join(f"{x:.6f}" for x in T_aligned.flatten())
                logger.debug(f"Applied alignment to camera {serial}")
        else:
            click.echo(
                "Warning: Could not determine ground normal. Skipping alignment."
            )

    # 7. Save to JSON
    with open(output, "w") as f:
        json.dump(results, f, indent=4, sort_keys=True)
    click.echo(f"Saved extrinsics to {output}")

    # 8. Optional Self-Check
    if self_check:
        # Verify reprojection error of each camera against the CLI threshold.
        for serial, data in results.items():
            if data["stats"]["median_reproj_error"] > max_reproj_error:
                click.echo(
                    f"Error: Camera {serial} failed self-check (median error {data['stats']['median_reproj_error']:.3f} > {max_reproj_error})",
                    err=True,
                )
                raise SystemExit(1)

        # Verify depth-quality outliers if depth verification ran.
        depth_rmse_by_cam = {}
        for serial, data in results.items():
            # Prefer post-refinement metrics when available.
            depth_metrics = data.get("depth_verify_post") or data.get("depth_verify")
            if depth_metrics and "rmse" in depth_metrics:
                depth_rmse_by_cam[serial] = float(depth_metrics["rmse"])

        if len(depth_rmse_by_cam) >= 2:
            rmse_values = sorted(depth_rmse_by_cam.values())
            median_rmse = float(np.median(np.array(rmse_values)))
            # A camera is an outlier if its RMSE exceeds both an absolute
            # floor and a multiple of the fleet median.
            outlier_factor = 2.5
            min_outlier_rmse_m = 0.08

            failed_depth_cams = []
            for serial, rmse in depth_rmse_by_cam.items():
                if rmse > max(min_outlier_rmse_m, outlier_factor * median_rmse):
                    failed_depth_cams.append((serial, rmse))

            if failed_depth_cams:
                failed_str = ", ".join(
                    f"{serial}:{rmse:.3f}m"
                    for serial, rmse in sorted(failed_depth_cams)
                )
                click.echo(
                    "Error: Calibration failed depth outlier self-check "
                    f"(median RMSE={median_rmse:.3f}m, outliers={failed_str}).",
                    err=True,
                )
                raise SystemExit(1)

        # Simple check: print pairwise distances between cameras if multiple.
        if len(results) >= 2:
            serials_list = sorted(results.keys())
            for i in range(len(serials_list)):
                for j in range(i + 1, len(serials_list)):
                    s1 = serials_list[i]
                    s2 = serials_list[j]
                    p1 = np.fromstring(results[s1]["pose"], sep=" ").reshape(4, 4)[
                        :3, 3
                    ]
                    p2 = np.fromstring(results[s2]["pose"], sep=" ").reshape(4, 4)[
                        :3, 3
                    ]
                    dist = np.linalg.norm(p1 - p2)
                    click.echo(f"Self-check: Distance {s1} <-> {s2}: {dist:.3f}m")
|
|
|
# CLI entry point; click injects the option values at call time.
if __name__ == "__main__":
    main()  # pylint: disable=no-value-for-parameter
|