Files

216 lines
6.8 KiB
Python
Executable File

import sys
import pyzed.sl as sl
import cv2
import argparse
import os
import math
from pathlib import Path
def progress_bar(percent_done, bar_length=50):
    """Draw an in-place text progress bar on stdout.

    Args:
        percent_done: Progress as a percentage. Values outside 0-100 are
            printed as given, but the bar itself is clamped so it never
            grows wider than ``bar_length``.
        bar_length: Total width of the bar in characters.
    """
    # Clamp the filled portion: without this, a negative or >100 percentage
    # produces a bar longer than ``bar_length`` characters.
    done_length = int(bar_length * percent_done / 100)
    done_length = min(max(done_length, 0), bar_length)
    bar = "=" * done_length + "-" * (bar_length - done_length)
    # End with a carriage return (no newline) so the next call overwrites
    # this line instead of scrolling the terminal.
    sys.stdout.write(f"[{bar}] {int(percent_done)}%\r")
    sys.stdout.flush()
def _find_svo_files(input_paths):
    """Expand the given files/directories into a sorted list of SVO file paths."""
    svo_files = []
    for path in input_paths:
        if path.is_dir():
            print(f"Searching for SVO files in {path}...")
            found = sorted(
                str(f)
                for f in path.iterdir()
                if f.is_file() and f.suffix.lower() in (".svo", ".svo2")
            )
            if found:
                print(f"Found {len(found)} files in {path}")
                svo_files.extend(found)
            else:
                print(f"No .svo or .svo2 files found in {path}")
        elif path.is_file():
            svo_files.append(str(path))
        else:
            print(f"Path not found: {path}")
    # Sort files to ensure a deterministic camera order.
    svo_files.sort()
    return svo_files


def _open_cameras(svo_files):
    """Open one sl.Camera per SVO file and return a list of per-camera dicts."""
    cam_data = []
    for filepath in svo_files:
        if not os.path.isfile(filepath):
            print(f"File {filepath} does not exist. Skipping.")
            continue
        init = sl.InitParameters()
        init.set_from_svo_file(filepath)
        init.svo_real_time_mode = False
        # Use NONE for performance with multiple cameras.
        init.depth_mode = sl.DEPTH_MODE.NONE
        cam = sl.Camera()
        status = cam.open(init)
        if status != sl.ERROR_CODE.SUCCESS:
            print(f"Failed to open {filepath}: {status}")
            continue
        info = cam.get_camera_information()
        res = info.camera_configuration.resolution
        cam_data.append(
            {
                "cam": cam,
                "serial": info.serial_number,
                "res": res,
                "fps": info.camera_configuration.fps,
                "nb_frames": cam.get_svo_number_of_frames(),
                "start_ts": None,  # Filled in by _sync_cameras
                "sync_pos": 0,  # Frame index matching the common start time
                # Preview size and buffer are created once per camera instead
                # of once per displayed frame.
                # NOTE: each dimension is capped independently, so the aspect
                # ratio is only preserved for sources already close to 720x404.
                "display_res": sl.Resolution(
                    min(720, res.width), min(404, res.height)
                ),
                "image": sl.Mat(),
            }
        )
    return cam_data


def _sync_cameras(cam_data, runtime):
    """Seek every camera to the frame closest to the latest recording start."""
    # The seek position is approximated from FPS and start timestamps rather
    # than looked up by timestamp through the SDK.
    print("Syncing SVOs...")
    max_start_ts = 0
    # Grab the first frame of each SVO to learn its start timestamp.
    for data in cam_data:
        cam = data["cam"]
        err = cam.grab(runtime)
        if err == sl.ERROR_CODE.SUCCESS:
            ts = cam.get_timestamp(sl.TIME_REFERENCE.IMAGE).get_nanoseconds()
            data["start_ts"] = ts
            max_start_ts = max(max_start_ts, ts)
        else:
            print(f"Error grabbing first frame from {data['serial']}: {err}")

    for data in cam_data:
        ts = data["start_ts"]
        frames_to_skip = 0
        # A camera whose first grab failed has no usable timestamp; treating
        # it as 0 would request an absurdly large skip, so it stays at 0.
        if ts is not None and ts < max_start_ts:
            diff_ns = max_start_ts - ts
            # frames = seconds * fps, rounded to the nearest frame.
            frames_to_skip = round(diff_ns / 1_000_000_000 * data["fps"])
            print(
                f"Camera {data['serial']} starts {diff_ns / 1e9:.2f}s earlier. Skipping {frames_to_skip} frames."
            )
            total = data["nb_frames"]
            if total > 0 and frames_to_skip >= total:
                print(
                    f"Camera {data['serial']} ends before the latest recording starts; "
                    f"clamping to its last frame."
                )
                frames_to_skip = total - 1
        data["sync_pos"] = frames_to_skip
        # Also rewinds the latest-starting camera(s) to frame 0, undoing the
        # probe grab above.
        data["cam"].set_svo_position(frames_to_skip)


