feat(calibration): add data-driven ground alignment with debug and fast iteration flags
This commit is contained in:
@@ -0,0 +1,237 @@
|
||||
import numpy as np
|
||||
from loguru import logger
|
||||
|
||||
|
||||
def compute_face_normal(corners: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Compute the normal vector of a face defined by its corners.
|
||||
Assumes corners are in order (e.g., clockwise or counter-clockwise).
|
||||
|
||||
Args:
|
||||
corners: (N, 3) array of corner coordinates.
|
||||
|
||||
Returns:
|
||||
(3,) normalized normal vector.
|
||||
"""
|
||||
if corners.ndim != 2 or corners.shape[1] != 3:
|
||||
raise ValueError(f"Expected (N, 3) array, got {corners.shape}")
|
||||
if corners.shape[0] < 3:
|
||||
raise ValueError("At least 3 corners are required to compute a normal.")
|
||||
|
||||
# Use the cross product of two edges
|
||||
# Stable edge definition for quad faces consistent with plan/repo convention:
|
||||
# normal from (corners[1]-corners[0]) x (corners[3]-corners[0]) when N>=4,
|
||||
# fallback to index 2 if N==3.
|
||||
v1 = corners[1] - corners[0]
|
||||
if corners.shape[0] >= 4:
|
||||
v2 = corners[3] - corners[0]
|
||||
else:
|
||||
v2 = corners[2] - corners[0]
|
||||
|
||||
normal = np.cross(v1, v2)
|
||||
norm = np.linalg.norm(normal)
|
||||
|
||||
if norm < 1e-10:
|
||||
raise ValueError("Corners are collinear or degenerate; cannot compute normal.")
|
||||
|
||||
return (normal / norm).astype(np.float64)
|
||||
|
||||
|
||||
def rotation_align_vectors(from_vec: np.ndarray, to_vec: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Compute the 3x3 rotation matrix that aligns from_vec to to_vec.
|
||||
|
||||
Args:
|
||||
from_vec: (3,) source vector.
|
||||
to_vec: (3,) target vector.
|
||||
|
||||
Returns:
|
||||
(3, 3) rotation matrix.
|
||||
"""
|
||||
if from_vec.shape != (3,) or to_vec.shape != (3,):
|
||||
raise ValueError(
|
||||
f"Expected (3,) vectors, got {from_vec.shape} and {to_vec.shape}"
|
||||
)
|
||||
|
||||
norm_from = np.linalg.norm(from_vec)
|
||||
norm_to = np.linalg.norm(to_vec)
|
||||
|
||||
if norm_from < 1e-10 or norm_to < 1e-10:
|
||||
raise ValueError("Cannot align zero-norm vectors.")
|
||||
|
||||
# Normalize inputs
|
||||
a = from_vec / norm_from
|
||||
b = to_vec / norm_to
|
||||
|
||||
v = np.cross(a, b)
|
||||
c = np.dot(a, b)
|
||||
s = np.linalg.norm(v)
|
||||
|
||||
# Handle parallel case
|
||||
if s < 1e-10:
|
||||
if c > 0:
|
||||
return np.eye(3, dtype=np.float64)
|
||||
else:
|
||||
# Anti-parallel case: 180 degree rotation around an orthogonal axis
|
||||
# Find an orthogonal axis
|
||||
if abs(a[0]) < 0.9:
|
||||
ortho = np.array([1.0, 0.0, 0.0])
|
||||
else:
|
||||
ortho = np.array([0.0, 1.0, 0.0])
|
||||
|
||||
axis = np.cross(a, ortho)
|
||||
axis /= np.linalg.norm(axis)
|
||||
|
||||
# Rodrigues formula for 180 degrees
|
||||
K = np.array(
|
||||
[
|
||||
[0.0, -axis[2], axis[1]],
|
||||
[axis[2], 0.0, -axis[0]],
|
||||
[-axis[1], axis[0], 0.0],
|
||||
]
|
||||
)
|
||||
return (np.eye(3) + 2 * (K @ K)).astype(np.float64)
|
||||
|
||||
# General case using Rodrigues' rotation formula
|
||||
# R = I + [v]_x + [v]_x^2 * (1-c)/s^2
|
||||
vx = np.array([[0.0, -v[2], v[1]], [v[2], 0.0, -v[0]], [-v[1], v[0], 0.0]])
|
||||
|
||||
R = np.eye(3) + vx + (vx @ vx) * ((1 - c) / (s**2))
|
||||
return R.astype(np.float64)
|
||||
|
||||
|
||||
def apply_alignment_to_pose(T: np.ndarray, R_align: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Apply an alignment rotation to a 4x4 pose matrix.
|
||||
The alignment is applied in the global frame (pre-multiplication of rotation).
|
||||
|
||||
Args:
|
||||
T: (4, 4) homogeneous transformation matrix.
|
||||
R_align: (3, 3) alignment rotation matrix.
|
||||
|
||||
Returns:
|
||||
(4, 4) aligned transformation matrix.
|
||||
"""
|
||||
if T.shape != (4, 4):
|
||||
raise ValueError(f"Expected 4x4 matrix, got {T.shape}")
|
||||
if R_align.shape != (3, 3):
|
||||
raise ValueError(f"Expected 3x3 matrix, got {R_align.shape}")
|
||||
|
||||
T_align = np.eye(4, dtype=np.float64)
|
||||
T_align[:3, :3] = R_align
|
||||
|
||||
return (T_align @ T).astype(np.float64)
|
||||
|
||||
|
||||
def get_face_normal_from_geometry(
|
||||
face_name: str,
|
||||
marker_geometry: dict[int, np.ndarray],
|
||||
face_marker_map: dict[str, list[int]] | None = None,
|
||||
) -> np.ndarray | None:
|
||||
"""
|
||||
Compute the average normal vector for a face based on available marker geometry.
|
||||
|
||||
Args:
|
||||
face_name: Name of the face (key in face_marker_map).
|
||||
marker_geometry: Dictionary mapping marker IDs to their (4, 3) corner coordinates.
|
||||
face_marker_map: Dictionary mapping face names to marker IDs.
|
||||
|
||||
Returns:
|
||||
(3,) normalized average normal vector, or None if no markers are available.
|
||||
"""
|
||||
if face_marker_map is None:
|
||||
return None
|
||||
|
||||
if face_name not in face_marker_map:
|
||||
return None
|
||||
|
||||
marker_ids = face_marker_map[face_name]
|
||||
normals = []
|
||||
|
||||
for mid in marker_ids:
|
||||
if mid in marker_geometry:
|
||||
try:
|
||||
n = compute_face_normal(marker_geometry[mid])
|
||||
normals.append(n)
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
if not normals:
|
||||
return None
|
||||
|
||||
avg_normal = np.mean(normals, axis=0)
|
||||
norm = np.linalg.norm(avg_normal)
|
||||
|
||||
if norm < 1e-10:
|
||||
return None
|
||||
|
||||
return (avg_normal / norm).astype(np.float64)
|
||||
|
||||
|
||||
def detect_ground_face(
|
||||
visible_marker_ids: set[int],
|
||||
marker_geometry: dict[int, np.ndarray],
|
||||
camera_up_vector: np.ndarray = np.array([0, -1, 0]),
|
||||
face_marker_map: dict[str, list[int]] | None = None,
|
||||
) -> tuple[str, np.ndarray] | None:
|
||||
"""
|
||||
Detect which face of the object is most likely the ground face.
|
||||
The ground face is the one whose normal is most aligned with the camera's up vector.
|
||||
|
||||
Args:
|
||||
visible_marker_ids: Set of marker IDs currently visible.
|
||||
marker_geometry: Dictionary mapping marker IDs to their (4, 3) corner coordinates.
|
||||
camera_up_vector: (3,) vector representing the 'up' direction in camera frame.
|
||||
face_marker_map: Dictionary mapping face names to marker IDs.
|
||||
|
||||
Returns:
|
||||
Tuple of (best_face_name, best_face_normal) or None if no face is detected.
|
||||
"""
|
||||
if not visible_marker_ids:
|
||||
return None
|
||||
|
||||
if face_marker_map is None:
|
||||
return None
|
||||
|
||||
if camera_up_vector.shape != (3,):
|
||||
raise ValueError(
|
||||
f"Expected (3,) camera_up_vector, got {camera_up_vector.shape}"
|
||||
)
|
||||
|
||||
up_norm = np.linalg.norm(camera_up_vector)
|
||||
if up_norm < 1e-10:
|
||||
raise ValueError("camera_up_vector cannot be zero-norm.")
|
||||
|
||||
up_vec = camera_up_vector / up_norm
|
||||
best_face = None
|
||||
best_normal = None
|
||||
best_score = -np.inf
|
||||
|
||||
# Iterate faces in mapping
|
||||
for face_name, face_marker_ids in face_marker_map.items():
|
||||
# Consider only faces with any visible marker ID
|
||||
if not any(mid in visible_marker_ids for mid in face_marker_ids):
|
||||
continue
|
||||
|
||||
normal = get_face_normal_from_geometry(
|
||||
face_name, marker_geometry, face_marker_map=face_marker_map
|
||||
)
|
||||
if normal is None:
|
||||
continue
|
||||
|
||||
# Score by dot(normal, normalized camera_up_vector)
|
||||
score = np.dot(normal, up_vec)
|
||||
logger.debug(f"Face '{face_name}' considered. Score: {score:.4f}")
|
||||
|
||||
if score > best_score:
|
||||
best_score = score
|
||||
best_face = face_name
|
||||
best_normal = normal
|
||||
|
||||
if best_face is not None and best_normal is not None:
|
||||
logger.debug(
|
||||
f"Best ground face detected: '{best_face}' with score {best_score:.4f}"
|
||||
)
|
||||
return best_face, best_normal
|
||||
|
||||
return None
|
||||
@@ -37,6 +37,46 @@ def load_marker_geometry(parquet_path: Union[str, Path]) -> dict[int, np.ndarray
|
||||
return marker_geometry
|
||||
|
||||
|
||||
def load_face_mapping(parquet_path: Union[str, Path]) -> dict[str, list[int]]:
|
||||
"""
|
||||
Reads face mapping from a parquet file.
|
||||
|
||||
The parquet file is expected to have 'name' and 'ids' columns.
|
||||
'name' should be a string (face name), and 'ids' should be a list of integers.
|
||||
|
||||
Args:
|
||||
parquet_path: Path to the parquet file.
|
||||
|
||||
Returns:
|
||||
A dictionary mapping face names (lowercase) to lists of marker IDs.
|
||||
"""
|
||||
path = Path(parquet_path)
|
||||
if not path.exists():
|
||||
raise FileNotFoundError(f"Parquet file not found: {path}")
|
||||
|
||||
ops = ak.from_parquet(path)
|
||||
|
||||
# Check if required columns exist
|
||||
if "name" not in ops.fields or "ids" not in ops.fields:
|
||||
# Fallback or empty if columns missing (e.g. old format)
|
||||
return {}
|
||||
|
||||
names = cast(np.ndarray, ak.to_numpy(ops["name"]))
|
||||
# ids might be a jagged array if different faces have different marker counts
|
||||
# ak.to_list() converts to python lists which is what we want for the dict values
|
||||
ids_list = ak.to_list(ops["ids"])
|
||||
|
||||
face_map: dict[str, list[int]] = {}
|
||||
for name, m_ids in zip(names, ids_list):
|
||||
if name and m_ids:
|
||||
# Normalize name to lowercase
|
||||
key = str(name).lower()
|
||||
# Ensure IDs are ints
|
||||
face_map[key] = [int(mid) for mid in m_ids]
|
||||
|
||||
return face_map
|
||||
|
||||
|
||||
def validate_marker_geometry(geometry: dict[int, np.ndarray]) -> None:
|
||||
"""
|
||||
Validates the marker geometry dictionary.
|
||||
|
||||
Binary file not shown.
@@ -14,6 +14,7 @@ class FrameData:
|
||||
frame_index: int
|
||||
serial_number: int
|
||||
depth_map: np.ndarray | None = None
|
||||
confidence_map: np.ndarray | None = None
|
||||
|
||||
|
||||
class SVOReader:
|
||||
@@ -96,6 +97,7 @@ class SVOReader:
|
||||
mat = sl.Mat()
|
||||
cam.retrieve_image(mat, sl.VIEW.LEFT)
|
||||
depth_map = self._retrieve_depth(cam)
|
||||
confidence_map = self._retrieve_confidence(cam)
|
||||
frames.append(
|
||||
FrameData(
|
||||
image=mat.get_data().copy(),
|
||||
@@ -105,6 +107,7 @@ class SVOReader:
|
||||
frame_index=cam.get_svo_position(),
|
||||
serial_number=self.camera_info[i]["serial"],
|
||||
depth_map=depth_map,
|
||||
confidence_map=confidence_map,
|
||||
)
|
||||
)
|
||||
elif err == sl.ERROR_CODE.END_OF_SVOFILE_REACHED:
|
||||
@@ -151,6 +154,7 @@ class SVOReader:
|
||||
mat = sl.Mat()
|
||||
cam.retrieve_image(mat, sl.VIEW.LEFT)
|
||||
depth_map = self._retrieve_depth(cam)
|
||||
confidence_map = self._retrieve_confidence(cam)
|
||||
frames[i] = FrameData(
|
||||
image=mat.get_data().copy(),
|
||||
timestamp_ns=cam.get_timestamp(
|
||||
@@ -159,6 +163,7 @@ class SVOReader:
|
||||
frame_index=cam.get_svo_position(),
|
||||
serial_number=self.camera_info[i]["serial"],
|
||||
depth_map=depth_map,
|
||||
confidence_map=confidence_map,
|
||||
)
|
||||
elif err == sl.ERROR_CODE.END_OF_SVOFILE_REACHED:
|
||||
cam.set_svo_position(0)
|
||||
@@ -179,6 +184,13 @@ class SVOReader:
|
||||
cam.retrieve_measure(depth_mat, sl.MEASURE.DEPTH)
|
||||
return depth_mat.get_data().copy()
|
||||
|
||||
def _retrieve_confidence(self, cam: sl.Camera) -> np.ndarray | None:
|
||||
if not self.enable_depth:
|
||||
return None
|
||||
conf_mat = sl.Mat()
|
||||
cam.retrieve_measure(conf_mat, sl.MEASURE.CONFIDENCE)
|
||||
return conf_mat.get_data().copy()
|
||||
|
||||
def get_depth_at(self, frame: FrameData, x: int, y: int) -> float | None:
|
||||
if frame.depth_map is None:
|
||||
return None
|
||||
|
||||
Reference in New Issue
Block a user