Add ArUco detection and pose estimation modules
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
@@ -0,0 +1,95 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
# Order of detection result
|
||||
# 0, 1, 2, 3
|
||||
# TL, TR, BR, BL
|
||||
# RED, GREEN, BLUE, YELLOW
|
||||
|
||||
|
||||
@dataclass
class DiamondBoardParameter:
    """
    Physical dimensions (in meters) of a 3x3 "diamond" ChArUco-like board.

    NOTE(review): `marker_leghth` is a typo of `marker_length`; it is kept
    as-is because other code in this file reads the attribute by this name.
    """

    marker_leghth: float
    """
    the ArUco marker side length in meter
    """
    chess_length: float
    """
    the side length of one chess square in meter
    """
    border_length: float = 0.01
    """
    outer border width in m, default is 1cm
    """

    @property
    def marker_border_length(self):
        # The marker is centered inside its chess square, so the gap on each
        # side is half of the size difference.
        assert self.chess_length > self.marker_leghth
        return (self.chess_length - self.marker_leghth) / 2

    @property
    def total_side_length(self):
        # NOTE(review): the worked example elsewhere in this file
        # ("9mm + 127mm*3 + 10mm") suggests the outer margin should be
        # `border_length`, but this formula adds `marker_border_length`
        # twice instead — confirm which margin is intended.
        assert self.chess_length > self.marker_leghth
        return self.marker_border_length * 2 + self.chess_length * 3
|
||||
|
||||
|
||||
# Example dimensions: 9mm + 127mm + 127mm (97mm marker) + 127mm + 10mm
# i.e. marker border = 127mm - 97mm = 30mm (15mm each side)

# A 2D point as (x, y), in the same unit as the board parameters (meters).
Point2D = tuple[float, float]
# Four corners of a square, ordered TL, TR, BR, BL.
Quad2D = tuple[Point2D, Point2D, Point2D, Point2D]


@dataclass
class ArUcoMarker:
    """An ArUco marker: its dictionary id plus its four 2D corners."""

    id: int  # marker id within the ArUco dictionary
    corners: Quad2D  # corner coordinates, ordered TL, TR, BR, BL
|
||||
|
||||
|
||||
# let's let TL be the origin
|
||||
def generate_diamond_corners(
    ids: tuple[int, int, int, int], params: DiamondBoardParameter
):
    """
    Compute the 2D corners of the four markers of a diamond chess board
    (a ChArUco-like layout), using the board's top-left as the origin.

        C | 0 | C
        ---------
        1 | C | 2
        ---------
        C | 3 | C

    C are plain chess squares; cells 0..3 hold the markers, matching the
    order of `ids`.

    Args:
        ids: the four marker ids, in the layout order shown above
        params: the physical board dimensions

    Returns:
        Four ArUcoMarker instances, each with corners ordered TL, TR, BR, BL.
    """

    def square_at(x: float, y: float, side: float) -> Quad2D:
        # Axis-aligned square corners, ordered TL, TR, BR, BL.
        return ((x, y), (x + side, y), (x + side, y + side), (x, y + side))

    # (column, row) grid cell occupied by each marker in the 3x3 layout.
    cells = ((1, 0), (0, 1), (2, 1), (1, 2))

    markers = []
    for marker_id, (col, row) in zip(ids, cells):
        # A marker's top-left sits past the outer border, `col`/`row` whole
        # chess squares, and the in-square marker margin.
        tl_x = params.border_length + col * params.chess_length + params.marker_border_length
        tl_y = params.border_length + row * params.chess_length + params.marker_border_length
        markers.append(ArUcoMarker(marker_id, square_at(tl_x, tl_y, params.marker_leghth)))
    return tuple(markers)
|
||||
@@ -0,0 +1,144 @@
|
||||
import cv2
|
||||
import numpy as np
|
||||
import pyzed.sl as sl
|
||||
from typing import Optional, Tuple, Dict
|
||||
from .pose_math import compute_reprojection_error
|
||||
|
||||
|
||||
def create_detector(
    dictionary_id: int = cv2.aruco.DICT_4X4_50,
    params: Optional[cv2.aruco.DetectorParameters] = None,
) -> cv2.aruco.ArucoDetector:
    """
    Build an ArUco detector for the requested dictionary.

    Args:
        dictionary_id: Predefined ArUco dictionary identifier.
        params: Detector parameters; library defaults are used when None.

    Returns:
        cv2.aruco.ArucoDetector: Ready-to-use detector instance.
    """
    effective_params = params if params is not None else cv2.aruco.DetectorParameters()
    return cv2.aruco.ArucoDetector(
        cv2.aruco.getPredefinedDictionary(dictionary_id), effective_params
    )
|
||||
|
||||
|
||||
def detect_markers(
    image: np.ndarray, detector: cv2.aruco.ArucoDetector
) -> Tuple[np.ndarray, Optional[np.ndarray]]:
    """
    Run ArUco marker detection on one image.

    Args:
        image: Input image, BGR (3 channels) or already grayscale.
        detector: Configured ArUco detector.

    Returns:
        Tuple of:
            - corners: float32 array of shape (N, 4, 2); empty when none found.
            - ids: int32 array of shape (N,), or None when nothing detected.

    Raises:
        ValueError: If `image` is None.
    """
    if image is None:
        raise ValueError("Input image is None")

    # Detection runs on a single-channel image.
    gray = image if len(image.shape) != 3 else cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    corners, ids, _ = detector.detectMarkers(gray)

    if ids is None:
        return np.empty((0, 4, 2), dtype=np.float32), None

    # detectMarkers yields a list of (1, 4, 2) arrays; flatten to (N, 4, 2).
    return (
        np.array(corners, dtype=np.float32).reshape(-1, 4, 2),
        ids.flatten().astype(np.int32),
    )
