diff --git a/py_workspace/.beads/issues.jsonl b/py_workspace/.beads/issues.jsonl index 02603d8..4bd7456 100644 --- a/py_workspace/.beads/issues.jsonl +++ b/py_workspace/.beads/issues.jsonl @@ -17,7 +17,7 @@ {"id":"py_workspace-aif","title":"Update visualization conventions documentation","status":"closed","priority":2,"issue_type":"task","owner":"crosstyan@outlook.com","created_at":"2026-02-09T04:20:14.893831963Z","created_by":"crosstyan","updated_at":"2026-02-09T04:22:07.154821825Z","closed_at":"2026-02-09T04:22:07.154821825Z","close_reason":"Updated documentation with current policy checklist, metadata details, and known pitfalls"} {"id":"py_workspace-cg4","title":"Implement geometry-first auto-align heuristic","status":"closed","priority":2,"issue_type":"task","owner":"crosstyan@outlook.com","created_at":"2026-02-07T16:48:33.048250646Z","created_by":"crosstyan","updated_at":"2026-02-07T16:53:54.772815505Z","closed_at":"2026-02-07T16:53:54.772815505Z","close_reason":"Closed"} {"id":"py_workspace-cg9","title":"Implement core alignment utilities (Task 1)","status":"closed","priority":2,"issue_type":"task","owner":"crosstyan@outlook.com","created_at":"2026-02-06T10:40:36.296030875Z","created_by":"crosstyan","updated_at":"2026-02-06T10:40:46.196825039Z","closed_at":"2026-02-06T10:40:46.196825039Z","close_reason":"Implemented compute_face_normal, rotation_align_vectors, and apply_alignment_to_pose in aruco/alignment.py"} -{"id":"py_workspace-e09","title":"Implement aruco/depth_save.py","status":"in_progress","priority":2,"issue_type":"task","owner":"crosstyan@outlook.com","created_at":"2026-02-09T06:58:01.987010195Z","created_by":"crosstyan","updated_at":"2026-02-09T06:58:09.311371064Z"} +{"id":"py_workspace-e09","title":"Implement 
aruco/depth_save.py","status":"closed","priority":2,"issue_type":"task","owner":"crosstyan@outlook.com","created_at":"2026-02-09T06:58:01.987010195Z","created_by":"crosstyan","updated_at":"2026-02-09T07:17:05.33050495Z","closed_at":"2026-02-09T07:17:05.33050495Z","close_reason":"Implemented depth_save.py with HDF5 persistence and passed tests/type checks"} {"id":"py_workspace-ecz","title":"Update visualization conventions docs with alignment details","status":"closed","priority":2,"issue_type":"task","owner":"crosstyan@outlook.com","created_at":"2026-02-08T07:47:49.633647436Z","created_by":"crosstyan","updated_at":"2026-02-08T07:48:25.728323257Z","closed_at":"2026-02-08T07:48:25.728323257Z","close_reason":"Added alignment methodology section to docs"} {"id":"py_workspace-ee1","title":"Implement depth-mode argument resolution in calibrate_extrinsics.py","status":"closed","priority":2,"issue_type":"task","owner":"crosstyan@outlook.com","created_at":"2026-02-07T06:31:03.430147225Z","created_by":"crosstyan","updated_at":"2026-02-07T06:33:43.204825053Z","closed_at":"2026-02-07T06:33:43.204825053Z","close_reason":"Implemented depth-mode argument resolution logic and verified with multiple test cases."} {"id":"py_workspace-f23","title":"Add --origin-axes-scale option to visualize_extrinsics.py","status":"closed","priority":2,"issue_type":"feature","owner":"crosstyan@outlook.com","created_at":"2026-02-08T05:37:35.228917793Z","created_by":"crosstyan","updated_at":"2026-02-08T05:38:31.173898101Z","closed_at":"2026-02-08T05:38:31.173898101Z","close_reason":"Implemented --origin-axes-scale option and verified with rendering."} diff --git a/py_workspace/aruco/depth_save.py b/py_workspace/aruco/depth_save.py new file mode 100644 index 0000000..8dbe064 --- /dev/null +++ b/py_workspace/aruco/depth_save.py @@ -0,0 +1,181 @@ +import h5py +import numpy as np +import json +import datetime +from pathlib import Path +from typing import Dict, Any, List, Optional, cast +from loguru import 
def save_depth_data(
    path: str | Path, camera_data: Dict[str, Any], schema_version: int = 1
) -> None:
    """
    Save multi-camera depth data to an HDF5 file.

    File layout:
        /meta                 global attributes (schema_version, units,
                              coordinate_frame, created_at)
        /cameras/<serial>     per-camera group holding intrinsics, pooled
                              depth, optional pooled confidence, optional
                              JSON pool metadata, and optional raw frames

    Args:
        path: Output file path; the parent directory must already exist.
        camera_data: Mapping of camera serial numbers to per-camera dicts.
            Each value must provide "intrinsics", "resolution" and
            "pooled_depth"; "pooled_confidence", "pool_metadata" and
            "raw_frames" are optional.
        schema_version: Schema version written to /meta (default: 1).

    Raises:
        FileNotFoundError: If the parent directory of ``path`` is missing.
    """
    path = Path(path)

    # Fail fast with a clear message instead of letting h5py raise an
    # opaque OSError when the target directory is missing.
    if not path.parent.exists():
        raise FileNotFoundError(f"Parent directory does not exist: {path.parent}")

    # Shared gzip settings for every image-sized dataset in this file.
    compression_kwargs: Dict[str, Any] = {
        "compression": "gzip",
        "compression_opts": 4,
        "shuffle": True,
    }

    with h5py.File(path, "w") as f:
        # Global metadata.
        meta = f.create_group("meta")
        meta.attrs["schema_version"] = schema_version
        meta.attrs["units"] = "meters"
        meta.attrs["coordinate_frame"] = "world_from_cam"
        # Timezone-aware UTC timestamp: a naive datetime is ambiguous once
        # the file is read on a machine in a different timezone.
        meta.attrs["created_at"] = datetime.datetime.now(
            datetime.timezone.utc
        ).isoformat()

        # Per-camera data.
        cameras = f.create_group("cameras")

        for serial, data in camera_data.items():
            cam_group = cameras.create_group(serial)

            # Intrinsics matrix and resolution attribute.
            cam_group.create_dataset("intrinsics", data=data["intrinsics"])
            cam_group.attrs["resolution"] = data["resolution"]

            # Pooled depth map (compressed).
            cam_group.create_dataset(
                "pooled_depth", data=data["pooled_depth"], **compression_kwargs
            )

            # Optional pooled confidence map.
            if data.get("pooled_confidence") is not None:
                cam_group.create_dataset(
                    "pooled_confidence",
                    data=data["pooled_confidence"],
                    **compression_kwargs,
                )

            # Pool metadata is stored as a JSON string attribute.
            if data.get("pool_metadata"):
                cam_group.attrs["pool_metadata"] = json.dumps(data["pool_metadata"])

            # Raw per-frame depth/confidence maps, when any were retained.
            if data.get("raw_frames"):
                raw_group = cam_group.create_group("raw_frames")
                for i, frame in enumerate(data["raw_frames"]):
                    frame_group = raw_group.create_group(f"frame_{i}")
                    frame_group.attrs["frame_index"] = frame["frame_index"]
                    frame_group.attrs["score"] = frame["score"]

                    if frame.get("depth_map") is not None:
                        frame_group.create_dataset(
                            "depth_map",
                            data=frame["depth_map"],
                            **compression_kwargs,
                        )

                    if frame.get("confidence_map") is not None:
                        frame_group.create_dataset(
                            "confidence_map",
                            data=frame["confidence_map"],
                            **compression_kwargs,
                        )
def load_depth_data(path: str | Path) -> Dict[str, Any]:
    """
    Load depth data previously written by :func:`save_depth_data`.

    Args:
        path: Input HDF5 file path.

    Returns:
        Dictionary mapping camera serial numbers to per-camera dicts with
        keys "intrinsics", "resolution", "pooled_depth", optional
        "pooled_confidence", "pool_metadata" (or None) and "raw_frames".

    Raises:
        FileNotFoundError: If ``path`` does not exist.
    """
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"File not found: {path}")

    results: Dict[str, Any] = {}

    with h5py.File(path, "r") as f:
        if "cameras" not in f:
            return {}

        cameras_group = f["cameras"]
        if not isinstance(cameras_group, h5py.Group):
            return {}

        for serial in cameras_group:
            cam_group = cameras_group[serial]
            if not isinstance(cam_group, h5py.Group):
                continue

            # Datasets materialize to numpy arrays via np.array(); the cast
            # papers over h5py's dynamically-typed attribute access.
            entry: Dict[str, Any] = {
                "intrinsics": np.array(cam_group["intrinsics"]),
                "resolution": tuple(cast(Any, cam_group.attrs["resolution"])),
                "pooled_depth": np.array(cam_group["pooled_depth"]),
            }

            if "pooled_confidence" in cam_group:
                entry["pooled_confidence"] = np.array(
                    cam_group["pooled_confidence"]
                )

            # Pool metadata was serialized as JSON; h5py may hand it back
            # as str or bytes depending on how it was written.
            raw_meta = cam_group.attrs.get("pool_metadata")
            if isinstance(raw_meta, bytes):
                entry["pool_metadata"] = json.loads(raw_meta.decode("utf-8"))
            elif isinstance(raw_meta, str):
                entry["pool_metadata"] = json.loads(raw_meta)
            else:
                entry["pool_metadata"] = None

            # Raw frames: keys are "frame_<i>"; sort numerically so the
            # reconstructed list preserves the original write order.
            frames: List[Dict[str, Any]] = []
            if "raw_frames" in cam_group:
                raw_group = cam_group["raw_frames"]
                if isinstance(raw_group, h5py.Group):
                    ordered_keys = sorted(
                        raw_group.keys(), key=lambda k: int(k.split("_")[1])
                    )
                    for key in ordered_keys:
                        frame_group = raw_group[key]
                        if not isinstance(frame_group, h5py.Group):
                            continue

                        frame: Dict[str, Any] = {
                            "frame_index": frame_group.attrs["frame_index"],
                            "score": frame_group.attrs["score"],
                        }

                        if "depth_map" in frame_group:
                            frame["depth_map"] = np.array(
                                frame_group["depth_map"]
                            )
                        if "confidence_map" in frame_group:
                            frame["confidence_map"] = np.array(
                                frame_group["confidence_map"]
                            )

                        frames.append(frame)

            entry["raw_frames"] = frames
            results[serial] = entry

    return results
