# --- File provenance (repository listing header, converted to comments so the
# --- file remains valid Python) ---
# File: zed-playground/py_workspace/calibrate_extrinsics.py
# Commit: crosstyan 553cc457f0 refactor(cli): extract depth postprocess and add tests
#   - Extract apply_depth_verify_refine_postprocess() from main() for testability
#   - Add test_depth_cli_postprocess.py using mocks to validate JSON and CSV behavior
#   - Keeps CLI behavior unchanged
# Date: 2026-02-05 05:07:48 +00:00
# Size: 436 lines, 15 KiB, Python
import click
import cv2
import json
import csv
import numpy as np
import pyzed.sl as sl
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from aruco.marker_geometry import load_marker_geometry, validate_marker_geometry
from aruco.svo_sync import SVOReader
from aruco.detector import (
create_detector,
detect_markers,
build_camera_matrix_from_zed,
estimate_pose_from_detections,
)
from aruco.pose_math import rvec_tvec_to_matrix, invert_transform, matrix_to_rvec_tvec
from aruco.pose_averaging import PoseAccumulator
from aruco.preview import draw_detected_markers, draw_pose_axes, show_preview
from aruco.depth_verify import verify_extrinsics_with_depth
from aruco.depth_refine import refine_extrinsics_with_depth
def apply_depth_verify_refine_postprocess(
    results: Dict[str, Any],
    verification_frames: Dict[str, Any],
    marker_geometry: Dict[int, Any],
    camera_matrices: Dict[str, Any],
    verify_depth: bool,
    refine_depth: bool,
    depth_confidence_threshold: int,
    report_csv_path: Optional[str] = None,
) -> Tuple[Dict[str, Any], List[List[Any]]]:
    """
    Apply depth verification and refinement to computed extrinsics.

    Args:
        results: Mapping of str(serial) -> {"pose": <space-separated 4x4>, ...}.
            Mutated in place: gains "depth_verify", and when refinement runs,
            "refine_depth", "depth_verify_post" and an updated "pose".
        verification_frames: Mapping of serial -> {"frame": ..., "ids": ...}
            captured during detection; "frame" must expose .depth_map and
            .confidence_map.
        marker_geometry: Marker id -> world-space corner geometry.
        camera_matrices: Serial -> 3x3 camera intrinsics matrix.
        verify_depth: Enable depth verification.
        refine_depth: Enable depth refinement (implies verification).
        depth_confidence_threshold: ZED confidence cutoff (lower = stricter).
        report_csv_path: Optional path; when set, per-corner residuals are
            also written there as CSV.

    Returns:
        Updated results and list of CSV rows
        ([serial, marker_id, corner_idx, residual] per verified corner).
    """
    csv_rows: List[List[Any]] = []
    if not (verify_depth or refine_depth):
        # Neither feature requested: no-op, results untouched.
        return results, csv_rows
    click.echo("\nRunning depth verification/refinement on computed extrinsics...")
    for serial, vf in verification_frames.items():
        if str(serial) not in results:
            # No final pose was computed for this camera; nothing to verify.
            continue
        frame = vf["frame"]
        ids = vf["ids"]
        # Use the FINAL COMPUTED POSE for verification.
        # np.fromstring(s, sep=" ") is deprecated since NumPy 1.14; parse via
        # split + np.array instead (identical result for this format).
        pose_str = results[str(serial)]["pose"]
        T_mean = np.array(pose_str.split(), dtype=np.float64).reshape(4, 4)
        cam_matrix = camera_matrices[serial]
        # Restrict to markers that were both detected and have known geometry.
        marker_corners_world = {
            int(mid): marker_geometry[int(mid)]
            for mid in ids.flatten()
            if int(mid) in marker_geometry
        }
        if marker_corners_world and frame.depth_map is not None:
            verify_res = verify_extrinsics_with_depth(
                T_mean,
                marker_corners_world,
                frame.depth_map,
                cam_matrix,
                confidence_map=frame.confidence_map,
                confidence_thresh=depth_confidence_threshold,
            )
            results[str(serial)]["depth_verify"] = {
                "rmse": verify_res.rmse,
                "mean_abs": verify_res.mean_abs,
                "median": verify_res.median,
                "depth_normalized_rmse": verify_res.depth_normalized_rmse,
                "n_valid": verify_res.n_valid,
                "n_total": verify_res.n_total,
            }
            click.echo(
                f"Camera {serial} verification: RMSE={verify_res.rmse:.3f}m, "
                f"Valid={verify_res.n_valid}/{verify_res.n_total}"
            )
            if refine_depth:
                if verify_res.n_valid < 4:
                    # Too few constraints for a stable 6-DoF refinement.
                    click.echo(
                        f"Camera {serial}: Not enough valid depth points for refinement ({verify_res.n_valid}). Skipping."
                    )
                else:
                    click.echo(f"Camera {serial}: Refining extrinsics with depth...")
                    T_refined, refine_stats = refine_extrinsics_with_depth(
                        T_mean,
                        marker_corners_world,
                        frame.depth_map,
                        cam_matrix,
                    )
                    # Re-verify with the refined pose to quantify improvement.
                    verify_res_post = verify_extrinsics_with_depth(
                        T_refined,
                        marker_corners_world,
                        frame.depth_map,
                        cam_matrix,
                        confidence_map=frame.confidence_map,
                        confidence_thresh=depth_confidence_threshold,
                    )
                    pose_str_refined = " ".join(f"{x:.6f}" for x in T_refined.flatten())
                    results[str(serial)]["pose"] = pose_str_refined
                    results[str(serial)]["refine_depth"] = refine_stats
                    results[str(serial)]["depth_verify_post"] = {
                        "rmse": verify_res_post.rmse,
                        "mean_abs": verify_res_post.mean_abs,
                        "median": verify_res_post.median,
                        "depth_normalized_rmse": verify_res_post.depth_normalized_rmse,
                        "n_valid": verify_res_post.n_valid,
                        "n_total": verify_res_post.n_total,
                    }
                    improvement = verify_res.rmse - verify_res_post.rmse
                    results[str(serial)]["refine_depth"]["improvement_rmse"] = (
                        improvement
                    )
                    click.echo(
                        f"Camera {serial} refined: RMSE={verify_res_post.rmse:.3f}m "
                        f"(Improved by {improvement:.3f}m). "
                        f"Delta Rot={refine_stats['delta_rotation_deg']:.2f}deg, "
                        f"Trans={refine_stats['delta_translation_norm_m']:.3f}m"
                    )
                    # Report residuals of the pose actually kept (post-refine).
                    verify_res = verify_res_post
            if report_csv_path:
                for mid, cidx, resid in verify_res.residuals:
                    csv_rows.append([serial, mid, cidx, resid])
    if report_csv_path and csv_rows:
        with open(report_csv_path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["serial", "marker_id", "corner_idx", "residual"])
            writer.writerows(csv_rows)
        click.echo(f"Saved depth verification report to {report_csv_path}")
    return results, csv_rows
