Files
zed-playground/py_workspace/streaming_receiver.py
T
crosstyan 9c861105f7 feat: add aruco-svo-calibration plan and utils scripts
- Add comprehensive work plan for ArUco-based multi-camera calibration
- Add recording_multi.py for multi-camera SVO recording
- Add streaming_receiver.py for network streaming
- Add svo_playback.py for synchronized playback
- Add zed_network_utils.py for camera configuration
- Add AGENTS.md with project context
2026-02-05 03:17:05 +00:00

438 lines
16 KiB
Python
Executable File

########################################################################
#
# Copyright (c) 2022, STEREOLABS.
#
# All rights reserved.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
########################################################################
"""
Read a stream and display the left images using OpenCV
"""
import sys
import pyzed.sl as sl
import cv2
import socket
import threading
import queue
import zed_network_utils
import click
import time
# Global variables
# Set to True by the main loop when 'q' is pressed; ends the display loop.
exit_app = False
# Video setting currently targeted by the '+' / '-' hotkeys; cycled by 's'.
camera_settings = sl.VIDEO_SETTINGS.BRIGHTNESS
# Label printed in front of the new value when the current setting is adjusted.
str_camera_settings = "BRIGHTNESS"
# Amount added or subtracted on each '+' / '-' key press.
step_camera_settings = 1
# Desired LED state; toggled by the 'l' hotkey and then pushed to every camera.
led_on = True
# Map to store selection state for each camera window
# Key: window_name (str), Value: dict with "rect", "in_progress" and "origin" entries
window_selections = {}
class CameraHandler:
    """Receive one ZED network stream and expose its latest left image.

    A handler owns one ``sl.Camera`` opened from ``ip:port``. After
    ``start()`` succeeds, a daemon thread grabs frames and publishes a copy
    of the most recent left image in ``frame_queue`` (capacity 1), from
    which the GUI thread reads with ``get_nowait()``.
    """

    def __init__(self, ip, port, serial_number=None):
        """Create the SDK objects for one stream; nothing is opened yet.

        Args:
            ip: Address of the streaming sender.
            port: Port of the streaming sender.
            serial_number: Optional identifier used for display; when it is
                falsy the handler is identified as ``"ip:port"`` instead.
        """
        self.ip = ip
        self.port = port
        self.serial_number = serial_number
        self.id = f"{serial_number}" if serial_number else f"{ip}:{port}"
        self.cam = sl.Camera()
        self.runtime = sl.RuntimeParameters()
        self.mat = sl.Mat()
        self.frame_queue = queue.Queue(maxsize=1)  # Keep only latest frame
        self.running = False
        self.thread = None
        self.win_name = f"Camera {self.id}"
        self.status_msg = "Initializing..."

    def start(self):
        """Open the stream and launch the grab thread.

        Returns:
            True when the camera opened and the thread was started, False
            when opening failed (the error is printed and stored in
            ``status_msg``).
        """
        init_parameters = sl.InitParameters()
        init_parameters.depth_mode = sl.DEPTH_MODE.NONE
        init_parameters.sdk_verbose = 1
        init_parameters.set_from_stream(self.ip, self.port)

        status = self.cam.open(init_parameters)
        if status != sl.ERROR_CODE.SUCCESS:
            print(f"Camera {self.id} Open : {repr(status)}. Skipping.")
            self.status_msg = f"Error: {status}"
            return False

        print(f"\n--- Camera {self.id} Info ---")
        print_camera_information(self.cam)

        self.running = True
        self.thread = threading.Thread(target=self._grab_loop, daemon=True)
        self.thread.start()
        return True

    def stop(self):
        """Ask the grab thread to finish, wait up to one second, then close the camera."""
        self.running = False
        if self.thread and self.thread.is_alive():
            # Bounded wait so shutdown cannot hang on the grab thread.
            self.thread.join(timeout=1.0)
        self.cam.close()

    def _grab_loop(self):
        """Thread body: grab frames until ``running`` is cleared."""
        while self.running:
            err = self.cam.grab(self.runtime)
            if err != sl.ERROR_CODE.SUCCESS:
                # Nothing usable yet; back off briefly instead of spinning.
                sl.sleep_ms(10)
                continue
            self.cam.retrieve_image(self.mat, sl.VIEW.LEFT)
            # self.mat is reused for every grab, so hand the consumer its own copy.
            self._publish_frame(self.mat.get_data().copy())

    def _publish_frame(self, frame):
        """Replace whatever is waiting in ``frame_queue`` with ``frame``."""
        if self.frame_queue.full():
            try:
                self.frame_queue.get_nowait()
            except queue.Empty:
                # The consumer took the stale frame between full() and
                # get_nowait(); the slot is free, which is all we need.
                pass
        try:
            self.frame_queue.put_nowait(frame)
        except queue.Full:
            # Drop this frame rather than block the grab loop.
            pass
