import sys import pyzed.sl as sl import cv2 import argparse import os import math from pathlib import Path def progress_bar(percent_done, bar_length=50): # Display progress bar done_length = int(bar_length * percent_done / 100) bar = "=" * done_length + "-" * (bar_length - done_length) sys.stdout.write("[%s] %i%s\r" % (bar, percent_done, "%")) sys.stdout.flush() def main(opt): input_paths = [Path(p) for p in opt.input_svo_files] svo_files = [] for path in input_paths: if path.is_dir(): print(f"Searching for SVO files in {path}...") found = sorted( [ str(f) for f in path.iterdir() if f.is_file() and f.suffix.lower() in (".svo", ".svo2") ] ) if found: print(f"Found {len(found)} files in {path}") svo_files.extend(found) else: print(f"No .svo or .svo2 files found in {path}") elif path.is_file(): svo_files.append(str(path)) else: print(f"Path not found: {path}") if not svo_files: print("No valid SVO files provided. Exiting.") return # Sort files to ensure deterministic order svo_files.sort() cameras = [] cam_data = [] # List of dicts to store camera info print(f"Opening {len(svo_files)} SVO files...") for filepath in svo_files: if not os.path.isfile(filepath): print(f"File {filepath} does not exist. Skipping.") continue init = sl.InitParameters() init.set_from_svo_file(filepath) init.svo_real_time_mode = False init.depth_mode = ( sl.DEPTH_MODE.NONE ) # Use NONE for performance with multiple cameras cam = sl.Camera() status = cam.open(init) if status != sl.ERROR_CODE.SUCCESS: print(f"Failed to open {filepath}: {status}") continue cameras.append(cam) # Store metadata info = cam.get_camera_information() cam_data.append( { "cam": cam, "serial": info.serial_number, "res": info.camera_configuration.resolution, "fps": info.camera_configuration.fps, "nb_frames": cam.get_svo_number_of_frames(), "start_ts": 0, # To be filled } ) if not cameras: print("No cameras opened. Exiting.") return # SVO Synchronization # Since Python API might miss get_svo_position_at_timestamp, we approximate using FPS and start timestamps. print("Syncing SVOs...") max_start_ts = 0 runtime = sl.RuntimeParameters() # Grab first frame to determine start timestamps for data in cam_data: cam = data["cam"] err = cam.grab(runtime) if err == sl.ERROR_CODE.SUCCESS: ts = cam.get_timestamp(sl.TIME_REFERENCE.IMAGE).get_nanoseconds() data["start_ts"] = ts if ts > max_start_ts: max_start_ts = ts else: print(f"Error grabbing first frame from {data['serial']}: {err}") # Align cameras for data in cam_data: cam = data["cam"] ts = data["start_ts"] if ts < max_start_ts: # Calculate frames to skip diff_ns = max_start_ts - ts fps = data["fps"] # frames = seconds * fps frames_to_skip = int((diff_ns / 1_000_000_000) * fps) print( f"Camera {data['serial']} starts {diff_ns / 1e9:.2f}s earlier. Skipping {frames_to_skip} frames." ) cam.set_svo_position(frames_to_skip) else: # This is the latest camera, just reset to 0 (or stay at 1 since we grabbed one) # To be perfectly synced with the skip, we should ideally be at 'frame 0' relative to the sync point. # If we simply continue, we are at frame 1. # Let's reset to 0 for the "base" cameras to ensure we don't miss the *very* first synced frame if possible, # or just accept we are at frame 1. # set_svo_position(0) is safer. cam.set_svo_position(0) print("Starting playback...") print(" Press 's' to save SVO image as a PNG") print(" Press 'f' to jump forward in the video") print(" Press 'b' to jump backward in the video") print(" Press 'q' to exit...") key = "" while key != 113: # for 'q' key frames_displayed = 0 for data in cam_data: cam = data["cam"] err = cam.grab(runtime) if err == sl.ERROR_CODE.SUCCESS: # Resize for display res = data["res"] w = min(720, res.width) h = min(404, res.height) # Ensure even dimensions for video low_resolution = sl.Resolution(w, h) svo_image = sl.Mat() cam.retrieve_image(svo_image, sl.VIEW.LEFT, sl.MEM.CPU, low_resolution) img = svo_image.get_data() cv2.imshow(f"View {data['serial']}", img) frames_displayed += 1 # Update progress bar if data == cam_data[0]: pos = cam.get_svo_position() total = data["nb_frames"] if total > 0: progress_bar(int(pos / total * 100), 30) elif err == sl.ERROR_CODE.END_OF_SVOFILE_REACHED: print(f"\nSVO end reached for {data['serial']}. Looping.") cam.set_svo_position(0) else: pass key = cv2.waitKey(10) if key == 115: # 's' # Save snapshots for data in cam_data: cam = data["cam"] mat = sl.Mat() cam.retrieve_image(mat) pos = cam.get_svo_position() filename = f"capture_{data['serial']}_{pos}.png" mat.write(filename) print(f"Saved {filename}") if key == 102: # 'f' for data in cam_data: cam = data["cam"] pos = cam.get_svo_position() cam.set_svo_position(pos + int(data["fps"])) if key == 98: # 'b' for data in cam_data: cam = data["cam"] pos = cam.get_svo_position() cam.set_svo_position(max(0, pos - int(data["fps"]))) cv2.destroyAllWindows() for cam in cameras: cam.close() if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( "--input_svo_files", nargs="+", type=str, help="Path to .svo/.svo2 files or directories", required=True, ) opt = parser.parse_args() main(opt)