|
||||
|
||||
|
||||
def build_camera_matrix_from_zed(
    cam: sl.Camera, resizer: Optional[sl.Resolution] = None
) -> np.ndarray:
    """
    Assemble the 3x3 intrinsic matrix K of a ZED camera's left sensor.

    Args:
        cam: The ZED camera object.
        resizer: Optional resolution for which calibration is queried.

    Returns:
        np.ndarray: float64 matrix [[fx, 0, cx], [0, fy, cy], [0, 0, 1]].
    """
    left = cam.get_camera_information(
        resizer
    ).camera_configuration.calibration_parameters.left_cam
    return np.array(
        [
            [left.fx, 0, left.cx],
            [0, left.fy, left.cy],
            [0, 0, 1],
        ],
        dtype=np.float64,
    )
|
||||
|
||||
|
||||
def estimate_pose_from_detections(
    corners: np.ndarray,
    ids: np.ndarray,
    marker_geometry: Dict[int, np.ndarray],
    K: np.ndarray,
    dist: Optional[np.ndarray] = None,
    min_markers: int = 4,
    pnp_flag: int = cv2.SOLVEPNP_SQPNP,
) -> Optional[Tuple[np.ndarray, np.ndarray, float, int]]:
    """
    Solve the camera pose from detected markers with known 3D geometry.

    Args:
        corners: Detected 2D corners, shape (N, 4, 2).
        ids: Detected marker IDs, shape (N,).
        marker_geometry: Maps marker ID to its 3D corners, shape (4, 3).
        K: 3x3 camera intrinsic matrix.
        dist: Optional lens distortion coefficients.
        min_markers: Minimum number of known markers required.
        pnp_flag: Flag forwarded to cv2.solvePnP.

    Returns:
        (rvec, tvec, reproj_err, n_markers), or None when there are too few
        known markers or solvePnP fails.
    """
    if ids is None or len(ids) == 0:
        return None

    # Pair each detection with its 3D geometry, skipping unknown ids.
    known = [
        (marker_geometry[marker_id], corners[i])
        for i, marker_id in enumerate(ids)
        if marker_id in marker_geometry
    ]
    n_markers = len(known)
    if n_markers < min_markers:
        return None

    # Flatten per-marker corner sets into (M*4, 3) / (M*4, 2) point arrays.
    obj_pts_array = np.vstack([obj for obj, _ in known]).astype(np.float32)
    img_pts_array = np.vstack([img for _, img in known]).astype(np.float32)

    success, rvec, tvec = cv2.solvePnP(
        obj_pts_array, img_pts_array, K, dist, flags=pnp_flag
    )
    if not success:
        return None

    reproj_err = compute_reprojection_error(
        obj_pts_array, img_pts_array, rvec, tvec, K, dist
    )
    return rvec, tvec, reproj_err, n_markers
|
||||
@@ -0,0 +1,173 @@
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Final, Optional, TypedDict, cast
|
||||
|
||||
import awkward as ak
|
||||
import cv2
|
||||
import numpy as np
|
||||
from cv2 import aruco
|
||||
from cv2.typing import MatLike
|
||||
from jaxtyping import Int, Num
|
||||
from loguru import logger
|
||||
|
||||
# Shorthand alias for numpy arrays used throughout this script.
NDArray = np.ndarray
# Camera intrinsics previously saved by the calibration step.
CALIBRATION_PARQUET = Path("output") / "usbcam_cal.parquet"
# OBJECT_POINTS_PARQUET = Path("output") / "object_points.parquet"
# Known 3D marker geometry (ids + corner coordinates).
OBJECT_POINTS_PARQUET = Path("output") / "standard_box_markers.parquet"
DICTIONARY: Final[int] = aruco.DICT_4X4_50
# 400mm — used below as the drawn axis length for drawFrameAxes.
MARKER_LENGTH: Final[float] = 0.4
|
||||
|
||||
|
||||
class MarkerFace(TypedDict):
    """
    One face of a marker-covered object: N markers with their 3D corners.

    for diamond ArUco markers, N is 4
    """

    name: str
    """
    a human-readable label for the face
    """
    ids: Int[NDArray, "N"]
    """
    ArUco marker ids, one per marker on the face
    """
    corners: Num[NDArray, "N 4 3"]
    """
    3D corner coordinates of each marker's rectangle,
    relative to the world origin
    """
|
||||
|
||||
|
||||
def gen():
    """
    Yield BGR frames from the default camera until a grab fails.

    The capture device is always released, even when the consumer stops
    iterating early (generator close / break in the caller).
    """
    API = cv2.CAP_AVFOUNDATION  # macOS AVFoundation backend
    cap = cv2.VideoCapture(0, API)
    try:
        # Bail out early instead of spinning on a device that never opened.
        if not cap.isOpened():
            logger.warning("Failed to open camera")
            return
        while True:
            ret, frame = cap.read()
            if not ret:
                logger.warning("Failed to grab frame")
                break
            yield frame
    finally:
        # Fix: the original leaked the capture handle (no release()).
        cap.release()