def get_selection_state(win_name):
    """Return the selection record for ``win_name``, creating it on first use.

    The record is the dict stored in ``window_selections``; callers mutate it
    in place. A new record holds a default ``sl.Rect``, no drag in progress
    and the placeholder origin ``(-1, -1)``.
    """
    try:
        return window_selections[win_name]
    except KeyError:
        fresh_state = {
            "rect": sl.Rect(),
            "in_progress": False,
            "origin": (-1, -1),
        }
        window_selections[win_name] = fresh_state
        return fresh_state
def on_mouse(event, x, y, flags, param):
    """OpenCV mouse callback that tracks a drag rectangle per window.

    ``param`` is the window name the callback was registered with. A left
    press starts a drag at ``(x, y)``, a left release ends it, and a right
    press cancels it and resets the rectangle. While a drag is active, every
    event resizes the rectangle to span the origin and the current position.
    """
    selection = get_selection_state(param)

    if event == cv2.EVENT_LBUTTONDOWN:
        selection["origin"] = (x, y)
        selection["in_progress"] = True
    elif event == cv2.EVENT_LBUTTONUP:
        selection["in_progress"] = False
    elif event == cv2.EVENT_RBUTTONDOWN:
        selection["in_progress"] = False
        selection["rect"] = sl.Rect(0, 0, 0, 0)

    if not selection["in_progress"]:
        return

    anchor_x, anchor_y = selection["origin"]
    box = selection["rect"]
    box.x = min(x, anchor_x)
    box.y = min(y, anchor_y)
    # +1 so that a click without movement still covers one pixel.
    box.width = abs(x - anchor_x) + 1
    box.height = abs(y - anchor_y) + 1
def _collect_camera_targets(config, ip):
    """Return the ``(ip, port, identifier)`` tuples named by the config file and ``--ip`` values."""
    targets = []

    # 1. Parse Config File if provided
    if config:
        print(f"Reading configuration from {config}")
        network_config = zed_network_utils.parse_network_config(config)
        if network_config:
            for serial, cam_config in network_config.items():
                cam_ip, cam_port = zed_network_utils.extract_ip_port(cam_config)
                if cam_ip and cam_port:
                    targets.append((cam_ip, cam_port, serial))

    # 2. Parse CLI IPs if provided
    for ip_str in ip or ():
        if ":" in ip_str:
            try:
                # Raises ValueError for extra colons or a non-numeric port.
                host, port_str = ip_str.split(":")
                port = int(port_str)
            except ValueError:
                print(f"Invalid format for IP: {ip_str}. Expected IP:PORT")
            else:
                targets.append((host, port, ip_str))
            continue

        print(
            f"Treating '{ip_str}' as serial number, looking up in default config..."
        )
        cam_config = zed_network_utils.get_camera_config_by_serial(ip_str)
        if not cam_config:
            print(f"Invalid format or unknown serial: {ip_str}.")
            continue
        cam_ip, cam_port = zed_network_utils.extract_ip_port(cam_config)
        if cam_ip and cam_port:
            targets.append((cam_ip, cam_port, ip_str))
        else:
            print(f"Could not find IP/Port for serial {ip_str}")

    return targets


def _show_latest_frame(handler):
    """Display the handler's newest frame, if any, with its selection rectangle drawn on top."""
    try:
        # Non-blocking: a camera without a new frame must not stall the GUI.
        frame = handler.frame_queue.get_nowait()
    except queue.Empty:
        return

    rect = get_selection_state(handler.win_name)["rect"]
    image_bounds = sl.Rect(0, 0, frame.shape[1], frame.shape[0])
    if not rect.is_empty() and rect.is_contained(image_bounds):
        cv2.rectangle(
            frame,
            (rect.x, rect.y),
            (rect.width + rect.x, rect.height + rect.y),
            (220, 180, 20),
            2,
        )
    cv2.imshow(handler.win_name, frame)


