90 lines
2.9 KiB
Python
90 lines
2.9 KiB
Python
import numpy as np
|
|
|
|
|
|
def pool_depth_maps(
|
|
depth_maps: list[np.ndarray],
|
|
confidence_maps: list[np.ndarray] | None = None,
|
|
confidence_thresh: float = 50.0,
|
|
min_valid_count: int = 1,
|
|
) -> tuple[np.ndarray, np.ndarray | None]:
|
|
"""
|
|
Pool multiple depth maps into a single depth map using per-pixel median.
|
|
|
|
Args:
|
|
depth_maps: List of depth maps (H, W) in meters.
|
|
confidence_maps: Optional list of confidence maps (H, W).
|
|
ZED semantics: lower is better, 100 is often invalid/occluded.
|
|
confidence_thresh: Confidence values > threshold are considered invalid.
|
|
min_valid_count: Minimum number of valid depth values required to produce a pooled value.
|
|
|
|
Returns:
|
|
Tuple of (pooled_depth_map, pooled_confidence_map).
|
|
pooled_depth_map: (H, W) array with median depth or NaN.
|
|
pooled_confidence_map: (H, W) array with per-pixel minimum confidence, or None.
|
|
|
|
Raises:
|
|
ValueError: If depth_maps is empty or shapes are inconsistent.
|
|
"""
|
|
if not depth_maps:
|
|
raise ValueError("depth_maps list cannot be empty")
|
|
|
|
n_maps = len(depth_maps)
|
|
shape = depth_maps[0].shape
|
|
|
|
for i, dm in enumerate(depth_maps):
|
|
if dm.shape != shape:
|
|
raise ValueError(
|
|
f"Depth map {i} has inconsistent shape {dm.shape} != {shape}"
|
|
)
|
|
|
|
if confidence_maps:
|
|
if len(confidence_maps) != n_maps:
|
|
raise ValueError(
|
|
f"Number of confidence maps ({len(confidence_maps)}) "
|
|
+ f"must match number of depth maps ({n_maps})"
|
|
)
|
|
for i, cm in enumerate(confidence_maps):
|
|
if cm.shape != shape:
|
|
raise ValueError(
|
|
f"Confidence map {i} has inconsistent shape {cm.shape} != {shape}"
|
|
)
|
|
|
|
if n_maps == 1:
|
|
pooled_depth = depth_maps[0].copy()
|
|
invalid_mask = ~np.isfinite(pooled_depth) | (pooled_depth <= 0)
|
|
if confidence_maps:
|
|
invalid_mask |= confidence_maps[0] > confidence_thresh
|
|
|
|
pooled_depth[invalid_mask] = np.nan
|
|
|
|
if min_valid_count > 1:
|
|
pooled_depth[:] = np.nan
|
|
|
|
pooled_conf = confidence_maps[0].copy() if confidence_maps else None
|
|
return pooled_depth, pooled_conf
|
|
|
|
depth_stack = np.stack(depth_maps, axis=0)
|
|
|
|
valid_mask = np.isfinite(depth_stack) & (depth_stack > 0)
|
|
|
|
conf_stack = None
|
|
if confidence_maps:
|
|
conf_stack = np.stack(confidence_maps, axis=0)
|
|
valid_mask &= conf_stack <= confidence_thresh
|
|
|
|
masked_depths = depth_stack.copy()
|
|
masked_depths[~valid_mask] = np.nan
|
|
|
|
valid_counts = np.sum(valid_mask, axis=0)
|
|
|
|
with np.errstate(invalid="ignore"):
|
|
pooled_depth = np.nanmedian(masked_depths, axis=0)
|
|
|
|
pooled_depth[valid_counts < min_valid_count] = np.nan
|
|
|
|
pooled_conf = None
|
|
if conf_stack is not None:
|
|
pooled_conf = np.min(conf_stack, axis=0)
|
|
|
|
return pooled_depth, pooled_conf
|