feat(rgbd): add RGB-D reconstruction pipeline
Add end-to-end RGB-D reconstruction support across the C++ core and Python API. - add a native merge_rgbd_views path, view-aware 3D pose containers, and nanobind bindings - expose Python helpers to sample aligned depth, apply per-joint offsets, lift UVD poses to world space, and run reconstruct_rgbd - add RGB-D regression tests for merging, manual pipeline parity, symmetric depth sampling windows, and out-of-bounds joints - bump the project version from 0.1.0 to 0.2.0 for the new feature surface
This commit is contained in:
+74
-1
@@ -24,6 +24,7 @@ from ._core import (
|
||||
build_pair_candidates as _build_pair_candidates,
|
||||
filter_pairs_with_previous_poses as _filter_pairs_with_previous_poses,
|
||||
make_camera as _make_camera,
|
||||
merge_rgbd_views as _merge_rgbd_views,
|
||||
triangulate_debug as _triangulate_debug,
|
||||
triangulate_poses as _triangulate_poses,
|
||||
triangulate_with_report as _triangulate_with_report,
|
||||
@@ -33,10 +34,11 @@ if TYPE_CHECKING:
|
||||
import numpy as np
|
||||
import numpy.typing as npt
|
||||
|
||||
from ._helpers import CameraLike, CameraModelLike, Matrix3x3Like, PoseViewLike, VectorLike
|
||||
from ._helpers import CameraLike, CameraModelLike, DepthImageLike, Matrix3x3Like, PoseViewLike, VectorLike
|
||||
|
||||
PoseArray2D = npt.NDArray[np.float32]
|
||||
PoseArray3D = npt.NDArray[np.float32]
|
||||
PoseArray3DByView = npt.NDArray[np.float32]
|
||||
PersonCountArray = npt.NDArray[np.uint32]
|
||||
TrackIdArray = npt.NDArray[np.int64]
|
||||
|
||||
@@ -103,6 +105,42 @@ def pack_poses_2d(
|
||||
return _pack_poses_2d(views, joint_count=joint_count)
|
||||
|
||||
|
||||
def sample_depth_for_poses(
    poses_2d: "PoseArray2D",
    person_counts: "PersonCountArray",
    depth_images: "Sequence[DepthImageLike]",
    *,
    window_size: int = 7,
) -> "PoseArray3D":
    """Sample aligned depth at every visible 2D joint, yielding `[u, v, d, score]` rows."""
    # Imported lazily so that importing the package does not pull in numpy helpers.
    from ._helpers import sample_depth_for_poses as _impl

    return _impl(poses_2d, person_counts, depth_images, window_size=window_size)
|
||||
|
||||
|
||||
def apply_depth_offsets(
    poses_uvd: "PoseArray3D",
    joint_names: "Sequence[str]",
) -> "PoseArray3D":
    """Shift the depth channel of `[u, v, d, score]` rows by the SimpleDepthPose per-joint offsets."""
    # Imported lazily so that importing the package does not pull in numpy helpers.
    from ._helpers import apply_depth_offsets as _impl

    return _impl(poses_uvd, joint_names)
|
||||
|
||||
|
||||
def lift_depth_poses_to_world(
    poses_uvd: "PoseArray3D",
    cameras: "Sequence[CameraLike]",
) -> "PoseArray3DByView":
    """Back-project `[u, v, d, score]` joints into world-space `[x, y, z, score]` poses."""
    # Imported lazily so that importing the package does not pull in numpy helpers.
    from ._helpers import lift_depth_poses_to_world as _impl

    return _impl(poses_uvd, cameras)
|
||||
|
||||
|
||||
def make_triangulation_config(
|
||||
cameras: "Sequence[CameraLike]",
|
||||
roomparams: "npt.NDArray[np.generic] | Sequence[Sequence[float]]",
|
||||
@@ -172,6 +210,36 @@ def triangulate_poses(
|
||||
return _triangulate_poses(poses_2d, person_counts, config)
|
||||
|
||||
|
||||
def merge_rgbd_views(
    poses_3d: "PoseArray3DByView",
    person_counts: "PersonCountArray",
    config: TriangulationConfig,
    *,
    max_distance: float = 0.5,
) -> "PoseArray3D":
    """Fuse per-view world-space RGBD pose proposals into one set of final 3D poses."""
    # The native binding takes a positional float, so coerce the keyword here.
    distance_limit = float(max_distance)
    return _merge_rgbd_views(poses_3d, person_counts, config, distance_limit)
|
||||
|
||||
|
||||
def reconstruct_rgbd(
    poses_2d: "PoseArray2D",
    person_counts: "PersonCountArray",
    depth_images: "Sequence[DepthImageLike]",
    config: TriangulationConfig,
    *,
    use_depth_offsets: bool = True,
    window_size: int = 7,
    max_distance: float = 0.5,
) -> "PoseArray3D":
    """Reconstruct per-frame RGBD poses from calibrated detections and aligned depth images.

    Runs the full pipeline: depth sampling, optional per-joint depth
    correction, back-projection to world space, and cross-view merging.
    """
    sampled = sample_depth_for_poses(poses_2d, person_counts, depth_images, window_size=window_size)
    adjusted = apply_depth_offsets(sampled, config.joint_names) if use_depth_offsets else sampled
    world_poses = lift_depth_poses_to_world(adjusted, config.cameras)
    return merge_rgbd_views(world_poses, person_counts, config, max_distance=max_distance)
|
||||
|
||||
|
||||
def triangulate_with_report(
|
||||
poses_2d: "PoseArray2D",
|
||||
person_counts: "PersonCountArray",
|
||||
@@ -200,6 +268,7 @@ __all__ = [
|
||||
"CameraModel",
|
||||
"AssociationReport",
|
||||
"AssociationStatus",
|
||||
"apply_depth_offsets",
|
||||
"FinalPoseAssociationDebug",
|
||||
"TriangulationConfig",
|
||||
"TriangulationOptions",
|
||||
@@ -216,9 +285,13 @@ __all__ = [
|
||||
"build_pair_candidates",
|
||||
"convert_cameras",
|
||||
"filter_pairs_with_previous_poses",
|
||||
"lift_depth_poses_to_world",
|
||||
"make_camera",
|
||||
"make_triangulation_config",
|
||||
"merge_rgbd_views",
|
||||
"pack_poses_2d",
|
||||
"reconstruct_rgbd",
|
||||
"sample_depth_for_poses",
|
||||
"triangulate_debug",
|
||||
"triangulate_poses",
|
||||
"triangulate_with_report",
|
||||
|
||||
+44
-1
@@ -23,10 +23,11 @@ from ._core import (
|
||||
TriangulationResult,
|
||||
TriangulationTrace,
|
||||
)
|
||||
from ._helpers import CameraLike, CameraModelLike, Matrix3x3Like, PoseViewLike, RoomParamsLike, VectorLike
|
||||
from ._helpers import CameraLike, CameraModelLike, DepthImageLike, Matrix3x3Like, PoseViewLike, RoomParamsLike, VectorLike
|
||||
|
||||
PoseArray2D: TypeAlias = npt.NDArray[np.float32]
|
||||
PoseArray3D: TypeAlias = npt.NDArray[np.float32]
|
||||
PoseArray3DByView: TypeAlias = npt.NDArray[np.float32]
|
||||
PersonCountArray: TypeAlias = npt.NDArray[np.uint32]
|
||||
TrackIdArray: TypeAlias = npt.NDArray[np.int64]
|
||||
|
||||
@@ -59,6 +60,27 @@ def pack_poses_2d(
|
||||
) -> tuple[npt.NDArray[np.float32], npt.NDArray[np.uint32]]: ...
|
||||
|
||||
|
||||
def sample_depth_for_poses(
    poses_2d: PoseArray2D,
    person_counts: PersonCountArray,
    depth_images: Sequence[DepthImageLike],
    *,
    window_size: int = 7,
) -> PoseArray3D:
    """Sample aligned depth for each visible 2D joint; returns `[u, v, d, score]` rows."""
    ...
|
||||
|
||||
|
||||
def apply_depth_offsets(
    poses_uvd: PoseArray3D,
    joint_names: Sequence[str],
) -> PoseArray3D:
    """Apply per-joint depth offsets to `[u, v, d, score]` rows."""
    ...
|
||||
|
||||
|
||||
def lift_depth_poses_to_world(
    poses_uvd: PoseArray3D,
    cameras: Sequence[CameraLike],
) -> PoseArray3DByView:
    """Lift `[u, v, d, score]` joints into world-space `[x, y, z, score]` poses."""
    ...
|
||||
|
||||
|
||||
def make_triangulation_config(
|
||||
cameras: Sequence[CameraLike],
|
||||
roomparams: RoomParamsLike,
|
||||
@@ -103,6 +125,27 @@ def triangulate_poses(
|
||||
) -> PoseArray3D: ...
|
||||
|
||||
|
||||
def merge_rgbd_views(
    poses_3d: PoseArray3DByView,
    person_counts: PersonCountArray,
    config: TriangulationConfig,
    *,
    max_distance: float = 0.5,
) -> PoseArray3D:
    """Merge per-view world-space RGBD pose proposals into final 3D poses."""
    ...
|
||||
|
||||
|
||||
def reconstruct_rgbd(
    poses_2d: PoseArray2D,
    person_counts: PersonCountArray,
    depth_images: Sequence[DepthImageLike],
    config: TriangulationConfig,
    *,
    use_depth_offsets: bool = True,
    window_size: int = 7,
    max_distance: float = 0.5,
) -> PoseArray3D:
    """Run the full RGBD pipeline: sample depth, offset, lift to world, merge views."""
    ...
|
||||
|
||||
|
||||
def triangulate_with_report(
|
||||
poses_2d: PoseArray2D,
|
||||
person_counts: PersonCountArray,
|
||||
|
||||
@@ -12,6 +12,7 @@ Matrix3x3Like: TypeAlias = Sequence[Sequence[float]]
|
||||
VectorLike: TypeAlias = Sequence[float]
|
||||
RoomParamsLike: TypeAlias = npt.NDArray[np.generic] | Sequence[Sequence[float]]
|
||||
PoseViewLike: TypeAlias = npt.NDArray[np.generic] | Sequence[Sequence[Sequence[float]]] | Sequence[Sequence[float]]
|
||||
DepthImageLike: TypeAlias = npt.NDArray[np.generic] | Sequence[Sequence[float]]
|
||||
|
||||
|
||||
class CameraDict(TypedDict, total=False):
|
||||
@@ -29,6 +30,29 @@ class CameraDict(TypedDict, total=False):
|
||||
CameraModelLike: TypeAlias = CameraModel | Literal["pinhole", "fisheye"]
|
||||
CameraLike = Camera | CameraDict
|
||||
|
||||
# Per-joint depth corrections in METERS, consumed by apply_depth_offsets(),
# which adds them (converted to millimeters) to each joint's sampled depth.
# Presumably these push a joint's camera-measured surface depth toward the
# anatomical joint center inside the body — TODO confirm with calibration data.
# Joints absent from this table get an offset of 0.0.
DEFAULT_DEPTH_OFFSETS_METERS: dict[str, float] = {
    "nose": 0.005,
    "eye_left": 0.005,
    "eye_right": 0.005,
    "ear_left": 0.005,
    "ear_right": 0.005,
    "shoulder_left": 0.03,
    "shoulder_right": 0.03,
    "elbow_left": 0.02,
    "elbow_right": 0.02,
    "wrist_left": 0.01,
    "wrist_right": 0.01,
    "hip_left": 0.04,
    "hip_right": 0.04,
    "knee_left": 0.03,
    "knee_right": 0.03,
    "ankle_left": 0.03,
    "ankle_right": 0.03,
    "hip_middle": 0.04,
    "shoulder_middle": 0.03,
    "head": 0.0,
}
|
||||
|
||||
|
||||
def _coerce_camera_model(model: CameraModelLike) -> CameraModel:
|
||||
if isinstance(model, CameraModel):
|
||||
@@ -55,6 +79,15 @@ def _coerce_distortion(distortion: VectorLike, camera_model: CameraModel) -> tup
|
||||
return values
|
||||
|
||||
|
||||
def _coerce_depth_image(depth_image: DepthImageLike) -> npt.NDArray[np.float32]:
    """Normalize *depth_image* to a contiguous float32 ``[height, width]`` array.

    A trailing singleton channel (``[height, width, 1]``) is dropped; any
    other rank is rejected.
    """
    depth = np.asarray(depth_image, dtype=np.float32)
    if depth.ndim == 3 and depth.shape[-1] == 1:
        depth = depth[..., 0]
    if depth.ndim != 2:
        raise ValueError("Each depth image must have shape [height, width] or [height, width, 1].")
    return np.ascontiguousarray(depth, dtype=np.float32)
|
||||
|
||||
|
||||
def convert_cameras(cameras: Sequence[CameraLike]) -> list[Camera]:
|
||||
"""Normalize mappings or existing Camera objects into bound Camera instances."""
|
||||
|
||||
@@ -157,3 +190,136 @@ def make_triangulation_config(
|
||||
options.min_group_size = int(min_group_size)
|
||||
config.options = options
|
||||
return config
|
||||
|
||||
|
||||
def sample_depth_for_poses(
    poses_2d: npt.NDArray[np.generic],
    person_counts: npt.NDArray[np.generic],
    depth_images: Sequence[DepthImageLike],
    *,
    window_size: int = 7,
) -> npt.NDArray[np.float32]:
    """Sample aligned depth for each visible 2D joint and return `[u, v, d, score]` rows.

    For every joint, depth is the nan-median over a symmetric cross-shaped
    window (``window_size`` samples along each image axis, centred on the
    joint) of that view's aligned depth image.  Joints outside the image,
    joints whose window holds only non-positive depth, and joints with a
    non-positive score are zeroed out entirely.

    Args:
        poses_2d: ``[views, max_persons, joints, 3]`` array of ``[u, v, score]`` rows.
        person_counts: 1D array with the valid person count for each view.
        depth_images: One aligned depth image per view, ``[H, W]`` or ``[H, W, 1]``.
        window_size: Side length of the sampling cross; must be positive.

    Returns:
        ``[views, max_persons, joints, 4]`` float32 array of ``[u, v, d, score]`` rows.

    Raises:
        ValueError: If any input shape is inconsistent or ``window_size`` is not positive.
    """
    poses = np.asarray(poses_2d, dtype=np.float32)
    counts = np.asarray(person_counts, dtype=np.uint32)
    if poses.ndim != 4 or poses.shape[-1] != 3:
        raise ValueError("poses_2d must have shape [views, max_persons, joints, 3].")
    if counts.ndim != 1 or counts.shape[0] != poses.shape[0]:
        raise ValueError("person_counts must be a 1D array aligned with the pose views.")
    if len(depth_images) != poses.shape[0]:
        raise ValueError("depth_images must have the same number of views as poses_2d.")
    if window_size <= 0:
        raise ValueError("window_size must be positive.")
    radius = window_size // 2
    # Sampling offsets depend only on window_size, so hoist them out of the
    # per-view loop instead of rebuilding them every iteration.
    offsets = np.arange(-radius, radius + 1, dtype=np.int32)

    poses_uvd = np.zeros((poses.shape[0], poses.shape[1], poses.shape[2], 4), dtype=np.float32)
    for view_idx, depth_image in enumerate(depth_images):
        depth = _coerce_depth_image(depth_image)
        # Carry [u, v] and the detection score through; depth is filled below.
        poses_uvd[view_idx, :, :, :2] = poses[view_idx, :, :, :2]
        poses_uvd[view_idx, :, :, 3] = poses[view_idx, :, :, 2]

        valid_persons = int(counts[view_idx])
        if valid_persons == 0:
            continue

        joints = poses[view_idx, :valid_persons, :, :2].astype(np.int32, copy=False).reshape(-1, 2)
        scores = poses[view_idx, :valid_persons, :, 2:3].reshape(-1, 1)

        # Zero-padding keeps the window indexing branch-free near the image
        # border; the padded zeros are discarded by the `<= 0` filter below.
        depth_padded = np.pad(depth, radius, mode="constant", constant_values=0)
        valid_xy = (
            (joints[:, 0] >= 0)
            & (joints[:, 0] < depth.shape[1])
            & (joints[:, 1] >= 0)
            & (joints[:, 1] < depth.shape[0])
        )
        clamped_x = np.clip(joints[:, 0], 0, depth.shape[1] - 1)
        clamped_y = np.clip(joints[:, 1], 0, depth.shape[0] - 1)
        center_x = clamped_x[:, None] + radius
        center_y = clamped_y[:, None] + radius
        vertical_grid = np.clip(np.add.outer(clamped_y, offsets) + radius, 0, depth_padded.shape[0] - 1)
        horizontal_grid = np.clip(
            np.add.outer(clamped_x, offsets) + radius, 0, depth_padded.shape[1] - 1
        )

        # Sample a vertical and a horizontal stripe through the joint (a
        # symmetric cross).  Out-of-bounds joints and non-positive depth
        # become NaN so the median ignores them.
        vertical_depths = depth_padded[vertical_grid, center_x]
        horizontal_depths = depth_padded[center_y, horizontal_grid]
        all_depths = np.concatenate((vertical_depths, horizontal_depths), axis=1).astype(np.float32)
        all_depths[~valid_xy] = np.nan
        all_depths[all_depths <= 0] = np.nan

        valid_depth_rows = ~np.isnan(all_depths).all(axis=1)
        sampled_depths = np.zeros((all_depths.shape[0],), dtype=np.float32)
        if np.any(valid_depth_rows):
            # nanmedian warns on all-NaN slices; those rows are excluded, but
            # silence numerics noise defensively.
            with np.errstate(all="ignore"):
                sampled_depths[valid_depth_rows] = np.nanmedian(all_depths[valid_depth_rows], axis=1)

        valid_mask = ((sampled_depths > 0.0).astype(np.float32)[:, None] * (scores > 0.0).astype(np.float32))
        sampled_depths = sampled_depths.reshape(valid_persons, poses.shape[2], 1)
        valid_mask = valid_mask.reshape(valid_persons, poses.shape[2], 1)

        poses_uvd[view_idx, :valid_persons, :, 2:3] = sampled_depths
        # Broadcasting the [..., 1] mask over the last axis zeroes u, v, d and
        # score together — no need to tile the mask four times.
        poses_uvd[view_idx, :valid_persons] *= valid_mask

    return poses_uvd
|
||||
|
||||
|
||||
def apply_depth_offsets(
    poses_uvd: npt.NDArray[np.generic],
    joint_names: Sequence[str],
) -> npt.NDArray[np.float32]:
    """Apply the SimpleDepthPose per-joint depth offsets in meters.

    Each joint named in ``DEFAULT_DEPTH_OFFSETS_METERS`` has its depth
    channel shifted; joints without a sampled depth (d <= 0) are untouched.
    Returns a new array; the input is not modified.
    """
    uvd = np.asarray(poses_uvd, dtype=np.float32)
    if uvd.ndim != 4 or uvd.shape[-1] != 4:
        raise ValueError("poses_uvd must have shape [views, max_persons, joints, 4].")
    if len(joint_names) != uvd.shape[2]:
        raise ValueError("joint_names must have the same number of joints as poses_uvd.")

    adjusted = uvd.copy()
    per_joint = np.asarray(
        [DEFAULT_DEPTH_OFFSETS_METERS.get(str(name), 0.0) for name in joint_names],
        dtype=np.float32,
    )
    # Shift only joints carrying a positive depth sample.  Offsets are in
    # meters while the depth channel is in millimeters, hence the 1000 factor.
    has_depth = (adjusted[:, :, :, 2:3] > 0.0).astype(np.float32)
    adjusted[:, :, :, 2:3] += has_depth * per_joint[np.newaxis, np.newaxis, :, np.newaxis] * 1000.0
    return adjusted
|
||||
|
||||
|
||||
def lift_depth_poses_to_world(
    poses_uvd: npt.NDArray[np.generic],
    cameras: Sequence[CameraLike],
) -> npt.NDArray[np.float32]:
    """Lift `[u, v, d, score]` joints into world-space `[x, y, z, score]` poses.

    The depth channel ``d`` is interpreted as millimeters (it is scaled by
    0.001 before back-projection).  Joints with a non-positive score are
    zeroed out in the result.

    Raises:
        ValueError: If ``poses_uvd`` is not ``[views, max_persons, joints, 4]``
            or the camera count does not match the view count.
    """

    poses = np.asarray(poses_uvd, dtype=np.float32)
    if poses.ndim != 4 or poses.shape[-1] != 4:
        raise ValueError("poses_uvd must have shape [views, max_persons, joints, 4].")

    converted_cameras = convert_cameras(cameras)
    if len(converted_cameras) != poses.shape[0]:
        raise ValueError("cameras must have the same number of views as poses_uvd.")

    result = np.zeros_like(poses, dtype=np.float32)
    for view_idx, camera in enumerate(converted_cameras):
        # Flatten (persons, joints) so the whole view back-projects in one shot.
        uv = poses[view_idx, :, :, :2].reshape(-1, 2)
        depth_mm = poses[view_idx, :, :, 2:3].reshape(-1, 1)
        scores = poses[view_idx, :, :, 3:4].reshape(-1, 1)

        # Pinhole back-projection: x_cam = d * K^-1 @ [u, v, 1]^T.
        depth_m = depth_mm * 0.001
        uv_ones = np.concatenate((uv, np.ones((uv.shape[0], 1), dtype=np.float32)), axis=1)
        k_inv = np.linalg.inv(np.asarray(camera.K, dtype=np.float32))
        xyz_cam = depth_m * (uv_ones @ k_inv.T)

        # NOTE(review): assumes camera.R / camera.T map camera-frame points to
        # world coordinates (x_world = R @ x_cam + T) — confirm against the
        # Camera extrinsics convention used by the native core.
        rotation = np.asarray(camera.R, dtype=np.float32)
        translation = np.asarray(camera.T, dtype=np.float32).reshape(1, 3)
        xyz_world = (rotation @ xyz_cam.T).T + translation

        pose_world = np.concatenate((xyz_world, scores), axis=1).reshape(
            poses.shape[1], poses.shape[2], 4
        )
        # Zero every channel of joints whose score is non-positive.
        pose_world *= (pose_world[:, :, 3:4] > 0.0).astype(np.float32)
        result[view_idx] = pose_world

    return result
|
||||
|
||||
Reference in New Issue
Block a user