def _run_playback(cam_data, runtime):
    """Display all cameras side by side and handle keyboard commands until 'q'."""
    print("Starting playback...")
    print(" Press 's' to save SVO image as a PNG")
    print(" Press 'f' to jump forward in the video")
    print(" Press 'b' to jump backward in the video")
    print(" Press 'q' to exit...")
    while True:
        restart = False
        for index, data in enumerate(cam_data):
            cam = data["cam"]
            err = cam.grab(runtime)
            if err == sl.ERROR_CODE.SUCCESS:
                svo_image = data["image"]
                cam.retrieve_image(
                    svo_image, sl.VIEW.LEFT, sl.MEM.CPU, data["display_res"]
                )
                cv2.imshow(f"View {data['serial']}", svo_image.get_data())
                # The progress bar follows the first camera only.
                if index == 0:
                    total = data["nb_frames"]
                    if total > 0:
                        progress_bar(int(cam.get_svo_position() / total * 100), 30)
            elif err == sl.ERROR_CODE.END_OF_SVOFILE_REACHED:
                print(f"\nSVO end reached for {data['serial']}. Looping.")
                restart = True
            # Any other grab error is skipped on purpose (best effort); the
            # camera is simply retried on the next pass.

        if restart:
            # Rewind every camera to its synchronized start. Rewinding only
            # the finished one to frame 0 would break the alignment.
            for data in cam_data:
                data["cam"].set_svo_position(data["sync_pos"])

        # Mask to 8 bits: some OpenCV GUI backends return key codes with extra
        # high bits set, which would never compare equal to plain ASCII.
        key = cv2.waitKey(10) & 0xFF
        if key == ord("q"):
            break
        if key == ord("s"):
            # Save a full-resolution snapshot of the current frame per camera.
            for data in cam_data:
                cam = data["cam"]
                mat = sl.Mat()
                cam.retrieve_image(mat)
                pos = cam.get_svo_position()
                filename = f"capture_{data['serial']}_{pos}.png"
                status = mat.write(filename)
                if status == sl.ERROR_CODE.SUCCESS:
                    print(f"Saved {filename}")
                else:
                    print(f"Failed to save {filename}: {status}")
        elif key == ord("f"):
            # Jump forward by one second's worth of frames, staying in range.
            for data in cam_data:
                cam = data["cam"]
                target = cam.get_svo_position() + int(data["fps"])
                total = data["nb_frames"]
                if total > 0:
                    target = min(target, total - 1)
                cam.set_svo_position(target)
        elif key == ord("b"):
            # Jump backward by one second's worth of frames, but never before
            # the synchronized start so the cameras stay aligned.
            for data in cam_data:
                cam = data["cam"]
                target = cam.get_svo_position() - int(data["fps"])
                cam.set_svo_position(max(data["sync_pos"], target))


def main(opt):
    """Play back one or more ZED SVO recordings side by side, time-aligned.

    Args:
        opt: Parsed command-line options. ``opt.input_svo_files`` is an
            iterable of paths to .svo/.svo2 files and/or directories
            containing them.

    Returns:
        None. Prints a message and returns early when no SVO file is found or
        no camera could be opened.
    """
    svo_files = _find_svo_files([Path(p) for p in opt.input_svo_files])
    if not svo_files:
        print("No valid SVO files provided. Exiting.")
        return

    print(f"Opening {len(svo_files)} SVO files...")
    cam_data = _open_cameras(svo_files)
    if not cam_data:
        print("No cameras opened. Exiting.")
        return

    try:
        runtime = sl.RuntimeParameters()
        _sync_cameras(cam_data, runtime)
        _run_playback(cam_data, runtime)
    finally:
        # Release windows and cameras even if playback is interrupted
        # (e.g. Ctrl-C or an SDK/OpenCV exception).
        try:
            cv2.destroyAllWindows()
        finally:
            for data in cam_data:
                data["cam"].close()
if __name__ == "__main__":
    # Script entry point: collect one or more SVO files/directories from the
    # command line and hand the parsed options to main().
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--input_svo_files",
        required=True,
        type=str,
        nargs="+",
        help="Path to .svo/.svo2 files or directories",
    )
    main(cli.parse_args())