# calibrate_extrinsics.py — zed-playground/py_workspace (641 lines, 23 KiB, Python)
import click
import cv2
import json
import csv
import numpy as np
import pyzed.sl as sl
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from aruco.marker_geometry import (
load_marker_geometry,
validate_marker_geometry,
load_face_mapping,
)
from aruco.svo_sync import SVOReader
from aruco.detector import (
create_detector,
detect_markers,
build_camera_matrix_from_zed,
estimate_pose_from_detections,
)
from aruco.pose_math import rvec_tvec_to_matrix, invert_transform, matrix_to_rvec_tvec
from aruco.pose_averaging import PoseAccumulator
from aruco.preview import draw_detected_markers, draw_pose_axes, show_preview
from aruco.depth_verify import verify_extrinsics_with_depth
from aruco.depth_refine import refine_extrinsics_with_depth
from aruco.alignment import (
get_face_normal_from_geometry,
detect_ground_face,
rotation_align_vectors,
apply_alignment_to_pose,
)
from loguru import logger
# Mapping from CLI-friendly dictionary names to OpenCV ArUco predefined
# dictionary IDs.  The keys double as the value set for the
# --aruco-dictionary option below, so adding an entry here automatically
# exposes it on the command line.
ARUCO_DICT_MAP = {
    "DICT_4X4_50": cv2.aruco.DICT_4X4_50,
    "DICT_4X4_100": cv2.aruco.DICT_4X4_100,
    "DICT_4X4_250": cv2.aruco.DICT_4X4_250,
    "DICT_4X4_1000": cv2.aruco.DICT_4X4_1000,
    "DICT_5X5_50": cv2.aruco.DICT_5X5_50,
    "DICT_5X5_100": cv2.aruco.DICT_5X5_100,
    "DICT_5X5_250": cv2.aruco.DICT_5X5_250,
    "DICT_5X5_1000": cv2.aruco.DICT_5X5_1000,
    "DICT_6X6_50": cv2.aruco.DICT_6X6_50,
    "DICT_6X6_100": cv2.aruco.DICT_6X6_100,
    "DICT_6X6_250": cv2.aruco.DICT_6X6_250,
    "DICT_6X6_1000": cv2.aruco.DICT_6X6_1000,
    "DICT_7X7_50": cv2.aruco.DICT_7X7_50,
    "DICT_7X7_100": cv2.aruco.DICT_7X7_100,
    "DICT_7X7_250": cv2.aruco.DICT_7X7_250,
    "DICT_7X7_1000": cv2.aruco.DICT_7X7_1000,
    "DICT_ARUCO_ORIGINAL": cv2.aruco.DICT_ARUCO_ORIGINAL,
    "DICT_APRILTAG_16h5": cv2.aruco.DICT_APRILTAG_16h5,
    "DICT_APRILTAG_25h9": cv2.aruco.DICT_APRILTAG_25h9,
    "DICT_APRILTAG_36h10": cv2.aruco.DICT_APRILTAG_36h10,
    "DICT_APRILTAG_36h11": cv2.aruco.DICT_APRILTAG_36h11,
}
def apply_depth_verify_refine_postprocess(
    results: Dict[str, Any],
    verification_frames: Dict[str, Any],
    marker_geometry: Dict[int, Any],
    camera_matrices: Dict[str, Any],
    verify_depth: bool,
    refine_depth: bool,
    depth_confidence_threshold: int,
    report_csv_path: Optional[str] = None,
) -> Tuple[Dict[str, Any], List[List[Any]]]:
    """
    Apply depth verification and refinement to computed extrinsics.

    ``results`` is mutated in place: each verified camera gains a
    ``depth_verify`` entry and, when refinement runs, its ``pose`` string is
    replaced by the refined pose and ``refine_depth`` / ``depth_verify_post``
    stats are attached.

    Args:
        results: Per-serial results; each entry holds a "pose" string of 16
            space-separated floats (row-major 4x4 world-from-camera matrix).
        verification_frames: Per-serial dicts with keys "frame", "ids",
            "corners" for the last accepted frame carrying a depth map.
        marker_geometry: Marker ID -> world-space corner coordinates.
        camera_matrices: Per-serial 3x3 camera intrinsic matrices.
        verify_depth: Run depth verification on each camera's final pose.
        refine_depth: Additionally refine the pose against depth (requires at
            least 4 valid depth points).
        depth_confidence_threshold: ZED confidence cutoff for depth filtering
            (lower = more confident).
        report_csv_path: Optional path; when given and residuals exist, a CSV
            report is written there.

    Returns:
        Tuple of (results, csv_rows) where each CSV row is
        [serial, marker_id, corner_idx, residual].
    """
    csv_rows: List[List[Any]] = []
    # Fast exit when neither post-processing step was requested.
    if not (verify_depth or refine_depth):
        return results, csv_rows
    click.echo("\nRunning depth verification/refinement on computed extrinsics...")
    for serial, vf in verification_frames.items():
        if str(serial) not in results:
            continue
        frame = vf["frame"]
        ids = vf["ids"]
        # Use the FINAL COMPUTED POSE for verification.
        # np.fromstring(text, sep=...) is deprecated; parse explicitly.
        pose_str = results[str(serial)]["pose"]
        T_mean = np.array(pose_str.split(), dtype=np.float64).reshape(4, 4)
        cam_matrix = camera_matrices[serial]
        # Restrict to markers that were both detected and have known geometry.
        marker_corners_world = {
            int(mid): marker_geometry[int(mid)]
            for mid in ids.flatten()
            if int(mid) in marker_geometry
        }
        if marker_corners_world and frame.depth_map is not None:
            verify_res = verify_extrinsics_with_depth(
                T_mean,
                marker_corners_world,
                frame.depth_map,
                cam_matrix,
                confidence_map=frame.confidence_map,
                confidence_thresh=depth_confidence_threshold,
            )
            results[str(serial)]["depth_verify"] = {
                "rmse": verify_res.rmse,
                "mean_abs": verify_res.mean_abs,
                "median": verify_res.median,
                "depth_normalized_rmse": verify_res.depth_normalized_rmse,
                "n_valid": verify_res.n_valid,
                "n_total": verify_res.n_total,
            }
            click.echo(
                f"Camera {serial} verification: RMSE={verify_res.rmse:.3f}m, "
                f"Valid={verify_res.n_valid}/{verify_res.n_total}"
            )
            if refine_depth:
                if verify_res.n_valid < 4:
                    # Refinement solves for 6 DoF; fewer than 4 points is
                    # under-constrained, so skip rather than produce garbage.
                    click.echo(
                        f"Camera {serial}: Not enough valid depth points for refinement ({verify_res.n_valid}). Skipping."
                    )
                else:
                    click.echo(f"Camera {serial}: Refining extrinsics with depth...")
                    T_refined, refine_stats = refine_extrinsics_with_depth(
                        T_mean,
                        marker_corners_world,
                        frame.depth_map,
                        cam_matrix,
                    )
                    # Re-verify with the refined pose to quantify improvement.
                    verify_res_post = verify_extrinsics_with_depth(
                        T_refined,
                        marker_corners_world,
                        frame.depth_map,
                        cam_matrix,
                        confidence_map=frame.confidence_map,
                        confidence_thresh=depth_confidence_threshold,
                    )
                    pose_str_refined = " ".join(f"{x:.6f}" for x in T_refined.flatten())
                    results[str(serial)]["pose"] = pose_str_refined
                    results[str(serial)]["refine_depth"] = refine_stats
                    results[str(serial)]["depth_verify_post"] = {
                        "rmse": verify_res_post.rmse,
                        "mean_abs": verify_res_post.mean_abs,
                        "median": verify_res_post.median,
                        "depth_normalized_rmse": verify_res_post.depth_normalized_rmse,
                        "n_valid": verify_res_post.n_valid,
                        "n_total": verify_res_post.n_total,
                    }
                    improvement = verify_res.rmse - verify_res_post.rmse
                    results[str(serial)]["refine_depth"]["improvement_rmse"] = (
                        improvement
                    )
                    click.echo(
                        f"Camera {serial} refined: RMSE={verify_res_post.rmse:.3f}m "
                        f"(Improved by {improvement:.3f}m). "
                        f"Delta Rot={refine_stats['delta_rotation_deg']:.2f}deg, "
                        f"Trans={refine_stats['delta_translation_norm_m']:.3f}m"
                    )
                    # Report residuals of the pose we actually kept.
                    verify_res = verify_res_post
            if report_csv_path:
                for mid, cidx, resid in verify_res.residuals:
                    csv_rows.append([serial, mid, cidx, resid])
    if report_csv_path and csv_rows:
        with open(report_csv_path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["serial", "marker_id", "corner_idx", "residual"])
            writer.writerows(csv_rows)
        click.echo(f"Saved depth verification report to {report_csv_path}")
    return results, csv_rows