@click.command()
@click.option("--svo", "-s", multiple=True, required=False, help="Path to SVO files.")
@click.option("--markers", "-m", required=True, help="Path to markers parquet file.")
@click.option("--output", "-o", default="extrinsics.json", help="Output JSON file.")
@click.option(
    "--sample-interval", "-n", default=30, type=int, help="Sample every N frames."
)
@click.option(
    "--max-reproj-error",
    "-e",
    default=2.0,
    type=float,
    help="Max reprojection error for pose.",
)
@click.option("--preview/--no-preview", default=True, help="Show preview window.")
@click.option(
    "--validate-markers/--no-validate", default=True, help="Validate marker geometry."
)
@click.option(
    "--self-check/--no-self-check", default=False, help="Perform self-check on result."
)
@click.option(
    "--verify-depth/--no-verify-depth", default=False, help="Enable depth verification."
)
@click.option(
    "--refine-depth/--no-refine-depth", default=False, help="Enable depth refinement."
)
@click.option(
    "--depth-mode",
    default="NEURAL",
    type=click.Choice(["NEURAL", "ULTRA", "PERFORMANCE", "NONE"]),
    help="Depth computation mode.",
)
@click.option(
    "--depth-confidence-threshold",
    default=50,
    type=int,
    help="Confidence threshold for depth filtering (lower = more confident).",
)
@click.option(
    "--report-csv", type=click.Path(), help="Optional path for per-frame CSV report."
)
def main(
    svo,
    markers,
    output,
    sample_interval,
    max_reproj_error,
    preview,
    validate_markers,
    self_check,
    verify_depth,
    refine_depth,
    depth_mode,
    depth_confidence_threshold,
    report_csv,
):
    """
    Calibrate camera extrinsics relative to a global coordinate system defined by ArUco markers.

    Pipeline: load marker geometry -> open and synchronize SVOs -> sample
    frames, detect markers and accumulate per-camera world poses -> compute a
    robust mean pose per camera -> optional depth verification/refinement ->
    write results as JSON (and an optional per-corner residual CSV).
    """
    # Map CLI choice to the ZED SDK depth-mode enum.
    depth_mode_map = {
        "NEURAL": sl.DEPTH_MODE.NEURAL,
        "ULTRA": sl.DEPTH_MODE.ULTRA,
        "PERFORMANCE": sl.DEPTH_MODE.PERFORMANCE,
        "NONE": sl.DEPTH_MODE.NONE,
    }
    sl_depth_mode = depth_mode_map.get(depth_mode, sl.DEPTH_MODE.NONE)
    # Depth computation is only needed for verification/refinement; disabling
    # it otherwise speeds up SVO playback.
    if not (verify_depth or refine_depth):
        sl_depth_mode = sl.DEPTH_MODE.NONE
    # 1. Load Marker Geometry
    try:
        marker_geometry = load_marker_geometry(markers)
        if validate_markers:
            validate_marker_geometry(marker_geometry)
        click.echo(f"Loaded {len(marker_geometry)} markers from {markers}")
    except Exception as e:
        click.echo(f"Error loading markers: {e}", err=True)
        raise SystemExit(1)
    # --svo is optional so the tool can be used as a pure marker validator.
    if not svo:
        if validate_markers:
            click.echo("Marker validation successful. No SVOs provided, exiting.")
            return
        else:
            click.echo(
                "Error: --svo is required unless --validate-markers is used.", err=True
            )
            raise click.UsageError("Missing option '--svo' / '-s'.")
    # 2. Initialize SVO Reader
    reader = SVOReader(svo, depth_mode=sl_depth_mode)
    if not reader.cameras:
        click.echo("No SVO files could be opened.", err=True)
        return
    # Align SVOs so all cameras start from the latest common timestamp.
    reader.sync_to_latest_start()
    # Calculate max frames to process to avoid infinite loop
    max_frames = 10000  # Default safety limit
    if reader.cameras:
        remaining = []
        for i, cam in enumerate(reader.cameras):
            total = reader.camera_info[i]["total_frames"]
            if total > 0:
                current = cam.get_svo_position()
                remaining.append(total - current)
            else:
                # If any total_frames is unknown (<= 0), use a hard limit
                remaining = [10000]
                break
        if remaining:
            # Stop when the shortest recording runs out.
            max_frames = min(remaining)
        else:
            click.echo(
                "Warning: Could not determine SVO lengths, using safety limit of 10,000 frames."
            )
    serials = [info["serial"] for info in reader.camera_info]
    # One pose accumulator per camera; intrinsics fetched once up front.
    accumulators = {serial: PoseAccumulator() for serial in serials}
    camera_matrices = {
        serial: build_camera_matrix_from_zed(cam)
        for serial, cam in zip(serials, reader.cameras)
    }
    # Store verification frames for post-process check
    verification_frames = {}
    detector = create_detector()
    frame_count = 0
    sampled_count = 0  # NOTE(review): incremented but never read afterwards
    click.echo(f"Processing SVOs: {serials}")
    try:
        while frame_count < max_frames:
            frames = reader.grab_synced()
            if not any(frames):
                # All cameras exhausted.
                break
            # Only every sample_interval-th frame is processed.
            if frame_count % sample_interval == 0:
                preview_frames = {}
                for i, frame in enumerate(frames):
                    if frame is None:
                        continue
                    serial = frame.serial_number
                    K = camera_matrices[serial]
                    # Detect markers
                    corners, ids = detect_markers(frame.image, detector)
                    if ids is None:
                        if preview:
                            preview_frames[serial] = frame.image
                        continue
                    # Estimate pose (T_cam_from_world)
                    pose_res = estimate_pose_from_detections(
                        corners, ids, marker_geometry, K, min_markers=4
                    )
                    if pose_res:
                        # n_markers is unpacked but unused here.
                        rvec, tvec, reproj_err, n_markers = pose_res
                        if reproj_err <= max_reproj_error:
                            T_cam_world = rvec_tvec_to_matrix(rvec, tvec)
                            # We want T_world_from_cam
                            T_world_cam = invert_transform(T_cam_world)
                            # Save latest valid frame for verification
                            if (
                                verify_depth or refine_depth
                            ) and frame.depth_map is not None:
                                verification_frames[serial] = {
                                    "frame": frame,
                                    "ids": ids,
                                    "corners": corners,
                                }
                            accumulators[serial].add_pose(
                                T_world_cam, reproj_err, frame_count
                            )
                            if preview:
                                img = draw_detected_markers(
                                    frame.image.copy(), corners, ids
                                )
                                img = draw_pose_axes(img, rvec, tvec, K, length=0.2)
                                preview_frames[serial] = img
                    elif preview:
                        preview_frames[serial] = frame.image
                if preview and preview_frames:
                    # ESC or 'q' aborts the sampling loop.
                    key = show_preview(preview_frames)
                    if key == 27 or key == ord("q"):
                        break
                sampled_count += 1
            frame_count += 1
            # Progress report every 100 frames.
            if frame_count % 100 == 0:
                counts = [len(acc.poses) for acc in accumulators.values()]
                click.echo(
                    f"Frame {frame_count}, Detections: {dict(zip(serials, counts))}"
                )
    except KeyboardInterrupt:
        click.echo("\nInterrupted by user.")
    finally:
        # Always release SDK handles and preview windows.
        reader.close()
        cv2.destroyAllWindows()
    # 3. Compute Final Poses
    results = {}
    for serial, acc in accumulators.items():
        if not acc.poses:
            click.echo(f"Warning: No valid poses for camera {serial}")
            continue
        # Use RANSAC to find best consensus
        inliers = acc.ransac_filter()
        T_mean, stats = acc.compute_robust_mean(inliers)
        # Flatten for JSON as space-separated string
        pose_str = " ".join(f"{x:.6f}" for x in T_mean.flatten())
        results[str(serial)] = {"pose": pose_str, "stats": stats}
        click.echo(
            f"Camera {serial}: {stats['n_inliers']}/{stats['n_total']} inliers, median error: {stats['median_reproj_error']:.3f}"
        )
    if not results:
        click.echo("No extrinsics computed.", err=True)
        return
    # 4. Run Depth Verification if requested (mutates results in place)
    apply_depth_verify_refine_postprocess(
        results,
        verification_frames,
        marker_geometry,
        camera_matrices,
        verify_depth,
        refine_depth,
        depth_confidence_threshold,
        report_csv,
    )
    # 5. Save to JSON
    with open(output, "w") as f:
        json.dump(results, f, indent=4, sort_keys=True)
    click.echo(f"Saved extrinsics to {output}")
    # 6. Optional Self-Check
    if self_check:
        # Verify reprojection error
        for serial, data in results.items():
            if data["stats"]["median_reproj_error"] > max_reproj_error:
                click.echo(
                    f"Error: Camera {serial} failed self-check (median error {data['stats']['median_reproj_error']:.3f} > {max_reproj_error})",
                    err=True,
                )
                raise SystemExit(1)
        # Simple check: verify distance between cameras if multiple
        if len(results) >= 2:
            serials_list = sorted(results.keys())
            for i in range(len(serials_list)):
                for j in range(i + 1, len(serials_list)):
                    s1 = serials_list[i]
                    s2 = serials_list[j]
                    # NOTE(review): np.fromstring(sep=" ") is deprecated since
                    # NumPy 1.14; consider np.array(s.split(), dtype=float).
                    p1 = np.fromstring(results[s1]["pose"], sep=" ").reshape(4, 4)[
                        :3, 3
                    ]
                    p2 = np.fromstring(results[s2]["pose"], sep=" ").reshape(4, 4)[
                        :3, 3
                    ]
                    dist = np.linalg.norm(p1 - p2)
                    click.echo(f"Self-check: Distance {s1} <-> {s2}: {dist:.3f}m")
if __name__ == "__main__":
    # Click injects CLI arguments at runtime; pylint cannot see them.
    main()  # pylint: disable=no-value-for-parameter