Files
zed-playground/py_workspace/aruco/svo_sync.py
T
2026-02-04 11:44:37 +00:00

164 lines
5.6 KiB
Python

import pyzed.sl as sl
import numpy as np
from dataclasses import dataclass
from typing import List, Optional, Dict
import os
@dataclass
class FrameData:
"""Data structure for a single frame from an SVO."""
image: np.ndarray
timestamp_ns: int
frame_index: int
serial_number: int
class SVOReader:
"""Handles synchronized playback of multiple SVO files."""
def __init__(self, svo_paths: List[str]):
self.svo_paths = svo_paths
self.cameras: List[sl.Camera] = []
self.camera_info: List[Dict] = []
self.runtime_params = sl.RuntimeParameters()
for path in svo_paths:
if not os.path.exists(path):
print(f"Warning: SVO file not found: {path}")
continue
init_params = sl.InitParameters()
init_params.set_from_svo_file(path)
init_params.svo_real_time_mode = False
init_params.depth_mode = sl.DEPTH_MODE.NONE
cam = sl.Camera()
status = cam.open(init_params)
if status != sl.ERROR_CODE.SUCCESS:
print(f"Error: Could not open {path}: {status}")
continue
info = cam.get_camera_information()
self.cameras.append(cam)
self.camera_info.append(
{
"serial": info.serial_number,
"fps": info.camera_configuration.fps,
"total_frames": cam.get_svo_number_of_frames(),
}
)
def sync_to_latest_start(self):
"""Aligns all SVOs to the timestamp of the latest starting SVO."""
if not self.cameras:
return
start_timestamps = []
for cam in self.cameras:
err = cam.grab(self.runtime_params)
if err == sl.ERROR_CODE.SUCCESS:
ts = cam.get_timestamp(sl.TIME_REFERENCE.IMAGE).get_nanoseconds()
start_timestamps.append(ts)
else:
start_timestamps.append(0)
if not start_timestamps:
return
max_start_ts = max(start_timestamps)
for i, cam in enumerate(self.cameras):
ts = start_timestamps[i]
if ts < max_start_ts:
diff_ns = max_start_ts - ts
fps = self.camera_info[i]["fps"]
frames_to_skip = int((diff_ns / 1_000_000_000) * fps)
cam.set_svo_position(frames_to_skip)
else:
cam.set_svo_position(0)
def grab_all(self) -> List[Optional[FrameData]]:
"""Grabs a frame from all cameras without strict synchronization."""
frames = []
for i, cam in enumerate(self.cameras):
err = cam.grab(self.runtime_params)
if err == sl.ERROR_CODE.SUCCESS:
mat = sl.Mat()
cam.retrieve_image(mat, sl.VIEW.LEFT)
frames.append(
FrameData(
image=mat.get_data().copy(),
timestamp_ns=cam.get_timestamp(
sl.TIME_REFERENCE.IMAGE
).get_nanoseconds(),
frame_index=cam.get_svo_position(),
serial_number=self.camera_info[i]["serial"],
)
)
elif err == sl.ERROR_CODE.END_OF_SVOFILE_REACHED:
cam.set_svo_position(0)
frames.append(None)
else:
frames.append(None)
return frames
def grab_synced(self, tolerance_ms: int = 33) -> List[Optional[FrameData]]:
"""
Grabs frames from all cameras, attempting to keep them within tolerance_ms.
If a camera falls behind, it skips frames.
"""
frames = self.grab_all()
if not any(frames):
return frames
# Find latest timestamp
valid_timestamps = [f.timestamp_ns for f in frames if f is not None]
if not valid_timestamps:
return frames
max_ts = max(valid_timestamps)
tolerance_ns = tolerance_ms * 1_000_000
for i, frame in enumerate(frames):
if frame is None:
continue
if max_ts - frame.timestamp_ns > tolerance_ns:
cam = self.cameras[i]
fps = self.camera_info[i]["fps"]
diff_ns = max_ts - frame.timestamp_ns
frames_to_skip = int((diff_ns / 1_000_000_000) * fps)
if frames_to_skip > 0:
current_pos = cam.get_svo_position()
cam.set_svo_position(current_pos + frames_to_skip)
err = cam.grab(self.runtime_params)
if err == sl.ERROR_CODE.SUCCESS:
mat = sl.Mat()
cam.retrieve_image(mat, sl.VIEW.LEFT)
frames[i] = FrameData(
image=mat.get_data().copy(),
timestamp_ns=cam.get_timestamp(
sl.TIME_REFERENCE.IMAGE
).get_nanoseconds(),
frame_index=cam.get_svo_position(),
serial_number=self.camera_info[i]["serial"],
)
elif err == sl.ERROR_CODE.END_OF_SVOFILE_REACHED:
cam.set_svo_position(0)
frames[i] = None
else:
frames[i] = None
return frames
def close(self):
"""Closes all cameras."""
for cam in self.cameras:
cam.close()
self.cameras = []