From bd9ea1c7954b4480c19a9b90eff10779eef9ef2c Mon Sep 17 00:00:00 2001 From: crosstyan Date: Thu, 5 Feb 2026 03:43:21 +0000 Subject: [PATCH] feat(aruco): extend SVOReader with depth map support - Add depth_mode parameter to SVOReader.__init__() - Add enable_depth property - Add depth_map field to FrameData dataclass - Add _retrieve_depth(), get_depth_at(), get_depth_window_median() methods - Update grab_all() and grab_synced() to retrieve depth when enabled --- py_workspace/aruco/svo_sync.py | 58 ++++++++++++++++++++++++++++++++-- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/py_workspace/aruco/svo_sync.py b/py_workspace/aruco/svo_sync.py index 78864a3..025591c 100644 --- a/py_workspace/aruco/svo_sync.py +++ b/py_workspace/aruco/svo_sync.py @@ -13,16 +13,20 @@ class FrameData: timestamp_ns: int frame_index: int serial_number: int + depth_map: Optional[np.ndarray] = None class SVOReader: """Handles synchronized playback of multiple SVO files.""" - def __init__(self, svo_paths: List[str]): + def __init__( + self, svo_paths: List[str], depth_mode: sl.DEPTH_MODE = sl.DEPTH_MODE.NONE + ): self.svo_paths = svo_paths self.cameras: List[sl.Camera] = [] self.camera_info: List[Dict] = [] self.runtime_params = sl.RuntimeParameters() + self._depth_mode = depth_mode for path in svo_paths: if not os.path.exists(path): @@ -32,7 +36,7 @@ class SVOReader: init_params = sl.InitParameters() init_params.set_from_svo_file(path) init_params.svo_real_time_mode = False - init_params.depth_mode = sl.DEPTH_MODE.NONE + init_params.depth_mode = depth_mode cam = sl.Camera() status = cam.open(init_params) @@ -87,6 +91,7 @@ class SVOReader: if err == sl.ERROR_CODE.SUCCESS: mat = sl.Mat() cam.retrieve_image(mat, sl.VIEW.LEFT) + depth_map = self._retrieve_depth(cam) frames.append( FrameData( image=mat.get_data().copy(), @@ -95,6 +100,7 @@ class SVOReader: ).get_nanoseconds(), frame_index=cam.get_svo_position(), serial_number=self.camera_info[i]["serial"], + depth_map=depth_map, ) ) elif err == sl.ERROR_CODE.END_OF_SVOFILE_REACHED: @@ -140,6 +146,7 @@ class SVOReader: if err == sl.ERROR_CODE.SUCCESS: mat = sl.Mat() cam.retrieve_image(mat, sl.VIEW.LEFT) + depth_map = self._retrieve_depth(cam) frames[i] = FrameData( image=mat.get_data().copy(), timestamp_ns=cam.get_timestamp( @@ -147,6 +154,7 @@ class SVOReader: ).get_nanoseconds(), frame_index=cam.get_svo_position(), serial_number=self.camera_info[i]["serial"], + depth_map=depth_map, ) elif err == sl.ERROR_CODE.END_OF_SVOFILE_REACHED: cam.set_svo_position(0) @@ -156,6 +164,52 @@ class SVOReader: return frames + @property + def enable_depth(self) -> bool: + return self._depth_mode != sl.DEPTH_MODE.NONE + + def _retrieve_depth(self, cam: sl.Camera) -> Optional[np.ndarray]: + if not self.enable_depth: + return None + depth_mat = sl.Mat() + cam.retrieve_measure(depth_mat, sl.MEASURE.DEPTH) + return depth_mat.get_data().copy() + + def get_depth_at(self, frame: FrameData, x: int, y: int) -> Optional[float]: + if frame.depth_map is None: + return None + h, w = frame.depth_map.shape[:2] + if x < 0 or x >= w or y < 0 or y >= h: + return None + depth = frame.depth_map[y, x] + if not np.isfinite(depth) or depth <= 0: + return None + return float(depth) + + def get_depth_window_median( + self, frame: FrameData, x: int, y: int, size: int = 5 + ) -> Optional[float]: + if frame.depth_map is None: + return None + if size % 2 == 0: + size += 1 + + h, w = frame.depth_map.shape[:2] + half = size // 2 + + x_min = max(0, x - half) + x_max = min(w, x + half + 1) + y_min = max(0, y - half) + y_max = min(h, y + half + 1) + + window = frame.depth_map[y_min:y_max, x_min:x_max] + valid_depths = window[np.isfinite(window) & (window > 0)] + + if len(valid_depths) == 0: + return None + + return float(np.median(valid_depths)) + def close(self): """Closes all cameras.""" for cam in self.cameras: