From 43e736faac80e48eec012474e5714fbb8e860708 Mon Sep 17 00:00:00 2001 From: crosstyan Date: Wed, 4 Feb 2026 11:44:37 +0000 Subject: [PATCH] Add ArUco detection and pose estimation modules Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus --- py_workspace/aruco/aruco_box.py | 95 +++++++ py_workspace/aruco/detector.py | 144 +++++++++++ py_workspace/aruco/find_extrinsic_object.py | 173 +++++++++++++ py_workspace/aruco/marker_geometry.py | 85 ++++++ py_workspace/aruco/output/.DS_Store | Bin 0 -> 6148 bytes .../output/aruco_2d_uv_coords_normalized.json | 1 + .../aruco/output/aruco_3d_coords.json | 1 + .../aruco/output/object_points.parquet | Bin 0 -> 4617 bytes .../aruco/output/standard_box_markers.parquet | Bin 0 -> 5363 bytes py_workspace/aruco/pose_averaging.py | 242 ++++++++++++++++++ py_workspace/aruco/pose_math.py | 144 +++++++++++ py_workspace/aruco/preview.py | 93 +++++++ py_workspace/aruco/svo_sync.py | 163 ++++++++++++ 13 files changed, 1141 insertions(+) create mode 100644 py_workspace/aruco/aruco_box.py create mode 100644 py_workspace/aruco/detector.py create mode 100644 py_workspace/aruco/find_extrinsic_object.py create mode 100644 py_workspace/aruco/marker_geometry.py create mode 100644 py_workspace/aruco/output/.DS_Store create mode 100644 py_workspace/aruco/output/aruco_2d_uv_coords_normalized.json create mode 100644 py_workspace/aruco/output/aruco_3d_coords.json create mode 100644 py_workspace/aruco/output/object_points.parquet create mode 100644 py_workspace/aruco/output/standard_box_markers.parquet create mode 100644 py_workspace/aruco/pose_averaging.py create mode 100644 py_workspace/aruco/pose_math.py create mode 100644 py_workspace/aruco/preview.py create mode 100644 py_workspace/aruco/svo_sync.py diff --git a/py_workspace/aruco/aruco_box.py b/py_workspace/aruco/aruco_box.py new file mode 100644 index 0000000..1e1735e --- /dev/null +++ b/py_workspace/aruco/aruco_box.py @@ -0,0 +1,95 @@ +from 
dataclasses import dataclass + +# Order of detection result +# 0, 1, 2, 3 +# TL, TR, BR, BL +# RED, GREEN, BLUE, YELLOW + + +@dataclass +class DiamondBoardParameter: + marker_leghth: float + """ + the ArUco marker length in meter + """ + chess_length: float + """ + the length of the chess board in meter + """ + border_length: float = 0.01 + """ + border_length in m, default is 1cm + """ + + @property + def marker_border_length(self): + assert self.chess_length > self.marker_leghth + return (self.chess_length - self.marker_leghth) / 2 + + @property + def total_side_length(self): + assert self.chess_length > self.marker_leghth + return self.marker_border_length * 2 + self.chess_length * 3 + + +# 9mm + 127mm + 127mm (97mm marker) + 127mm + 10mm +# i.e. marker boarder = 127mm - 97mm = 30mm (15mm each side) +Point2D = tuple[float, float] +Quad2D = tuple[Point2D, Point2D, Point2D, Point2D] + + +@dataclass +class ArUcoMarker: + id: int + corners: Quad2D + + +# let's let TL be the origin +def generate_diamond_corners( + ids: tuple[int, int, int, int], params: DiamondBoardParameter +): + """ + A diamond chess board, which could be count as a kind of ChArUco board + + C | 0 | C + --------- + 1 | C | 2 + --------- + C | 3 | C + + where C is the chess box, and 0, 1, 2, 3 are the markers (whose ids are passed in order) + + Args: + ids: a tuple of 4 ids of the markers + params: DiamondBoardParameter + """ + + def tl_to_square(tl_x: float, tl_y: float, side_length: float) -> Quad2D: + return ( + (tl_x, tl_y), + (tl_x + side_length, tl_y), + (tl_x + side_length, tl_y + side_length), + (tl_x, tl_y + side_length), + ) + + tl_0_x = params.border_length + params.chess_length + params.marker_border_length + tl_0_y = params.border_length + params.marker_border_length + + tl_1_x = params.border_length + params.marker_border_length + tl_1_y = params.border_length + params.chess_length + params.marker_border_length + + tl_2_x = ( + params.border_length + params.chess_length * 2 + 
params.marker_border_length + ) + tl_2_y = tl_1_y + + tl_3_x = params.border_length + params.chess_length + params.marker_border_length + tl_3_y = ( + params.border_length + params.chess_length * 2 + params.marker_border_length + ) + return ( + ArUcoMarker(ids[0], tl_to_square(tl_0_x, tl_0_y, params.marker_leghth)), + ArUcoMarker(ids[1], tl_to_square(tl_1_x, tl_1_y, params.marker_leghth)), + ArUcoMarker(ids[2], tl_to_square(tl_2_x, tl_2_y, params.marker_leghth)), + ArUcoMarker(ids[3], tl_to_square(tl_3_x, tl_3_y, params.marker_leghth)), + ) diff --git a/py_workspace/aruco/detector.py b/py_workspace/aruco/detector.py new file mode 100644 index 0000000..4a7785a --- /dev/null +++ b/py_workspace/aruco/detector.py @@ -0,0 +1,144 @@ +import cv2 +import numpy as np +import pyzed.sl as sl +from typing import Optional, Tuple, Dict +from .pose_math import compute_reprojection_error + + +def create_detector( + dictionary_id: int = cv2.aruco.DICT_4X4_50, + params: Optional[cv2.aruco.DetectorParameters] = None, +) -> cv2.aruco.ArucoDetector: + """ + Creates an ArUco detector with the specified dictionary and parameters. + + Args: + dictionary_id: The ArUco dictionary ID. + params: Optional detector parameters. If None, default parameters are used. + + Returns: + cv2.aruco.ArucoDetector: The configured ArUco detector. + """ + dictionary = cv2.aruco.getPredefinedDictionary(dictionary_id) + if params is None: + params = cv2.aruco.DetectorParameters() + return cv2.aruco.ArucoDetector(dictionary, params) + + +def detect_markers( + image: np.ndarray, detector: cv2.aruco.ArucoDetector +) -> Tuple[np.ndarray, Optional[np.ndarray]]: + """ + Detects ArUco markers in an image. + + Args: + image: Input image (BGR or grayscale). + detector: The ArUco detector to use. + + Returns: + Tuple[np.ndarray, Optional[np.ndarray]]: + - corners: Detected corners as float32 shape (N, 4, 2). + - ids: Detected marker IDs as int shape (N,) or None if no markers detected. 
+ """ + if image is None: + raise ValueError("Input image is None") + + if len(image.shape) == 3: + gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) + else: + gray = image + + corners, ids, _ = detector.detectMarkers(gray) + + if ids is not None: + # corners is a list of (1, 4, 2) arrays, reshape to (N, 4, 2) + corners_array = np.array(corners, dtype=np.float32).reshape(-1, 4, 2) + ids_array = ids.flatten().astype(np.int32) + return corners_array, ids_array + else: + return np.empty((0, 4, 2), dtype=np.float32), None + + +def build_camera_matrix_from_zed( + cam: sl.Camera, resizer: Optional[sl.Resolution] = None +) -> np.ndarray: + """ + Builds a 3x3 camera intrinsic matrix from a ZED camera's calibration parameters. + + Args: + cam: The ZED camera object. + resizer: Optional resolution to get parameters for. + + Returns: + np.ndarray: 3x3 float64 camera matrix K. + """ + cam_params = cam.get_camera_information( + resizer + ).camera_configuration.calibration_parameters.left_cam + fx = cam_params.fx + fy = cam_params.fy + cx = cam_params.cx + cy = cam_params.cy + + K = np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1]], dtype=np.float64) + + return K + + +def estimate_pose_from_detections( + corners: np.ndarray, + ids: np.ndarray, + marker_geometry: Dict[int, np.ndarray], + K: np.ndarray, + dist: Optional[np.ndarray] = None, + min_markers: int = 4, + pnp_flag: int = cv2.SOLVEPNP_SQPNP, +) -> Optional[Tuple[np.ndarray, np.ndarray, float, int]]: + """ + Estimates the camera pose from detected ArUco markers. + + Args: + corners: Detected corners (N, 4, 2). + ids: Detected marker IDs (N,). + marker_geometry: Dictionary mapping marker ID to its 3D corners (4, 3). + K: 3x3 camera matrix. + dist: Optional distortion coefficients. + min_markers: Minimum number of markers required for pose estimation. + pnp_flag: OpenCV solvePnP flag. 
+ + Returns: + Optional[Tuple[np.ndarray, np.ndarray, float, int]]: + (rvec, tvec, reproj_err, n_markers) or None if estimation fails or not enough markers. + """ + if ids is None or len(ids) == 0: + return None + + obj_pts = [] + img_pts = [] + n_markers = 0 + + for i, marker_id in enumerate(ids): + if marker_id in marker_geometry: + obj_pts.append(marker_geometry[marker_id]) + img_pts.append(corners[i]) + n_markers += 1 + + if n_markers < min_markers: + return None + + # Stack points into (M*4, 3) and (M*4, 2) + obj_pts_array = np.vstack(obj_pts).astype(np.float32) + img_pts_array = np.vstack(img_pts).astype(np.float32) + + success, rvec, tvec = cv2.solvePnP( + obj_pts_array, img_pts_array, K, dist, flags=pnp_flag + ) + + if not success: + return None + + reproj_err = compute_reprojection_error( + obj_pts_array, img_pts_array, rvec, tvec, K, dist + ) + + return rvec, tvec, reproj_err, n_markers diff --git a/py_workspace/aruco/find_extrinsic_object.py b/py_workspace/aruco/find_extrinsic_object.py new file mode 100644 index 0000000..66d262b --- /dev/null +++ b/py_workspace/aruco/find_extrinsic_object.py @@ -0,0 +1,173 @@ +from datetime import datetime +from pathlib import Path +from typing import Final, Optional, TypedDict, cast + +import awkward as ak +import cv2 +import numpy as np +from cv2 import aruco +from cv2.typing import MatLike +from jaxtyping import Int, Num +from loguru import logger + +NDArray = np.ndarray +CALIBRATION_PARQUET = Path("output") / "usbcam_cal.parquet" +# OBJECT_POINTS_PARQUET = Path("output") / "object_points.parquet" +OBJECT_POINTS_PARQUET = Path("output") / "standard_box_markers.parquet" +DICTIONARY: Final[int] = aruco.DICT_4X4_50 +# 400mm +MARKER_LENGTH: Final[float] = 0.4 + + +class MarkerFace(TypedDict): + """ + for diamond ArUco markers, N is 4 + """ + + name: str + """ + a label for the face + """ + ids: Int[NDArray, "N"] + """ + ArUco marker ids + """ + corners: Num[NDArray, "N 4 3"] + """ + Corner coordinates in 3D of rectangle, 
+ relative to the world origin + """ + + +def gen(): + API = cv2.CAP_AVFOUNDATION + cap = cv2.VideoCapture(0, API) + while True: + ret, frame = cap.read() + if not ret: + logger.warning("Failed to grab frame") + break + yield frame + + +def main(): + aruco_dict = aruco.getPredefinedDictionary(DICTIONARY) + cal = ak.from_parquet(CALIBRATION_PARQUET)[0] + camera_matrix = cast(MatLike, ak.to_numpy(cal["camera_matrix"])) + distortion_coefficients = cast(MatLike, ak.to_numpy(cal["distortion_coefficients"])) + ops = ak.from_parquet(OBJECT_POINTS_PARQUET) + detector = aruco.ArucoDetector( + dictionary=aruco_dict, detectorParams=aruco.DetectorParameters() + ) + + total_ids = cast(NDArray, ak.to_numpy(ops["ids"])).flatten() + total_corners = cast(NDArray, ak.to_numpy(ops["corners"])).reshape(-1, 4, 3) + ops_map: dict[int, NDArray] = dict(zip(total_ids, total_corners)) + logger.info("ops_map={}", ops_map) + writer: Optional[cv2.VideoWriter] = None + + for frame in gen(): + grey = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + # pylint: disable-next=unpacking-non-sequence + markers, ids, rejected = detector.detectMarkers(grey) + # `markers` is [N, 1, 4, 2] + # `ids` is [N, 1] + if ids is not None: + markers = np.reshape(markers, (-1, 4, 2)) + ids = np.reshape(ids, (-1, 1)) + # logger.info("markers={}, ids={}", np.array(markers).shape, np.array(ids).shape) + ips_map: dict[int, NDArray] = {} + for cs, id in zip(markers, ids): + id = int(id) + cs = cast(NDArray, cs) + ips_map[id] = cs + center = np.mean(cs, axis=0).astype(int) + GREY = (128, 128, 128) + # logger.info("id={}, center={}", id, center) + cv2.circle(frame, tuple(center), 5, GREY, -1) + cv2.putText( + frame, + str(id), + tuple(center), + cv2.FONT_HERSHEY_SIMPLEX, + 1, + GREY, + 2, + ) + # BGR + RED = (0, 0, 255) + GREEN = (0, 255, 0) + BLUE = (255, 0, 0) + YELLOW = (0, 255, 255) + color_map = [RED, GREEN, BLUE, YELLOW] + for color, corners in zip(color_map, cs): + corners = corners.astype(int) + frame = cv2.circle(frame, 
corners, 5, color, -1) + # https://docs.opencv.org/4.x/d9/d0c/group__calib3d.html#ga50620f0e26e02caa2e9adc07b5fbf24e + ops: NDArray = np.empty((0, 3), dtype=np.float32) + ips: NDArray = np.empty((0, 2), dtype=np.float32) + for id, ip in ips_map.items(): + try: + op = ops_map[id] + assert ip.shape == (4, 2), f"corners.shape={ip.shape}" + assert op.shape == (4, 3), f"op.shape={op.shape}" + ops = np.concatenate((ops, op), axis=0) + ips = np.concatenate((ips, ip), axis=0) + except KeyError: + logger.warning("No object points for id={}", id) + continue + assert len(ops) == len(ips), f"len(ops)={len(ops)} != len(ips)={len(ips)}" + if len(ops) > 0: + # https://docs.opencv.org/4.x/d5/d1f/calib3d_solvePnP.html + # https://docs.opencv.org/4.x/d5/d1f/calib3d_solvePnP.html#calib3d_solvePnP_flags + ret, rvec, tvec = cv2.solvePnP( + objectPoints=ops, + imagePoints=ips, + cameraMatrix=camera_matrix, + distCoeffs=distortion_coefficients, + flags=cv2.SOLVEPNP_SQPNP, + ) + # ret, rvec, tvec, inliners = cv2.solvePnPRansac( + # objectPoints=ops, + # imagePoints=ips, + # cameraMatrix=camera_matrix, + # distCoeffs=distortion_coefficients, + # flags=cv2.SOLVEPNP_SQPNP, + # ) + if ret: + cv2.drawFrameAxes( + frame, + camera_matrix, + distortion_coefficients, + rvec, + tvec, + MARKER_LENGTH, + ) + else: + logger.warning("Failed to solvePnPRansac") + cv2.imshow("frame", frame) + if writer is not None: + writer.write(frame) + if (k := cv2.waitKey(1)) == ord("q"): + logger.info("Exiting") + break + elif k == ord("s"): + now = datetime.now().strftime("%Y%m%d%H%M%S") + file_name = f"aruco_{now}.png" + logger.info("Saving to {}", file_name) + cv2.imwrite(file_name, frame) + elif k == ord("r"): + if writer is not None: + writer.release() + writer = None + logger.info("Recording stopped") + else: + now = datetime.now().strftime("%Y%m%d%H%M%S") + file_name = f"aruco_{now}.mp4" + logger.info("Recording to {}", file_name) + fourcc = cv2.VideoWriter.fourcc(*"mp4v") + writer = 
cv2.VideoWriter(file_name, fourcc, 20.0, frame.shape[:2][::-1]) + + +if __name__ == "__main__": + main() diff --git a/py_workspace/aruco/marker_geometry.py b/py_workspace/aruco/marker_geometry.py new file mode 100644 index 0000000..493604f --- /dev/null +++ b/py_workspace/aruco/marker_geometry.py @@ -0,0 +1,85 @@ +import numpy as np +import awkward as ak +from pathlib import Path +from typing import Union, cast + + +def load_marker_geometry(parquet_path: Union[str, Path]) -> dict[int, np.ndarray]: + """ + Reads ArUco marker geometry from a parquet file. + + The parquet file is expected to have 'ids' and 'corners' columns, + similar to standard_box_markers.parquet. + + Args: + parquet_path: Path to the parquet file. + + Returns: + A dictionary mapping marker IDs to their 3D corner coordinates. + The corners are represented as a (4, 3) float32 numpy array. + """ + path = Path(parquet_path) + if not path.exists(): + raise FileNotFoundError(f"Parquet file not found: {path}") + + ops = ak.from_parquet(path) + + # Extract IDs and corners using logic similar to find_extrinsic_object.py + total_ids = cast(np.ndarray, ak.to_numpy(ops["ids"])).flatten() + total_corners = cast(np.ndarray, ak.to_numpy(ops["corners"])).reshape(-1, 4, 3) + + # Create the mapping and ensure coordinates are float32 + marker_geometry = { + int(marker_id): corners.astype(np.float32) + for marker_id, corners in zip(total_ids, total_corners) + } + + return marker_geometry + + +def validate_marker_geometry(geometry: dict[int, np.ndarray]) -> None: + """ + Validates the marker geometry dictionary. + + Checks: + - Dictionary is not empty. + - Every entry has shape (4, 3). + - Every entry contains only finite numbers (no NaN or Inf). + - Coordinates are within a reasonable range (abs(coord) < 100m). + + Args: + geometry: The marker geometry dictionary to validate. + + Raises: + ValueError: If any validation check fails. 
+ """ + if not geometry: + raise ValueError("Marker geometry is empty.") + + for marker_id, corners in geometry.items(): + # Check shape + if corners.shape != (4, 3): + raise ValueError( + f"Invalid shape for marker {marker_id}: " + f"expected (4, 3), got {corners.shape}" + ) + + # Check for non-finite values (NaN, Inf) + if not np.isfinite(corners).all(): + raise ValueError( + f"Marker {marker_id} contains non-finite values (NaN or Inf)." + ) + + # Check for reasonable range (e.g., < 100 meters) + if np.any(np.abs(corners) > 100.0): + raise ValueError( + f"Marker {marker_id} has coordinates exceeding reasonable range (> 100m): " + f"{corners}" + ) + + +def expected_ids(geometry: dict[int, np.ndarray]) -> set[int]: + """ + Returns the set of marker IDs present in the geometry. + """ + return set(geometry.keys()) diff --git a/py_workspace/aruco/output/.DS_Store b/py_workspace/aruco/output/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..34a0105b1bfc58550a5f788cbd78982433f6842c GIT binary patch literal 6148 zcmeHKJ5Iwu5S<|@EYYN-+$*G|KqhidkPAd1QYaER0zpILj+}*qaGhL%Hy^-}MM?^I zBh5bV%)DLs6&{a>=3v$>xb{mu5Fstwp}4o-5ifEZ-@8Wes+B(yT9+Bx&)0z1*iZOpaN9jUn_u~ZB|_d za-{-PfC_vmVBd!VH>?x8K>u`L@eu%MzwvH(_E`d0EC8$%yFf%>8dP9VHAf5$I^rek z>clQE=%Riz&dHlKClvMD5iedYS_g8a0#x8sfl+KH*8k`5ALjp45_eR93j8SrblfbO z8D1%S>)_?A*B1B^{%NT7axC78f!>O-u~vN5t1I@5e4W?@IvsJR1NkFhy3nY=Zz%8q DetIcK literal 0 HcmV?d00001 diff --git a/py_workspace/aruco/output/aruco_2d_uv_coords_normalized.json b/py_workspace/aruco/output/aruco_2d_uv_coords_normalized.json new file mode 100644 index 0000000..a37a2cf --- /dev/null +++ b/py_workspace/aruco/output/aruco_2d_uv_coords_normalized.json @@ -0,0 +1 @@ 
+[{"id":15,"center":[0.49987878787878787,0.13660606060606062],"corners":[[0.4749090909090909,0.1615757575757576],[0.524969696969697,0.1615757575757576],[0.524969696969697,0.11151515151515157],[0.4749090909090909,0.11151515151515157]]},{"id":13,"center":[0.43636363636363634,0.20012121212121214],"corners":[[0.41139393939393937,0.22509090909090912],[0.46145454545454545,0.22509090909090912],[0.46145454545454545,0.17503030303030298],[0.41139393939393937,0.17503030303030298]]},{"id":11,"center":[0.49987878787878787,0.3366060606060606],"corners":[[0.4749090909090909,0.36157575757575755],[0.524969696969697,0.36157575757575755],[0.524969696969697,0.3115151515151515],[0.4749090909090909,0.3115151515151515]]},{"id":23,"center":[0.29987878787878786,0.3366060606060606],"corners":[[0.27490909090909094,0.36157575757575755],[0.32496969696969696,0.36157575757575755],[0.32496969696969696,0.3115151515151515],[0.27490909090909094,0.3115151515151515]]},{"id":17,"center":[0.7633939393939394,0.4001212121212121],"corners":[[0.7884848484848485,0.37503030303030305],[0.7384242424242424,0.37503030303030305],[0.7384242424242424,0.4250909090909091],[0.7884848484848485,0.4250909090909091]]},{"id":9,"center":[0.43636363636363634,0.4001212121212121],"corners":[[0.41139393939393937,0.4250909090909091],[0.46145454545454545,0.4250909090909091],[0.46145454545454545,0.37503030303030305],[0.41139393939393937,0.37503030303030305]]},{"id":21,"center":[0.23636363636363636,0.4001212121212121],"corners":[[0.21139393939393938,0.4250909090909091],[0.26145454545454544,0.4250909090909091],[0.26145454545454544,0.37503030303030305],[0.21139393939393938,0.37503030303030305]]},{"id":19,"center":[0.6998787878787879,0.4636363636363636],"corners":[[0.7249696969696969,0.43854545454545457],[0.6749090909090909,0.43854545454545457],[0.6749090909090909,0.4886060606060606],[0.7249696969696969,0.4886060606060606]]},{"id":7,"center":[0.49987878787878787,0.5366060606060605],"corners":[[0.4749090909090909,0.5615757575757576],[0.5
24969696969697,0.5615757575757576],[0.524969696969697,0.5115151515151515],[0.4749090909090909,0.5115151515151515]]},{"id":5,"center":[0.43636363636363634,0.600121212121212],"corners":[[0.41139393939393937,0.6250909090909091],[0.46145454545454545,0.6250909090909091],[0.46145454545454545,0.575030303030303],[0.41139393939393937,0.575030303030303]]},{"id":3,"center":[0.49987878787878787,0.7366060606060606],"corners":[[0.4749090909090909,0.7615757575757576],[0.524969696969697,0.7615757575757576],[0.524969696969697,0.7115151515151514],[0.4749090909090909,0.7115151515151514]]},{"id":1,"center":[0.43636363636363634,0.8001212121212121],"corners":[[0.41139393939393937,0.8250909090909091],[0.46145454545454545,0.8250909090909091],[0.46145454545454545,0.7750303030303031],[0.41139393939393937,0.7750303030303031]]},{"id":14,"center":[0.5632727272727273,0.20012121212121214],"corners":[[0.5384242424242425,0.22509090909090912],[0.5883636363636363,0.22509090909090912],[0.5883636363636363,0.17503030303030298],[0.5383030303030303,0.17503030303030298]]},{"id":12,"center":[0.49987878787878787,0.2635151515151515],"corners":[[0.4749090909090909,0.28848484848484846],[0.524969696969697,0.28848484848484846],[0.5248484848484849,0.23842424242424243],[0.4749090909090909,0.23842424242424243]]},{"id":16,"center":[0.6998787878787879,0.33672727272727276],"corners":[[0.7249696969696969,0.3116363636363636],[0.6749090909090909,0.3116363636363636],[0.675030303030303,0.36169696969696974],[0.7249696969696969,0.36169696969696974]]},{"id":18,"center":[0.6364848484848484,0.4001212121212121],"corners":[[0.6614545454545454,0.37503030303030305],[0.6115151515151516,0.37503030303030305],[0.6115151515151516,0.4250909090909091],[0.6615757575757576,0.4250909090909091]]},{"id":10,"center":[0.5632727272727273,0.4001212121212121],"corners":[[0.5384242424242425,0.4250909090909091],[0.5883636363636363,0.4250909090909091],[0.5883636363636363,0.37503030303030305],[0.5383030303030303,0.37503030303030305]]},{"id":22,"center":
[0.36327272727272725,0.4001212121212121],"corners":[[0.3384242424242424,0.4250909090909091],[0.38836363636363636,0.4250909090909091],[0.38836363636363636,0.37503030303030305],[0.3383030303030303,0.37503030303030305]]},{"id":8,"center":[0.49987878787878787,0.46351515151515155],"corners":[[0.4749090909090909,0.4884848484848485],[0.524969696969697,0.4884848484848485],[0.5248484848484849,0.4384242424242424],[0.4749090909090909,0.4384242424242424]]},{"id":20,"center":[0.29987878787878786,0.46351515151515155],"corners":[[0.27490909090909094,0.4884848484848485],[0.32496969696969696,0.4884848484848485],[0.32484848484848483,0.4384242424242424],[0.27490909090909094,0.4384242424242424]]},{"id":6,"center":[0.5632727272727273,0.600121212121212],"corners":[[0.5384242424242425,0.6250909090909091],[0.5883636363636363,0.6250909090909091],[0.5883636363636363,0.575030303030303],[0.5383030303030303,0.575030303030303]]},{"id":4,"center":[0.49987878787878787,0.6635151515151515],"corners":[[0.4749090909090909,0.6884848484848485],[0.524969696969697,0.6884848484848485],[0.5248484848484849,0.6384242424242424],[0.4749090909090909,0.6384242424242424]]},{"id":2,"center":[0.5632727272727273,0.8001212121212121],"corners":[[0.5384242424242425,0.8250909090909091],[0.5883636363636363,0.8250909090909091],[0.5883636363636363,0.7750303030303031],[0.5383030303030303,0.7750303030303031]]},{"id":0,"center":[0.49987878787878787,0.8635151515151516],"corners":[[0.4749090909090909,0.8884848484848484],[0.524969696969697,0.8884848484848484],[0.5248484848484849,0.8384242424242424],[0.4749090909090909,0.8384242424242424]]}] \ No newline at end of file diff --git a/py_workspace/aruco/output/aruco_3d_coords.json b/py_workspace/aruco/output/aruco_3d_coords.json new file mode 100644 index 0000000..a37a2cf --- /dev/null +++ b/py_workspace/aruco/output/aruco_3d_coords.json @@ -0,0 +1 @@ 
+[{"id":15,"center":[0.49987878787878787,0.13660606060606062],"corners":[[0.4749090909090909,0.1615757575757576],[0.524969696969697,0.1615757575757576],[0.524969696969697,0.11151515151515157],[0.4749090909090909,0.11151515151515157]]},{"id":13,"center":[0.43636363636363634,0.20012121212121214],"corners":[[0.41139393939393937,0.22509090909090912],[0.46145454545454545,0.22509090909090912],[0.46145454545454545,0.17503030303030298],[0.41139393939393937,0.17503030303030298]]},{"id":11,"center":[0.49987878787878787,0.3366060606060606],"corners":[[0.4749090909090909,0.36157575757575755],[0.524969696969697,0.36157575757575755],[0.524969696969697,0.3115151515151515],[0.4749090909090909,0.3115151515151515]]},{"id":23,"center":[0.29987878787878786,0.3366060606060606],"corners":[[0.27490909090909094,0.36157575757575755],[0.32496969696969696,0.36157575757575755],[0.32496969696969696,0.3115151515151515],[0.27490909090909094,0.3115151515151515]]},{"id":17,"center":[0.7633939393939394,0.4001212121212121],"corners":[[0.7884848484848485,0.37503030303030305],[0.7384242424242424,0.37503030303030305],[0.7384242424242424,0.4250909090909091],[0.7884848484848485,0.4250909090909091]]},{"id":9,"center":[0.43636363636363634,0.4001212121212121],"corners":[[0.41139393939393937,0.4250909090909091],[0.46145454545454545,0.4250909090909091],[0.46145454545454545,0.37503030303030305],[0.41139393939393937,0.37503030303030305]]},{"id":21,"center":[0.23636363636363636,0.4001212121212121],"corners":[[0.21139393939393938,0.4250909090909091],[0.26145454545454544,0.4250909090909091],[0.26145454545454544,0.37503030303030305],[0.21139393939393938,0.37503030303030305]]},{"id":19,"center":[0.6998787878787879,0.4636363636363636],"corners":[[0.7249696969696969,0.43854545454545457],[0.6749090909090909,0.43854545454545457],[0.6749090909090909,0.4886060606060606],[0.7249696969696969,0.4886060606060606]]},{"id":7,"center":[0.49987878787878787,0.5366060606060605],"corners":[[0.4749090909090909,0.5615757575757576],[0.5
24969696969697,0.5615757575757576],[0.524969696969697,0.5115151515151515],[0.4749090909090909,0.5115151515151515]]},{"id":5,"center":[0.43636363636363634,0.600121212121212],"corners":[[0.41139393939393937,0.6250909090909091],[0.46145454545454545,0.6250909090909091],[0.46145454545454545,0.575030303030303],[0.41139393939393937,0.575030303030303]]},{"id":3,"center":[0.49987878787878787,0.7366060606060606],"corners":[[0.4749090909090909,0.7615757575757576],[0.524969696969697,0.7615757575757576],[0.524969696969697,0.7115151515151514],[0.4749090909090909,0.7115151515151514]]},{"id":1,"center":[0.43636363636363634,0.8001212121212121],"corners":[[0.41139393939393937,0.8250909090909091],[0.46145454545454545,0.8250909090909091],[0.46145454545454545,0.7750303030303031],[0.41139393939393937,0.7750303030303031]]},{"id":14,"center":[0.5632727272727273,0.20012121212121214],"corners":[[0.5384242424242425,0.22509090909090912],[0.5883636363636363,0.22509090909090912],[0.5883636363636363,0.17503030303030298],[0.5383030303030303,0.17503030303030298]]},{"id":12,"center":[0.49987878787878787,0.2635151515151515],"corners":[[0.4749090909090909,0.28848484848484846],[0.524969696969697,0.28848484848484846],[0.5248484848484849,0.23842424242424243],[0.4749090909090909,0.23842424242424243]]},{"id":16,"center":[0.6998787878787879,0.33672727272727276],"corners":[[0.7249696969696969,0.3116363636363636],[0.6749090909090909,0.3116363636363636],[0.675030303030303,0.36169696969696974],[0.7249696969696969,0.36169696969696974]]},{"id":18,"center":[0.6364848484848484,0.4001212121212121],"corners":[[0.6614545454545454,0.37503030303030305],[0.6115151515151516,0.37503030303030305],[0.6115151515151516,0.4250909090909091],[0.6615757575757576,0.4250909090909091]]},{"id":10,"center":[0.5632727272727273,0.4001212121212121],"corners":[[0.5384242424242425,0.4250909090909091],[0.5883636363636363,0.4250909090909091],[0.5883636363636363,0.37503030303030305],[0.5383030303030303,0.37503030303030305]]},{"id":22,"center":
[0.36327272727272725,0.4001212121212121],"corners":[[0.3384242424242424,0.4250909090909091],[0.38836363636363636,0.4250909090909091],[0.38836363636363636,0.37503030303030305],[0.3383030303030303,0.37503030303030305]]},{"id":8,"center":[0.49987878787878787,0.46351515151515155],"corners":[[0.4749090909090909,0.4884848484848485],[0.524969696969697,0.4884848484848485],[0.5248484848484849,0.4384242424242424],[0.4749090909090909,0.4384242424242424]]},{"id":20,"center":[0.29987878787878786,0.46351515151515155],"corners":[[0.27490909090909094,0.4884848484848485],[0.32496969696969696,0.4884848484848485],[0.32484848484848483,0.4384242424242424],[0.27490909090909094,0.4384242424242424]]},{"id":6,"center":[0.5632727272727273,0.600121212121212],"corners":[[0.5384242424242425,0.6250909090909091],[0.5883636363636363,0.6250909090909091],[0.5883636363636363,0.575030303030303],[0.5383030303030303,0.575030303030303]]},{"id":4,"center":[0.49987878787878787,0.6635151515151515],"corners":[[0.4749090909090909,0.6884848484848485],[0.524969696969697,0.6884848484848485],[0.5248484848484849,0.6384242424242424],[0.4749090909090909,0.6384242424242424]]},{"id":2,"center":[0.5632727272727273,0.8001212121212121],"corners":[[0.5384242424242425,0.8250909090909091],[0.5883636363636363,0.8250909090909091],[0.5883636363636363,0.7750303030303031],[0.5383030303030303,0.7750303030303031]]},{"id":0,"center":[0.49987878787878787,0.8635151515151516],"corners":[[0.4749090909090909,0.8884848484848484],[0.524969696969697,0.8884848484848484],[0.5248484848484849,0.8384242424242424],[0.4749090909090909,0.8384242424242424]]}] \ No newline at end of file diff --git a/py_workspace/aruco/output/object_points.parquet b/py_workspace/aruco/output/object_points.parquet new file mode 100644 index 0000000000000000000000000000000000000000..d6ed3d72d9a3eab7249373d86fd85e6b6a241d65 GIT binary patch literal 4617 zcmeHLeQZF_&$TK!erzl%OwgjF(;)F-cl4mFWO5!+nVn4qwC4SG2zn&fA 
zk2rQ$wzkkR(XDIO4TVV)U71w2Nkti@L`$_IwiR`4p;Rp}q>(?`L5sHTLuz1Er=5Ep zJ530qh=2M=pOUA0@3|l6oZmg?cb*-LQ%w@&S4dS0DT9WLl-0HoO%g#N@eu^k^aAxs z-I^RhNTB0GFCM)FnE7W3xm(ggDi#(gD!&W@^e8H;3k^a}G_5cSLcDG4bqR4)S_a*U zzkf-J7D}p#wZuANJyAoDuy_NZAe6*LqPX!rHAI<0cwGHQ#jncX)VX!!++8i?L1{^Y z_dh=Dp^u*LR8$Hh?996gaTaNCnsAZ;8`YQ|P?ac;yo2S34e_!%-IJ`|?O~ z;rgz#&zuxbiqqkWnrrr5KYRUKvoL%j{i8D*gKw}e{>Q7APR{6N=>789H|JfCL}q{A zd3RCoGvn8ue>!*g+uy1Cn*Wu}d(h5T_E#8AJfuG>Ir-TBbMZ47{miCEHdSud>(9)) zzB>Be%b;WD;Z4r#f4UmipS}0e%pZ=?KE37A!1woDeFj0>s(A~nzW+JjfvzUa;pe(86J<|6^|!U7wUS2-^@qA%ZF+m- z5BuLKKh}9MqWa|2k-zeUF=kQnBU! z_KhFCSohJ|LmMpTnrk+^-L<#jdc~oizVL(K8?`_2pHp{euU-h$Q9thpQSH+sN4RGd zPwm?NY|UePepx1;KmO<{&u?CuZ}@e`zu)=bp{h01do2}Yx$-p~p^&A7P$1?bWdvE( zKq$)NzF44v=&J>&L0T5}r${L&>(irtxuRSQr_!X9Aj{F*a5@k}v+feclkq@u#hD<+ z^H$pB|H;Tzr4!^bWvx=*NYs*5$_5#sEGS7<$z}2(`B-D6z7(FyTDbWH=tmG}dO4oH z6&ZmQFDOzaSIEyxWEAo^4F z+v@Hcd9bDKf%;?sK=SVhr*@vCDhJ>)S&1&XCyu~h#xw64P&gSAg_6Z^8Gzf89DBwbJotigK((E8(N9TDxsW!oE z%6UC%(QkI=1gv^LNDL|iwA--4Sl^w^4vIo+Oipa{I1(Ar05}7^(aVC&W(3ql`eiuQ zmS-3%5^Y5QLwAv@8Y0WIKTwB;83KIUWX zV}6S}%P-UAmiP@aUTm4$<{DB9Z5$g?aUNYB-HU-3!pao&vd$1T-;xJ^QsL&Y&x#NB zZvya6Fw6tIX>OPNS`n80T1^LA%FUs0VL$_TgT5W@UchQ?i?ScAL;W2J^;u- z9{fqWsD=E-OyUF+UCeK|kDyz~Un0b!3ip8m7^b_J@1g0)Q&eF+&i5#XGG>$3r}aZF z8APN3%wTd2M`?B-=5B=;%m^BBf{!FJC0`qM8?zstx5&L%TJm4hfnxN?OV})lxIVn*`)GiRFyGxCK4vlR@1bK{+%Eci zOgZ2goi&^d7BODJ$+;qLe;Vr>xTzg+zBO(Wy&5{lndpR|rd58{kXm+EmT;4xv4?oz zf5cr}%#;!|Zo1&~oA57io!8?G`OSrL#)JIX-VUh*)4LJhZ(|U-1(QDRBS+zSh+%xd zrFl{ z2!E)Tc#kRX1Ai2`pO3p!yb<|ZH^G_Ri!o)i4fs9kY+(3vh6xG=cviN5Sv>%kz8lAn zP^izaJ4#5n5Wd*?PvxP|HI-THu$@nlHgC;Qzrw zK4P|SClqz?owPF(NN*MriOs%bay(l%0pCpUtyHI`x2U$LlzRB!E&LJ6H3Z>+AA$Y{ DBHg2R literal 0 HcmV?d00001 diff --git a/py_workspace/aruco/output/standard_box_markers.parquet b/py_workspace/aruco/output/standard_box_markers.parquet new file mode 100644 index 0000000000000000000000000000000000000000..8c4d3757bcd9ebcbe2d20087c08327e9c687ca13 GIT binary patch literal 5363 
zcmeHLX;>528XgvbD%6dH(DEp?idK+pfJUL?B#;dVAxJjctH~rGAqz1HB!uhQTJdTt zRx3+;Tcs6H)GA7?wHmBevF(+(w9TWo)vi|43ogBiwQ|py2o$8-l zJMTN^d%q~gBy`j=IF* zC^3h^7O;kZCvY7G9y(iz#Q_^OLy_JA9ZXuC15Ei!2&pY7iA1|MJ0&6p*x*Hp2s{yl zj7%9~6j+I+PaFtF7)K&w+KdnPz{^unQEhS_FRKlZWJ0EoG;D=q^Qs&Q=NwdwyVHt zjq=1W_6lF}kLu(bpxgcZ)scLi&r`Vt>EPBMd9Hi&EWc;xSxrxE18BS%d_I>sR`bU4 zqo;SgG%s&c{9fIiiK_7vY>h4NfvBG9xFeRk8rwTi zd%RH%-Cp3%tZ5p(w|UAx_k8nk!Mm+B3f_zLT`wB{}esA9KI-jTd$ZU^goGH(fFO0p^;d*_1q#IzUzYzWysj{HcgI^PQR*;A@3C4SHQ_-EskqrBV3ecXBSgwJy%GrDPWGk`j)X!){*9|F*-U9;_` z%jmeS4 z&!3n~zYEA&+!R#nolzh6_@Og3;TM9}7HkO_GkbP~c=fnOdeEjs>xo@;X!hKL zYm&yF;~%|o@z&2<7u=>*oJl?%zQB9q_VS4ydBbS&UdzdS5tpy;fAJgj5r3VklNtV| z=lqGc=*Y3P+n4P7BIfHYG0asp&Wox`md=*>*FTQxh*C2@KlR}!m8~(~FEc(nYHk#t z;_GUNer{?0w567FJ?i;8o~oPwZhHvzKZg;6q)z{XbLZvkN9OjdUcjuBzOP-5FRsY_ zs)C|EHQ)2;lLsG4T>sXrRZZ1Dwtr&I*m7XFyEVBz?X#7&Wbvb2BO7*J`?`Mjo;p%! z>_@2xCCrZNwPqD53?W5MLMWjYm03Fp$%z9w30Ol68apYN6qb_?`=OLjlfmvF1tX+T z=xcCj&Cn|xFR@v)wjn)%neZEGmfy0GhWD*Nj*N|qO;14LNRhFV!jRZNkfca*B)K7o z{B}@6MEXDpij4!g83AD;2z)D)cxwjS3lV=Ho=7r{To+0{7@W`+l>R3~X3*rriOF5P z5cDrFR13u17D{ePAa59(a5n_33L*s8ACQh;tV+I=NRCX1Ag_*zMv&MfN|eet)2gxo z-W-4fO5WUx2_;sC0jO)aRXa~-(3&*%2{Ys8KD{t0zG8w+3&_&UGuY?Zag|AB0~Vdi zWY-c#31P%+HdQ$ox0ISp3!g?Pk(fl1Cz~1gP$+N5QsI??2{8=I2mj!ejbSMmy9ncA zSP{l4BF5P40d^-G%<8iXh*v=$TtpMzICd78THZIv7Q?2DX*Z~uY??~OD%J2!PNkG( zR>`DxB|q1YXJD)4a++Phw;OP?v{=m)8s!X=Th0^}D`YM`ZV;picuKvRFEZ&0oJHDl zVS!R6D#OhZHf|B>wFS_gN>}q-EO|aAg6k<{_B5QwEmz3sCJj$ojuWBh0t=hQ2JJK= zU?ScHiDD`hzQdIen=XMS4jcerz&pJ0;kGdxc42&_O1aRDVFGBH0dIs8uQ-KDol)4= z5BTdf3QD=(6HWiCAe?x|Q-ijO+qV z?;LDB4%d@QOt^)>fACyR5U*Y>lhWWm;hb6~;D;kDzlXa{B^T*5e5q4CD3;&lFCSA7 zJCh0v=FssprCd)_%2M6%yT~&`uDC{pT%sqo&vyensfI+|ALWO@*9!P{5bqD>Cp$Cs z!THH7gz@~%4?s9)4#ZQH2uOl>|Ni;;9ezZ`X%Eg1mJ8$gn;$?(o*UrFDxluWZ?_mzbcwDB6sR6kpABqN025v#2 zktNSFOH;rZEX5h7GIg=F6y{)P#Np+ zokOpc z&{B2aI1#0Tm@vTsb_%tON)7lE4t!`*$Tc<%gIlh|Frr?_A5srffd&(1%qE^rrKS;+ z5&;?^|Daw{%eZb8z@u0D)fTB;&4GBQmMMAC{xii9=4oVfr?&7f#tA$O;2L`WgX#hB 
def rotation_angle_deg(Ra: np.ndarray, Rb: np.ndarray) -> float:
    """Return the geodesic angle, in degrees, between two rotation matrices.

    The relative rotation Ra^T @ Rb has trace 1 + 2*cos(theta); solving for
    theta gives the angular distance between the two orientations.

    Args:
        Ra: 3x3 rotation matrix.
        Rb: 3x3 rotation matrix.

    Returns:
        Rotation angle in degrees, in [0, 180].
    """
    relative = Ra.T @ Rb
    # The trace can drift slightly outside [-1, 3] numerically; clamp the
    # cosine so arccos never receives an out-of-domain value.
    cosine = np.clip((np.trace(relative) - 1.0) * 0.5, -1.0, 1.0)
    return np.degrees(np.arccos(cosine))
def matrix_to_quaternion(R_mat: np.ndarray) -> np.ndarray:
    """Convert a 3x3 rotation matrix to a unit quaternion (w, x, y, z).

    Implements the branching scheme from Mike Day's "Converting a Rotation
    Matrix to a Quaternion": the numerically largest quaternion component is
    chosen as the divisor to avoid catastrophic cancellation.
    """
    trace = np.trace(R_mat)
    if trace > 0:
        s = 2.0 * np.sqrt(trace + 1.0)
        w = 0.25 * s
        x = (R_mat[2, 1] - R_mat[1, 2]) / s
        y = (R_mat[0, 2] - R_mat[2, 0]) / s
        z = (R_mat[1, 0] - R_mat[0, 1]) / s
    elif R_mat[0, 0] > R_mat[1, 1] and R_mat[0, 0] > R_mat[2, 2]:
        s = 2.0 * np.sqrt(1.0 + R_mat[0, 0] - R_mat[1, 1] - R_mat[2, 2])
        w = (R_mat[2, 1] - R_mat[1, 2]) / s
        x = 0.25 * s
        y = (R_mat[0, 1] + R_mat[1, 0]) / s
        z = (R_mat[0, 2] + R_mat[2, 0]) / s
    elif R_mat[1, 1] > R_mat[2, 2]:
        s = 2.0 * np.sqrt(1.0 + R_mat[1, 1] - R_mat[0, 0] - R_mat[2, 2])
        w = (R_mat[0, 2] - R_mat[2, 0]) / s
        x = (R_mat[0, 1] + R_mat[1, 0]) / s
        y = 0.25 * s
        z = (R_mat[1, 2] + R_mat[2, 1]) / s
    else:
        s = 2.0 * np.sqrt(1.0 + R_mat[2, 2] - R_mat[0, 0] - R_mat[1, 1])
        w = (R_mat[1, 0] - R_mat[0, 1]) / s
        x = (R_mat[0, 2] + R_mat[2, 0]) / s
        y = (R_mat[1, 2] + R_mat[2, 1]) / s
        z = 0.25 * s
    return np.array([w, x, y, z])


def quaternion_to_matrix(q: np.ndarray) -> np.ndarray:
    """Convert a unit quaternion (w, x, y, z) to a 3x3 rotation matrix."""
    w, x, y, z = q
    xx, yy, zz = x * x, y * y, z * z
    xy, xz, yz = x * y, x * z, y * z
    wx, wy, wz = w * x, w * y, w * z
    return np.array(
        [
            [1 - 2 * (yy + zz), 2 * (xy - wz), 2 * (xz + wy)],
            [2 * (xy + wz), 1 - 2 * (xx + zz), 2 * (yz - wx)],
            [2 * (xz - wy), 2 * (yz + wx), 1 - 2 * (xx + yy)],
        ]
    )
class PoseAccumulator:
    """Collects 4x4 camera poses and offers robust filtering and averaging.

    Each pose is stored together with its reprojection error and the frame
    it came from, so outliers can be rejected either by an error threshold
    or by RANSAC consensus before averaging.
    """

    def __init__(self):
        self.poses: List[np.ndarray] = []  # 4x4 homogeneous transforms
        self.reproj_errors: List[float] = []  # per-pose reprojection error
        self.frame_ids: List[int] = []  # frame index each pose came from

    def add_pose(self, T: np.ndarray, reproj_error: float, frame_id: int) -> None:
        """Store a copy of the 4x4 pose T with its error and frame id.

        Args:
            T: 4x4 transformation matrix.
            reproj_error: reprojection error associated with the pose.
            frame_id: index of the frame where the pose was detected.

        Raises:
            ValueError: if T is not a 4x4 numpy array.
        """
        if not isinstance(T, np.ndarray) or T.shape != (4, 4):
            raise ValueError("T must be a 4x4 numpy array")
        self.poses.append(T.copy())
        self.reproj_errors.append(float(reproj_error))
        self.frame_ids.append(int(frame_id))

    def filter_by_reproj(self, max_reproj_error: float) -> List[int]:
        """Indices of poses whose reprojection error is <= the threshold."""
        return [
            idx
            for idx, err in enumerate(self.reproj_errors)
            if err <= max_reproj_error
        ]

    def ransac_filter(
        self,
        rot_thresh_deg: float = 5.0,
        trans_thresh_m: float = 0.05,
        max_iters: int = 200,
        random_seed: int = 0,
    ) -> List[int]:
        """Find the largest consensus set of poses via RANSAC.

        Each iteration samples one pose as reference and counts every pose
        within both the rotation and translation thresholds of it.

        Args:
            rot_thresh_deg: rotation threshold in degrees.
            trans_thresh_m: translation threshold in meters.
            max_iters: maximum number of RANSAC iterations.
            random_seed: seed for reproducibility.

        Returns:
            Indices of the inlier poses of the best consensus set.
        """
        n = len(self.poses)
        if n == 0:
            return []
        if n == 1:
            return [0]

        rng = np.random.default_rng(random_seed)
        best: List[int] = []
        for _ in range(max_iters):
            ref = self.poses[rng.integers(0, n)]
            R_ref, t_ref = ref[:3, :3], ref[:3, 3]
            inliers = [
                idx
                for idx, cand in enumerate(self.poses)
                if rotation_angle_deg(R_ref, cand[:3, :3]) <= rot_thresh_deg
                and np.linalg.norm(t_ref - cand[:3, 3]) <= trans_thresh_m
            ]
            if len(inliers) > len(best):
                best = inliers
                if len(best) == n:
                    break  # every pose agrees; no better set is possible
        return best

    def compute_robust_mean(
        self, inlier_indices: Optional[List[int]] = None
    ) -> Tuple[np.ndarray, Dict[str, Any]]:
        """Average the selected poses.

        Translation is averaged component-wise with the median; rotation uses
        scipy's chordal mean when scipy is importable, otherwise the
        quaternion eigen-mean fallback (largest eigenvector of sum(q q^T),
        which is invariant to the q / -q sign ambiguity).

        Args:
            inlier_indices: indices to average; all poses when None.

        Returns:
            (T_mean, stats) where stats carries n_total, n_inliers and
            median_reproj_error. Identity and zeroed stats when the
            selection is empty.
        """
        if inlier_indices is None:
            inlier_indices = list(range(len(self.poses)))

        n_total = len(self.poses)
        if not inlier_indices:
            return np.eye(4), {
                "n_total": n_total,
                "n_inliers": 0,
                "median_reproj_error": 0.0,
            }

        selected = [self.poses[i] for i in inlier_indices]
        errors = [self.reproj_errors[i] for i in inlier_indices]

        t_mean = np.median(np.array([T[:3, 3] for T in selected]), axis=0)

        rotations = [T[:3, :3] for T in selected]
        # scipy is an optional runtime dependency: prefer its rotation mean,
        # fall back to the quaternion eigen-mean when it is unavailable.
        try:
            from scipy.spatial.transform import Rotation as _Rot
        except ImportError:
            _Rot = None

        if _Rot is not None:
            R_mean = _Rot.from_matrix(rotations).mean().as_matrix()
        else:
            quats = np.array([matrix_to_quaternion(rot) for rot in rotations])
            M = quats.T @ quats  # equals sum of outer products q q^T
            _, eigenvectors = np.linalg.eigh(M)
            # eigh sorts ascending, so the last column is the dominant one.
            R_mean = quaternion_to_matrix(eigenvectors[:, -1])

        T_mean = np.eye(4)
        T_mean[:3, :3] = R_mean
        T_mean[:3, 3] = t_mean

        stats = {
            "n_total": n_total,
            "n_inliers": len(inlier_indices),
            # selection is non-empty here, so the median is well defined
            "median_reproj_error": float(np.median(errors)),
        }
        return T_mean, stats
def rvec_tvec_to_matrix(rvec: np.ndarray, tvec: np.ndarray) -> np.ndarray:
    """Build a 4x4 homogeneous transform from Rodrigues rvec and tvec.

    With solvePnP outputs this is the object-to-camera transform
    (T_camera_object).

    Args:
        rvec: rotation vector of shape (3,), (3, 1), or (1, 3).
        tvec: translation vector of shape (3,), (3, 1), or (1, 3).

    Returns:
        np.ndarray: 4x4 float64 homogeneous transformation matrix.

    Raises:
        ValueError: if either vector does not have exactly 3 elements.
    """
    rvec = np.asarray(rvec, dtype=np.float64).flatten()
    tvec = np.asarray(tvec, dtype=np.float64).flatten()
    if rvec.shape != (3,) or tvec.shape != (3,):
        raise ValueError(
            f"rvec and tvec must have 3 elements. Got rvec {rvec.shape} and tvec {tvec.shape}"
        )

    rotation, _ = cv2.Rodrigues(rvec)
    transform = np.eye(4, dtype=np.float64)
    transform[:3, :3] = rotation
    transform[:3, 3] = tvec
    return transform


def matrix_to_rvec_tvec(T: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Split a 4x4 homogeneous transform into Rodrigues rvec and tvec.

    Args:
        T: 4x4 homogeneous transformation matrix.

    Returns:
        Tuple[np.ndarray, np.ndarray]: (rvec, tvec), each flat of shape (3,).

    Raises:
        ValueError: if T is not a 4x4 matrix.
    """
    T = np.asarray(T, dtype=np.float64)
    if T.shape != (4, 4):
        raise ValueError(f"Transformation matrix must be 4x4. Got {T.shape}")

    rvec, _ = cv2.Rodrigues(T[:3, :3])
    return rvec.flatten(), T[:3, 3].flatten()
