from pathlib import Path
from typing import Union, cast

import awkward as ak
import numpy as np


def _read_parquet(parquet_path: Union[str, Path]):
    """Open a parquet file with awkward, failing early if the path is absent.

    Args:
        parquet_path: Path to the parquet file.

    Returns:
        Whatever ``ak.from_parquet`` returns for the path.

    Raises:
        FileNotFoundError: If ``parquet_path`` does not exist.
    """
    path = Path(parquet_path)
    if not path.exists():
        raise FileNotFoundError(f"Parquet file not found: {path}")
    return ak.from_parquet(path)


def load_marker_geometry(parquet_path: Union[str, Path]) -> dict[int, np.ndarray]:
    """
    Reads ArUco marker geometry from a parquet file.

    The parquet file is expected to have 'ids' and 'corners' columns,
    similar to standard_box_markers.parquet.

    Args:
        parquet_path: Path to the parquet file.

    Returns:
        A dictionary mapping marker IDs to their 3D corner coordinates.
        The corners are represented as a (4, 3) float32 numpy array.

    Raises:
        FileNotFoundError: If ``parquet_path`` does not exist.
        ValueError: If the number of IDs differs from the number of
            (4, 3) corner sets found in the file.
    """
    ops = _read_parquet(parquet_path)

    # Extract IDs and corners using logic similar to find_extrinsic_object.py
    total_ids = cast(np.ndarray, ak.to_numpy(ops["ids"])).flatten()
    total_corners = cast(np.ndarray, ak.to_numpy(ops["corners"])).reshape(-1, 4, 3)

    # zip() would silently drop the surplus entries on a mismatch, producing
    # a partial geometry with no indication that the file is inconsistent.
    if len(total_ids) != len(total_corners):
        raise ValueError(
            f"Marker geometry mismatch: {len(total_ids)} ids but "
            f"{len(total_corners)} corner sets."
        )

    # Create the mapping and ensure coordinates are float32
    return {
        int(marker_id): corners.astype(np.float32)
        for marker_id, corners in zip(total_ids, total_corners)
    }


def load_face_mapping(parquet_path: Union[str, Path]) -> dict[str, list[int]]:
    """
    Reads face mapping from a parquet file.

    The parquet file is expected to have 'name' and 'ids' columns.
    'name' should be a string (face name), and 'ids' should be a list of integers.

    Args:
        parquet_path: Path to the parquet file.

    Returns:
        A dictionary mapping face names (lowercase) to lists of marker IDs.
        Empty if either the 'name' or the 'ids' column is missing. Rows with
        a falsy name or a falsy (e.g. empty) ID list are skipped.

    Raises:
        FileNotFoundError: If ``parquet_path`` does not exist.
    """
    ops = _read_parquet(parquet_path)

    # Fallback to an empty mapping if columns are missing (e.g. old format)
    if "name" not in ops.fields or "ids" not in ops.fields:
        return {}

    names = cast(np.ndarray, ak.to_numpy(ops["name"]))
    # ids might be a jagged array if different faces have different marker counts;
    # ak.to_list() converts to python lists which is what we want for the dict values
    ids_list = ak.to_list(ops["ids"])

    face_map: dict[str, list[int]] = {}
    for name, m_ids in zip(names, ids_list):
        if not (name and m_ids):
            continue
        # Normalize the name to lowercase and make sure IDs are plain ints
        face_map[str(name).lower()] = [int(mid) for mid in m_ids]

    return face_map


def validate_marker_geometry(geometry: dict[int, np.ndarray]) -> None:
    """
    Validates the marker geometry dictionary.

    Checks, in this order for each entry:
    - Dictionary is not empty.
    - Every entry has shape (4, 3).
    - Every entry contains only finite numbers (no NaN or Inf).
    - Coordinates are within a reasonable range (abs(coord) <= 100m).

    Args:
        geometry: The marker geometry dictionary to validate.

    Raises:
        ValueError: If any validation check fails. The first failing check
            of the first failing marker is reported.
    """
    if not geometry:
        raise ValueError("Marker geometry is empty.")

    for marker_id, corners in geometry.items():
        # Check shape
        if corners.shape != (4, 3):
            raise ValueError(
                f"Invalid shape for marker {marker_id}: "
                f"expected (4, 3), got {corners.shape}"
            )

        # Check for non-finite values (NaN, Inf)
        if not np.isfinite(corners).all():
            raise ValueError(
                f"Marker {marker_id} contains non-finite values (NaN or Inf)."
            )

        # Check for reasonable range; exactly 100 is still accepted
        if np.any(np.abs(corners) > 100.0):
            raise ValueError(
                f"Marker {marker_id} has coordinates exceeding reasonable range (> 100m): "
                f"{corners}"
            )


def expected_ids(geometry: dict[int, np.ndarray]) -> set[int]:
    """
    Returns the set of marker IDs present in the geometry.
    """
    return set(geometry)