python workspace

This commit is contained in:
2026-02-04 15:42:08 +08:00
parent e7c5176229
commit a0ad369211
37 changed files with 47631 additions and 0 deletions

View File

@ -0,0 +1 @@
3.12

46
py_workspace/README.md Executable file
View File

@ -0,0 +1,46 @@
# ZED SDK - Depth Sensing
This sample shows how to retreive the current point cloud.
## Getting Started
- Get the latest [ZED SDK](https://www.stereolabs.com/developers/release/) and [pyZED Package](https://www.stereolabs.com/docs/app-development/python/install/)
- Check the [Documentation](https://www.stereolabs.com/docs/)
## Setting up (Optional)
For improved data retrieval and handling on GPU:
- Install CuPy using pip
```sh
pip install cupy-cuda11x # For CUDA 11.x
pip install cupy-cuda12x # For CUDA 12.x
```
- Install cuda bindings using pip
```sh
pip install cuda-python
```
## Run the program
To run the program, use the following command in your terminal :
```bash
python depth_sensing.py
```
If you wish to run the program from an input_svo_file, or an IP adress, or specify a resolution run :
```bash
python depth_sensing.py --input_svo_file <input_svo_file> --ip_address <ip_address> --resolution <resolution>
```
Arguments:
- `--input_svo_file` A path to an existing .svo file, that will be playbacked. If this parameter and ip_adress are not specified, the soft will use the camera wired as default.
- `--ip_address` IP Address, in format a.b.c.d:port or a.b.c.d. If specified, the soft will try to connect to the IP.
- `--resolution` Resolution, can be either HD2K, HD1200, HD1080, HD720, SVGA or VGA.
- `--disable_gpu` Disable GPU acceleration even if CuPy is available.
### Features
- Camera live point cloud is retreived
- An OpenGL windows displays it in 3D
## Support
If you need assistance go to our Community site at https://community.stereolabs.com/

View File

@ -0,0 +1,83 @@
# Integrating Local Binary Extensions with `uv`
This guide explains how we packaged the local `pyzed` binary extension (originally from a system installation) so that `uv` can manage it as a project dependency.
## The Problem
The ZED SDK installs its Python wrapper (`pyzed`) as a system-level package (often in `/usr/local/lib/...`). It consists of compiled extensions (`.so` or `.sl` files) and Python bindings.
`uv` strictly manages virtual environments and dependencies. It cannot directly "see" or import packages from the global system site-packages unless explicitly configured to use system site-packages (which reduces isolation). Furthermore, a raw `.so` file or a bare directory without metadata isn't a valid package source for `uv`.
## The Solution: A Local Package Wrapper
To make `pyzed` compatible with `uv`, we wrapped the raw library files into a proper, minimally compliant Python package located at `libs/pyzed_pkg`.
### 1. Directory Structure
We organized the files into a standard package layout:
```text
libs/pyzed_pkg/
├── pyproject.toml # Package metadata (CRITICAL)
└── pyzed/ # The actual importable package
├── __init__.py
├── sl.cpython-312-x86_64-linux-gnu.so # The compiled extension
└── ...
```
### 2. The Local `pyproject.toml`
We created a `pyproject.toml` inside `libs/pyzed_pkg` to tell build tools how to handle the files.
Key configuration points:
1. **Build System**: Uses `setuptools` to bundle the files.
2. **Package Discovery**: Explicitly lists `pyzed`.
3. **Package Data**: **Crucially**, configures `setuptools` to include binary files (`*.so`, `*.pyi`) which are usually ignored by default.
```toml
[project]
name = "pyzed"
version = "0.1.0"
description = "Wrapper for ZED SDK"
requires-python = ">=3.12"
dependencies = []
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"
[tool.setuptools]
packages = ["pyzed"]
# IMPORTANT: Ensure the binary extension is included in the build
[tool.setuptools.package-data]
pyzed = ["*.so", "*.pyi"]
```
## Configuring the Main Project
In the root `pyproject.toml` of your application, we used **`[tool.uv.sources]`** to redirect the `pyzed` dependency to our local path.
```toml
[project]
dependencies = [
"pyzed", # Declared as a normal dependency
"cupy-cuda12x",
"numpy",
]
# uv-specific configuration
[tool.uv.sources]
pyzed = { path = "libs/pyzed_pkg" }
```
## How `uv` Processes This
1. **Resolution**: When you run `uv sync` or `uv run`, `uv` sees `pyzed` in the dependencies.
2. **Source Lookup**: It checks `tool.uv.sources` and finds the local path `libs/pyzed_pkg`.
3. **Build/Install**:
* `uv` treats the directory as a source distribution.
* It uses the `build-system` defined in `libs/pyzed_pkg/pyproject.toml` to build a temporary wheel.
* This wheel (containing the `.so` file) is installed into the project's virtual environment (`.venv`).
This ensures that your application works seamlessly with `import pyzed.sl` while maintaining a clean, isolated, and reproducible environment managed by `uv`.

View File

@ -0,0 +1,131 @@
########################################################################
#
# Copyright (c) 2022, STEREOLABS.
#
# All rights reserved.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
########################################################################
"""
This sample demonstrates how to capture a live 3D point cloud
with the ZED SDK and display the result in an OpenGL window.
"""
import sys
import ogl_viewer.viewer as gl
import pyzed.sl as sl
import argparse
def parse_args(init, opt):
if len(opt.input_svo_file)>0 and opt.input_svo_file.endswith((".svo", ".svo2")):
init.set_from_svo_file(opt.input_svo_file)
print("[Sample] Using SVO File input: {0}".format(opt.input_svo_file))
elif len(opt.ip_address)>0 :
ip_str = opt.ip_address
if ip_str.replace(':','').replace('.','').isdigit() and len(ip_str.split('.'))==4 and len(ip_str.split(':'))==2:
init.set_from_stream(ip_str.split(':')[0],int(ip_str.split(':')[1]))
print("[Sample] Using Stream input, IP : ",ip_str)
elif ip_str.replace(':','').replace('.','').isdigit() and len(ip_str.split('.'))==4:
init.set_from_stream(ip_str)
print("[Sample] Using Stream input, IP : ",ip_str)
else :
print("Unvalid IP format. Using live stream")
if ("HD2K" in opt.resolution):
init.camera_resolution = sl.RESOLUTION.HD2K
print("[Sample] Using Camera in resolution HD2K")
elif ("HD1200" in opt.resolution):
init.camera_resolution = sl.RESOLUTION.HD1200
print("[Sample] Using Camera in resolution HD1200")
elif ("HD1080" in opt.resolution):
init.camera_resolution = sl.RESOLUTION.HD1080
print("[Sample] Using Camera in resolution HD1080")
elif ("HD720" in opt.resolution):
init.camera_resolution = sl.RESOLUTION.HD720
print("[Sample] Using Camera in resolution HD720")
elif ("SVGA" in opt.resolution):
init.camera_resolution = sl.RESOLUTION.SVGA
print("[Sample] Using Camera in resolution SVGA")
elif ("VGA" in opt.resolution):
init.camera_resolution = sl.RESOLUTION.VGA
print("[Sample] Using Camera in resolution VGA")
elif len(opt.resolution)>0:
print("[Sample] No valid resolution entered. Using default")
else :
print("[Sample] Using default resolution")
def main(opt):
print("Running Depth Sensing sample ... Press 'Esc' to quit\nPress 's' to save the point cloud")
# Determine memory type based on CuPy availability and user preference
use_gpu = gl.GPU_ACCELERATION_AVAILABLE and not opt.disable_gpu_data_transfer
mem_type = sl.MEM.GPU if use_gpu else sl.MEM.CPU
if use_gpu:
print("🚀 Using GPU data transfer with CuPy")
init = sl.InitParameters(depth_mode=sl.DEPTH_MODE.NEURAL,
coordinate_units=sl.UNIT.METER,
coordinate_system=sl.COORDINATE_SYSTEM.RIGHT_HANDED_Y_UP)
parse_args(init, opt)
zed = sl.Camera()
status = zed.open(init)
if status > sl.ERROR_CODE.SUCCESS:
print(repr(status))
exit()
res = sl.Resolution()
res.width = -1
res.height = -1
# Get the first PC to retrieve the resolution
point_cloud = sl.Mat()
zed.retrieve_measure(point_cloud, sl.MEASURE.XYZRGBA, mem_type, res)
res = point_cloud.get_resolution()
# Create OpenGL viewer
viewer = gl.GLViewer()
viewer.init(1, sys.argv, res)
while viewer.is_available():
if zed.grab() <= sl.ERROR_CODE.SUCCESS:
# Retrieve point cloud data using the optimal memory type (GPU if CuPy available)
zed.retrieve_measure(point_cloud, sl.MEASURE.XYZRGBA, mem_type, res)
viewer.updateData(point_cloud)
if viewer.save_data:
# For saving, we take CPU memory regardless of processing type
point_cloud_to_save = sl.Mat()
zed.retrieve_measure(point_cloud_to_save, sl.MEASURE.XYZRGBA, sl.MEM.CPU)
err = point_cloud_to_save.write('Pointcloud.ply')
if(err == sl.ERROR_CODE.SUCCESS):
print("Current .ply file saving succeed")
else:
print("Current .ply file failed")
viewer.save_data = False
viewer.exit()
zed.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--input_svo_file', type=str, help='Path to an .svo file, if you want to replay it',default = '')
parser.add_argument('--ip_address', type=str, help='IP Adress, in format a.b.c.d:port or a.b.c.d, if you have a streaming setup', default = '')
parser.add_argument('--resolution', type=str, help='Resolution, can be either HD2K, HD1200, HD1080, HD720, SVGA or VGA', default = '')
parser.add_argument('--disable-gpu-data-transfer', action='store_true', help='Disable GPU data transfer acceleration with CuPy even if CuPy is available')
opt = parser.parse_args()
if len(opt.input_svo_file)>0 and len(opt.ip_address)>0:
print("Specify only input_svo_file or ip_address, or none to use wired camera, not both. Exit program")
exit()
main(opt)

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,17 @@
[project]
name = "pyzed"
version = "0.1.0"
description = "Wrapper for ZED SDK"
requires-python = ">=3.12"
dependencies = []
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"
[tool.setuptools]
packages = ["pyzed"]
[tool.setuptools.package-data]
pyzed = ["*.so", "*.pyi"]

View File

@ -0,0 +1,5 @@
Metadata-Version: 2.4
Name: pyzed
Version: 0.1.0
Summary: Wrapper for ZED SDK
Requires-Python: >=3.12

View File

@ -0,0 +1,8 @@
pyproject.toml
pyzed/__init__.py
pyzed/sl.cpython-312-x86_64-linux-gnu.so
pyzed/sl.pyi
pyzed.egg-info/PKG-INFO
pyzed.egg-info/SOURCES.txt
pyzed.egg-info/dependency_links.txt
pyzed.egg-info/top_level.txt

View File

@ -0,0 +1 @@
pyzed

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,5 @@
Metadata-Version: 2.4
Name: pyzed
Version: 0.1.0
Summary: Wrapper for ZED SDK
Requires-Python: >=3.12

View File

@ -0,0 +1,9 @@
__init__.py
pyproject.toml
sl.pyi
./__init__.py
./sl.pyi
pyzed.egg-info/PKG-INFO
pyzed.egg-info/SOURCES.txt
pyzed.egg-info/dependency_links.txt
pyzed.egg-info/top_level.txt

View File

@ -0,0 +1 @@
pyzed

File diff suppressed because it is too large Load Diff

785
py_workspace/ogl_viewer/viewer.py Executable file
View File

@ -0,0 +1,785 @@
from OpenGL.GL import *
from OpenGL.GLUT import *
from OpenGL.GLU import *
import ctypes
import sys
import math
from threading import Lock
import numpy as np
import array
import pyzed.sl as sl
M_PI = 3.1415926
VERTEX_SHADER = """
# version 330 core
layout(location = 0) in vec3 in_Vertex;
layout(location = 1) in vec4 in_Color;
uniform mat4 u_mvpMatrix;
out vec4 b_color;
void main() {
b_color = in_Color;
gl_Position = u_mvpMatrix * vec4(in_Vertex, 1);
}
"""
FRAGMENT_SHADER = """
# version 330 core
in vec4 b_color;
layout(location = 0) out vec4 out_Color;
void main() {
out_Color = b_color;
}
"""
POINTCLOUD_VERTEX_SHADER ="""
#version 330 core
layout(location = 0) in vec4 in_VertexRGBA;
uniform mat4 u_mvpMatrix;
out vec4 b_color;
void main() {
uint vertexColor = floatBitsToUint(in_VertexRGBA.w);
vec3 clr_int = vec3((vertexColor & uint(0x000000FF)), (vertexColor & uint(0x0000FF00)) >> 8, (vertexColor & uint(0x00FF0000)) >> 16);
b_color = vec4(clr_int.r / 255.0f, clr_int.g / 255.0f, clr_int.b / 255.0f, 1.f);
gl_Position = u_mvpMatrix * vec4(in_VertexRGBA.xyz, 1);
}
"""
POINTCLOUD_FRAGMENT_SHADER = """
#version 330 core
in vec4 b_color;
layout(location = 0) out vec4 out_Color;
void main() {
out_Color = b_color;
}
"""
try:
from cuda.bindings import runtime as cudart
import cupy as cp
GPU_ACCELERATION_AVAILABLE = True
def format_cudart_err(err):
return (
f"{cudart.cudaGetErrorName(err)[1].decode('utf-8')}({int(err)}): "
f"{cudart.cudaGetErrorString(err)[1].decode('utf-8')}"
)
def check_cudart_err(args):
if isinstance(args, tuple):
assert len(args) >= 1
err = args[0]
if len(args) == 1:
ret = None
elif len(args) == 2:
ret = args[1]
else:
ret = args[1:]
else:
err = args
ret = None
assert isinstance(err, cudart.cudaError_t), type(err)
if err != cudart.cudaError_t.cudaSuccess:
raise RuntimeError(format_cudart_err(err))
return ret
class CudaOpenGLMappedBuffer:
def __init__(self, gl_buffer, flags=0):
self._gl_buffer = int(gl_buffer)
self._flags = int(flags)
self._graphics_ressource = None
self._cuda_buffer = None
self.register()
@property
def gl_buffer(self):
return self._gl_buffer
@property
def cuda_buffer(self):
assert self.mapped
return self._cuda_buffer
@property
def graphics_ressource(self):
assert self.registered
return self._graphics_ressource
@property
def registered(self):
return self._graphics_ressource is not None
@property
def mapped(self):
return self._cuda_buffer is not None
def __enter__(self):
return self.map()
def __exit__(self, exc_type, exc_value, trace):
self.unmap()
return False
def __del__(self):
try:
self.unregister()
except:
# Ignore errors during cleanup (e.g., during Python shutdown)
pass
def register(self):
if self.registered:
return self._graphics_ressource
self._graphics_ressource = check_cudart_err(
cudart.cudaGraphicsGLRegisterBuffer(self._gl_buffer, self._flags)
)
return self._graphics_ressource
def unregister(self):
if not self.registered:
return self
try:
self.unmap()
if cudart is not None: # Check if cudart is still available
check_cudart_err(
cudart.cudaGraphicsUnregisterResource(self._graphics_ressource)
)
self._graphics_ressource = None
except Exception:
# Ignore errors during cleanup (e.g., during Python shutdown)
self._graphics_ressource = None
return self
def map(self, stream=None):
if not self.registered:
raise RuntimeError("Cannot map an unregistered buffer.")
if self.mapped:
return self._cuda_buffer
check_cudart_err(
cudart.cudaGraphicsMapResources(1, self._graphics_ressource, stream)
)
ptr, size = check_cudart_err(
cudart.cudaGraphicsResourceGetMappedPointer(self._graphics_ressource)
)
self._cuda_buffer = cp.cuda.MemoryPointer(
cp.cuda.UnownedMemory(ptr, size, self), 0
)
return self._cuda_buffer
def unmap(self, stream=None):
if not self.registered:
raise RuntimeError("Cannot unmap an unregistered buffer.")
if not self.mapped:
return self
try:
if cudart is not None: # Check if cudart is still available
check_cudart_err(
cudart.cudaGraphicsUnmapResources(1, self._graphics_ressource, stream)
)
self._cuda_buffer = None
except Exception:
# Force cleanup even if unmap fails
self._cuda_buffer = None
return self
class CudaOpenGLMappedArray(CudaOpenGLMappedBuffer):
def __init__(self, dtype, shape, gl_buffer, flags=0, strides=None, order='C'):
super().__init__(gl_buffer, flags)
self._dtype = dtype
self._shape = shape
self._strides = strides
self._order = order
@property
def cuda_array(self):
assert self.mapped
return cp.ndarray(
shape=self._shape,
dtype=self._dtype,
strides=self._strides,
order=self._order,
memptr=self._cuda_buffer,
)
def map(self, *args, **kwargs):
super().map(*args, **kwargs)
return self.cuda_array
except ImportError:
GPU_ACCELERATION_AVAILABLE = False
class Shader:
def __init__(self, _vs, _fs):
self.program_id = glCreateProgram()
vertex_id = self.compile(GL_VERTEX_SHADER, _vs)
fragment_id = self.compile(GL_FRAGMENT_SHADER, _fs)
glAttachShader(self.program_id, vertex_id)
glAttachShader(self.program_id, fragment_id)
glBindAttribLocation( self.program_id, 0, "in_vertex")
glBindAttribLocation( self.program_id, 1, "in_texCoord")
glLinkProgram(self.program_id)
if glGetProgramiv(self.program_id, GL_LINK_STATUS) != GL_TRUE:
info = glGetProgramInfoLog(self.program_id)
if (self.program_id is not None) and (self.program_id > 0) and glIsProgram(self.program_id):
glDeleteProgram(self.program_id)
if (vertex_id is not None) and (vertex_id > 0) and glIsShader(vertex_id):
glDeleteShader(vertex_id)
if (fragment_id is not None) and (fragment_id > 0) and glIsShader(fragment_id):
glDeleteShader(fragment_id)
raise RuntimeError('Error linking program: %s' % (info))
if (vertex_id is not None) and (vertex_id > 0) and glIsShader(vertex_id):
glDeleteShader(vertex_id)
if (fragment_id is not None) and (fragment_id > 0) and glIsShader(fragment_id):
glDeleteShader(fragment_id)
def compile(self, _type, _src):
shader_id = None
try:
shader_id = glCreateShader(_type)
if shader_id == 0:
print("ERROR: shader type {0} does not exist".format(_type))
exit()
glShaderSource(shader_id, _src)
glCompileShader(shader_id)
if glGetShaderiv(shader_id, GL_COMPILE_STATUS) != GL_TRUE:
info = glGetShaderInfoLog(shader_id)
if (shader_id is not None) and (shader_id > 0) and glIsShader(shader_id):
glDeleteShader(shader_id)
raise RuntimeError('Shader compilation failed: %s' % (info))
return shader_id
except:
if (shader_id is not None) and (shader_id > 0) and glIsShader(shader_id):
glDeleteShader(shader_id)
raise
def get_program_id(self):
return self.program_id
class Simple3DObject:
def __init__(self, _is_static, pts_size = 3, clr_size = 3):
self.is_init = False
self.drawing_type = GL_TRIANGLES
self.is_static = _is_static
self.clear()
self.pt_type = pts_size
self.clr_type = clr_size
self.data = sl.Mat()
self.cuda_mapped_buffer = None
self.use_gpu = GPU_ACCELERATION_AVAILABLE and not _is_static
def add_pt(self, _pts): # _pts [x,y,z]
for pt in _pts:
self.vertices.append(pt)
def add_clr(self, _clrs): # _clr [r,g,b]
for clr in _clrs:
self.colors.append(clr)
def add_point_clr(self, _pt, _clr):
self.add_pt(_pt)
self.add_clr(_clr)
self.indices.append(len(self.indices))
def add_line(self, _p1, _p2, _clr):
self.add_point_clr(_p1, _clr)
self.add_point_clr(_p2, _clr)
def addFace(self, p1, p2, p3, clr):
self.add_point_clr(p1, clr)
self.add_point_clr(p2, clr)
self.add_point_clr(p3, clr)
def push_to_GPU(self):
if not self.is_init:
self.vboID = glGenBuffers(3)
self.is_init = True
if self.is_static:
type_draw = GL_STATIC_DRAW
else:
type_draw = GL_DYNAMIC_DRAW
if len(self.vertices):
glBindBuffer(GL_ARRAY_BUFFER, self.vboID[0])
glBufferData(GL_ARRAY_BUFFER, len(self.vertices) * self.vertices.itemsize, (GLfloat * len(self.vertices))(*self.vertices), type_draw)
if len(self.colors):
glBindBuffer(GL_ARRAY_BUFFER, self.vboID[1])
glBufferData(GL_ARRAY_BUFFER, len(self.colors) * self.colors.itemsize, (GLfloat * len(self.colors))(*self.colors), type_draw)
if len(self.indices):
glBindBuffer(GL_ELEMENT_ARRAY_BUFFER, self.vboID[2])
glBufferData(GL_ELEMENT_ARRAY_BUFFER,len(self.indices) * self.indices.itemsize,(GLuint * len(self.indices))(*self.indices), type_draw)
self.elementbufferSize = len(self.indices)
def init(self, res):
if not self.is_init:
self.vboID = glGenBuffers(3)
self.is_init = True
if self.is_static:
type_draw = GL_STATIC_DRAW
else:
type_draw = GL_DYNAMIC_DRAW
self.elementbufferSize = res.width * res.height
# Initialize vertex buffer (for XYZRGBA data)
glBindBuffer(GL_ARRAY_BUFFER, self.vboID[0])
glBufferData(GL_ARRAY_BUFFER, self.elementbufferSize * self.pt_type * self.vertices.itemsize, None, type_draw)
# Try to set up GPU acceleration if available
if self.use_gpu:
try:
flags = cudart.cudaGraphicsRegisterFlags.cudaGraphicsRegisterFlagsWriteDiscard
self.cuda_mapped_buffer = CudaOpenGLMappedArray(
dtype=np.float32,
shape=(self.elementbufferSize, self.pt_type),
gl_buffer=self.vboID[0],
flags=flags
)
except Exception as e:
print(f"Failed to initialize GPU acceleration, falling back to CPU: {e}")
self.use_gpu = False
self.cuda_mapped_buffer = None
# Initialize color buffer (not used for point clouds with XYZRGBA)
if self.clr_type:
glBindBuffer(GL_ARRAY_BUFFER, self.vboID[1])
glBufferData(GL_ARRAY_BUFFER, self.elementbufferSize * self.clr_type * self.colors.itemsize, None, type_draw)
for i in range (0, self.elementbufferSize):
self.indices.append(i)
glBindBuffer(GL_ELEMENT_ARRAY_BUFFER, self.vboID[2])
glBufferData(GL_ELEMENT_ARRAY_BUFFER,len(self.indices) * self.indices.itemsize,(GLuint * len(self.indices))(*self.indices), type_draw)
def setPoints(self, pc):
"""Update point cloud data from sl.Mat"""
if not pc.is_init():
return
try:
if self.use_gpu and self.cuda_mapped_buffer and pc.get_memory_type() in (sl.MEM.GPU, sl.MEM.BOTH):
self.setPointsGPU(pc)
else:
self.setPointsCPU(pc)
except Exception as e:
print(f"Error setting points: {e}")
# Fallback to CPU if GPU fails
if self.use_gpu:
print("Falling back to CPU processing")
self.use_gpu = False
self.setPointsCPU(pc)
def setPointsGPU(self, pc):
"""Set points using GPU acceleration with CUDA-OpenGL interop"""
try:
# Get point cloud data from GPU memory
cupy_arr = pc.get_data(sl.MEM.GPU)
# Map OpenGL buffer to CUDA memory
with self.cuda_mapped_buffer as cuda_array:
# Reshape point cloud data to match buffer format
if cupy_arr.ndim == 3: # (height, width, channels)
pc_flat = cupy_arr.reshape(-1, cupy_arr.shape[-1])
else:
pc_flat = cupy_arr
# Copy data to GPU buffer (optimized GPU-to-GPU copy with continuous memory)
points_to_copy = min(pc_flat.shape[0], cuda_array.shape[0])
cuda_array[:points_to_copy] = pc_flat[:points_to_copy]
# Zero out remaining buffer if needed
if points_to_copy < cuda_array.shape[0]:
cuda_array[points_to_copy:] = 0
except Exception as e:
print(f"GPU point cloud update failed: {e}")
raise
def setPointsCPU(self, pc):
"""Fallback CPU method for setting points"""
try:
# Ensure data is available on CPU
if pc.get_memory_type() == sl.MEM.GPU:
pc.update_cpu_from_gpu()
# Get CPU pointer and upload to GPU buffer
glBindBuffer(GL_ARRAY_BUFFER, self.vboID[0])
data_ptr = pc.get_pointer(sl.MEM.CPU)
buffer_size = self.elementbufferSize * self.pt_type * 4 # 4 bytes per float32
glBufferSubData(GL_ARRAY_BUFFER, 0, buffer_size, ctypes.c_void_p(data_ptr))
glBindBuffer(GL_ARRAY_BUFFER, 0)
except Exception as e:
print(f"CPU point cloud update failed: {e}")
raise
def clear(self):
self.vertices = array.array('f')
self.colors = array.array('f')
self.indices = array.array('I')
self.elementbufferSize = 0
def set_drawing_type(self, _type):
self.drawing_type = _type
def draw(self):
if self.elementbufferSize:
glEnableVertexAttribArray(0)
glBindBuffer(GL_ARRAY_BUFFER, self.vboID[0])
glVertexAttribPointer(0,self.pt_type,GL_FLOAT,GL_FALSE,0,None)
if(self.clr_type):
glEnableVertexAttribArray(1)
glBindBuffer(GL_ARRAY_BUFFER, self.vboID[1])
glVertexAttribPointer(1,self.clr_type,GL_FLOAT,GL_FALSE,0,None)
glBindBuffer(GL_ELEMENT_ARRAY_BUFFER, self.vboID[2])
glDrawElements(self.drawing_type, self.elementbufferSize, GL_UNSIGNED_INT, None)
glDisableVertexAttribArray(0)
if self.clr_type:
glDisableVertexAttribArray(1)
def __del__(self):
"""Cleanup GPU resources"""
if hasattr(self, 'cuda_mapped_buffer') and self.cuda_mapped_buffer:
try:
self.cuda_mapped_buffer.unregister()
except:
pass
class GLViewer:
def __init__(self):
self.available = False
self.mutex = Lock()
self.camera = CameraGL()
self.wheelPosition = 0.
self.mouse_button = [False, False]
self.mouseCurrentPosition = [0., 0.]
self.previousMouseMotion = [0., 0.]
self.mouseMotion = [0., 0.]
self.zedModel = Simple3DObject(True)
self.point_cloud = Simple3DObject(False, 4)
self.save_data = False
def init(self, _argc, _argv, res): # _params = sl.CameraParameters
glutInit(_argc, _argv)
wnd_w = int(glutGet(GLUT_SCREEN_WIDTH)*0.9)
wnd_h = int(glutGet(GLUT_SCREEN_HEIGHT) *0.9)
glutInitWindowSize(wnd_w, wnd_h)
glutInitWindowPosition(int(wnd_w*0.05), int(wnd_h*0.05))
glutInitDisplayMode(GLUT_DOUBLE | GLUT_RGBA | GLUT_DEPTH)
glutCreateWindow(b"ZED Depth Sensing")
glViewport(0, 0, wnd_w, wnd_h)
glutSetOption(GLUT_ACTION_ON_WINDOW_CLOSE,
GLUT_ACTION_CONTINUE_EXECUTION)
glEnable(GL_DEPTH_TEST)
glEnable(GL_BLEND)
glBlendFunc(GL_SRC_ALPHA, GL_ONE_MINUS_SRC_ALPHA)
glEnable(GL_LINE_SMOOTH)
glHint(GL_LINE_SMOOTH_HINT, GL_NICEST)
# Compile and create the shader for 3D objects
self.shader_image = Shader(VERTEX_SHADER, FRAGMENT_SHADER)
self.shader_image_MVP = glGetUniformLocation(self.shader_image.get_program_id(), "u_mvpMatrix")
self.shader_pc = Shader(POINTCLOUD_VERTEX_SHADER, POINTCLOUD_FRAGMENT_SHADER)
self.shader_pc_MVP = glGetUniformLocation(self.shader_pc.get_program_id(), "u_mvpMatrix")
self.bckgrnd_clr = np.array([223/255., 230/255., 233/255.])
# Create the camera model
Z_ = -0.15
Y_ = Z_ * math.tan(95. * M_PI / 180. / 2.)
X_ = Y_ * 16./9.
A = np.array([0, 0, 0])
B = np.array([X_, Y_, Z_])
C = np.array([-X_, Y_, Z_])
D = np.array([-X_, -Y_, Z_])
E = np.array([X_, -Y_, Z_])
lime_clr = np.array([217 / 255, 255/255, 66/255])
self.zedModel.add_line(A, B, lime_clr)
self.zedModel.add_line(A, C, lime_clr)
self.zedModel.add_line(A, D, lime_clr)
self.zedModel.add_line(A, E, lime_clr)
self.zedModel.add_line(B, C, lime_clr)
self.zedModel.add_line(C, D, lime_clr)
self.zedModel.add_line(D, E, lime_clr)
self.zedModel.add_line(E, B, lime_clr)
self.zedModel.set_drawing_type(GL_LINES)
self.zedModel.push_to_GPU()
self.point_cloud.init(res)
self.point_cloud.set_drawing_type(GL_POINTS)
# Register GLUT callback functions
glutDisplayFunc(self.draw_callback)
glutIdleFunc(self.idle)
glutKeyboardFunc(self.keyPressedCallback)
glutCloseFunc(self.close_func)
glutMouseFunc(self.on_mouse)
glutMotionFunc(self.on_mousemove)
glutReshapeFunc(self.on_resize)
self.available = True
def is_available(self):
if self.available:
glutMainLoopEvent()
return self.available
def updateData(self, pc):
self.mutex.acquire()
try:
self.point_cloud.setPoints(pc)
finally:
self.mutex.release()
def idle(self):
if self.available:
glutPostRedisplay()
def exit(self):
if self.available:
self.available = False
def close_func(self):
if self.available:
self.available = False
def keyPressedCallback(self, key, x, y):
if ord(key) == 27:
self.close_func()
if (ord(key) == 83 or ord(key) == 115):
self.save_data = True
def on_mouse(self,*args,**kwargs):
(key,Up,x,y) = args
if key==0:
self.mouse_button[0] = (Up == 0)
elif key==2 :
self.mouse_button[1] = (Up == 0)
elif(key == 3):
self.wheelPosition = self.wheelPosition + 1
elif(key == 4):
self.wheelPosition = self.wheelPosition - 1
self.mouseCurrentPosition = [x, y]
self.previousMouseMotion = [x, y]
def on_mousemove(self,*args,**kwargs):
(x,y) = args
self.mouseMotion[0] = x - self.previousMouseMotion[0]
self.mouseMotion[1] = y - self.previousMouseMotion[1]
self.previousMouseMotion = [x, y]
glutPostRedisplay()
def on_resize(self,Width,Height):
glViewport(0, 0, Width, Height)
self.camera.setProjection(Height / Width)
def draw_callback(self):
if self.available:
glClear(GL_COLOR_BUFFER_BIT | GL_DEPTH_BUFFER_BIT)
glClearColor(self.bckgrnd_clr[0], self.bckgrnd_clr[1], self.bckgrnd_clr[2], 1.)
self.mutex.acquire()
self.update()
self.draw()
self.mutex.release()
glutSwapBuffers()
glutPostRedisplay()
def update(self):
if(self.mouse_button[0]):
r = sl.Rotation()
vert=self.camera.vertical_
tmp = vert.get()
vert.init_vector(tmp[0] * 1.,tmp[1] * 1., tmp[2] * 1.)
r.init_angle_translation(self.mouseMotion[0] * 0.02, vert)
self.camera.rotate(r)
r.init_angle_translation(self.mouseMotion[1] * 0.02, self.camera.right_)
self.camera.rotate(r)
if(self.mouse_button[1]):
t = sl.Translation()
tmp = self.camera.right_.get()
scale = self.mouseMotion[0] *-0.05
t.init_vector(tmp[0] * scale, tmp[1] * scale, tmp[2] * scale)
self.camera.translate(t)
tmp = self.camera.up_.get()
scale = self.mouseMotion[1] * 0.05
t.init_vector(tmp[0] * scale, tmp[1] * scale, tmp[2] * scale)
self.camera.translate(t)
if (self.wheelPosition != 0):
t = sl.Translation()
tmp = self.camera.forward_.get()
scale = self.wheelPosition * -0.065
t.init_vector(tmp[0] * scale, tmp[1] * scale, tmp[2] * scale)
self.camera.translate(t)
self.camera.update()
self.mouseMotion = [0., 0.]
self.wheelPosition = 0
def draw(self):
vpMatrix = self.camera.getViewProjectionMatrix()
glUseProgram(self.shader_image.get_program_id())
glUniformMatrix4fv(self.shader_image_MVP, 1, GL_TRUE, (GLfloat * len(vpMatrix))(*vpMatrix))
glPolygonMode(GL_FRONT_AND_BACK, GL_FILL)
self.zedModel.draw()
glUseProgram(0)
glUseProgram(self.shader_pc.get_program_id())
glUniformMatrix4fv(self.shader_pc_MVP, 1, GL_TRUE, (GLfloat * len(vpMatrix))(*vpMatrix))
glPointSize(1.)
self.point_cloud.draw()
glUseProgram(0)
class CameraGL:
def __init__(self):
self.ORIGINAL_FORWARD = sl.Translation()
self.ORIGINAL_FORWARD.init_vector(0,0,1)
self.ORIGINAL_UP = sl.Translation()
self.ORIGINAL_UP.init_vector(0,1,0)
self.ORIGINAL_RIGHT = sl.Translation()
self.ORIGINAL_RIGHT.init_vector(1,0,0)
self.znear = 0.5
self.zfar = 100.
self.horizontalFOV = 70.
self.orientation_ = sl.Orientation()
self.position_ = sl.Translation()
self.forward_ = sl.Translation()
self.up_ = sl.Translation()
self.right_ = sl.Translation()
self.vertical_ = sl.Translation()
self.vpMatrix_ = sl.Matrix4f()
self.offset_ = sl.Translation()
self.offset_.init_vector(0,0,5)
self.projection_ = sl.Matrix4f()
self.projection_.set_identity()
self.setProjection(1.78)
self.position_.init_vector(0., 0., 0.)
tmp = sl.Translation()
tmp.init_vector(0, 0, -.1)
tmp2 = sl.Translation()
tmp2.init_vector(0, 1, 0)
self.setDirection(tmp, tmp2)
def update(self):
dot_ = sl.Translation.dot_translation(self.vertical_, self.up_)
if(dot_ < 0.):
tmp = self.vertical_.get()
self.vertical_.init_vector(tmp[0] * -1.,tmp[1] * -1., tmp[2] * -1.)
transformation = sl.Transform()
tmp_position = self.position_.get()
tmp = (self.offset_ * self.orientation_).get()
new_position = sl.Translation()
new_position.init_vector(tmp_position[0] + tmp[0], tmp_position[1] + tmp[1], tmp_position[2] + tmp[2])
transformation.init_orientation_translation(self.orientation_, new_position)
transformation.inverse()
self.vpMatrix_ = self.projection_ * transformation
def setProjection(self, im_ratio):
fov_x = self.horizontalFOV * 3.1416 / 180.
fov_y = self.horizontalFOV * im_ratio * 3.1416 / 180.
self.projection_[(0,0)] = 1. / math.tan(fov_x * .5)
self.projection_[(1,1)] = 1. / math.tan(fov_y * .5)
self.projection_[(2,2)] = -(self.zfar + self.znear) / (self.zfar - self.znear)
self.projection_[(3,2)] = -1.
self.projection_[(2,3)] = -(2. * self.zfar * self.znear) / (self.zfar - self.znear)
self.projection_[(3,3)] = 0.
def getViewProjectionMatrix(self):
tmp = self.vpMatrix_.m
vpMat = array.array('f')
for row in tmp:
for v in row:
vpMat.append(v)
return vpMat
def getViewProjectionMatrixRT(self, tr):
tmp = self.vpMatrix_
tmp.transpose()
tr.transpose()
tmp = (tr * tmp).m
vpMat = array.array('f')
for row in tmp:
for v in row:
vpMat.append(v)
return vpMat
def setDirection(self, dir, vert):
dir.normalize()
tmp = dir.get()
dir.init_vector(tmp[0] * -1.,tmp[1] * -1., tmp[2] * -1.)
self.orientation_.init_translation(self.ORIGINAL_FORWARD, dir)
self.updateVectors()
self.vertical_ = vert
if(sl.Translation.dot_translation(self.vertical_, self.up_) < 0.):
tmp = sl.Rotation()
tmp.init_angle_translation(3.14, self.ORIGINAL_FORWARD)
self.rotate(tmp)
def translate(self, t):
ref = self.position_.get()
tmp = t.get()
self.position_.init_vector(ref[0] + tmp[0], ref[1] + tmp[1], ref[2] + tmp[2])
def setPosition(self, p):
self.position_ = p
def rotate(self, r):
tmp = sl.Orientation()
tmp.init_rotation(r)
self.orientation_ = tmp * self.orientation_
self.updateVectors()
def setRotation(self, r):
self.orientation_.init_rotation(r)
self.updateVectors()
def updateVectors(self):
self.forward_ = self.ORIGINAL_FORWARD * self.orientation_
self.up_ = self.ORIGINAL_UP * self.orientation_
right = self.ORIGINAL_RIGHT
tmp = right.get()
right.init_vector(tmp[0] * -1.,tmp[1] * -1., tmp[2] * -1.)
self.right_ = right * self.orientation_

View File

@ -0,0 +1,19 @@
[project]
name = "depth-sensing"
version = "0.1.0"
description = "depth sensing with ZED cameras"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"pyzed",
"cupy-cuda12x",
"cuda-python",
"numpy",
"pyopengl>=3.1.10",
"pyopengl-accelerate>=3.1.10",
"jupyterlab>=4.5.3",
]
[tool.uv.sources]
pyzed = { path = "libs/pyzed_pkg" }

1659
py_workspace/uv.lock generated Normal file

File diff suppressed because it is too large Load Diff