def invert_transform(T: np.ndarray) -> np.ndarray:
    """Invert a 4x4 rigid transform analytically.

    For T = [R | t; 0 | 1] the inverse is [R^T | -R^T t; 0 | 1], which is
    cheaper and numerically better behaved than a general matrix inverse.

    Args:
        T: 4x4 homogeneous transformation matrix.

    Returns:
        np.ndarray: the inverted 4x4 transformation matrix.

    Raises:
        ValueError: if T is not a 4x4 matrix.
    """
    T = np.asarray(T, dtype=np.float64)
    if T.shape != (4, 4):
        raise ValueError(f"Transformation matrix must be 4x4. Got {T.shape}")

    rotation_t = T[:3, :3].T
    inverse = np.eye(4, dtype=np.float64)
    inverse[:3, :3] = rotation_t
    inverse[:3, 3] = -rotation_t @ T[:3, 3]
    return inverse


def compose_transforms(T1: np.ndarray, T2: np.ndarray) -> np.ndarray:
    """Compose two 4x4 homogeneous transforms.

    Args:
        T1: first 4x4 transformation matrix.
        T2: second 4x4 transformation matrix.

    Returns:
        np.ndarray: the product T1 @ T2.
    """
    left = np.asarray(T1, dtype=np.float64)
    right = np.asarray(T2, dtype=np.float64)
    return left @ right