|
||||
|
||||
|
||||
def main():
    """
    Live ArUco pose-estimation demo.

    Detects markers from the default camera, annotates them, solves the
    camera pose against known 3D marker geometry with solvePnP, and overlays
    the pose axes.  Keys: q = quit, s = save a snapshot, r = toggle video
    recording.
    """
    aruco_dict = aruco.getPredefinedDictionary(DICTIONARY)
    # First (and only) record of the calibration parquet.
    cal = ak.from_parquet(CALIBRATION_PARQUET)[0]
    camera_matrix = cast(MatLike, ak.to_numpy(cal["camera_matrix"]))
    distortion_coefficients = cast(MatLike, ak.to_numpy(cal["distortion_coefficients"]))
    ops = ak.from_parquet(OBJECT_POINTS_PARQUET)
    detector = aruco.ArucoDetector(
        dictionary=aruco_dict, detectorParams=aruco.DetectorParameters()
    )

    # Known 3D geometry: marker id -> (4, 3) corner coordinates.
    total_ids = cast(NDArray, ak.to_numpy(ops["ids"])).flatten()
    total_corners = cast(NDArray, ak.to_numpy(ops["corners"])).reshape(-1, 4, 3)
    ops_map: dict[int, NDArray] = dict(zip(total_ids, total_corners))
    logger.info("ops_map={}", ops_map)
    writer: Optional[cv2.VideoWriter] = None  # non-None while recording

    for frame in gen():
        grey = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # pylint: disable-next=unpacking-non-sequence
        markers, ids, rejected = detector.detectMarkers(grey)
        # `markers` is [N, 1, 4, 2]
        # `ids` is [N, 1]
        if ids is not None:
            markers = np.reshape(markers, (-1, 4, 2))
            ids = np.reshape(ids, (-1, 1))
            # logger.info("markers={}, ids={}", np.array(markers).shape, np.array(ids).shape)
            # Detected 2D image points: marker id -> (4, 2) corners.
            ips_map: dict[int, NDArray] = {}
            for cs, id in zip(markers, ids):
                id = int(id)
                cs = cast(NDArray, cs)
                ips_map[id] = cs
                # Annotate the marker centre with its id.
                center = np.mean(cs, axis=0).astype(int)
                GREY = (128, 128, 128)
                # logger.info("id={}, center={}", id, center)
                cv2.circle(frame, tuple(center), 5, GREY, -1)
                cv2.putText(
                    frame,
                    str(id),
                    tuple(center),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    1,
                    GREY,
                    2,
                )
                # BGR
                RED = (0, 0, 255)
                GREEN = (0, 255, 0)
                BLUE = (255, 0, 0)
                YELLOW = (0, 255, 255)
                # One colour per corner so the corner ordering is visible.
                color_map = [RED, GREEN, BLUE, YELLOW]
                for color, corners in zip(color_map, cs):
                    corners = corners.astype(int)
                    frame = cv2.circle(frame, corners, 5, color, -1)
            # https://docs.opencv.org/4.x/d9/d0c/group__calib3d.html#ga50620f0e26e02caa2e9adc07b5fbf24e
            # Pair detected 2D points with their known 3D points for solvePnP.
            ops: NDArray = np.empty((0, 3), dtype=np.float32)
            ips: NDArray = np.empty((0, 2), dtype=np.float32)
            for id, ip in ips_map.items():
                try:
                    op = ops_map[id]
                    assert ip.shape == (4, 2), f"corners.shape={ip.shape}"
                    assert op.shape == (4, 3), f"op.shape={op.shape}"
                    ops = np.concatenate((ops, op), axis=0)
                    ips = np.concatenate((ips, ip), axis=0)
                except KeyError:
                    # Detected a marker whose geometry is unknown; skip it.
                    logger.warning("No object points for id={}", id)
                    continue
            assert len(ops) == len(ips), f"len(ops)={len(ops)} != len(ips)={len(ips)}"
            if len(ops) > 0:
                # https://docs.opencv.org/4.x/d5/d1f/calib3d_solvePnP.html
                # https://docs.opencv.org/4.x/d5/d1f/calib3d_solvePnP.html#calib3d_solvePnP_flags
                ret, rvec, tvec = cv2.solvePnP(
                    objectPoints=ops,
                    imagePoints=ips,
                    cameraMatrix=camera_matrix,
                    distCoeffs=distortion_coefficients,
                    flags=cv2.SOLVEPNP_SQPNP,
                )
                # ret, rvec, tvec, inliners = cv2.solvePnPRansac(
                #     objectPoints=ops,
                #     imagePoints=ips,
                #     cameraMatrix=camera_matrix,
                #     distCoeffs=distortion_coefficients,
                #     flags=cv2.SOLVEPNP_SQPNP,
                # )
                if ret:
                    cv2.drawFrameAxes(
                        frame,
                        camera_matrix,
                        distortion_coefficients,
                        rvec,
                        tvec,
                        MARKER_LENGTH,
                    )
                else:
                    # NOTE(review): message mentions solvePnPRansac but
                    # solvePnP is what is called above.
                    logger.warning("Failed to solvePnPRansac")
        cv2.imshow("frame", frame)
        if writer is not None:
            writer.write(frame)
        if (k := cv2.waitKey(1)) == ord("q"):
            logger.info("Exiting")
            break
        elif k == ord("s"):
            # Save one snapshot of the annotated frame.
            now = datetime.now().strftime("%Y%m%d%H%M%S")
            file_name = f"aruco_{now}.png"
            logger.info("Saving to {}", file_name)
            cv2.imwrite(file_name, frame)
        elif k == ord("r"):
            # Toggle recording on/off.
            if writer is not None:
                writer.release()
                writer = None
                logger.info("Recording stopped")
            else:
                now = datetime.now().strftime("%Y%m%d%H%M%S")
                file_name = f"aruco_{now}.mp4"
                logger.info("Recording to {}", file_name)
                fourcc = cv2.VideoWriter.fourcc(*"mp4v")
                # VideoWriter wants (width, height); frame.shape is (h, w, c).
                writer = cv2.VideoWriter(file_name, fourcc, 20.0, frame.shape[:2][::-1])
