import datetime
import json
from pathlib import Path
from typing import Any, Dict, List, Optional, cast

import h5py
import numpy as np
from loguru import logger

# Shared compression settings for every bulk array dataset (depth and
# confidence maps). gzip level 4 + byte shuffle is a good size/speed
# trade-off for float imagery; kept in one place so save paths stay
# consistent.
_COMPRESSION_KWARGS: Dict[str, Any] = {
    "compression": "gzip",
    "compression_opts": 4,
    "shuffle": True,
}


def save_depth_data(
    path: str | Path, camera_data: Dict[str, Any], schema_version: int = 1
) -> None:
    """
    Save depth data to HDF5 format.

    Layout written:
        /meta                 — schema_version, units, coordinate_frame, created_at
        /cameras/<serial>     — intrinsics dataset, resolution attr,
                                pooled_depth (+ optional pooled_confidence),
                                optional pool_metadata attr (JSON string),
                                optional raw_frames/frame_<i> subgroups.

    Args:
        path: Output file path.
        camera_data: Dictionary mapping serial numbers to camera data.
        schema_version: Schema version number (default: 1).

    Raises:
        FileNotFoundError: If the parent directory of ``path`` does not exist
            (deliberately not auto-created, so callers control disk layout).
    """
    path = Path(path)

    # Fail fast rather than letting h5py raise a less clear OSError.
    if not path.parent.exists():
        raise FileNotFoundError(f"Parent directory does not exist: {path.parent}")

    with h5py.File(path, "w") as f:
        # Global metadata.
        meta = f.create_group("meta")
        meta.attrs["schema_version"] = schema_version
        meta.attrs["units"] = "meters"
        meta.attrs["coordinate_frame"] = "world_from_cam"
        # Timezone-aware UTC timestamp: a naive local-time stamp is ambiguous
        # once files move between machines/timezones.
        meta.attrs["created_at"] = datetime.datetime.now(
            datetime.timezone.utc
        ).isoformat()

        # Per-camera data.
        cameras = f.create_group("cameras")
        for serial, data in camera_data.items():
            cam_group = cameras.create_group(serial)

            # Intrinsics and resolution.
            cam_group.create_dataset("intrinsics", data=data["intrinsics"])
            cam_group.attrs["resolution"] = data["resolution"]

            # Pooled depth map (compressed).
            cam_group.create_dataset(
                "pooled_depth", data=data["pooled_depth"], **_COMPRESSION_KWARGS
            )

            # Optional pooled confidence.
            if "pooled_confidence" in data and data["pooled_confidence"] is not None:
                cam_group.create_dataset(
                    "pooled_confidence",
                    data=data["pooled_confidence"],
                    **_COMPRESSION_KWARGS,
                )

            # Pool metadata (JSON serialized). NOTE: a falsy value (e.g. an
            # empty dict) is intentionally skipped and loads back as None.
            if data.get("pool_metadata"):
                cam_group.attrs["pool_metadata"] = json.dumps(data["pool_metadata"])

            # Raw frames, one subgroup per frame.
            if "raw_frames" in data and data["raw_frames"]:
                raw_group = cam_group.create_group("raw_frames")
                for i, frame in enumerate(data["raw_frames"]):
                    frame_group = raw_group.create_group(f"frame_{i}")
                    frame_group.attrs["frame_index"] = frame["frame_index"]
                    frame_group.attrs["score"] = frame["score"]
                    if "depth_map" in frame and frame["depth_map"] is not None:
                        frame_group.create_dataset(
                            "depth_map",
                            data=frame["depth_map"],
                            **_COMPRESSION_KWARGS,
                        )
                    if (
                        "confidence_map" in frame
                        and frame["confidence_map"] is not None
                    ):
                        frame_group.create_dataset(
                            "confidence_map",
                            data=frame["confidence_map"],
                            **_COMPRESSION_KWARGS,
                        )


def load_depth_data(path: str | Path) -> Dict[str, Any]:
    """
    Load depth data from an HDF5 file written by :func:`save_depth_data`.

    Args:
        path: Input file path.

    Returns:
        Dictionary mapping serial numbers to camera data. Each entry has
        "intrinsics", "resolution", "pooled_depth", "pool_metadata"
        (None when absent), "raw_frames" (possibly empty list), and
        optionally "pooled_confidence".

    Raises:
        FileNotFoundError: If ``path`` does not exist.
    """
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"File not found: {path}")

    results: Dict[str, Any] = {}
    with h5py.File(path, "r") as f:
        if "cameras" not in f:
            return {}

        cameras_group = f["cameras"]
        if not isinstance(cameras_group, h5py.Group):
            return {}

        for serial in cameras_group:
            cam_group = cameras_group[serial]
            if not isinstance(cam_group, h5py.Group):
                continue

            # np.array(...) materializes the h5py datasets in memory;
            # cast() appeases type checkers on the dynamic attrs mapping.
            cam_data: Dict[str, Any] = {
                "intrinsics": np.array(cam_group["intrinsics"]),
                "resolution": tuple(cast(Any, cam_group.attrs["resolution"])),
                "pooled_depth": np.array(cam_group["pooled_depth"]),
            }

            if "pooled_confidence" in cam_group:
                cam_data["pooled_confidence"] = np.array(
                    cam_group["pooled_confidence"]
                )

            # pool_metadata round-trips via a JSON string attribute; h5py may
            # surface it as str or bytes depending on how it was stored.
            if "pool_metadata" in cam_group.attrs:
                metadata_str = cam_group.attrs["pool_metadata"]
                if isinstance(metadata_str, str):
                    cam_data["pool_metadata"] = json.loads(metadata_str)
                elif isinstance(metadata_str, bytes):
                    cam_data["pool_metadata"] = json.loads(
                        metadata_str.decode("utf-8")
                    )
                else:
                    cam_data["pool_metadata"] = None
            else:
                cam_data["pool_metadata"] = None

            # Load raw frames, sorted numerically by the frame_<i> suffix so
            # frame_10 does not sort before frame_2.
            raw_frames: List[Dict[str, Any]] = []
            if "raw_frames" in cam_group:
                raw_group = cam_group["raw_frames"]
                if isinstance(raw_group, h5py.Group):
                    for key in sorted(
                        raw_group.keys(), key=lambda x: int(x.split("_")[1])
                    ):
                        frame_group = raw_group[key]
                        if not isinstance(frame_group, h5py.Group):
                            continue
                        frame_data: Dict[str, Any] = {
                            "frame_index": frame_group.attrs["frame_index"],
                            "score": frame_group.attrs["score"],
                        }
                        if "depth_map" in frame_group:
                            frame_data["depth_map"] = np.array(
                                frame_group["depth_map"]
                            )
                        if "confidence_map" in frame_group:
                            frame_data["confidence_map"] = np.array(
                                frame_group["confidence_map"]
                            )
                        raw_frames.append(frame_data)
            cam_data["raw_frames"] = raw_frames

            results[serial] = cam_data

    return results