"pooled_depth": np.random.rand(720, 1280).astype(np.float32), + # No confidence map for this camera + "pool_metadata": None, + "raw_frames": [], + }, + } + + +def test_save_depth_data_creates_file(tmp_path, sample_camera_data): + """Test that save_depth_data creates a valid HDF5 file.""" + output_path = tmp_path / "test_depth.h5" + save_depth_data(output_path, sample_camera_data) + + assert output_path.exists() + assert h5py.is_hdf5(output_path) + + +def test_save_depth_data_metadata(tmp_path, sample_camera_data): + """Test that global metadata is saved correctly.""" + output_path = tmp_path / "test_depth.h5" + save_depth_data(output_path, sample_camera_data) + + with h5py.File(output_path, "r") as f: + assert "meta" in f + assert f["meta"].attrs["schema_version"] == 1 + assert f["meta"].attrs["units"] == "meters" + assert f["meta"].attrs["coordinate_frame"] == "world_from_cam" + assert "created_at" in f["meta"].attrs + + +def test_save_load_roundtrip(tmp_path, sample_camera_data): + """Test that data can be saved and loaded back accurately.""" + output_path = tmp_path / "test_depth.h5" + save_depth_data(output_path, sample_camera_data) + + loaded_data = load_depth_data(output_path) + + assert set(loaded_data.keys()) == set(sample_camera_data.keys()) + + for serial in sample_camera_data: + original = sample_camera_data[serial] + loaded = loaded_data[serial] + + np.testing.assert_array_equal(loaded["intrinsics"], original["intrinsics"]) + assert tuple(loaded["resolution"]) == tuple(original["resolution"]) + np.testing.assert_allclose(loaded["pooled_depth"], original["pooled_depth"]) + + if "pooled_confidence" in original: + np.testing.assert_array_equal( + loaded["pooled_confidence"], original["pooled_confidence"] + ) + else: + assert "pooled_confidence" not in loaded + + if original["pool_metadata"]: + assert loaded["pool_metadata"] == original["pool_metadata"] + else: + assert loaded["pool_metadata"] is None + + +def test_save_raw_frames(tmp_path, 
def test_save_raw_frames(tmp_path, sample_camera_data):
    """Raw per-frame maps must survive a save/load roundtrip, in order."""
    out = tmp_path / "test_depth.h5"
    save_depth_data(out, sample_camera_data)

    loaded = load_depth_data(out)

    # Only the first camera in the fixture carries raw frames.
    serial = "12345678"
    expected_frames = sample_camera_data[serial]["raw_frames"]
    actual_frames = loaded[serial]["raw_frames"]

    assert len(actual_frames) == len(expected_frames)
    for want, got in zip(expected_frames, actual_frames):
        assert got["frame_index"] == want["frame_index"]
        assert got["score"] == want["score"]
        np.testing.assert_allclose(got["depth_map"], want["depth_map"])
        np.testing.assert_array_equal(got["confidence_map"], want["confidence_map"])


def test_invalid_path_handling():
    """Saving into a nonexistent directory must raise."""
    with pytest.raises(Exception):
        save_depth_data("/nonexistent/directory/file.h5", {})


def test_empty_data_handling(tmp_path):
    """An empty payload should roundtrip to an empty mapping."""
    out = tmp_path / "empty.h5"
    save_depth_data(out, {})

    assert len(load_depth_data(out)) == 0