6d3c5cc5c1
- Retrieve depth + confidence measures from SVOReader when depth enabled - Compute depth residual metrics and attach to output JSON - Optionally write per-corner residual CSV via --report-csv - Post-process refinement: optimize final pose and report pre/post metrics - Add unit tests for depth verification and refinement modules - Add basedpyright dev dependency for diagnostics
409 lines
15 KiB
Python
409 lines
15 KiB
Python
import click
|
|
import cv2
|
|
import json
|
|
import numpy as np
|
|
import pyzed.sl as sl
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
|
|
from aruco.marker_geometry import load_marker_geometry, validate_marker_geometry
|
|
from aruco.svo_sync import SVOReader
|
|
from aruco.detector import (
|
|
create_detector,
|
|
detect_markers,
|
|
build_camera_matrix_from_zed,
|
|
estimate_pose_from_detections,
|
|
)
|
|
from aruco.pose_math import rvec_tvec_to_matrix, invert_transform, matrix_to_rvec_tvec
|
|
from aruco.pose_averaging import PoseAccumulator
|
|
from aruco.preview import draw_detected_markers, draw_pose_axes, show_preview
|
|
from aruco.depth_verify import verify_extrinsics_with_depth
|
|
from aruco.depth_refine import refine_extrinsics_with_depth
|
|
|
|
|
|
@click.command()
|
|
@click.option("--svo", "-s", multiple=True, required=False, help="Path to SVO files.")
|
|
@click.option("--markers", "-m", required=True, help="Path to markers parquet file.")
|
|
@click.option("--output", "-o", default="extrinsics.json", help="Output JSON file.")
|
|
@click.option(
|
|
"--sample-interval", "-n", default=30, type=int, help="Sample every N frames."
|
|
)
|
|
@click.option(
|
|
"--max-reproj-error",
|
|
"-e",
|
|
default=2.0,
|
|
type=float,
|
|
help="Max reprojection error for pose.",
|
|
)
|
|
@click.option("--preview/--no-preview", default=True, help="Show preview window.")
|
|
@click.option(
|
|
"--validate-markers/--no-validate", default=True, help="Validate marker geometry."
|
|
)
|
|
@click.option(
|
|
"--self-check/--no-self-check", default=False, help="Perform self-check on result."
|
|
)
|
|
@click.option(
|
|
"--verify-depth/--no-verify-depth", default=False, help="Enable depth verification."
|
|
)
|
|
@click.option(
|
|
"--refine-depth/--no-refine-depth", default=False, help="Enable depth refinement."
|
|
)
|
|
@click.option(
|
|
"--depth-mode",
|
|
default="NEURAL",
|
|
type=click.Choice(["NEURAL", "ULTRA", "PERFORMANCE", "NONE"]),
|
|
help="Depth computation mode.",
|
|
)
|
|
@click.option(
|
|
"--depth-confidence-threshold",
|
|
default=50,
|
|
type=int,
|
|
help="Confidence threshold for depth filtering (lower = more confident).",
|
|
)
|
|
@click.option(
|
|
"--report-csv", type=click.Path(), help="Optional path for per-frame CSV report."
|
|
)
|
|
def main(
|
|
svo,
|
|
markers,
|
|
output,
|
|
sample_interval,
|
|
max_reproj_error,
|
|
preview,
|
|
validate_markers,
|
|
self_check,
|
|
verify_depth,
|
|
refine_depth,
|
|
depth_mode,
|
|
depth_confidence_threshold,
|
|
report_csv,
|
|
):
|
|
"""
|
|
Calibrate camera extrinsics relative to a global coordinate system defined by ArUco markers.
|
|
"""
|
|
depth_mode_map = {
|
|
"NEURAL": sl.DEPTH_MODE.NEURAL,
|
|
"ULTRA": sl.DEPTH_MODE.ULTRA,
|
|
"PERFORMANCE": sl.DEPTH_MODE.PERFORMANCE,
|
|
"NONE": sl.DEPTH_MODE.NONE,
|
|
}
|
|
sl_depth_mode = depth_mode_map.get(depth_mode, sl.DEPTH_MODE.NONE)
|
|
|
|
# 1. Load Marker Geometry
|
|
try:
|
|
marker_geometry = load_marker_geometry(markers)
|
|
if validate_markers:
|
|
validate_marker_geometry(marker_geometry)
|
|
click.echo(f"Loaded {len(marker_geometry)} markers from {markers}")
|
|
except Exception as e:
|
|
click.echo(f"Error loading markers: {e}", err=True)
|
|
raise SystemExit(1)
|
|
|
|
if not svo:
|
|
if validate_markers:
|
|
click.echo("Marker validation successful. No SVOs provided, exiting.")
|
|
return
|
|
else:
|
|
click.echo(
|
|
"Error: --svo is required unless --validate-markers is used.", err=True
|
|
)
|
|
raise click.UsageError("Missing option '--svo' / '-s'.")
|
|
|
|
# 2. Initialize SVO Reader
|
|
reader = SVOReader(svo, depth_mode=sl_depth_mode)
|
|
if not reader.cameras:
|
|
click.echo("No SVO files could be opened.", err=True)
|
|
return
|
|
|
|
# Align SVOs
|
|
reader.sync_to_latest_start()
|
|
|
|
# Calculate max frames to process to avoid infinite loop
|
|
max_frames = 10000 # Default safety limit
|
|
if reader.cameras:
|
|
remaining = []
|
|
for i, cam in enumerate(reader.cameras):
|
|
total = reader.camera_info[i]["total_frames"]
|
|
if total > 0:
|
|
current = cam.get_svo_position()
|
|
remaining.append(total - current)
|
|
else:
|
|
# If any total_frames is unknown (<= 0), use a hard limit
|
|
remaining = [10000]
|
|
break
|
|
if remaining:
|
|
max_frames = min(remaining)
|
|
else:
|
|
click.echo(
|
|
"Warning: Could not determine SVO lengths, using safety limit of 10,000 frames."
|
|
)
|
|
|
|
serials = [info["serial"] for info in reader.camera_info]
|
|
accumulators = {serial: PoseAccumulator() for serial in serials}
|
|
camera_matrices = {
|
|
serial: build_camera_matrix_from_zed(cam)
|
|
for serial, cam in zip(serials, reader.cameras)
|
|
}
|
|
|
|
# Store verification frames for post-process check
|
|
verification_frames = {}
|
|
|
|
detector = create_detector()
|
|
|
|
frame_count = 0
|
|
sampled_count = 0
|
|
|
|
click.echo(f"Processing SVOs: {serials}")
|
|
|
|
try:
|
|
while frame_count < max_frames:
|
|
frames = reader.grab_synced()
|
|
if not any(frames):
|
|
break
|
|
|
|
if frame_count % sample_interval == 0:
|
|
preview_frames = {}
|
|
for i, frame in enumerate(frames):
|
|
if frame is None:
|
|
continue
|
|
|
|
serial = frame.serial_number
|
|
K = camera_matrices[serial]
|
|
|
|
# Detect markers
|
|
corners, ids = detect_markers(frame.image, detector)
|
|
|
|
if ids is None:
|
|
if preview:
|
|
preview_frames[serial] = frame.image
|
|
continue
|
|
|
|
# Estimate pose (T_cam_from_world)
|
|
pose_res = estimate_pose_from_detections(
|
|
corners, ids, marker_geometry, K, min_markers=4
|
|
)
|
|
|
|
if pose_res:
|
|
rvec, tvec, reproj_err, n_markers = pose_res
|
|
if reproj_err <= max_reproj_error:
|
|
T_cam_world = rvec_tvec_to_matrix(rvec, tvec)
|
|
# We want T_world_from_cam
|
|
T_world_cam = invert_transform(T_cam_world)
|
|
|
|
# Save latest valid frame for verification
|
|
if (
|
|
verify_depth or refine_depth
|
|
) and frame.depth_map is not None:
|
|
verification_frames[serial] = {
|
|
"frame": frame,
|
|
"ids": ids,
|
|
"corners": corners,
|
|
}
|
|
|
|
accumulators[serial].add_pose(
|
|
T_world_cam, reproj_err, frame_count
|
|
)
|
|
|
|
if preview:
|
|
img = draw_detected_markers(
|
|
frame.image.copy(), corners, ids
|
|
)
|
|
img = draw_pose_axes(img, rvec, tvec, K, length=0.2)
|
|
preview_frames[serial] = img
|
|
elif preview:
|
|
preview_frames[serial] = frame.image
|
|
|
|
if preview and preview_frames:
|
|
key = show_preview(preview_frames)
|
|
if key == 27 or key == ord("q"):
|
|
break
|
|
|
|
sampled_count += 1
|
|
|
|
frame_count += 1
|
|
if frame_count % 100 == 0:
|
|
counts = [len(acc.poses) for acc in accumulators.values()]
|
|
click.echo(
|
|
f"Frame {frame_count}, Detections: {dict(zip(serials, counts))}"
|
|
)
|
|
|
|
except KeyboardInterrupt:
|
|
click.echo("\nInterrupted by user.")
|
|
finally:
|
|
reader.close()
|
|
cv2.destroyAllWindows()
|
|
|
|
# 3. Compute Final Poses
|
|
results = {}
|
|
for serial, acc in accumulators.items():
|
|
if not acc.poses:
|
|
click.echo(f"Warning: No valid poses for camera {serial}")
|
|
continue
|
|
|
|
# Use RANSAC to find best consensus
|
|
inliers = acc.ransac_filter()
|
|
T_mean, stats = acc.compute_robust_mean(inliers)
|
|
|
|
# Flatten for JSON as space-separated string
|
|
pose_str = " ".join(f"{x:.6f}" for x in T_mean.flatten())
|
|
|
|
results[str(serial)] = {"pose": pose_str, "stats": stats}
|
|
click.echo(
|
|
f"Camera {serial}: {stats['n_inliers']}/{stats['n_total']} inliers, median error: {stats['median_reproj_error']:.3f}"
|
|
)
|
|
|
|
if not results:
|
|
click.echo("No extrinsics computed.", err=True)
|
|
return
|
|
|
|
# 4. Run Depth Verification if requested
|
|
csv_rows: List[List[Any]] = []
|
|
if verify_depth or refine_depth:
|
|
click.echo("\nRunning depth verification/refinement on computed extrinsics...")
|
|
for serial, acc in accumulators.items():
|
|
if serial not in verification_frames or str(serial) not in results:
|
|
continue
|
|
|
|
# Retrieve stored frame data
|
|
vf = verification_frames[serial]
|
|
frame = vf["frame"]
|
|
ids = vf["ids"]
|
|
|
|
# Use the FINAL COMPUTED POSE for verification
|
|
pose_str = results[str(serial)]["pose"]
|
|
T_mean = np.fromstring(pose_str, sep=" ").reshape(4, 4)
|
|
cam_matrix = camera_matrices[serial]
|
|
|
|
marker_corners_world = {
|
|
int(mid): marker_geometry[int(mid)]
|
|
for mid in ids.flatten()
|
|
if int(mid) in marker_geometry
|
|
}
|
|
|
|
if marker_corners_world and frame.depth_map is not None:
|
|
verify_res = verify_extrinsics_with_depth(
|
|
T_mean,
|
|
marker_corners_world,
|
|
frame.depth_map,
|
|
cam_matrix,
|
|
confidence_map=frame.confidence_map,
|
|
confidence_thresh=depth_confidence_threshold,
|
|
)
|
|
|
|
results[str(serial)]["depth_verify"] = {
|
|
"rmse": verify_res.rmse,
|
|
"mean_abs": verify_res.mean_abs,
|
|
"median": verify_res.median,
|
|
"depth_normalized_rmse": verify_res.depth_normalized_rmse,
|
|
"n_valid": verify_res.n_valid,
|
|
"n_total": verify_res.n_total,
|
|
}
|
|
|
|
click.echo(
|
|
f"Camera {serial} verification: RMSE={verify_res.rmse:.3f}m, "
|
|
f"Valid={verify_res.n_valid}/{verify_res.n_total}"
|
|
)
|
|
|
|
if refine_depth:
|
|
if verify_res.n_valid < 4:
|
|
click.echo(
|
|
f"Camera {serial}: Not enough valid depth points for refinement ({verify_res.n_valid}). Skipping."
|
|
)
|
|
else:
|
|
click.echo(
|
|
f"Camera {serial}: Refining extrinsics with depth..."
|
|
)
|
|
T_refined, refine_stats = refine_extrinsics_with_depth(
|
|
T_mean,
|
|
marker_corners_world,
|
|
frame.depth_map,
|
|
cam_matrix,
|
|
)
|
|
|
|
verify_res_post = verify_extrinsics_with_depth(
|
|
T_refined,
|
|
marker_corners_world,
|
|
frame.depth_map,
|
|
cam_matrix,
|
|
confidence_map=frame.confidence_map,
|
|
confidence_thresh=depth_confidence_threshold,
|
|
)
|
|
|
|
pose_str_refined = " ".join(
|
|
f"{x:.6f}" for x in T_refined.flatten()
|
|
)
|
|
results[str(serial)]["pose"] = pose_str_refined
|
|
results[str(serial)]["refine_depth"] = refine_stats
|
|
results[str(serial)]["depth_verify_post"] = {
|
|
"rmse": verify_res_post.rmse,
|
|
"mean_abs": verify_res_post.mean_abs,
|
|
"median": verify_res_post.median,
|
|
"depth_normalized_rmse": verify_res_post.depth_normalized_rmse,
|
|
"n_valid": verify_res_post.n_valid,
|
|
"n_total": verify_res_post.n_total,
|
|
}
|
|
|
|
improvement = verify_res.rmse - verify_res_post.rmse
|
|
results[str(serial)]["refine_depth"]["improvement_rmse"] = (
|
|
improvement
|
|
)
|
|
|
|
click.echo(
|
|
f"Camera {serial} refined: RMSE={verify_res_post.rmse:.3f}m "
|
|
f"(Improved by {improvement:.3f}m). "
|
|
f"Delta Rot={refine_stats['delta_rotation_deg']:.2f}deg, "
|
|
f"Trans={refine_stats['delta_translation_norm_m']:.3f}m"
|
|
)
|
|
|
|
verify_res = verify_res_post
|
|
|
|
if report_csv:
|
|
for mid, cidx, resid in verify_res.residuals:
|
|
csv_rows.append([serial, mid, cidx, resid])
|
|
|
|
# 5. Save CSV Report
|
|
if report_csv and csv_rows:
|
|
import csv
|
|
|
|
with open(report_csv, "w", newline="") as f:
|
|
writer = csv.writer(f)
|
|
writer.writerow(["serial", "marker_id", "corner_idx", "residual"])
|
|
writer.writerows(csv_rows)
|
|
click.echo(f"Saved depth verification report to {report_csv}")
|
|
|
|
# 6. Save to JSON
|
|
with open(output, "w") as f:
|
|
json.dump(results, f, indent=4, sort_keys=True)
|
|
click.echo(f"Saved extrinsics to {output}")
|
|
|
|
# 7. Optional Self-Check
|
|
if self_check:
|
|
# Verify reprojection error
|
|
for serial, data in results.items():
|
|
if data["stats"]["median_reproj_error"] > max_reproj_error:
|
|
click.echo(
|
|
f"Error: Camera {serial} failed self-check (median error {data['stats']['median_reproj_error']:.3f} > {max_reproj_error})",
|
|
err=True,
|
|
)
|
|
raise SystemExit(1)
|
|
|
|
# Simple check: verify distance between cameras if multiple
|
|
if len(results) >= 2:
|
|
serials_list = sorted(results.keys())
|
|
for i in range(len(serials_list)):
|
|
for j in range(i + 1, len(serials_list)):
|
|
s1 = serials_list[i]
|
|
s2 = serials_list[j]
|
|
p1 = np.fromstring(results[s1]["pose"], sep=" ").reshape(4, 4)[
|
|
:3, 3
|
|
]
|
|
p2 = np.fromstring(results[s2]["pose"], sep=" ").reshape(4, 4)[
|
|
:3, 3
|
|
]
|
|
dist = np.linalg.norm(p1 - p2)
|
|
click.echo(f"Self-check: Distance {s1} <-> {s2}: {dist:.3f}m")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() # pylint: disable=no-value-for-parameter
|