|
||||
|
||||
|
||||
# Run the live demo only when executed as a script.
if __name__ == "__main__":
    main()
|
||||
@@ -0,0 +1,85 @@
|
||||
import numpy as np
|
||||
import awkward as ak
|
||||
from pathlib import Path
|
||||
from typing import Union, cast
|
||||
|
||||
|
||||
def load_marker_geometry(parquet_path: Union[str, Path]) -> dict[int, np.ndarray]:
    """
    Load ArUco marker 3D geometry from a parquet file.

    The file must provide 'ids' and 'corners' columns, as in
    standard_box_markers.parquet.

    Args:
        parquet_path: Location of the parquet file.

    Returns:
        Mapping of marker ID to its (4, 3) float32 corner coordinates.

    Raises:
        FileNotFoundError: If the file does not exist.
    """
    path = Path(parquet_path)
    if not path.exists():
        raise FileNotFoundError(f"Parquet file not found: {path}")

    table = ak.from_parquet(path)

    # Flatten the columns into aligned id / corner-set arrays.
    marker_ids = cast(np.ndarray, ak.to_numpy(table["ids"])).flatten()
    corner_sets = cast(np.ndarray, ak.to_numpy(table["corners"])).reshape(-1, 4, 3)

    return {
        int(marker_id): corner_set.astype(np.float32)
        for marker_id, corner_set in zip(marker_ids, corner_sets)
    }
|
||||
|
||||
|
||||
def validate_marker_geometry(geometry: dict[int, np.ndarray]) -> None:
    """
    Sanity-check a marker-geometry mapping.

    Ensures the mapping is non-empty and every entry is a (4, 3) array of
    finite coordinates no farther than 100 m from the origin.

    Args:
        geometry: Mapping of marker ID to (4, 3) corner coordinates.

    Raises:
        ValueError: On the first entry failing any check.
    """
    if not geometry:
        raise ValueError("Marker geometry is empty.")

    for marker_id, corners in geometry.items():
        _check_marker_corners(marker_id, corners)


def _check_marker_corners(marker_id: int, corners: np.ndarray) -> None:
    """Validate a single marker's corner array; raise ValueError on failure."""
    if corners.shape != (4, 3):
        raise ValueError(
            f"Invalid shape for marker {marker_id}: "
            f"expected (4, 3), got {corners.shape}"
        )
    if not np.isfinite(corners).all():
        raise ValueError(
            f"Marker {marker_id} contains non-finite values (NaN or Inf)."
        )
    # Coordinates beyond 100 m almost certainly indicate a unit mix-up.
    if np.any(np.abs(corners) > 100.0):
        raise ValueError(
            f"Marker {marker_id} has coordinates exceeding reasonable range (> 100m): "
            f"{corners}"
        )
|
||||
|
||||
|
||||
def expected_ids(geometry: dict[int, np.ndarray]) -> set[int]:
    """Return every marker ID the geometry mapping covers."""
    return {marker_id for marker_id in geometry}
|
||||
Vendored
BIN
Binary file not shown.
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,242 @@
|
||||
import numpy as np
|
||||
from typing import List, Dict, Tuple, Optional, Any
|
||||
|
||||
try:
|
||||
from scipy.spatial.transform import Rotation as R
|
||||
|
||||
HAS_SCIPY = True
|
||||
except ImportError:
|
||||
HAS_SCIPY = False
|
||||
|
||||
|
||||
def rotation_angle_deg(Ra: np.ndarray, Rb: np.ndarray) -> float:
    """
    Angle of the relative rotation between two rotation matrices, in degrees.

    Args:
        Ra: 3x3 rotation matrix.
        Rb: 3x3 rotation matrix.

    Returns:
        The geodesic angle in degrees.
    """
    relative = Ra.T @ Rb
    # trace(R) = 1 + 2*cos(theta); clamp before arccos for numerical safety.
    cos_theta = np.clip((np.trace(relative) - 1.0) / 2.0, -1.0, 1.0)
    return np.degrees(np.arccos(cos_theta))
|
||||
|
||||
|
||||
def matrix_to_quaternion(R_mat: np.ndarray) -> np.ndarray:
    """
    Convert a 3x3 rotation matrix to a quaternion (w, x, y, z).
    Uses the method from "Unit Quaternions and Rotation Matrices" by Mike Day:
    branch on the largest of the trace / diagonal entries for stability.
    """
    m = R_mat
    trace = np.trace(m)
    if trace > 0:
        scale = np.sqrt(trace + 1.0) * 2
        w = 0.25 * scale
        x = (m[2, 1] - m[1, 2]) / scale
        y = (m[0, 2] - m[2, 0]) / scale
        z = (m[1, 0] - m[0, 1]) / scale
    elif (m[0, 0] > m[1, 1]) and (m[0, 0] > m[2, 2]):
        scale = np.sqrt(1.0 + m[0, 0] - m[1, 1] - m[2, 2]) * 2
        w = (m[2, 1] - m[1, 2]) / scale
        x = 0.25 * scale
        y = (m[0, 1] + m[1, 0]) / scale
        z = (m[0, 2] + m[2, 0]) / scale
    elif m[1, 1] > m[2, 2]:
        scale = np.sqrt(1.0 + m[1, 1] - m[0, 0] - m[2, 2]) * 2
        w = (m[0, 2] - m[2, 0]) / scale
        x = (m[0, 1] + m[1, 0]) / scale
        y = 0.25 * scale
        z = (m[1, 2] + m[2, 1]) / scale
    else:
        scale = np.sqrt(1.0 + m[2, 2] - m[0, 0] - m[1, 1]) * 2
        w = (m[1, 0] - m[0, 1]) / scale
        x = (m[0, 2] + m[2, 0]) / scale
        y = (m[1, 2] + m[2, 1]) / scale
        z = 0.25 * scale
    return np.array([w, x, y, z])
