bd9ea1c795
- Add depth_mode parameter to SVOReader.__init__() - Add enable_depth property - Add depth_map field to FrameData dataclass - Add _retrieve_depth(), get_depth_at(), get_depth_window_median() methods - Update grab_all() and grab_synced() to retrieve depth when enabled
218 lines
7.3 KiB
Python
218 lines
7.3 KiB
Python
import pyzed.sl as sl
|
|
import numpy as np
|
|
from dataclasses import dataclass
|
|
from typing import List, Optional, Dict
|
|
import os
|
|
|
|
|
|
@dataclass
|
|
class FrameData:
|
|
"""Data structure for a single frame from an SVO."""
|
|
|
|
image: np.ndarray
|
|
timestamp_ns: int
|
|
frame_index: int
|
|
serial_number: int
|
|
depth_map: Optional[np.ndarray] = None
|
|
|
|
|
|
class SVOReader:
|
|
"""Handles synchronized playback of multiple SVO files."""
|
|
|
|
def __init__(
|
|
self, svo_paths: List[str], depth_mode: sl.DEPTH_MODE = sl.DEPTH_MODE.NONE
|
|
):
|
|
self.svo_paths = svo_paths
|
|
self.cameras: List[sl.Camera] = []
|
|
self.camera_info: List[Dict] = []
|
|
self.runtime_params = sl.RuntimeParameters()
|
|
self._depth_mode = depth_mode
|
|
|
|
for path in svo_paths:
|
|
if not os.path.exists(path):
|
|
print(f"Warning: SVO file not found: {path}")
|
|
continue
|
|
|
|
init_params = sl.InitParameters()
|
|
init_params.set_from_svo_file(path)
|
|
init_params.svo_real_time_mode = False
|
|
init_params.depth_mode = depth_mode
|
|
|
|
cam = sl.Camera()
|
|
status = cam.open(init_params)
|
|
if status != sl.ERROR_CODE.SUCCESS:
|
|
print(f"Error: Could not open {path}: {status}")
|
|
continue
|
|
|
|
info = cam.get_camera_information()
|
|
self.cameras.append(cam)
|
|
self.camera_info.append(
|
|
{
|
|
"serial": info.serial_number,
|
|
"fps": info.camera_configuration.fps,
|
|
"total_frames": cam.get_svo_number_of_frames(),
|
|
}
|
|
)
|
|
|
|
def sync_to_latest_start(self):
|
|
"""Aligns all SVOs to the timestamp of the latest starting SVO."""
|
|
if not self.cameras:
|
|
return
|
|
|
|
start_timestamps = []
|
|
for cam in self.cameras:
|
|
err = cam.grab(self.runtime_params)
|
|
if err == sl.ERROR_CODE.SUCCESS:
|
|
ts = cam.get_timestamp(sl.TIME_REFERENCE.IMAGE).get_nanoseconds()
|
|
start_timestamps.append(ts)
|
|
else:
|
|
start_timestamps.append(0)
|
|
|
|
if not start_timestamps:
|
|
return
|
|
|
|
max_start_ts = max(start_timestamps)
|
|
|
|
for i, cam in enumerate(self.cameras):
|
|
ts = start_timestamps[i]
|
|
if ts < max_start_ts:
|
|
diff_ns = max_start_ts - ts
|
|
fps = self.camera_info[i]["fps"]
|
|
frames_to_skip = int((diff_ns / 1_000_000_000) * fps)
|
|
cam.set_svo_position(frames_to_skip)
|
|
else:
|
|
cam.set_svo_position(0)
|
|
|
|
def grab_all(self) -> List[Optional[FrameData]]:
|
|
"""Grabs a frame from all cameras without strict synchronization."""
|
|
frames = []
|
|
for i, cam in enumerate(self.cameras):
|
|
err = cam.grab(self.runtime_params)
|
|
if err == sl.ERROR_CODE.SUCCESS:
|
|
mat = sl.Mat()
|
|
cam.retrieve_image(mat, sl.VIEW.LEFT)
|
|
depth_map = self._retrieve_depth(cam)
|
|
frames.append(
|
|
FrameData(
|
|
image=mat.get_data().copy(),
|
|
timestamp_ns=cam.get_timestamp(
|
|
sl.TIME_REFERENCE.IMAGE
|
|
).get_nanoseconds(),
|
|
frame_index=cam.get_svo_position(),
|
|
serial_number=self.camera_info[i]["serial"],
|
|
depth_map=depth_map,
|
|
)
|
|
)
|
|
elif err == sl.ERROR_CODE.END_OF_SVOFILE_REACHED:
|
|
cam.set_svo_position(0)
|
|
frames.append(None)
|
|
else:
|
|
frames.append(None)
|
|
return frames
|
|
|
|
def grab_synced(self, tolerance_ms: int = 33) -> List[Optional[FrameData]]:
|
|
"""
|
|
Grabs frames from all cameras, attempting to keep them within tolerance_ms.
|
|
If a camera falls behind, it skips frames.
|
|
"""
|
|
frames = self.grab_all()
|
|
if not any(frames):
|
|
return frames
|
|
|
|
# Find latest timestamp
|
|
valid_timestamps = [f.timestamp_ns for f in frames if f is not None]
|
|
if not valid_timestamps:
|
|
return frames
|
|
|
|
max_ts = max(valid_timestamps)
|
|
tolerance_ns = tolerance_ms * 1_000_000
|
|
|
|
for i, frame in enumerate(frames):
|
|
if frame is None:
|
|
continue
|
|
|
|
if max_ts - frame.timestamp_ns > tolerance_ns:
|
|
cam = self.cameras[i]
|
|
fps = self.camera_info[i]["fps"]
|
|
diff_ns = max_ts - frame.timestamp_ns
|
|
frames_to_skip = int((diff_ns / 1_000_000_000) * fps)
|
|
|
|
if frames_to_skip > 0:
|
|
current_pos = cam.get_svo_position()
|
|
cam.set_svo_position(current_pos + frames_to_skip)
|
|
|
|
err = cam.grab(self.runtime_params)
|
|
|
|
if err == sl.ERROR_CODE.SUCCESS:
|
|
mat = sl.Mat()
|
|
cam.retrieve_image(mat, sl.VIEW.LEFT)
|
|
depth_map = self._retrieve_depth(cam)
|
|
frames[i] = FrameData(
|
|
image=mat.get_data().copy(),
|
|
timestamp_ns=cam.get_timestamp(
|
|
sl.TIME_REFERENCE.IMAGE
|
|
).get_nanoseconds(),
|
|
frame_index=cam.get_svo_position(),
|
|
serial_number=self.camera_info[i]["serial"],
|
|
depth_map=depth_map,
|
|
)
|
|
elif err == sl.ERROR_CODE.END_OF_SVOFILE_REACHED:
|
|
cam.set_svo_position(0)
|
|
frames[i] = None
|
|
else:
|
|
frames[i] = None
|
|
|
|
return frames
|
|
|
|
@property
|
|
def enable_depth(self) -> bool:
|
|
return self._depth_mode != sl.DEPTH_MODE.NONE
|
|
|
|
def _retrieve_depth(self, cam: sl.Camera) -> Optional[np.ndarray]:
|
|
if not self.enable_depth:
|
|
return None
|
|
depth_mat = sl.Mat()
|
|
cam.retrieve_measure(depth_mat, sl.MEASURE.DEPTH)
|
|
return depth_mat.get_data().copy()
|
|
|
|
def get_depth_at(self, frame: FrameData, x: int, y: int) -> Optional[float]:
|
|
if frame.depth_map is None:
|
|
return None
|
|
h, w = frame.depth_map.shape[:2]
|
|
if x < 0 or x >= w or y < 0 or y >= h:
|
|
return None
|
|
depth = frame.depth_map[y, x]
|
|
if not np.isfinite(depth) or depth <= 0:
|
|
return None
|
|
return float(depth)
|
|
|
|
def get_depth_window_median(
|
|
self, frame: FrameData, x: int, y: int, size: int = 5
|
|
) -> Optional[float]:
|
|
if frame.depth_map is None:
|
|
return None
|
|
if size % 2 == 0:
|
|
size += 1
|
|
|
|
h, w = frame.depth_map.shape[:2]
|
|
half = size // 2
|
|
|
|
x_min = max(0, x - half)
|
|
x_max = min(w, x + half + 1)
|
|
y_min = max(0, y - half)
|
|
y_max = min(h, y + half + 1)
|
|
|
|
window = frame.depth_map[y_min:y_max, x_min:x_max]
|
|
valid_depths = window[np.isfinite(window) & (window > 0)]
|
|
|
|
if len(valid_depths) == 0:
|
|
return None
|
|
|
|
return float(np.median(valid_depths))
|
|
|
|
def close(self):
|
|
"""Closes all cameras."""
|
|
for cam in self.cameras:
|
|
cam.close()
|
|
self.cameras = []
|