def compute_reprojection_error(
    obj_pts: np.ndarray,
    img_pts: np.ndarray,
    rvec: np.ndarray,
    tvec: np.ndarray,
    K: np.ndarray,
    dist: Optional[np.ndarray] = None,
) -> float:
    """Mean Euclidean pixel distance between observed and reprojected points.

    Args:
        obj_pts: (N, 3) object points in 3D.
        img_pts: (N, 2) observed image points.
        rvec: rotation vector.
        tvec: translation vector.
        K: 3x3 camera intrinsic matrix.
        dist: optional lens distortion coefficients.

    Returns:
        float: mean reprojection error in pixels (0.0 for empty input).

    Raises:
        ValueError: if the point counts do not match.
    """
    obj_pts = np.asarray(obj_pts, dtype=np.float64)
    img_pts = np.asarray(img_pts, dtype=np.float64)

    if len(obj_pts) != len(img_pts):
        raise ValueError(
            f"Number of object points ({len(obj_pts)}) and image points ({len(img_pts)}) must match."
        )
    if len(obj_pts) == 0:
        return 0.0

    projected, _ = cv2.projectPoints(obj_pts, rvec, tvec, K, dist)
    residuals = np.linalg.norm(
        projected.reshape(-1, 2) - img_pts.reshape(-1, 2), axis=1
    )
    return float(residuals.mean())
def draw_detected_markers(
    image: np.ndarray, corners: list[np.ndarray] | np.ndarray, ids: np.ndarray | None
) -> np.ndarray:
    """Overlay ArUco marker outlines (green) and their IDs (red) on an image.

    Args:
        image: BGR or single-channel grayscale image; grayscale input is
            converted to BGR so the colored overlays are visible.
        corners: per-marker corner arrays, each reshapeable to (4, 2)
            (e.g. the (1, 4, 2) arrays returned by the ArUco detector).
        ids: marker IDs of shape (N,) or (N, 1), or None.

    Returns:
        The annotated BGR image.
    """
    if image.ndim == 2:
        image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)

    if corners is None or len(corners) == 0:
        return image

    # polylines expects int32 point arrays shaped (-1, 1, 2).
    outlines = [quad.astype(np.int32).reshape((-1, 1, 2)) for quad in corners]
    cv2.polylines(image, outlines, True, (0, 255, 0), 2)

    if ids is not None:
        # ids may arrive as (N,) or (N, 1); flatten to pair with corners.
        for marker_id, quad in zip(ids.flatten(), corners):
            # Label each marker at its first corner.
            anchor = quad.reshape((-1, 2))[0].astype(int)
            cv2.putText(
                image,
                str(marker_id),
                tuple(anchor),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                (0, 0, 255),
                2,
            )
    return image
