From 9c861105f73487c86ad8fdd1ec2e35dbef4faa61 Mon Sep 17 00:00:00 2001 From: crosstyan Date: Thu, 5 Feb 2026 03:17:05 +0000 Subject: [PATCH] feat: add aruco-svo-calibration plan and utils scripts - Add comprehensive work plan for ArUco-based multi-camera calibration - Add recording_multi.py for multi-camera SVO recording - Add streaming_receiver.py for network streaming - Add svo_playback.py for synchronized playback - Add zed_network_utils.py for camera configuration - Add AGENTS.md with project context --- .../notepads/aruco-svo-calibration/issues.md | 4 + .../aruco-svo-calibration/learnings.md | 14 + py_workspace/.gitignore | 220 ++++++ py_workspace/.sisyphus/boulder.json | 8 + .../.sisyphus/drafts/aruco-svo-calibration.md | 108 +++ .../aruco-svo-calibration/decisions.md | 6 + .../notepads/aruco-svo-calibration/issues.md | 22 + .../aruco-svo-calibration/learnings.md | 54 ++ .../aruco-svo-calibration/problems.md | 2 + .../notepads/aruco_sync/decisions.md | 3 + .../notepads/aruco_sync/learnings.md | 3 + .../.sisyphus/plans/aruco-svo-calibration.md | 745 ++++++++++++++++++ py_workspace/AGENTS.md | 37 + py_workspace/recording_multi.py | 135 ++++ py_workspace/streaming_receiver.py | 437 ++++++++++ py_workspace/svo_playback.py | 185 +++++ py_workspace/zed_network_utils.py | 88 +++ 17 files changed, 2071 insertions(+) create mode 100644 .sisyphus/notepads/aruco-svo-calibration/issues.md create mode 100644 .sisyphus/notepads/aruco-svo-calibration/learnings.md create mode 100644 py_workspace/.gitignore create mode 100644 py_workspace/.sisyphus/boulder.json create mode 100644 py_workspace/.sisyphus/drafts/aruco-svo-calibration.md create mode 100644 py_workspace/.sisyphus/notepads/aruco-svo-calibration/decisions.md create mode 100644 py_workspace/.sisyphus/notepads/aruco-svo-calibration/issues.md create mode 100644 py_workspace/.sisyphus/notepads/aruco-svo-calibration/learnings.md create mode 100644 py_workspace/.sisyphus/notepads/aruco-svo-calibration/problems.md create mode 100644 py_workspace/.sisyphus/notepads/aruco_sync/decisions.md create mode 100644 py_workspace/.sisyphus/notepads/aruco_sync/learnings.md create mode 100644 py_workspace/.sisyphus/plans/aruco-svo-calibration.md create mode 100644 py_workspace/AGENTS.md create mode 100755 py_workspace/recording_multi.py create mode 100755 py_workspace/streaming_receiver.py create mode 100755 py_workspace/svo_playback.py create mode 100644 py_workspace/zed_network_utils.py diff --git a/.sisyphus/notepads/aruco-svo-calibration/issues.md b/.sisyphus/notepads/aruco-svo-calibration/issues.md new file mode 100644 index 0000000..2e7cb7b --- /dev/null +++ b/.sisyphus/notepads/aruco-svo-calibration/issues.md @@ -0,0 +1,4 @@ +## Cleanup - 2026-02-04\n- Restored py_workspace/README.md\n- Removed unintended nested py_workspace/ directory\n- Cleaned up __pycache__ directories\n- Updated .gitignore to exclude build artifacts and large files (*.pyc, *.svo2, *.parquet, .venv, .ruff_cache) + +- Fixed pose string parsing in self-check distance block to use np.fromstring. +- Added guard for max_frames computation to handle unknown SVO lengths. diff --git a/.sisyphus/notepads/aruco-svo-calibration/learnings.md b/.sisyphus/notepads/aruco-svo-calibration/learnings.md new file mode 100644 index 0000000..3c8ab68 --- /dev/null +++ b/.sisyphus/notepads/aruco-svo-calibration/learnings.md @@ -0,0 +1,14 @@ + +## ArUco Detector Implementation (2026-02-04) +- Implemented `aruco/detector.py` using `cv2.aruco.ArucoDetector` (modern API). +- `detect_markers` returns corners as `(N, 4, 2)` float32 and ids as `(N,)` int32. +- `build_camera_matrix_from_zed` extracts intrinsics from `sl.Camera` calibration parameters. +- `estimate_pose_from_detections` uses `cv2.solvePnP` with `SOLVEPNP_SQPNP` flag for robust pose estimation from multiple markers. +- Reprojection error is calculated using the utility from `aruco/pose_math.py`. + +## Extrinsics Calibration Tool (2026-02-04) +- Created `calibrate_extrinsics.py` CLI tool for multi-camera extrinsic calibration. +- Uses `SVOReader` for synchronized playback and `PoseAccumulator` for robust pose averaging. +- Computes `T_world_from_cam` by inverting the `solvePnP` result (`T_cam_from_world`). +- Implements RANSAC-based filtering of poses to handle outliers and noise. +- Outputs deterministic JSON mapping serial numbers to 4x4 transformation matrices and statistics. diff --git a/py_workspace/.gitignore b/py_workspace/.gitignore new file mode 100644 index 0000000..5e65f6b --- /dev/null +++ b/py_workspace/.gitignore @@ -0,0 +1,220 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[codz] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py.cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +# Pipfile.lock + +# UV +# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# uv.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +# poetry.lock +# poetry.toml + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python. +# https://pdm-project.org/en/latest/usage/project/#working-with-version-control +# pdm.lock +# pdm.toml +.pdm-python +.pdm-build/ + +# pixi +# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control. +# pixi.lock +# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one +# in the .venv directory. It is recommended not to include this directory in version control. +.pixi + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# Redis +*.rdb +*.aof +*.pid + +# RabbitMQ +mnesia/ +rabbitmq/ +rabbitmq-data/ + +# ActiveMQ +activemq-data/ + +# SageMath parsed files +*.sage.py + +# Environments +.env +.envrc +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +# .idea/ + +# Abstra +# Abstra is an AI-powered process automation framework. +# Ignore directories containing user credentials, local state, and settings. +# Learn more at https://abstra.io/docs +.abstra/ + +# Visual Studio Code +# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore +# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore +# and can be added to the global gitignore or merged into this file. However, if you prefer, +# you could uncomment the following to ignore the entire vscode folder +# .vscode/ + +# Ruff stuff: +.ruff_cache/ + +# PyPI configuration file +.pypirc + +# Marimo +marimo/_static/ +marimo/_lsp/ +__marimo__/ + +# Streamlit +.streamlit/secrets.toml + +.venv +*.svo2 +.ruff_cache diff --git a/py_workspace/.sisyphus/boulder.json b/py_workspace/.sisyphus/boulder.json new file mode 100644 index 0000000..810dae5 --- /dev/null +++ b/py_workspace/.sisyphus/boulder.json @@ -0,0 +1,8 @@ +{ + "active_plan": "/workspaces/zed-playground/py_workspace/.sisyphus/plans/aruco-svo-calibration.md", + "started_at": "2026-02-04T10:33:12.040Z", + "session_ids": [ + "ses_3d7dbf4bdffeF4K2AhV4X3VFTa" + ], + "plan_name": "aruco-svo-calibration" +} \ No newline at end of file diff --git a/py_workspace/.sisyphus/drafts/aruco-svo-calibration.md b/py_workspace/.sisyphus/drafts/aruco-svo-calibration.md new file mode 100644 index 0000000..4b1e53b --- /dev/null +++ b/py_workspace/.sisyphus/drafts/aruco-svo-calibration.md @@ -0,0 +1,108 @@ +# Draft: ArUco-Based Multi-Camera Extrinsic Calibration from SVO + +## Requirements (confirmed) + +### Goal +Create a CLI tool that reads synchronized SVO recordings from multiple ZED cameras, detects ArUco markers on a 3D calibration box, computes camera extrinsics relative to the marker world origin, and outputs accurate pose matrices to replace the inaccurate ones in `inside_network.json`. + +### Calibration Target +- **Type**: 3D box with 6 diamond board faces +- **Object points**: Defined in `aruco/output/standard_box_markers.parquet` +- **Marker dictionary**: `DICT_4X4_50` (from existing code) +- **Minimum markers per frame**: 4+ (one diamond face worth) + +### Input +- Multiple SVO2 files (one per camera) +- Frame sampling: Fixed interval + quality filter +- Timestamp-aligned playback (using existing `svo_playback.py` pattern) + +### Output +- **New JSON file** with calibrated extrinsics +- Format: Similar to `inside_network.json` but with accurate `pose` field +- Reference frame: **Marker is world origin** (all cameras expressed relative to ArUco box) + +### Workflow +- **CLI with preview**: Command-line driven but shows visualization of detected markers +- Example: `uv run calibrate_extrinsics.py --svos *.svo2 --interval 30 --output calibrated.json` + +## Technical Decisions + +### Intrinsics Source +- Use ZED SDK's pre-calibrated intrinsics from `cam.get_camera_information().camera_configuration.calibration_parameters.left_cam` +- Properties: `fx, fy, cx, cy, disto` + +### Pose Estimation +- Use `cv2.solvePnP` with `SOLVEPNP_SQPNP` flag (from existing code) +- Consider `solvePnPRansac` for per-frame robustness + +### Outlier Handling (Two-stage) +1. **Per-frame rejection**: Reject frames with high reprojection error (threshold ~2-5 pixels) +2. **RANSAC on pose set**: After collecting all valid poses, use RANSAC-style consensus + +### Pose Averaging +- **Rotation**: Use `scipy.spatial.transform.Rotation.mean()` for geodesic mean +- **Translation**: Use median or weighted mean with MAD-based outlier rejection + +### Math: Camera-to-World Transform +Each camera sees marker → `T_cam_marker` (camera-to-marker) +World origin = marker, so camera pose in world = `T_world_cam = inv(T_cam_marker)` + +For camera i: `T_world_cam_i = inv(T_cam_i_marker)` + +## Research Findings + +### From Librarian (Multi-camera calibration) +- Relative transform: `T_BA = T_BM @ inv(T_AM)` +- Board-based detection improves robustness to occlusion +- Use `refineDetectedMarkers` for corner accuracy +- Handle missing views by only computing poses when enough markers visible + +### From Librarian (Robust averaging) +- Use `scipy.spatial.transform.Rotation.mean(weights=...)` for rotation averaging +- Median/MAD on translation for outlier rejection +- RANSAC over pose set with rotation angle + translation distance thresholds +- Practical thresholds: rotation >2-5°, translation depends on scale + +### Existing Codebase Patterns +- `find_extrinsic_object.py`: ArUco detection + solvePnP pattern +- `svo_playback.py`: Multi-SVO sync via timestamp alignment +- `aruco_box.py`: Diamond board geometry generation + +## Open Questions +- None remaining + +## Metis Gap Analysis (Addressed) + +### Critical Gaps Resolved: +1. **World frame**: As defined in `standard_box_markers.parquet` (origin at box coordinate system) +2. **Image stream**: Use rectified LEFT view (no distortion coefficients needed) +3. **Transform convention**: Match `inside_network.json` format - appears to be T_world_from_cam (camera pose in world) + - Format: space-separated 4x4 matrix, row-major +4. **Sync tolerance**: Moderate (<33ms, 1 frame at 30fps) + +### Guardrails Added: +- Validate parquet schema early (require marker_id, corners with X,Y,Z in meters) +- Use reprojection error as primary quality metric +- Require ≥4 markers with sufficient 3D spread (not just coplanar) +- Whitelist only expected marker IDs (from parquet) +- Add self-check mode with quantitative quality report + +## Scope Boundaries + +### INCLUDE +- SVO file loading with timestamp sync +- ArUco detection on left camera image +- Pose estimation using solvePnP +- Per-frame quality filtering (reprojection error) +- Multi-frame pose averaging with outlier rejection +- JSON output with 4x4 pose matrices +- Preview visualization showing detected markers and axes +- CLI interface with click + +### EXCLUDE +- Right camera processing (use left only for simplicity) +- Intrinsic calibration (use pre-calibrated from ZED SDK) +- Modifying `inside_network.json` in-place +- GUI-based frame selection +- Bundle adjustment refinement +- Depth-based verification diff --git a/py_workspace/.sisyphus/notepads/aruco-svo-calibration/decisions.md b/py_workspace/.sisyphus/notepads/aruco-svo-calibration/decisions.md new file mode 100644 index 0000000..d9703b9 --- /dev/null +++ b/py_workspace/.sisyphus/notepads/aruco-svo-calibration/decisions.md @@ -0,0 +1,6 @@ +## 2026-02-04 Init +- Use ZED rectified LEFT images (VIEW.LEFT). Distortion handled as zero (since images are rectified). +- Output `pose` matrices in the same convention as ZED Fusion `FusionConfiguration.pose`: + - Semantics: WORLD Pose of camera = T_world_from_cam. + - Storage: row-major 4x4, translation in last column. + - Coordinate system/units: defined by InitFusionParameters / InitParameters. diff --git a/py_workspace/.sisyphus/notepads/aruco-svo-calibration/issues.md b/py_workspace/.sisyphus/notepads/aruco-svo-calibration/issues.md new file mode 100644 index 0000000..58a2fa0 --- /dev/null +++ b/py_workspace/.sisyphus/notepads/aruco-svo-calibration/issues.md @@ -0,0 +1,22 @@ +## 2026-02-04 Init +- Baseline note: ZED SDK stub file `py_workspace/libs/pyzed_pkg/pyzed/sl.pyi` has many LSP/type-stub errors pre-existing in repo. + - Do not treat as regressions for this plan. + +- ZED Fusion pose semantics confirmed by librarian: + - `/usr/local/zed/include/sl/Fusion.hpp` indicates `FusionConfiguration.pose` is "WORLD Pose of camera" in InitFusionParameters coordinate system/units. + - `/usr/local/zed/doc/API/html/classsl_1_1Matrix4f.html` indicates Matrix4f is row-major with translation in last column. + +- Local LSP diagnostics not available: `basedpyright-langserver` is configured but not installed. Use `py_compile` + runtime smoke checks instead. +- Git commands currently fail due to missing git-lfs (smudge filter). Avoid git-based verification unless git-lfs is installed. + +- `ak.from_parquet` requires `pyarrow` and `pandas` to be installed in the environment, which were missing initially. + +## Task 6: IndexError in draw_detected_markers +- **Bug**: `draw_detected_markers` assumed `ids` was always 2D (`ids[i][0]`) and `corners` was always a list of 3D arrays. +- **Fix**: Flattened `ids` using `ids.flatten()` to support both (N,) and (N, 1) shapes. Reshaped `corners` and `int_corners` to ensure compatibility with `cv2.polylines` and text placement regardless of whether input is a list or a 3D/4D numpy array. +The calibration loop was hanging because SVOReader loops back to frame 0 upon reaching the end, and 'any(frames)' remained true. Fixed by calculating 'max_frames' based on remaining frames at start and bounding the loop. +- Fixed pose parsing in self-check: used np.fromstring instead of np.array on the space-separated string. +- Guarded max_frames calculation with a safety limit of 10,000 frames and handled cases where total_frames is -1 or 0. +- Improved --validate-markers mode to exit cleanly with a message when no SVOs are provided. +- Fixed pose string parsing in self-check distance block to use np.fromstring with sep=' '. +- Added max_frames guard for unknown total_frames (<= 0) to prevent infinite loops when SVO length cannot be determined. diff --git a/py_workspace/.sisyphus/notepads/aruco-svo-calibration/learnings.md b/py_workspace/.sisyphus/notepads/aruco-svo-calibration/learnings.md new file mode 100644 index 0000000..698c8da --- /dev/null +++ b/py_workspace/.sisyphus/notepads/aruco-svo-calibration/learnings.md @@ -0,0 +1,54 @@ +## 2026-02-04 Init +- Repo is script-first, but `aruco/` imports work via Python namespace package even without `__init__.py`. +- No existing test suite. `pytest` is not installed; will need to be added (likely as dev dependency) before tests can run. +- Existing CLI patterns use `click` (e.g., `streaming_receiver.py`). + +## Pose Math Utilities +- Created `aruco/pose_math.py` for common SE(3) operations. +- Standardized on 4x4 homogeneous matrices for composition and inversion. +- Inversion uses the efficient property: [R | t; 0 | 1]^-1 = [R^T | -R^T * t; 0 | 1]. +- Reprojection error calculation uses `cv2.projectPoints` and mean Euclidean distance. + +- ArUco marker geometry loading and validation logic implemented in `aruco/marker_geometry.py`. +- Use `awkward` and `pyarrow` for efficient parquet loading of multi-dimensional arrays (corners). +- Reshaping `ak.to_numpy` output is necessary when dealing with nested structures like corners (4x3). + +## SVO Sync +- Added `aruco/svo_sync.py` with `SVOReader` and `FrameData` to open multiple SVO2 files, align starts by timestamp, and grab frames. +- Verified with real local SVO2 files: able to open, sync, and grab frames for 2 cameras. + +## ArUco Detector +- Added `aruco/detector.py` implementing: + - ArUcoDetector creation (DICT_4X4_50) + - marker detection (BGR or grayscale input) + - ZED intrinsics -> K matrix extraction + - multi-marker solvePnP pose estimation + reprojection error +- Verified pose estimation with synthetic projected points and with a real SVO-opened camera for intrinsics. +- Implemented `PoseAccumulator` in `aruco/pose_averaging.py` for robust SE(3) pose averaging. +- Added RANSAC-based filtering for rotation and translation consensus. +- Implemented quaternion eigen-mean fallback for rotation averaging when `scipy` is unavailable. +- Used median for robust translation averaging. + +## Task 6: ArUco Preview Helpers +- Implemented `aruco/preview.py` with `draw_detected_markers`, `draw_pose_axes`, and `show_preview`. +- Ensured grayscale images are converted to BGR before drawing to support colored overlays. +- Used `cv2.drawFrameAxes` for pose visualization. +- `show_preview` handles multiple windows based on camera serial numbers. + +- Removed *.parquet from .gitignore to allow versioning of marker geometry data. + +## Unit Testing +- Added pytest as a dev dependency. +- Implemented synthetic tests for pose math and averaging. +- Discovered that OpenCV's `projectPoints` is strict about `tvec` being floating-point; ensured tests use `dtype=np.float64`. +- Verified that `PoseAccumulator` correctly filters outliers using RANSAC and computes robust means. + +## Pytest sys.path Pitfall +When running pytest via a console script (e.g., `uv run pytest`), the current working directory is not always automatically added to `sys.path`. This can lead to `ModuleNotFoundError` for local packages like `aruco`. +**Fix:** Create a `tests/conftest.py` file that explicitly inserts the project root into `sys.path`: +```python +import sys +import os +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), \"..\"))) +``` +This ensures deterministic import behavior regardless of how pytest is invoked. diff --git a/py_workspace/.sisyphus/notepads/aruco-svo-calibration/problems.md b/py_workspace/.sisyphus/notepads/aruco-svo-calibration/problems.md new file mode 100644 index 0000000..754032e --- /dev/null +++ b/py_workspace/.sisyphus/notepads/aruco-svo-calibration/problems.md @@ -0,0 +1,2 @@ +## 2026-02-04 Init +- (empty) diff --git a/py_workspace/.sisyphus/notepads/aruco_sync/decisions.md b/py_workspace/.sisyphus/notepads/aruco_sync/decisions.md new file mode 100644 index 0000000..c90738d --- /dev/null +++ b/py_workspace/.sisyphus/notepads/aruco_sync/decisions.md @@ -0,0 +1,3 @@ +## Architectural Decisions +- Implemented SVOReader to encapsulate multi-camera management and synchronization logic. +- Used rectified sl.VIEW.LEFT for frame retrieval as requested. diff --git a/py_workspace/.sisyphus/notepads/aruco_sync/learnings.md b/py_workspace/.sisyphus/notepads/aruco_sync/learnings.md new file mode 100644 index 0000000..de2923d --- /dev/null +++ b/py_workspace/.sisyphus/notepads/aruco_sync/learnings.md @@ -0,0 +1,3 @@ +## SVO Synchronization Patterns +- Use sl.Camera.set_svo_position(0) to loop SVOs when END_OF_SVOFILE_REACHED is encountered. +- Synchronization can be achieved by comparing timestamps and skipping frames based on FPS. diff --git a/py_workspace/.sisyphus/plans/aruco-svo-calibration.md b/py_workspace/.sisyphus/plans/aruco-svo-calibration.md new file mode 100644 index 0000000..e3ac91f --- /dev/null +++ b/py_workspace/.sisyphus/plans/aruco-svo-calibration.md @@ -0,0 +1,745 @@ +# ArUco-Based Multi-Camera Extrinsic Calibration from SVO + +## TL;DR + +> **Quick Summary**: Create a CLI tool that reads synchronized SVO recordings from multiple ZED cameras, detects ArUco markers on a 3D calibration box, computes camera extrinsics using robust pose averaging, and outputs accurate 4x4 transform matrices. +> +> **Deliverables**: +> - `calibrate_extrinsics.py` - Main CLI tool +> - `pose_averaging.py` - Robust pose estimation utilities +> - `svo_sync.py` - Multi-SVO timestamp synchronization +> - `tests/test_pose_math.py` - Unit tests for pose calculations +> - Output JSON with calibrated extrinsics +> +> **Estimated Effort**: Medium (3-5 days) +> **Parallel Execution**: YES - 2 waves +> **Critical Path**: Task 1 → Task 3 → Task 5 → Task 7 → Task 8 + +--- + +## Context + +### Original Request +User wants to integrate ArUco marker detection with SVO recording playback to calibrate multi-camera extrinsics. The idea is to use timestamp-aligned SVO reading to extract frame batches at certain intervals, calculate camera extrinsics by averaging multiple pose estimates, and handle outliers. + +### Interview Summary +**Key Discussions**: +- Calibration target: 3D box with 6 diamond board faces (24 markers), defined in `standard_box_markers.parquet` +- Current extrinsics in `inside_network.json` are **inaccurate** and need replacement +- Output: New JSON file with 4x4 pose matrices, marker box as world origin +- Workflow: CLI with preview visualization + +**User Decisions**: +- Frame sampling: Fixed interval + quality filter +- Outlier handling: Two-stage (per-frame + RANSAC on pose set) +- Minimum markers: 4+ per frame +- Image stream: Rectified LEFT (no distortion needed) +- Sync tolerance: <33ms (1 frame at 30fps) +- Tests: Add after implementation + +### Research Findings +- **Existing patterns**: `find_extrinsic_object.py` (ArUco + solvePnP), `svo_playback.py` (multi-SVO sync) +- **ZED SDK intrinsics**: `cam.get_camera_information().camera_configuration.calibration_parameters.left_cam` +- **Rotation averaging**: `scipy.spatial.transform.Rotation.mean()` for geodesic mean +- **Translation averaging**: Median with MAD-based outlier rejection +- **Transform math**: `T_world_cam = inv(T_cam_marker)` when marker is world origin + +### Metis Review +**Identified Gaps** (addressed): +- World frame definition → Use coordinates from `standard_box_markers.parquet` +- Transform convention → Match `inside_network.json` format (T_world_from_cam, space-separated 4x4) +- Image stream → Rectified LEFT view (no distortion) +- Sync tolerance → Moderate (<33ms) +- Parquet validation → Must validate schema early +- Planar degeneracy → Require multi-face visibility or 3D spread check + +--- + +## Work Objectives + +### Core Objective +Build a robust CLI tool for multi-camera extrinsic calibration using ArUco markers detected in synchronized SVO playback. + +### Concrete Deliverables +- `py_workspace/calibrate_extrinsics.py` - Main entry point +- `py_workspace/aruco/pose_averaging.py` - Robust averaging utilities +- `py_workspace/aruco/svo_sync.py` - Multi-SVO synchronization +- `py_workspace/tests/test_pose_math.py` - Unit tests +- Output: `calibrated_extrinsics.json` with per-camera 4x4 transforms + +### Definition of Done +- [ ] `uv run calibrate_extrinsics.py --help` → exits 0, shows required args +- [ ] `uv run calibrate_extrinsics.py --validate-markers` → validates parquet schema +- [ ] `uv run calibrate_extrinsics.py --svos ... --output out.json` → produces valid JSON +- [ ] Output JSON contains 4 cameras with 4x4 matrices in correct format +- [ ] `uv run pytest tests/test_pose_math.py` → all tests pass +- [ ] Preview mode shows detected markers with axes overlay + +### Must Have +- Load multiple SVO files with timestamp synchronization +- Detect ArUco markers using cv2.aruco with DICT_4X4_50 +- Estimate per-frame poses using cv2.solvePnP +- Two-stage outlier rejection (reprojection error + pose RANSAC) +- Robust pose averaging (geodesic rotation mean + median translation) +- Output 4x4 transforms in `inside_network.json`-compatible format +- CLI with click for argument parsing +- Preview visualization with detected markers and axes + +### Must NOT Have (Guardrails) +- NO intrinsic calibration (use ZED SDK pre-calibrated values) +- NO bundle adjustment or SLAM +- NO modification of `inside_network.json` in-place +- NO right camera processing (use left only) +- NO GUI beyond simple preview window +- NO depth-based verification +- NO automatic config file updates + +--- + +## Verification Strategy + +> **UNIVERSAL RULE: ZERO HUMAN INTERVENTION** +> +> ALL tasks must be verifiable by agent-executed commands. No "user visually confirms" criteria. + +### Test Decision +- **Infrastructure exists**: NO (need to set up pytest) +- **Automated tests**: YES (tests-after) +- **Framework**: pytest + +### Agent-Executed QA Scenarios (MANDATORY) + +**Verification Tool by Deliverable Type:** + +| Type | Tool | How Agent Verifies | +|------|------|-------------------| +| CLI | Bash | Run command, check exit code, parse output | +| JSON output | Bash (jq) | Parse JSON, validate structure and values | +| Preview | Playwright | Capture window screenshot (optional) | +| Unit tests | Bash (pytest) | Run tests, assert all pass | + +--- + +## Execution Strategy + +### Parallel Execution Waves + +``` +Wave 1 (Start Immediately): +├── Task 1: Core pose math utilities +├── Task 2: Parquet loader and validator +└── Task 4: SVO synchronization module + +Wave 2 (After Wave 1): +├── Task 3: ArUco detection integration (depends: 1, 2) +├── Task 5: Robust pose aggregation (depends: 1) +└── Task 6: Preview visualization (depends: 3) + +Wave 3 (After Wave 2): +├── Task 7: CLI integration (depends: 3, 4, 5, 6) +└── Task 8: Tests and validation (depends: all) + +Critical Path: Task 1 → Task 3 → Task 7 → Task 8 +``` + +### Dependency Matrix + +| Task | Depends On | Blocks | Can Parallelize With | +|------|------------|--------|---------------------| +| 1 | None | 3, 5 | 2, 4 | +| 2 | None | 3 | 1, 4 | +| 3 | 1, 2 | 6, 7 | 5 | +| 4 | None | 7 | 1, 2 | +| 5 | 1 | 7 | 3, 6 | +| 6 | 3 | 7 | 5 | +| 7 | 3, 4, 5, 6 | 8 | None | +| 8 | 7 | None | None | + +--- + +## TODOs + +- [ ] 1. Create pose math utilities module + + **What to do**: + - Create `py_workspace/aruco/pose_math.py` + - Implement `rvec_tvec_to_matrix(rvec, tvec) -> np.ndarray` (4x4 homogeneous) + - Implement `matrix_to_rvec_tvec(T) -> tuple[np.ndarray, np.ndarray]` + - Implement `invert_transform(T) -> np.ndarray` + - Implement `compose_transforms(T1, T2) -> np.ndarray` + - Implement `compute_reprojection_error(obj_pts, img_pts, rvec, tvec, K) -> float` + - Use numpy for all matrix operations + + **Must NOT do**: + - Do NOT use scipy in this module (keep it pure numpy for core math) + - Do NOT implement averaging here (that's Task 5) + + **Recommended Agent Profile**: + - **Category**: `quick` + - Reason: Pure math utilities, straightforward implementation + - **Skills**: [] + - No special skills needed + + **Parallelization**: + - **Can Run In Parallel**: YES + - **Parallel Group**: Wave 1 (with Tasks 2, 4) + - **Blocks**: Tasks 3, 5 + - **Blocked By**: None + + **References**: + - `py_workspace/aruco/find_extrinsic_object.py:123-145` - solvePnP usage and rvec/tvec handling + - OpenCV docs: `cv2.Rodrigues()` for rvec↔rotation matrix conversion + - OpenCV docs: `cv2.projectPoints()` for reprojection + + **Acceptance Criteria**: + + **Agent-Executed QA Scenarios:** + + ``` + Scenario: rvec/tvec round-trip conversion + Tool: Bash (python) + Steps: + 1. python -c "from aruco.pose_math import *; import numpy as np; rvec=np.array([0.1,0.2,0.3]); tvec=np.array([1,2,3]); T=rvec_tvec_to_matrix(rvec,tvec); r2,t2=matrix_to_rvec_tvec(T); assert np.allclose(rvec,r2,atol=1e-6) and np.allclose(tvec,t2,atol=1e-6); print('PASS')" + Expected Result: Prints "PASS" + + Scenario: Transform inversion identity + Tool: Bash (python) + Steps: + 1. python -c "from aruco.pose_math import *; import numpy as np; T=np.eye(4); T[:3,3]=[1,2,3]; T_inv=invert_transform(T); result=compose_transforms(T,T_inv); assert np.allclose(result,np.eye(4),atol=1e-9); print('PASS')" + Expected Result: Prints "PASS" + ``` + + **Commit**: YES + - Message: `feat(aruco): add pose math utilities for transform operations` + - Files: `py_workspace/aruco/pose_math.py` + +--- + +- [ ] 2. Create parquet loader and validator + + **What to do**: + - Create `py_workspace/aruco/marker_geometry.py` + - Implement `load_marker_geometry(parquet_path) -> dict[int, np.ndarray]` + - Returns mapping: marker_id → corner coordinates (4, 3) + - Implement `validate_marker_geometry(geometry) -> bool` + - Check all expected marker IDs present + - Check coordinates are in meters (reasonable range) + - Check corner ordering is consistent + - Use awkward-array (already in project) for parquet reading + + **Must NOT do**: + - Do NOT hardcode marker IDs (read from parquet) + - Do NOT assume specific number of markers (validate dynamically) + + **Recommended Agent Profile**: + - **Category**: `quick` + - Reason: Simple data loading and validation + - **Skills**: [] + + **Parallelization**: + - **Can Run In Parallel**: YES + - **Parallel Group**: Wave 1 (with Tasks 1, 4) + - **Blocks**: Task 3 + - **Blocked By**: None + + **References**: + - `py_workspace/aruco/find_extrinsic_object.py:55-66` - Parquet loading with awkward-array + - `py_workspace/aruco/output/standard_box_markers.parquet` - Actual data file + + **Acceptance Criteria**: + + **Agent-Executed QA Scenarios:** + + ``` + Scenario: Load marker geometry from parquet + Tool: Bash (python) + Preconditions: standard_box_markers.parquet exists + Steps: + 1. cd /workspaces/zed-playground/py_workspace + 2. python -c "from aruco.marker_geometry import load_marker_geometry; g=load_marker_geometry('aruco/output/standard_box_markers.parquet'); print(f'Loaded {len(g)} markers'); assert len(g) >= 4; print('PASS')" + Expected Result: Prints marker count and "PASS" + + Scenario: Validate geometry returns True for valid data + Tool: Bash (python) + Steps: + 1. python -c "from aruco.marker_geometry import *; g=load_marker_geometry('aruco/output/standard_box_markers.parquet'); assert validate_marker_geometry(g); print('PASS')" + Expected Result: Prints "PASS" + ``` + + **Commit**: YES + - Message: `feat(aruco): add marker geometry loader with validation` + - Files: `py_workspace/aruco/marker_geometry.py` + +--- + +- [ ] 3. Integrate ArUco detection with ZED intrinsics + + **What to do**: + - Create `py_workspace/aruco/detector.py` + - Implement `create_detector() -> cv2.aruco.ArucoDetector` using DICT_4X4_50 + - Implement `detect_markers(image, detector) -> tuple[corners, ids]` + - Implement `get_zed_intrinsics(camera) -> tuple[np.ndarray, np.ndarray]` + - Extract K matrix (3x3) and distortion from ZED SDK + - For rectified images, distortion should be zeros + - Implement `estimate_pose(corners, ids, marker_geometry, K, dist) -> tuple[rvec, tvec, error]` + - Match detected markers to known 3D points + - Call solvePnP with SOLVEPNP_SQPNP + - Compute and return reprojection error + - Require minimum 4 markers for valid pose + + **Must NOT do**: + - Do NOT use deprecated `estimatePoseSingleMarkers` + - Do NOT accept poses with <4 markers + + **Recommended Agent Profile**: + - **Category**: `unspecified-low` + - Reason: Integration of existing patterns, moderate complexity + - **Skills**: [] + + **Parallelization**: + - **Can Run In Parallel**: NO + - **Parallel Group**: Wave 2 (after Task 1, 2) + - **Blocks**: Tasks 6, 7 + - **Blocked By**: Tasks 1, 2 + + **References**: + - `py_workspace/aruco/find_extrinsic_object.py:54-145` - Full ArUco detection and solvePnP pattern + - `py_workspace/libs/pyzed_pkg/pyzed/sl.pyi:5110-5180` - CameraParameters with fx, fy, cx, cy, disto + - `py_workspace/svo_playback.py:46` - get_camera_information() usage + + **Acceptance Criteria**: + + **Agent-Executed QA Scenarios:** + + ``` + Scenario: Detector creation succeeds + Tool: Bash (python) + Steps: + 1. python -c "from aruco.detector import create_detector; d=create_detector(); print(type(d)); print('PASS')" + Expected Result: Prints detector type and "PASS" + + Scenario: Pose estimation with synthetic data + Tool: Bash (python) + Steps: + 1. python -c " + import numpy as np + from aruco.detector import estimate_pose + from aruco.marker_geometry import load_marker_geometry + # Create synthetic test with known geometry + geom = load_marker_geometry('aruco/output/standard_box_markers.parquet') + K = np.array([[700,0,960],[0,700,540],[0,0,1]], dtype=np.float64) + # Test passes if function runs without error + print('PASS') + " + Expected Result: Prints "PASS" + ``` + + **Commit**: YES + - Message: `feat(aruco): add ArUco detector with ZED intrinsics integration` + - Files: `py_workspace/aruco/detector.py` + +--- + +- [ ] 4. Create multi-SVO synchronization module + + **What to do**: + - Create `py_workspace/aruco/svo_sync.py` + - Implement `SVOReader` class: + - `__init__(svo_paths: list[str])` - Open all SVOs + - `get_camera_info(idx) -> CameraInfo` - Serial, resolution, intrinsics + - `sync_to_latest_start()` - Align all cameras to latest start timestamp + - `grab_synced(tolerance_ms=33) -> dict[serial, Frame] | None` - Get synced frames + - `seek_to_frame(frame_num)` - Seek all cameras + - `close()` - Cleanup + - Frame should contain: image (numpy), timestamp_ns, serial_number + - Use pattern from `svo_playback.py` for sync logic + + **Must NOT do**: + - Do NOT implement complex clock drift correction + - Do NOT handle streaming (SVO only) + + **Recommended Agent Profile**: + - **Category**: `unspecified-low` + - Reason: Adapting existing pattern, moderate complexity + - **Skills**: [] + + **Parallelization**: + - **Can Run In Parallel**: YES + - **Parallel Group**: Wave 1 (with Tasks 1, 2) + - **Blocks**: Task 7 + - **Blocked By**: None + + **References**: + - `py_workspace/svo_playback.py:18-102` - Complete multi-SVO sync pattern + - `py_workspace/libs/pyzed_pkg/pyzed/sl.pyi:10010-10097` - SVO position and frame methods + + **Acceptance Criteria**: + + **Agent-Executed QA Scenarios:** + + ``` + Scenario: SVOReader opens multiple files + Tool: Bash (python) + Preconditions: SVO files exist in py_workspace + Steps: + 1. python -c " + from aruco.svo_sync import SVOReader + import glob + svos = glob.glob('*.svo2')[:2] + if len(svos) >= 2: + reader = SVOReader(svos) + print(f'Opened {len(svos)} SVOs') + reader.close() + print('PASS') + else: + print('SKIP: Need 2+ SVOs') + " + Expected Result: Prints "PASS" or "SKIP" + + Scenario: Sync aligns timestamps + Tool: Bash (python) + Steps: + 1. Test sync_to_latest_start returns without error + Expected Result: No exception raised + ``` + + **Commit**: YES + - Message: `feat(aruco): add multi-SVO synchronization reader` + - Files: `py_workspace/aruco/svo_sync.py` + +--- + +- [ ] 5. Implement robust pose aggregation + + **What to do**: + - Create `py_workspace/aruco/pose_averaging.py` + - Implement `PoseAccumulator` class: + - `add_pose(T: np.ndarray, reproj_error: float, frame_id: int)` + - `get_inlier_poses(max_reproj_error=2.0) -> list[np.ndarray]` + - `compute_robust_mean() -> tuple[np.ndarray, dict]` + - Use scipy.spatial.transform.Rotation.mean() for rotation + - Use median for translation + - Return stats dict: {n_total, n_inliers, median_error, std_rotation_deg} + - Implement `ransac_filter_poses(poses, rot_thresh_deg=5.0, trans_thresh_m=0.05) -> list[int]` + - Return indices of inlier poses + + **Must NOT do**: + - Do NOT implement bundle adjustment + - Do NOT modify poses in-place + + **Recommended Agent Profile**: + - **Category**: `unspecified-low` + - Reason: Math-focused but requires scipy understanding + - **Skills**: [] + + **Parallelization**: + - **Can Run In Parallel**: YES + - **Parallel Group**: Wave 2 (with Task 3) + - **Blocks**: Task 7 + - **Blocked By**: Task 1 + + **References**: + - Librarian findings on `scipy.spatial.transform.Rotation.mean()` + - Librarian findings on RANSAC-style pose filtering + + **Acceptance Criteria**: + + **Agent-Executed QA Scenarios:** + + ``` + Scenario: Rotation averaging produces valid result + Tool: Bash (python) + Steps: + 1. python -c " + from aruco.pose_averaging import PoseAccumulator + import numpy as np + acc = PoseAccumulator() + T = np.eye(4) + acc.add_pose(T, reproj_error=1.0, frame_id=0) + acc.add_pose(T, reproj_error=1.5, frame_id=1) + mean_T, stats = acc.compute_robust_mean() + assert mean_T.shape == (4,4) + assert stats['n_inliers'] == 2 + print('PASS') + " + Expected Result: Prints "PASS" + + Scenario: RANSAC rejects outliers + Tool: Bash (python) + Steps: + 1. python -c " + from aruco.pose_averaging import ransac_filter_poses + import numpy as np + # Create 3 similar poses + 1 outlier + poses = [np.eye(4) for _ in range(3)] + outlier = np.eye(4); outlier[:3,3] = [10,10,10] # Far away + poses.append(outlier) + inliers = ransac_filter_poses(poses, trans_thresh_m=0.1) + assert len(inliers) == 3 + assert 3 not in inliers + print('PASS') + " + Expected Result: Prints "PASS" + ``` + + **Commit**: YES + - Message: `feat(aruco): add robust pose averaging with RANSAC filtering` + - Files: `py_workspace/aruco/pose_averaging.py` + +--- + +- [ ] 6. Add preview visualization + + **What to do**: + - Create `py_workspace/aruco/preview.py` + - Implement `draw_detected_markers(image, corners, ids) -> np.ndarray` + - Draw marker outlines and IDs + - Implement `draw_pose_axes(image, rvec, tvec, K, length=0.1) -> np.ndarray` + - Use cv2.drawFrameAxes + - Implement `show_preview(images: dict[str, np.ndarray], wait_ms=1) -> int` + - Show multiple camera views in separate windows + - Return key pressed + + **Must NOT do**: + - Do NOT implement complex GUI + - Do NOT block indefinitely (use waitKey with timeout) + + **Recommended Agent Profile**: + - **Category**: `quick` + - Reason: Simple OpenCV visualization + - **Skills**: [] + + **Parallelization**: + - **Can Run In Parallel**: YES + - **Parallel Group**: Wave 2 (with Task 5) + - **Blocks**: Task 7 + - **Blocked By**: Task 3 + + **References**: + - `py_workspace/aruco/find_extrinsic_object.py:138-145` - drawFrameAxes usage + - `py_workspace/aruco/find_extrinsic_object.py:84-105` - Marker visualization + + **Acceptance Criteria**: + + **Agent-Executed QA Scenarios:** + + ``` + Scenario: Draw functions return valid images + Tool: Bash (python) + Steps: + 1. python -c " + from aruco.preview import draw_detected_markers + import numpy as np + img = np.zeros((480,640,3), dtype=np.uint8) + corners = [np.array([[100,100],[200,100],[200,200],[100,200]], dtype=np.float32)] + ids = np.array([[1]]) + result = draw_detected_markers(img, corners, ids) + assert result.shape == (480,640,3) + print('PASS') + " + Expected Result: Prints "PASS" + ``` + + **Commit**: YES + - Message: `feat(aruco): add preview visualization utilities` + - Files: `py_workspace/aruco/preview.py` + +--- + +- [ ] 7. Create main CLI tool + + **What to do**: + - Create `py_workspace/calibrate_extrinsics.py` + - Use click for CLI: + - `--svo PATH` (multiple) - SVO file paths + - `--markers PATH` - Marker geometry parquet + - `--output PATH` - Output JSON path + - `--sample-interval INT` - Frame interval (default 30) + - `--max-reproj-error FLOAT` - Threshold (default 2.0) + - `--preview / --no-preview` - Show visualization + - `--validate-markers` - Only validate parquet and exit + - `--self-check` - Run and report quality metrics + - Main workflow: + 1. Load marker geometry and validate + 2. Open SVOs and sync + 3. Sample frames at interval + 4. For each synced frame set: + - Detect markers in each camera + - Estimate pose if ≥4 markers + - Accumulate poses per camera + 5. Compute robust mean per camera + 6. Output JSON in inside_network.json-compatible format + - Output JSON format: + ```json + { + "serial": { + "pose": "r00 r01 r02 tx r10 r11 r12 ty ...", + "stats": { "n_frames": N, "median_reproj_error": X } + } + } + ``` + + **Must NOT do**: + - Do NOT modify existing config files + - Do NOT implement auto-update of inside_network.json + + **Recommended Agent Profile**: + - **Category**: `unspecified-high` + - Reason: Integration of all components, complex workflow + - **Skills**: [] + + **Parallelization**: + - **Can Run In Parallel**: NO + - **Parallel Group**: Wave 3 (final integration) + - **Blocks**: Task 8 + - **Blocked By**: Tasks 3, 4, 5, 6 + + **References**: + - `py_workspace/svo_playback.py` - CLI structure with argparse (adapt to click) + - `py_workspace/aruco/find_extrinsic_object.py` - Main loop pattern + - `zed_settings/inside_network.json:20` - Output pose format + + **Acceptance Criteria**: + + **Agent-Executed QA Scenarios:** + + ``` + Scenario: CLI help works + Tool: Bash + Steps: + 1. cd /workspaces/zed-playground/py_workspace + 2. uv run calibrate_extrinsics.py --help + Expected Result: Exit code 0, shows --svo, --markers, --output options + + Scenario: Validate markers only mode + Tool: Bash + Steps: + 1. uv run calibrate_extrinsics.py --markers aruco/output/standard_box_markers.parquet --validate-markers + Expected Result: Exit code 0, prints marker count + + Scenario: Full calibration produces JSON + Tool: Bash + Preconditions: SVO files exist + Steps: + 1. uv run calibrate_extrinsics.py \ + --svo ZED_SN46195029.svo2 \ + --svo ZED_SN44435674.svo2 \ + --markers aruco/output/standard_box_markers.parquet \ + --output /tmp/test_extrinsics.json \ + --no-preview \ + --sample-interval 100 + 2. jq 'keys' /tmp/test_extrinsics.json + Expected Result: Exit code 0, JSON contains camera serials + + Scenario: Self-check reports quality + Tool: Bash + Steps: + 1. uv run calibrate_extrinsics.py ... --self-check + Expected Result: Prints per-camera stats including median reproj error + ``` + + **Commit**: YES + - Message: `feat(aruco): add calibrate_extrinsics CLI tool` + - Files: `py_workspace/calibrate_extrinsics.py` + +--- + +- [ ] 8. Add unit tests and final validation + + **What to do**: + - Create `py_workspace/tests/test_pose_math.py` + - Test cases: + - `test_rvec_tvec_roundtrip` - Convert and back + - `test_transform_inversion` - T @ inv(T) = I + - `test_transform_composition` - Known compositions + - `test_reprojection_error_zero` - Perfect projection = 0 error + - Create `py_workspace/tests/test_pose_averaging.py` + - Test cases: + - `test_mean_of_identical_poses` - Returns same pose + - `test_outlier_rejection` - Outliers removed + - Add `scipy` to pyproject.toml if not present + - Run full test suite + + **Must NOT do**: + - Do NOT require real SVO files for unit tests (use synthetic data) + + **Recommended Agent Profile**: + - **Category**: `quick` + - Reason: Straightforward test implementation + - **Skills**: [] + + **Parallelization**: + - **Can Run In Parallel**: NO + - **Parallel Group**: Wave 3 (final) + - **Blocks**: None + - **Blocked By**: Task 7 + + **References**: + - Task 1 acceptance criteria for test patterns + - Task 5 acceptance criteria for averaging tests + + **Acceptance Criteria**: + + **Agent-Executed QA Scenarios:** + + ``` + Scenario: All unit tests pass + Tool: Bash + Steps: + 1. cd /workspaces/zed-playground/py_workspace + 2. uv run pytest tests/ -v + Expected Result: Exit code 0, all tests pass + + Scenario: Coverage check + Tool: Bash + Steps: + 1. uv run pytest tests/ --tb=short + Expected Result: Shows test results summary + ``` + + **Commit**: YES + - Message: `test(aruco): add unit tests for pose math and averaging` + - Files: `py_workspace/tests/test_pose_math.py`, `py_workspace/tests/test_pose_averaging.py` + +--- + +## Commit Strategy + +| After Task | Message | Files | Verification | +|------------|---------|-------|--------------| +| 1 | `feat(aruco): add pose math utilities` | pose_math.py | python import test | +| 2 | `feat(aruco): add marker geometry loader` | marker_geometry.py | python import test | +| 3 | `feat(aruco): add ArUco detector` | detector.py | python import test | +| 4 | `feat(aruco): add multi-SVO sync` | svo_sync.py | python import test | +| 5 | `feat(aruco): add pose averaging` | pose_averaging.py | python import test | +| 6 | `feat(aruco): add preview utils` | preview.py | python import test | +| 7 | `feat(aruco): add calibrate CLI` | calibrate_extrinsics.py | --help works | +| 8 | `test(aruco): add unit tests` | tests/*.py | pytest passes | + +--- + +## Success Criteria + +### Verification Commands +```bash +# CLI works +uv run calibrate_extrinsics.py --help # Expected: exit 0 + +# Marker validation +uv run calibrate_extrinsics.py --markers aruco/output/standard_box_markers.parquet --validate-markers # Expected: exit 0 + +# Tests pass +uv run pytest tests/ -v # Expected: all pass + +# Full calibration (with real SVOs) +uv run calibrate_extrinsics.py --svo *.svo2 --markers aruco/output/standard_box_markers.parquet --output calibrated.json --no-preview +jq 'keys' calibrated.json # Expected: camera serials +``` + +### Final Checklist +- [ ] All "Must Have" present +- [ ] All "Must NOT Have" absent +- [ ] All tests pass +- [ ] CLI --help shows all options +- [ ] Output JSON matches inside_network.json pose format +- [ ] Preview shows detected markers with axes diff --git a/py_workspace/AGENTS.md b/py_workspace/AGENTS.md new file mode 100644 index 0000000..ad5aac5 --- /dev/null +++ b/py_workspace/AGENTS.md @@ -0,0 +1,37 @@ +# Python Agent Context + +## Environment +- **Directory**: `/workspaces/zed-playground/py_workspace` +- **Package Manager**: `uv` +- **Python Version**: 3.12+ (Managed by `uv`) +- **Dependencies**: Defined in `pyproject.toml` + - `pyzed`: ZED SDK Python wrapper + - `opencv-python`: GUI and image processing + - `click`: CLI argument parsing + - `numpy`, `cupy-cuda12x`: Data manipulation + +## Workflow & Commands +- **Run Scripts**: Always use `uv run` to ensure correct environment. + ```bash + uv run streaming_receiver.py --help + uv run recording_multi.py + ``` +- **New Dependencies**: Add with `uv add ` (e.g., `uv add requests`). + +## Architecture & Patterns +- **Network Camera Handling**: + - Use `zed_network_utils.py` for all network config parsing. + - Config file: `/workspaces/zed-playground/zed_settings/inside_network.json` +- **Threading Model**: + - **Main Thread**: MUST handle all OpenCV GUI (`cv2.imshow`, `cv2.waitKey`). + - **Worker Threads**: Handle `camera.grab()` and data retrieval. + - **Communication**: Use `queue.Queue` to pass frames from workers to main. +- **ZED API Patterns**: + - Streaming Input: `init_params.set_from_stream(ip, port)` + - Serial Number: Use `camera.get_camera_information().serial_number`. + +## Documentation & References +- **Python API Docs**: `/usr/local/zed/doc/API/html/python/index.html` +- **ZED SDK General Docs**: `/usr/local/zed/doc/` +- **C++ Headers (Reference)**: `/usr/local/zed/include/sl/` + - Useful for understanding underlying enum values or behaviors not fully detailed in Python docstrings. \ No newline at end of file diff --git a/py_workspace/recording_multi.py b/py_workspace/recording_multi.py new file mode 100755 index 0000000..90daf44 --- /dev/null +++ b/py_workspace/recording_multi.py @@ -0,0 +1,135 @@ +""" +This sample shows how to record video in Stereolabs SVO format from network cameras. +SVO video files can be played with the ZED API and used with its different modules. +""" + +import pyzed.sl as sl +import threading +import signal +import time +import sys +import zed_network_utils + +# Global variable to handle exit +exit_app = False + + +def signal_handler(signal, frame): + """Handle Ctrl+C to properly exit the program""" + global exit_app + exit_app = True + print("\nCtrl+C pressed. Exiting...") + + +def acquisition(zed): + """Acquisition thread function to continuously grab frames""" + infos = zed.get_camera_information() + + while not exit_app: + if zed.grab() == sl.ERROR_CODE.SUCCESS: + # If needed, add more processing here + # But be aware that any processing involving the GiL will slow down the multi threading performance + pass + + print(f"{infos.camera_model}[{infos.serial_number}] QUIT") + + # disable Recording + zed.disable_recording() + # close the Camera + zed.close() + + +def open_camera(zed, config): + """Open a camera with given configuration and enable streaming""" + ip, port = zed_network_utils.extract_ip_port(config) + + if not ip or not port: + return False + + try: + serial = config["FusionConfiguration"]["serial_number"] + except KeyError: + print("Error: Serial number not found in config") + return False + + # Open the remote camera using utility function + if not zed_network_utils.open_remote_camera(zed, ip, port): + return False + + print(f"ZED SN{serial} Opened from {ip}:{port}") + + # Enable Recording + output_svo_file = f"ZED_SN{serial}.svo2" + recording_param = sl.RecordingParameters( + output_svo_file.replace(" ", ""), sl.SVO_COMPRESSION_MODE.H265 + ) + record_err = zed.enable_recording(recording_param) + if record_err == sl.ERROR_CODE.SUCCESS: + print(f"ZED SN{serial} Enabled recording") + else: + print(f"ZED SN{serial} Recording initialization error: {record_err}") + zed.close() + return False + + print(f"Recording SVO file: {recording_param.video_filename}") + + return True + + +def main(): + global exit_app + + # Read network configuration using utility + network_config = zed_network_utils.parse_network_config() + + if not network_config: + return 1 + + print(f"Found {len(network_config)} cameras in configuration") + + if len(network_config) == 0: + print("No ZED configured, exit program") + return 1 + + zed_open = False + + # Open all cameras + zeds = [] + threads = [] + + for serial, config in network_config.items(): + zed = sl.Camera() + if open_camera(zed, config): + zeds.append(zed) + zed_open = True + + # Start acquisition thread immediately + thread = threading.Thread(target=acquisition, args=(zed,)) + thread.start() + threads.append(thread) + + if not zed_open: + print("No ZED opened, exit program") + return 1 + + # Set up signal handler for Ctrl+C + signal.signal(signal.SIGINT, signal_handler) + print("Press Ctrl+C to exit") + + # Main loop + while not exit_app: + time.sleep(0.02) + + # Wait for all threads to finish + print("Exit signal, closing ZEDs") + time.sleep(0.1) + + for thread in threads: + thread.join() + + print("Program exited") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/py_workspace/streaming_receiver.py b/py_workspace/streaming_receiver.py new file mode 100755 index 0000000..3e716df --- /dev/null +++ b/py_workspace/streaming_receiver.py @@ -0,0 +1,437 @@ +######################################################################## +# +# Copyright (c) 2022, STEREOLABS. +# +# All rights reserved. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +######################################################################## + +""" +Read a stream and display the left images using OpenCV +""" + +import sys +import pyzed.sl as sl +import cv2 +import socket +import threading +import queue +import zed_network_utils +import click +import time + +# Global variables +exit_app = False +camera_settings = sl.VIDEO_SETTINGS.BRIGHTNESS +str_camera_settings = "BRIGHTNESS" +step_camera_settings = 1 +led_on = True + +# Map to store selection state for each camera window +# Key: window_name (str), Value: dict with selection state +window_selections = {} + + +class CameraHandler: + def __init__(self, ip, port, serial_number=None): + self.ip = ip + self.port = port + self.serial_number = serial_number + self.id = f"{serial_number}" if serial_number else f"{ip}:{port}" + self.cam = sl.Camera() + self.runtime = sl.RuntimeParameters() + self.mat = sl.Mat() + self.frame_queue = queue.Queue(maxsize=1) # Keep only latest frame + self.running = False + self.thread = None + self.win_name = f"Camera {self.id}" + self.status_msg = "Initializing..." + + def start(self): + init_parameters = sl.InitParameters() + init_parameters.depth_mode = sl.DEPTH_MODE.NONE + init_parameters.sdk_verbose = 1 + init_parameters.set_from_stream(self.ip, self.port) + + status = self.cam.open(init_parameters) + if status != sl.ERROR_CODE.SUCCESS: + print(f"Camera {self.id} Open : {repr(status)}. Skipping.") + self.status_msg = f"Error: {status}" + return False + + print(f"\n--- Camera {self.id} Info ---") + print_camera_information(self.cam) + + self.running = True + self.thread = threading.Thread(target=self._grab_loop) + self.thread.daemon = True + self.thread.start() + return True + + def stop(self): + self.running = False + if self.thread and self.thread.is_alive(): + self.thread.join(timeout=1.0) + self.cam.close() + + def _grab_loop(self): + while self.running: + err = self.cam.grab(self.runtime) + if err == sl.ERROR_CODE.SUCCESS: + # Retrieve image into a local Mat, then convert to numpy for queue + # We need a lock or just copy it because Mat is not thread-safe if modified + self.cam.retrieve_image(self.mat, sl.VIEW.LEFT) + frame = self.mat.get_data() + + # We can clone it to be safe for passing to main thread + # (get_data() returns a numpy view, so deep copy might be needed if buffer is reused) + # However, for simple display, usually it's fine if we consume fast enough. + # To be perfectly safe, let's copy. + frame_copy = frame.copy() + + try: + # Remove old frame if exists to keep latest + if self.frame_queue.full(): + self.frame_queue.get_nowait() + self.frame_queue.put_nowait(frame_copy) + except queue.Full: + pass + else: + sl.sleep_ms(10) + + +def get_selection_state(win_name): + if win_name not in window_selections: + window_selections[win_name] = { + "rect": sl.Rect(), + "in_progress": False, + "origin": (-1, -1), + } + return window_selections[win_name] + + +def on_mouse(event, x, y, flags, param): + win_name = param + state = get_selection_state(win_name) + + if event == cv2.EVENT_LBUTTONDOWN: + state["origin"] = (x, y) + state["in_progress"] = True + elif event == cv2.EVENT_LBUTTONUP: + state["in_progress"] = False + elif event == cv2.EVENT_RBUTTONDOWN: + state["in_progress"] = False + state["rect"] = sl.Rect(0, 0, 0, 0) + + if state["in_progress"]: + state["rect"].x = min(x, state["origin"][0]) + state["rect"].y = min(y, state["origin"][1]) + state["rect"].width = abs(x - state["origin"][0]) + 1 + state["rect"].height = abs(y - state["origin"][1]) + 1 + + +@click.command() +@click.option( + "--config", + "-c", + type=click.Path(exists=True), + help="Path to JSON configuration file.", +) +@click.option( + "--ip", + "-i", + multiple=True, + help="IP address(es) in format IP:PORT. Can be used multiple times.", +) +def main(config, ip): + """ + ZED Streaming Receiver. + """ + global exit_app + + cameras_to_open = [] + + # 1. Parse Config File if provided + if config: + print(f"Reading configuration from {config}") + network_config = zed_network_utils.parse_network_config(config) + if network_config: + for serial, cam_config in network_config.items(): + cam_ip, cam_port = zed_network_utils.extract_ip_port(cam_config) + if cam_ip and cam_port: + cameras_to_open.append((cam_ip, cam_port, serial)) + + # 2. Parse CLI IPs if provided + if ip: + for ip_str in ip: + if ":" in ip_str: + try: + host, port_str = ip_str.split(":") + port = int(port_str) + cameras_to_open.append((host, port, ip_str)) + except ValueError: + print(f"Invalid format for IP: {ip_str}. Expected IP:PORT") + else: + print( + f"Treating '{ip_str}' as serial number, looking up in default config..." + ) + cam_config = zed_network_utils.get_camera_config_by_serial(ip_str) + if cam_config: + cam_ip, cam_port = zed_network_utils.extract_ip_port(cam_config) + if cam_ip and cam_port: + cameras_to_open.append((cam_ip, cam_port, ip_str)) + else: + print(f"Could not find IP/Port for serial {ip_str}") + else: + print(f"Invalid format or unknown serial: {ip_str}.") + + if not cameras_to_open: + print("No valid cameras specified. Use --config or --ip.") + print_help() + return + + print(f"Starting {len(cameras_to_open)} camera streams...") + print_help() + + handlers = [] + + # Initialize all cameras + for cam_ip, cam_port, cam_id in cameras_to_open: + handler = CameraHandler(cam_ip, cam_port, cam_id) + if handler.start(): + handlers.append(handler) + # Create window for this camera + cv2.namedWindow(handler.win_name) + cv2.setMouseCallback(handler.win_name, on_mouse, param=handler.win_name) + + if not handlers: + print("No cameras could be opened. Exiting.") + return + + try: + while not exit_app: + # Main GUI Loop + for handler in handlers: + try: + # Get frame from queue (non-blocking) + frame = handler.frame_queue.get_nowait() + + # Draw selection if exists + state = get_selection_state(handler.win_name) + rect = state["rect"] + if not rect.is_empty() and rect.is_contained( + sl.Rect(0, 0, frame.shape[1], frame.shape[0]) + ): + cv2.rectangle( + frame, + (rect.x, rect.y), + (rect.width + rect.x, rect.height + rect.y), + (220, 180, 20), + 2, + ) + + cv2.imshow(handler.win_name, frame) + + except queue.Empty: + pass # No new frame, just continue + + # Handle key events (once per loop iteration, for all windows) + key = cv2.waitKey(10) + if key != -1: + if key == 113: # q + exit_app = True + else: + # Apply settings to ALL cameras + # Note: We pick the first camera to get current value, then set to all + # This implies settings are synchronized + if handlers: + update_camera_settings( + key, handlers[0].cam, handlers, handlers[0].runtime + ) + + except KeyboardInterrupt: + print("\nCtrl+C pressed. Exiting...") + finally: + print("Closing cameras...") + for handler in handlers: + handler.stop() + cv2.destroyAllWindows() + print("Program exited") + + +def print_camera_information(cam): + cam_info = cam.get_camera_information() + print("ZED Model : {0}".format(cam_info.camera_model)) + print("ZED Serial Number : {0}".format(cam_info.serial_number)) + print( + "ZED Camera Firmware : {0}/{1}".format( + cam_info.camera_configuration.firmware_version, + cam_info.sensors_configuration.firmware_version, + ) + ) + print( + "ZED Camera Resolution : {0}x{1}".format( + round(cam_info.camera_configuration.resolution.width, 2), + cam.get_camera_information().camera_configuration.resolution.height, + ) + ) + print( + "ZED Camera FPS : {0}".format(int(cam_info.camera_configuration.fps)) + ) + + +def print_help(): + print("\n\nCamera controls hotkeys:") + print("* Increase camera settings value: '+'") + print("* Decrease camera settings value: '-'") + print("* Toggle camera settings: 's'") + print("* Toggle camera LED: 'l' (lower L)") + print("* Reset all parameters: 'r'") + print("* Reset exposure ROI to full image 'f'") + print("* Use mouse to select an image area to apply exposure (press 'a')") + print("* Exit : 'q'\n") + + +# Update camera setting on key press +def update_camera_settings(key, reference_cam, handlers, runtime): + global led_on, camera_settings, str_camera_settings + + # This logic updates ALL cameras based on the input key + + if key == 115: # for 's' key + switch_camera_settings() + return + + if key == 108: # for 'l' key + led_on = not led_on + print(f"LED Status: {led_on}") + elif key == 114: # 'r' + print("[Sample] Reset all settings to default") + + # Determine action and value + action = None # 'inc', 'dec', 'set' + val = None + + if key == 43: + action = "inc" + elif key == 45: + action = "dec" + elif key == 114: + action = "reset" + elif key == 108: + action = "led" + elif key == 97 or key == 102: + action = "roi" + + # Apply to all cameras + for handler in handlers: + cam = handler.cam + + if action == "inc": + current_value = cam.get_camera_settings(camera_settings)[1] + cam.set_camera_settings( + camera_settings, current_value + step_camera_settings + ) + if handler == handlers[0]: # Print only once + print( + str_camera_settings + + ": " + + str(current_value + step_camera_settings) + ) + + elif action == "dec": + current_value = cam.get_camera_settings(camera_settings)[1] + if current_value >= 1: + cam.set_camera_settings( + camera_settings, current_value - step_camera_settings + ) + if handler == handlers[0]: + print( + str_camera_settings + + ": " + + str(current_value - step_camera_settings) + ) + + elif action == "reset": + cam.set_camera_settings(sl.VIDEO_SETTINGS.BRIGHTNESS, -1) + cam.set_camera_settings(sl.VIDEO_SETTINGS.CONTRAST, -1) + cam.set_camera_settings(sl.VIDEO_SETTINGS.HUE, -1) + cam.set_camera_settings(sl.VIDEO_SETTINGS.SATURATION, -1) + cam.set_camera_settings(sl.VIDEO_SETTINGS.SHARPNESS, -1) + cam.set_camera_settings(sl.VIDEO_SETTINGS.GAIN, -1) + cam.set_camera_settings(sl.VIDEO_SETTINGS.EXPOSURE, -1) + cam.set_camera_settings(sl.VIDEO_SETTINGS.WHITEBALANCE_TEMPERATURE, -1) + + elif action == "led": + cam.set_camera_settings(sl.VIDEO_SETTINGS.LED_STATUS, led_on) + + elif action == "roi": + state = get_selection_state(handler.win_name) + rect = state["rect"] + reset = key == 102 + + if reset: + cam.set_camera_settings_roi( + sl.VIDEO_SETTINGS.AEC_AGC_ROI, rect, sl.SIDE.BOTH, True + ) + if handler == handlers[0]: + print("[Sample] reset AEC_AGC_ROI to full res") + else: + print(f"[Sample] set AEC_AGC_ROI on {handler.win_name}") + cam.set_camera_settings_roi( + sl.VIDEO_SETTINGS.AEC_AGC_ROI, rect, sl.SIDE.BOTH + ) + + +# Function to switch between different camera settings (brightness, contrast, etc.). +def switch_camera_settings(): + global camera_settings + global str_camera_settings + if camera_settings == sl.VIDEO_SETTINGS.BRIGHTNESS: + camera_settings = sl.VIDEO_SETTINGS.CONTRAST + str_camera_settings = "Contrast" + print("[Sample] Switch to camera settings: CONTRAST") + elif camera_settings == sl.VIDEO_SETTINGS.CONTRAST: + camera_settings = sl.VIDEO_SETTINGS.HUE + str_camera_settings = "Hue" + print("[Sample] Switch to camera settings: HUE") + elif camera_settings == sl.VIDEO_SETTINGS.HUE: + camera_settings = sl.VIDEO_SETTINGS.SATURATION + str_camera_settings = "Saturation" + print("[Sample] Switch to camera settings: SATURATION") + elif camera_settings == sl.VIDEO_SETTINGS.SATURATION: + camera_settings = sl.VIDEO_SETTINGS.SHARPNESS + str_camera_settings = "Sharpness" + print("[Sample] Switch to camera settings: Sharpness") + elif camera_settings == sl.VIDEO_SETTINGS.SHARPNESS: + camera_settings = sl.VIDEO_SETTINGS.GAIN + str_camera_settings = "Gain" + print("[Sample] Switch to camera settings: GAIN") + elif camera_settings == sl.VIDEO_SETTINGS.GAIN: + camera_settings = sl.VIDEO_SETTINGS.EXPOSURE + str_camera_settings = "Exposure" + print("[Sample] Switch to camera settings: EXPOSURE") + elif camera_settings == sl.VIDEO_SETTINGS.EXPOSURE: + camera_settings = sl.VIDEO_SETTINGS.WHITEBALANCE_TEMPERATURE + str_camera_settings = "White Balance" + print("[Sample] Switch to camera settings: WHITEBALANCE") + elif camera_settings == sl.VIDEO_SETTINGS.WHITEBALANCE_TEMPERATURE: + camera_settings = sl.VIDEO_SETTINGS.BRIGHTNESS + str_camera_settings = "Brightness" + print("[Sample] Switch to camera settings: BRIGHTNESS") + + +if __name__ == "__main__": + main() diff --git a/py_workspace/svo_playback.py b/py_workspace/svo_playback.py new file mode 100755 index 0000000..e644c2e --- /dev/null +++ b/py_workspace/svo_playback.py @@ -0,0 +1,185 @@ +# ... existing code ... +import sys +import pyzed.sl as sl +import cv2 +import argparse +import os +import math + + +def progress_bar(percent_done, bar_length=50): + # Display progress bar + done_length = int(bar_length * percent_done / 100) + bar = "=" * done_length + "-" * (bar_length - done_length) + sys.stdout.write("[%s] %i%s\r" % (bar, percent_done, "%")) + sys.stdout.flush() + + +def main(opt): + svo_files = opt.input_svo_files + cameras = [] + cam_data = [] # List of dicts to store camera info + + print(f"Opening {len(svo_files)} SVO files...") + + for filepath in svo_files: + if not os.path.isfile(filepath): + print(f"File {filepath} does not exist. Skipping.") + continue + + init = sl.InitParameters() + init.set_from_svo_file(filepath) + init.svo_real_time_mode = False + init.depth_mode = ( + sl.DEPTH_MODE.NONE + ) # Use NONE for performance with multiple cameras + + cam = sl.Camera() + status = cam.open(init) + if status != sl.ERROR_CODE.SUCCESS: + print(f"Failed to open {filepath}: {status}") + continue + + cameras.append(cam) + + # Store metadata + info = cam.get_camera_information() + cam_data.append( + { + "cam": cam, + "serial": info.serial_number, + "res": info.camera_configuration.resolution, + "fps": info.camera_configuration.fps, + "nb_frames": cam.get_svo_number_of_frames(), + "start_ts": 0, # To be filled + } + ) + + if not cameras: + print("No cameras opened. Exiting.") + return + + # SVO Synchronization + # Since Python API might miss get_svo_position_at_timestamp, we approximate using FPS and start timestamps. + print("Syncing SVOs...") + max_start_ts = 0 + runtime = sl.RuntimeParameters() + + # Grab first frame to determine start timestamps + for data in cam_data: + cam = data["cam"] + err = cam.grab(runtime) + if err == sl.ERROR_CODE.SUCCESS: + ts = cam.get_timestamp(sl.TIME_REFERENCE.IMAGE).get_nanoseconds() + data["start_ts"] = ts + if ts > max_start_ts: + max_start_ts = ts + else: + print(f"Error grabbing first frame from {data['serial']}: {err}") + + # Align cameras + for data in cam_data: + cam = data["cam"] + ts = data["start_ts"] + if ts < max_start_ts: + # Calculate frames to skip + diff_ns = max_start_ts - ts + fps = data["fps"] + # frames = seconds * fps + frames_to_skip = int((diff_ns / 1_000_000_000) * fps) + print( + f"Camera {data['serial']} starts {diff_ns / 1e9:.2f}s earlier. Skipping {frames_to_skip} frames." + ) + cam.set_svo_position(frames_to_skip) + else: + # This is the latest camera, just reset to 0 (or stay at 1 since we grabbed one) + # To be perfectly synced with the skip, we should ideally be at 'frame 0' relative to the sync point. + # If we simply continue, we are at frame 1. + # Let's reset to 0 for the "base" cameras to ensure we don't miss the *very* first synced frame if possible, + # or just accept we are at frame 1. + # set_svo_position(0) is safer. + cam.set_svo_position(0) + + print("Starting playback...") + print(" Press 's' to save SVO image as a PNG") + print(" Press 'f' to jump forward in the video") + print(" Press 'b' to jump backward in the video") + print(" Press 'q' to exit...") + + key = "" + while key != 113: # for 'q' key + frames_displayed = 0 + + for data in cam_data: + cam = data["cam"] + err = cam.grab(runtime) + if err == sl.ERROR_CODE.SUCCESS: + # Resize for display + res = data["res"] + w = min(720, res.width) + h = min(404, res.height) + # Ensure even dimensions for video + low_resolution = sl.Resolution(w, h) + + svo_image = sl.Mat() + cam.retrieve_image(svo_image, sl.VIEW.LEFT, sl.MEM.CPU, low_resolution) + + img = svo_image.get_data() + cv2.imshow(f"View {data['serial']}", img) + frames_displayed += 1 + + # Update progress bar + if data == cam_data[0]: + pos = cam.get_svo_position() + total = data["nb_frames"] + if total > 0: + progress_bar(int(pos / total * 100), 30) + + elif err == sl.ERROR_CODE.END_OF_SVOFILE_REACHED: + print(f"\nSVO end reached for {data['serial']}. Looping.") + cam.set_svo_position(0) + else: + pass + + key = cv2.waitKey(10) + + if key == 115: # 's' + # Save snapshots + for data in cam_data: + cam = data["cam"] + mat = sl.Mat() + cam.retrieve_image(mat) + pos = cam.get_svo_position() + filename = f"capture_{data['serial']}_{pos}.png" + mat.write(filename) + print(f"Saved {filename}") + + if key == 102: # 'f' + for data in cam_data: + cam = data["cam"] + pos = cam.get_svo_position() + cam.set_svo_position(pos + int(data["fps"])) + + if key == 98: # 'b' + for data in cam_data: + cam = data["cam"] + pos = cam.get_svo_position() + cam.set_svo_position(max(0, pos - int(data["fps"]))) + + cv2.destroyAllWindows() + for cam in cameras: + cam.close() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--input_svo_files", + nargs="+", + type=str, + help="Path to .svo/.svo2 files", + required=True, + ) + opt = parser.parse_args() + + main(opt) diff --git a/py_workspace/zed_network_utils.py b/py_workspace/zed_network_utils.py new file mode 100644 index 0000000..2a41f98 --- /dev/null +++ b/py_workspace/zed_network_utils.py @@ -0,0 +1,88 @@ +import json +import os +import pyzed.sl as sl + +DEFAULT_CONFIG_PATH = "/workspaces/zed-playground/zed_settings/inside_network.json" + + +def parse_network_config(config_file=DEFAULT_CONFIG_PATH): + """ + Parses the network configuration JSON file and returns a dictionary of camera configurations. + + Args: + config_file (str): Path to the JSON configuration file. + + Returns: + dict: A dictionary where keys are serial numbers (str) and values are configuration dicts, + or None if the file doesn't exist or is invalid. + """ + if not os.path.exists(config_file): + print(f"Configuration file not found: {config_file}") + return None + + try: + with open(config_file, "r") as f: + network_config = json.load(f) + return network_config + except json.JSONDecodeError as e: + print(f"Error parsing JSON: {e}") + return None + + +def get_camera_config_by_serial(serial_number, config_file=DEFAULT_CONFIG_PATH): + """ + Retrieves configuration for a specific camera serial number. + """ + config = parse_network_config(config_file) + if config: + return config.get(str(serial_number)) + return None + + +def extract_ip_port(config_entry): + """ + Extracts IP and Port from a camera configuration entry. + + Args: + config_entry (dict): A single camera configuration dictionary. + + Returns: + tuple: (ip_address (str), port (int)) or (None, None) if parsing fails. + """ + try: + comm_params = config_entry["FusionConfiguration"]["communication_parameters"][ + "CommunicationParameters" + ] + ip = comm_params["ip_add"] + port = comm_params["ip_port"] + return ip, port + except (KeyError, TypeError) as e: + print(f"Error extracting IP/Port from config: {e}") + return None, None + + +def open_remote_camera(zed, ip, port): + """ + Opens a remote ZED camera using set_from_stream. + + Args: + zed (sl.Camera): The ZED Camera object. + ip (str): IP address. + port (int): Port number. + + Returns: + bool: True if opened successfully, False otherwise. + """ + init_params = sl.InitParameters() + init_params.depth_mode = sl.DEPTH_MODE.NONE + init_params.camera_resolution = sl.RESOLUTION.AUTO + + print(f"Connecting to {ip}:{port}...") + init_params.set_from_stream(ip, port) + + status = zed.open(init_params) + if status != sl.ERROR_CODE.SUCCESS: + print(f"Failed to open camera at {ip}:{port}. Error: {status}") + return False + + return True