Files
zed-playground/workspaces/py_workspace/refine_ground_plane.py
2026-03-06 17:17:59 +08:00

412 lines
13 KiB
Python

import click
import json
import numpy as np
from pathlib import Path
from typing import Dict, Any, Optional
from loguru import logger
import sys
from aruco.ground_plane import (
GroundPlaneConfig,
refine_ground_from_depth,
create_ground_diagnostic_plot,
save_diagnostic_plot,
Mat44,
)
from aruco.depth_save import load_depth_data
from aruco.icp_registration import refine_with_icp, ICPConfig
@click.command()
@click.option(
"--input-extrinsics",
"-i",
required=True,
type=click.Path(exists=True, dir_okay=False),
help="Input extrinsics JSON file.",
)
@click.option(
"--input-depth",
"-d",
required=True,
type=click.Path(exists=True, dir_okay=False),
help="Input depth HDF5 file.",
)
@click.option(
"--output-extrinsics",
"-o",
required=True,
type=click.Path(dir_okay=False),
help="Output extrinsics JSON file.",
)
@click.option(
"--plot/--no-plot",
default=True,
help="Generate diagnostic plot.",
)
@click.option(
"--plot-output",
type=click.Path(dir_okay=False),
help="Path for diagnostic plot HTML (defaults to output_extrinsics_base.html).",
)
@click.option(
"--max-rotation-deg",
default=5.0,
help="Maximum allowed rotation correction in degrees.",
)
@click.option(
"--max-translation-m",
default=0.1,
help="Maximum allowed translation correction in meters.",
)
@click.option(
"--ransac-threshold",
default=0.02,
help="RANSAC distance threshold in meters.",
)
@click.option(
"--min-inlier-ratio",
default=0.0,
help="Minimum ratio of inliers to total points (0.0-1.0).",
)
@click.option(
"--height-range",
nargs=2,
type=float,
default=(0.2, 5.0),
help="Min and max height (depth) range in meters.",
)
@click.option(
"--stride",
default=8,
help="Pixel stride for depth sampling.",
)
@click.option(
"--seed",
type=int,
help="Random seed for RANSAC determinism.",
)
@click.option(
"--icp/--no-icp",
default=False,
help="Enable ICP refinement after ground plane alignment.",
)
@click.option(
"--icp-method",
type=click.Choice(["point_to_plane", "gicp"]),
default="point_to_plane",
help="ICP registration method.",
)
@click.option(
"--icp-voxel-size",
type=float,
default=0.02,
help="Voxel size for ICP downsampling (meters).",
)
@click.option(
"--icp-region",
type=click.Choice(["floor", "hybrid", "full"]),
default="hybrid",
help="Region to use for ICP registration.",
)
@click.option(
"--icp-global-init/--no-icp-global-init",
default=False,
help="Enable FPFH+RANSAC global pre-alignment.",
)
@click.option(
"--icp-min-overlap",
type=float,
default=0.5,
help="Minimum overlap area/volume to accept a pair.",
)
@click.option(
"--icp-band-height",
type=float,
default=0.3,
help="Height of the floor band (meters).",
)
@click.option(
"--icp-robust-kernel",
type=click.Choice(["none", "tukey"]),
default="none",
help="Robust kernel for ICP optimization.",
)
@click.option(
"--icp-robust-k",
type=float,
default=0.1,
help="Parameter k for robust kernel.",
)
@click.option(
"--icp-depth-bias/--no-icp-depth-bias",
default=True,
help="Enable per-camera depth bias pre-correction.",
)
@click.option(
"--icp-max-rotation-deg",
default=10.0,
help="Maximum allowed rotation correction for ICP in degrees.",
)
@click.option(
"--icp-max-translation-m",
default=0.3,
help="Maximum allowed translation correction for ICP in meters.",
)
@click.option(
"--debug/--no-debug",
default=False,
help="Enable debug logging.",
)
def main(
input_extrinsics: str,
input_depth: str,
output_extrinsics: str,
plot: bool,
plot_output: Optional[str],
max_rotation_deg: float,
max_translation_m: float,
ransac_threshold: float,
min_inlier_ratio: float,
height_range: tuple[float, float],
stride: int,
seed: Optional[int],
icp: bool,
icp_method: str,
icp_voxel_size: float,
icp_region: str,
icp_global_init: bool,
icp_min_overlap: float,
icp_band_height: float,
icp_robust_kernel: str,
icp_robust_k: float,
icp_depth_bias: bool,
icp_max_rotation_deg: float,
icp_max_translation_m: float,
debug: bool,
):
"""
Refine camera extrinsics by aligning the ground plane detected in depth maps.
Loads existing extrinsics and depth data, detects the floor plane in each camera,
and computes a correction transform to align the floor to Y=0.
"""
# Configure logging
logger.remove()
logger.add(
sys.stderr,
level="DEBUG" if debug else "INFO",
format="<green>{time:HH:mm:ss}</green> | <level>{message}</level>",
)
try:
# 1. Load Extrinsics
logger.info(f"Loading extrinsics from {input_extrinsics}")
with open(input_extrinsics, "r") as f:
extrinsics_data = json.load(f)
# Parse extrinsics into Dict[str, Mat44]
extrinsics: Dict[str, Mat44] = {}
for serial, data in extrinsics_data.items():
if serial == "_meta":
continue
if "pose" in data:
pose_str = data["pose"]
T = np.fromstring(pose_str, sep=" ").reshape(4, 4)
extrinsics[serial] = T
if not extrinsics:
raise click.UsageError("No valid camera poses found in input extrinsics.")
# 2. Load Depth Data
logger.info(f"Loading depth data from {input_depth}")
depth_data = load_depth_data(input_depth)
# Prepare camera data for refinement
camera_data_for_refine: Dict[str, Dict[str, Any]] = {}
for serial, data in depth_data.items():
# Use pooled depth if available, otherwise check raw frames
depth_map = data.get("pooled_depth")
if depth_map is None:
# Fallback to first raw frame if available
raw_frames = data.get("raw_frames", [])
if raw_frames:
depth_map = raw_frames[0].get("depth_map")
if depth_map is not None:
camera_data_for_refine[serial] = {
"depth": depth_map,
"K": data["intrinsics"],
}
if not camera_data_for_refine:
raise click.UsageError("No depth maps found in input depth file.")
# 3. Configure Refinement
config = GroundPlaneConfig(
enabled=True,
target_y=0.0,
stride=stride,
depth_min=height_range[0],
depth_max=height_range[1],
ransac_dist_thresh=ransac_threshold,
max_rotation_deg=max_rotation_deg,
max_translation_m=max_translation_m,
min_inlier_ratio=min_inlier_ratio,
seed=seed,
)
# 4. Run Refinement
logger.info("Running ground plane refinement...")
new_extrinsics, metrics = refine_ground_from_depth(
camera_data_for_refine, extrinsics, config
)
logger.info(f"Refinement result: {metrics.message}")
if metrics.success:
logger.info(f"Max rotation: {metrics.rotation_deg:.2f} deg")
logger.info(f"Max translation: {metrics.translation_m:.3f} m")
# 4.5 Optional ICP Refinement
icp_metrics = None
if icp:
logger.info(
f"Running ICP refinement ({icp_method}, region={icp_region})..."
)
icp_config = ICPConfig(
method=icp_method,
voxel_size=icp_voxel_size,
region=icp_region,
global_init=icp_global_init,
seed=seed,
min_overlap_area=icp_min_overlap,
band_height=icp_band_height,
robust_kernel=icp_robust_kernel,
robust_kernel_k=icp_robust_k,
depth_bias=icp_depth_bias,
max_rotation_deg=icp_max_rotation_deg,
max_translation_m=icp_max_translation_m,
)
icp_extrinsics, icp_metrics = refine_with_icp(
camera_data_for_refine,
new_extrinsics,
metrics.camera_planes,
icp_config,
)
if icp_metrics.success:
logger.info(f"ICP refinement successful: {icp_metrics.message}")
if icp_metrics.depth_biases:
logger.info(
f"Estimated depth biases (m): {icp_metrics.depth_biases}"
)
new_extrinsics = icp_extrinsics
else:
logger.warning(
f"ICP refinement failed or skipped: {icp_metrics.message}"
)
# 5. Save Output Extrinsics
output_data = extrinsics_data.copy()
per_camera_diagnostics = {}
for serial, T_new in new_extrinsics.items():
if serial in output_data:
pose_str = " ".join(f"{x:.6f}" for x in T_new.flatten())
output_data[serial]["pose"] = pose_str
if serial in metrics.camera_corrections:
T_corr = metrics.camera_corrections[serial]
trace = np.trace(T_corr[:3, :3])
cos_angle = np.clip((trace - 1) / 2, -1.0, 1.0)
rot_deg = float(np.rad2deg(np.arccos(cos_angle)))
trans_m = float(np.linalg.norm(T_corr[:3, 3]))
per_camera_diagnostics[serial] = {
"corrected": True,
"delta_rot_deg": rot_deg,
"delta_trans_m": trans_m,
}
else:
per_camera_diagnostics[serial] = {
"corrected": False,
"reason": "skipped_or_failed",
}
if "_meta" not in output_data:
output_data["_meta"] = {}
output_data["_meta"]["ground_refined"] = {
"timestamp": str(np.datetime64("now")),
"config": {
"max_rotation_deg": max_rotation_deg,
"max_translation_m": max_translation_m,
"ransac_threshold": ransac_threshold,
"height_range": height_range,
},
"metrics": {
"success": metrics.success,
"num_cameras_total": metrics.num_cameras_total,
"num_cameras_valid": metrics.num_cameras_valid,
"correction_applied": metrics.correction_applied,
"max_rotation_deg": metrics.rotation_deg,
"max_translation_m": metrics.translation_m,
},
"per_camera": per_camera_diagnostics,
}
if icp_metrics:
output_data["_meta"]["icp_refined"] = {
"timestamp": str(np.datetime64("now")),
"config": {
"method": icp_method,
"voxel_size": icp_voxel_size,
"region": icp_region,
"global_init": icp_global_init,
"min_overlap": icp_min_overlap,
"band_height": icp_band_height,
"robust_kernel": icp_robust_kernel,
"robust_kernel_k": icp_robust_k,
"depth_bias": icp_depth_bias,
},
"metrics": {
"success": icp_metrics.success,
"num_pairs_attempted": icp_metrics.num_pairs_attempted,
"num_pairs_converged": icp_metrics.num_pairs_converged,
"num_cameras_optimized": icp_metrics.num_cameras_optimized,
"num_disconnected": icp_metrics.num_disconnected,
"depth_biases": icp_metrics.depth_biases,
"message": icp_metrics.message,
},
}
logger.info(f"Saving refined extrinsics to {output_extrinsics}")
with open(output_extrinsics, "w") as f:
json.dump(output_data, f, indent=4, sort_keys=True)
# 6. Generate Plot (Optional)
if plot:
if not plot_output:
plot_output = str(Path(output_extrinsics).with_suffix(".html"))
logger.info(f"Generating diagnostic plot to {plot_output}")
fig = create_ground_diagnostic_plot(
metrics,
camera_data_for_refine,
extrinsics, # before
new_extrinsics, # after
)
save_diagnostic_plot(fig, plot_output)
except Exception as e:
logger.error(f"Error: {e}")
if debug:
raise
sys.exit(1)
if __name__ == "__main__":
main() # pylint: disable=no-value-for-parameter