|
||||
|
||||
|
||||
def quaternion_to_matrix(q: np.ndarray) -> np.ndarray:
    """
    Convert a quaternion (w, x, y, z) to a 3x3 rotation matrix.
    """
    w, x, y, z = q
    # Precompute the pairwise products used by the standard expansion.
    xx, yy, zz = x * x, y * y, z * z
    xy, xz, yz = x * y, x * z, y * z
    wx, wy, wz = w * x, w * y, w * z
    return np.array(
        [
            [1 - 2 * yy - 2 * zz, 2 * xy - 2 * wz, 2 * xz + 2 * wy],
            [2 * xy + 2 * wz, 1 - 2 * xx - 2 * zz, 2 * yz - 2 * wx],
            [2 * xz - 2 * wy, 2 * yz + 2 * wx, 1 - 2 * xx - 2 * yy],
        ]
    )
|
||||
|
||||
|
||||
class PoseAccumulator:
    """
    Accumulates 4x4 poses and provides robust averaging methods.

    Poses are stored with their reprojection error and source frame id so
    they can be filtered (by error threshold or RANSAC consensus) before
    averaging.
    """

    def __init__(self):
        # Parallel lists: poses[i] came from frame_ids[i] with reproj_errors[i].
        self.poses: List[np.ndarray] = []
        self.reproj_errors: List[float] = []
        self.frame_ids: List[int] = []

    def add_pose(self, T: np.ndarray, reproj_error: float, frame_id: int) -> None:
        """
        Add a 4x4 pose to the accumulator.

        Args:
            T: 4x4 transformation matrix.
            reproj_error: Reprojection error associated with the pose.
            frame_id: ID of the frame where the pose was detected.

        Raises:
            ValueError: If T is not a 4x4 numpy array.
        """
        if not isinstance(T, np.ndarray) or T.shape != (4, 4):
            raise ValueError("T must be a 4x4 numpy array")

        # Copy so later mutation by the caller cannot corrupt the history.
        self.poses.append(T.copy())
        self.reproj_errors.append(float(reproj_error))
        self.frame_ids.append(int(frame_id))

    def filter_by_reproj(self, max_reproj_error: float) -> List[int]:
        """
        Return indices of poses with reprojection error below threshold.
        """
        return [
            i for i, err in enumerate(self.reproj_errors) if err <= max_reproj_error
        ]

    def ransac_filter(
        self,
        rot_thresh_deg: float = 5.0,
        trans_thresh_m: float = 0.05,
        max_iters: int = 200,
        random_seed: int = 0,
    ) -> List[int]:
        """
        Perform RANSAC to find the largest consensus set of poses.

        Each iteration picks one stored pose as the model and counts the
        poses whose rotation and translation both lie within the thresholds.

        Args:
            rot_thresh_deg: Rotation threshold in degrees.
            trans_thresh_m: Translation threshold in meters.
            max_iters: Maximum number of RANSAC iterations.
            random_seed: Seed for reproducibility.

        Returns:
            Indices of the inlier poses.
        """
        n = len(self.poses)
        if n == 0:
            return []
        if n == 1:
            return [0]

        rng = np.random.default_rng(random_seed)
        best_inliers = []

        for _ in range(max_iters):
            # Pick a random reference pose as the candidate model.
            ref_idx = rng.integers(0, n)
            T_ref = self.poses[ref_idx]
            R_ref = T_ref[:3, :3]
            t_ref = T_ref[:3, 3]

            current_inliers = []
            for i in range(n):
                T_i = self.poses[i]
                R_i = T_i[:3, :3]
                t_i = T_i[:3, 3]

                # Check rotation (geodesic angle to the reference).
                d_rot = rotation_angle_deg(R_ref, R_i)
                # Check translation (Euclidean distance to the reference).
                d_trans = np.linalg.norm(t_ref - t_i)

                if d_rot <= rot_thresh_deg and d_trans <= trans_thresh_m:
                    current_inliers.append(i)

            if len(current_inliers) > len(best_inliers):
                best_inliers = current_inliers
                if len(best_inliers) == n:
                    # Every pose already agrees; no larger set exists.
                    break

        return best_inliers

    def compute_robust_mean(
        self, inlier_indices: Optional[List[int]] = None
    ) -> Tuple[np.ndarray, Dict[str, Any]]:
        """
        Compute the robust mean of the accumulated poses.

        Translation is averaged with a per-axis median; rotation uses
        scipy's rotation mean when available, otherwise the quaternion
        eigenvector method.

        Args:
            inlier_indices: Optional list of indices to use. If None, uses all poses.

        Returns:
            T_mean: 4x4 averaged transformation matrix.
            stats: Dictionary with statistics (n_total, n_inliers,
                median_reproj_error).
        """
        if inlier_indices is None:
            inlier_indices = list(range(len(self.poses)))

        n_total = len(self.poses)
        n_inliers = len(inlier_indices)

        if n_inliers == 0:
            # No usable poses: identity pose plus zeroed stats.
            return np.eye(4), {
                "n_total": n_total,
                "n_inliers": 0,
                "median_reproj_error": 0.0,
            }

        inlier_poses = [self.poses[i] for i in inlier_indices]
        inlier_errors = [self.reproj_errors[i] for i in inlier_indices]

        # Translation: median (robust against outlier translations)
        translations = np.array([T[:3, 3] for T in inlier_poses])
        t_mean = np.median(translations, axis=0)

        # Rotation: mean
        rotations = [T[:3, :3] for T in inlier_poses]
        if HAS_SCIPY:
            r_objs = R.from_matrix(rotations)
            R_mean = r_objs.mean().as_matrix()
        else:
            # Quaternion eigen mean fallback
            quats = np.array([matrix_to_quaternion(rot) for rot in rotations])
            # Form the matrix M = sum(q_i * q_i^T)
            M = np.zeros((4, 4))
            for i in range(n_inliers):
                q = quats[i]
                M += np.outer(q, q)
            # The average quaternion is the eigenvector corresponding to the largest eigenvalue
            evals, evecs = np.linalg.eigh(M)
            q_mean = evecs[:, -1]
            R_mean = quaternion_to_matrix(q_mean)

        T_mean = np.eye(4)
        T_mean[:3, :3] = R_mean
        T_mean[:3, 3] = t_mean

        stats = {
            "n_total": n_total,
            "n_inliers": n_inliers,
            "median_reproj_error": float(np.median(inlier_errors))
            if inlier_errors
            else 0.0,
        }

        return T_mean, stats
