""" This sample shows how to record video in Stereolabs SVO format from network cameras. SVO video files can be played with the ZED API and used with its different modules. """ import pyzed.sl as sl import threading import signal import time import sys import click import zed_network_utils import cv2 import queue import os # Global variable to handle exit exit_app = False def signal_handler(signal, frame): """Handle Ctrl+C to properly exit the program""" global exit_app exit_app = True print("\nCtrl+C pressed. Exiting...") def acquisition(zed, frame_queue=None): """Acquisition thread function to continuously grab frames""" infos = zed.get_camera_information() mat = sl.Mat() while not exit_app: if zed.grab() == sl.ERROR_CODE.SUCCESS: if frame_queue is not None: # Retrieve left image zed.retrieve_image(mat, sl.VIEW.LEFT) # Convert to numpy and copy to ensure thread safety when passing to main try: # Keep latest frame only if frame_queue.full(): try: frame_queue.get_nowait() except queue.Empty: pass frame_queue.put_nowait(mat.get_data().copy()) except queue.Full: pass print(f"{infos.camera_model}[{infos.serial_number}] QUIT") # disable Recording zed.disable_recording() # close the Camera zed.close() def open_camera(zed, config, save_dir): """Open a camera with given configuration and enable recording""" ip, port = zed_network_utils.extract_ip_port(config) if not ip or not port: return False try: serial = config["FusionConfiguration"]["serial_number"] except KeyError: print("Error: Serial number not found in config") return False # Open the remote camera using utility function if not zed_network_utils.open_remote_camera(zed, ip, port): return False print(f"ZED SN{serial} Opened from {ip}:{port}") # Enable Recording output_svo_file = os.path.join(save_dir, f"ZED_SN{serial}.svo2") recording_param = sl.RecordingParameters( output_svo_file.replace(" ", ""), sl.SVO_COMPRESSION_MODE.H265 ) record_err = zed.enable_recording(recording_param) if record_err == sl.ERROR_CODE.SUCCESS: print(f"ZED SN{serial} Enabled recording") else: print(f"ZED SN{serial} Recording initialization error: {record_err}") zed.close() return False print(f"Recording SVO file: {recording_param.video_filename}") return True @click.command() @click.option( "--monitor", is_flag=True, help="Enable local monitoring of the camera streams." ) @click.option( "--config", default=zed_network_utils.DEFAULT_CONFIG_PATH, help="Path to the network configuration JSON file.", type=click.Path(exists=True), ) @click.option( "--save-dir", default=os.getcwd(), help="Directory where SVO files will be saved.", type=click.Path(exists=True, file_okay=False, dir_okay=True, writable=True), ) def main(monitor, config, save_dir): global exit_app # Read network configuration using utility network_config = zed_network_utils.parse_network_config(config) if not network_config: return print(f"Found {len(network_config)} cameras in configuration") if len(network_config) == 0: print("No ZED configured, exit program") return zed_open = False # Open all cameras zeds = [] threads = [] queues = {} # serial -> queue for serial, config in network_config.items(): zed = sl.Camera() if open_camera(zed, config, save_dir): zeds.append(zed) zed_open = True fq = None if monitor: fq = queue.Queue(maxsize=1) queues[serial] = fq # Start acquisition thread immediately thread = threading.Thread(target=acquisition, args=(zed, fq)) thread.start() threads.append(thread) if not zed_open: print("No ZED opened, exit program") return # Set up signal handler for Ctrl+C signal.signal(signal.SIGINT, signal_handler) print("Press Ctrl+C to exit") # Main loop while not exit_app: if monitor: for serial, q in queues.items(): try: frame = q.get_nowait() # Display the frame # Use serial number as window name to distinguish cameras # Resize is handled by window automatically usually, or we can resize cv2.imshow(f"ZED {serial}", frame) except queue.Empty: pass # Check for quit key key = cv2.waitKey(10) if key == 113 or key == ord("q") or key == 27: # q or Esc exit_app = True else: time.sleep(0.02) # Wait for all threads to finish print("Exit signal, closing ZEDs") if monitor: cv2.destroyAllWindows() time.sleep(0.1) for thread in threads: thread.join() print("Program exited") return 0 if __name__ == "__main__": main()