# NOTE: main's docstring is the text click shows for --help, so the
# maintainer documentation lives in comments instead.
#
# Flow: resolve the camera list (config file first, then --ip values), open
# one CameraHandler and one window per camera, then run the display/hotkey
# loop until 'q' or Ctrl+C. Cameras are opened inside the try block so that
# an error or Ctrl+C during start-up still stops the ones already opened.
@click.command()
@click.option(
    "--config",
    "-c",
    type=click.Path(exists=True),
    help="Path to JSON configuration file.",
)
@click.option(
    "--ip",
    "-i",
    multiple=True,
    help="IP address(es) in format IP:PORT. Can be used multiple times.",
)
def main(config, ip):
    """
    ZED Streaming Receiver.
    """
    global exit_app

    cameras_to_open = _collect_camera_targets(config, ip)
    if not cameras_to_open:
        print("No valid cameras specified. Use --config or --ip.")
        print_help()
        return

    print(f"Starting {len(cameras_to_open)} camera streams...")
    print_help()

    handlers = []
    try:
        # Initialize all cameras
        for cam_ip, cam_port, cam_id in cameras_to_open:
            handler = CameraHandler(cam_ip, cam_port, cam_id)
            if handler.start():
                handlers.append(handler)
                # Create window for this camera
                cv2.namedWindow(handler.win_name)
                cv2.setMouseCallback(
                    handler.win_name, on_mouse, param=handler.win_name
                )

        if not handlers:
            print("No cameras could be opened. Exiting.")
            return

        # Main GUI Loop
        while not exit_app:
            for handler in handlers:
                _show_latest_frame(handler)

            # Handle key events (once per loop iteration, for all windows)
            key = cv2.waitKey(10)
            if key == -1:
                continue
            if key == 113:  # q
                exit_app = True
            else:
                # Settings are applied to ALL cameras; the first camera is
                # passed as the reference one.
                update_camera_settings(
                    key, handlers[0].cam, handlers, handlers[0].runtime
                )
    except KeyboardInterrupt:
        print("\nCtrl+C pressed. Exiting...")
    finally:
        # Nothing to release (and nothing to announce) if no camera opened.
        if handlers:
            print("Closing cameras...")
            for handler in handlers:
                handler.stop()
            cv2.destroyAllWindows()
            print("Program exited")
def print_camera_information(cam):
    """Print model, serial number, firmware versions, resolution and FPS of ``cam``.

    Args:
        cam: Object exposing ``get_camera_information()``; the result must
            provide ``camera_model``, ``serial_number``,
            ``camera_configuration`` (``firmware_version``, ``resolution``,
            ``fps``) and ``sensors_configuration.firmware_version``.
    """
    # Query the SDK once and reuse the result for every field.
    cam_info = cam.get_camera_information()
    camera_config = cam_info.camera_configuration
    resolution = camera_config.resolution

    print(f"ZED Model : {cam_info.camera_model}")
    print(f"ZED Serial Number : {cam_info.serial_number}")
    print(
        f"ZED Camera Firmware : {camera_config.firmware_version}"
        f"/{cam_info.sensors_configuration.firmware_version}"
    )
    print(f"ZED Camera Resolution : {round(resolution.width, 2)}x{resolution.height}")
    print(f"ZED Camera FPS : {int(camera_config.fps)}")
def print_help():
    """Print the list of keyboard and mouse controls understood by the viewer."""
    help_lines = (
        "\n\nCamera controls hotkeys:",
        "* Increase camera settings value: '+'",
        "* Decrease camera settings value: '-'",
        "* Toggle camera settings: 's'",
        "* Toggle camera LED: 'l' (lower L)",
        "* Reset all parameters: 'r'",
        "* Reset exposure ROI to full image 'f'",
        "* Use mouse to select an image area to apply exposure (press 'a')",
        "* Exit : 'q'\n",
    )
    for help_line in help_lines:
        print(help_line)
