refactor: things

This commit is contained in:
2026-03-06 17:17:59 +08:00
parent 8c6087683f
commit 33ab1a5d9d
171 changed files with 293 additions and 29894 deletions
+320
View File
@@ -0,0 +1,320 @@
import numpy as np
from loguru import logger
from jaxtyping import Float
from typing import TYPE_CHECKING
# Type aliases for shape-aware annotations
if TYPE_CHECKING:
Vec3 = Float[np.ndarray, "3"]
Mat33 = Float[np.ndarray, "3 3"]
Mat44 = Float[np.ndarray, "4 4"]
CornersNC = Float[np.ndarray, "N 3"]
else:
Vec3 = np.ndarray
Mat33 = np.ndarray
Mat44 = np.ndarray
CornersNC = np.ndarray
def compute_face_normal(corners: CornersNC) -> Vec3:
"""
Compute the normal vector of a face defined by its corners.
Assumes corners are in order (e.g., clockwise or counter-clockwise).
Args:
corners: (N, 3) array of corner coordinates.
Returns:
(3,) normalized normal vector.
"""
if corners.ndim != 2 or corners.shape[1] != 3:
raise ValueError(f"Expected (N, 3) array, got {corners.shape}")
if corners.shape[0] < 3:
raise ValueError("At least 3 corners are required to compute a normal.")
# Use the cross product of two edges
# Stable edge definition for quad faces consistent with plan/repo convention:
# normal from (corners[1]-corners[0]) x (corners[3]-corners[0]) when N>=4,
# fallback to index 2 if N==3.
v1 = corners[1] - corners[0]
if corners.shape[0] >= 4:
v2 = corners[3] - corners[0]
else:
v2 = corners[2] - corners[0]
normal = np.cross(v1, v2)
norm = np.linalg.norm(normal)
if norm < 1e-10:
raise ValueError("Corners are collinear or degenerate; cannot compute normal.")
return (normal / norm).astype(np.float64)
def rotation_align_vectors(from_vec: Vec3, to_vec: Vec3) -> Mat33:
"""
Compute the 3x3 rotation matrix that aligns from_vec to to_vec.
Args:
from_vec: (3,) source vector.
to_vec: (3,) target vector.
Returns:
(3, 3) rotation matrix.
"""
if from_vec.shape != (3,) or to_vec.shape != (3,):
raise ValueError(
f"Expected (3,) vectors, got {from_vec.shape} and {to_vec.shape}"
)
norm_from = np.linalg.norm(from_vec)
norm_to = np.linalg.norm(to_vec)
if norm_from < 1e-10 or norm_to < 1e-10:
raise ValueError("Cannot align zero-norm vectors.")
# Normalize inputs
a = from_vec / norm_from
b = to_vec / norm_to
v = np.cross(a, b)
c = np.dot(a, b)
s = np.linalg.norm(v)
# Handle parallel case
if s < 1e-10:
if c > 0:
return np.eye(3, dtype=np.float64)
else:
# Anti-parallel case: 180 degree rotation around an orthogonal axis
# Find an orthogonal axis
if abs(a[0]) < 0.9:
ortho = np.array([1.0, 0.0, 0.0])
else:
ortho = np.array([0.0, 1.0, 0.0])
axis = np.cross(a, ortho)
axis /= np.linalg.norm(axis)
# Rodrigues formula for 180 degrees
K = np.array(
[
[0.0, -axis[2], axis[1]],
[axis[2], 0.0, -axis[0]],
[-axis[1], axis[0], 0.0],
]
)
return (np.eye(3) + 2 * (K @ K)).astype(np.float64)
# General case using Rodrigues' rotation formula
# R = I + [v]_x + [v]_x^2 * (1-c)/s^2
vx = np.array([[0.0, -v[2], v[1]], [v[2], 0.0, -v[0]], [-v[1], v[0], 0.0]])
R = np.eye(3) + vx + (vx @ vx) * ((1 - c) / (s**2))
return R.astype(np.float64)
def apply_alignment_to_pose(T: Mat44, R_align: Mat33) -> Mat44:
"""
Apply an alignment rotation to a 4x4 pose matrix.
The alignment is applied in the global frame (pre-multiplication of rotation).
Args:
T: (4, 4) homogeneous transformation matrix.
R_align: (3, 3) alignment rotation matrix.
Returns:
(4, 4) aligned transformation matrix.
"""
if T.shape != (4, 4):
raise ValueError(f"Expected 4x4 matrix, got {T.shape}")
if R_align.shape != (3, 3):
raise ValueError(f"Expected 3x3 matrix, got {R_align.shape}")
T_align = np.eye(4, dtype=np.float64)
T_align[:3, :3] = R_align
return (T_align @ T).astype(np.float64)
def estimate_up_vector_from_cameras(camera_poses: list[Mat44]) -> Vec3:
"""
Estimate the 'up' vector of the scene based on camera positions.
Assumes cameras are arranged roughly in a horizontal ring (coplanar).
The normal of the plane fitting the camera centers is used as the up vector.
The sign is disambiguated using the average camera 'up' vector (-Y in OpenCV).
Args:
camera_poses: List of (4, 4) camera-to-world transformation matrices.
Returns:
(3,) normalized up vector.
"""
if not camera_poses:
raise ValueError("No camera poses provided.")
# Extract camera centers (translations)
centers = np.array([T[:3, 3] for T in camera_poses])
# Calculate average camera 'up' vector (assuming OpenCV convention: Y is down, so up is -Y)
# T[:3, 1] is the Y axis direction in world frame
# We want the vector pointing UP in world coordinates.
# In OpenCV camera frame, Y is down. So -Y is up.
# The world-frame representation of the camera's -Y axis is -R[:, 1]
# T[:3, 1] is the second column of the rotation matrix (Y axis).
avg_cam_up = np.mean([-T[:3, 1] for T in camera_poses], axis=0)
norm = np.linalg.norm(avg_cam_up)
if norm > 1e-6:
avg_cam_up /= norm
else:
avg_cam_up = np.array([0.0, 1.0, 0.0]) # Fallback
# If fewer than 3 cameras, we can't reliably fit a plane.
# Fallback to average camera up vector.
if len(camera_poses) < 3:
logger.debug("Fewer than 3 cameras; using average camera -Y as up vector.")
return avg_cam_up
# Fit plane to camera centers using SVD
centroid = np.mean(centers, axis=0)
centered = centers - centroid
# Check if points are collinear or coincident (rank check)
# If they are collinear, plane is undefined.
if np.linalg.matrix_rank(centered) < 2:
logger.debug(
"Camera centers are collinear; using average camera -Y as up vector."
)
return avg_cam_up
try:
u, s, vh = np.linalg.svd(centered)
# The normal is the singular vector corresponding to the smallest singular value
normal = vh[2, :]
except np.linalg.LinAlgError:
logger.warning("SVD failed; using average camera -Y as up vector.")
return avg_cam_up
# Disambiguate sign: choose the normal that aligns best with average camera up
if np.dot(normal, avg_cam_up) < 0:
normal = -normal
return normal
def get_face_normal_from_geometry(
face_name: str,
marker_geometry: dict[int, np.ndarray],
face_marker_map: dict[str, list[int]] | None = None,
) -> Vec3 | None:
"""
Compute the average normal vector for a face based on available marker geometry.
Args:
face_name: Name of the face (key in face_marker_map).
marker_geometry: Dictionary mapping marker IDs to their (4, 3) corner coordinates.
face_marker_map: Dictionary mapping face names to marker IDs.
Returns:
(3,) normalized average normal vector, or None if no markers are available.
"""
if face_marker_map is None:
return None
if face_name not in face_marker_map:
return None
marker_ids = face_marker_map[face_name]
normals = []
for mid in marker_ids:
if mid in marker_geometry:
try:
n = compute_face_normal(marker_geometry[mid])
normals.append(n)
except ValueError:
continue
if not normals:
return None
avg_normal = np.mean(normals, axis=0)
norm = np.linalg.norm(avg_normal)
if norm < 1e-10:
return None
return (avg_normal / norm).astype(np.float64)
def detect_ground_face(
visible_marker_ids: set[int],
marker_geometry: dict[int, np.ndarray],
camera_up_vector: Vec3 = np.array([0, -1, 0]),
face_marker_map: dict[str, list[int]] | None = None,
) -> tuple[str, Vec3] | None:
"""
Detect which face of the object is most likely the ground face.
The ground face is the one whose normal is most aligned with the camera's up vector.
Args:
visible_marker_ids: Set of marker IDs currently visible.
marker_geometry: Dictionary mapping marker IDs to their (4, 3) corner coordinates.
camera_up_vector: (3,) vector representing the 'up' direction in camera frame.
face_marker_map: Dictionary mapping face names to marker IDs.
Returns:
Tuple of (best_face_name, best_face_normal) or None if no face is detected.
"""
if not visible_marker_ids:
return None
if face_marker_map is None:
return None
if camera_up_vector.shape != (3,):
raise ValueError(
f"Expected (3,) camera_up_vector, got {camera_up_vector.shape}"
)
up_norm = np.linalg.norm(camera_up_vector)
if up_norm < 1e-10:
raise ValueError("camera_up_vector cannot be zero-norm.")
up_vec = camera_up_vector / up_norm
best_face = None
best_normal = None
best_score = -np.inf
# Iterate faces in mapping
for face_name, face_marker_ids in face_marker_map.items():
# We check ALL faces for which we have geometry, regardless of visibility.
# This allows detecting the ground face even if it's occluded,
# provided we have geometry for it (e.g. from a loaded model or previous detections).
# However, get_face_normal_from_geometry requires marker_geometry to contain the markers.
# If marker_geometry only contains *visible* markers (which is typical if passed from detection),
# then we are limited to visible faces.
# But if marker_geometry is the full loaded geometry, we can check all faces.
normal = get_face_normal_from_geometry(
face_name, marker_geometry, face_marker_map=face_marker_map
)
if normal is None:
continue
# Score by dot(normal, normalized camera_up_vector)
score = np.dot(normal, up_vec)
logger.debug(f"Face '{face_name}' considered. Score: {score:.4f}")
if score > best_score:
best_score = score
best_face = face_name
best_normal = normal
if best_face is not None and best_normal is not None:
logger.debug(
f"Best ground face detected: '{best_face}' with score {best_score:.4f}"
)
return best_face, best_normal
return None
@@ -0,0 +1,95 @@
from dataclasses import dataclass
# Order of detection result
# 0, 1, 2, 3
# TL, TR, BR, BL
# RED, GREEN, BLUE, YELLOW
@dataclass
class DiamondBoardParameter:
marker_leghth: float
"""
the ArUco marker length in meter
"""
chess_length: float
"""
the length of the chess board in meter
"""
border_length: float = 0.01
"""
border_length in m, default is 1cm
"""
@property
def marker_border_length(self):
assert self.chess_length > self.marker_leghth
return (self.chess_length - self.marker_leghth) / 2
@property
def total_side_length(self):
assert self.chess_length > self.marker_leghth
return self.marker_border_length * 2 + self.chess_length * 3
# 9mm + 127mm + 127mm (97mm marker) + 127mm + 10mm
# i.e. marker boarder = 127mm - 97mm = 30mm (15mm each side)
Point2D = tuple[float, float]
Quad2D = tuple[Point2D, Point2D, Point2D, Point2D]
@dataclass
class ArUcoMarker:
id: int
corners: Quad2D
# let's let TL be the origin
def generate_diamond_corners(
ids: tuple[int, int, int, int], params: DiamondBoardParameter
):
"""
A diamond chess board, which could be count as a kind of ChArUco board
C | 0 | C
---------
1 | C | 2
---------
C | 3 | C
where C is the chess box, and 0, 1, 2, 3 are the markers (whose ids are passed in order)
Args:
ids: a tuple of 4 ids of the markers
params: DiamondBoardParameter
"""
def tl_to_square(tl_x: float, tl_y: float, side_length: float) -> Quad2D:
return (
(tl_x, tl_y),
(tl_x + side_length, tl_y),
(tl_x + side_length, tl_y + side_length),
(tl_x, tl_y + side_length),
)
tl_0_x = params.border_length + params.chess_length + params.marker_border_length
tl_0_y = params.border_length + params.marker_border_length
tl_1_x = params.border_length + params.marker_border_length
tl_1_y = params.border_length + params.chess_length + params.marker_border_length
tl_2_x = (
params.border_length + params.chess_length * 2 + params.marker_border_length
)
tl_2_y = tl_1_y
tl_3_x = params.border_length + params.chess_length + params.marker_border_length
tl_3_y = (
params.border_length + params.chess_length * 2 + params.marker_border_length
)
return (
ArUcoMarker(ids[0], tl_to_square(tl_0_x, tl_0_y, params.marker_leghth)),
ArUcoMarker(ids[1], tl_to_square(tl_1_x, tl_1_y, params.marker_leghth)),
ArUcoMarker(ids[2], tl_to_square(tl_2_x, tl_2_y, params.marker_leghth)),
ArUcoMarker(ids[3], tl_to_square(tl_3_x, tl_3_y, params.marker_leghth)),
)
@@ -0,0 +1,89 @@
import numpy as np
def pool_depth_maps(
depth_maps: list[np.ndarray],
confidence_maps: list[np.ndarray] | None = None,
confidence_thresh: float = 50.0,
min_valid_count: int = 1,
) -> tuple[np.ndarray, np.ndarray | None]:
"""
Pool multiple depth maps into a single depth map using per-pixel median.
Args:
depth_maps: List of depth maps (H, W) in meters.
confidence_maps: Optional list of confidence maps (H, W).
ZED semantics: lower is better, 100 is often invalid/occluded.
confidence_thresh: Confidence values > threshold are considered invalid.
min_valid_count: Minimum number of valid depth values required to produce a pooled value.
Returns:
Tuple of (pooled_depth_map, pooled_confidence_map).
pooled_depth_map: (H, W) array with median depth or NaN.
pooled_confidence_map: (H, W) array with per-pixel minimum confidence, or None.
Raises:
ValueError: If depth_maps is empty or shapes are inconsistent.
"""
if not depth_maps:
raise ValueError("depth_maps list cannot be empty")
n_maps = len(depth_maps)
shape = depth_maps[0].shape
for i, dm in enumerate(depth_maps):
if dm.shape != shape:
raise ValueError(
f"Depth map {i} has inconsistent shape {dm.shape} != {shape}"
)
if confidence_maps:
if len(confidence_maps) != n_maps:
raise ValueError(
f"Number of confidence maps ({len(confidence_maps)}) "
+ f"must match number of depth maps ({n_maps})"
)
for i, cm in enumerate(confidence_maps):
if cm.shape != shape:
raise ValueError(
f"Confidence map {i} has inconsistent shape {cm.shape} != {shape}"
)
if n_maps == 1:
pooled_depth = depth_maps[0].copy()
invalid_mask = ~np.isfinite(pooled_depth) | (pooled_depth <= 0)
if confidence_maps:
invalid_mask |= confidence_maps[0] > confidence_thresh
pooled_depth[invalid_mask] = np.nan
if min_valid_count > 1:
pooled_depth[:] = np.nan
pooled_conf = confidence_maps[0].copy() if confidence_maps else None
return pooled_depth, pooled_conf
depth_stack = np.stack(depth_maps, axis=0)
valid_mask = np.isfinite(depth_stack) & (depth_stack > 0)
conf_stack = None
if confidence_maps:
conf_stack = np.stack(confidence_maps, axis=0)
valid_mask &= conf_stack <= confidence_thresh
masked_depths = depth_stack.copy()
masked_depths[~valid_mask] = np.nan
valid_counts = np.sum(valid_mask, axis=0)
with np.errstate(invalid="ignore"):
pooled_depth = np.nanmedian(masked_depths, axis=0)
pooled_depth[valid_counts < min_valid_count] = np.nan
pooled_conf = None
if conf_stack is not None:
pooled_conf = np.min(conf_stack, axis=0)
return pooled_depth, pooled_conf
@@ -0,0 +1,227 @@
import numpy as np
from typing import Dict, Tuple, Any, Optional, List
from scipy.optimize import least_squares
from .pose_math import rvec_tvec_to_matrix, matrix_to_rvec_tvec
from .depth_verify import (
compute_depth_residual,
get_confidence_weight,
project_point_to_pixel,
)
def extrinsics_to_params(T: np.ndarray) -> np.ndarray:
rvec, tvec = matrix_to_rvec_tvec(T)
return np.concatenate([rvec, tvec])
def params_to_extrinsics(params: np.ndarray) -> np.ndarray:
rvec = params[:3]
tvec = params[3:]
return rvec_tvec_to_matrix(rvec, tvec)
def depth_residuals(
params: np.ndarray,
active_corners: List[np.ndarray],
depth_map: np.ndarray,
K: np.ndarray,
initial_params: np.ndarray,
reg_rot: float = 0.1,
reg_trans: float = 1.0,
confidence_map: Optional[np.ndarray] = None,
confidence_thresh: float = 100.0,
) -> np.ndarray:
T = params_to_extrinsics(params)
residuals = []
for corner in active_corners:
residual = compute_depth_residual(corner, T, depth_map, K, window_size=5)
if residual is None:
# If a point becomes invalid during optimization, assign a large residual
# to discourage the optimizer from going there, but keep the vector length fixed.
# 10.0m is a safe "large" value for depth residuals.
residual = 10.0
else:
if confidence_map is not None:
u, v = project_point_to_pixel(
(np.linalg.inv(T) @ np.append(corner, 1.0))[:3], K
)
if u is not None and v is not None:
h, w = confidence_map.shape[:2]
if 0 <= u < w and 0 <= v < h:
conf = confidence_map[v, u]
weight = get_confidence_weight(conf, confidence_thresh)
residual *= np.sqrt(weight)
residuals.append(residual)
# Regularization as pseudo-residuals
param_diff = params - initial_params
# Rotation regularization (first 3 params)
if reg_rot > 0:
residuals.extend(param_diff[:3] * reg_rot)
# Translation regularization (last 3 params)
if reg_trans > 0:
residuals.extend(param_diff[3:] * reg_trans)
return np.array(residuals)
def refine_extrinsics_with_depth(
T_initial: np.ndarray,
marker_corners_world: Dict[int, np.ndarray],
depth_map: np.ndarray,
K: np.ndarray,
max_translation_m: float = 0.05,
max_rotation_deg: float = 5.0,
regularization_weight: float = 0.1,
loss: str = "soft_l1",
f_scale: float = 0.1,
reg_rot: float | None = None,
reg_trans: float | None = None,
confidence_map: Optional[np.ndarray] = None,
confidence_thresh: float = 100.0,
) -> Tuple[np.ndarray, dict[str, Any]]:
initial_params = extrinsics_to_params(T_initial)
# Handle legacy regularization_weight if specific weights aren't provided
# Default behavior: reg_rot = weight, reg_trans = weight * 10
# This matches the plan's default of (0.1, 1.0) when weight is 0.1
if reg_rot is None:
reg_rot = regularization_weight
if reg_trans is None:
reg_trans = regularization_weight * 10.0
# Check for valid depth points first
active_corners = []
n_points_total = 0
n_depth_valid = 0
n_confidence_rejected = 0
for marker_id, corners in marker_corners_world.items():
for corner in corners:
n_points_total += 1
res = compute_depth_residual(corner, T_initial, depth_map, K, window_size=5)
if res is not None:
n_depth_valid += 1
is_confident = True
if confidence_map is not None:
u, v = project_point_to_pixel(
(np.linalg.inv(T_initial) @ np.append(corner, 1.0))[:3], K
)
if u is not None and v is not None:
h, w = confidence_map.shape[:2]
if 0 <= u < w and 0 <= v < h:
conf = confidence_map[v, u]
weight = get_confidence_weight(conf, confidence_thresh)
if weight <= 0:
n_confidence_rejected += 1
is_confident = False
if is_confident:
active_corners.append(corner)
if not active_corners:
return T_initial, {
"success": False,
"reason": "no_valid_depth_points",
"initial_cost": 0.0,
"final_cost": 0.0,
"iterations": 0,
"delta_rotation_deg": 0.0,
"delta_translation_norm_m": 0.0,
"termination_message": "No valid depth points found at marker corners",
"termination_status": -1,
"nfev": 0,
"njev": 0,
"optimality": 0.0,
"n_active_bounds": 0,
"active_mask": np.zeros(6, dtype=int),
"cost": 0.0,
"n_points_total": n_points_total,
"n_depth_valid": n_depth_valid,
"n_confidence_rejected": n_confidence_rejected,
"loss_function": loss,
"f_scale": f_scale,
}
max_rotation_rad = np.deg2rad(max_rotation_deg)
lower_bounds = initial_params.copy()
upper_bounds = initial_params.copy()
lower_bounds[:3] -= max_rotation_rad
upper_bounds[:3] += max_rotation_rad
lower_bounds[3:] -= max_translation_m
upper_bounds[3:] += max_translation_m
bounds = (lower_bounds, upper_bounds)
result = least_squares(
depth_residuals,
initial_params,
args=(
active_corners,
depth_map,
K,
initial_params,
reg_rot,
reg_trans,
confidence_map,
confidence_thresh,
),
method="trf",
loss=loss,
f_scale=f_scale,
bounds=bounds,
x_scale="jac",
max_nfev=200,
)
T_refined = params_to_extrinsics(result.x)
delta_params = result.x - initial_params
delta_rotation_rad = np.linalg.norm(delta_params[:3])
delta_rotation_deg = np.rad2deg(delta_rotation_rad)
delta_translation = np.linalg.norm(delta_params[3:])
# Calculate initial cost for comparison
initial_residuals = depth_residuals(
initial_params,
active_corners,
depth_map,
K,
initial_params,
reg_rot,
reg_trans,
confidence_map,
confidence_thresh,
)
initial_cost = 0.5 * np.sum(initial_residuals**2)
stats = {
"iterations": result.nfev,
"success": result.success,
"initial_cost": float(initial_cost),
"final_cost": float(result.cost),
"delta_rotation_deg": float(delta_rotation_deg),
"delta_translation_norm_m": float(delta_translation),
"termination_message": result.message,
"termination_status": int(result.status),
"nfev": int(result.nfev),
"njev": int(getattr(result, "njev", 0)),
"optimality": float(result.optimality),
"n_active_bounds": int(np.sum(result.active_mask != 0)),
"active_mask": result.active_mask.tolist()
if hasattr(result.active_mask, "tolist")
else result.active_mask,
"cost": float(result.cost),
"n_points_total": n_points_total,
"n_depth_valid": n_depth_valid,
"n_confidence_rejected": n_confidence_rejected,
"loss_function": loss,
"f_scale": f_scale,
}
return T_refined, stats
+181
View File
@@ -0,0 +1,181 @@
import h5py
import numpy as np
import json
import datetime
from pathlib import Path
from typing import Dict, Any, List, Optional, cast
from loguru import logger
def save_depth_data(
path: str | Path, camera_data: Dict[str, Any], schema_version: int = 1
) -> None:
"""
Save depth data to HDF5 format.
Args:
path: Output file path
camera_data: Dictionary mapping serial numbers to camera data
schema_version: Schema version number (default: 1)
"""
path = Path(path)
# Ensure parent directory exists
if not path.parent.exists():
raise FileNotFoundError(f"Parent directory does not exist: {path.parent}")
with h5py.File(path, "w") as f:
# Global metadata
meta = f.create_group("meta")
meta.attrs["schema_version"] = schema_version
meta.attrs["units"] = "meters"
meta.attrs["coordinate_frame"] = "world_from_cam"
meta.attrs["created_at"] = datetime.datetime.now().isoformat()
# Per-camera data
cameras = f.create_group("cameras")
for serial, data in camera_data.items():
cam_group = cameras.create_group(serial)
# Intrinsics and resolution
cam_group.create_dataset("intrinsics", data=data["intrinsics"])
cam_group.attrs["resolution"] = data["resolution"]
# Pooled depth map (compressed)
cam_group.create_dataset(
"pooled_depth",
data=data["pooled_depth"],
compression="gzip",
compression_opts=4,
shuffle=True,
)
# Optional pooled confidence
if "pooled_confidence" in data and data["pooled_confidence"] is not None:
cam_group.create_dataset(
"pooled_confidence",
data=data["pooled_confidence"],
compression="gzip",
compression_opts=4,
shuffle=True,
)
# Pool metadata (JSON serialized)
if data.get("pool_metadata"):
cam_group.attrs["pool_metadata"] = json.dumps(data["pool_metadata"])
# Raw frames
if "raw_frames" in data and data["raw_frames"]:
raw_group = cam_group.create_group("raw_frames")
for i, frame in enumerate(data["raw_frames"]):
frame_group = raw_group.create_group(f"frame_{i}")
frame_group.attrs["frame_index"] = frame["frame_index"]
frame_group.attrs["score"] = frame["score"]
if "depth_map" in frame and frame["depth_map"] is not None:
frame_group.create_dataset(
"depth_map",
data=frame["depth_map"],
compression="gzip",
compression_opts=4,
shuffle=True,
)
if (
"confidence_map" in frame
and frame["confidence_map"] is not None
):
frame_group.create_dataset(
"confidence_map",
data=frame["confidence_map"],
compression="gzip",
compression_opts=4,
shuffle=True,
)
def load_depth_data(path: str | Path) -> Dict[str, Any]:
"""
Load depth data from HDF5 file.
Args:
path: Input file path
Returns:
Dictionary mapping serial numbers to camera data
"""
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"File not found: {path}")
results = {}
with h5py.File(path, "r") as f:
if "cameras" not in f:
return {}
cameras_group = f["cameras"]
if not isinstance(cameras_group, h5py.Group):
return {}
for serial in cameras_group:
cam_group = cameras_group[serial]
if not isinstance(cam_group, h5py.Group):
continue
# We use type: ignore here because h5py dataset slicing returns numpy arrays
# but type checkers struggle with the dynamic nature of h5py
cam_data: Dict[str, Any] = {
"intrinsics": np.array(cam_group["intrinsics"]),
"resolution": tuple(cast(Any, cam_group.attrs["resolution"])),
"pooled_depth": np.array(cam_group["pooled_depth"]),
}
if "pooled_confidence" in cam_group:
cam_data["pooled_confidence"] = np.array(cam_group["pooled_confidence"])
if "pool_metadata" in cam_group.attrs:
metadata_str = cam_group.attrs["pool_metadata"]
if isinstance(metadata_str, str):
cam_data["pool_metadata"] = json.loads(metadata_str)
elif isinstance(metadata_str, bytes):
cam_data["pool_metadata"] = json.loads(metadata_str.decode("utf-8"))
else:
cam_data["pool_metadata"] = None
else:
cam_data["pool_metadata"] = None
# Load raw frames
raw_frames: List[Dict[str, Any]] = []
if "raw_frames" in cam_group:
raw_group = cam_group["raw_frames"]
if isinstance(raw_group, h5py.Group):
# Sort by frame index to maintain order if needed, though keys are frame_0, frame_1...
# We'll just iterate sorted keys
for key in sorted(
raw_group.keys(), key=lambda x: int(x.split("_")[1])
):
frame_group = raw_group[key]
if not isinstance(frame_group, h5py.Group):
continue
frame_data: Dict[str, Any] = {
"frame_index": frame_group.attrs["frame_index"],
"score": frame_group.attrs["score"],
}
if "depth_map" in frame_group:
frame_data["depth_map"] = np.array(frame_group["depth_map"])
if "confidence_map" in frame_group:
frame_data["confidence_map"] = np.array(
frame_group["confidence_map"]
)
raw_frames.append(frame_data)
cam_data["raw_frames"] = raw_frames
results[serial] = cam_data
return results
@@ -0,0 +1,179 @@
import numpy as np
from dataclasses import dataclass
from typing import Optional, Dict, List, Tuple
from .pose_math import invert_transform
@dataclass
class DepthVerificationResult:
residuals: List[Tuple[int, int, float]] # (marker_id, corner_idx, residual)
rmse: float
mean_abs: float
median: float
depth_normalized_rmse: float
n_valid: int
n_total: int
def project_point_to_pixel(P_cam: np.ndarray, K: np.ndarray):
X, Y, Z = P_cam
if Z <= 0:
return None, None
u = int(round(K[0, 0] * X / Z + K[0, 2]))
v = int(round(K[1, 1] * Y / Z + K[1, 2]))
return u, v
def get_confidence_weight(confidence: float, threshold: float = 100.0) -> float:
"""
Convert ZED confidence value to a weight in [0, 1].
ZED semantics: 1 is most confident, 100 is least confident.
"""
if not np.isfinite(confidence) or confidence < 0:
return 0.0
# Linear weight from 1.0 (at confidence=0) to 0.0 (at confidence=threshold)
weight = 1.0 - (confidence / threshold)
return float(np.clip(weight, 0.0, 1.0))
def compute_depth_residual(
P_world: np.ndarray,
T_world_cam: np.ndarray,
depth_map: np.ndarray,
K: np.ndarray,
window_size: int = 5,
) -> Optional[float]:
T_cam_world = invert_transform(T_world_cam)
P_world_h = np.append(P_world, 1.0)
P_cam = (T_cam_world @ P_world_h)[:3]
z_predicted = P_cam[2]
if z_predicted <= 0:
return None
u, v = project_point_to_pixel(P_cam, K)
if u is None or v is None:
return None
h, w = depth_map.shape[:2]
if u < 0 or u >= w or v < 0 or v >= h:
return None
if window_size <= 1:
z_measured = depth_map[v, u]
else:
half = window_size // 2
x_min = max(0, u - half)
x_max = min(w, u + half + 1)
y_min = max(0, v - half)
y_max = min(h, v + half + 1)
window = depth_map[y_min:y_max, x_min:x_max]
valid_depths = window[np.isfinite(window) & (window > 0)]
if len(valid_depths) == 0:
return None
z_measured = np.median(valid_depths)
if not np.isfinite(z_measured) or z_measured <= 0:
return None
return float(z_measured - z_predicted)
def compute_marker_corner_residuals(
T_world_cam: np.ndarray,
marker_corners_world: Dict[int, np.ndarray],
depth_map: np.ndarray,
K: np.ndarray,
confidence_map: Optional[np.ndarray] = None,
confidence_thresh: float = 50,
) -> List[Tuple[int, int, float]]:
detailed_residuals = []
for marker_id, corners in marker_corners_world.items():
for corner_idx, corner in enumerate(corners):
# Check confidence if map is provided
if confidence_map is not None:
T_cam_world = invert_transform(T_world_cam)
P_world_h = np.append(corner, 1.0)
P_cam = (T_cam_world @ P_world_h)[:3]
u_proj, v_proj = project_point_to_pixel(P_cam, K)
if u_proj is not None and v_proj is not None:
h, w = confidence_map.shape[:2]
if 0 <= u_proj < w and 0 <= v_proj < h:
confidence = confidence_map[v_proj, u_proj]
# Higher confidence value means LESS confident in ZED SDK
# Range [1, 100], where 100 is typically occlusion/invalid
if confidence > confidence_thresh:
continue
residual = compute_depth_residual(
corner, T_world_cam, depth_map, K, window_size=5
)
if residual is not None:
detailed_residuals.append((int(marker_id), corner_idx, residual))
return detailed_residuals
def verify_extrinsics_with_depth(
T_world_cam: np.ndarray,
marker_corners_world: Dict[int, np.ndarray],
depth_map: np.ndarray,
K: np.ndarray,
confidence_map: Optional[np.ndarray] = None,
confidence_thresh: float = 50,
) -> DepthVerificationResult:
detailed_residuals = compute_marker_corner_residuals(
T_world_cam,
marker_corners_world,
depth_map,
K,
confidence_map,
confidence_thresh,
)
residuals = [r[2] for r in detailed_residuals]
n_total = sum(len(corners) for corners in marker_corners_world.values())
n_valid = len(residuals)
if n_valid == 0:
return DepthVerificationResult(
residuals=[],
rmse=0.0,
mean_abs=0.0,
median=0.0,
depth_normalized_rmse=0.0,
n_valid=0,
n_total=n_total,
)
residuals_array = np.array(residuals)
rmse = float(np.sqrt(np.mean(residuals_array**2)))
mean_abs = float(np.mean(np.abs(residuals_array)))
median = float(np.median(residuals_array))
depth_normalized_rmse = 0.0
if n_valid > 0:
depths = []
for marker_id, corners in marker_corners_world.items():
for corner in corners:
T_cam_world = invert_transform(T_world_cam)
P_world_h = np.append(corner, 1.0)
P_cam = (T_cam_world @ P_world_h)[:3]
if P_cam[2] > 0:
depths.append(P_cam[2])
if depths:
mean_depth = np.mean(depths)
if mean_depth > 0:
depth_normalized_rmse = float(rmse / mean_depth)
return DepthVerificationResult(
residuals=detailed_residuals,
rmse=rmse,
mean_abs=mean_abs,
median=median,
depth_normalized_rmse=depth_normalized_rmse,
n_valid=n_valid,
n_total=n_total,
)
+144
View File
@@ -0,0 +1,144 @@
import cv2
import numpy as np
import pyzed.sl as sl
from typing import Optional, Tuple, Dict
from .pose_math import compute_reprojection_error
def create_detector(
dictionary_id: int = cv2.aruco.DICT_4X4_50,
params: Optional[cv2.aruco.DetectorParameters] = None,
) -> cv2.aruco.ArucoDetector:
"""
Creates an ArUco detector with the specified dictionary and parameters.
Args:
dictionary_id: The ArUco dictionary ID.
params: Optional detector parameters. If None, default parameters are used.
Returns:
cv2.aruco.ArucoDetector: The configured ArUco detector.
"""
dictionary = cv2.aruco.getPredefinedDictionary(dictionary_id)
if params is None:
params = cv2.aruco.DetectorParameters()
return cv2.aruco.ArucoDetector(dictionary, params)
def detect_markers(
image: np.ndarray, detector: cv2.aruco.ArucoDetector
) -> Tuple[np.ndarray, Optional[np.ndarray]]:
"""
Detects ArUco markers in an image.
Args:
image: Input image (BGR or grayscale).
detector: The ArUco detector to use.
Returns:
Tuple[np.ndarray, Optional[np.ndarray]]:
- corners: Detected corners as float32 shape (N, 4, 2).
- ids: Detected marker IDs as int shape (N,) or None if no markers detected.
"""
if image is None:
raise ValueError("Input image is None")
if len(image.shape) == 3:
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
else:
gray = image
corners, ids, _ = detector.detectMarkers(gray)
if ids is not None:
# corners is a list of (1, 4, 2) arrays, reshape to (N, 4, 2)
corners_array = np.array(corners, dtype=np.float32).reshape(-1, 4, 2)
ids_array = ids.flatten().astype(np.int32)
return corners_array, ids_array
else:
return np.empty((0, 4, 2), dtype=np.float32), None
def build_camera_matrix_from_zed(
cam: sl.Camera, resizer: Optional[sl.Resolution] = None
) -> np.ndarray:
"""
Builds a 3x3 camera intrinsic matrix from a ZED camera's calibration parameters.
Args:
cam: The ZED camera object.
resizer: Optional resolution to get parameters for.
Returns:
np.ndarray: 3x3 float64 camera matrix K.
"""
cam_params = cam.get_camera_information(
resizer
).camera_configuration.calibration_parameters.left_cam
fx = cam_params.fx
fy = cam_params.fy
cx = cam_params.cx
cy = cam_params.cy
K = np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1]], dtype=np.float64)
return K
def estimate_pose_from_detections(
corners: np.ndarray,
ids: np.ndarray,
marker_geometry: Dict[int, np.ndarray],
K: np.ndarray,
dist: Optional[np.ndarray] = None,
min_markers: int = 4,
pnp_flag: int = cv2.SOLVEPNP_SQPNP,
) -> Optional[Tuple[np.ndarray, np.ndarray, float, int]]:
"""
Estimates the camera pose from detected ArUco markers.
Args:
corners: Detected corners (N, 4, 2).
ids: Detected marker IDs (N,).
marker_geometry: Dictionary mapping marker ID to its 3D corners (4, 3).
K: 3x3 camera matrix.
dist: Optional distortion coefficients.
min_markers: Minimum number of markers required for pose estimation.
pnp_flag: OpenCV solvePnP flag.
Returns:
Optional[Tuple[np.ndarray, np.ndarray, float, int]]:
(rvec, tvec, reproj_err, n_markers) or None if estimation fails or not enough markers.
"""
if ids is None or len(ids) == 0:
return None
obj_pts = []
img_pts = []
n_markers = 0
for i, marker_id in enumerate(ids):
if marker_id in marker_geometry:
obj_pts.append(marker_geometry[marker_id])
img_pts.append(corners[i])
n_markers += 1
if n_markers < min_markers:
return None
# Stack points into (M*4, 3) and (M*4, 2)
obj_pts_array = np.vstack(obj_pts).astype(np.float32)
img_pts_array = np.vstack(img_pts).astype(np.float32)
success, rvec, tvec = cv2.solvePnP(
obj_pts_array, img_pts_array, K, dist, flags=pnp_flag
)
if not success:
return None
reproj_err = compute_reprojection_error(
obj_pts_array, img_pts_array, rvec, tvec, K, dist
)
return rvec, tvec, reproj_err, n_markers
@@ -0,0 +1,173 @@
from datetime import datetime
from pathlib import Path
from typing import Final, Optional, TypedDict, cast
import awkward as ak
import cv2
import numpy as np
from cv2 import aruco
from cv2.typing import MatLike
from jaxtyping import Int, Num
from loguru import logger
NDArray = np.ndarray
CALIBRATION_PARQUET = Path("output") / "usbcam_cal.parquet"
# OBJECT_POINTS_PARQUET = Path("output") / "object_points.parquet"
OBJECT_POINTS_PARQUET = Path("output") / "standard_box_markers.parquet"
DICTIONARY: Final[int] = aruco.DICT_4X4_50
# 400mm
MARKER_LENGTH: Final[float] = 0.4
class MarkerFace(TypedDict):
"""
for diamond ArUco markers, N is 4
"""
name: str
"""
a label for the face
"""
ids: Int[NDArray, "N"]
"""
ArUco marker ids
"""
corners: Num[NDArray, "N 4 3"]
"""
Corner coordinates in 3D of rectangle,
relative to the world origin
"""
def gen():
API = cv2.CAP_AVFOUNDATION
cap = cv2.VideoCapture(0, API)
while True:
ret, frame = cap.read()
if not ret:
logger.warning("Failed to grab frame")
break
yield frame
def main():
aruco_dict = aruco.getPredefinedDictionary(DICTIONARY)
cal = ak.from_parquet(CALIBRATION_PARQUET)[0]
camera_matrix = cast(MatLike, ak.to_numpy(cal["camera_matrix"]))
distortion_coefficients = cast(MatLike, ak.to_numpy(cal["distortion_coefficients"]))
ops = ak.from_parquet(OBJECT_POINTS_PARQUET)
detector = aruco.ArucoDetector(
dictionary=aruco_dict, detectorParams=aruco.DetectorParameters()
)
total_ids = cast(NDArray, ak.to_numpy(ops["ids"])).flatten()
total_corners = cast(NDArray, ak.to_numpy(ops["corners"])).reshape(-1, 4, 3)
ops_map: dict[int, NDArray] = dict(zip(total_ids, total_corners))
logger.info("ops_map={}", ops_map)
writer: Optional[cv2.VideoWriter] = None
for frame in gen():
grey = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# pylint: disable-next=unpacking-non-sequence
markers, ids, rejected = detector.detectMarkers(grey)
# `markers` is [N, 1, 4, 2]
# `ids` is [N, 1]
if ids is not None:
markers = np.reshape(markers, (-1, 4, 2))
ids = np.reshape(ids, (-1, 1))
# logger.info("markers={}, ids={}", np.array(markers).shape, np.array(ids).shape)
ips_map: dict[int, NDArray] = {}
for cs, id in zip(markers, ids):
id = int(id)
cs = cast(NDArray, cs)
ips_map[id] = cs
center = np.mean(cs, axis=0).astype(int)
GREY = (128, 128, 128)
# logger.info("id={}, center={}", id, center)
cv2.circle(frame, tuple(center), 5, GREY, -1)
cv2.putText(
frame,
str(id),
tuple(center),
cv2.FONT_HERSHEY_SIMPLEX,
1,
GREY,
2,
)
# BGR
RED = (0, 0, 255)
GREEN = (0, 255, 0)
BLUE = (255, 0, 0)
YELLOW = (0, 255, 255)
color_map = [RED, GREEN, BLUE, YELLOW]
for color, corners in zip(color_map, cs):
corners = corners.astype(int)
frame = cv2.circle(frame, corners, 5, color, -1)
# https://docs.opencv.org/4.x/d9/d0c/group__calib3d.html#ga50620f0e26e02caa2e9adc07b5fbf24e
ops: NDArray = np.empty((0, 3), dtype=np.float32)
ips: NDArray = np.empty((0, 2), dtype=np.float32)
for id, ip in ips_map.items():
try:
op = ops_map[id]
assert ip.shape == (4, 2), f"corners.shape={ip.shape}"
assert op.shape == (4, 3), f"op.shape={op.shape}"
ops = np.concatenate((ops, op), axis=0)
ips = np.concatenate((ips, ip), axis=0)
except KeyError:
logger.warning("No object points for id={}", id)
continue
assert len(ops) == len(ips), f"len(ops)={len(ops)} != len(ips)={len(ips)}"
if len(ops) > 0:
# https://docs.opencv.org/4.x/d5/d1f/calib3d_solvePnP.html
# https://docs.opencv.org/4.x/d5/d1f/calib3d_solvePnP.html#calib3d_solvePnP_flags
ret, rvec, tvec = cv2.solvePnP(
objectPoints=ops,
imagePoints=ips,
cameraMatrix=camera_matrix,
distCoeffs=distortion_coefficients,
flags=cv2.SOLVEPNP_SQPNP,
)
# ret, rvec, tvec, inliners = cv2.solvePnPRansac(
# objectPoints=ops,
# imagePoints=ips,
# cameraMatrix=camera_matrix,
# distCoeffs=distortion_coefficients,
# flags=cv2.SOLVEPNP_SQPNP,
# )
if ret:
cv2.drawFrameAxes(
frame,
camera_matrix,
distortion_coefficients,
rvec,
tvec,
MARKER_LENGTH,
)
else:
logger.warning("Failed to solvePnPRansac")
cv2.imshow("frame", frame)
if writer is not None:
writer.write(frame)
if (k := cv2.waitKey(1)) == ord("q"):
logger.info("Exiting")
break
elif k == ord("s"):
now = datetime.now().strftime("%Y%m%d%H%M%S")
file_name = f"aruco_{now}.png"
logger.info("Saving to {}", file_name)
cv2.imwrite(file_name, frame)
elif k == ord("r"):
if writer is not None:
writer.release()
writer = None
logger.info("Recording stopped")
else:
now = datetime.now().strftime("%Y%m%d%H%M%S")
file_name = f"aruco_{now}.mp4"
logger.info("Recording to {}", file_name)
fourcc = cv2.VideoWriter.fourcc(*"mp4v")
writer = cv2.VideoWriter(file_name, fourcc, 20.0, frame.shape[:2][::-1])
if __name__ == "__main__":
main()
@@ -0,0 +1,690 @@
import numpy as np
from typing import Optional, Tuple, List, Dict, Any
from jaxtyping import Float
from typing import TYPE_CHECKING
import open3d as o3d
from dataclasses import dataclass, field
import plotly.graph_objects as go
if TYPE_CHECKING:
Vec3 = Float[np.ndarray, "3"]
Mat44 = Float[np.ndarray, "4 4"]
PointsNC = Float[np.ndarray, "N 3"]
else:
Vec3 = np.ndarray
Mat44 = np.ndarray
PointsNC = np.ndarray
@dataclass
class FloorPlane:
normal: Vec3
d: float
num_inliers: int = 0
@dataclass
class FloorCorrection:
transform: Mat44
valid: bool
reason: str = ""
@dataclass
class GroundPlaneConfig:
enabled: bool = True
target_y: float = 0.0
stride: int = 8
depth_min: float = 0.2
depth_max: float = 5.0
ransac_dist_thresh: float = 0.02
ransac_n: int = 3
ransac_iters: int = 1000
max_rotation_deg: float = 5.0
max_translation_m: float = 0.1
min_inliers: int = 500
min_inlier_ratio: float = 0.15
min_valid_cameras: int = 2
normal_vertical_thresh: float = 0.9
max_consensus_deviation_deg: float = 10.0
max_consensus_deviation_m: float = 0.5
seed: Optional[int] = None
@dataclass
class GroundPlaneMetrics:
success: bool = False
correction_applied: bool = False
num_cameras_total: int = 0
num_cameras_valid: int = 0
# Per-camera corrections
camera_corrections: Dict[str, Mat44] = field(default_factory=dict)
skipped_cameras: List[str] = field(default_factory=list)
# Summary stats (optional, maybe average of corrections?)
rotation_deg: float = 0.0
translation_m: float = 0.0
camera_planes: Dict[str, FloorPlane] = field(default_factory=dict)
consensus_plane: Optional[FloorPlane] = None
message: str = ""
def unproject_depth_to_points(
depth_map: np.ndarray,
K: np.ndarray,
stride: int = 1,
depth_min: float = 0.1,
depth_max: float = 10.0,
) -> PointsNC:
"""
Unproject a depth map to a point cloud.
"""
h, w = depth_map.shape
fx = K[0, 0]
fy = K[1, 1]
cx = K[0, 2]
cy = K[1, 2]
# Create meshgrid of pixel coordinates
# Use stride to reduce number of points
u_coords = np.arange(0, w, stride)
v_coords = np.arange(0, h, stride)
u, v = np.meshgrid(u_coords, v_coords)
# Sample depth map
z = depth_map[0:h:stride, 0:w:stride]
# Filter by depth bounds
valid_mask = (z > depth_min) & (z < depth_max) & np.isfinite(z)
# Apply mask
z_valid = z[valid_mask]
u_valid = u[valid_mask]
v_valid = v[valid_mask]
# Unproject
x_valid = (u_valid - cx) * z_valid / fx
y_valid = (v_valid - cy) * z_valid / fy
# Stack into (N, 3) array
points = np.stack((x_valid, y_valid, z_valid), axis=-1)
return points.astype(np.float64)
def detect_floor_plane(
points: PointsNC,
distance_threshold: float = 0.02,
ransac_n: int = 3,
num_iterations: int = 1000,
seed: Optional[int] = None,
) -> Optional[FloorPlane]:
"""
Detect the floor plane from a point cloud using RANSAC.
Returns FloorPlane or None if detection fails.
"""
if points.shape[0] < ransac_n:
return None
# Convert to Open3D PointCloud
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(points)
# Set seed for determinism if provided
if seed is not None:
o3d.utility.random.seed(seed)
# Segment plane
plane_model, inliers = pcd.segment_plane(
distance_threshold=distance_threshold,
ransac_n=ransac_n,
num_iterations=num_iterations,
)
# Check if we found enough inliers
if len(inliers) < ransac_n:
return None
# plane_model is [a, b, c, d]
a, b, c, d = plane_model
normal = np.array([a, b, c], dtype=np.float64)
# Normalize normal (Open3D usually returns normalized, but be safe)
norm = np.linalg.norm(normal)
if norm > 1e-6:
normal /= norm
d /= norm
return FloorPlane(normal=normal, d=d, num_inliers=len(inliers))
def compute_consensus_plane(
planes: List[FloorPlane],
weights: Optional[List[float]] = None,
) -> FloorPlane:
"""
Compute a consensus plane from multiple plane detections.
Uses a robust median-like approach to reject outliers.
"""
if not planes:
raise ValueError("No planes provided for consensus.")
n_planes = len(planes)
if weights is None:
weights = [1.0] * n_planes
if len(weights) != n_planes:
raise ValueError(
f"Weights length {len(weights)} must match planes length {n_planes}"
)
# 1. Align all normals to be in the upper hemisphere (y > 0)
# This simplifies averaging
aligned_planes = []
for p in planes:
normal = p.normal.copy()
d = p.d
if normal[1] < 0:
normal = -normal
d = -d
aligned_planes.append(FloorPlane(normal=normal, d=d, num_inliers=p.num_inliers))
# 2. Compute median normal and d to be robust against outliers
normals = np.array([p.normal for p in aligned_planes])
ds = np.array([p.d for p in aligned_planes])
# Median of each component for normal (approximate robust mean)
median_normal = np.median(normals, axis=0)
norm = np.linalg.norm(median_normal)
if norm > 1e-6:
median_normal /= norm
else:
median_normal = np.array([0.0, 1.0, 0.0])
median_d = float(np.median(ds))
# 3. Filter outliers based on deviation from median
# Angle deviation
valid_indices = []
for i, p in enumerate(aligned_planes):
# Angle between normal and median normal
dot = np.clip(np.dot(p.normal, median_normal), -1.0, 1.0)
angle_deg = np.rad2deg(np.arccos(dot))
# Distance deviation
dist_diff = abs(p.d - median_d)
# Thresholds for outlier rejection (hardcoded for now, could be config)
if angle_deg < 15.0 and dist_diff < 0.5:
valid_indices.append(i)
if not valid_indices:
# Fallback to all if everything is rejected (should be rare)
valid_indices = list(range(n_planes))
# 4. Weighted average of valid planes
accum_normal = np.zeros(3, dtype=np.float64)
accum_d = 0.0
total_weight = 0.0
for i in valid_indices:
w = weights[i]
p = aligned_planes[i]
accum_normal += p.normal * w
accum_d += p.d * w
total_weight += w
if total_weight <= 0:
# Should not happen given checks above
return FloorPlane(normal=median_normal, d=median_d)
avg_normal = accum_normal / total_weight
avg_d = accum_d / total_weight
# Re-normalize normal
norm = np.linalg.norm(avg_normal)
if norm > 1e-6:
avg_normal /= norm
avg_d /= norm
else:
avg_normal = np.array([0.0, 1.0, 0.0])
avg_d = 0.0
return FloorPlane(normal=avg_normal, d=float(avg_d))
from .alignment import rotation_align_vectors
def compute_floor_correction(
current_floor_plane: FloorPlane,
target_floor_y: float = 0.0,
max_rotation_deg: float = 5.0,
max_translation_m: float = 0.1,
target_plane: Optional[FloorPlane] = None,
) -> FloorCorrection:
"""
Compute the correction transform to align the current floor plane to the target floor height.
Constrains correction to pitch/roll and vertical translation only.
If target_plane is provided, aligns current plane to target_plane (relative correction).
Otherwise, aligns to absolute Y=target_floor_y (absolute correction).
"""
current_normal = current_floor_plane.normal
current_d = current_floor_plane.d
# Target normal is always [0, 1, 0] (Y-up)
target_normal = np.array([0.0, 1.0, 0.0])
if target_plane is not None:
# Use target_plane.normal as the target normal
align_target_normal = target_plane.normal
# Ensure it points roughly up
if align_target_normal[1] < 0:
align_target_normal = -align_target_normal
else:
align_target_normal = target_normal
# 1. Compute rotation to align normals
try:
R_align = rotation_align_vectors(current_normal, align_target_normal)
except ValueError as e:
return FloorCorrection(
transform=np.eye(4), valid=False, reason=f"Rotation alignment failed: {e}"
)
# Check rotation magnitude
# Angle of rotation is acos((trace(R) - 1) / 2)
trace = np.trace(R_align)
# Clip to avoid numerical errors outside [-1, 1]
cos_angle = np.clip((trace - 1) / 2, -1.0, 1.0)
angle_rad = np.arccos(cos_angle)
angle_deg = np.rad2deg(angle_rad)
if angle_deg > max_rotation_deg:
return FloorCorrection(
transform=np.eye(4),
valid=False,
reason=f"Rotation {angle_deg:.1f} deg exceeds limit {max_rotation_deg:.1f} deg",
)
# 2. Compute translation
if target_plane is not None:
# Relative correction: align d to target_plane.d
# Shift = current_d - target_plane.d (assuming normals aligned)
# We use absolute values of d to handle potential sign flips in plane detection
# But wait, d sign matters for plane side.
# If normals are aligned (which we ensured with R_align and align_target_normal),
# then d should be comparable directly.
# However, target_plane.d might be negative if normal was flipped.
# Let's use the d corresponding to align_target_normal.
target_d = target_plane.d
if np.dot(target_plane.normal, align_target_normal) < 0:
target_d = -target_d
# Current d needs to be relative to current normal?
# No, current_d is relative to current_normal.
# After rotation R_align, current_normal becomes align_target_normal.
# So current_d is preserved (distance to origin doesn't change with rotation around origin).
# So we just compare d values.
t_mag = current_d - target_d
trans_dir = align_target_normal
else:
# Absolute correction to target_y
# We want new y = target_floor_y
# So shift = target_floor_y + current_d
t_mag = target_floor_y + current_d
trans_dir = target_normal
# Check translation magnitude
if abs(t_mag) > max_translation_m:
return FloorCorrection(
transform=np.eye(4),
valid=False,
reason=f"Translation {t_mag:.3f} m exceeds limit {max_translation_m:.3f} m",
)
# Construct T
T = np.eye(4)
T[:3, :3] = R_align
# Translation is applied in the rotated frame (aligned to target normal)
T[:3, 3] = trans_dir * t_mag
return FloorCorrection(transform=T.astype(np.float64), valid=True)
def refine_ground_from_depth(
camera_data: Dict[str, Dict[str, Any]],
extrinsics: Dict[str, Mat44],
config: GroundPlaneConfig = GroundPlaneConfig(),
) -> Tuple[Dict[str, Mat44], GroundPlaneMetrics]:
"""
Orchestrate ground plane refinement across multiple cameras.
Args:
camera_data: Dict mapping serial -> {'depth': np.ndarray, 'K': np.ndarray}
extrinsics: Dict mapping serial -> world_from_cam matrix (4x4)
config: Configuration parameters
Returns:
Tuple of (new_extrinsics, metrics)
"""
metrics = GroundPlaneMetrics()
metrics.num_cameras_total = len(camera_data)
if not config.enabled:
metrics.message = "Ground plane refinement disabled in config"
return extrinsics, metrics
valid_planes: List[FloorPlane] = []
valid_serials: List[str] = []
# 1. Detect planes in each camera
for serial, data in camera_data.items():
if serial not in extrinsics:
continue
depth_map = data.get("depth")
K = data.get("K")
if depth_map is None or K is None:
continue
# Unproject to camera frame
points_cam = unproject_depth_to_points(
depth_map,
K,
stride=config.stride,
depth_min=config.depth_min,
depth_max=config.depth_max,
)
if len(points_cam) < config.min_inliers:
continue
# Transform to world frame
T_world_cam = extrinsics[serial]
# points_cam is (N, 3)
# Apply rotation and translation
R = T_world_cam[:3, :3]
t = T_world_cam[:3, 3]
points_world = (points_cam @ R.T) + t
# Detect plane
plane = detect_floor_plane(
points_world,
distance_threshold=config.ransac_dist_thresh,
ransac_n=config.ransac_n,
num_iterations=config.ransac_iters,
seed=config.seed,
)
if plane is not None:
# Check inlier count
if plane.num_inliers < config.min_inliers:
continue
# Check inlier ratio if configured
if config.min_inlier_ratio > 0:
ratio = plane.num_inliers / len(points_world)
if ratio < config.min_inlier_ratio:
continue
# Check normal orientation (must be roughly vertical)
# We expect floor normal to be roughly [0, 1, 0] or [0, -1, 0]
if abs(plane.normal[1]) < config.normal_vertical_thresh:
continue
metrics.camera_planes[serial] = plane
valid_planes.append(plane)
valid_serials.append(serial)
metrics.num_cameras_valid = len(valid_planes)
# 2. Check minimum requirements
if len(valid_planes) < config.min_valid_cameras:
metrics.message = f"Found {len(valid_planes)} valid planes, required {config.min_valid_cameras}"
return extrinsics, metrics
# 3. Compute consensus (for reporting/metrics only)
try:
consensus = compute_consensus_plane(valid_planes)
metrics.consensus_plane = consensus
except ValueError as e:
metrics.message = f"Consensus computation failed: {e}"
return extrinsics, metrics
# 4. Compute and apply per-camera correction
new_extrinsics = extrinsics.copy()
# Track max rotation/translation for summary metrics
max_rot = 0.0
max_trans = 0.0
corrections_count = 0
for serial, T_old in extrinsics.items():
# If we didn't find a plane for this camera, skip it
if serial not in metrics.camera_planes:
metrics.skipped_cameras.append(serial)
continue
plane = metrics.camera_planes[serial]
correction = compute_floor_correction(
plane,
target_floor_y=config.target_y,
max_rotation_deg=config.max_rotation_deg,
max_translation_m=config.max_translation_m,
target_plane=metrics.consensus_plane,
)
if not correction.valid:
metrics.skipped_cameras.append(serial)
continue
# Validate against consensus if available
if metrics.consensus_plane:
# Check if this camera's plane is too far from consensus
# This prevents a single bad camera from getting a huge correction
# even if it passed individual checks (e.g. it found a wall instead of floor)
# Angle check
dot = np.clip(
np.dot(plane.normal, metrics.consensus_plane.normal), -1.0, 1.0
)
# Handle flipped normals
if dot < 0:
dot = -dot
angle_deg = np.rad2deg(np.arccos(dot))
if angle_deg > config.max_consensus_deviation_deg:
metrics.skipped_cameras.append(serial)
continue
# Distance check (project consensus origin onto this plane)
# Consensus plane: n_c . p + d_c = 0
# This plane: n . p + d = 0
# Compare d values (assuming normals aligned)
d_diff = abs(abs(plane.d) - abs(metrics.consensus_plane.d))
if d_diff > config.max_consensus_deviation_m:
metrics.skipped_cameras.append(serial)
continue
T_corr = correction.transform
metrics.camera_corrections[serial] = T_corr
# Apply correction: T_new = T_corr @ T_old
new_extrinsics[serial] = T_corr @ T_old
# Update summary metrics
trace = np.trace(T_corr[:3, :3])
cos_angle = np.clip((trace - 1) / 2, -1.0, 1.0)
rot_deg = float(np.rad2deg(np.arccos(cos_angle)))
trans_m = float(np.linalg.norm(T_corr[:3, 3]))
max_rot = max(max_rot, rot_deg)
max_trans = max(max_trans, trans_m)
corrections_count += 1
metrics.rotation_deg = max_rot
metrics.translation_m = max_trans
metrics.success = True
metrics.correction_applied = corrections_count > 0
metrics.message = (
f"Corrected {corrections_count} cameras, skipped {len(metrics.skipped_cameras)}"
)
return new_extrinsics, metrics
def create_ground_diagnostic_plot(
metrics: GroundPlaneMetrics,
camera_data: Dict[str, Dict[str, Any]],
extrinsics_before: Dict[str, Mat44],
extrinsics_after: Dict[str, Mat44],
) -> go.Figure:
"""
Create a Plotly diagnostic visualization for ground plane refinement.
"""
fig = go.Figure()
# 1. Add World Origin Axes
axis_scale = 0.5
for axis, color, name in zip(
[np.array([1, 0, 0]), np.array([0, 1, 0]), np.array([0, 0, 1])],
["red", "green", "blue"],
["X", "Y", "Z"],
):
fig.add_trace(
go.Scatter3d(
x=[0, axis[0] * axis_scale],
y=[0, axis[1] * axis_scale],
z=[0, axis[2] * axis_scale],
mode="lines",
line=dict(color=color, width=4),
name=f"World {name}",
showlegend=True,
)
)
# 2. Add Consensus Plane (if available)
if metrics.consensus_plane:
plane = metrics.consensus_plane
# Create a surface for the plane
size = 5.0
x = np.linspace(-size, size, 2)
z = np.linspace(-size, size, 2)
xx, zz = np.meshgrid(x, z)
# n.p + d = 0 => n0*x + n1*y + n2*z + d = 0 => y = -(n0*x + n2*z + d) / n1
if abs(plane.normal[1]) > 1e-6:
yy = (
-(plane.normal[0] * xx + plane.normal[2] * zz + plane.d)
/ plane.normal[1]
)
fig.add_trace(
go.Surface(
x=xx,
y=yy,
z=zz,
showscale=False,
opacity=0.3,
colorscale=[[0, "lightgray"], [1, "lightgray"]],
name="Consensus Plane",
)
)
# 3. Add Floor Points per camera
for serial, data in camera_data.items():
if serial not in extrinsics_before:
continue
depth_map = data.get("depth")
K = data.get("K")
if depth_map is None or K is None:
continue
# Use a larger stride for visualization to keep it responsive
viz_stride = 8
points_cam = unproject_depth_to_points(depth_map, K, stride=viz_stride)
if len(points_cam) == 0:
continue
# Transform to world frame (before)
T_before = extrinsics_before[serial]
R_b = T_before[:3, :3]
t_b = T_before[:3, 3]
points_world = (points_cam @ R_b.T) + t_b
fig.add_trace(
go.Scatter3d(
x=points_world[:, 0],
y=points_world[:, 1],
z=points_world[:, 2],
mode="markers",
marker=dict(size=2, opacity=0.5),
name=f"Points {serial}",
)
)
# 4. Add Camera Positions Before/After
for serial in extrinsics_before:
T_b = extrinsics_before[serial]
pos_b = T_b[:3, 3]
fig.add_trace(
go.Scatter3d(
x=[pos_b[0]],
y=[pos_b[1]],
z=[pos_b[2]],
mode="markers+text",
marker=dict(size=5, color="red"),
text=[f"{serial} (before)"],
name=f"Cam {serial} (before)",
)
)
if serial in extrinsics_after:
T_a = extrinsics_after[serial]
pos_a = T_a[:3, 3]
fig.add_trace(
go.Scatter3d(
x=[pos_a[0]],
y=[pos_a[1]],
z=[pos_a[2]],
mode="markers+text",
marker=dict(size=5, color="green"),
text=[f"{serial} (after)"],
name=f"Cam {serial} (after)",
)
)
fig.update_layout(
title="Ground Plane Refinement Diagnostics",
scene=dict(
xaxis_title="X",
yaxis_title="Y",
zaxis_title="Z",
aspectmode="data",
camera=dict(
up=dict(x=0, y=-1, z=0), # Y-down convention for visualization
eye=dict(x=1.5, y=-1.5, z=1.5),
),
),
margin=dict(l=0, r=0, b=0, t=40),
)
return fig
def save_diagnostic_plot(fig: go.Figure, path: str) -> None:
"""
Save the diagnostic plot to an HTML file.
"""
import os
os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True)
fig.write_html(path)
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,125 @@
import numpy as np
import awkward as ak
from pathlib import Path
from typing import Union, cast
def load_marker_geometry(parquet_path: Union[str, Path]) -> dict[int, np.ndarray]:
"""
Reads ArUco marker geometry from a parquet file.
The parquet file is expected to have 'ids' and 'corners' columns,
similar to standard_box_markers.parquet.
Args:
parquet_path: Path to the parquet file.
Returns:
A dictionary mapping marker IDs to their 3D corner coordinates.
The corners are represented as a (4, 3) float32 numpy array.
"""
path = Path(parquet_path)
if not path.exists():
raise FileNotFoundError(f"Parquet file not found: {path}")
ops = ak.from_parquet(path)
# Extract IDs and corners using logic similar to find_extrinsic_object.py
total_ids = cast(np.ndarray, ak.to_numpy(ops["ids"])).flatten()
total_corners = cast(np.ndarray, ak.to_numpy(ops["corners"])).reshape(-1, 4, 3)
# Create the mapping and ensure coordinates are float32
marker_geometry = {
int(marker_id): corners.astype(np.float32)
for marker_id, corners in zip(total_ids, total_corners)
}
return marker_geometry
def load_face_mapping(parquet_path: Union[str, Path]) -> dict[str, list[int]]:
"""
Reads face mapping from a parquet file.
The parquet file is expected to have 'name' and 'ids' columns.
'name' should be a string (face name), and 'ids' should be a list of integers.
Args:
parquet_path: Path to the parquet file.
Returns:
A dictionary mapping face names (lowercase) to lists of marker IDs.
"""
path = Path(parquet_path)
if not path.exists():
raise FileNotFoundError(f"Parquet file not found: {path}")
ops = ak.from_parquet(path)
# Check if required columns exist
if "name" not in ops.fields or "ids" not in ops.fields:
# Fallback or empty if columns missing (e.g. old format)
return {}
names = cast(np.ndarray, ak.to_numpy(ops["name"]))
# ids might be a jagged array if different faces have different marker counts
# ak.to_list() converts to python lists which is what we want for the dict values
ids_list = ak.to_list(ops["ids"])
face_map: dict[str, list[int]] = {}
for name, m_ids in zip(names, ids_list):
if name and m_ids:
# Normalize name to lowercase
key = str(name).lower()
# Ensure IDs are ints
face_map[key] = [int(mid) for mid in m_ids]
return face_map
def validate_marker_geometry(geometry: dict[int, np.ndarray]) -> None:
"""
Validates the marker geometry dictionary.
Checks:
- Dictionary is not empty.
- Every entry has shape (4, 3).
- Every entry contains only finite numbers (no NaN or Inf).
- Coordinates are within a reasonable range (abs(coord) < 100m).
Args:
geometry: The marker geometry dictionary to validate.
Raises:
ValueError: If any validation check fails.
"""
if not geometry:
raise ValueError("Marker geometry is empty.")
for marker_id, corners in geometry.items():
# Check shape
if corners.shape != (4, 3):
raise ValueError(
f"Invalid shape for marker {marker_id}: "
f"expected (4, 3), got {corners.shape}"
)
# Check for non-finite values (NaN, Inf)
if not np.isfinite(corners).all():
raise ValueError(
f"Marker {marker_id} contains non-finite values (NaN or Inf)."
)
# Check for reasonable range (e.g., < 100 meters)
if np.any(np.abs(corners) > 100.0):
raise ValueError(
f"Marker {marker_id} has coordinates exceeding reasonable range (> 100m): "
f"{corners}"
)
def expected_ids(geometry: dict[int, np.ndarray]) -> set[int]:
"""
Returns the set of marker IDs present in the geometry.
"""
return set(geometry.keys())
@@ -0,0 +1,243 @@
import numpy as np
from typing import List, Dict, Tuple, Optional, Any
try:
from scipy.spatial.transform import Rotation as R_scipy
has_scipy = True
except ImportError:
has_scipy = False
R_scipy = None # type: ignore
def rotation_angle_deg(Ra: np.ndarray, Rb: np.ndarray) -> float:
"""
Compute the rotation angle in degrees between two rotation matrices.
Args:
Ra: 3x3 rotation matrix.
Rb: 3x3 rotation matrix.
Returns:
Angle in degrees.
"""
R_diff = Ra.T @ Rb
cos_theta = (np.trace(R_diff) - 1.0) / 2.0
# Clip to avoid numerical issues
cos_theta = np.clip(cos_theta, -1.0, 1.0)
return np.degrees(np.arccos(cos_theta))
def matrix_to_quaternion(R_mat: np.ndarray) -> np.ndarray:
"""
Convert a 3x3 rotation matrix to a quaternion (w, x, y, z).
Uses the method from "Unit Quaternions and Rotation Matrices" by Mike Day.
"""
tr = np.trace(R_mat)
if tr > 0:
s = np.sqrt(tr + 1.0) * 2
qw = 0.25 * s
qx = (R_mat[2, 1] - R_mat[1, 2]) / s
qy = (R_mat[0, 2] - R_mat[2, 0]) / s
qz = (R_mat[1, 0] - R_mat[0, 1]) / s
elif (R_mat[0, 0] > R_mat[1, 1]) and (R_mat[0, 0] > R_mat[2, 2]):
s = np.sqrt(1.0 + R_mat[0, 0] - R_mat[1, 1] - R_mat[2, 2]) * 2
qw = (R_mat[2, 1] - R_mat[1, 2]) / s
qx = 0.25 * s
qy = (R_mat[0, 1] + R_mat[1, 0]) / s
qz = (R_mat[0, 2] + R_mat[2, 0]) / s
elif R_mat[1, 1] > R_mat[2, 2]:
s = np.sqrt(1.0 + R_mat[1, 1] - R_mat[0, 0] - R_mat[2, 2]) * 2
qw = (R_mat[0, 2] - R_mat[2, 0]) / s
qx = (R_mat[0, 1] + R_mat[1, 0]) / s
qy = 0.25 * s
qz = (R_mat[1, 2] + R_mat[2, 1]) / s
else:
s = np.sqrt(1.0 + R_mat[2, 2] - R_mat[0, 0] - R_mat[1, 1]) * 2
qw = (R_mat[1, 0] - R_mat[0, 1]) / s
qx = (R_mat[0, 2] + R_mat[2, 0]) / s
qy = (R_mat[1, 2] + R_mat[2, 1]) / s
qz = 0.25 * s
return np.array([qw, qx, qy, qz])
def quaternion_to_matrix(q: np.ndarray) -> np.ndarray:
"""
Convert a quaternion (w, x, y, z) to a 3x3 rotation matrix.
"""
qw, qx, qy, qz = q
return np.array(
[
[
1 - 2 * qy**2 - 2 * qz**2,
2 * qx * qy - 2 * qz * qw,
2 * qx * qz + 2 * qy * qw,
],
[
2 * qx * qy + 2 * qz * qw,
1 - 2 * qx**2 - 2 * qz**2,
2 * qy * qz - 2 * qx * qw,
],
[
2 * qx * qz - 2 * qy * qw,
2 * qy * qz + 2 * qx * qw,
1 - 2 * qx**2 - 2 * qy**2,
],
]
)
class PoseAccumulator:
"""
Accumulates 4x4 poses and provides robust averaging methods.
"""
def __init__(self):
self.poses: List[np.ndarray] = []
self.reproj_errors: List[float] = []
self.frame_ids: List[int] = []
def add_pose(self, T: np.ndarray, reproj_error: float, frame_id: int) -> None:
"""
Add a 4x4 pose to the accumulator.
Args:
T: 4x4 transformation matrix.
reproj_error: Reprojection error associated with the pose.
frame_id: ID of the frame where the pose was detected.
"""
if not isinstance(T, np.ndarray) or T.shape != (4, 4):
raise ValueError("T must be a 4x4 numpy array")
self.poses.append(T.copy())
self.reproj_errors.append(float(reproj_error))
self.frame_ids.append(int(frame_id))
def filter_by_reproj(self, max_reproj_error: float) -> List[int]:
"""
Return indices of poses with reprojection error below threshold.
"""
return [
i for i, err in enumerate(self.reproj_errors) if err <= max_reproj_error
]
def ransac_filter(
self,
rot_thresh_deg: float = 5.0,
trans_thresh_m: float = 0.05,
max_iters: int = 200,
random_seed: int = 0,
) -> List[int]:
"""
Perform RANSAC to find the largest consensus set of poses.
Args:
rot_thresh_deg: Rotation threshold in degrees.
trans_thresh_m: Translation threshold in meters.
max_iters: Maximum number of RANSAC iterations.
random_seed: Seed for reproducibility.
Returns:
Indices of the inlier poses.
"""
n = len(self.poses)
if n == 0:
return []
if n == 1:
return [0]
rng = np.random.default_rng(random_seed)
best_inliers = []
for _ in range(max_iters):
# Pick a random reference pose
ref_idx = rng.integers(0, n)
T_ref = self.poses[ref_idx]
R_ref = T_ref[:3, :3]
t_ref = T_ref[:3, 3]
current_inliers = []
for i in range(n):
T_i = self.poses[i]
R_i = T_i[:3, :3]
t_i = T_i[:3, 3]
# Check rotation
d_rot = rotation_angle_deg(R_ref, R_i)
# Check translation
d_trans = np.linalg.norm(t_ref - t_i)
if d_rot <= rot_thresh_deg and d_trans <= trans_thresh_m:
current_inliers.append(i)
if len(current_inliers) > len(best_inliers):
best_inliers = current_inliers
if len(best_inliers) == n:
break
return best_inliers
def compute_robust_mean(
self, inlier_indices: Optional[List[int]] = None
) -> Tuple[np.ndarray, Dict[str, Any]]:
"""
Compute the robust mean of the accumulated poses.
Args:
inlier_indices: Optional list of indices to use. If None, uses all poses.
Returns:
T_mean: 4x4 averaged transformation matrix.
stats: Dictionary with statistics.
"""
if inlier_indices is None:
inlier_indices = list(range(len(self.poses)))
n_total = len(self.poses)
n_inliers = len(inlier_indices)
if n_inliers == 0:
return np.eye(4), {
"n_total": n_total,
"n_inliers": 0,
"median_reproj_error": 0.0,
}
inlier_poses = [self.poses[i] for i in inlier_indices]
inlier_errors = [self.reproj_errors[i] for i in inlier_indices]
# Translation: median
translations = np.array([T[:3, 3] for T in inlier_poses])
t_mean = np.median(translations, axis=0)
# Rotation: mean
rotations = [T[:3, :3] for T in inlier_poses]
if has_scipy and R_scipy is not None:
r_objs = R_scipy.from_matrix(rotations)
R_mean = r_objs.mean().as_matrix()
else:
# Quaternion eigen mean fallback
quats = np.array([matrix_to_quaternion(rot) for rot in rotations])
# Form the matrix M = sum(q_i * q_i^T)
M_mat = np.zeros((4, 4))
for i in range(n_inliers):
q = quats[i]
M_mat += np.outer(q, q)
# The average quaternion is the eigenvector corresponding to the largest eigenvalue
evals, evecs = np.linalg.eigh(M_mat)
q_mean = evecs[:, -1]
R_mean = quaternion_to_matrix(q_mean)
T_mean = np.eye(4)
T_mean[:3, :3] = R_mean
T_mean[:3, 3] = t_mean
stats = {
"n_total": n_total,
"n_inliers": n_inliers,
"median_reproj_error": (
float(np.median(inlier_errors)) if inlier_errors else 0.0
),
}
return T_mean, stats
@@ -0,0 +1,44 @@
import numpy as np
import cv2
def rvec_tvec_to_matrix(rvec, tvec):
rvec = np.asarray(rvec).flatten()
tvec = np.asarray(tvec).flatten()
R, _ = cv2.Rodrigues(rvec)
T = np.eye(4)
T[:3, :3] = R
T[:3, 3] = tvec
return T
def matrix_to_rvec_tvec(T):
R = T[:3, :3]
tvec = T[:3, 3]
rvec, _ = cv2.Rodrigues(R)
return rvec.flatten(), tvec.flatten()
def invert_transform(T):
R = T[:3, :3]
t = T[:3, 3]
T_inv = np.eye(4)
T_inv[:3, :3] = R.T
T_inv[:3, 3] = -R.T @ t
return T_inv
def compose_transforms(T1, T2):
return T1 @ T2
def compute_reprojection_error(obj_pts, img_pts, rvec, tvec, K, dist=None):
projected_pts, _ = cv2.projectPoints(obj_pts, rvec, tvec, K, dist)
projected_pts = projected_pts.squeeze()
img_pts = img_pts.squeeze()
if projected_pts.ndim == 1:
projected_pts = projected_pts.reshape(1, -1)
if img_pts.ndim == 1:
img_pts = img_pts.reshape(1, -1)
error = np.linalg.norm(img_pts - projected_pts, axis=1)
return float(np.mean(error))
+93
View File
@@ -0,0 +1,93 @@
import cv2
import numpy as np
def draw_detected_markers(
image: np.ndarray, corners: list[np.ndarray] | np.ndarray, ids: np.ndarray | None
) -> np.ndarray:
"""
Draws ArUco markers outlines and IDs on the image.
Args:
image: Input image (BGR or grayscale).
corners: List of marker corners or np.ndarray.
ids: Array of marker IDs.
Returns:
Image with markers drawn.
"""
if image.ndim == 2:
image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
if corners is not None and len(corners) > 0:
# Draw outlines
# corners can be list of (1, 4, 2) or np.ndarray of (N, 4, 2)
int_corners = [c.astype(np.int32).reshape((-1, 1, 2)) for c in corners]
cv2.polylines(image, int_corners, True, (0, 255, 0), 2)
if ids is not None:
# Flatten ids to handle (N,) or (N, 1)
flat_ids = ids.flatten()
for i, corner in enumerate(corners):
# Put ID near the first corner
# corner can be (1, 4, 2) or (4, 2)
c = corner.reshape((-1, 2))
pos = c[0].astype(int)
cv2.putText(
image,
str(flat_ids[i]),
tuple(pos),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(0, 0, 255),
2,
)
return image
def draw_pose_axes(
image: np.ndarray,
rvec: np.ndarray,
tvec: np.ndarray,
K: np.ndarray,
dist: np.ndarray | None = None,
length: float = 0.1,
) -> np.ndarray:
"""
Draws pose axes on the image.
Args:
image: Input image.
rvec: Rotation vector.
tvec: Translation vector.
K: Camera intrinsic matrix.
dist: Distortion coefficients.
length: Length of the axes.
Returns:
Image with axes drawn.
"""
if image.ndim == 2:
image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
cv2.drawFrameAxes(image, K, dist, rvec, tvec, length)
return image
def show_preview(
frames: dict[int, np.ndarray], *, wait_ms: int = 1, window_prefix: str = "View "
) -> int:
"""
Shows multiple camera frames in separate windows.
Args:
frames: Dictionary mapping camera serials to images.
wait_ms: Time to wait for a key press in milliseconds.
window_prefix: Prefix for the window names.
Returns:
The key code pressed, or -1 if no key was pressed.
"""
for serial, frame in frames.items():
cv2.imshow(f"{window_prefix}{serial}", frame)
return cv2.waitKey(wait_ms)
+259
View File
@@ -0,0 +1,259 @@
import pyzed.sl as sl
import numpy as np
from dataclasses import dataclass
from typing import Any
import os
from loguru import logger
@dataclass
class FrameData:
"""Data structure for a single frame from an SVO."""
image: np.ndarray
timestamp_ns: int
frame_index: int
serial_number: int
depth_map: np.ndarray | None = None
confidence_map: np.ndarray | None = None
class SVOReader:
"""Handles synchronized playback of multiple SVO files."""
svo_paths: list[str]
runtime_params: sl.RuntimeParameters
_depth_mode: sl.DEPTH_MODE
def __init__(
self, svo_paths: list[str], depth_mode: sl.DEPTH_MODE = sl.DEPTH_MODE.NONE
):
self.svo_paths = svo_paths
self.cameras: list[sl.Camera] = []
self.camera_info: list[dict[str, Any]] = []
self.runtime_params = sl.RuntimeParameters()
self._depth_mode = depth_mode
for path in svo_paths:
if not os.path.exists(path):
print(f"Warning: SVO file not found: {path}")
continue
init_params = sl.InitParameters()
init_params.set_from_svo_file(path)
init_params.svo_real_time_mode = False
init_params.depth_mode = depth_mode
init_params.coordinate_units = sl.UNIT.METER
cam = sl.Camera()
status = cam.open(init_params)
if status != sl.ERROR_CODE.SUCCESS:
print(f"Error: Could not open {path}: {status}")
continue
info = cam.get_camera_information()
self.cameras.append(cam)
self.camera_info.append(
{
"serial": info.serial_number,
"fps": info.camera_configuration.fps,
"total_frames": cam.get_svo_number_of_frames(),
}
)
def sync_to_latest_start(self):
"""Aligns all SVOs to the timestamp of the latest starting SVO."""
if not self.cameras:
return
start_timestamps = []
for cam in self.cameras:
err = cam.grab(self.runtime_params)
if err == sl.ERROR_CODE.SUCCESS:
ts = cam.get_timestamp(sl.TIME_REFERENCE.IMAGE).get_nanoseconds()
start_timestamps.append(ts)
else:
start_timestamps.append(0)
if not start_timestamps:
return
max_start_ts = max(start_timestamps)
for i, cam in enumerate(self.cameras):
ts = start_timestamps[i]
if ts < max_start_ts:
diff_ns = max_start_ts - ts
fps = self.camera_info[i]["fps"]
frames_to_skip = int((diff_ns / 1_000_000_000) * fps)
cam.set_svo_position(frames_to_skip)
else:
cam.set_svo_position(0)
def grab_all(self) -> list[FrameData | None]:
"""Grabs a frame from all cameras without strict synchronization."""
frames: list[FrameData | None] = []
for i, cam in enumerate(self.cameras):
err = cam.grab(self.runtime_params)
if err == sl.ERROR_CODE.SUCCESS:
mat = sl.Mat()
cam.retrieve_image(mat, sl.VIEW.LEFT)
depth_map = self._retrieve_depth(cam)
confidence_map = self._retrieve_confidence(cam)
frames.append(
FrameData(
image=mat.get_data().copy(),
timestamp_ns=cam.get_timestamp(
sl.TIME_REFERENCE.IMAGE
).get_nanoseconds(),
frame_index=cam.get_svo_position(),
serial_number=self.camera_info[i]["serial"],
depth_map=depth_map,
confidence_map=confidence_map,
)
)
elif err == sl.ERROR_CODE.END_OF_SVOFILE_REACHED:
cam.set_svo_position(0)
frames.append(None)
else:
frames.append(None)
return frames
def grab_synced(self, tolerance_ms: int = 33) -> list[FrameData | None]:
"""
Grabs frames from all cameras, attempting to keep them within tolerance_ms.
If a camera falls behind, it skips frames.
"""
frames = self.grab_all()
if not any(frames):
return frames
# Find latest timestamp
valid_timestamps = [f.timestamp_ns for f in frames if f is not None]
if not valid_timestamps:
return frames
max_ts = max(valid_timestamps)
tolerance_ns = tolerance_ms * 1_000_000
for i, frame in enumerate(frames):
if frame is None:
continue
if max_ts - frame.timestamp_ns > tolerance_ns:
cam = self.cameras[i]
fps = self.camera_info[i]["fps"]
diff_ns = max_ts - frame.timestamp_ns
frames_to_skip = int((diff_ns / 1_000_000_000) * fps)
if frames_to_skip > 0:
current_pos = cam.get_svo_position()
cam.set_svo_position(current_pos + frames_to_skip)
err = cam.grab(self.runtime_params)
if err == sl.ERROR_CODE.SUCCESS:
mat = sl.Mat()
cam.retrieve_image(mat, sl.VIEW.LEFT)
depth_map = self._retrieve_depth(cam)
confidence_map = self._retrieve_confidence(cam)
frames[i] = FrameData(
image=mat.get_data().copy(),
timestamp_ns=cam.get_timestamp(
sl.TIME_REFERENCE.IMAGE
).get_nanoseconds(),
frame_index=cam.get_svo_position(),
serial_number=self.camera_info[i]["serial"],
depth_map=depth_map,
confidence_map=confidence_map,
)
elif err == sl.ERROR_CODE.END_OF_SVOFILE_REACHED:
cam.set_svo_position(0)
frames[i] = None
else:
frames[i] = None
return frames
@property
def enable_depth(self) -> bool:
return self._depth_mode != sl.DEPTH_MODE.NONE
def _retrieve_depth(self, cam: sl.Camera) -> np.ndarray | None:
if not self.enable_depth:
return None
depth_mat = sl.Mat()
cam.retrieve_measure(depth_mat, sl.MEASURE.DEPTH)
depth_data = depth_mat.get_data().copy()
# Check if units are already in meters to avoid double scaling.
# SDK coordinate_units is set to METER in __init__.
units = cam.get_init_parameters().coordinate_units
if units == sl.UNIT.METER:
depth = depth_data
else:
# Fallback for safety, though coordinate_units should be METER.
depth = depth_data / 1000.0
# Sanity check and debug logging
valid_mask = np.isfinite(depth) & (depth > 0)
if np.any(valid_mask):
valid_depths = depth[valid_mask]
logger.debug(
f"Depth stats (m) - Min: {np.min(valid_depths):.3f}, "
f"Median: {np.median(valid_depths):.3f}, "
f"Max: {np.max(valid_depths):.3f}, "
f"P95: {np.percentile(valid_depths, 95):.3f}"
)
else:
logger.warning("No valid depth values retrieved")
return depth
def _retrieve_confidence(self, cam: sl.Camera) -> np.ndarray | None:
if not self.enable_depth:
return None
conf_mat = sl.Mat()
cam.retrieve_measure(conf_mat, sl.MEASURE.CONFIDENCE)
return conf_mat.get_data().copy()
def get_depth_at(self, frame: FrameData, x: int, y: int) -> float | None:
if frame.depth_map is None:
return None
h, w = frame.depth_map.shape[:2]
if x < 0 or x >= w or y < 0 or y >= h:
return None
depth = frame.depth_map[y, x]
if not np.isfinite(depth) or depth <= 0:
return None
return float(depth)
def get_depth_window_median(
self, frame: FrameData, x: int, y: int, size: int = 5
) -> float | None:
if frame.depth_map is None:
return None
if size % 2 == 0:
size += 1
h, w = frame.depth_map.shape[:2]
half = size // 2
x_min = max(0, x - half)
x_max = min(w, x + half + 1)
y_min = max(0, y - half)
y_max = min(h, y + half + 1)
window = frame.depth_map[y_min:y_max, x_min:x_max]
valid_depths = window[np.isfinite(window) & (window > 0)]
if len(valid_depths) == 0:
return None
return float(np.median(valid_depths))
def close(self):
"""Closes all cameras."""
for cam in self.cameras:
cam.close()
self.cameras = []