+ + Returns: + Image with axes drawn. + """ + if image.ndim == 2: + image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR) + + cv2.drawFrameAxes(image, K, dist, rvec, tvec, length) + return image + + +def show_preview( + frames: dict[int, np.ndarray], *, wait_ms: int = 1, window_prefix: str = "View " +) -> int: + """ + Shows multiple camera frames in separate windows. + + Args: + frames: Dictionary mapping camera serials to images. + wait_ms: Time to wait for a key press in milliseconds. + window_prefix: Prefix for the window names. + + Returns: + The key code pressed, or -1 if no key was pressed. + """ + for serial, frame in frames.items(): + cv2.imshow(f"{window_prefix}{serial}", frame) + return cv2.waitKey(wait_ms) diff --git a/py_workspace/aruco/svo_sync.py b/py_workspace/aruco/svo_sync.py new file mode 100644 index 0000000..78864a3 --- /dev/null +++ b/py_workspace/aruco/svo_sync.py @@ -0,0 +1,163 @@ +import pyzed.sl as sl +import numpy as np +from dataclasses import dataclass +from typing import List, Optional, Dict +import os + + +@dataclass +class FrameData: + """Data structure for a single frame from an SVO.""" + + image: np.ndarray + timestamp_ns: int + frame_index: int + serial_number: int + + +class SVOReader: + """Handles synchronized playback of multiple SVO files.""" + + def __init__(self, svo_paths: List[str]): + self.svo_paths = svo_paths + self.cameras: List[sl.Camera] = [] + self.camera_info: List[Dict] = [] + self.runtime_params = sl.RuntimeParameters() + + for path in svo_paths: + if not os.path.exists(path): + print(f"Warning: SVO file not found: {path}") + continue + + init_params = sl.InitParameters() + init_params.set_from_svo_file(path) + init_params.svo_real_time_mode = False + init_params.depth_mode = sl.DEPTH_MODE.NONE + + cam = sl.Camera() + status = cam.open(init_params) + if status != sl.ERROR_CODE.SUCCESS: + print(f"Error: Could not open {path}: {status}") + continue + + info = cam.get_camera_information() + self.cameras.append(cam) + 
class SVOReader:
    """Handles synchronized playback of multiple SVO files."""

    def __init__(self, svo_paths: List[str]):
        """Open every readable SVO in svo_paths.

        Missing or unopenable files are skipped with a console warning so
        that self.cameras and self.camera_info always stay index-aligned.

        Args:
            svo_paths: paths to the SVO recordings to open.
        """
        self.svo_paths = svo_paths
        self.cameras: List[sl.Camera] = []
        self.camera_info: List[Dict] = []
        self.runtime_params = sl.RuntimeParameters()

        for path in svo_paths:
            if not os.path.exists(path):
                print(f"Warning: SVO file not found: {path}")
                continue

            init_params = sl.InitParameters()
            init_params.set_from_svo_file(path)
            init_params.svo_real_time_mode = False
            # Depth is not needed; disabling it speeds up playback.
            init_params.depth_mode = sl.DEPTH_MODE.NONE

            cam = sl.Camera()
            status = cam.open(init_params)
            if status != sl.ERROR_CODE.SUCCESS:
                print(f"Error: Could not open {path}: {status}")
                continue

            info = cam.get_camera_information()
            self.cameras.append(cam)
            self.camera_info.append(
                {
                    "serial": info.serial_number,
                    "fps": info.camera_configuration.fps,
                    "total_frames": cam.get_svo_number_of_frames(),
                }
            )

    def sync_to_latest_start(self):
        """Align all SVOs to the timestamp of the latest-starting recording.

        Grabs one frame per camera to read its start timestamp, then skips
        each earlier-starting camera forward by the equivalent frame count.
        Cameras whose initial grab fails are rewound to frame 0 and excluded
        from the alignment (fix: previously a failed grab was recorded as
        timestamp 0, producing an enormous bogus skip past the end of file).
        """
        if not self.cameras:
            return

        # None marks a camera whose start timestamp could not be read.
        start_timestamps: List[Optional[int]] = []
        for cam in self.cameras:
            if cam.grab(self.runtime_params) == sl.ERROR_CODE.SUCCESS:
                start_timestamps.append(
                    cam.get_timestamp(sl.TIME_REFERENCE.IMAGE).get_nanoseconds()
                )
            else:
                start_timestamps.append(None)

        valid = [ts for ts in start_timestamps if ts is not None]
        if not valid:
            return

        max_start_ts = max(valid)

        for i, cam in enumerate(self.cameras):
            ts = start_timestamps[i]
            if ts is not None and ts < max_start_ts:
                # Convert the time offset into a frame offset via this
                # camera's FPS and seek to it.
                diff_ns = max_start_ts - ts
                fps = self.camera_info[i]["fps"]
                frames_to_skip = int((diff_ns / 1_000_000_000) * fps)
                cam.set_svo_position(frames_to_skip)
            else:
                # Latest-starting camera (or failed grab): replay from 0.
                cam.set_svo_position(0)

    def _retrieve_frame(self, i: int) -> FrameData:
        """Fetch the left image of camera i as FrameData (after a successful grab)."""
        cam = self.cameras[i]
        mat = sl.Mat()
        cam.retrieve_image(mat, sl.VIEW.LEFT)
        return FrameData(
            image=mat.get_data().copy(),
            timestamp_ns=cam.get_timestamp(
                sl.TIME_REFERENCE.IMAGE
            ).get_nanoseconds(),
            frame_index=cam.get_svo_position(),
            serial_number=self.camera_info[i]["serial"],
        )

    def grab_all(self) -> List[Optional[FrameData]]:
        """Grab one frame per camera without cross-camera synchronization.

        Returns:
            A list aligned with self.cameras; None where the grab failed.
            A camera that reaches end-of-file is looped back to frame 0
            (and yields None for this call).
        """
        frames: List[Optional[FrameData]] = []
        for i, cam in enumerate(self.cameras):
            err = cam.grab(self.runtime_params)
            if err == sl.ERROR_CODE.SUCCESS:
                frames.append(self._retrieve_frame(i))
            else:
                if err == sl.ERROR_CODE.END_OF_SVOFILE_REACHED:
                    cam.set_svo_position(0)  # loop the recording
                frames.append(None)
        return frames

    def grab_synced(self, tolerance_ms: int = 33) -> List[Optional[FrameData]]:
        """Grab frames and fast-forward cameras that lag behind.

        After a plain grab_all, any frame older than the newest one by more
        than tolerance_ms is re-grabbed after seeking ahead by the
        equivalent number of frames.

        Args:
            tolerance_ms: allowed timestamp spread between cameras, in ms.

        Returns:
            List aligned with self.cameras; None entries where a camera
            could not produce a frame.
        """
        frames = self.grab_all()

        valid_timestamps = [f.timestamp_ns for f in frames if f is not None]
        if not valid_timestamps:
            return frames

        max_ts = max(valid_timestamps)
        tolerance_ns = tolerance_ms * 1_000_000

        for i, frame in enumerate(frames):
            if frame is None:
                continue

            lag_ns = max_ts - frame.timestamp_ns
            if lag_ns <= tolerance_ns:
                continue

            cam = self.cameras[i]
            fps = self.camera_info[i]["fps"]
            frames_to_skip = int((lag_ns / 1_000_000_000) * fps)
            if frames_to_skip > 0:
                cam.set_svo_position(cam.get_svo_position() + frames_to_skip)

            err = cam.grab(self.runtime_params)
            if err == sl.ERROR_CODE.SUCCESS:
                frames[i] = self._retrieve_frame(i)
            else:
                if err == sl.ERROR_CODE.END_OF_SVOFILE_REACHED:
                    cam.set_svo_position(0)
                frames[i] = None

        return frames

    def close(self):
        """Closes all cameras."""
        for cam in self.cameras:
            cam.close()
        self.cameras = []