Compare commits

..

17 Commits

Author SHA1 Message Date
e593c7b363 Refactor find_extrinsic_object.py: update object points parquet file name, enhance type annotations with MarkerFace TypedDict for diamond ArUco markers, and improve code organization. 2025-04-30 11:45:41 +08:00
c8f4a7ab26 Refactor Jupyter notebooks for marker processing: deleted boom.ipynb and compute_3d_maybe.ipynb, added calculate_box_coord_naive.ipynb, calculate_box_face_coord_naive.ipynb, estimate_extrinstic.ipynb, find_aruco_points_with_image.ipynb, find_aruco_points.py, and find_extrinsic_object.py. Updated .gitignore to include new output files. 2025-04-30 11:43:59 +08:00
733c6f8670 Update execution counts in find_cute_box_with_image.ipynb and adjust coordinate transformations for OpenGL to Blender compatibility. 2025-04-24 14:23:56 +08:00
801485e6d5 Add draw_uv.ipynb for UV layout generation, including image processing and canvas creation. Updated find_cute_box_with_image.ipynb with new functions for 3D coordinate extraction and improved type annotations. 2025-04-24 12:56:34 +08:00
9e1ac3d941 Add support for GLB files in LFS tracking, enhance find_cute_box_with_image.ipynb with new functions for 3D coordinate extraction, and introduce interactive_example.py for marker processing workflow. 2025-04-24 12:13:50 +08:00
aa081f46ec Refactor find_cute_box_with_image.ipynb to reset execution counts, enhance type annotations, and add new functions for marker processing and UV-to-3D interpolation. Introduced TypedDict for marker representation and improved code organization. 2025-04-23 19:13:04 +08:00
531bc6c29a Update Jupyter notebooks to reset execution counts and remove output cells. Added padding calculations for canvas in draw.ipynb and simplified output handling in find_cute_box_with_image.ipynb. 2025-04-23 17:07:59 +08:00
667c155aab Refactor PDF handling and add new scripts. Removed specific PDF files from LFS tracking and replaced with a wildcard for all PDFs. Added a script to convert PDFs to PNGs and created new Jupyter notebooks for drawing and marker detection. Added new charuco PDF files for 410x410 markers. 2025-04-23 16:57:13 +08:00
909a0f112f cleanup 2025-04-21 16:27:58 +08:00
dce9e11502 x 2025-04-21 16:26:21 +08:00
3598defe68 more paramters 2024-12-19 16:45:51 +08:00
2559055689 PnP works, kinda 2024-12-18 18:04:50 +08:00
a207c90cb9 cute object we go 2024-12-18 15:40:51 +08:00
bfac2c3b60 good 2024-12-18 12:01:00 +08:00
ba5cf29e49 that's what I want... somehow 2024-12-18 11:56:51 +08:00
0c0a4e8c97 sane has been gained 2024-12-18 11:28:09 +08:00
1843978c47 I know why... the board is reversed, i.e. y mirrored 2024-12-18 10:23:21 +08:00
35 changed files with 4301 additions and 3112 deletions

6
.gitattributes vendored

@@ -1,5 +1,3 @@
*.parquet filter=lfs diff=lfs merge=lfs -text
charuco_400x400_3x3_s130_m100_no_24.pdf filter=lfs diff=lfs merge=lfs -text
charuco_1189x841_10x7_s115_m90.pdf filter=lfs diff=lfs merge=lfs -text
charuco_400x400_3x3_s130_m100_no_16.pdf filter=lfs diff=lfs merge=lfs -text
charuco_400x400_3x3_s130_m100_no_20.pdf filter=lfs diff=lfs merge=lfs -text
*.pdf filter=lfs diff=lfs merge=lfs -text
*.glb filter=lfs diff=lfs merge=lfs -text

5
.gitignore vendored

@@ -165,4 +165,7 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
.DS_Store
output/svg
*.mp4
output/*.json
~output/standard_box_markers.parquet
~output/object_points.parquet

2647
boom.ipynb

File diff suppressed because one or more lines are too long


@@ -0,0 +1,497 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from dataclasses import dataclass\n",
"import numpy as np\n",
"from matplotlib import pyplot as plt\n",
"\n",
"NDArray = np.ndarray\n",
"\n",
"# Order of detection result\n",
"# 0, 1, 2, 3\n",
"# TL, TR, BR, BL\n",
"# RED, GREEN, BLUE, YELLOW\n",
"\n",
"\n",
"@dataclass\n",
"class DiamondBoardParameter:\n",
" marker_leghth: float\n",
" \"\"\"\n",
" the ArUco marker length in meter\n",
" \"\"\"\n",
" chess_length: float\n",
" \"\"\"\n",
" the length of the chess board in meter\n",
" \"\"\"\n",
" border_length: float = 0.01\n",
" \"\"\"\n",
" border_length in m, default is 1cm\n",
" \"\"\"\n",
"\n",
" @property\n",
" def marker_border_length(self):\n",
" assert self.chess_length > self.marker_leghth\n",
" return (self.chess_length - self.marker_leghth) / 2\n",
"\n",
" @property\n",
" def total_side_length(self):\n",
" assert self.chess_length > self.marker_leghth\n",
" return self.marker_border_length * 2 + self.chess_length * 3\n",
"\n",
"\n",
"# 9mm + 127mm + 127mm (97mm marker) + 127mm + 10mm\n",
"# i.e. marker boarder = 127mm - 97mm = 30mm (15mm each side)\n",
"Point2D = tuple[float, float]\n",
"Quad2D = tuple[Point2D, Point2D, Point2D, Point2D]\n",
"\n",
"\n",
"@dataclass\n",
"class ArUcoMarker2D:\n",
" id: int\n",
" corners: Quad2D\n",
" params: DiamondBoardParameter\n",
"\n",
" @property\n",
" def np_corners(self):\n",
" \"\"\"\n",
" returns corners in numpy array\n",
" (4, 2) shape\n",
" \"\"\"\n",
" return np.array(self.corners, dtype=np.float32)\n",
"\n",
"\n",
"# let's let TL be the origin\n",
"def generate_diamond_corners(\n",
" ids: tuple[int, int, int, int], params: DiamondBoardParameter\n",
"):\n",
" \"\"\"\n",
" A diamond chess board, which could be count as a kind of ChArUco board\n",
"\n",
" C | 0 | C\n",
" ---------\n",
" 1 | C | 2\n",
" ---------\n",
" C | 3 | C\n",
"\n",
" where C is the chess box, and 0, 1, 2, 3 are the markers (whose ids are passed in order)\n",
"\n",
" Args:\n",
" ids: a tuple of 4 ids of the markers\n",
" params: DiamondBoardParameter\n",
" \"\"\"\n",
"\n",
" def tl_to_square(tl_x: float, tl_y: float, side_length: float) -> Quad2D:\n",
" return (\n",
" (tl_x, tl_y),\n",
" (tl_x + side_length, tl_y),\n",
" (tl_x + side_length, tl_y + side_length),\n",
" (tl_x, tl_y + side_length),\n",
" )\n",
"\n",
" tl_0_x = params.border_length + params.chess_length + params.marker_border_length\n",
" tl_0_y = params.border_length + params.marker_border_length\n",
"\n",
" tl_1_x = params.border_length + params.marker_border_length\n",
" tl_1_y = params.border_length + params.chess_length + params.marker_border_length\n",
"\n",
" tl_2_x = (\n",
" params.border_length + params.chess_length * 2 + params.marker_border_length\n",
" )\n",
" tl_2_y = tl_1_y\n",
"\n",
" tl_3_x = params.border_length + params.chess_length + params.marker_border_length\n",
" tl_3_y = (\n",
" params.border_length + params.chess_length * 2 + params.marker_border_length\n",
" )\n",
" return (\n",
" ArUcoMarker2D(ids[0], tl_to_square(tl_0_x, tl_0_y, params.marker_leghth), params),\n",
" ArUcoMarker2D(ids[1], tl_to_square(tl_1_x, tl_1_y, params.marker_leghth), params),\n",
" ArUcoMarker2D(ids[2], tl_to_square(tl_2_x, tl_2_y, params.marker_leghth), params),\n",
" ArUcoMarker2D(ids[3], tl_to_square(tl_3_x, tl_3_y, params.marker_leghth), params),\n",
" )\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"params = DiamondBoardParameter(0.097, 0.127)\n",
"markers = generate_diamond_corners((16, 17, 18, 19), params)\n",
"\n",
"fig = plt.figure()\n",
"ax = fig.gca()\n",
"ax.set_xlim((0, params.total_side_length))\n",
"ax.set_ylim((0, params.total_side_length)) # type: ignore\n",
"ax.set_aspect(\"equal\")\n",
"# set origin to top-left (from bottom-left)\n",
"ax.invert_yaxis()\n",
"ax.xaxis.set_ticks_position('top')\n",
"\n",
"for marker in markers:\n",
" plt.plot(*marker.np_corners.T, \"o-\", label=str(marker.id))\n",
" for i, (x, y) in enumerate(marker.corners):\n",
" ax.text(x, y, str(i))\n",
"plt.legend()\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"from typing import Sequence\n",
"import plotly.graph_objects as go\n",
"import awkward as ak\n",
"import cv2\n",
"from cv2.typing import MatLike\n",
"\n",
"\n",
"def transform_point(matrix: MatLike, point: MatLike):\n",
" assert matrix.shape == (4, 4)\n",
" assert point.shape == (3,)\n",
"\n",
" # Lift point to 4D\n",
" homogeneous_point = np.array([point[0], point[1], point[2], 1])\n",
" # Apply transformation\n",
" transformed = matrix @ homogeneous_point\n",
" # Project back to 3D if w != 1\n",
" if transformed[3] != 1:\n",
" transformed = transformed / transformed[3]\n",
" return transformed[:3]\n",
"\n",
"\n",
"class DiamondPlane3D:\n",
" _ids: NDArray\n",
" \"\"\"\n",
" (n,)\n",
" \"\"\"\n",
" _corners: NDArray\n",
" \"\"\"\n",
" (n, 4, 3)\n",
" \"\"\"\n",
" _transform_matrix: NDArray\n",
" \"\"\"\n",
" 4x4 transformation matrix\n",
" \"\"\"\n",
" _normal_vector: NDArray\n",
" \"\"\"\n",
" (2, 3)\n",
" start (the center of the plane) and end (the normal vector), length 1\n",
" \"\"\"\n",
"\n",
" def __init__(self, items: Sequence[ArUcoMarker2D]):\n",
" self._ids = np.array([item.id for item in items])\n",
" # (n, 4, 2)\n",
" corners_2d = np.array([item.np_corners for item in items])\n",
" # (n, 4, 3)\n",
" self._corners = np.concatenate(\n",
" [corners_2d, np.zeros((corners_2d.shape[0], 4, 1))], axis=-1\n",
" )\n",
" self._transform_matrix = np.eye(4)\n",
"\n",
" def center(items: Sequence[ArUcoMarker2D]):\n",
" return np.mean([item.np_corners for item in items], axis=(0, 1))\n",
"\n",
" c = center(items)\n",
" assert c.shape == (2,)\n",
" self._normal_vector = np.array([(c[0], c[1], 0), (c[0], c[1], 0.1)])\n",
"\n",
" @property\n",
" def ids(self):\n",
" return self._ids\n",
"\n",
" @property\n",
" def corners(self):\n",
" return self._corners\n",
"\n",
" @property\n",
" def transform_matrix(self):\n",
" return self._transform_matrix\n",
"\n",
" @property\n",
" def transformed_corners(self):\n",
" def g():\n",
" for corner in self.corners:\n",
" yield np.array(\n",
" [transform_point(self.transform_matrix, c) for c in corner]\n",
" )\n",
"\n",
" return np.array(list(g()))\n",
"\n",
" @property\n",
" def transformed_normal_vector(self):\n",
" def g():\n",
" for v in self._normal_vector:\n",
" yield transform_point(self.transform_matrix, v)\n",
"\n",
" return np.array(list(g()))\n",
"\n",
" @property\n",
" def transformed_geometry_center(self):\n",
" return np.mean(self.transformed_corners, axis=(0, 1))\n",
"\n",
" def local_rotate(self, angle: float, axis: NDArray):\n",
" \"\"\"\n",
" rotate the plane by angle (in radian) around local center\n",
"\n",
" Args:\n",
" angle: in radian\n",
" axis: (3,)\n",
"\n",
" change basis to local basis, rotate, then change back\n",
" \"\"\"\n",
" raise NotImplementedError\n",
"\n",
" def rotate(self, angle: float, axis: NDArray):\n",
" \"\"\"\n",
" rotate the plane by angle (in radian) around the axis\n",
" \"\"\"\n",
" assert axis.shape == (3,)\n",
" rot_mat = cv2.Rodrigues(axis * angle)[0]\n",
" self._transform_matrix[:3, :3] = np.dot(rot_mat, self._transform_matrix[:3, :3])\n",
"\n",
" def translate(self, vec: NDArray):\n",
" \"\"\"\n",
" translate the plane by vec\n",
" \"\"\"\n",
" assert vec.shape == (3,)\n",
" self._transform_matrix[:3, 3] += vec\n",
"\n",
" def set_transform_matrix(self, mat: NDArray):\n",
" assert mat.shape == (4, 4)\n",
" self._transform_matrix = mat"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"plane_a = DiamondPlane3D(markers)\n",
"\n",
"OFFSET = 0.000\n",
"markers_b = generate_diamond_corners((20, 21, 22, 23), params)\n",
"plane_b = DiamondPlane3D(markers_b)\n",
"# plane_b.translate(np.array([0, 0, 0.1]))\n",
"plane_b.rotate(np.pi/2, np.array([1, 0, 0]))\n",
"plane_b.rotate(np.pi, np.array([0, 0, 1]))\n",
"tmp_c = plane_b.transformed_geometry_center\n",
"plane_b.translate(-tmp_c)\n",
"plane_b.rotate(np.pi, np.array([0, 1, 0]))\n",
"plane_b.translate(tmp_c)\n",
"plane_b.translate(np.array([0, 0, params.total_side_length]))\n",
"plane_b.translate(np.array([0, 0, -OFFSET]))\n",
"# OFFSET for plane_b\n",
"# plane_b.translate(np.array([0, 0.001, 0]))\n",
"\n",
"markers_c = generate_diamond_corners((24, 25, 26, 27), params)\n",
"plane_c = DiamondPlane3D(markers_c)\n",
"tmp = plane_c.transformed_geometry_center\n",
"plane_c.translate(-tmp)\n",
"plane_c.rotate(-np.pi/2, np.array([0, 0, 1]))\n",
"plane_c.translate(tmp)\n",
"plane_c.translate(np.array([0, params.total_side_length-params.border_length, 0]))\n",
"plane_c.rotate(np.pi/2, np.array([0, 1, 0]))\n",
"plane_c.translate(np.array([0, 0, params.total_side_length]))\n",
"plane_c.translate(np.array([0, 0, -OFFSET]))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fig = go.Figure()\n",
"t_corners_a = plane_a.transformed_corners\n",
"for i, corners in enumerate(t_corners_a):\n",
" fig.add_trace(\n",
" go.Scatter3d(\n",
" x=corners[:, 0],\n",
" y=corners[:, 1],\n",
" z=corners[:, 2],\n",
" mode=\"markers+lines+text\",\n",
" text=list(map(lambda x: f\"{plane_a.ids[i]}:{x}\", range(4))),\n",
" textposition=\"middle center\",\n",
" name=str(plane_a.ids[i]),\n",
" marker=dict(size=1),\n",
" )\n",
" )\n",
"\n",
"# normal vector\n",
"fig.add_trace(\n",
" go.Scatter3d(\n",
" x=plane_a.transformed_normal_vector[:, 0],\n",
" y=plane_a.transformed_normal_vector[:, 1],\n",
" z=plane_a.transformed_normal_vector[:, 2],\n",
" mode=\"markers+lines\",\n",
" name=\"normal_a\",\n",
" marker=dict(size=2),\n",
" )\n",
")\n",
"\n",
"t_corners_b = plane_b.transformed_corners\n",
"for i, corners in enumerate(t_corners_b):\n",
" fig.add_trace(\n",
" go.Scatter3d(\n",
" x=corners[:, 0],\n",
" y=corners[:, 1],\n",
" z=corners[:, 2],\n",
" mode=\"markers+lines+text\",\n",
" text=list(map(lambda x: f\"{plane_b.ids[i]}:{x}\", range(4))),\n",
" textposition=\"middle center\",\n",
" name=str(plane_b.ids[i]),\n",
" marker=dict(size=1),\n",
" )\n",
" )\n",
"fig.add_trace(\n",
" go.Scatter3d(\n",
" x=plane_b.transformed_normal_vector[:, 0],\n",
" y=plane_b.transformed_normal_vector[:, 1],\n",
" z=plane_b.transformed_normal_vector[:, 2],\n",
" mode=\"markers+lines\",\n",
" name=\"normal_b\",\n",
" marker=dict(size=2),\n",
" )\n",
")\n",
"\n",
"t_corners_c = plane_c.transformed_corners\n",
"for i, corners in enumerate(t_corners_c):\n",
" fig.add_trace(\n",
" go.Scatter3d(\n",
" x=corners[:, 0],\n",
" y=corners[:, 1],\n",
" z=corners[:, 2],\n",
" mode=\"markers+lines+text\",\n",
" text=list(map(lambda x: f\"{plane_c.ids[i]}:{x}\", range(4))),\n",
" name=str(plane_c.ids[i]),\n",
" marker=dict(size=1),\n",
" )\n",
" )\n",
"fig.add_trace(\n",
" go.Scatter3d(\n",
" x=plane_c.transformed_normal_vector[:, 0],\n",
" y=plane_c.transformed_normal_vector[:, 1],\n",
" z=plane_c.transformed_normal_vector[:, 2],\n",
" mode=\"markers+lines\",\n",
" textposition=\"middle center\",\n",
" name=\"normal_c\",\n",
" marker=dict(size=2),\n",
" )\n",
")\n",
"\n",
"# fig.update_layout(\n",
"# scene=dict(\n",
"# aspectmode=\"cube\",\n",
"# yaxis_autorange=\"reversed\",\n",
"# )\n",
"# )\n",
"\n",
"fig.update_layout(\n",
" scene=dict(\n",
" aspectmode='cube',\n",
" xaxis=dict(range=[-0.1, params.total_side_length]),\n",
" yaxis=dict(range=[params.total_side_length, -0.1]),\n",
" zaxis=dict(range=[-0.1, params.total_side_length]),\n",
" )\n",
")\n",
"fig.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import awkward as ak\n",
"from awkward import Record as AwkwardRecord, Array as AwkwardArray\n",
"\n",
"coords = AwkwardArray(\n",
" [\n",
" {\n",
" \"name\": \"a\",\n",
" \"ids\": plane_a.ids,\n",
" \"corners\": t_corners_a,\n",
" },\n",
" {\n",
" \"name\": \"b\",\n",
" \"ids\": plane_b.ids,\n",
" \"corners\": t_corners_b,\n",
" },\n",
" {\n",
" \"name\": \"c\",\n",
" \"ids\": plane_c.ids,\n",
" \"corners\": t_corners_c,\n",
" },\n",
" ]\n",
")\n",
"display(coords)\n",
"_ = ak.to_parquet(coords, \"output/object_points.parquet\")"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"from typing import cast\n",
"total_ids = cast(NDArray, ak.to_numpy(coords[\"ids\"])).flatten()\n",
"total_corners = cast(NDArray, ak.to_numpy(coords[\"corners\"])).reshape(-1, 4, 3)\n",
"#display(total_ids, total_corners)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dict(zip(total_ids, total_corners))"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"total_ids = np.concatenate([plane_a.ids, plane_b.ids, plane_c.ids])\n",
"total_corners = np.concatenate([t_corners_a, t_corners_b, t_corners_c])\n",
"id_corner_map: dict[int, NDArray] = dict(zip(total_ids, total_corners))"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.10"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
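
A quick note on the board arithmetic above: a minimal sketch, assuming the 97 mm marker / 127 mm chess square values quoted in the notebook comment, reproduces the derived quantities that the plane_b and plane_c translations rely on.

# Minimal sketch: recompute the derived diamond-board dimensions from the
# 97 mm / 127 mm values given in the notebook comment.
marker_length = 0.097   # ArUco marker side, in metres
chess_length = 0.127    # chess square side, in metres

marker_border_length = (chess_length - marker_length) / 2
total_side_length = marker_border_length * 2 + chess_length * 3

print(f"marker_border_length = {marker_border_length * 1000:.0f} mm")  # 15 mm per side
print(f"total_side_length    = {total_side_length * 1000:.0f} mm")     # 411 mm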

File diff suppressed because it is too large


@@ -9,9 +9,6 @@ from itertools import chain
from typing import Optional, Sequence, TypedDict, cast
import awkward as ak
from matplotlib.pyplot import stem
from numpy import ndarray
class ArucoDictionary(Enum):
Dict_4X4_50 = aruco.DICT_4X4_50
@@ -37,10 +34,10 @@ class ArucoDictionary(Enum):
Dict_ArUco_ORIGINAL = aruco.DICT_ARUCO_ORIGINAL
IMAGE_FOLDER = Path("dumped/usbcam")
IMAGE_FOLDER = Path("dumped/batch_three/c")
OUTPUT_FOLDER = Path("output")
DICTIONARY = ArucoDictionary.Dict_4X4_50
CALIBRATION_PARQUET: Optional[Path] = OUTPUT_FOLDER / "usbcam_cal.parquet"
CALIBRATION_PARQUET: Optional[Path] = OUTPUT_FOLDER / "c-af_03.parquet"
class CameraParams(TypedDict):
@@ -140,7 +137,7 @@ def main():
"rotation_vectors": rvecs,
"translation_vectors": tvecs,
}
ak.to_parquet([parameters], OUTPUT_FOLDER / "calibration.parquet")
ak.to_parquet([parameters], CALIBRATION_PARQUET)
else:
logger.warning(
"no calibration data calculated; either no images or already calibrated"

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

14
cvt_all_pdfs.sh Executable file

@@ -0,0 +1,14 @@
#!/bin/bash
# Set input folder
INPUT_DIR="board"
DPI=100 # Resolution in DPI
# Iterate over all .pdf files in the folder
for pdf in "$INPUT_DIR"/*.pdf; do
# Strip .pdf to get base name
base="${pdf%.pdf}"
# Convert to PNG
magick convert -density "$DPI" "$pdf" "${base}.png"
echo "Converted: $pdf${base}.png"
done

131
draw_uv.ipynb Normal file

@@ -0,0 +1,131 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from PIL import Image, ImageOps\n",
"from pathlib import Path\n",
"from typing import Optional\n",
"from matplotlib import pyplot as plt\n",
"import logging"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"TILE_SIZE = 1650 # in pixels\n",
"BORDER_SIZE = 200 # in pixels\n",
"BORDER_COLOR = (255, 255, 255)\n",
"BACKGROUND_COLOR = (255, 255, 255)\n",
"IMAGE_DIR = Path(\"board\")\n",
"\n",
"# Define the layout grid (rows of indices, None for empty)\n",
"# fmt: off\n",
"layout:list[list[Optional[int]]] = [\n",
" [None, None, 0, None, None],\n",
" [None, None, 1, None, None],\n",
" [None, 5, 2, 4, None],\n",
" [None, None, 3, None, None],\n",
"]\n",
"# fmt: on\n",
"\n",
"\n",
"\n",
"# charuco_410x410_3x3_s133_m105_face3_no_12_DICT_7X7_1000\n",
"# xxxxxxx_<phy_size>_<grid_size>_<checker_size>_<marker_space>_face<face_idx>_no_<no_idx>_DICT_<dict_size>\n",
"# 0 1 2 3 4 5 6 7 8\n",
"def parse_filename_to_face_idx(filename: str):\n",
" parts = filename.split(\"_\")\n",
" return int(parts[5][len(\"face\") :])\n",
"\n",
"\n",
"image_pathes = list(IMAGE_DIR.glob(\"*.png\"))\n",
"image_indice = map(lambda p: parse_filename_to_face_idx(p.stem), image_pathes)\n",
"images = {k: v for k, v in zip(image_indice, image_pathes)}\n",
"display(images)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Create blank canvas\n",
"rows = len(layout)\n",
"cols = len(layout[0])\n",
"canvas = Image.new(\"RGB\", (cols * TILE_SIZE, rows * TILE_SIZE), BACKGROUND_COLOR)\n",
"\n",
"# Paste tiles\n",
"for y, row in enumerate(layout):\n",
" for x, idx in enumerate(row):\n",
" if idx is not None:\n",
" path = images.get(idx)\n",
" if path is not None:\n",
" tile = Image.open(path)\n",
" # for the face index 4, rotate the tile 180 degrees\n",
" if idx == 4:\n",
" tile = tile.rotate(180)\n",
" canvas.paste(tile, (x * TILE_SIZE, y * TILE_SIZE))\n",
" else:\n",
" logging.warning(f\"Missing: {idx}\")\n",
"\n",
"# Calculate canvas size (before border)\n",
"canvas_width = cols * TILE_SIZE\n",
"canvas_height = rows * TILE_SIZE\n",
"\n",
"# Determine target size to make it square after padding\n",
"target_size = max(canvas_width, canvas_height)\n",
"extra_padding = target_size - canvas_height\n",
"top_pad = extra_padding // 2\n",
"bottom_pad = extra_padding - top_pad\n",
"\n",
"# First add vertical padding to center the layout\n",
"canvas_with_border = ImageOps.expand(\n",
" canvas,\n",
" border=(0, top_pad, 0, bottom_pad), # (left, top, right, bottom)\n",
" fill=BACKGROUND_COLOR,\n",
")\n",
"\n",
"plt.imshow(canvas_with_border)\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"canvas_with_border.save(\"merged_uv_layout.png\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.10"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
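
For reference, parse_filename_to_face_idx in draw_uv.ipynb splits the file stem on "_" and reads the token at index 5; a small self-contained check, using the example stem quoted in the notebook comment rather than an actual file on disk:

def parse_filename_to_face_idx(filename: str) -> int:
    # token 5 looks like "face3": strip the "face" prefix and parse the index
    parts = filename.split("_")
    return int(parts[5][len("face"):])

stem = "charuco_410x410_3x3_s133_m105_face3_no_12_DICT_7X7_1000"
assert parse_filename_to_face_idx(stem) == 3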

58
dump_and_play.sh Executable file

@@ -0,0 +1,58 @@
#!/bin/bash
if [ -z $1 ]; then
echo "Usage: $0 <port>"
exit 1
else
echo "dumping video from port $1"
fi
TARGET_PORT=$1;
if ! [[ $TARGET_PORT =~ ^[0-9]+$ ]] ; then
echo "error: expect a number, got $TARGET_PORT" >&2
exit 1
fi
# See also majestic.yaml
# Get the current date and time in the format YYYYMMDD-HHMMSS
DATE=$(date +"%Y%m%d-%H%M%S")
# use mts as MPEG transport stream
FILENAME="output/video-${DATE}-${TARGET_PORT}.mts"
# SINK="autovideosink"
SINK="glimagesink"
# Run the GStreamer pipeline with the dynamic filename
# gst-launch-1.0 -e udpsrc port=$TARGET_PORT \
# ! 'application/x-rtp,encoding-name=H265,payload=96' \
# ! rtph265depay \
# ! h265parse \
# ! tee name=t \
# t. ! queue ! $DECODER ! videoconvert ! $SINK \
# t. ! queue ! mp4mux ! filesink location=$FILENAME
# DECODER="nvh265dec"
# DECODER="vulkanh265dec"
# DECODER="avdec_h265"
DECODER="vtdec_hw"
# DECODER="vtdec"
# gst-launch-1.0 -e udpsrc port=$TARGET_PORT auto-multicast=true multicast-group=224.0.0.123 \
# ! 'application/x-rtp,encoding-name=H265,payload=96' \
# ! rtph265depay \
# ! tee name=t \
# ! h265parse \
# t. ! queue ! $DECODER ! videoconvert ! $SINK \
# t. ! queue ! mpegtsmux ! filesink location=$FILENAME
# hvc1
# hev1
gst-launch-1.0 -e udpsrc port=$TARGET_PORT auto-multicast=true multicast-group=224.0.0.123 \
! 'application/x-rtp,encoding-name=H265,payload=96' \
! rtph265depay \
! tee name=t \
t. ! queue ! h265parse ! "video/x-h265,stream-format=hvc1" ! $DECODER ! videoconvert ! $SINK \
t. ! queue ! h265parse ! mpegtsmux ! filesink location=$FILENAME

230
estimate_extrinstic.ipynb Normal file

@@ -0,0 +1,230 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import cv2\n",
"import cv2.aruco as aruco\n",
"from typing import Sequence, cast\n",
"import awkward as ak\n",
"from pathlib import Path\n",
"import numpy as np\n",
"from typing import Final\n",
"from matplotlib import pyplot as plt\n",
"from cv2.typing import MatLike"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"A_PATH = Path(\"output/af_03.parquet\")\n",
"B_PATH = Path(\"output/ae_08.parquet\")\n",
"\n",
"a_params = ak.from_parquet(A_PATH)[0]\n",
"b_params = ak.from_parquet(B_PATH)[0]\n",
"display(a_params)\n",
"display(b_params)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def create_new_aruco_marker_origin(marker_length: float):\n",
" \"\"\"\n",
" Create a new ArUco marker origin with the given length.\n",
"\n",
" 0 -> x\n",
" |\n",
" v\n",
" y\n",
"\n",
" 0---1\n",
" | |\n",
" 3---2\n",
"\n",
" So that the center of the marker is the origin for this PnP problem.\n",
"\n",
" Args:\n",
" marker_length: The length of the marker.\n",
" \"\"\"\n",
" return np.array(\n",
" [\n",
" [-marker_length / 2, marker_length / 2, 0],\n",
" [marker_length / 2, marker_length / 2, 0],\n",
" [marker_length / 2, -marker_length / 2, 0],\n",
" [-marker_length / 2, -marker_length / 2, 0],\n",
" ]\n",
" ).astype(np.float32)\n",
"\n",
"\n",
"DICTIONARY: Final[int] = aruco.DICT_4X4_50\n",
"# 400mm\n",
"MARKER_LENGTH: Final[float] = 0.4\n",
"aruco_dict = aruco.getPredefinedDictionary(DICTIONARY)\n",
"detector = aruco.ArucoDetector(\n",
" dictionary=aruco_dict, detectorParams=aruco.DetectorParameters()\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"a_img = cv2.imread(str(Path(\"dumped/marker/video-20241205-152716-board.png\")))\n",
"a_mtx = ak.to_numpy(a_params[\"camera_matrix\"])\n",
"a_dist = ak.to_numpy(a_params[\"distortion_coefficients\"])\n",
"\n",
"b_img = cv2.imread(str(Path(\"dumped/marker/video-20241205-152721-board.png\")))\n",
"b_mtx = ak.to_numpy(b_params[\"camera_matrix\"])\n",
"b_dist = ak.to_numpy(b_params[\"distortion_coefficients\"])"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"a_corners, a_ids, _a_rejected = detector.detectMarkers(a_img)\n",
"b_corners, b_ids, _b_rejected = detector.detectMarkers(b_img)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"a_corners"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ok, a_rvec, a_tvec = cv2.solvePnP(create_new_aruco_marker_origin(MARKER_LENGTH), a_corners[0], a_mtx, a_dist)\n",
"if not ok:\n",
" raise ValueError(\"Failed to solve PnP for A\")\n",
"a_img_output = cv2.drawFrameAxes(a_img, a_mtx, a_dist, a_rvec, a_tvec, MARKER_LENGTH)\n",
"plt.imshow(cv2.cvtColor(a_img_output, cv2.COLOR_BGR2RGB))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ok, b_rvec, b_tvec = cv2.solvePnP(create_new_aruco_marker_origin(MARKER_LENGTH), b_corners[0], b_mtx, b_dist)\n",
"if not ok:\n",
" raise ValueError(\"Failed to solve PnP for B\")\n",
"b_img_output = cv2.drawFrameAxes(b_img, b_mtx, b_dist, b_rvec, b_tvec, MARKER_LENGTH)\n",
"plt.imshow(cv2.cvtColor(b_img_output, cv2.COLOR_BGR2RGB))"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"from typing import TypeVar, Union\n",
"\n",
"\n",
"T = TypeVar(\"T\")\n",
"\n",
"\n",
"def create_transform_matrix(rvec: MatLike, tvec: MatLike, dtype: type = np.float32):\n",
" assert rvec.shape == (3, 1)\n",
" assert tvec.shape == (3, 1)\n",
" R, _ = cv2.Rodrigues(rvec)\n",
" transform = np.eye(4, dtype=dtype)\n",
" transform[:3, :3] = R\n",
" transform[:3, 3] = tvec.flatten()\n",
" return transform\n",
"\n",
"\n",
"def extract_translation(transform: MatLike):\n",
" assert transform.shape == (4, 4)\n",
" return transform[:3, 3]\n",
"\n",
"\n",
"def extract_rotation(transform: MatLike):\n",
" assert transform.shape == (4, 4)\n",
" return transform[:3, :3]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"a_trans = create_transform_matrix(a_rvec, a_tvec)\n",
"display(a_trans)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"np.linalg.inv(a_trans)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Converts a rotation matrix to a rotation vector or vice versa\n",
"a_rmtx, _ = cv2.Rodrigues(a_rvec)\n",
"b_rmtx, _ = cv2.Rodrigues(b_rvec)\n",
"a_camera_coord = -(a_rmtx.T@ a_tvec)\n",
"b_camera_coord = -(b_rmtx.T @ b_tvec)\n",
"distance = np.linalg.norm(a_camera_coord - b_camera_coord)\n",
"a_distance = np.linalg.norm(a_camera_coord)\n",
"b_distance = np.linalg.norm(b_camera_coord)\n",
"display(\"d_ab={:.4}m a={:.4}m b={:.4}m\".format(distance, a_distance, b_distance))\n",
"display(\"a_coord={}\".format(a_camera_coord.T))\n",
"display(\"b_coord={}\".format(b_camera_coord.T))"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.10"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
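
The last cell above recovers each camera centre as -R^T t: if a world point X maps to camera coordinates via x = R X + t, the camera centre is the point that maps to zero, i.e. C = -R^T t. A minimal self-check of that relation with synthetic extrinsics (not taken from the notebook's images):

import cv2
import numpy as np

# Synthetic rvec/tvec purely for illustration.
rvec = np.array([[0.1], [0.2], [0.3]])
tvec = np.array([[0.5], [-0.2], [2.0]])
R, _ = cv2.Rodrigues(rvec)
C = -(R.T @ tvec)
# Mapping the recovered centre through the extrinsics lands back at the origin.
assert np.allclose(R @ C + tvec, 0.0)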


@@ -10,9 +10,14 @@ import numpy as np
NDArray = np.ndarray
CALIBRATION_PARQUET = Path("output") / "usbcam_cal.parquet"
DICTIONARY: Final[int] = aruco.DICT_4X4_50
# 7x7
DICTIONARY: Final[int] = aruco.DICT_7X7_1000
# 400mm
MARKER_LENGTH: Final[float] = 0.4
RED = (0, 0, 255)
GREEN = (0, 255, 0)
BLUE = (255, 0, 0)
YELLOW = (0, 255, 255)
def gen():
@@ -47,23 +52,18 @@ def main():
# logger.info("markers={}, ids={}", np.array(markers).shape, np.array(ids).shape)
for m, i in zip(markers, ids):
center = np.mean(m, axis=0).astype(int)
GREY = (128, 128, 128)
logger.info("id={}, center={}", i, center)
cv2.circle(frame, tuple(center), 5, GREY, -1)
cv2.circle(frame, tuple(center), 5, RED, -1)
cv2.putText(
frame,
str(i),
tuple(center),
cv2.FONT_HERSHEY_SIMPLEX,
1,
GREY,
RED,
2,
)
# BGR
RED = (0, 0, 255)
GREEN = (0, 255, 0)
BLUE = (255, 0, 0)
YELLOW = (0, 255, 255)
color_map = [RED, GREEN, BLUE, YELLOW]
for color, corners in zip(color_map, m):
corners = corners.astype(int)


@@ -0,0 +1,442 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"from datetime import datetime\n",
"from pathlib import Path\n",
"from typing import Any, Final, Optional, TypeAlias, TypedDict, Union, cast\n",
"from dataclasses import dataclass\n",
"\n",
"import cv2\n",
"import numpy as np\n",
"import orjson\n",
"import trimesh\n",
"from beartype import beartype\n",
"from cv2 import aruco\n",
"from cv2.typing import MatLike\n",
"from jaxtyping import Float, Int, Num, jaxtyped\n",
"from loguru import logger\n",
"from matplotlib import pyplot as plt\n",
"from numpy.typing import ArrayLike\n",
"from numpy.typing import NDArray as NDArrayT\n",
"\n",
"NDArray: TypeAlias = np.ndarray"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"INPUT_IMAGE = Path(\"merged_uv_layout.png\")\n",
"# 7x7\n",
"DICTIONARY: Final[int] = aruco.DICT_7X7_1000\n",
"# 400mm\n",
"MARKER_LENGTH: Final[float] = 0.4"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [],
"source": [
"aruco_dict = aruco.getPredefinedDictionary(DICTIONARY)\n",
"detector = aruco.ArucoDetector(\n",
" dictionary=aruco_dict, detectorParams=aruco.DetectorParameters()\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
"frame = cv2.imread(str(INPUT_IMAGE))\n",
"grey = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)\n",
"# pylint: disable-next=unpacking-non-sequence\n",
"markers, ids, rejected = detector.detectMarkers(grey)"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [],
"source": [
"# Note: BGR\n",
"RED = (0, 0, 255)\n",
"GREEN = (0, 255, 0)\n",
"BLUE = (255, 0, 0)\n",
"YELLOW = (0, 255, 255)\n",
"GREY = (128, 128, 128)\n",
"CYAN = (255, 255, 0)\n",
"MAGENTA = (255, 0, 255)\n",
"ORANGE = (0, 165, 255)\n",
"PINK = (147, 20, 255)\n",
"\n",
"UI_SCALE = 10\n",
"UI_SCALE_FONT = 8\n",
"UI_SCALE_FONT_WEIGHT = 20"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [],
"source": [
"out = frame.copy()\n",
"# `markers` is [N, 1, 4, 2]\n",
"# `ids` is [N, 1]\n",
"if ids is not None:\n",
" markers = np.reshape(markers, (-1, 4, 2))\n",
" ids = np.reshape(ids, (-1, 1))\n",
" # logger.info(\"markers={}, ids={}\", np.array(markers).shape, np.array(ids).shape)\n",
" for m, i in zip(markers, ids):\n",
" # logger.info(\"id={}, center={}\", i, center)\n",
" center = np.mean(m, axis=0).astype(int) # type: ignore\n",
" # BGR\n",
" color_map = [RED, GREEN, BLUE, YELLOW]\n",
" for color, corners in zip(color_map, m):\n",
" corners = corners.astype(int)\n",
" out = cv2.circle(out, corners, 5*UI_SCALE, color, -1)\n",
" cv2.circle(out, tuple(center), 5*UI_SCALE, CYAN, -1)\n",
" cv2.putText(\n",
" out,\n",
" str(i),\n",
" tuple(center),\n",
" cv2.FONT_HERSHEY_SIMPLEX,\n",
" 1*UI_SCALE_FONT,\n",
" MAGENTA,\n",
" UI_SCALE_FONT_WEIGHT,\n",
" )"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [],
"source": [
"@jaxtyped(typechecker=beartype)\n",
"@dataclass\n",
"class Marker:\n",
" id: int\n",
" center: Num[NDArray, \"2\"]\n",
" corners: Num[NDArray, \"4 2\"]\n",
"\n",
"\n",
"output_markers: list[Marker] = []\n",
"if ids is not None:\n",
" IMAGE_WIDTH = frame.shape[1]\n",
" IMAGE_HEIGHT = frame.shape[0]\n",
"\n",
" def normalize_point(point: NDArrayT[Any]) -> NDArrayT[np.float64]:\n",
" \"\"\"\n",
" input could be: [N, 2] or [2]\n",
" \"\"\"\n",
" if point.ndim == 1:\n",
" return point / np.array([IMAGE_WIDTH, IMAGE_HEIGHT])\n",
" elif point.ndim == 2:\n",
" return point / np.array([IMAGE_WIDTH, IMAGE_HEIGHT])\n",
" else:\n",
" raise ValueError(f\"Invalid shape: {point.shape}\")\n",
"\n",
" def flip_y(point: NDArrayT[Any], y_max: int) -> NDArrayT[Any]:\n",
" \"\"\"\n",
" flip y axis;\n",
"\n",
" Usually OpenCV image y-axis is inverted. (origin at top-left)\n",
" In UV layout, the origin is at bottom-left.\n",
" \"\"\"\n",
" return np.array([point[0], y_max - point[1]])\n",
"\n",
" for m, i in zip(markers, ids):\n",
" center = np.mean(m, axis=0).astype(int) # type: ignore\n",
" output_markers.append(\n",
" Marker(\n",
" id=int(i[0]),\n",
" center=flip_y(normalize_point(center), 1),\n",
" corners=np.array([flip_y(normalize_point(c), 1) for c in m]),\n",
" )\n",
" )\n",
"\n",
"with open(\"output/aruco_2d_uv_coords_normalized.json\", \"wb\") as f:\n",
" f.write(orjson.dumps(output_markers, option=orjson.OPT_SERIALIZE_NUMPY))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"plt.imshow(cv2.cvtColor(out, cv2.COLOR_BGR2RGB))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cv2.imwrite(\"merged_uv_layout_with_markers.png\", out)"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [],
"source": [
"@jaxtyped(typechecker=beartype)\n",
"def interpolate_uvs_to_3d(\n",
" uv_points: Num[NDArray, \"N 2\"],\n",
" vertices: Num[NDArray, \"V 3\"],\n",
" uvs: Num[NDArray, \"V 2\"],\n",
" faces: Num[NDArray, \"F 3\"],\n",
" epsilon: float = 1e-6,\n",
") -> Num[NDArray, \"N 3\"]:\n",
" \"\"\"\n",
" Map multiple UV points to 3D coordinates using barycentric interpolation.\n",
"\n",
" Args:\n",
" uv_points: (N, 2) array of UV coordinates in [0,1]\n",
" vertices: (V, 3) array of mesh vertex positions\n",
" uvs: (V, 2) array of per-vertex UV coordinates\n",
" faces: (F, 3) array of triangle vertex indices\n",
" epsilon: barycentric inside-triangle tolerance\n",
"\n",
" Returns:\n",
" (N, 3) array of interpolated 3D coordinates (NaNs if no triangle found)\n",
" \"\"\"\n",
" results = np.full((uv_points.shape[0], 3), np.nan, dtype=np.float64)\n",
"\n",
" for pi, uv_point in enumerate(uv_points):\n",
" for face in faces:\n",
" uv_tri = uvs[face] # (3,2)\n",
" v_tri = vertices[face] # (3,3)\n",
"\n",
" A = np.array(\n",
" [\n",
" [uv_tri[0, 0] - uv_tri[2, 0], uv_tri[1, 0] - uv_tri[2, 0]],\n",
" [uv_tri[0, 1] - uv_tri[2, 1], uv_tri[1, 1] - uv_tri[2, 1]],\n",
" ]\n",
" )\n",
" b = uv_point - uv_tri[2]\n",
"\n",
" try:\n",
" w0, w1 = np.linalg.solve(A, b)\n",
" w2 = 1.0 - w0 - w1\n",
" if min(w0, w1, w2) >= -epsilon:\n",
" results[pi] = w0 * v_tri[0] + w1 * v_tri[1] + w2 * v_tri[2]\n",
" break # Stop after first matching triangle\n",
" except np.linalg.LinAlgError:\n",
" continue\n",
"\n",
" return results\n",
"\n",
"\n",
"@jaxtyped(typechecker=beartype)\n",
"def interpolate_uvs_to_3d_trimesh(\n",
" uv_points: Num[NDArray, \"N 2\"],\n",
" mesh: Union[trimesh.Trimesh, trimesh.Scene],\n",
" epsilon: float = 1e-6,\n",
") -> Num[NDArray, \"N 3\"]:\n",
" \"\"\"\n",
" Wrapper for batched UV-to-3D interpolation using a trimesh mesh or scene.\n",
"\n",
" Args:\n",
" uv_points: (N, 2) UV coordinates to convert\n",
" mesh: a Trimesh or Scene object\n",
" epsilon: barycentric epsilon tolerance\n",
"\n",
" Returns:\n",
" (N, 3) array of 3D positions (NaN if outside mesh)\n",
" \"\"\"\n",
" if isinstance(mesh, trimesh.Scene):\n",
" if len(mesh.geometry) == 0:\n",
" raise ValueError(\"Scene has no geometry.\")\n",
" mesh = list(mesh.geometry.values())[0]\n",
"\n",
" if not isinstance(mesh, trimesh.Trimesh):\n",
" raise TypeError(\"Expected a Trimesh or Scene with geometry.\")\n",
"\n",
" if mesh.visual is None:\n",
" raise ValueError(\"Mesh does not have visual.\")\n",
"\n",
" if mesh.visual.uv is None:\n",
" raise ValueError(\"Mesh does not have UVs.\")\n",
"\n",
" return interpolate_uvs_to_3d(\n",
" uv_points=uv_points,\n",
" vertices=mesh.vertices,\n",
" uvs=mesh.visual.uv,\n",
" faces=mesh.faces,\n",
" epsilon=epsilon,\n",
" )"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"m = trimesh.load_mesh(\"sample/standard_box.glb\")\n",
"def marker_to_3d_coords(marker: Marker, mesh: trimesh.Trimesh):\n",
" uv_points = marker.corners\n",
" return interpolate_uvs_to_3d_trimesh(uv_points, mesh)\n",
"\n",
"id_to_3d_coords = {marker.id: marker_to_3d_coords(marker, m) for marker in output_markers}\n",
"# note that the glb is Y up\n",
"# when visualizing with matplotlib, it's Z up\n",
"OPEN_GL_TO_BLENDER = np.array([[1, 0, 0], [0, 0, -1], [0, 1, 0]])\n",
"display(np.linalg.inv(OPEN_GL_TO_BLENDER)) # should be the same"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [],
"source": [
"# matplotlib default colors scheme\n",
"colors: list[str] = plt.rcParams[\"axes.prop_cycle\"].by_key()[\"color\"]\n",
"\n",
"def hex_to_rgb(hex_color: str) -> tuple[float, float, float]:\n",
" assert hex_color.startswith(\"#\")\n",
" assert len(hex_color) == 7\n",
" return tuple(int(hex_color[i:i+2], 16) / 255.0 for i in (1, 3, 5)) # type: ignore"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from functools import lru_cache\n",
"from typing import Optional, TypedDict\n",
"import awkward as ak\n",
"\n",
"\n",
"class MarkerFace(TypedDict):\n",
" name: str\n",
" ids: Int[NDArray, \"N\"]\n",
" \"\"\"\n",
" ArUco marker ids\n",
" \"\"\"\n",
" corners: Num[NDArray, \"N 4 3\"]\n",
" \"\"\"\n",
" Corner coordinates in 3D of rectangle,\n",
" relative to the world origin\n",
" \"\"\"\n",
"\n",
"\n",
"@dataclass\n",
"class Face:\n",
" color: tuple[float, float, float]\n",
" marker_ids: list[int]\n",
"\n",
"\n",
"# fmt: off\n",
"layout:list[list[Optional[int]]] = [\n",
" [None, None, 0, None, None],\n",
" [None, None, 1, None, None],\n",
" [None, 5, 2, 4, None],\n",
" [None, None, 3, None, None],\n",
"]\n",
"# fmt: on\n",
"\n",
"faces = {\n",
" \"bottom\": Face(color=hex_to_rgb(colors[0]), marker_ids=[0, 1, 2, 3]),\n",
" \"back\": Face(color=hex_to_rgb(colors[1]), marker_ids=[4, 5, 6, 7]),\n",
" \"top\": Face(color=hex_to_rgb(colors[2]), marker_ids=[8, 9, 10, 11]),\n",
" \"front\": Face(color=hex_to_rgb(colors[3]), marker_ids=[12, 13, 14, 15]),\n",
" \"right\": Face(color=hex_to_rgb(colors[4]), marker_ids=[16, 17, 18, 19]),\n",
" \"left\": Face(color=hex_to_rgb(colors[5]), marker_ids=[20, 21, 22, 23]),\n",
"}\n",
"\n",
"markers: list[MarkerFace] = []\n",
"for name, face in faces.items():\n",
" corners = np.array([id_to_3d_coords[id] for id in face.marker_ids])\n",
" assert corners.shape == (4, 4, 3)\n",
" markers.append(MarkerFace(name=name, ids=np.array(face.marker_ids), corners=corners))\n",
"display(markers)\n",
"ak.to_parquet(markers, \"output/standard_box_markers.parquet\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"@lru_cache\n",
"def get_face_by_marker_id(marker_id: int) -> Optional[Face]:\n",
" for face in faces.values():\n",
" if marker_id in face.marker_ids:\n",
" return face\n",
" return None\n",
"\n",
"\n",
"# 3D Visualization (with flipped and fully valid data)\n",
"fig = plt.figure(figsize=(8, 8))\n",
"ax = fig.add_subplot(111, projection=\"3d\")\n",
"\n",
"for tag_id, corners in id_to_3d_coords.items():\n",
" corners = np.array(corners)\n",
" face = get_face_by_marker_id(tag_id)\n",
" assert face is not None\n",
" color = face.color\n",
" for i in range(4):\n",
" p1 = OPEN_GL_TO_BLENDER @ corners[i]\n",
" p2 = OPEN_GL_TO_BLENDER @ corners[(i + 1) % 4]\n",
" ax.plot(*zip(p1, p2), color=color)\n",
" center = OPEN_GL_TO_BLENDER @ corners.mean(axis=0)\n",
" ax.scatter(*center, color=color)\n",
" ax.text(*center, str(tag_id), fontsize=9, color=\"black\") # type: ignore\n",
"\n",
"ax.set_box_aspect([1, 1, 1]) # type: ignore\n",
"ax.set_title(\"ArUco Corners in 3D\")\n",
"ax.set_xlabel(\"X\")\n",
"ax.set_ylabel(\"Y\")\n",
"ax.set_zlabel(\"Z\") # type: ignore\n",
"\n",
"# Set the viewing angle\n",
"# ax.view_init(elev=60, azim=35) # type: ignore\n",
"\n",
"plt.show()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.10"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
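
interpolate_uvs_to_3d above solves a 2x2 linear system for the barycentric weights of the query point in UV space and reuses those weights on the triangle's 3D vertices. A single-triangle sketch of the same computation, with a made-up triangle instead of the GLB mesh:

import numpy as np

# One triangle: per-vertex UVs and the corresponding 3D positions (made up).
uvs = np.array([[0.0, 0.0], [1.0, 0.0], [0.0, 1.0]])
vertices = np.array([[0.0, 0.0, 0.0], [2.0, 0.0, 0.0], [0.0, 0.0, 2.0]])
uv_point = np.array([0.25, 0.25])

# Solve for barycentric weights (w0, w1) in UV space, then w2 = 1 - w0 - w1.
A = np.array([[uvs[0, 0] - uvs[2, 0], uvs[1, 0] - uvs[2, 0]],
              [uvs[0, 1] - uvs[2, 1], uvs[1, 1] - uvs[2, 1]]])
w0, w1 = np.linalg.solve(A, uv_point - uvs[2])
w2 = 1.0 - w0 - w1

# Apply the same weights to the 3D vertices.
point_3d = w0 * vertices[0] + w1 * vertices[1] + w2 * vertices[2]
print(point_3d)  # [0.5 0.  0.5] for this triangle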

173
find_extrinsic_object.py Normal file

@@ -0,0 +1,173 @@
from datetime import datetime
from pathlib import Path
from typing import Final, Optional, TypedDict, cast
import awkward as ak
import cv2
import numpy as np
from cv2 import aruco
from cv2.typing import MatLike
from jaxtyping import Int, Num
from loguru import logger
NDArray = np.ndarray
CALIBRATION_PARQUET = Path("output") / "usbcam_cal.parquet"
# OBJECT_POINTS_PARQUET = Path("output") / "object_points.parquet"
OBJECT_POINTS_PARQUET = Path("output") / "standard_box_markers.parquet"
DICTIONARY: Final[int] = aruco.DICT_4X4_50
# 400mm
MARKER_LENGTH: Final[float] = 0.4
class MarkerFace(TypedDict):
"""
for diamond ArUco markers, N is 4
"""
name: str
"""
a label for the face
"""
ids: Int[NDArray, "N"]
"""
ArUco marker ids
"""
corners: Num[NDArray, "N 4 3"]
"""
Corner coordinates in 3D of rectangle,
relative to the world origin
"""
def gen():
API = cv2.CAP_AVFOUNDATION
cap = cv2.VideoCapture(0, API)
while True:
ret, frame = cap.read()
if not ret:
logger.warning("Failed to grab frame")
break
yield frame
def main():
aruco_dict = aruco.getPredefinedDictionary(DICTIONARY)
cal = ak.from_parquet(CALIBRATION_PARQUET)[0]
camera_matrix = cast(MatLike, ak.to_numpy(cal["camera_matrix"]))
distortion_coefficients = cast(MatLike, ak.to_numpy(cal["distortion_coefficients"]))
ops = ak.from_parquet(OBJECT_POINTS_PARQUET)
detector = aruco.ArucoDetector(
dictionary=aruco_dict, detectorParams=aruco.DetectorParameters()
)
total_ids = cast(NDArray, ak.to_numpy(ops["ids"])).flatten()
total_corners = cast(NDArray, ak.to_numpy(ops["corners"])).reshape(-1, 4, 3)
ops_map: dict[int, NDArray] = dict(zip(total_ids, total_corners))
logger.info("ops_map={}", ops_map)
writer: Optional[cv2.VideoWriter] = None
for frame in gen():
grey = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# pylint: disable-next=unpacking-non-sequence
markers, ids, rejected = detector.detectMarkers(grey)
# `markers` is [N, 1, 4, 2]
# `ids` is [N, 1]
if ids is not None:
markers = np.reshape(markers, (-1, 4, 2))
ids = np.reshape(ids, (-1, 1))
# logger.info("markers={}, ids={}", np.array(markers).shape, np.array(ids).shape)
ips_map: dict[int, NDArray] = {}
for cs, id in zip(markers, ids):
id = int(id)
cs = cast(NDArray, cs)
ips_map[id] = cs
center = np.mean(cs, axis=0).astype(int)
GREY = (128, 128, 128)
# logger.info("id={}, center={}", id, center)
cv2.circle(frame, tuple(center), 5, GREY, -1)
cv2.putText(
frame,
str(id),
tuple(center),
cv2.FONT_HERSHEY_SIMPLEX,
1,
GREY,
2,
)
# BGR
RED = (0, 0, 255)
GREEN = (0, 255, 0)
BLUE = (255, 0, 0)
YELLOW = (0, 255, 255)
color_map = [RED, GREEN, BLUE, YELLOW]
for color, corners in zip(color_map, cs):
corners = corners.astype(int)
frame = cv2.circle(frame, corners, 5, color, -1)
# https://docs.opencv.org/4.x/d9/d0c/group__calib3d.html#ga50620f0e26e02caa2e9adc07b5fbf24e
ops: NDArray = np.empty((0, 3), dtype=np.float32)
ips: NDArray = np.empty((0, 2), dtype=np.float32)
for id, ip in ips_map.items():
try:
op = ops_map[id]
assert ip.shape == (4, 2), f"corners.shape={ip.shape}"
assert op.shape == (4, 3), f"op.shape={op.shape}"
ops = np.concatenate((ops, op), axis=0)
ips = np.concatenate((ips, ip), axis=0)
except KeyError:
logger.warning("No object points for id={}", id)
continue
assert len(ops) == len(ips), f"len(ops)={len(ops)} != len(ips)={len(ips)}"
if len(ops) > 0:
# https://docs.opencv.org/4.x/d5/d1f/calib3d_solvePnP.html
# https://docs.opencv.org/4.x/d5/d1f/calib3d_solvePnP.html#calib3d_solvePnP_flags
ret, rvec, tvec = cv2.solvePnP(
objectPoints=ops,
imagePoints=ips,
cameraMatrix=camera_matrix,
distCoeffs=distortion_coefficients,
flags=cv2.SOLVEPNP_SQPNP,
)
# ret, rvec, tvec, inliers = cv2.solvePnPRansac(
# objectPoints=ops,
# imagePoints=ips,
# cameraMatrix=camera_matrix,
# distCoeffs=distortion_coefficients,
# flags=cv2.SOLVEPNP_SQPNP,
# )
if ret:
cv2.drawFrameAxes(
frame,
camera_matrix,
distortion_coefficients,
rvec,
tvec,
MARKER_LENGTH,
)
else:
logger.warning("Failed to solvePnPRansac")
cv2.imshow("frame", frame)
if writer is not None:
writer.write(frame)
if (k := cv2.waitKey(1)) == ord("q"):
logger.info("Exiting")
break
elif k == ord("s"):
now = datetime.now().strftime("%Y%m%d%H%M%S")
file_name = f"aruco_{now}.png"
logger.info("Saving to {}", file_name)
cv2.imwrite(file_name, frame)
elif k == ord("r"):
if writer is not None:
writer.release()
writer = None
logger.info("Recording stopped")
else:
now = datetime.now().strftime("%Y%m%d%H%M%S")
file_name = f"aruco_{now}.mp4"
logger.info("Recording to {}", file_name)
fourcc = cv2.VideoWriter.fourcc(*"mp4v")
writer = cv2.VideoWriter(file_name, fourcc, 20.0, frame.shape[:2][::-1])
if __name__ == "__main__":
main()
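
A cheap sanity check to bolt onto the solvePnP branch above is the reprojection error; the sketch below assumes the ops, ips, camera_matrix, distortion_coefficients, rvec and tvec variables exactly as named in find_extrinsic_object.py.

import cv2
import numpy as np
from loguru import logger

# Reproject the matched 3D object points with the estimated pose and compare
# against the detected 2D corners (ips), in pixels.
projected, _ = cv2.projectPoints(
    ops, rvec, tvec, camera_matrix, distortion_coefficients
)
errors = np.linalg.norm(projected.reshape(-1, 2) - ips, axis=1)
logger.info("reprojection error: mean={:.2f}px max={:.2f}px", errors.mean(), errors.max())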

18
interactive_example.py Normal file

@@ -0,0 +1,18 @@
# %%
import numpy as np
# %%
# %% [markdown]
# # Extract the 3D coordinates of the ArUco markers from the image
#
# 1. Load the image
# 2. Detect the ArUco markers
# 3. Get the 3D coordinates of the markers
# 4. Save the 3D coordinates to a file
# %%
# %%
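
The outline above is not implemented yet; a hedged sketch of steps 1-2 (load the image, detect the markers), using a hypothetical image path and the 4x4 dictionary used elsewhere in the repository:

import cv2
from cv2 import aruco

frame = cv2.imread("dumped/marker/example.png")  # hypothetical path
detector = aruco.ArucoDetector(
    dictionary=aruco.getPredefinedDictionary(aruco.DICT_4X4_50),
    detectorParams=aruco.DetectorParameters(),
)
corners, ids, _rejected = detector.detectMarkers(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))
print(ids)  # (N, 1) array of detected marker ids, or None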

Binary file not shown.

Binary file not shown.

Binary file not shown.

BIN
output/object_points.parquet LFS Normal file

Binary file not shown.

Binary file not shown.

Binary file not shown.

File diff suppressed because one or more lines are too long

53
playground.py Normal file

@@ -0,0 +1,53 @@
# ---
# jupyter:
# jupytext:
# text_representation:
# extension: .py
# format_name: percent
# format_version: '1.3'
# jupytext_version: 1.17.0
# kernelspec:
# language: python
# name: python3
# ---
# %%
import awkward as ak
from pathlib import Path
import numpy as np
from IPython.display import display
from typing import TypedDict
from jaxtyping import Int, Num
NDArray = np.ndarray
# %%
class MarkerFace(TypedDict):
"""
for diamond ArUco markers, N is 4
"""
name: str
"""
a label for the face
"""
ids: Int[NDArray, "N"]
"""
ArUco marker ids
"""
corners: Num[NDArray, "N 4 3"]
"""
Corner coordinates in 3D of rectangle,
relative to the world origin
"""
# %%
# OBJECT_POINTS_PARQUET = Path("output") / "object_points.parquet"
OBJECT_POINTS_PARQUET = Path("output") / "standard_box_markers.parquet"
ops = ak.from_parquet(OBJECT_POINTS_PARQUET)
display(ops)
# %%


@@ -0,0 +1,288 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 38,
"metadata": {},
"outputs": [],
"source": [
"import cv2\n",
"from cv2 import aruco\n",
"from datetime import datetime\n",
"from loguru import logger\n",
"from pathlib import Path\n",
"from typing import Optional, cast, Final\n",
"import awkward as ak\n",
"from cv2.typing import MatLike\n",
"import numpy as np\n",
"from matplotlib import pyplot as plt\n",
"import awkward as ak\n",
"from awkward import Record as AwkwardRecord, Array as AwkwardArray"
]
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {},
"outputs": [],
"source": [
"NDArray = np.ndarray\n",
"OBJECT_POINTS_PARQUET = Path(\"output\") / \"object_points.parquet\"\n",
"DICTIONARY: Final[int] = aruco.DICT_4X4_50\n",
"# 400mm\n",
"MARKER_LENGTH: Final[float] = 0.4\n",
"\n",
"A_CALIBRATION_PARQUET = Path(\"output\") / \"a-ae_08.parquet\"\n",
"B_CALIBRATION_PARQUET = Path(\"output\") / \"b-ae_09.parquet\"\n",
"C_CALIBRATION_PARQUET = Path(\"output\") / \"c-af_03.parquet\""
]
},
{
"cell_type": "code",
"execution_count": 40,
"metadata": {},
"outputs": [],
"source": [
"aruco_dict = aruco.getPredefinedDictionary(DICTIONARY)\n",
"def read_camera_calibration(path: Path) -> tuple[MatLike, MatLike]:\n",
" cal = ak.from_parquet(path)[0]\n",
" camera_matrix = cast(MatLike, ak.to_numpy(cal[\"camera_matrix\"]))\n",
" distortion_coefficients = cast(MatLike, ak.to_numpy(cal[\"distortion_coefficients\"]))\n",
" return camera_matrix, distortion_coefficients\n",
"\n",
"ops = ak.from_parquet(OBJECT_POINTS_PARQUET)\n",
"detector = aruco.ArucoDetector(\n",
" dictionary=aruco_dict, detectorParams=aruco.DetectorParameters()\n",
")\n",
"\n",
"total_ids = cast(NDArray, ak.to_numpy(ops[\"ids\"])).flatten()\n",
"total_corners = cast(NDArray, ak.to_numpy(ops[\"corners\"])).reshape(-1, 4, 3)\n",
"ops_map: dict[int, NDArray] = dict(zip(total_ids, total_corners))\n",
"# display(\"ops_map\", ops_map)"
]
},
{
"cell_type": "code",
"execution_count": 41,
"metadata": {},
"outputs": [],
"source": [
"def process(\n",
" frame: MatLike,\n",
" cam_mtx: MatLike,\n",
" dist_coeffs: MatLike,\n",
" target: Optional[MatLike] = None,\n",
") -> tuple[MatLike, Optional[MatLike], Optional[MatLike]]:\n",
" if target is None:\n",
" target = frame.copy()\n",
" grey = cv2.cvtColor(target, cv2.COLOR_BGR2GRAY)\n",
" # pylint: disable-next=unpacking-non-sequence\n",
" markers, ids, rejected = detector.detectMarkers(grey)\n",
" # `markers` is [N, 1, 4, 2]\n",
" # `ids` is [N, 1]\n",
" ret_rvec: Optional[MatLike] = None\n",
" ret_tvec: Optional[MatLike] = None\n",
" if ids is not None:\n",
" markers = np.reshape(markers, (-1, 4, 2))\n",
" ids = np.reshape(ids, (-1, 1))\n",
" # logger.info(\"markers={}, ids={}\", np.array(markers).shape, np.array(ids).shape)\n",
" ips_map: dict[int, NDArray] = {}\n",
" for cs, id in zip(markers, ids):\n",
" id = int(id)\n",
" cs = cast(NDArray, cs)\n",
" ips_map[id] = cs\n",
" center = np.mean(cs, axis=0).astype(int)\n",
" GREY = (128, 128, 128)\n",
" # logger.info(\"id={}, center={}\", id, center)\n",
" cv2.circle(target, tuple(center), 5, GREY, -1)\n",
" cv2.putText(\n",
" target,\n",
" str(id),\n",
" tuple(center),\n",
" cv2.FONT_HERSHEY_SIMPLEX,\n",
" 1,\n",
" GREY,\n",
" 2,\n",
" )\n",
" # BGR\n",
" RED = (0, 0, 255)\n",
" GREEN = (0, 255, 0)\n",
" BLUE = (255, 0, 0)\n",
" YELLOW = (0, 255, 255)\n",
" color_map = [RED, GREEN, BLUE, YELLOW]\n",
" for color, corners in zip(color_map, cs):\n",
" corners = corners.astype(int)\n",
" target = cv2.circle(target, corners, 5, color, -1)\n",
" # https://docs.opencv.org/4.x/d9/d0c/group__calib3d.html#ga50620f0e26e02caa2e9adc07b5fbf24e\n",
" ops: NDArray = np.empty((0, 3), dtype=np.float32)\n",
" ips: NDArray = np.empty((0, 2), dtype=np.float32)\n",
" for id, ip in ips_map.items():\n",
" try:\n",
" op = ops_map[id]\n",
" assert ip.shape == (4, 2), f\"corners.shape={ip.shape}\"\n",
" assert op.shape == (4, 3), f\"op.shape={op.shape}\"\n",
" ops = np.concatenate((ops, op), axis=0)\n",
" ips = np.concatenate((ips, ip), axis=0)\n",
" except KeyError:\n",
" logger.warning(\"No object points for id={}\", id)\n",
" continue\n",
" assert len(ops) == len(ips), f\"len(ops)={len(ops)} != len(ips)={len(ips)}\"\n",
" if len(ops) > 0:\n",
" # https://docs.opencv.org/4.x/d5/d1f/calib3d_solvePnP.html\n",
" # https://docs.opencv.org/4.x/d5/d1f/calib3d_solvePnP.html#calib3d_solvePnP_flags\n",
" ret, rvec, tvec = cv2.solvePnP(\n",
" objectPoints=ops,\n",
" imagePoints=ips,\n",
" cameraMatrix=cam_mtx,\n",
" distCoeffs=dist_coeffs,\n",
" flags=cv2.SOLVEPNP_SQPNP,\n",
" )\n",
" # ret, rvec, tvec, inliners = cv2.solvePnPRansac(\n",
" # objectPoints=ops,\n",
" # imagePoints=ips,\n",
" # cameraMatrix=camera_matrix,\n",
" # distCoeffs=distortion_coefficients,\n",
" # flags=cv2.SOLVEPNP_SQPNP,\n",
" # )\n",
" if ret:\n",
" cv2.drawFrameAxes(\n",
" target,\n",
" cam_mtx,\n",
" dist_coeffs,\n",
" rvec,\n",
" tvec,\n",
" MARKER_LENGTH,\n",
" )\n",
" ret_rvec = rvec\n",
" ret_tvec = tvec\n",
" return target, ret_rvec, ret_tvec"
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {},
"outputs": [],
"source": [
"A_IMG = Path(\"dumped/batch_three/video-20241224-154256-a.png\")\n",
"B_IMG = Path(\"dumped/batch_three/video-20241224-154302-b.png\")\n",
"C_IMG = Path(\"dumped/batch_three/video-20241224-154252-c.png\")\n",
"C_PRIME_IMG = Path(\"dumped/batch_three/video-20241224-153926-c-prime.png\")\n",
"\n",
"a_img = cv2.imread(str(A_IMG))\n",
"b_img = cv2.imread(str(B_IMG))\n",
"c_img = cv2.imread(str(C_IMG))\n",
"c_prime_img = cv2.imread(str(C_PRIME_IMG))\n",
"\n",
"a_mtx, a_dist = read_camera_calibration(A_CALIBRATION_PARQUET)\n",
"b_mtx, b_dist = read_camera_calibration(B_CALIBRATION_PARQUET)\n",
"c_mtx, c_dist = read_camera_calibration(C_CALIBRATION_PARQUET)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"a_result_img, a_rvec, a_tvec = process(a_img, a_mtx, a_dist)\n",
"# plt.imshow(cv2.cvtColor(a_result_img, cv2.COLOR_BGR2RGB))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"b_result_img, b_rvec, b_tvec = process(b_img, b_mtx, b_dist)\n",
"# plt.imshow(cv2.cvtColor(b_result_img, cv2.COLOR_BGR2RGB))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"c_result_img, c_rvec, c_tvec = process(c_img, c_mtx, c_dist)\n",
"c_prime_result_img, c_prime_rvec, c_prime_tvec = process(c_prime_img, c_mtx, c_dist)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"params = AwkwardArray(\n",
" [\n",
" {\n",
" \"name\": \"a-ae_08\",\n",
" \"rvec\": a_rvec,\n",
" \"tvec\": a_tvec,\n",
" \"camera_matrix\": a_mtx,\n",
" \"distortion_coefficients\": a_dist,\n",
" },\n",
" {\n",
" \"name\": \"b-ae_09\",\n",
" \"rvec\": b_rvec,\n",
" \"tvec\": b_tvec,\n",
" \"camera_matrix\": b_mtx,\n",
" \"distortion_coefficients\": b_dist,\n",
" },\n",
" {\n",
" \"name\": \"c-af_03\",\n",
" \"rvec\": c_rvec,\n",
" \"tvec\": c_tvec,\n",
" \"camera_matrix\": c_mtx,\n",
" \"distortion_coefficients\": c_dist\n",
" },\n",
" {\n",
" \"name\": \"c-prime-af_03\",\n",
" \"rvec\": c_prime_rvec,\n",
" \"tvec\": c_prime_tvec,\n",
" \"camera_matrix\": c_mtx,\n",
" \"distortion_coefficients\": c_dist\n",
" }\n",
" ]\n",
")\n",
"display(\"params\", params)\n",
"ak.to_parquet(params, Path(\"output\") / \"params.parquet\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cv2.imwrite(\"output/a_result_img.png\", a_result_img)\n",
"cv2.imwrite(\"output/b_result_img.png\", b_result_img)\n",
"cv2.imwrite(\"output/c_result_img.png\", c_result_img)\n",
"cv2.imwrite(\"output/c_prime_result_img.png\", c_prime_result_img)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.10"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
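
With the rvec/tvec pairs saved to params.parquet above, the relative pose between any two cameras follows directly; a minimal sketch, assuming a_rvec, a_tvec, b_rvec and b_tvec as produced by process() in the notebook:

import cv2
import numpy as np

def to_transform(rvec, tvec):
    # 4x4 homogeneous transform mapping object coordinates to camera coordinates
    R, _ = cv2.Rodrigues(rvec)
    T = np.eye(4)
    T[:3, :3] = R
    T[:3, 3] = tvec.flatten()
    return T

# Camera-A coordinates -> camera-B coordinates; the translation norm is the
# baseline between the two camera centres.
T_b_from_a = to_transform(b_rvec, b_tvec) @ np.linalg.inv(to_transform(a_rvec, a_tvec))
print("A-B baseline: {:.3f} m".format(float(np.linalg.norm(T_b_from_a[:3, 3]))))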

132
run_capture.py Normal file

@@ -0,0 +1,132 @@
from datetime import datetime
from os import PathLike
from pathlib import Path
import signal
from subprocess import Popen, TimeoutExpired
from typing import Any, Literal
from loguru import logger
import click
import loguru
# pacman -S python-loguru
# pacman -S python-click
Mode = Literal["preview", "save", "save_preview"]
MODE_LIST: list[Mode] = ["preview", "save", "save_preview"]
MULTICAST_ADDR = "224.0.0.123"
class DumpCommand:
port: int
output_path: str
def __init__(self, port: int, output_path: PathLike | str):
self.port = port
self.output_path = str(output_path)
def save_and_decode_nv_pipeline(self):
# note that capabilities SHOULD NOT have spaces in between
# `gst-launch-1.0` could tolerate that, but not the API itself
return f"""gst-launch-1.0 -e udpsrc port={self.port} \
! 'application/x-rtp,encoding-name=H265,payload=96' \
! rtph265depay \
! h265parse \
! tee name=t \
t. ! queue ! nvh265dec ! videoconvert ! autovideosink \
t. ! queue ! mp4mux ! filesink location={self.output_path}
"""
def save_and_decode_nv_pipeline_multicast(self):
return f"""gst-launch-1.0 -e udpsrc port={self.port} \
auto-multicast=true \
multicast-group={MULTICAST_ADDR} \
! 'application/x-rtp,encoding-name=H265,payload=96' \
! rtph265depay \
! h265parse \
! tee name=t \
t. ! queue ! vtdec_hw ! videoconvert ! autovideosink \
t. ! queue ! mp4mux ! filesink location={self.output_path}
"""
# `vtdec_hw` for macos
# `nvh265dec` for nv
def save_pipeline(self):
return f"""gst-launch-1.0 -e udpsrc port={self.port} \
! 'application/x-rtp, encoding-name=H265, payload=96' \
! rtph265depay \
! queue ! h265parse ! mp4mux ! filesink location={self.output_path}
"""
def decode_cv_only(self):
return f"""gst-launch-1.0 -e udpsrc port={self.port} \
! 'application/x-rtp,encoding-name=H265,payload=96' \
! rtph265depay \
! h265parse \
! nvh265dec \
! videoconvert \
! autovideosink
"""
def get_pipeline_from_mode(self, mode: Mode):
if mode == "save":
return self.save_pipeline()
elif mode == "save_preview":
return self.save_and_decode_nv_pipeline_multicast()
elif mode == "preview":
return self.decode_cv_only()
raise ValueError(f"Unknown mode: {mode}")
def test_filename(
port: int,
output_dir: PathLike | str,
date: datetime,
prefix="video_",
suffix=".mp4",
):
date_str = date.strftime("%Y-%m-%d_%H-%M-%S")
assert suffix.startswith("."), "suffix should start with a dot"
file_name = f"{prefix}{date_str}_{port}{suffix}"
return Path(output_dir) / file_name
# nmap -sS --open -p 22 192.168.2.0/24
@click.command()
@click.option("-o", "--output", type=click.Path(exists=True), default="output")
@click.option("-m", "--mode", type=click.Choice(MODE_LIST), default="save_preview")
def main(output: str, mode: Mode):
ports = [5601, 5602, 5603, 5604, 5605, 5606]
output_dir = Path(output)
now = datetime.now()
commands = [
DumpCommand(port, test_filename(port, output_dir, now)) for port in ports
]
ps: list[Popen] = []
run_flag: bool = True
def handle_sigint(signum: int, frame: Any):
nonlocal run_flag
run_flag = False
logger.info("Received SIGINT, stopping all processes")
for command in commands:
p = Popen(command.get_pipeline_from_mode(mode), shell=True)
ps.append(p)
signal.signal(signal.SIGINT, handle_sigint)
while run_flag:
pass
for p in ps:
p.send_signal(signal.SIGINT)
for p in ps:
try:
p.wait(3)
except TimeoutExpired:
logger.warning("Command `{}` timeout", p.args)
if __name__ == "__main__":
main() # pylint: disable=no-value-for-parameter
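
Given the click options above, a typical invocation would be `python run_capture.py -o output -m save_preview` (the output directory must already exist because of the exists=True check).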

BIN
sample/standard_box.glb LFS Normal file

Binary file not shown.