219 lines
5.8 KiB
Python
219 lines
5.8 KiB
Python
import numpy as np
|
|
from typing import Optional, Tuple, List
|
|
from jaxtyping import Float
|
|
from typing import TYPE_CHECKING
|
|
import open3d as o3d
|
|
|
|
if TYPE_CHECKING:
|
|
Vec3 = Float[np.ndarray, "3"]
|
|
Mat44 = Float[np.ndarray, "4 4"]
|
|
PointsNC = Float[np.ndarray, "N 3"]
|
|
else:
|
|
Vec3 = np.ndarray
|
|
Mat44 = np.ndarray
|
|
PointsNC = np.ndarray
|
|
|
|
|
|
def unproject_depth_to_points(
|
|
depth_map: np.ndarray,
|
|
K: np.ndarray,
|
|
stride: int = 1,
|
|
depth_min: float = 0.1,
|
|
depth_max: float = 10.0,
|
|
) -> PointsNC:
|
|
"""
|
|
Unproject a depth map to a point cloud.
|
|
"""
|
|
h, w = depth_map.shape
|
|
fx = K[0, 0]
|
|
fy = K[1, 1]
|
|
cx = K[0, 2]
|
|
cy = K[1, 2]
|
|
|
|
# Create meshgrid of pixel coordinates
|
|
# Use stride to reduce number of points
|
|
u_coords = np.arange(0, w, stride)
|
|
v_coords = np.arange(0, h, stride)
|
|
u, v = np.meshgrid(u_coords, v_coords)
|
|
|
|
# Sample depth map
|
|
z = depth_map[0:h:stride, 0:w:stride]
|
|
|
|
# Filter by depth bounds
|
|
valid_mask = (z > depth_min) & (z < depth_max) & np.isfinite(z)
|
|
|
|
# Apply mask
|
|
z_valid = z[valid_mask]
|
|
u_valid = u[valid_mask]
|
|
v_valid = v[valid_mask]
|
|
|
|
# Unproject
|
|
x_valid = (u_valid - cx) * z_valid / fx
|
|
y_valid = (v_valid - cy) * z_valid / fy
|
|
|
|
# Stack into (N, 3) array
|
|
points = np.stack((x_valid, y_valid, z_valid), axis=-1)
|
|
|
|
return points.astype(np.float64)
|
|
|
|
|
|
def detect_floor_plane(
|
|
points: PointsNC,
|
|
distance_threshold: float = 0.02,
|
|
ransac_n: int = 3,
|
|
num_iterations: int = 1000,
|
|
seed: Optional[int] = None,
|
|
) -> Tuple[Optional[Vec3], float, int]:
|
|
"""
|
|
Detect the floor plane from a point cloud using RANSAC.
|
|
Returns (normal, d, num_inliers) where plane is normal.dot(p) + d = 0.
|
|
"""
|
|
if points.shape[0] < ransac_n:
|
|
return None, 0.0, 0
|
|
|
|
# Convert to Open3D PointCloud
|
|
pcd = o3d.geometry.PointCloud()
|
|
pcd.points = o3d.utility.Vector3dVector(points)
|
|
|
|
# Set seed for determinism if provided
|
|
if seed is not None:
|
|
o3d.utility.random.seed(seed)
|
|
|
|
# Segment plane
|
|
plane_model, inliers = pcd.segment_plane(
|
|
distance_threshold=distance_threshold,
|
|
ransac_n=ransac_n,
|
|
num_iterations=num_iterations,
|
|
)
|
|
|
|
if not plane_model:
|
|
return None, 0.0, 0
|
|
|
|
# plane_model is [a, b, c, d]
|
|
a, b, c, d = plane_model
|
|
normal = np.array([a, b, c], dtype=np.float64)
|
|
|
|
# Normalize normal (Open3D usually returns normalized, but be safe)
|
|
norm = np.linalg.norm(normal)
|
|
if norm > 1e-6:
|
|
normal /= norm
|
|
d /= norm
|
|
|
|
return normal, d, len(inliers)
|
|
|
|
|
|
def compute_consensus_plane(
|
|
planes: List[Tuple[Vec3, float]],
|
|
weights: Optional[List[float]] = None,
|
|
) -> Tuple[Vec3, float]:
|
|
"""
|
|
Compute a consensus plane from multiple plane detections.
|
|
"""
|
|
if not planes:
|
|
raise ValueError("No planes provided for consensus.")
|
|
|
|
n_planes = len(planes)
|
|
if weights is None:
|
|
weights = [1.0] * n_planes
|
|
|
|
if len(weights) != n_planes:
|
|
raise ValueError(
|
|
f"Weights length {len(weights)} must match planes length {n_planes}"
|
|
)
|
|
|
|
# Use the first plane as reference for orientation
|
|
ref_normal = planes[0][0]
|
|
|
|
accum_normal = np.zeros(3, dtype=np.float64)
|
|
accum_d = 0.0
|
|
total_weight = 0.0
|
|
|
|
for i, (normal, d) in enumerate(planes):
|
|
w = weights[i]
|
|
|
|
# Check orientation against reference
|
|
if np.dot(normal, ref_normal) < 0:
|
|
# Flip normal and d to align with reference
|
|
normal = -normal
|
|
d = -d
|
|
|
|
accum_normal += normal * w
|
|
accum_d += d * w
|
|
total_weight += w
|
|
|
|
if total_weight <= 0:
|
|
raise ValueError("Total weight must be positive.")
|
|
|
|
avg_normal = accum_normal / total_weight
|
|
avg_d = accum_d / total_weight
|
|
|
|
# Re-normalize normal
|
|
norm = np.linalg.norm(avg_normal)
|
|
if norm > 1e-6:
|
|
avg_normal /= norm
|
|
# Scale d by 1/norm to maintain plane equation consistency
|
|
avg_d /= norm
|
|
else:
|
|
# Fallback (should be rare if inputs are valid)
|
|
avg_normal = np.array([0.0, 1.0, 0.0])
|
|
avg_d = 0.0
|
|
|
|
return avg_normal, float(avg_d)
|
|
|
|
|
|
from .alignment import rotation_align_vectors
|
|
|
|
|
|
def compute_floor_correction(
|
|
current_floor_plane: Tuple[Vec3, float],
|
|
target_floor_y: float = 0.0,
|
|
max_rotation_deg: float = 5.0,
|
|
max_translation_m: float = 0.1,
|
|
) -> Optional[Mat44]:
|
|
"""
|
|
Compute the correction transform to align the current floor plane to the target floor height.
|
|
Constrains correction to pitch/roll and vertical translation only.
|
|
"""
|
|
current_normal, current_d = current_floor_plane
|
|
|
|
# Target normal is always [0, 1, 0] (Y-up)
|
|
target_normal = np.array([0.0, 1.0, 0.0])
|
|
|
|
# 1. Compute rotation to align normals
|
|
try:
|
|
R_align = rotation_align_vectors(current_normal, target_normal)
|
|
except ValueError:
|
|
return None
|
|
|
|
# Check rotation magnitude
|
|
# Angle of rotation is acos((trace(R) - 1) / 2)
|
|
trace = np.trace(R_align)
|
|
# Clip to avoid numerical errors outside [-1, 1]
|
|
cos_angle = np.clip((trace - 1) / 2, -1.0, 1.0)
|
|
angle_rad = np.arccos(cos_angle)
|
|
angle_deg = np.rad2deg(angle_rad)
|
|
|
|
if angle_deg > max_rotation_deg:
|
|
return None
|
|
|
|
# 2. Compute translation
|
|
# We want to move points such that the floor is at y = target_floor_y
|
|
# Plane equation: n . p + d = 0
|
|
# Current floor at y = -current_d (if n=[0,1,0])
|
|
# We want new y = target_floor_y
|
|
# So shift = target_floor_y - (-current_d) = target_floor_y + current_d
|
|
|
|
t_y = target_floor_y + current_d
|
|
|
|
# Check translation magnitude
|
|
if abs(t_y) > max_translation_m:
|
|
return None
|
|
|
|
# Construct T
|
|
T = np.eye(4)
|
|
T[:3, :3] = R_align
|
|
# Translation is applied in the rotated frame (aligned to target normal)
|
|
T[:3, 3] = target_normal * t_y
|
|
|
|
return T.astype(np.float64)
|