|
||||
@@ -0,0 +1,144 @@
|
||||
import cv2
|
||||
import numpy as np
|
||||
from typing import Optional, Tuple
|
||||
|
||||
|
||||
def rvec_tvec_to_matrix(rvec: np.ndarray, tvec: np.ndarray) -> np.ndarray:
    """
    Build a 4x4 homogeneous transform from Rodrigues rotation and translation
    vectors.

    For solvePnP output this is the object-to-camera transform
    (T_camera_object).

    Args:
        rvec: Rotation vector with 3 elements ((3,), (3, 1), or (1, 3)).
        tvec: Translation vector with 3 elements.

    Returns:
        np.ndarray: 4x4 float64 homogeneous transformation matrix.

    Raises:
        ValueError: If either vector does not have exactly 3 elements.
    """
    r = np.asarray(rvec, dtype=np.float64).flatten()
    t = np.asarray(tvec, dtype=np.float64).flatten()

    if r.shape != (3,) or t.shape != (3,):
        raise ValueError(
            f"rvec and tvec must have 3 elements. Got rvec {r.shape} and tvec {t.shape}"
        )

    T = np.eye(4, dtype=np.float64)
    T[:3, :3] = cv2.Rodrigues(r)[0]
    T[:3, 3] = t
    return T
|
||||
|
||||
|
||||
def matrix_to_rvec_tvec(T: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    Split a 4x4 homogeneous transform into (rvec, tvec).

    Args:
        T: 4x4 homogeneous transformation matrix.

    Returns:
        (rvec, tvec), each a flat (3,) array; rvec is a Rodrigues vector.

    Raises:
        ValueError: If T is not a 4x4 matrix.
    """
    mat = np.asarray(T, dtype=np.float64)
    if mat.shape != (4, 4):
        raise ValueError(f"Transformation matrix must be 4x4. Got {mat.shape}")

    rvec, _ = cv2.Rodrigues(mat[:3, :3])
    return rvec.flatten(), mat[:3, 3].flatten()
|
||||
|
||||
|
||||
def invert_transform(T: np.ndarray) -> np.ndarray:
    """
    Invert a 4x4 rigid transform analytically.

    Exploits that for T = [R | t; 0 | 1] the inverse is [R^T | -R^T t; 0 | 1],
    which is cheaper and more stable than a generic matrix inverse.

    Args:
        T: 4x4 homogeneous transformation matrix.

    Returns:
        np.ndarray: The inverse 4x4 transformation matrix.

    Raises:
        ValueError: If T is not a 4x4 matrix.
    """
    mat = np.asarray(T, dtype=np.float64)
    if mat.shape != (4, 4):
        raise ValueError(f"Transformation matrix must be 4x4. Got {mat.shape}")

    rot_t = mat[:3, :3].T
    inverse = np.eye(4, dtype=np.float64)
    inverse[:3, :3] = rot_t
    inverse[:3, 3] = -rot_t @ mat[:3, 3]
    return inverse
|
||||
|
||||
|
||||
def compose_transforms(T1: np.ndarray, T2: np.ndarray) -> np.ndarray:
    """
    Compose two 4x4 homogeneous transforms as the product T1 @ T2.

    Args:
        T1: First 4x4 transformation matrix (applied last).
        T2: Second 4x4 transformation matrix (applied first).

    Returns:
        np.ndarray: float64 matrix product T1 @ T2.
    """
    left = np.asarray(T1, dtype=np.float64)
    right = np.asarray(T2, dtype=np.float64)
    return left @ right
|
||||
|
||||
|
||||
def compute_reprojection_error(
    obj_pts: np.ndarray,
    img_pts: np.ndarray,
    rvec: np.ndarray,
    tvec: np.ndarray,
    K: np.ndarray,
    dist: Optional[np.ndarray] = None,
) -> float:
    """
    Mean Euclidean pixel distance between observed and reprojected points.

    Args:
        obj_pts: 3D object points, shape (N, 3).
        img_pts: Observed 2D image points, shape (N, 2).
        rvec: Rotation vector.
        tvec: Translation vector.
        K: 3x3 camera intrinsic matrix.
        dist: Optional lens distortion coefficients.

    Returns:
        float: Mean reprojection error in pixels (0.0 for empty input).

    Raises:
        ValueError: If the point counts differ.
    """
    object_points = np.asarray(obj_pts, dtype=np.float64)
    image_points = np.asarray(img_pts, dtype=np.float64)

    if len(object_points) != len(image_points):
        raise ValueError(
            f"Number of object points ({len(object_points)}) and image points ({len(image_points)}) must match."
        )
    if len(object_points) == 0:
        # Nothing to compare; treat as a perfect fit.
        return 0.0

    projected, _ = cv2.projectPoints(object_points, rvec, tvec, K, dist)
    deltas = projected.reshape(-1, 2) - image_points.reshape(-1, 2)
    return float(np.mean(np.linalg.norm(deltas, axis=1)))
|
||||
@@ -0,0 +1,93 @@
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
|
||||
def draw_detected_markers(
|
||||
image: np.ndarray, corners: list[np.ndarray] | np.ndarray, ids: np.ndarray | None
|
||||
) -> np.ndarray:
|
||||
"""
|
||||
Draws ArUco markers outlines and IDs on the image.
|
||||
|
||||
Args:
|
||||
image: Input image (BGR or grayscale).
|
||||
corners: List of marker corners or np.ndarray.
|
||||
ids: Array of marker IDs.
|
||||
|
||||
Returns:
|
||||
Image with markers drawn.
|
||||
"""
|
||||
if image.ndim == 2:
|
||||
image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
|
||||
|
||||
if corners is not None and len(corners) > 0:
|
||||
# Draw outlines
|
||||
# corners can be list of (1, 4, 2) or np.ndarray of (N, 4, 2)
|
||||
int_corners = [c.astype(np.int32).reshape((-1, 1, 2)) for c in corners]
|
||||
cv2.polylines(image, int_corners, True, (0, 255, 0), 2)
|
||||
|
||||
if ids is not None:
|
||||
# Flatten ids to handle (N,) or (N, 1)
|
||||
flat_ids = ids.flatten()
|
||||
for i, corner in enumerate(corners):
|
||||
# Put ID near the first corner
|
||||
# corner can be (1, 4, 2) or (4, 2)
|
||||
c = corner.reshape((-1, 2))
|
||||
pos = c[0].astype(int)
|
||||
cv2.putText(
|
||||
image,
|
||||
str(flat_ids[i]),
|
||||
tuple(pos),
|
||||
cv2.FONT_HERSHEY_SIMPLEX,
|
||||
0.5,
|
||||
(0, 0, 255),
|
||||
2,
|
||||
)
|
||||
return image
|
||||
|
||||
|
||||
def draw_pose_axes(
    image: np.ndarray,
    rvec: np.ndarray,
    tvec: np.ndarray,
    K: np.ndarray,
    dist: np.ndarray | None = None,
    length: float = 0.1,
) -> np.ndarray:
    """
    Overlay a three-axis pose gizmo for the given rvec/tvec.

    Args:
        image: Input image (grayscale images are promoted to BGR first).
        rvec: Rotation vector.
        tvec: Translation vector.
        K: Camera intrinsic matrix.
        dist: Optional distortion coefficients.
        length: Drawn axis length.

    Returns:
        The annotated image.
    """
    canvas = image if image.ndim != 2 else cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
    cv2.drawFrameAxes(canvas, K, dist, rvec, tvec, length)
    return canvas
|
||||
|
||||
|
||||
def show_preview(
    frames: dict[int, np.ndarray], *, wait_ms: int = 1, window_prefix: str = "View "
) -> int:
    """
    Display one window per camera frame and poll the keyboard once.

    Args:
        frames: Maps camera serial number to the image to display.
        wait_ms: Duration passed to cv2.waitKey, in milliseconds.
        window_prefix: Prepended to each serial to form the window title.

    Returns:
        The pressed key code, or -1 when no key was pressed.
    """
    for serial_number, image in frames.items():
        cv2.imshow(f"{window_prefix}{serial_number}", image)
    return cv2.waitKey(wait_ms)
|
||||
@@ -0,0 +1,163 @@
|
||||
import pyzed.sl as sl
|
||||
import numpy as np
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Optional, Dict
|
||||
import os
|
||||
|
||||
|
||||
@dataclass
class FrameData:
    """Data structure for a single frame from an SVO."""

    image: np.ndarray  # the retrieved image for this frame
    timestamp_ns: int  # capture timestamp in nanoseconds
    frame_index: int  # position of the frame within its SVO file
    serial_number: int  # serial number of the recording camera
|
||||
|
||||
class SVOReader:
    """Handles synchronized playback of multiple SVO files.

    Each readable path is opened as an offline (non-real-time) ZED camera
    with depth disabled; unreadable or unopenable files are skipped with a
    warning, so ``self.cameras`` may be shorter than ``svo_paths``.
    """

    def __init__(self, svo_paths: List[str]):
        """
        Args:
            svo_paths: Paths to the SVO recordings to open.
        """
        self.svo_paths = svo_paths
        self.cameras: List[sl.Camera] = []
        # Parallel to self.cameras: serial, fps, total_frames per camera.
        self.camera_info: List[Dict] = []
        self.runtime_params = sl.RuntimeParameters()

        for path in svo_paths:
            if not os.path.exists(path):
                print(f"Warning: SVO file not found: {path}")
                continue

            init_params = sl.InitParameters()
            init_params.set_from_svo_file(path)
            init_params.svo_real_time_mode = False  # read as fast as possible
            init_params.depth_mode = sl.DEPTH_MODE.NONE  # images only

            cam = sl.Camera()
            status = cam.open(init_params)
            if status != sl.ERROR_CODE.SUCCESS:
                print(f"Error: Could not open {path}: {status}")
                continue

            info = cam.get_camera_information()
            self.cameras.append(cam)
            self.camera_info.append(
                {
                    "serial": info.serial_number,
                    "fps": info.camera_configuration.fps,
                    "total_frames": cam.get_svo_number_of_frames(),
                }
            )

    def _make_frame_data(self, i: int) -> FrameData:
        """Builds a FrameData from camera i's most recently grabbed frame."""
        cam = self.cameras[i]
        mat = sl.Mat()
        cam.retrieve_image(mat, sl.VIEW.LEFT)
        return FrameData(
            # Copy: the sl.Mat buffer is reused by the SDK on the next grab.
            image=mat.get_data().copy(),
            timestamp_ns=cam.get_timestamp(sl.TIME_REFERENCE.IMAGE).get_nanoseconds(),
            frame_index=cam.get_svo_position(),
            serial_number=self.camera_info[i]["serial"],
        )

    def _grab_one(self, i: int) -> Optional[FrameData]:
        """Grabs the next frame from camera i.

        Returns:
            The frame, or None on end-of-file (playback is rewound to frame 0
            so the next grab loops) or on any other grab error.
        """
        cam = self.cameras[i]
        err = cam.grab(self.runtime_params)
        if err == sl.ERROR_CODE.SUCCESS:
            return self._make_frame_data(i)
        if err == sl.ERROR_CODE.END_OF_SVOFILE_REACHED:
            cam.set_svo_position(0)  # loop playback on the next grab
        return None

    def _frames_for_gap(self, i: int, diff_ns: int) -> int:
        """Converts a timestamp gap in nanoseconds to a frame count via fps."""
        fps = self.camera_info[i]["fps"]
        return int((diff_ns / 1_000_000_000) * fps)

    def sync_to_latest_start(self):
        """Aligns all SVOs to the timestamp of the latest starting SVO."""
        if not self.cameras:
            return

        # Grab one frame from each camera to learn its starting timestamp;
        # a failed grab is recorded as 0 so the camera is simply rewound.
        start_timestamps = []
        for cam in self.cameras:
            err = cam.grab(self.runtime_params)
            if err == sl.ERROR_CODE.SUCCESS:
                ts = cam.get_timestamp(sl.TIME_REFERENCE.IMAGE).get_nanoseconds()
                start_timestamps.append(ts)
            else:
                start_timestamps.append(0)

        max_start_ts = max(start_timestamps)

        for i, cam in enumerate(self.cameras):
            ts = start_timestamps[i]
            # Guard ts > 0: a camera whose grab failed must not be
            # fast-forwarded by the (max_ts - 0) gap, which would seek far
            # past the end of its file.
            if 0 < ts < max_start_ts:
                cam.set_svo_position(self._frames_for_gap(i, max_start_ts - ts))
            else:
                cam.set_svo_position(0)

    def grab_all(self) -> List[Optional[FrameData]]:
        """Grabs a frame from all cameras without strict synchronization.

        Returns:
            One entry per camera: the frame, or None where no frame was read.
        """
        return [self._grab_one(i) for i in range(len(self.cameras))]

    def grab_synced(self, tolerance_ms: int = 33) -> List[Optional[FrameData]]:
        """
        Grabs frames from all cameras, attempting to keep them within tolerance_ms.
        If a camera falls behind, it skips frames.

        Args:
            tolerance_ms: Maximum allowed lag behind the newest frame.

        Returns:
            One entry per camera; lagging cameras are re-grabbed after a skip,
            and may be None if the re-grab fails.
        """
        frames = self.grab_all()

        valid_timestamps = [f.timestamp_ns for f in frames if f is not None]
        if not valid_timestamps:
            return frames

        max_ts = max(valid_timestamps)
        tolerance_ns = tolerance_ms * 1_000_000

        for i, frame in enumerate(frames):
            if frame is None:
                continue
            if max_ts - frame.timestamp_ns <= tolerance_ns:
                continue

            # Camera i lags the newest frame by more than the tolerance:
            # estimate the shortfall in frames from its fps and jump ahead.
            cam = self.cameras[i]
            frames_to_skip = self._frames_for_gap(i, max_ts - frame.timestamp_ns)
            if frames_to_skip > 0:
                cam.set_svo_position(cam.get_svo_position() + frames_to_skip)

            frames[i] = self._grab_one(i)

        return frames

    def close(self):
        """Closes all cameras and clears the per-camera state."""
        for cam in self.cameras:
            cam.close()
        self.cameras = []
        # Keep the parallel list in step with self.cameras.
        self.camera_info = []
|
||||
Reference in New Issue
Block a user