""" Read a stream and display the left images using OpenCV """ import sys import pyzed.sl as sl import cv2 import socket import threading import queue import zed_network_utils import click import time # Global variables exit_app = False camera_settings = sl.VIDEO_SETTINGS.BRIGHTNESS str_camera_settings = "BRIGHTNESS" step_camera_settings = 1 led_on = True # Map to store selection state for each camera window # Key: window_name (str), Value: dict with selection state window_selections = {} class CameraHandler: def __init__(self, ip, port, serial_number=None): self.ip = ip self.port = port self.serial_number = serial_number self.id = f"{serial_number}" if serial_number else f"{ip}:{port}" self.cam = sl.Camera() self.runtime = sl.RuntimeParameters() self.mat = sl.Mat() self.frame_queue = queue.Queue(maxsize=1) # Keep only latest frame self.running = False self.thread = None self.win_name = f"Camera {self.id}" self.status_msg = "Initializing..." def start(self): init_parameters = sl.InitParameters() init_parameters.depth_mode = sl.DEPTH_MODE.NONE init_parameters.sdk_verbose = 1 init_parameters.set_from_stream(self.ip, self.port) status = self.cam.open(init_parameters) if status != sl.ERROR_CODE.SUCCESS: print(f"Camera {self.id} Open : {repr(status)}. Skipping.") self.status_msg = f"Error: {status}" return False print(f"\n--- Camera {self.id} Info ---") print_camera_information(self.cam) self.running = True self.thread = threading.Thread(target=self._grab_loop) self.thread.daemon = True self.thread.start() return True def stop(self): self.running = False if self.thread and self.thread.is_alive(): self.thread.join(timeout=1.0) self.cam.close() def _grab_loop(self): while self.running: err = self.cam.grab(self.runtime) if err == sl.ERROR_CODE.SUCCESS: # Retrieve image into a local Mat, then convert to numpy for queue # We need a lock or just copy it because Mat is not thread-safe if modified self.cam.retrieve_image(self.mat, sl.VIEW.LEFT) frame = self.mat.get_data() # We can clone it to be safe for passing to main thread # (get_data() returns a numpy view, so deep copy might be needed if buffer is reused) # However, for simple display, usually it's fine if we consume fast enough. # To be perfectly safe, let's copy. frame_copy = frame.copy() try: # Remove old frame if exists to keep latest if self.frame_queue.full(): self.frame_queue.get_nowait() self.frame_queue.put_nowait(frame_copy) except queue.Full: pass else: sl.sleep_ms(10) def get_selection_state(win_name): if win_name not in window_selections: window_selections[win_name] = { "rect": sl.Rect(), "in_progress": False, "origin": (-1, -1), } return window_selections[win_name] def on_mouse(event, x, y, flags, param): win_name = param state = get_selection_state(win_name) if event == cv2.EVENT_LBUTTONDOWN: state["origin"] = (x, y) state["in_progress"] = True elif event == cv2.EVENT_LBUTTONUP: state["in_progress"] = False elif event == cv2.EVENT_RBUTTONDOWN: state["in_progress"] = False state["rect"] = sl.Rect(0, 0, 0, 0) if state["in_progress"]: state["rect"].x = min(x, state["origin"][0]) state["rect"].y = min(y, state["origin"][1]) state["rect"].width = abs(x - state["origin"][0]) + 1 state["rect"].height = abs(y - state["origin"][1]) + 1 @click.command() @click.option( "--config", "-c", type=click.Path(exists=True), help="Path to JSON configuration file.", ) @click.option( "--ip", "-i", multiple=True, help="IP address(es) in format IP:PORT. Can be used multiple times.", ) def main(config, ip): """ ZED Streaming Receiver. """ global exit_app cameras_to_open = [] # 1. Parse Config File if provided if config: print(f"Reading configuration from {config}") network_config = zed_network_utils.parse_network_config(config) if network_config: for serial, cam_config in network_config.items(): cam_ip, cam_port = zed_network_utils.extract_ip_port(cam_config) if cam_ip and cam_port: cameras_to_open.append((cam_ip, cam_port, serial)) # 2. Parse CLI IPs if provided if ip: for ip_str in ip: if ":" in ip_str: try: host, port_str = ip_str.split(":") port = int(port_str) cameras_to_open.append((host, port, ip_str)) except ValueError: print(f"Invalid format for IP: {ip_str}. Expected IP:PORT") else: print( f"Treating '{ip_str}' as serial number, looking up in default config..." ) cam_config = zed_network_utils.get_camera_config_by_serial(ip_str) if cam_config: cam_ip, cam_port = zed_network_utils.extract_ip_port(cam_config) if cam_ip and cam_port: cameras_to_open.append((cam_ip, cam_port, ip_str)) else: print(f"Could not find IP/Port for serial {ip_str}") else: print(f"Invalid format or unknown serial: {ip_str}.") if not cameras_to_open: print("No valid cameras specified. Use --config or --ip.") print_help() return print(f"Starting {len(cameras_to_open)} camera streams...") print_help() handlers = [] # Initialize all cameras for cam_ip, cam_port, cam_id in cameras_to_open: handler = CameraHandler(cam_ip, cam_port, cam_id) if handler.start(): handlers.append(handler) # Create window for this camera cv2.namedWindow(handler.win_name) cv2.setMouseCallback(handler.win_name, on_mouse, param=handler.win_name) if not handlers: print("No cameras could be opened. Exiting.") return try: while not exit_app: # Main GUI Loop for handler in handlers: try: # Get frame from queue (non-blocking) frame = handler.frame_queue.get_nowait() # Draw selection if exists state = get_selection_state(handler.win_name) rect = state["rect"] if not rect.is_empty() and rect.is_contained( sl.Rect(0, 0, frame.shape[1], frame.shape[0]) ): cv2.rectangle( frame, (rect.x, rect.y), (rect.width + rect.x, rect.height + rect.y), (220, 180, 20), 2, ) cv2.imshow(handler.win_name, frame) except queue.Empty: pass # No new frame, just continue # Handle key events (once per loop iteration, for all windows) key = cv2.waitKey(10) if key != -1: if key == 113: # q exit_app = True else: # Apply settings to ALL cameras # Note: We pick the first camera to get current value, then set to all # This implies settings are synchronized if handlers: update_camera_settings( key, handlers[0].cam, handlers, handlers[0].runtime ) except KeyboardInterrupt: print("\nCtrl+C pressed. Exiting...") finally: print("Closing cameras...") for handler in handlers: handler.stop() cv2.destroyAllWindows() print("Program exited") def print_camera_information(cam): cam_info = cam.get_camera_information() print("ZED Model : {0}".format(cam_info.camera_model)) print("ZED Serial Number : {0}".format(cam_info.serial_number)) print( "ZED Camera Firmware : {0}/{1}".format( cam_info.camera_configuration.firmware_version, cam_info.sensors_configuration.firmware_version, ) ) print( "ZED Camera Resolution : {0}x{1}".format( round(cam_info.camera_configuration.resolution.width, 2), cam.get_camera_information().camera_configuration.resolution.height, ) ) print( "ZED Camera FPS : {0}".format(int(cam_info.camera_configuration.fps)) ) def print_help(): print("\n\nCamera controls hotkeys:") print("* Increase camera settings value: '+'") print("* Decrease camera settings value: '-'") print("* Toggle camera settings: 's'") print("* Toggle camera LED: 'l' (lower L)") print("* Reset all parameters: 'r'") print("* Reset exposure ROI to full image 'f'") print("* Use mouse to select an image area to apply exposure (press 'a')") print("* Exit : 'q'\n") # Update camera setting on key press def update_camera_settings(key, reference_cam, handlers, runtime): global led_on, camera_settings, str_camera_settings # This logic updates ALL cameras based on the input key if key == 115: # for 's' key switch_camera_settings() return if key == 108: # for 'l' key led_on = not led_on print(f"LED Status: {led_on}") elif key == 114: # 'r' print("[Sample] Reset all settings to default") # Determine action and value action = None # 'inc', 'dec', 'set' val = None if key == 43: action = "inc" elif key == 45: action = "dec" elif key == 114: action = "reset" elif key == 108: action = "led" elif key == 97 or key == 102: action = "roi" # Apply to all cameras for handler in handlers: cam = handler.cam if action == "inc": current_value = cam.get_camera_settings(camera_settings)[1] cam.set_camera_settings( camera_settings, current_value + step_camera_settings ) if handler == handlers[0]: # Print only once print( str_camera_settings + ": " + str(current_value + step_camera_settings) ) elif action == "dec": current_value = cam.get_camera_settings(camera_settings)[1] if current_value >= 1: cam.set_camera_settings( camera_settings, current_value - step_camera_settings ) if handler == handlers[0]: print( str_camera_settings + ": " + str(current_value - step_camera_settings) ) elif action == "reset": cam.set_camera_settings(sl.VIDEO_SETTINGS.BRIGHTNESS, -1) cam.set_camera_settings(sl.VIDEO_SETTINGS.CONTRAST, -1) cam.set_camera_settings(sl.VIDEO_SETTINGS.HUE, -1) cam.set_camera_settings(sl.VIDEO_SETTINGS.SATURATION, -1) cam.set_camera_settings(sl.VIDEO_SETTINGS.SHARPNESS, -1) cam.set_camera_settings(sl.VIDEO_SETTINGS.GAIN, -1) cam.set_camera_settings(sl.VIDEO_SETTINGS.EXPOSURE, -1) cam.set_camera_settings(sl.VIDEO_SETTINGS.WHITEBALANCE_TEMPERATURE, -1) elif action == "led": cam.set_camera_settings(sl.VIDEO_SETTINGS.LED_STATUS, led_on) elif action == "roi": state = get_selection_state(handler.win_name) rect = state["rect"] reset = key == 102 if reset: cam.set_camera_settings_roi( sl.VIDEO_SETTINGS.AEC_AGC_ROI, rect, sl.SIDE.BOTH, True ) if handler == handlers[0]: print("[Sample] reset AEC_AGC_ROI to full res") else: print(f"[Sample] set AEC_AGC_ROI on {handler.win_name}") cam.set_camera_settings_roi( sl.VIDEO_SETTINGS.AEC_AGC_ROI, rect, sl.SIDE.BOTH ) # Function to switch between different camera settings (brightness, contrast, etc.). def switch_camera_settings(): global camera_settings global str_camera_settings if camera_settings == sl.VIDEO_SETTINGS.BRIGHTNESS: camera_settings = sl.VIDEO_SETTINGS.CONTRAST str_camera_settings = "Contrast" print("[Sample] Switch to camera settings: CONTRAST") elif camera_settings == sl.VIDEO_SETTINGS.CONTRAST: camera_settings = sl.VIDEO_SETTINGS.HUE str_camera_settings = "Hue" print("[Sample] Switch to camera settings: HUE") elif camera_settings == sl.VIDEO_SETTINGS.HUE: camera_settings = sl.VIDEO_SETTINGS.SATURATION str_camera_settings = "Saturation" print("[Sample] Switch to camera settings: SATURATION") elif camera_settings == sl.VIDEO_SETTINGS.SATURATION: camera_settings = sl.VIDEO_SETTINGS.SHARPNESS str_camera_settings = "Sharpness" print("[Sample] Switch to camera settings: Sharpness") elif camera_settings == sl.VIDEO_SETTINGS.SHARPNESS: camera_settings = sl.VIDEO_SETTINGS.GAIN str_camera_settings = "Gain" print("[Sample] Switch to camera settings: GAIN") elif camera_settings == sl.VIDEO_SETTINGS.GAIN: camera_settings = sl.VIDEO_SETTINGS.EXPOSURE str_camera_settings = "Exposure" print("[Sample] Switch to camera settings: EXPOSURE") elif camera_settings == sl.VIDEO_SETTINGS.EXPOSURE: camera_settings = sl.VIDEO_SETTINGS.WHITEBALANCE_TEMPERATURE str_camera_settings = "White Balance" print("[Sample] Switch to camera settings: WHITEBALANCE") elif camera_settings == sl.VIDEO_SETTINGS.WHITEBALANCE_TEMPERATURE: camera_settings = sl.VIDEO_SETTINGS.BRIGHTNESS str_camera_settings = "Brightness" print("[Sample] Switch to camera settings: BRIGHTNESS") if __name__ == "__main__": main()