@click.command()
@click.option("--svo", "-s", multiple=True, required=False, help="Path to SVO files.")
@click.option("--markers", "-m", required=True, help="Path to markers parquet file.")
@click.option("--output", "-o", default="extrinsics.json", help="Output JSON file.")
@click.option(
    "--sample-interval", "-n", default=30, type=int, help="Sample every N frames."
)
@click.option(
    "--max-reproj-error",
    "-e",
    default=2.0,
    type=float,
    help="Max reprojection error for pose.",
)
@click.option("--preview/--no-preview", default=True, help="Show preview window.")
@click.option(
    "--validate-markers/--no-validate", default=True, help="Validate marker geometry."
)
@click.option(
    "--self-check/--no-self-check", default=False, help="Perform self-check on result."
)
@click.option(
    "--verify-depth/--no-verify-depth", default=False, help="Enable depth verification."
)
@click.option(
    "--refine-depth/--no-refine-depth", default=False, help="Enable depth refinement."
)
@click.option(
    "--depth-mode",
    default="NEURAL",
    type=click.Choice(["NEURAL", "ULTRA", "PERFORMANCE", "NONE"]),
    help="Depth computation mode.",
)
@click.option(
    "--depth-confidence-threshold",
    default=50,
    type=int,
    help="Confidence threshold for depth filtering (lower = more confident).",
)
@click.option(
    "--report-csv", type=click.Path(), help="Optional path for per-frame CSV report."
)
@click.option(
    "--auto-align/--no-auto-align",
    default=False,
    help="Automatically align ground plane.",
)
@click.option(
    "--ground-face", type=str, help="Explicit face name for ground alignment."
)
@click.option(
    "--ground-marker-id", type=int, help="Explicit marker ID to define ground face."
)
@click.option(
    "--aruco-dictionary",
    default="DICT_4X4_50",
    type=click.Choice(list(ARUCO_DICT_MAP.keys())),
    help="ArUco dictionary to use.",
)
@click.option(
    "--min-markers",
    default=1,
    type=int,
    help="Minimum markers required for pose estimation.",
)
@click.option(
    "--debug/--no-debug",
    default=False,
    help="Enable verbose debug logging.",
)
@click.option(
    "--max-samples",
    default=None,
    type=int,
    help="Maximum number of samples to process before stopping.",
)
def main(
    svo: tuple[str, ...],
    markers: str,
    output: str,
    sample_interval: int,
    max_reproj_error: float,
    preview: bool,
    validate_markers: bool,
    self_check: bool,
    verify_depth: bool,
    refine_depth: bool,
    depth_mode: str,
    depth_confidence_threshold: int,
    report_csv: str | None,
    auto_align: bool,
    ground_face: str | None,
    ground_marker_id: int | None,
    aruco_dictionary: str,
    min_markers: int,
    debug: bool,
    max_samples: int | None,
):
    """
    Calibrate camera extrinsics relative to a global coordinate system defined by ArUco markers.

    Pipeline:
      1. Load (and optionally validate) marker geometry from the parquet file.
      2. Expand/open the SVO recordings and time-synchronize them.
      3. Sample frames, detect markers, and accumulate per-camera world poses.
      4. Robustly average the accumulated poses (RANSAC + robust mean).
      5. Optionally verify/refine poses against ZED depth maps.
      6. Optionally rotate the world frame so the ground-face normal maps to +Y.
      7. Save extrinsics JSON and optionally self-check the result.
    """
    # Configure logging level (loguru routed through click so output interleaves
    # cleanly with click.echo messages).
    logger.remove()
    logger.add(
        lambda msg: click.echo(msg, nl=False),
        level="DEBUG" if debug else "INFO",
        format="{message}",
    )
    depth_mode_map = {
        "NEURAL": sl.DEPTH_MODE.NEURAL,
        "ULTRA": sl.DEPTH_MODE.ULTRA,
        "PERFORMANCE": sl.DEPTH_MODE.PERFORMANCE,
        "NONE": sl.DEPTH_MODE.NONE,
    }
    sl_depth_mode = depth_mode_map.get(depth_mode, sl.DEPTH_MODE.NONE)
    # Depth computation is expensive; disable it unless a depth step needs it.
    if not (verify_depth or refine_depth):
        sl_depth_mode = sl.DEPTH_MODE.NONE
    # 1. Load Marker Geometry.
    # BUGFIX: this previously ran AFTER the empty-SVO early exit below, so the
    # "Marker validation successful" message could be printed without any
    # validation having run.  Load and validate first so that exit is honest.
    try:
        marker_geometry = load_marker_geometry(markers)
        if validate_markers:
            validate_marker_geometry(marker_geometry)
        click.echo(f"Loaded {len(marker_geometry)} markers from {markers}")
        # Load face mapping if available (used for ground-plane alignment).
        face_marker_map = load_face_mapping(markers)
        if face_marker_map:
            click.echo(f"Loaded face mapping for {len(face_marker_map)} faces.")
        else:
            click.echo("No face mapping found in parquet (missing 'name'/'ids').")
            face_marker_map = None
    except Exception as e:
        click.echo(f"Error loading markers: {e}", err=True)
        raise SystemExit(1)
    # Expand SVO paths (files or directories).
    expanded_svo = []
    for path_str in svo:
        path = Path(path_str)
        if path.is_dir():
            click.echo(f"Searching for SVO files in {path}...")
            found = sorted(
                [
                    str(p)
                    for p in path.iterdir()
                    if p.is_file() and p.suffix.lower() in (".svo", ".svo2")
                ]
            )
            if found:
                click.echo(f"Found {len(found)} SVO files in {path}")
                expanded_svo.extend(found)
            else:
                click.echo(f"Warning: No .svo/.svo2 files found in {path}", err=True)
        elif path.is_file():
            expanded_svo.append(str(path))
        else:
            click.echo(f"Warning: Path not found: {path}", err=True)
    if not expanded_svo:
        if validate_markers:
            # Validation already ran above; a markers-only invocation is valid.
            click.echo("Marker validation successful. No SVOs provided, exiting.")
            return
        else:
            click.echo(
                "Error: --svo is required unless --validate-markers is used.", err=True
            )
            raise click.UsageError("Missing option '--svo' / '-s'.")
    # 2. Initialize SVO Reader.
    reader = SVOReader(expanded_svo, depth_mode=sl_depth_mode)
    if not reader.cameras:
        click.echo("No SVO files could be opened.", err=True)
        return
    # Align SVOs to a common start timestamp.
    reader.sync_to_latest_start()
    # Calculate max frames to process to avoid an unbounded loop.
    max_frames = 10000  # Default safety limit
    if reader.cameras:
        remaining = []
        for i, cam in enumerate(reader.cameras):
            total = reader.camera_info[i]["total_frames"]
            if total > 0:
                current = cam.get_svo_position()
                remaining.append(total - current)
            else:
                # If any total_frames is unknown (<= 0), use a hard limit.
                remaining = [10000]
                break
        if remaining:
            max_frames = min(remaining)
        else:
            click.echo(
                "Warning: Could not determine SVO lengths, using safety limit of 10,000 frames."
            )
    serials = [info["serial"] for info in reader.camera_info]
    accumulators = {serial: PoseAccumulator() for serial in serials}
    camera_matrices = {
        serial: build_camera_matrix_from_zed(cam)
        for serial, cam in zip(serials, reader.cameras)
    }
    # Latest valid frame per camera, kept for the depth post-process step.
    verification_frames = {}
    # All marker IDs ever seen, for heuristic ground-face detection.
    all_visible_ids = set()
    detector = create_detector(dictionary_id=ARUCO_DICT_MAP[aruco_dictionary])
    frame_count = 0
    sampled_count = 0
    click.echo(f"Processing SVOs: {serials}")
    try:
        while frame_count < max_frames:
            frames = reader.grab_synced()
            if not any(frames):
                break
            if frame_count % sample_interval == 0:
                preview_frames = {}
                for frame in frames:
                    if frame is None:
                        continue
                    serial = frame.serial_number
                    K = camera_matrices[serial]
                    # Detect markers.
                    corners, ids = detect_markers(frame.image, detector)
                    if ids is not None:
                        all_visible_ids.update(ids.flatten().tolist())
                        logger.debug(
                            f"Cam {serial}: Detected {len(ids)} markers: {ids.flatten()}"
                        )
                    else:
                        logger.debug(f"Cam {serial}: No markers detected")
                    if ids is None:
                        if preview:
                            preview_frames[serial] = frame.image
                        continue
                    # Estimate pose (T_cam_from_world).
                    pose_res = estimate_pose_from_detections(
                        corners, ids, marker_geometry, K, min_markers=min_markers
                    )
                    if pose_res:
                        rvec, tvec, reproj_err, n_markers = pose_res
                        if reproj_err <= max_reproj_error:
                            T_cam_world = rvec_tvec_to_matrix(rvec, tvec)
                            # We accumulate T_world_from_cam.
                            T_world_cam = invert_transform(T_cam_world)
                            # Save latest valid frame for depth verification.
                            if (
                                verify_depth or refine_depth
                            ) and frame.depth_map is not None:
                                verification_frames[serial] = {
                                    "frame": frame,
                                    "ids": ids,
                                    "corners": corners,
                                }
                            accumulators[serial].add_pose(
                                T_world_cam, reproj_err, frame_count
                            )
                            logger.debug(
                                f"Cam {serial}: Pose accepted. Reproj={reproj_err:.3f}, Markers={n_markers}"
                            )
                        else:
                            logger.debug(
                                f"Cam {serial}: Pose rejected. Reproj {reproj_err:.3f} > {max_reproj_error}"
                            )
                        if preview:
                            img = draw_detected_markers(
                                frame.image.copy(), corners, ids
                            )
                            img = draw_pose_axes(img, rvec, tvec, K, length=0.2)
                            preview_frames[serial] = img
                    else:
                        if ids is not None:
                            logger.debug(
                                f"Cam {serial}: Pose estimation failed (insufficient markers < {min_markers} or solver failure)"
                            )
                        elif preview:
                            preview_frames[serial] = frame.image
                if preview and preview_frames:
                    key = show_preview(preview_frames)
                    # ESC or 'q' aborts sampling early.
                    if key == 27 or key == ord("q"):
                        break
                sampled_count += 1
                if max_samples is not None and sampled_count >= max_samples:
                    click.echo(f"\nReached max samples ({max_samples}). Stopping.")
                    break
            frame_count += 1
            if frame_count % 100 == 0:
                counts = [len(acc.poses) for acc in accumulators.values()]
                click.echo(
                    f"Frame {frame_count}, Accepted Poses: {dict(zip(serials, counts))}"
                )
    except KeyboardInterrupt:
        click.echo("\nInterrupted by user.")
    finally:
        reader.close()
        cv2.destroyAllWindows()
    # 3. Compute Final Poses.
    results = {}
    for serial, acc in accumulators.items():
        if not acc.poses:
            click.echo(f"Warning: No valid poses for camera {serial}")
            continue
        # Use RANSAC to find the best consensus set, then robust-average it.
        inliers = acc.ransac_filter()
        T_mean, stats = acc.compute_robust_mean(inliers)
        # Flatten for JSON as a space-separated string (16 floats, row-major).
        pose_str = " ".join(f"{x:.6f}" for x in T_mean.flatten())
        results[str(serial)] = {"pose": pose_str, "stats": stats}
        click.echo(
            f"Camera {serial}: {stats['n_inliers']}/{stats['n_total']} inliers, median error: {stats['median_reproj_error']:.3f}"
        )
    if not results:
        click.echo("No extrinsics computed.", err=True)
        return
    # 4. Run Depth Verification/Refinement if requested (mutates `results`).
    apply_depth_verify_refine_postprocess(
        results,
        verification_frames,
        marker_geometry,
        camera_matrices,
        verify_depth,
        refine_depth,
        depth_confidence_threshold,
        report_csv,
    )
    # 5. Optional Ground Plane Alignment.
    if auto_align:
        click.echo("\nPerforming ground plane alignment...")
        target_face = ground_face
        if face_marker_map is None:
            click.echo(
                "Warning: No face mapping available (missing 'name'/'ids' in parquet). Skipping alignment.",
                err=True,
            )
            # Empty mapping makes the lookups below no-ops and the heuristic
            # fall through gracefully.
            mapping_to_use = {}
        else:
            mapping_to_use = face_marker_map
        if not target_face and ground_marker_id is not None:
            # Map the explicit marker ID to its face name.
            for face, face_ids in mapping_to_use.items():
                if ground_marker_id in face_ids:
                    target_face = face
                    logger.info(
                        f"Mapped ground-marker-id {ground_marker_id} to face '{face}' (markers={face_ids})"
                    )
                    break
        ground_normal = None
        if target_face:
            ground_normal = get_face_normal_from_geometry(
                target_face, marker_geometry, face_marker_map=face_marker_map
            )
            if ground_normal is not None:
                face_ids = mapping_to_use.get(target_face, [])
                logger.info(
                    f"Using explicit ground face '{target_face}' (markers={face_ids})"
                )
        else:
            # No explicit face: fall back to heuristic detection.
            heuristic_res = detect_ground_face(
                all_visible_ids, marker_geometry, face_marker_map=face_marker_map
            )
            if heuristic_res:
                target_face, ground_normal = heuristic_res
                face_ids = mapping_to_use.get(target_face, [])
                logger.info(
                    f"Heuristically detected ground face '{target_face}' (markers={face_ids})"
                )
        if ground_normal is not None:
            # Rotate the world so the ground-face normal maps onto +Y.
            R_align = rotation_align_vectors(ground_normal, np.array([0, 1, 0]))
            logger.info(f"Computed alignment rotation for face '{target_face}'")
            for serial, data in results.items():
                # np.fromstring(text) is deprecated; parse the pose explicitly.
                T_mean = np.array(data["pose"].split(), dtype=np.float64).reshape(4, 4)
                T_aligned = apply_alignment_to_pose(T_mean, R_align)
                data["pose"] = " ".join(f"{x:.6f}" for x in T_aligned.flatten())
                logger.debug(f"Applied alignment to camera {serial}")
        else:
            click.echo(
                "Warning: Could not determine ground normal. Skipping alignment."
            )
    # 6. Save to JSON.
    with open(output, "w") as f:
        json.dump(results, f, indent=4, sort_keys=True)
    click.echo(f"Saved extrinsics to {output}")
    # 7. Optional Self-Check.
    if self_check:
        # Verify reprojection error against the acceptance threshold.
        for serial, data in results.items():
            if data["stats"]["median_reproj_error"] > max_reproj_error:
                click.echo(
                    f"Error: Camera {serial} failed self-check (median error {data['stats']['median_reproj_error']:.3f} > {max_reproj_error})",
                    err=True,
                )
                raise SystemExit(1)
        # Simple sanity check: report pairwise camera distances if multiple.
        if len(results) >= 2:
            serials_list = sorted(results.keys())
            for i in range(len(serials_list)):
                for j in range(i + 1, len(serials_list)):
                    s1 = serials_list[i]
                    s2 = serials_list[j]
                    p1 = np.array(
                        results[s1]["pose"].split(), dtype=np.float64
                    ).reshape(4, 4)[:3, 3]
                    p2 = np.array(
                        results[s2]["pose"].split(), dtype=np.float64
                    ).reshape(4, 4)[:3, 3]
                    dist = np.linalg.norm(p1 - p2)
                    click.echo(f"Self-check: Distance {s1} <-> {s2}: {dist:.3f}m")
if __name__ == "__main__":
    # Click supplies the CLI arguments at invocation time.
    main()  # pylint: disable=no-value-for-parameter