feat: add aruco-svo-calibration plan and utils scripts

- Add comprehensive work plan for ArUco-based multi-camera calibration
- Add recording_multi.py for multi-camera SVO recording
- Add streaming_receiver.py for network streaming
- Add svo_playback.py for synchronized playback
- Add zed_network_utils.py for camera configuration
- Add AGENTS.md with project context
This commit is contained in:
2026-02-05 03:17:05 +00:00
parent d1e58245a6
commit 9c861105f7
17 changed files with 2071 additions and 0 deletions
+220
View File
@@ -0,0 +1,220 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[codz]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py.cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
# Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
# poetry.lock
# poetry.toml
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python.
# https://pdm-project.org/en/latest/usage/project/#working-with-version-control
# pdm.lock
# pdm.toml
.pdm-python
.pdm-build/
# pixi
# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control.
# pixi.lock
# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one
# in the .venv directory. It is recommended not to include this directory in version control.
.pixi
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# Redis
*.rdb
*.aof
*.pid
# RabbitMQ
mnesia/
rabbitmq/
rabbitmq-data/
# ActiveMQ
activemq-data/
# SageMath parsed files
*.sage.py
# Environments
.env
.envrc
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
# .idea/
# Abstra
# Abstra is an AI-powered process automation framework.
# Ignore directories containing user credentials, local state, and settings.
# Learn more at https://abstra.io/docs
.abstra/
# Visual Studio Code
# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
# and can be added to the global gitignore or merged into this file. However, if you prefer,
# you could uncomment the following to ignore the entire vscode folder
# .vscode/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
# Marimo
marimo/_static/
marimo/_lsp/
__marimo__/
# Streamlit
.streamlit/secrets.toml
# Project-specific (note: .venv and .ruff_cache are already covered above)
*.svo2
+8
View File
@@ -0,0 +1,8 @@
{
"active_plan": "/workspaces/zed-playground/py_workspace/.sisyphus/plans/aruco-svo-calibration.md",
"started_at": "2026-02-04T10:33:12.040Z",
"session_ids": [
"ses_3d7dbf4bdffeF4K2AhV4X3VFTa"
],
"plan_name": "aruco-svo-calibration"
}
@@ -0,0 +1,108 @@
# Draft: ArUco-Based Multi-Camera Extrinsic Calibration from SVO
## Requirements (confirmed)
### Goal
Create a CLI tool that reads synchronized SVO recordings from multiple ZED cameras, detects ArUco markers on a 3D calibration box, computes camera extrinsics relative to the marker world origin, and outputs accurate pose matrices to replace the inaccurate ones in `inside_network.json`.
### Calibration Target
- **Type**: 3D box with 6 diamond board faces
- **Object points**: Defined in `aruco/output/standard_box_markers.parquet`
- **Marker dictionary**: `DICT_4X4_50` (from existing code)
- **Minimum markers per frame**: 4+ (one diamond face's worth)
### Input
- Multiple SVO2 files (one per camera)
- Frame sampling: Fixed interval + quality filter
- Timestamp-aligned playback (using existing `svo_playback.py` pattern)
### Output
- **New JSON file** with calibrated extrinsics
- Format: Similar to `inside_network.json` but with accurate `pose` field
- Reference frame: **Marker is world origin** (all cameras expressed relative to ArUco box)
### Workflow
- **CLI with preview**: Command-line driven but shows visualization of detected markers
- Example: `uv run calibrate_extrinsics.py --svos *.svo2 --interval 30 --output calibrated.json`
## Technical Decisions
### Intrinsics Source
- Use ZED SDK's pre-calibrated intrinsics from `cam.get_camera_information().camera_configuration.calibration_parameters.left_cam`
- Properties: `fx, fy, cx, cy, disto`
### Pose Estimation
- Use `cv2.solvePnP` with `SOLVEPNP_SQPNP` flag (from existing code)
- Consider `solvePnPRansac` for per-frame robustness
### Outlier Handling (Two-stage)
1. **Per-frame rejection**: Reject frames with high reprojection error (threshold ~2-5 pixels)
2. **RANSAC on pose set**: After collecting all valid poses, use RANSAC-style consensus
### Pose Averaging
- **Rotation**: Use `scipy.spatial.transform.Rotation.mean()` for geodesic mean
- **Translation**: Use median or weighted mean with MAD-based outlier rejection
### Math: Camera-to-World Transform
Each camera sees marker → `T_cam_marker` (camera-to-marker)
World origin = marker, so camera pose in world = `T_world_cam = inv(T_cam_marker)`
For camera i: `T_world_cam_i = inv(T_cam_i_marker)`
## Research Findings
### From Librarian (Multi-camera calibration)
- Relative transform: `T_BA = T_BM @ inv(T_AM)`
- Board-based detection improves robustness to occlusion
- Use `refineDetectedMarkers` for corner accuracy
- Handle missing views by only computing poses when enough markers visible
### From Librarian (Robust averaging)
- Use `scipy.spatial.transform.Rotation.mean(weights=...)` for rotation averaging
- Median/MAD on translation for outlier rejection
- RANSAC over pose set with rotation angle + translation distance thresholds
- Practical thresholds: rotation >2-5°, translation depends on scale
### Existing Codebase Patterns
- `find_extrinsic_object.py`: ArUco detection + solvePnP pattern
- `svo_playback.py`: Multi-SVO sync via timestamp alignment
- `aruco_box.py`: Diamond board geometry generation
## Open Questions
- None remaining
## Metis Gap Analysis (Addressed)
### Critical Gaps Resolved:
1. **World frame**: As defined in `standard_box_markers.parquet` (origin at box coordinate system)
2. **Image stream**: Use rectified LEFT view (no distortion coefficients needed)
3. **Transform convention**: Match `inside_network.json` format - appears to be T_world_from_cam (camera pose in world)
- Format: space-separated 4x4 matrix, row-major
4. **Sync tolerance**: Moderate (<33ms, 1 frame at 30fps)
### Guardrails Added:
- Validate parquet schema early (require marker_id, corners with X,Y,Z in meters)
- Use reprojection error as primary quality metric
- Require ≥4 markers with sufficient 3D spread (not just coplanar)
- Whitelist only expected marker IDs (from parquet)
- Add self-check mode with quantitative quality report
## Scope Boundaries
### INCLUDE
- SVO file loading with timestamp sync
- ArUco detection on left camera image
- Pose estimation using solvePnP
- Per-frame quality filtering (reprojection error)
- Multi-frame pose averaging with outlier rejection
- JSON output with 4x4 pose matrices
- Preview visualization showing detected markers and axes
- CLI interface with click
### EXCLUDE
- Right camera processing (use left only for simplicity)
- Intrinsic calibration (use pre-calibrated from ZED SDK)
- Modifying `inside_network.json` in-place
- GUI-based frame selection
- Bundle adjustment refinement
- Depth-based verification
@@ -0,0 +1,6 @@
## 2026-02-04 Init
- Use ZED rectified LEFT images (VIEW.LEFT). Distortion handled as zero (since images are rectified).
- Output `pose` matrices in the same convention as ZED Fusion `FusionConfiguration.pose`:
- Semantics: WORLD Pose of camera = T_world_from_cam.
- Storage: row-major 4x4, translation in last column.
- Coordinate system/units: defined by InitFusionParameters / InitParameters.
@@ -0,0 +1,22 @@
## 2026-02-04 Init
- Baseline note: ZED SDK stub file `py_workspace/libs/pyzed_pkg/pyzed/sl.pyi` has many LSP/type-stub errors pre-existing in repo.
- Do not treat as regressions for this plan.
- ZED Fusion pose semantics confirmed by librarian:
- `/usr/local/zed/include/sl/Fusion.hpp` indicates `FusionConfiguration.pose` is "WORLD Pose of camera" in InitFusionParameters coordinate system/units.
- `/usr/local/zed/doc/API/html/classsl_1_1Matrix4f.html` indicates Matrix4f is row-major with translation in last column.
- Local LSP diagnostics not available: `basedpyright-langserver` is configured but not installed. Use `py_compile` + runtime smoke checks instead.
- Git commands currently fail due to missing git-lfs (smudge filter). Avoid git-based verification unless git-lfs is installed.
- `ak.from_parquet` requires `pyarrow` and `pandas` to be installed in the environment, which were missing initially.
## Task 6: IndexError in draw_detected_markers
- **Bug**: `draw_detected_markers` assumed `ids` was always 2D (`ids[i][0]`) and `corners` was always a list of 3D arrays.
- **Fix**: Flattened `ids` using `ids.flatten()` to support both (N,) and (N, 1) shapes. Reshaped `corners` and `int_corners` to ensure compatibility with `cv2.polylines` and text placement regardless of whether input is a list or a 3D/4D numpy array.
The calibration loop was hanging because SVOReader loops back to frame 0 upon reaching the end, and 'any(frames)' remained true. Fixed by calculating 'max_frames' based on remaining frames at start and bounding the loop.
- Fixed pose parsing in self-check: used np.fromstring instead of np.array on the space-separated string.
- Guarded max_frames calculation with a safety limit of 10,000 frames and handled cases where total_frames is -1 or 0.
- Improved --validate-markers mode to exit cleanly with a message when no SVOs are provided.
- Fixed pose string parsing in self-check distance block to use np.fromstring with sep=' '.
- Added max_frames guard for unknown total_frames (<= 0) to prevent infinite loops when SVO length cannot be determined.
@@ -0,0 +1,54 @@
## 2026-02-04 Init
- Repo is script-first, but `aruco/` imports work via Python namespace package even without `__init__.py`.
- No existing test suite. `pytest` is not installed; will need to be added (likely as dev dependency) before tests can run.
- Existing CLI patterns use `click` (e.g., `streaming_receiver.py`).
## Pose Math Utilities
- Created `aruco/pose_math.py` for common SE(3) operations.
- Standardized on 4x4 homogeneous matrices for composition and inversion.
- Inversion uses the efficient property: [R | t; 0 | 1]^-1 = [R^T | -R^T * t; 0 | 1].
- Reprojection error calculation uses `cv2.projectPoints` and mean Euclidean distance.
- ArUco marker geometry loading and validation logic implemented in `aruco/marker_geometry.py`.
- Use `awkward` and `pyarrow` for efficient parquet loading of multi-dimensional arrays (corners).
- Reshaping `ak.to_numpy` output is necessary when dealing with nested structures like corners (4x3).
## SVO Sync
- Added `aruco/svo_sync.py` with `SVOReader` and `FrameData` to open multiple SVO2 files, align starts by timestamp, and grab frames.
- Verified with real local SVO2 files: able to open, sync, and grab frames for 2 cameras.
## ArUco Detector
- Added `aruco/detector.py` implementing:
- ArUcoDetector creation (DICT_4X4_50)
- marker detection (BGR or grayscale input)
- ZED intrinsics -> K matrix extraction
- multi-marker solvePnP pose estimation + reprojection error
- Verified pose estimation with synthetic projected points and with a real SVO-opened camera for intrinsics.
- Implemented `PoseAccumulator` in `aruco/pose_averaging.py` for robust SE(3) pose averaging.
- Added RANSAC-based filtering for rotation and translation consensus.
- Implemented quaternion eigen-mean fallback for rotation averaging when `scipy` is unavailable.
- Used median for robust translation averaging.
## Task 6: ArUco Preview Helpers
- Implemented `aruco/preview.py` with `draw_detected_markers`, `draw_pose_axes`, and `show_preview`.
- Ensured grayscale images are converted to BGR before drawing to support colored overlays.
- Used `cv2.drawFrameAxes` for pose visualization.
- `show_preview` handles multiple windows based on camera serial numbers.
- Removed *.parquet from .gitignore to allow versioning of marker geometry data.
## Unit Testing
- Added pytest as a dev dependency.
- Implemented synthetic tests for pose math and averaging.
- Discovered that OpenCV's `projectPoints` is strict about `tvec` being floating-point; ensured tests use `dtype=np.float64`.
- Verified that `PoseAccumulator` correctly filters outliers using RANSAC and computes robust means.
## Pytest sys.path Pitfall
When running pytest via a console script (e.g., `uv run pytest`), the current working directory is not always automatically added to `sys.path`. This can lead to `ModuleNotFoundError` for local packages like `aruco`.
**Fix:** Create a `tests/conftest.py` file that explicitly inserts the project root into `sys.path`:
```python
import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
```
This ensures deterministic import behavior regardless of how pytest is invoked.
@@ -0,0 +1,2 @@
## 2026-02-04 Init
- (empty)
@@ -0,0 +1,3 @@
## Architectural Decisions
- Implemented SVOReader to encapsulate multi-camera management and synchronization logic.
- Used rectified sl.VIEW.LEFT for frame retrieval as requested.
@@ -0,0 +1,3 @@
## SVO Synchronization Patterns
- Use sl.Camera.set_svo_position(0) to loop SVOs when END_OF_SVOFILE_REACHED is encountered.
- Synchronization can be achieved by comparing timestamps and skipping frames based on FPS.
@@ -0,0 +1,745 @@
# ArUco-Based Multi-Camera Extrinsic Calibration from SVO
## TL;DR
> **Quick Summary**: Create a CLI tool that reads synchronized SVO recordings from multiple ZED cameras, detects ArUco markers on a 3D calibration box, computes camera extrinsics using robust pose averaging, and outputs accurate 4x4 transform matrices.
>
> **Deliverables**:
> - `calibrate_extrinsics.py` - Main CLI tool
> - `pose_averaging.py` - Robust pose estimation utilities
> - `svo_sync.py` - Multi-SVO timestamp synchronization
> - `tests/test_pose_math.py` - Unit tests for pose calculations
> - Output JSON with calibrated extrinsics
>
> **Estimated Effort**: Medium (3-5 days)
> **Parallel Execution**: YES - 2 waves
> **Critical Path**: Task 1 → Task 3 → Task 7 → Task 8 (Task 5 runs in parallel off Task 1)
---
## Context
### Original Request
User wants to integrate ArUco marker detection with SVO recording playback to calibrate multi-camera extrinsics. The idea is to use timestamp-aligned SVO reading to extract frame batches at certain intervals, calculate camera extrinsics by averaging multiple pose estimates, and handle outliers.
### Interview Summary
**Key Discussions**:
- Calibration target: 3D box with 6 diamond board faces (24 markers), defined in `standard_box_markers.parquet`
- Current extrinsics in `inside_network.json` are **inaccurate** and need replacement
- Output: New JSON file with 4x4 pose matrices, marker box as world origin
- Workflow: CLI with preview visualization
**User Decisions**:
- Frame sampling: Fixed interval + quality filter
- Outlier handling: Two-stage (per-frame + RANSAC on pose set)
- Minimum markers: 4+ per frame
- Image stream: Rectified LEFT (no distortion needed)
- Sync tolerance: <33ms (1 frame at 30fps)
- Tests: Add after implementation
### Research Findings
- **Existing patterns**: `find_extrinsic_object.py` (ArUco + solvePnP), `svo_playback.py` (multi-SVO sync)
- **ZED SDK intrinsics**: `cam.get_camera_information().camera_configuration.calibration_parameters.left_cam`
- **Rotation averaging**: `scipy.spatial.transform.Rotation.mean()` for geodesic mean
- **Translation averaging**: Median with MAD-based outlier rejection
- **Transform math**: `T_world_cam = inv(T_cam_marker)` when marker is world origin
### Metis Review
**Identified Gaps** (addressed):
- World frame definition → Use coordinates from `standard_box_markers.parquet`
- Transform convention → Match `inside_network.json` format (T_world_from_cam, space-separated 4x4)
- Image stream → Rectified LEFT view (no distortion)
- Sync tolerance → Moderate (<33ms)
- Parquet validation → Must validate schema early
- Planar degeneracy → Require multi-face visibility or 3D spread check
---
## Work Objectives
### Core Objective
Build a robust CLI tool for multi-camera extrinsic calibration using ArUco markers detected in synchronized SVO playback.
### Concrete Deliverables
- `py_workspace/calibrate_extrinsics.py` - Main entry point
- `py_workspace/aruco/pose_averaging.py` - Robust averaging utilities
- `py_workspace/aruco/svo_sync.py` - Multi-SVO synchronization
- `py_workspace/tests/test_pose_math.py` - Unit tests
- Output: `calibrated_extrinsics.json` with per-camera 4x4 transforms
### Definition of Done
- [ ] `uv run calibrate_extrinsics.py --help` → exits 0, shows required args
- [ ] `uv run calibrate_extrinsics.py --validate-markers` → validates parquet schema
- [ ] `uv run calibrate_extrinsics.py --svos ... --output out.json` → produces valid JSON
- [ ] Output JSON contains 4 cameras with 4x4 matrices in correct format
- [ ] `uv run pytest tests/test_pose_math.py` → all tests pass
- [ ] Preview mode shows detected markers with axes overlay
### Must Have
- Load multiple SVO files with timestamp synchronization
- Detect ArUco markers using cv2.aruco with DICT_4X4_50
- Estimate per-frame poses using cv2.solvePnP
- Two-stage outlier rejection (reprojection error + pose RANSAC)
- Robust pose averaging (geodesic rotation mean + median translation)
- Output 4x4 transforms in `inside_network.json`-compatible format
- CLI with click for argument parsing
- Preview visualization with detected markers and axes
### Must NOT Have (Guardrails)
- NO intrinsic calibration (use ZED SDK pre-calibrated values)
- NO bundle adjustment or SLAM
- NO modification of `inside_network.json` in-place
- NO right camera processing (use left only)
- NO GUI beyond simple preview window
- NO depth-based verification
- NO automatic config file updates
---
## Verification Strategy
> **UNIVERSAL RULE: ZERO HUMAN INTERVENTION**
>
> ALL tasks must be verifiable by agent-executed commands. No "user visually confirms" criteria.
### Test Decision
- **Infrastructure exists**: NO (need to set up pytest)
- **Automated tests**: YES (tests-after)
- **Framework**: pytest
### Agent-Executed QA Scenarios (MANDATORY)
**Verification Tool by Deliverable Type:**
| Type | Tool | How Agent Verifies |
|------|------|-------------------|
| CLI | Bash | Run command, check exit code, parse output |
| JSON output | Bash (jq) | Parse JSON, validate structure and values |
| Preview | Playwright | Capture window screenshot (optional) |
| Unit tests | Bash (pytest) | Run tests, assert all pass |
---
## Execution Strategy
### Parallel Execution Waves
```
Wave 1 (Start Immediately):
├── Task 1: Core pose math utilities
├── Task 2: Parquet loader and validator
└── Task 4: SVO synchronization module
Wave 2 (After Wave 1):
├── Task 3: ArUco detection integration (depends: 1, 2)
├── Task 5: Robust pose aggregation (depends: 1)
└── Task 6: Preview visualization (depends: 3 — starts within Wave 2 only after Task 3 completes)
Wave 3 (After Wave 2):
├── Task 7: CLI integration (depends: 3, 4, 5, 6)
└── Task 8: Tests and validation (depends: all)
Critical Path: Task 1 → Task 3 → Task 7 → Task 8
```
### Dependency Matrix
| Task | Depends On | Blocks | Can Parallelize With |
|------|------------|--------|---------------------|
| 1 | None | 3, 5 | 2, 4 |
| 2 | None | 3 | 1, 4 |
| 3 | 1, 2 | 6, 7 | 5 |
| 4 | None | 7 | 1, 2 |
| 5 | 1 | 7 | 3, 6 |
| 6 | 3 | 7 | 5 |
| 7 | 3, 4, 5, 6 | 8 | None |
| 8 | 7 | None | None |
---
## TODOs
- [ ] 1. Create pose math utilities module
**What to do**:
- Create `py_workspace/aruco/pose_math.py`
- Implement `rvec_tvec_to_matrix(rvec, tvec) -> np.ndarray` (4x4 homogeneous)
- Implement `matrix_to_rvec_tvec(T) -> tuple[np.ndarray, np.ndarray]`
- Implement `invert_transform(T) -> np.ndarray`
- Implement `compose_transforms(T1, T2) -> np.ndarray`
- Implement `compute_reprojection_error(obj_pts, img_pts, rvec, tvec, K) -> float`
- Use numpy for all matrix operations
**Must NOT do**:
- Do NOT use scipy in this module (keep it pure numpy for core math)
- Do NOT implement averaging here (that's Task 5)
**Recommended Agent Profile**:
- **Category**: `quick`
- Reason: Pure math utilities, straightforward implementation
- **Skills**: []
- No special skills needed
**Parallelization**:
- **Can Run In Parallel**: YES
- **Parallel Group**: Wave 1 (with Tasks 2, 4)
- **Blocks**: Tasks 3, 5
- **Blocked By**: None
**References**:
- `py_workspace/aruco/find_extrinsic_object.py:123-145` - solvePnP usage and rvec/tvec handling
- OpenCV docs: `cv2.Rodrigues()` for rvec↔rotation matrix conversion
- OpenCV docs: `cv2.projectPoints()` for reprojection
**Acceptance Criteria**:
**Agent-Executed QA Scenarios:**
```
Scenario: rvec/tvec round-trip conversion
Tool: Bash (python)
Steps:
1. python -c "from aruco.pose_math import *; import numpy as np; rvec=np.array([0.1,0.2,0.3]); tvec=np.array([1,2,3]); T=rvec_tvec_to_matrix(rvec,tvec); r2,t2=matrix_to_rvec_tvec(T); assert np.allclose(rvec,r2,atol=1e-6) and np.allclose(tvec,t2,atol=1e-6); print('PASS')"
Expected Result: Prints "PASS"
Scenario: Transform inversion identity
Tool: Bash (python)
Steps:
1. python -c "from aruco.pose_math import *; import numpy as np; T=np.eye(4); T[:3,3]=[1,2,3]; T_inv=invert_transform(T); result=compose_transforms(T,T_inv); assert np.allclose(result,np.eye(4),atol=1e-9); print('PASS')"
Expected Result: Prints "PASS"
```
**Commit**: YES
- Message: `feat(aruco): add pose math utilities for transform operations`
- Files: `py_workspace/aruco/pose_math.py`
---
- [ ] 2. Create parquet loader and validator
**What to do**:
- Create `py_workspace/aruco/marker_geometry.py`
- Implement `load_marker_geometry(parquet_path) -> dict[int, np.ndarray]`
- Returns mapping: marker_id → corner coordinates (4, 3)
- Implement `validate_marker_geometry(geometry) -> bool`
- Check all expected marker IDs present
- Check coordinates are in meters (reasonable range)
- Check corner ordering is consistent
- Use awkward-array (already in project) for parquet reading
**Must NOT do**:
- Do NOT hardcode marker IDs (read from parquet)
- Do NOT assume specific number of markers (validate dynamically)
**Recommended Agent Profile**:
- **Category**: `quick`
- Reason: Simple data loading and validation
- **Skills**: []
**Parallelization**:
- **Can Run In Parallel**: YES
- **Parallel Group**: Wave 1 (with Tasks 1, 4)
- **Blocks**: Task 3
- **Blocked By**: None
**References**:
- `py_workspace/aruco/find_extrinsic_object.py:55-66` - Parquet loading with awkward-array
- `py_workspace/aruco/output/standard_box_markers.parquet` - Actual data file
**Acceptance Criteria**:
**Agent-Executed QA Scenarios:**
```
Scenario: Load marker geometry from parquet
Tool: Bash (python)
Preconditions: standard_box_markers.parquet exists
Steps:
1. cd /workspaces/zed-playground/py_workspace
2. python -c "from aruco.marker_geometry import load_marker_geometry; g=load_marker_geometry('aruco/output/standard_box_markers.parquet'); print(f'Loaded {len(g)} markers'); assert len(g) >= 4; print('PASS')"
Expected Result: Prints marker count and "PASS"
Scenario: Validate geometry returns True for valid data
Tool: Bash (python)
Steps:
1. python -c "from aruco.marker_geometry import *; g=load_marker_geometry('aruco/output/standard_box_markers.parquet'); assert validate_marker_geometry(g); print('PASS')"
Expected Result: Prints "PASS"
```
**Commit**: YES
- Message: `feat(aruco): add marker geometry loader with validation`
- Files: `py_workspace/aruco/marker_geometry.py`
---
- [ ] 3. Integrate ArUco detection with ZED intrinsics
**What to do**:
- Create `py_workspace/aruco/detector.py`
- Implement `create_detector() -> cv2.aruco.ArucoDetector` using DICT_4X4_50
- Implement `detect_markers(image, detector) -> tuple[corners, ids]`
- Implement `get_zed_intrinsics(camera) -> tuple[np.ndarray, np.ndarray]`
- Extract K matrix (3x3) and distortion from ZED SDK
- For rectified images, distortion should be zeros
- Implement `estimate_pose(corners, ids, marker_geometry, K, dist) -> tuple[rvec, tvec, error]`
- Match detected markers to known 3D points
- Call solvePnP with SOLVEPNP_SQPNP
- Compute and return reprojection error
- Require minimum 4 markers for valid pose
**Must NOT do**:
- Do NOT use deprecated `estimatePoseSingleMarkers`
- Do NOT accept poses with <4 markers
**Recommended Agent Profile**:
- **Category**: `unspecified-low`
- Reason: Integration of existing patterns, moderate complexity
- **Skills**: []
**Parallelization**:
- **Can Run In Parallel**: NO
- **Parallel Group**: Wave 2 (after Task 1, 2)
- **Blocks**: Tasks 6, 7
- **Blocked By**: Tasks 1, 2
**References**:
- `py_workspace/aruco/find_extrinsic_object.py:54-145` - Full ArUco detection and solvePnP pattern
- `py_workspace/libs/pyzed_pkg/pyzed/sl.pyi:5110-5180` - CameraParameters with fx, fy, cx, cy, disto
- `py_workspace/svo_playback.py:46` - get_camera_information() usage
**Acceptance Criteria**:
**Agent-Executed QA Scenarios:**
```
Scenario: Detector creation succeeds
Tool: Bash (python)
Steps:
1. python -c "from aruco.detector import create_detector; d=create_detector(); print(type(d)); print('PASS')"
Expected Result: Prints detector type and "PASS"
Scenario: Pose estimation with synthetic data
Tool: Bash (python)
Steps:
1. python -c "
import numpy as np
from aruco.detector import estimate_pose
from aruco.marker_geometry import load_marker_geometry
# Create synthetic test with known geometry
geom = load_marker_geometry('aruco/output/standard_box_markers.parquet')
K = np.array([[700,0,960],[0,700,540],[0,0,1]], dtype=np.float64)
# Test passes if function runs without error
print('PASS')
"
Expected Result: Prints "PASS"
```
**Commit**: YES
- Message: `feat(aruco): add ArUco detector with ZED intrinsics integration`
- Files: `py_workspace/aruco/detector.py`
---
- [ ] 4. Create multi-SVO synchronization module
**What to do**:
- Create `py_workspace/aruco/svo_sync.py`
- Implement `SVOReader` class:
- `__init__(svo_paths: list[str])` - Open all SVOs
- `get_camera_info(idx) -> CameraInfo` - Serial, resolution, intrinsics
- `sync_to_latest_start()` - Align all cameras to latest start timestamp
- `grab_synced(tolerance_ms=33) -> dict[serial, Frame] | None` - Get synced frames
- `seek_to_frame(frame_num)` - Seek all cameras
- `close()` - Cleanup
- Frame should contain: image (numpy), timestamp_ns, serial_number
- Use pattern from `svo_playback.py` for sync logic
**Must NOT do**:
- Do NOT implement complex clock drift correction
- Do NOT handle streaming (SVO only)
**Recommended Agent Profile**:
- **Category**: `unspecified-low`
- Reason: Adapting existing pattern, moderate complexity
- **Skills**: []
**Parallelization**:
- **Can Run In Parallel**: YES
- **Parallel Group**: Wave 1 (with Tasks 1, 2)
- **Blocks**: Task 7
- **Blocked By**: None
**References**:
- `py_workspace/svo_playback.py:18-102` - Complete multi-SVO sync pattern
- `py_workspace/libs/pyzed_pkg/pyzed/sl.pyi:10010-10097` - SVO position and frame methods
**Acceptance Criteria**:
**Agent-Executed QA Scenarios:**
```
Scenario: SVOReader opens multiple files
Tool: Bash (python)
Preconditions: SVO files exist in py_workspace
Steps:
1. python -c "
from aruco.svo_sync import SVOReader
import glob
svos = glob.glob('*.svo2')[:2]
if len(svos) >= 2:
reader = SVOReader(svos)
print(f'Opened {len(svos)} SVOs')
reader.close()
print('PASS')
else:
print('SKIP: Need 2+ SVOs')
"
Expected Result: Prints "PASS" or "SKIP"
Scenario: Sync aligns timestamps
Tool: Bash (python)
Steps:
1. Test sync_to_latest_start returns without error
Expected Result: No exception raised
```
**Commit**: YES
- Message: `feat(aruco): add multi-SVO synchronization reader`
- Files: `py_workspace/aruco/svo_sync.py`
---
- [ ] 5. Implement robust pose aggregation
**What to do**:
- Create `py_workspace/aruco/pose_averaging.py`
- Implement `PoseAccumulator` class:
- `add_pose(T: np.ndarray, reproj_error: float, frame_id: int)`
- `get_inlier_poses(max_reproj_error=2.0) -> list[np.ndarray]`
- `compute_robust_mean() -> tuple[np.ndarray, dict]`
- Use scipy.spatial.transform.Rotation.mean() for rotation
- Use median for translation
- Return stats dict: {n_total, n_inliers, median_error, std_rotation_deg}
- Implement `ransac_filter_poses(poses, rot_thresh_deg=5.0, trans_thresh_m=0.05) -> list[int]`
- Return indices of inlier poses
**Must NOT do**:
- Do NOT implement bundle adjustment
- Do NOT modify poses in-place
**Recommended Agent Profile**:
- **Category**: `unspecified-low`
- Reason: Math-focused but requires scipy understanding
- **Skills**: []
**Parallelization**:
- **Can Run In Parallel**: YES
- **Parallel Group**: Wave 2 (with Task 3)
- **Blocks**: Task 7
- **Blocked By**: Task 1
**References**:
- Librarian findings on `scipy.spatial.transform.Rotation.mean()`
- Librarian findings on RANSAC-style pose filtering
**Acceptance Criteria**:
**Agent-Executed QA Scenarios:**
```
Scenario: Rotation averaging produces valid result
Tool: Bash (python)
Steps:
1. python -c "
from aruco.pose_averaging import PoseAccumulator
import numpy as np
acc = PoseAccumulator()
T = np.eye(4)
acc.add_pose(T, reproj_error=1.0, frame_id=0)
acc.add_pose(T, reproj_error=1.5, frame_id=1)
mean_T, stats = acc.compute_robust_mean()
assert mean_T.shape == (4,4)
assert stats['n_inliers'] == 2
print('PASS')
"
Expected Result: Prints "PASS"
Scenario: RANSAC rejects outliers
Tool: Bash (python)
Steps:
1. python -c "
from aruco.pose_averaging import ransac_filter_poses
import numpy as np
# Create 3 similar poses + 1 outlier
poses = [np.eye(4) for _ in range(3)]
outlier = np.eye(4); outlier[:3,3] = [10,10,10] # Far away
poses.append(outlier)
inliers = ransac_filter_poses(poses, trans_thresh_m=0.1)
assert len(inliers) == 3
assert 3 not in inliers
print('PASS')
"
Expected Result: Prints "PASS"
```
**Commit**: YES
- Message: `feat(aruco): add robust pose averaging with RANSAC filtering`
- Files: `py_workspace/aruco/pose_averaging.py`
---
- [ ] 6. Add preview visualization
**What to do**:
- Create `py_workspace/aruco/preview.py`
- Implement `draw_detected_markers(image, corners, ids) -> np.ndarray`
- Draw marker outlines and IDs
- Implement `draw_pose_axes(image, rvec, tvec, K, length=0.1) -> np.ndarray`
- Use cv2.drawFrameAxes
- Implement `show_preview(images: dict[str, np.ndarray], wait_ms=1) -> int`
- Show multiple camera views in separate windows
- Return key pressed
**Must NOT do**:
- Do NOT implement complex GUI
- Do NOT block indefinitely (use waitKey with timeout)
**Recommended Agent Profile**:
- **Category**: `quick`
- Reason: Simple OpenCV visualization
- **Skills**: []
**Parallelization**:
- **Can Run In Parallel**: YES
- **Parallel Group**: Wave 2 (with Task 5)
- **Blocks**: Task 7
- **Blocked By**: Task 3
**References**:
- `py_workspace/aruco/find_extrinsic_object.py:138-145` - drawFrameAxes usage
- `py_workspace/aruco/find_extrinsic_object.py:84-105` - Marker visualization
**Acceptance Criteria**:
**Agent-Executed QA Scenarios:**
```
Scenario: Draw functions return valid images
Tool: Bash (python)
Steps:
1. python -c "
from aruco.preview import draw_detected_markers
import numpy as np
img = np.zeros((480,640,3), dtype=np.uint8)
corners = [np.array([[100,100],[200,100],[200,200],[100,200]], dtype=np.float32)]
ids = np.array([[1]])
result = draw_detected_markers(img, corners, ids)
assert result.shape == (480,640,3)
print('PASS')
"
Expected Result: Prints "PASS"
```
**Commit**: YES
- Message: `feat(aruco): add preview visualization utilities`
- Files: `py_workspace/aruco/preview.py`
---
- [ ] 7. Create main CLI tool
**What to do**:
- Create `py_workspace/calibrate_extrinsics.py`
- Use click for CLI:
- `--svo PATH` (multiple) - SVO file paths
- `--markers PATH` - Marker geometry parquet
- `--output PATH` - Output JSON path
- `--sample-interval INT` - Frame interval (default 30)
- `--max-reproj-error FLOAT` - Threshold (default 2.0)
- `--preview / --no-preview` - Show visualization
- `--validate-markers` - Only validate parquet and exit
- `--self-check` - Run and report quality metrics
- Main workflow:
1. Load marker geometry and validate
2. Open SVOs and sync
3. Sample frames at interval
4. For each synced frame set:
- Detect markers in each camera
- Estimate pose if ≥4 markers
- Accumulate poses per camera
5. Compute robust mean per camera
6. Output JSON in inside_network.json-compatible format
- Output JSON format:
```json
{
"serial": {
"pose": "r00 r01 r02 tx r10 r11 r12 ty ...",
"stats": { "n_frames": N, "median_reproj_error": X }
}
}
```
**Must NOT do**:
- Do NOT modify existing config files
- Do NOT implement auto-update of inside_network.json
**Recommended Agent Profile**:
- **Category**: `unspecified-high`
- Reason: Integration of all components, complex workflow
- **Skills**: []
**Parallelization**:
- **Can Run In Parallel**: NO
- **Parallel Group**: Wave 3 (final integration)
- **Blocks**: Task 8
- **Blocked By**: Tasks 3, 4, 5, 6
**References**:
- `py_workspace/svo_playback.py` - CLI structure with argparse (adapt to click)
- `py_workspace/aruco/find_extrinsic_object.py` - Main loop pattern
- `zed_settings/inside_network.json:20` - Output pose format
**Acceptance Criteria**:
**Agent-Executed QA Scenarios:**
```
Scenario: CLI help works
Tool: Bash
Steps:
1. cd /workspaces/zed-playground/py_workspace
2. uv run calibrate_extrinsics.py --help
Expected Result: Exit code 0, shows --svo, --markers, --output options
Scenario: Validate markers only mode
Tool: Bash
Steps:
1. uv run calibrate_extrinsics.py --markers aruco/output/standard_box_markers.parquet --validate-markers
Expected Result: Exit code 0, prints marker count
Scenario: Full calibration produces JSON
Tool: Bash
Preconditions: SVO files exist
Steps:
1. uv run calibrate_extrinsics.py \
--svo ZED_SN46195029.svo2 \
--svo ZED_SN44435674.svo2 \
--markers aruco/output/standard_box_markers.parquet \
--output /tmp/test_extrinsics.json \
--no-preview \
--sample-interval 100
2. jq 'keys' /tmp/test_extrinsics.json
Expected Result: Exit code 0, JSON contains camera serials
Scenario: Self-check reports quality
Tool: Bash
Steps:
1. uv run calibrate_extrinsics.py ... --self-check
Expected Result: Prints per-camera stats including median reproj error
```
**Commit**: YES
- Message: `feat(aruco): add calibrate_extrinsics CLI tool`
- Files: `py_workspace/calibrate_extrinsics.py`
---
- [ ] 8. Add unit tests and final validation
**What to do**:
- Create `py_workspace/tests/test_pose_math.py`
- Test cases:
- `test_rvec_tvec_roundtrip` - Convert and back
- `test_transform_inversion` - T @ inv(T) = I
- `test_transform_composition` - Known compositions
- `test_reprojection_error_zero` - Perfect projection = 0 error
- Create `py_workspace/tests/test_pose_averaging.py`
- Test cases:
- `test_mean_of_identical_poses` - Returns same pose
- `test_outlier_rejection` - Outliers removed
- Add `scipy` to pyproject.toml if not present
- Run full test suite
**Must NOT do**:
- Do NOT require real SVO files for unit tests (use synthetic data)
**Recommended Agent Profile**:
- **Category**: `quick`
- Reason: Straightforward test implementation
- **Skills**: []
**Parallelization**:
- **Can Run In Parallel**: NO
- **Parallel Group**: Wave 3 (final)
- **Blocks**: None
- **Blocked By**: Task 7
**References**:
- Task 1 acceptance criteria for test patterns
- Task 5 acceptance criteria for averaging tests
**Acceptance Criteria**:
**Agent-Executed QA Scenarios:**
```
Scenario: All unit tests pass
Tool: Bash
Steps:
1. cd /workspaces/zed-playground/py_workspace
2. uv run pytest tests/ -v
Expected Result: Exit code 0, all tests pass
Scenario: Coverage check
Tool: Bash
Steps:
1. uv run pytest tests/ --tb=short
Expected Result: Shows test results summary
```
**Commit**: YES
- Message: `test(aruco): add unit tests for pose math and averaging`
- Files: `py_workspace/tests/test_pose_math.py`, `py_workspace/tests/test_pose_averaging.py`
---
## Commit Strategy
| After Task | Message | Files | Verification |
|------------|---------|-------|--------------|
| 1 | `feat(aruco): add pose math utilities` | pose_math.py | python import test |
| 2 | `feat(aruco): add marker geometry loader` | marker_geometry.py | python import test |
| 3 | `feat(aruco): add ArUco detector` | detector.py | python import test |
| 4 | `feat(aruco): add multi-SVO sync` | svo_sync.py | python import test |
| 5 | `feat(aruco): add pose averaging` | pose_averaging.py | python import test |
| 6 | `feat(aruco): add preview utils` | preview.py | python import test |
| 7 | `feat(aruco): add calibrate CLI` | calibrate_extrinsics.py | --help works |
| 8 | `test(aruco): add unit tests` | tests/*.py | pytest passes |
---
## Success Criteria
### Verification Commands
```bash
# CLI works
uv run calibrate_extrinsics.py --help # Expected: exit 0
# Marker validation
uv run calibrate_extrinsics.py --markers aruco/output/standard_box_markers.parquet --validate-markers # Expected: exit 0
# Tests pass
uv run pytest tests/ -v # Expected: all pass
# Full calibration (with real SVOs)
uv run calibrate_extrinsics.py --svo *.svo2 --markers aruco/output/standard_box_markers.parquet --output calibrated.json --no-preview
jq 'keys' calibrated.json # Expected: camera serials
```
### Final Checklist
- [ ] All "Must Have" present
- [ ] All "Must NOT Have" absent
- [ ] All tests pass
- [ ] CLI --help shows all options
- [ ] Output JSON matches inside_network.json pose format
- [ ] Preview shows detected markers with axes
+37
View File
@@ -0,0 +1,37 @@
# Python Agent Context
## Environment
- **Directory**: `/workspaces/zed-playground/py_workspace`
- **Package Manager**: `uv`
- **Python Version**: 3.12+ (Managed by `uv`)
- **Dependencies**: Defined in `pyproject.toml`
- `pyzed`: ZED SDK Python wrapper
- `opencv-python`: GUI and image processing
- `click`: CLI argument parsing
- `numpy`, `cupy-cuda12x`: Data manipulation
## Workflow & Commands
- **Run Scripts**: Always use `uv run` to ensure correct environment.
```bash
uv run streaming_receiver.py --help
uv run recording_multi.py
```
- **New Dependencies**: Add with `uv add <package>` (e.g., `uv add requests`).
## Architecture & Patterns
- **Network Camera Handling**:
- Use `zed_network_utils.py` for all network config parsing.
- Config file: `/workspaces/zed-playground/zed_settings/inside_network.json`
- **Threading Model**:
- **Main Thread**: MUST handle all OpenCV GUI (`cv2.imshow`, `cv2.waitKey`).
- **Worker Threads**: Handle `camera.grab()` and data retrieval.
- **Communication**: Use `queue.Queue` to pass frames from workers to main.
- **ZED API Patterns**:
- Streaming Input: `init_params.set_from_stream(ip, port)`
- Serial Number: Use `camera.get_camera_information().serial_number`.
## Documentation & References
- **Python API Docs**: `/usr/local/zed/doc/API/html/python/index.html`
- **ZED SDK General Docs**: `/usr/local/zed/doc/`
- **C++ Headers (Reference)**: `/usr/local/zed/include/sl/`
- Useful for understanding underlying enum values or behaviors not fully detailed in Python docstrings.
+135
View File
@@ -0,0 +1,135 @@
"""
This sample shows how to record video in Stereolabs SVO format from network cameras.
SVO video files can be played with the ZED API and used with its different modules.
"""
import pyzed.sl as sl
import threading
import signal
import time
import sys
import zed_network_utils
# Global variable to handle exit
exit_app = False
def signal_handler(sig, frame):
    """Handle Ctrl+C (SIGINT) by requesting a clean shutdown.

    Sets the module-level ``exit_app`` flag that the main loop and the
    acquisition threads poll.

    Note: the first parameter was renamed from ``signal`` to ``sig`` so it
    no longer shadows the imported ``signal`` module.

    Args:
        sig: Signal number delivered by the OS (unused).
        frame: Current stack frame at interrupt time (unused).
    """
    global exit_app
    exit_app = True
    print("\nCtrl+C pressed. Exiting...")
def acquisition(zed):
    """Acquisition thread: grab frames until shutdown, then finalize the camera.

    Runs in a worker thread (one per camera). Each successful ``grab()`` is
    what writes a frame into the SVO recording enabled on this camera, so the
    loop body needs no extra work. On exit the thread disables recording and
    closes the camera it owns.

    Args:
        zed (sl.Camera): An opened camera with recording already enabled.
    """
    infos = zed.get_camera_information()
    while not exit_app:
        if zed.grab() == sl.ERROR_CODE.SUCCESS:
            # If needed, add more processing here.
            # Be aware that any processing holding the GIL will slow down
            # the other acquisition threads.
            pass
    print(f"{infos.camera_model}[{infos.serial_number}] QUIT")
    # disable Recording
    zed.disable_recording()
    # close the Camera
    zed.close()
def open_camera(zed, config):
    """Open a remote ZED stream described by *config* and start SVO recording.

    Args:
        zed (sl.Camera): Camera object to open.
        config (dict): One entry from the network configuration file.

    Returns:
        bool: True when the stream is connected and recording is enabled;
        False on any failure (the camera is closed again if recording
        could not be enabled).
    """
    host, stream_port = zed_network_utils.extract_ip_port(config)
    if not host or not stream_port:
        return False

    try:
        cam_serial = config["FusionConfiguration"]["serial_number"]
    except KeyError:
        print("Error: Serial number not found in config")
        return False

    # Connect to the sender via the shared utility helper.
    if not zed_network_utils.open_remote_camera(zed, host, stream_port):
        return False
    print(f"ZED SN{cam_serial} Opened from {host}:{stream_port}")

    # Record to a per-camera SVO named after the serial number.
    svo_name = f"ZED_SN{cam_serial}.svo2".replace(" ", "")
    rec_params = sl.RecordingParameters(svo_name, sl.SVO_COMPRESSION_MODE.H265)
    rec_status = zed.enable_recording(rec_params)
    if rec_status != sl.ERROR_CODE.SUCCESS:
        print(f"ZED SN{cam_serial} Recording initialization error: {rec_status}")
        zed.close()
        return False
    print(f"ZED SN{cam_serial} Enabled recording")
    print(f"Recording SVO file: {rec_params.video_filename}")
    return True
def main():
    """Entry point: open every configured network camera and record each to SVO.

    Reads the default network configuration, spawns one acquisition thread
    per successfully-opened camera, then idles until Ctrl+C requests
    shutdown. Returns a process exit code (0 on success, 1 on setup failure).
    """
    global exit_app

    # Read network configuration using the shared utility module.
    cfg = zed_network_utils.parse_network_config()
    if not cfg:
        return 1
    print(f"Found {len(cfg)} cameras in configuration")
    if len(cfg) == 0:
        print("No ZED configured, exit program")
        return 1

    cameras = []
    workers = []
    for _serial, cam_cfg in cfg.items():
        cam = sl.Camera()
        if not open_camera(cam, cam_cfg):
            continue
        cameras.append(cam)
        # Start the acquisition thread immediately so recording begins now.
        worker = threading.Thread(target=acquisition, args=(cam,))
        worker.start()
        workers.append(worker)

    if not cameras:
        print("No ZED opened, exit program")
        return 1

    # Install the Ctrl+C handler only after setup succeeded.
    signal.signal(signal.SIGINT, signal_handler)
    print("Press Ctrl+C to exit")

    # Idle on the main thread until the handler flips exit_app.
    while not exit_app:
        time.sleep(0.02)

    print("Exit signal, closing ZEDs")
    time.sleep(0.1)
    for worker in workers:
        worker.join()
    print("Program exited")
    return 0
# Script entry point: propagate main()'s return code as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
+437
View File
@@ -0,0 +1,437 @@
########################################################################
#
# Copyright (c) 2022, STEREOLABS.
#
# All rights reserved.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
########################################################################
"""
Read a stream and display the left images using OpenCV
"""
import sys
import pyzed.sl as sl
import cv2
import socket
import threading
import queue
import zed_network_utils
import click
import time
# Global variables
exit_app = False
camera_settings = sl.VIDEO_SETTINGS.BRIGHTNESS
str_camera_settings = "BRIGHTNESS"
step_camera_settings = 1
led_on = True
# Map to store selection state for each camera window
# Key: window_name (str), Value: dict with selection state
window_selections = {}
class CameraHandler:
    """Owns one network ZED stream: a background grab thread plus a
    single-slot frame queue feeding the main (GUI) thread.

    The GUI loop on the main thread consumes ``frame_queue``; this class
    never touches OpenCV windows itself, keeping all GUI work on the main
    thread as required.
    """

    def __init__(self, ip, port, serial_number=None):
        """Store connection parameters; no network activity happens here.

        Args:
            ip (str): Stream sender IP address.
            port (int): Stream sender port.
            serial_number: Optional serial used as the display identifier;
                falls back to "ip:port" when not provided.
        """
        self.ip = ip
        self.port = port
        self.serial_number = serial_number
        self.id = f"{serial_number}" if serial_number else f"{ip}:{port}"
        self.cam = sl.Camera()
        self.runtime = sl.RuntimeParameters()
        self.mat = sl.Mat()
        self.frame_queue = queue.Queue(maxsize=1)  # Keep only latest frame
        self.running = False
        self.thread = None
        self.win_name = f"Camera {self.id}"
        self.status_msg = "Initializing..."

    def start(self):
        """Open the stream and launch the daemon grab thread.

        Returns:
            bool: True if the camera opened; False if connection failed
            (the failure reason is recorded in ``status_msg``).
        """
        init_parameters = sl.InitParameters()
        init_parameters.depth_mode = sl.DEPTH_MODE.NONE
        init_parameters.sdk_verbose = 1
        init_parameters.set_from_stream(self.ip, self.port)
        status = self.cam.open(init_parameters)
        if status != sl.ERROR_CODE.SUCCESS:
            print(f"Camera {self.id} Open : {repr(status)}. Skipping.")
            self.status_msg = f"Error: {status}"
            return False
        print(f"\n--- Camera {self.id} Info ---")
        print_camera_information(self.cam)
        self.running = True
        self.thread = threading.Thread(target=self._grab_loop)
        # Daemon thread so a stuck grab cannot block interpreter exit.
        self.thread.daemon = True
        self.thread.start()
        return True

    def stop(self):
        """Stop the grab thread (waiting up to 1s) and close the camera."""
        self.running = False
        if self.thread and self.thread.is_alive():
            self.thread.join(timeout=1.0)
        self.cam.close()

    def _grab_loop(self):
        """Worker-thread loop: grab frames and push numpy copies to the queue."""
        while self.running:
            err = self.cam.grab(self.runtime)
            if err == sl.ERROR_CODE.SUCCESS:
                # Retrieve into the member Mat, then hand a copy to the main
                # thread: get_data() returns a numpy view over a buffer the
                # SDK may reuse, so a deep copy is the safe way to cross the
                # thread boundary.
                self.cam.retrieve_image(self.mat, sl.VIEW.LEFT)
                frame = self.mat.get_data()
                frame_copy = frame.copy()
                try:
                    # Drop the stale frame (if any) so the queue always
                    # holds only the newest image.
                    if self.frame_queue.full():
                        self.frame_queue.get_nowait()
                    self.frame_queue.put_nowait(frame_copy)
                except queue.Full:
                    pass
            else:
                # No frame available; back off briefly before retrying.
                sl.sleep_ms(10)
def get_selection_state(win_name):
    """Return the ROI-selection state for a window, creating it on first use.

    The state dict holds the current selection rectangle, whether a drag is
    in progress, and the drag origin point.
    """
    state = window_selections.get(win_name)
    if state is None:
        state = {
            "rect": sl.Rect(),
            "in_progress": False,
            "origin": (-1, -1),
        }
        window_selections[win_name] = state
    return state
def on_mouse(event, x, y, flags, param):
    """OpenCV mouse callback: drag with the left button to select an ROI.

    ``param`` carries the window name so one callback can serve every
    window; a right click clears the current selection.
    """
    state = get_selection_state(param)

    if event == cv2.EVENT_LBUTTONDOWN:
        state["in_progress"] = True
        state["origin"] = (x, y)
    elif event == cv2.EVENT_LBUTTONUP:
        state["in_progress"] = False
    elif event == cv2.EVENT_RBUTTONDOWN:
        state["in_progress"] = False
        state["rect"] = sl.Rect(0, 0, 0, 0)

    # While dragging, keep the rectangle normalized (origin-independent).
    if state["in_progress"]:
        origin_x, origin_y = state["origin"]
        rect = state["rect"]
        rect.x = min(x, origin_x)
        rect.y = min(y, origin_y)
        rect.width = abs(x - origin_x) + 1
        rect.height = abs(y - origin_y) + 1
@click.command()
@click.option(
    "--config",
    "-c",
    type=click.Path(exists=True),
    help="Path to JSON configuration file.",
)
@click.option(
    "--ip",
    "-i",
    multiple=True,
    help="IP address(es) in format IP:PORT. Can be used multiple times.",
)
def main(config, ip):
    """
    ZED Streaming Receiver.

    Collects camera endpoints from --config and/or --ip (either IP:PORT or
    a serial number resolved via the default config file), opens each
    stream in its own CameraHandler worker thread, and runs the OpenCV
    display loop on this main thread. Press 'q' to quit; other hotkeys
    apply camera settings to ALL cameras at once.
    """
    global exit_app
    cameras_to_open = []
    # 1. Parse Config File if provided
    if config:
        print(f"Reading configuration from {config}")
        network_config = zed_network_utils.parse_network_config(config)
        if network_config:
            for serial, cam_config in network_config.items():
                cam_ip, cam_port = zed_network_utils.extract_ip_port(cam_config)
                if cam_ip and cam_port:
                    cameras_to_open.append((cam_ip, cam_port, serial))
    # 2. Parse CLI IPs if provided
    if ip:
        for ip_str in ip:
            if ":" in ip_str:
                try:
                    host, port_str = ip_str.split(":")
                    port = int(port_str)
                    cameras_to_open.append((host, port, ip_str))
                except ValueError:
                    print(f"Invalid format for IP: {ip_str}. Expected IP:PORT")
            else:
                # No colon: treat the argument as a serial number and resolve
                # its endpoint from the default configuration file.
                print(
                    f"Treating '{ip_str}' as serial number, looking up in default config..."
                )
                cam_config = zed_network_utils.get_camera_config_by_serial(ip_str)
                if cam_config:
                    cam_ip, cam_port = zed_network_utils.extract_ip_port(cam_config)
                    if cam_ip and cam_port:
                        cameras_to_open.append((cam_ip, cam_port, ip_str))
                    else:
                        print(f"Could not find IP/Port for serial {ip_str}")
                else:
                    print(f"Invalid format or unknown serial: {ip_str}.")
    if not cameras_to_open:
        print("No valid cameras specified. Use --config or --ip.")
        print_help()
        return
    print(f"Starting {len(cameras_to_open)} camera streams...")
    print_help()
    handlers = []
    # Initialize all cameras; GUI windows are created here on the main thread.
    for cam_ip, cam_port, cam_id in cameras_to_open:
        handler = CameraHandler(cam_ip, cam_port, cam_id)
        if handler.start():
            handlers.append(handler)
            # Create window for this camera
            cv2.namedWindow(handler.win_name)
            cv2.setMouseCallback(handler.win_name, on_mouse, param=handler.win_name)
    if not handlers:
        print("No cameras could be opened. Exiting.")
        return
    try:
        while not exit_app:
            # Main GUI Loop: drain each handler's queue and repaint.
            for handler in handlers:
                try:
                    # Get frame from queue (non-blocking)
                    frame = handler.frame_queue.get_nowait()
                    # Draw the exposure-ROI selection rectangle if one exists
                    # and it fits inside the frame.
                    state = get_selection_state(handler.win_name)
                    rect = state["rect"]
                    if not rect.is_empty() and rect.is_contained(
                        sl.Rect(0, 0, frame.shape[1], frame.shape[0])
                    ):
                        cv2.rectangle(
                            frame,
                            (rect.x, rect.y),
                            (rect.width + rect.x, rect.height + rect.y),
                            (220, 180, 20),
                            2,
                        )
                    cv2.imshow(handler.win_name, frame)
                except queue.Empty:
                    pass  # No new frame, just continue
            # Handle key events (once per loop iteration, for all windows)
            key = cv2.waitKey(10)
            if key != -1:
                if key == 113:  # q
                    exit_app = True
                else:
                    # Apply settings to ALL cameras.
                    # Note: we pick the first camera to read the current value,
                    # then set it on all — this assumes settings stay in sync.
                    if handlers:
                        update_camera_settings(
                            key, handlers[0].cam, handlers, handlers[0].runtime
                        )
    except KeyboardInterrupt:
        print("\nCtrl+C pressed. Exiting...")
    finally:
        print("Closing cameras...")
        for handler in handlers:
            handler.stop()
        cv2.destroyAllWindows()
        print("Program exited")
def print_camera_information(cam):
    """Print model, serial, firmware versions, resolution and FPS of *cam*.

    Fix: the resolution height previously re-queried the SDK via a second
    ``cam.get_camera_information()`` call; it now reuses the already-fetched
    ``cam_info`` like every other field.

    Args:
        cam (sl.Camera): An already-opened ZED camera.
    """
    cam_info = cam.get_camera_information()
    print("ZED Model : {0}".format(cam_info.camera_model))
    print("ZED Serial Number : {0}".format(cam_info.serial_number))
    print(
        "ZED Camera Firmware : {0}/{1}".format(
            cam_info.camera_configuration.firmware_version,
            cam_info.sensors_configuration.firmware_version,
        )
    )
    resolution = cam_info.camera_configuration.resolution
    print(
        "ZED Camera Resolution : {0}x{1}".format(
            round(resolution.width, 2),
            resolution.height,
        )
    )
    print(
        "ZED Camera FPS : {0}".format(int(cam_info.camera_configuration.fps))
    )
def print_help():
    """Print the hotkey reference for the interactive viewer."""
    help_lines = (
        "\n\nCamera controls hotkeys:",
        "* Increase camera settings value: '+'",
        "* Decrease camera settings value: '-'",
        "* Toggle camera settings: 's'",
        "* Toggle camera LED: 'l' (lower L)",
        "* Reset all parameters: 'r'",
        "* Reset exposure ROI to full image 'f'",
        "* Use mouse to select an image area to apply exposure (press 'a')",
        "* Exit : 'q'\n",
    )
    for line in help_lines:
        print(line)
# Update camera setting on key press
def update_camera_settings(key, reference_cam, handlers, runtime):
    """Apply a hotkey to ALL cameras.

    's' cycles the active setting; '+'/'-' step it; 'r' resets every
    setting to its default; 'l' toggles the LED; 'a' applies the
    per-window exposure ROI and 'f' resets the ROI to the full image.

    Note: keys 'l' (108) and 'r' (114) are matched twice below — once in
    the early if/elif chain (state toggle / announcement print) and again
    in the action mapping that actually drives the cameras.

    Args:
        key: Key code returned by cv2.waitKey.
        reference_cam: First camera (kept for interface compatibility; the
            per-camera loop below reads each handler's camera directly).
        handlers: All active CameraHandler instances.
        runtime: Runtime parameters (currently unused here).
    """
    global led_on, camera_settings, str_camera_settings
    # This logic updates ALL cameras based on the input key
    if key == 115:  # for 's' key
        switch_camera_settings()
        return
    if key == 108:  # for 'l' key
        led_on = not led_on
        print(f"LED Status: {led_on}")
    elif key == 114:  # 'r'
        print("[Sample] Reset all settings to default")
    # Determine action and value
    action = None  # 'inc', 'dec', 'set'
    val = None
    if key == 43:
        action = "inc"
    elif key == 45:
        action = "dec"
    elif key == 114:
        action = "reset"
    elif key == 108:
        action = "led"
    elif key == 97 or key == 102:
        action = "roi"
    # Apply to all cameras
    for handler in handlers:
        cam = handler.cam
        if action == "inc":
            current_value = cam.get_camera_settings(camera_settings)[1]
            cam.set_camera_settings(
                camera_settings, current_value + step_camera_settings
            )
            if handler == handlers[0]:  # Print only once
                print(
                    str_camera_settings
                    + ": "
                    + str(current_value + step_camera_settings)
                )
        elif action == "dec":
            current_value = cam.get_camera_settings(camera_settings)[1]
            # Only step down while the value stays non-negative.
            if current_value >= 1:
                cam.set_camera_settings(
                    camera_settings, current_value - step_camera_settings
                )
                if handler == handlers[0]:
                    print(
                        str_camera_settings
                        + ": "
                        + str(current_value - step_camera_settings)
                    )
        elif action == "reset":
            # -1 asks the SDK to restore the default for each setting.
            cam.set_camera_settings(sl.VIDEO_SETTINGS.BRIGHTNESS, -1)
            cam.set_camera_settings(sl.VIDEO_SETTINGS.CONTRAST, -1)
            cam.set_camera_settings(sl.VIDEO_SETTINGS.HUE, -1)
            cam.set_camera_settings(sl.VIDEO_SETTINGS.SATURATION, -1)
            cam.set_camera_settings(sl.VIDEO_SETTINGS.SHARPNESS, -1)
            cam.set_camera_settings(sl.VIDEO_SETTINGS.GAIN, -1)
            cam.set_camera_settings(sl.VIDEO_SETTINGS.EXPOSURE, -1)
            cam.set_camera_settings(sl.VIDEO_SETTINGS.WHITEBALANCE_TEMPERATURE, -1)
        elif action == "led":
            cam.set_camera_settings(sl.VIDEO_SETTINGS.LED_STATUS, led_on)
        elif action == "roi":
            state = get_selection_state(handler.win_name)
            rect = state["rect"]
            reset = key == 102
            if reset:
                # 'f': reset the auto-exposure ROI to the full image.
                cam.set_camera_settings_roi(
                    sl.VIDEO_SETTINGS.AEC_AGC_ROI, rect, sl.SIDE.BOTH, True
                )
                if handler == handlers[0]:
                    print("[Sample] reset AEC_AGC_ROI to full res")
            else:
                # 'a': apply this window's mouse-selected rectangle.
                print(f"[Sample] set AEC_AGC_ROI on {handler.win_name}")
                cam.set_camera_settings_roi(
                    sl.VIDEO_SETTINGS.AEC_AGC_ROI, rect, sl.SIDE.BOTH
                )
# Cycle the active camera setting (brightness -> contrast -> ... -> brightness).
def switch_camera_settings():
    """Advance the global ``camera_settings`` to the next setting in the cycle.

    Updates ``str_camera_settings`` to the matching display label and prints
    which setting is now active.
    """
    global camera_settings
    global str_camera_settings
    # (current setting, next setting, display label, announcement text)
    transitions = (
        (sl.VIDEO_SETTINGS.BRIGHTNESS, sl.VIDEO_SETTINGS.CONTRAST,
         "Contrast", "CONTRAST"),
        (sl.VIDEO_SETTINGS.CONTRAST, sl.VIDEO_SETTINGS.HUE,
         "Hue", "HUE"),
        (sl.VIDEO_SETTINGS.HUE, sl.VIDEO_SETTINGS.SATURATION,
         "Saturation", "SATURATION"),
        (sl.VIDEO_SETTINGS.SATURATION, sl.VIDEO_SETTINGS.SHARPNESS,
         "Sharpness", "Sharpness"),
        (sl.VIDEO_SETTINGS.SHARPNESS, sl.VIDEO_SETTINGS.GAIN,
         "Gain", "GAIN"),
        (sl.VIDEO_SETTINGS.GAIN, sl.VIDEO_SETTINGS.EXPOSURE,
         "Exposure", "EXPOSURE"),
        (sl.VIDEO_SETTINGS.EXPOSURE, sl.VIDEO_SETTINGS.WHITEBALANCE_TEMPERATURE,
         "White Balance", "WHITEBALANCE"),
        (sl.VIDEO_SETTINGS.WHITEBALANCE_TEMPERATURE, sl.VIDEO_SETTINGS.BRIGHTNESS,
         "Brightness", "BRIGHTNESS"),
    )
    for current, nxt, label, announcement in transitions:
        if camera_settings == current:
            camera_settings = nxt
            str_camera_settings = label
            print(f"[Sample] Switch to camera settings: {announcement}")
            break
# Script entry point: click parses sys.argv and invokes main().
if __name__ == "__main__":
    main()
+185
View File
@@ -0,0 +1,185 @@
# ... existing code ...
import sys
import pyzed.sl as sl
import cv2
import argparse
import os
import math
def progress_bar(percent_done, bar_length=50):
    """Render an in-place console progress bar (carriage-return, no newline).

    Args:
        percent_done: Completion percentage, expected in [0, 100].
        bar_length: Total number of characters in the bar.
    """
    filled = int(bar_length * percent_done / 100)
    empty = bar_length - filled
    sys.stdout.write(f"[{'=' * filled}{'-' * empty}] {int(percent_done)}%\r")
    sys.stdout.flush()
def main(opt):
    """Synchronized playback of multiple SVO recordings.

    Opens every SVO in ``opt.input_svo_files``, aligns their start positions
    using first-frame timestamps (frame offsets approximated via FPS, since
    the Python API may lack ``get_svo_position_at_timestamp``), then plays
    them back side by side with simple keyboard controls:
    's' saves PNG snapshots, 'f'/'b' jump ~1s forward/back, 'q' quits.

    Fixes vs. previous revision:
    - the snapshot confirmation now prints the actual saved filename
      (the f-string previously had no interpolation);
    - removed the unused ``frames_displayed`` counter.

    Args:
        opt: Parsed argparse namespace with ``input_svo_files`` (list[str]).
    """
    svo_files = opt.input_svo_files
    cameras = []
    cam_data = []  # List of dicts to store camera info
    print(f"Opening {len(svo_files)} SVO files...")
    for filepath in svo_files:
        if not os.path.isfile(filepath):
            print(f"File {filepath} does not exist. Skipping.")
            continue
        init = sl.InitParameters()
        init.set_from_svo_file(filepath)
        init.svo_real_time_mode = False
        init.depth_mode = (
            sl.DEPTH_MODE.NONE
        )  # Use NONE for performance with multiple cameras
        cam = sl.Camera()
        status = cam.open(init)
        if status != sl.ERROR_CODE.SUCCESS:
            print(f"Failed to open {filepath}: {status}")
            continue
        cameras.append(cam)
        # Store metadata once so the playback loop does no repeated queries.
        info = cam.get_camera_information()
        cam_data.append(
            {
                "cam": cam,
                "serial": info.serial_number,
                "res": info.camera_configuration.resolution,
                "fps": info.camera_configuration.fps,
                "nb_frames": cam.get_svo_number_of_frames(),
                "start_ts": 0,  # To be filled
            }
        )
    if not cameras:
        print("No cameras opened. Exiting.")
        return

    # SVO Synchronization
    # Since the Python API might miss get_svo_position_at_timestamp, we
    # approximate frame offsets using FPS and first-frame timestamps.
    print("Syncing SVOs...")
    max_start_ts = 0
    runtime = sl.RuntimeParameters()
    # Grab first frame of each SVO to determine its start timestamp.
    for data in cam_data:
        cam = data["cam"]
        err = cam.grab(runtime)
        if err == sl.ERROR_CODE.SUCCESS:
            ts = cam.get_timestamp(sl.TIME_REFERENCE.IMAGE).get_nanoseconds()
            data["start_ts"] = ts
            if ts > max_start_ts:
                max_start_ts = ts
        else:
            print(f"Error grabbing first frame from {data['serial']}: {err}")
    # Align all cameras to the latest-starting recording.
    for data in cam_data:
        cam = data["cam"]
        ts = data["start_ts"]
        if ts < max_start_ts:
            # This SVO started earlier: skip the frames recorded before the
            # latest camera began (frames = seconds * fps).
            diff_ns = max_start_ts - ts
            fps = data["fps"]
            frames_to_skip = int((diff_ns / 1_000_000_000) * fps)
            print(
                f"Camera {data['serial']} starts {diff_ns / 1e9:.2f}s earlier. Skipping {frames_to_skip} frames."
            )
            cam.set_svo_position(frames_to_skip)
        else:
            # Latest-starting camera: rewind to position 0 (we already grabbed
            # one frame above) so its first frame lines up with the others.
            cam.set_svo_position(0)

    print("Starting playback...")
    print("  Press 's' to save SVO image as a PNG")
    print("  Press 'f' to jump forward in the video")
    print("  Press 'b' to jump backward in the video")
    print("  Press 'q' to exit...")
    key = ""
    while key != 113:  # for 'q' key
        for data in cam_data:
            cam = data["cam"]
            err = cam.grab(runtime)
            if err == sl.ERROR_CODE.SUCCESS:
                # Resize for display (cap at 720x404 for multi-window layout).
                res = data["res"]
                w = min(720, res.width)
                h = min(404, res.height)
                low_resolution = sl.Resolution(w, h)
                svo_image = sl.Mat()
                cam.retrieve_image(svo_image, sl.VIEW.LEFT, sl.MEM.CPU, low_resolution)
                img = svo_image.get_data()
                cv2.imshow(f"View {data['serial']}", img)
                # The first camera drives the progress bar.
                if data == cam_data[0]:
                    pos = cam.get_svo_position()
                    total = data["nb_frames"]
                    if total > 0:
                        progress_bar(int(pos / total * 100), 30)
            elif err == sl.ERROR_CODE.END_OF_SVOFILE_REACHED:
                print(f"\nSVO end reached for {data['serial']}. Looping.")
                cam.set_svo_position(0)
            else:
                pass
        key = cv2.waitKey(10)
        if key == 115:  # 's'
            # Save a snapshot from every camera at its current position.
            for data in cam_data:
                cam = data["cam"]
                mat = sl.Mat()
                cam.retrieve_image(mat)
                pos = cam.get_svo_position()
                filename = f"capture_{data['serial']}_{pos}.png"
                mat.write(filename)
                print(f"Saved {filename}")
        if key == 102:  # 'f'
            # Jump forward ~1 second (fps frames).
            for data in cam_data:
                cam = data["cam"]
                pos = cam.get_svo_position()
                cam.set_svo_position(pos + int(data["fps"]))
        if key == 98:  # 'b'
            # Jump backward ~1 second, clamped at the start.
            for data in cam_data:
                cam = data["cam"]
                pos = cam.get_svo_position()
                cam.set_svo_position(max(0, pos - int(data["fps"])))
    cv2.destroyAllWindows()
    for cam in cameras:
        cam.close()
# Script entry point: parse one or more SVO paths and start synced playback.
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_svo_files",
        nargs="+",
        type=str,
        help="Path to .svo/.svo2 files",
        required=True,
    )
    opt = parser.parse_args()
    main(opt)
+88
View File
@@ -0,0 +1,88 @@
import json
import os
import pyzed.sl as sl
# Default location of the multi-camera network configuration used by the
# workspace; every helper below accepts an override path per call.
DEFAULT_CONFIG_PATH = "/workspaces/zed-playground/zed_settings/inside_network.json"
def parse_network_config(config_file=DEFAULT_CONFIG_PATH):
"""
Parses the network configuration JSON file and returns a dictionary of camera configurations.
Args:
config_file (str): Path to the JSON configuration file.
Returns:
dict: A dictionary where keys are serial numbers (str) and values are configuration dicts,
or None if the file doesn't exist or is invalid.
"""
if not os.path.exists(config_file):
print(f"Configuration file not found: {config_file}")
return None
try:
with open(config_file, "r") as f:
network_config = json.load(f)
return network_config
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
return None
def get_camera_config_by_serial(serial_number, config_file=DEFAULT_CONFIG_PATH):
    """Look up one camera's configuration entry by its serial number.

    Args:
        serial_number: Camera serial (int or str; compared as str).
        config_file (str): Path to the JSON configuration file.

    Returns:
        dict | None: The matching configuration, or None when the config
        file could not be read or the serial is unknown.
    """
    network_config = parse_network_config(config_file)
    if not network_config:
        return None
    return network_config.get(str(serial_number))
def extract_ip_port(config_entry):
    """Pull the streaming IP address and port out of one camera entry.

    Args:
        config_entry (dict): A single camera configuration dictionary.

    Returns:
        tuple: (ip (str), port (int)), or (None, None) when the expected
        nested keys are absent or the entry has the wrong shape.
    """
    try:
        params = config_entry["FusionConfiguration"]["communication_parameters"][
            "CommunicationParameters"
        ]
        return params["ip_add"], params["ip_port"]
    except (KeyError, TypeError) as exc:
        print(f"Error extracting IP/Port from config: {exc}")
        return None, None
def open_remote_camera(zed, ip, port):
    """Connect *zed* to a network stream at ip:port (depth disabled).

    Args:
        zed (sl.Camera): The ZED Camera object to open.
        ip (str): IP address of the stream sender.
        port (int): Port number of the stream sender.

    Returns:
        bool: True if opened successfully, False otherwise.
    """
    params = sl.InitParameters()
    params.depth_mode = sl.DEPTH_MODE.NONE
    params.camera_resolution = sl.RESOLUTION.AUTO
    print(f"Connecting to {ip}:{port}...")
    params.set_from_stream(ip, port)
    result = zed.open(params)
    if result == sl.ERROR_CODE.SUCCESS:
        return True
    print(f"Failed to open camera at {ip}:{port}. Error: {result}")
    return False