# Update camera setting on key press
def update_camera_settings(key, reference_cam, handlers, runtime):
    """Apply the hotkey ``key`` to every camera in ``handlers``.

    Handled keys: 's' switches the setting targeted by '+'/'-', '+' and '-'
    step that setting on each camera from the camera's own current value,
    'r' resets the video settings, 'l' toggles the LED, 'a' applies each
    window's selection rectangle as exposure ROI and 'f' resets the ROI.
    Any other key is ignored.

    Args:
        key: Key code as returned by ``cv2.waitKey``.
        reference_cam: Unused; kept so existing callers keep working.
        handlers: Camera handlers exposing ``cam`` and ``win_name``.
        runtime: Unused; kept so existing callers keep working.
    """
    global led_on

    if key == 115:  # 's': only changes which setting '+' / '-' will target
        switch_camera_settings()
        return

    actions_by_key = {
        43: "inc",  # '+'
        45: "dec",  # '-'
        114: "reset",  # 'r'
        108: "led",  # 'l'
        97: "roi",  # 'a'
        102: "roi",  # 'f'
    }
    action = actions_by_key.get(key)
    if action is None:
        return

    # One-off announcements / state changes, done before touching the cameras.
    if action == "led":
        led_on = not led_on
        print(f"LED Status: {led_on}")
    elif action == "reset":
        print("[Sample] Reset all settings to default")

    # Apply to all cameras
    for index, handler in enumerate(handlers):
        cam = handler.cam
        announce = index == 0  # Print shared messages only once

        if action == "inc":
            new_value = (
                cam.get_camera_settings(camera_settings)[1] + step_camera_settings
            )
            cam.set_camera_settings(camera_settings, new_value)
            if announce:
                print(str_camera_settings + ": " + str(new_value))
        elif action == "dec":
            current_value = cam.get_camera_settings(camera_settings)[1]
            # Never step below zero.
            if current_value >= 1:
                new_value = current_value - step_camera_settings
                cam.set_camera_settings(camera_settings, new_value)
                if announce:
                    print(str_camera_settings + ": " + str(new_value))
        elif action == "reset":
            for setting in (
                sl.VIDEO_SETTINGS.BRIGHTNESS,
                sl.VIDEO_SETTINGS.CONTRAST,
                sl.VIDEO_SETTINGS.HUE,
                sl.VIDEO_SETTINGS.SATURATION,
                sl.VIDEO_SETTINGS.SHARPNESS,
                sl.VIDEO_SETTINGS.GAIN,
                sl.VIDEO_SETTINGS.EXPOSURE,
                sl.VIDEO_SETTINGS.WHITEBALANCE_TEMPERATURE,
            ):
                # -1 is the value this sample uses to request the default.
                cam.set_camera_settings(setting, -1)
        elif action == "led":
            cam.set_camera_settings(sl.VIDEO_SETTINGS.LED_STATUS, led_on)
        elif action == "roi":
            rect = get_selection_state(handler.win_name)["rect"]
            if key == 102:  # 'f': back to the full image
                cam.set_camera_settings_roi(
                    sl.VIDEO_SETTINGS.AEC_AGC_ROI, rect, sl.SIDE.BOTH, True
                )
                if announce:
                    print("[Sample] reset AEC_AGC_ROI to full res")
            elif rect.is_empty():
                # No rectangle was drawn in this window: do not push an
                # empty ROI to the camera.
                print(
                    f"[Sample] no selection on {handler.win_name}, "
                    "AEC_AGC_ROI unchanged"
                )
            else:
                print(f"[Sample] set AEC_AGC_ROI on {handler.win_name}")
                cam.set_camera_settings_roi(
                    sl.VIDEO_SETTINGS.AEC_AGC_ROI, rect, sl.SIDE.BOTH
                )
# Function to switch between different camera settings (brightness, contrast, etc.).
def switch_camera_settings():
    """Advance the '+' / '-' target to the next video setting and announce it.

    The order is brightness, contrast, hue, saturation, sharpness, gain,
    exposure, white balance, then back to brightness. Updates the globals
    ``camera_settings`` and ``str_camera_settings``; does nothing when the
    current setting is not part of that cycle.
    """
    global camera_settings
    global str_camera_settings

    video = sl.VIDEO_SETTINGS
    # (setting, label stored in str_camera_settings, name printed on switch)
    ring = (
        (video.BRIGHTNESS, "Brightness", "BRIGHTNESS"),
        (video.CONTRAST, "Contrast", "CONTRAST"),
        (video.HUE, "Hue", "HUE"),
        (video.SATURATION, "Saturation", "SATURATION"),
        (video.SHARPNESS, "Sharpness", "Sharpness"),
        (video.GAIN, "Gain", "GAIN"),
        (video.EXPOSURE, "Exposure", "EXPOSURE"),
        (video.WHITEBALANCE_TEMPERATURE, "White Balance", "WHITEBALANCE"),
    )

    for position, entry in enumerate(ring):
        if camera_settings == entry[0]:
            upcoming, label, banner = ring[(position + 1) % len(ring)]
            camera_settings = upcoming
            str_camera_settings = label
            print("[Sample] Switch to camera settings: " + banner)
            return
# Run the click command only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()