Compare commits

..

10 Commits

28 changed files with 3907 additions and 1237 deletions

5
.gitattributes vendored
@@ -1,5 +1,2 @@
 *.parquet filter=lfs diff=lfs merge=lfs -text
-charuco_400x400_3x3_s130_m100_no_24.pdf filter=lfs diff=lfs merge=lfs -text
-charuco_1189x841_10x7_s115_m90.pdf filter=lfs diff=lfs merge=lfs -text
-charuco_400x400_3x3_s130_m100_no_16.pdf filter=lfs diff=lfs merge=lfs -text
-charuco_400x400_3x3_s130_m100_no_20.pdf filter=lfs diff=lfs merge=lfs -text
+*.pdf filter=lfs diff=lfs merge=lfs -text

2
.gitignore vendored
@@ -166,3 +166,5 @@ cython_debug/
 #.idea/
 .DS_Store
 output/svg
+*.mp4
+output

1600
boom.ipynb

File diff suppressed because it is too large


@@ -9,9 +9,6 @@ from itertools import chain
 from typing import Optional, Sequence, TypedDict, cast
 import awkward as ak
-from matplotlib.pyplot import stem
-from numpy import ndarray

 class ArucoDictionary(Enum):
     Dict_4X4_50 = aruco.DICT_4X4_50
@@ -37,10 +34,10 @@ class ArucoDictionary(Enum):
     Dict_ArUco_ORIGINAL = aruco.DICT_ARUCO_ORIGINAL

-IMAGE_FOLDER = Path("dumped/usbcam")
+IMAGE_FOLDER = Path("dumped/batch_three/c")
 OUTPUT_FOLDER = Path("output")
 DICTIONARY = ArucoDictionary.Dict_4X4_50
-CALIBRATION_PARQUET: Optional[Path] = OUTPUT_FOLDER / "usbcam_cal.parquet"
+CALIBRATION_PARQUET: Optional[Path] = OUTPUT_FOLDER / "c-af_03.parquet"

 class CameraParams(TypedDict):
@@ -140,7 +137,7 @@ def main():
             "rotation_vectors": rvecs,
             "translation_vectors": tvecs,
         }
-        ak.to_parquet([parameters], OUTPUT_FOLDER / "calibration.parquet")
+        ak.to_parquet([parameters], CALIBRATION_PARQUET)
     else:
         logger.warning(
             "no calibration data calculated; either no images or already calibrated"

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

157
compute_3d_maybe.ipynb Normal file

@ -0,0 +1,157 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [],
"source": [
"import awkward as ak\n",
"from awkward import Array as AwakwardArray, Record as AwkwardRecord\n",
"from typing import cast\n",
"import numpy as np"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<pre>[{prediction: None, trackings: [], frame_num: 0, ...},\n",
" {prediction: None, trackings: [], frame_num: 1, ...},\n",
" {prediction: None, trackings: [], frame_num: 2, ...},\n",
" {prediction: None, trackings: [], frame_num: 3, ...},\n",
" {prediction: None, trackings: [], frame_num: 4, ...},\n",
" {prediction: None, trackings: [], frame_num: 5, ...},\n",
" {prediction: None, trackings: [], frame_num: 6, ...},\n",
" {prediction: None, trackings: [], frame_num: 7, ...},\n",
" {prediction: None, trackings: [], frame_num: 8, ...},\n",
" {prediction: {Akeypoints: [[...]], ...}, trackings: [{...}], ...},\n",
" ...,\n",
" {prediction: {Akeypoints: [[...]], ...}, trackings: [{...}], ...},\n",
" {prediction: {Akeypoints: [[...]], ...}, trackings: [{...}], ...},\n",
" {prediction: {Akeypoints: [[...]], ...}, trackings: [{...}], ...},\n",
" {prediction: {Akeypoints: [[...]], ...}, trackings: [{...}], ...},\n",
" {prediction: {Akeypoints: [[...]], ...}, trackings: [{...}], ...},\n",
" {prediction: {Akeypoints: [[...]], ...}, trackings: [{...}], ...},\n",
" {prediction: {Akeypoints: [[...]], ...}, trackings: [{...}], ...},\n",
" {prediction: {Akeypoints: [[...]], ...}, trackings: [{...}], ...},\n",
" {prediction: {Akeypoints: [[...]], ...}, trackings: [{...}], ...}]\n",
"-------------------------------------------------------------------\n",
"type: 808 * {\n",
" prediction: ?{\n",
" Akeypoints: var * var * var * float64,\n",
" bboxes: var * var * float64,\n",
" scores: var * var * var * float64,\n",
" frame_number: int64,\n",
" reference_frame_size: {\n",
" &quot;0&quot;: int64,\n",
" &quot;1&quot;: int64\n",
" }\n",
" },\n",
" trackings: var * {\n",
" id: int64,\n",
" bounding_boxes: var * var * float64\n",
" },\n",
" frame_num: int64,\n",
" reference_frame_size: {\n",
" height: int64,\n",
" width: int64\n",
" }\n",
"}</pre>"
],
"text/plain": [
"<Array [{prediction: None, ...}, ..., {...}] type='808 * {prediction: ?{Ake...'>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"a_ak = ak.from_parquet(\"pose/a.parquet\")\n",
"b_ak = ak.from_parquet(\"pose/b.parquet\")\n",
"# display(a_ak)\n",
"display(b_ak)"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<pre>{Akeypoints: [[[893, 417], [898, 408], [...], ..., [782, 596], [785, 599]]],\n",
" bboxes: [[756, 341, 940, 597]],\n",
" scores: [[[0.907], [0.896], [0.916], [0.341], ..., [0.811], [0.835], [0.802]]],\n",
" frame_number: 5,\n",
" reference_frame_size: {&#x27;0&#x27;: 1080, &#x27;1&#x27;: 1920}}\n",
"--------------------------------------------------------------------------------\n",
"type: {\n",
" Akeypoints: var * var * var * float64,\n",
" bboxes: var * var * float64,\n",
" scores: var * var * var * float64,\n",
" frame_number: int64,\n",
" reference_frame_size: {\n",
" &quot;0&quot;: int64,\n",
" &quot;1&quot;: int64\n",
" }\n",
"}</pre>"
],
"text/plain": [
"<Record {Akeypoints: [[...]], bboxes: ..., ...} type='{Akeypoints: var * va...'>"
]
},
"execution_count": 24,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"a_ak[\"prediction\"][5]"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [],
"source": [
"unique_tracking_ids_a = np.unique(ak.to_numpy(ak.flatten(cast(AwakwardArray, a_ak[\"trackings\"][\"id\"]))))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.8"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
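
The notebook stops after collecting `unique_tracking_ids_a`; a hypothetical follow-up cell, illustrative only, that pulls one id's boxes out with an awkward boolean mask:

# hypothetical: per-frame bounding boxes for the first tracking id in camera a
tid = int(unique_tracking_ids_a[0])
mask = a_ak["trackings"]["id"] == tid
boxes_for_tid = a_ak["trackings"]["bounding_boxes"][mask]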

14
cvt_all_pdfs.sh Executable file

@@ -0,0 +1,14 @@
#!/bin/bash
# Set input folder
INPUT_DIR="board"
DPI=100 # Resolution in DPI

# Iterate over all .pdf files in the folder
for pdf in "$INPUT_DIR"/*.pdf; do
    # Strip .pdf to get base name
    base="${pdf%.pdf}"
    # Convert to PNG
    magick convert -density "$DPI" "$pdf" "${base}.png"
    echo "Converted: $pdf -> ${base}.png"
done

141
draw.ipynb Normal file

File diff suppressed because one or more lines are too long

58
dump_and_play.sh Executable file

@@ -0,0 +1,58 @@
#!/bin/bash
if [ -z "$1" ]; then
    echo "Usage: $0 <port>"
    exit 1
else
    echo "dumping video from port $1"
fi
TARGET_PORT=$1
if ! [[ $TARGET_PORT =~ ^[0-9]+$ ]]; then
    echo "error: expect a number, got $TARGET_PORT" >&2
    exit 1
fi

# See also majestic.yaml
# Get the current date and time in the format YYYYMMDD-HHMMSS
DATE=$(date +"%Y%m%d-%H%M%S")
# use mts as MPEG transport stream
FILENAME="output/video-${DATE}-${TARGET_PORT}.mts"

# SINK="autovideosink"
SINK="glimagesink"

# Run the GStreamer pipeline with the dynamic filename
# gst-launch-1.0 -e udpsrc port=$TARGET_PORT \
#     ! 'application/x-rtp,encoding-name=H265,payload=96' \
#     ! rtph265depay \
#     ! h265parse \
#     ! tee name=t \
#     t. ! queue ! $DECODER ! videoconvert ! $SINK \
#     t. ! queue ! mp4mux ! filesink location=$FILENAME

# DECODER="nvh265dec"
# DECODER="vulkanh265dec"
# DECODER="avdec_h265"
DECODER="vtdec_hw"
# DECODER="vtdec"

# gst-launch-1.0 -e udpsrc port=$TARGET_PORT auto-multicast=true multicast-group=224.0.0.123 \
#     ! 'application/x-rtp,encoding-name=H265,payload=96' \
#     ! rtph265depay \
#     ! tee name=t \
#     ! h265parse \
#     t. ! queue ! $DECODER ! videoconvert ! $SINK \
#     t. ! queue ! mpegtsmux ! filesink location=$FILENAME

# hvc1
# hev1
gst-launch-1.0 -e udpsrc port=$TARGET_PORT auto-multicast=true multicast-group=224.0.0.123 \
    ! 'application/x-rtp,encoding-name=H265,payload=96' \
    ! rtph265depay \
    ! tee name=t \
    t. ! queue ! h265parse ! "video/x-h265,stream-format=hvc1" ! $DECODER ! videoconvert ! $SINK \
    t. ! queue ! h265parse ! mpegtsmux ! filesink location=$FILENAME
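
Since the dump is a plain MPEG-TS file, it can be post-processed offline; a minimal sketch using OpenCV's FFmpeg backend to iterate frames (the filename is illustrative, and assumes the OpenCV build can decode H.265):

import cv2

cap = cv2.VideoCapture("output/video-20241224-154256-5601.mts")  # hypothetical dump
while True:
    ret, frame = cap.read()
    if not ret:
        break  # end of stream or decode failure
    # per-frame processing goes here, e.g. marker detection
cap.release()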


@@ -10,9 +10,14 @@ import numpy as np
 NDArray = np.ndarray

 CALIBRATION_PARQUET = Path("output") / "usbcam_cal.parquet"
-DICTIONARY: Final[int] = aruco.DICT_4X4_50
+# 7x7
+DICTIONARY: Final[int] = aruco.DICT_7X7_1000
 # 400mm
 MARKER_LENGTH: Final[float] = 0.4
+RED = (0, 0, 255)
+GREEN = (0, 255, 0)
+BLUE = (255, 0, 0)
+YELLOW = (0, 255, 255)

 def gen():
@@ -47,23 +52,18 @@ def main():
             # logger.info("markers={}, ids={}", np.array(markers).shape, np.array(ids).shape)
             for m, i in zip(markers, ids):
                 center = np.mean(m, axis=0).astype(int)
-                GREY = (128, 128, 128)
                 logger.info("id={}, center={}", i, center)
-                cv2.circle(frame, tuple(center), 5, GREY, -1)
+                cv2.circle(frame, tuple(center), 5, RED, -1)
                 cv2.putText(
                     frame,
                     str(i),
                     tuple(center),
                     cv2.FONT_HERSHEY_SIMPLEX,
                     1,
-                    GREY,
+                    RED,
                     2,
                 )
                 # BGR
-                RED = (0, 0, 255)
-                GREEN = (0, 255, 0)
-                BLUE = (255, 0, 0)
-                YELLOW = (0, 255, 255)
                 color_map = [RED, GREEN, BLUE, YELLOW]
                 for color, corners in zip(color_map, m):
                     corners = corners.astype(int)

File diff suppressed because one or more lines are too long

150
find_cute_object.py Normal file

@@ -0,0 +1,150 @@
import cv2
from cv2 import aruco
from datetime import datetime
from loguru import logger
from pathlib import Path
from typing import Optional, cast, Final
import awkward as ak
from cv2.typing import MatLike
import numpy as np

NDArray = np.ndarray

CALIBRATION_PARQUET = Path("output") / "usbcam_cal.parquet"
OBJECT_POINTS_PARQUET = Path("output") / "object_points.parquet"
DICTIONARY: Final[int] = aruco.DICT_4X4_50
# 400mm
MARKER_LENGTH: Final[float] = 0.4


def gen():
    API = cv2.CAP_AVFOUNDATION
    cap = cv2.VideoCapture(0, API)
    while True:
        ret, frame = cap.read()
        if not ret:
            logger.warning("Failed to grab frame")
            break
        yield frame


def main():
    aruco_dict = aruco.getPredefinedDictionary(DICTIONARY)
    cal = ak.from_parquet(CALIBRATION_PARQUET)[0]
    camera_matrix = cast(MatLike, ak.to_numpy(cal["camera_matrix"]))
    distortion_coefficients = cast(MatLike, ak.to_numpy(cal["distortion_coefficients"]))
    ops = ak.from_parquet(OBJECT_POINTS_PARQUET)
    detector = aruco.ArucoDetector(
        dictionary=aruco_dict, detectorParams=aruco.DetectorParameters()
    )
    total_ids = cast(NDArray, ak.to_numpy(ops["ids"])).flatten()
    total_corners = cast(NDArray, ak.to_numpy(ops["corners"])).reshape(-1, 4, 3)
    ops_map: dict[int, NDArray] = dict(zip(total_ids, total_corners))
    logger.info("ops_map={}", ops_map)
    writer: Optional[cv2.VideoWriter] = None
    for frame in gen():
        grey = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # pylint: disable-next=unpacking-non-sequence
        markers, ids, rejected = detector.detectMarkers(grey)
        # `markers` is [N, 1, 4, 2]
        # `ids` is [N, 1]
        if ids is not None:
            markers = np.reshape(markers, (-1, 4, 2))
            ids = np.reshape(ids, (-1, 1))
            # logger.info("markers={}, ids={}", np.array(markers).shape, np.array(ids).shape)
            ips_map: dict[int, NDArray] = {}
            for cs, id in zip(markers, ids):
                id = int(id)
                cs = cast(NDArray, cs)
                ips_map[id] = cs
                center = np.mean(cs, axis=0).astype(int)
                GREY = (128, 128, 128)
                # logger.info("id={}, center={}", id, center)
                cv2.circle(frame, tuple(center), 5, GREY, -1)
                cv2.putText(
                    frame,
                    str(id),
                    tuple(center),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    1,
                    GREY,
                    2,
                )
                # BGR
                RED = (0, 0, 255)
                GREEN = (0, 255, 0)
                BLUE = (255, 0, 0)
                YELLOW = (0, 255, 255)
                color_map = [RED, GREEN, BLUE, YELLOW]
                for color, corners in zip(color_map, cs):
                    corners = corners.astype(int)
                    frame = cv2.circle(frame, corners, 5, color, -1)
            # https://docs.opencv.org/4.x/d9/d0c/group__calib3d.html#ga50620f0e26e02caa2e9adc07b5fbf24e
            ops: NDArray = np.empty((0, 3), dtype=np.float32)
            ips: NDArray = np.empty((0, 2), dtype=np.float32)
            for id, ip in ips_map.items():
                try:
                    op = ops_map[id]
                    assert ip.shape == (4, 2), f"corners.shape={ip.shape}"
                    assert op.shape == (4, 3), f"op.shape={op.shape}"
                    ops = np.concatenate((ops, op), axis=0)
                    ips = np.concatenate((ips, ip), axis=0)
                except KeyError:
                    logger.warning("No object points for id={}", id)
                    continue
            assert len(ops) == len(ips), f"len(ops)={len(ops)} != len(ips)={len(ips)}"
            if len(ops) > 0:
                # https://docs.opencv.org/4.x/d5/d1f/calib3d_solvePnP.html
                # https://docs.opencv.org/4.x/d5/d1f/calib3d_solvePnP.html#calib3d_solvePnP_flags
                ret, rvec, tvec = cv2.solvePnP(
                    objectPoints=ops,
                    imagePoints=ips,
                    cameraMatrix=camera_matrix,
                    distCoeffs=distortion_coefficients,
                    flags=cv2.SOLVEPNP_SQPNP,
                )
                # ret, rvec, tvec, inliers = cv2.solvePnPRansac(
                #     objectPoints=ops,
                #     imagePoints=ips,
                #     cameraMatrix=camera_matrix,
                #     distCoeffs=distortion_coefficients,
                #     flags=cv2.SOLVEPNP_SQPNP,
                # )
                if ret:
                    cv2.drawFrameAxes(
                        frame,
                        camera_matrix,
                        distortion_coefficients,
                        rvec,
                        tvec,
                        MARKER_LENGTH,
                    )
                else:
                    logger.warning("Failed to solvePnP")
        cv2.imshow("frame", frame)
        if writer is not None:
            writer.write(frame)
        if (k := cv2.waitKey(1)) == ord("q"):
            logger.info("Exiting")
            break
        elif k == ord("s"):
            now = datetime.now().strftime("%Y%m%d%H%M%S")
            file_name = f"aruco_{now}.png"
            logger.info("Saving to {}", file_name)
            cv2.imwrite(file_name, frame)
        elif k == ord("r"):
            if writer is not None:
                writer.release()
                writer = None
                logger.info("Recording stopped")
            else:
                now = datetime.now().strftime("%Y%m%d%H%M%S")
                file_name = f"aruco_{now}.mp4"
                logger.info("Recording to {}", file_name)
                fourcc = cv2.VideoWriter.fourcc(*"mp4v")
                writer = cv2.VideoWriter(file_name, fourcc, 20.0, frame.shape[:2][::-1])


if __name__ == "__main__":
    main()
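
A note on the solvePnP output above: rvec/tvec map board (world) points into the camera frame, so the camera's own position in board coordinates, if needed, is the usual inversion; a small sketch using the rvec/tvec from this script:

import cv2

# x_cam = R @ x_world + t, with rvec/tvec from solvePnP above
R, _ = cv2.Rodrigues(rvec)       # 3x1 rotation vector -> 3x3 matrix
camera_position = -R.T @ tvec    # camera center in board/world coordinates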

375
get_ext.ipynb Normal file

@ -0,0 +1,375 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 38,
"metadata": {},
"outputs": [],
"source": [
"import cv2\n",
"from cv2 import aruco\n",
"from datetime import datetime\n",
"from loguru import logger\n",
"from pathlib import Path\n",
"from typing import Optional, cast, Final\n",
"import awkward as ak\n",
"from cv2.typing import MatLike\n",
"import numpy as np\n",
"from matplotlib import pyplot as plt\n",
"import awkward as ak\n",
"from awkward import Record as AwkwardRecord, Array as AwkwardArray"
]
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {},
"outputs": [],
"source": [
"NDArray = np.ndarray\n",
"OBJECT_POINTS_PARQUET = Path(\"output\") / \"object_points.parquet\"\n",
"DICTIONARY: Final[int] = aruco.DICT_4X4_50\n",
"# 400mm\n",
"MARKER_LENGTH: Final[float] = 0.4\n",
"\n",
"A_CALIBRATION_PARQUET = Path(\"output\") / \"a-ae_08.parquet\"\n",
"B_CALIBRATION_PARQUET = Path(\"output\") / \"b-ae_09.parquet\"\n",
"C_CALIBRATION_PARQUET = Path(\"output\") / \"c-af_03.parquet\""
]
},
{
"cell_type": "code",
"execution_count": 40,
"metadata": {},
"outputs": [],
"source": [
"aruco_dict = aruco.getPredefinedDictionary(DICTIONARY)\n",
"def read_camera_calibration(path: Path) -> tuple[MatLike, MatLike]:\n",
" cal = ak.from_parquet(path)[0]\n",
" camera_matrix = cast(MatLike, ak.to_numpy(cal[\"camera_matrix\"]))\n",
" distortion_coefficients = cast(MatLike, ak.to_numpy(cal[\"distortion_coefficients\"]))\n",
" return camera_matrix, distortion_coefficients\n",
"\n",
"ops = ak.from_parquet(OBJECT_POINTS_PARQUET)\n",
"detector = aruco.ArucoDetector(\n",
" dictionary=aruco_dict, detectorParams=aruco.DetectorParameters()\n",
")\n",
"\n",
"total_ids = cast(NDArray, ak.to_numpy(ops[\"ids\"])).flatten()\n",
"total_corners = cast(NDArray, ak.to_numpy(ops[\"corners\"])).reshape(-1, 4, 3)\n",
"ops_map: dict[int, NDArray] = dict(zip(total_ids, total_corners))\n",
"# display(\"ops_map\", ops_map)"
]
},
{
"cell_type": "code",
"execution_count": 41,
"metadata": {},
"outputs": [],
"source": [
"def process(\n",
" frame: MatLike,\n",
" cam_mtx: MatLike,\n",
" dist_coeffs: MatLike,\n",
" target: Optional[MatLike] = None,\n",
") -> tuple[MatLike, Optional[MatLike], Optional[MatLike]]:\n",
" if target is None:\n",
" target = frame.copy()\n",
" grey = cv2.cvtColor(target, cv2.COLOR_BGR2GRAY)\n",
" # pylint: disable-next=unpacking-non-sequence\n",
" markers, ids, rejected = detector.detectMarkers(grey)\n",
" # `markers` is [N, 1, 4, 2]\n",
" # `ids` is [N, 1]\n",
" ret_rvec: Optional[MatLike] = None\n",
" ret_tvec: Optional[MatLike] = None\n",
" if ids is not None:\n",
" markers = np.reshape(markers, (-1, 4, 2))\n",
" ids = np.reshape(ids, (-1, 1))\n",
" # logger.info(\"markers={}, ids={}\", np.array(markers).shape, np.array(ids).shape)\n",
" ips_map: dict[int, NDArray] = {}\n",
" for cs, id in zip(markers, ids):\n",
" id = int(id)\n",
" cs = cast(NDArray, cs)\n",
" ips_map[id] = cs\n",
" center = np.mean(cs, axis=0).astype(int)\n",
" GREY = (128, 128, 128)\n",
" # logger.info(\"id={}, center={}\", id, center)\n",
" cv2.circle(target, tuple(center), 5, GREY, -1)\n",
" cv2.putText(\n",
" target,\n",
" str(id),\n",
" tuple(center),\n",
" cv2.FONT_HERSHEY_SIMPLEX,\n",
" 1,\n",
" GREY,\n",
" 2,\n",
" )\n",
" # BGR\n",
" RED = (0, 0, 255)\n",
" GREEN = (0, 255, 0)\n",
" BLUE = (255, 0, 0)\n",
" YELLOW = (0, 255, 255)\n",
" color_map = [RED, GREEN, BLUE, YELLOW]\n",
" for color, corners in zip(color_map, cs):\n",
" corners = corners.astype(int)\n",
" target = cv2.circle(target, corners, 5, color, -1)\n",
" # https://docs.opencv.org/4.x/d9/d0c/group__calib3d.html#ga50620f0e26e02caa2e9adc07b5fbf24e\n",
" ops: NDArray = np.empty((0, 3), dtype=np.float32)\n",
" ips: NDArray = np.empty((0, 2), dtype=np.float32)\n",
" for id, ip in ips_map.items():\n",
" try:\n",
" op = ops_map[id]\n",
" assert ip.shape == (4, 2), f\"corners.shape={ip.shape}\"\n",
" assert op.shape == (4, 3), f\"op.shape={op.shape}\"\n",
" ops = np.concatenate((ops, op), axis=0)\n",
" ips = np.concatenate((ips, ip), axis=0)\n",
" except KeyError:\n",
" logger.warning(\"No object points for id={}\", id)\n",
" continue\n",
" assert len(ops) == len(ips), f\"len(ops)={len(ops)} != len(ips)={len(ips)}\"\n",
" if len(ops) > 0:\n",
" # https://docs.opencv.org/4.x/d5/d1f/calib3d_solvePnP.html\n",
" # https://docs.opencv.org/4.x/d5/d1f/calib3d_solvePnP.html#calib3d_solvePnP_flags\n",
" ret, rvec, tvec = cv2.solvePnP(\n",
" objectPoints=ops,\n",
" imagePoints=ips,\n",
" cameraMatrix=cam_mtx,\n",
" distCoeffs=dist_coeffs,\n",
" flags=cv2.SOLVEPNP_SQPNP,\n",
" )\n",
" # ret, rvec, tvec, inliners = cv2.solvePnPRansac(\n",
" # objectPoints=ops,\n",
" # imagePoints=ips,\n",
" # cameraMatrix=camera_matrix,\n",
" # distCoeffs=distortion_coefficients,\n",
" # flags=cv2.SOLVEPNP_SQPNP,\n",
" # )\n",
" if ret:\n",
" cv2.drawFrameAxes(\n",
" target,\n",
" cam_mtx,\n",
" dist_coeffs,\n",
" rvec,\n",
" tvec,\n",
" MARKER_LENGTH,\n",
" )\n",
" ret_rvec = rvec\n",
" ret_tvec = tvec\n",
" return target, ret_rvec, ret_tvec"
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {},
"outputs": [],
"source": [
"A_IMG = Path(\"dumped/batch_three/video-20241224-154256-a.png\")\n",
"B_IMG = Path(\"dumped/batch_three/video-20241224-154302-b.png\")\n",
"C_IMG = Path(\"dumped/batch_three/video-20241224-154252-c.png\")\n",
"C_PRIME_IMG = Path(\"dumped/batch_three/video-20241224-153926-c-prime.png\")\n",
"\n",
"a_img = cv2.imread(str(A_IMG))\n",
"b_img = cv2.imread(str(B_IMG))\n",
"c_img = cv2.imread(str(C_IMG))\n",
"c_prime_img = cv2.imread(str(C_PRIME_IMG))\n",
"\n",
"a_mtx, a_dist = read_camera_calibration(A_CALIBRATION_PARQUET)\n",
"b_mtx, b_dist = read_camera_calibration(B_CALIBRATION_PARQUET)\n",
"c_mtx, c_dist = read_camera_calibration(C_CALIBRATION_PARQUET)"
]
},
{
"cell_type": "code",
"execution_count": 43,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/var/folders/cj/0zmvpygn7m72m42lh6x_hcgw0000gn/T/ipykernel_79393/542219436.py:22: DeprecationWarning: Conversion of an array with ndim > 0 to a scalar is deprecated, and will error in future. Ensure you extract a single element from your array before performing this operation. (Deprecated NumPy 1.25.)\n",
" id = int(id)\n"
]
}
],
"source": [
"a_result_img, a_rvec, a_tvec = process(a_img, a_mtx, a_dist)\n",
"# plt.imshow(cv2.cvtColor(a_result_img, cv2.COLOR_BGR2RGB))"
]
},
{
"cell_type": "code",
"execution_count": 44,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/var/folders/cj/0zmvpygn7m72m42lh6x_hcgw0000gn/T/ipykernel_79393/542219436.py:22: DeprecationWarning: Conversion of an array with ndim > 0 to a scalar is deprecated, and will error in future. Ensure you extract a single element from your array before performing this operation. (Deprecated NumPy 1.25.)\n",
" id = int(id)\n"
]
}
],
"source": [
"b_result_img, b_rvec, b_tvec = process(b_img, b_mtx, b_dist)\n",
"# plt.imshow(cv2.cvtColor(b_result_img, cv2.COLOR_BGR2RGB))"
]
},
{
"cell_type": "code",
"execution_count": 45,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/var/folders/cj/0zmvpygn7m72m42lh6x_hcgw0000gn/T/ipykernel_79393/542219436.py:22: DeprecationWarning: Conversion of an array with ndim > 0 to a scalar is deprecated, and will error in future. Ensure you extract a single element from your array before performing this operation. (Deprecated NumPy 1.25.)\n",
" id = int(id)\n"
]
}
],
"source": [
"c_result_img, c_rvec, c_tvec = process(c_img, c_mtx, c_dist)\n",
"c_prime_result_img, c_prime_rvec, c_prime_tvec = process(c_prime_img, c_mtx, c_dist)"
]
},
{
"cell_type": "code",
"execution_count": 46,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'params'"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<pre>[{name: &#x27;a-ae_08&#x27;, rvec: [[-0.602], ..., [-3.05]], tvec: [...], ...},\n",
" {name: &#x27;b-ae_09&#x27;, rvec: [[0.572], ..., [3.02]], tvec: [...], ...},\n",
" {name: &#x27;c-af_03&#x27;, rvec: [[-1.98], ..., [-2.4]], tvec: [...], ...},\n",
" {name: &#x27;c-prime-af_03&#x27;, rvec: [[-1.99], ...], tvec: [...], ...}]\n",
"---------------------------------------------------------------------\n",
"type: 4 * {\n",
" name: string,\n",
" rvec: var * var * float64,\n",
" tvec: var * var * float64,\n",
" camera_matrix: var * var * float64,\n",
" distortion_coefficients: var * var * float64\n",
"}</pre>"
],
"text/plain": [
"<Array [{name: 'a-ae_08', rvec: ..., ...}, ...] type='4 * {name: string, rv...'>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"<pyarrow._parquet.FileMetaData object at 0x311da8900>\n",
" created_by: parquet-cpp-arrow version 14.0.1\n",
" num_columns: 5\n",
" num_rows: 4\n",
" num_row_groups: 1\n",
" format_version: 2.6\n",
" serialized_size: 0"
]
},
"execution_count": 46,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"params = AwkwardArray(\n",
" [\n",
" {\n",
" \"name\": \"a-ae_08\",\n",
" \"rvec\": a_rvec,\n",
" \"tvec\": a_tvec,\n",
" \"camera_matrix\": a_mtx,\n",
" \"distortion_coefficients\": a_dist,\n",
" },\n",
" {\n",
" \"name\": \"b-ae_09\",\n",
" \"rvec\": b_rvec,\n",
" \"tvec\": b_tvec,\n",
" \"camera_matrix\": b_mtx,\n",
" \"distortion_coefficients\": b_dist,\n",
" },\n",
" {\n",
" \"name\": \"c-af_03\",\n",
" \"rvec\": c_rvec,\n",
" \"tvec\": c_tvec,\n",
" \"camera_matrix\": c_mtx,\n",
" \"distortion_coefficients\": c_dist\n",
" },\n",
" {\n",
" \"name\": \"c-prime-af_03\",\n",
" \"rvec\": c_prime_rvec,\n",
" \"tvec\": c_prime_tvec,\n",
" \"camera_matrix\": c_mtx,\n",
" \"distortion_coefficients\": c_dist\n",
" }\n",
" ]\n",
")\n",
"display(\"params\", params)\n",
"ak.to_parquet(params, Path(\"output\") / \"params.parquet\")"
]
},
{
"cell_type": "code",
"execution_count": 47,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 47,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"cv2.imwrite(\"output/a_result_img.png\", a_result_img)\n",
"cv2.imwrite(\"output/b_result_img.png\", b_result_img)\n",
"cv2.imwrite(\"output/c_result_img.png\", c_result_img)\n",
"cv2.imwrite(\"output/c_prime_result_img.png\", c_prime_result_img)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.8"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
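
With params.parquet holding a board-referenced rvec/tvec per camera, the relative pose between two cameras follows by composition; a sketch under the x_cam = R @ x_world + t convention used by solvePnP, with variables as in the notebook:

import cv2

R_a, _ = cv2.Rodrigues(a_rvec)
R_b, _ = cv2.Rodrigues(b_rvec)
R_ab = R_b @ R_a.T             # rotation taking camera-a coords to camera-b coords
t_ab = b_tvec - R_ab @ a_tvec  # translation of the same transform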

2221
new_try.ipynb Normal file

File diff suppressed because it is too large

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

132
run_capture.py Normal file

@@ -0,0 +1,132 @@
from datetime import datetime
from os import PathLike
from pathlib import Path
import signal
from subprocess import Popen, TimeoutExpired
from typing import Any, Literal

from loguru import logger
import click
import loguru

# pacman -S python-loguru
# pacman -S python-click

Mode = Literal["preview", "save", "save_preview"]
MODE_LIST: list[Mode] = ["preview", "save", "save_preview"]
MULTICAST_ADDR = "224.0.0.123"


class DumpCommand:
    port: int
    output_path: str

    def __init__(self, port: int, output_path: PathLike | str):
        self.port = port
        self.output_path = str(output_path)

    def save_and_decode_nv_pipeline(self):
        # note that capabilities SHOULD NOT have spaces in between;
        # `gst-launch-1.0` could tolerate that, but not the API itself
        return f"""gst-launch-1.0 -e udpsrc port={self.port} \
! 'application/x-rtp,encoding-name=H265,payload=96' \
! rtph265depay \
! h265parse \
! tee name=t \
t. ! queue ! nvh265dec ! videoconvert ! autovideosink \
t. ! queue ! mp4mux ! filesink location={self.output_path}
"""

    def save_and_decode_nv_pipeline_multicast(self):
        return f"""gst-launch-1.0 -e udpsrc port={self.port} \
auto-multicast=true \
multicast-group={MULTICAST_ADDR} \
! 'application/x-rtp,encoding-name=H265,payload=96' \
! rtph265depay \
! h265parse \
! tee name=t \
t. ! queue ! vtdec_hw ! videoconvert ! autovideosink \
t. ! queue ! mp4mux ! filesink location={self.output_path}
"""

    # `vtdec_hw` for macos
    # `nvh265dec` for nv
    def save_pipeline(self):
        return f"""gst-launch-1.0 -e udpsrc port={self.port} \
! 'application/x-rtp, encoding-name=H265, payload=96' \
! rtph265depay \
! queue ! h265parse ! mp4mux ! filesink location={self.output_path}
"""

    def decode_cv_only(self):
        return f"""gst-launch-1.0 -e udpsrc port={self.port} \
! 'application/x-rtp,encoding-name=H265,payload=96' \
! rtph265depay \
! h265parse \
! nvh265dec \
! videoconvert \
! autovideosink
"""

    def get_pipeline_from_mode(self, mode: Mode):
        if mode == "save":
            return self.save_pipeline()
        elif mode == "save_preview":
            return self.save_and_decode_nv_pipeline_multicast()
        elif mode == "preview":
            return self.decode_cv_only()
        raise ValueError(f"Unknown mode: {mode}")


def test_filename(
    port: int,
    output_dir: PathLike | str,
    date: datetime,
    prefix="video_",
    suffix=".mp4",
):
    date_str = date.strftime("%Y-%m-%d_%H-%M-%S")
    assert suffix.startswith("."), "suffix should start with a dot"
    file_name = f"{prefix}{date_str}_{port}{suffix}"
    return Path(output_dir) / file_name


# nmap -sS --open -p 22 192.168.2.0/24
@click.command()
@click.option("-o", "--output", type=click.Path(exists=True), default="output")
@click.option("-m", "--mode", type=click.Choice(MODE_LIST), default="save_preview")
def main(output: str, mode: Mode):
    ports = [5601, 5602, 5603, 5604, 5605, 5606]
    output_dir = Path(output)
    now = datetime.now()
    commands = [
        DumpCommand(port, test_filename(port, output_dir, now)) for port in ports
    ]
    ps: list[Popen] = []
    run_flag: bool = True

    def handle_sigint(signum: int, frame: Any):
        nonlocal run_flag
        run_flag = False
        logger.info("Received SIGINT, stopping all processes")

    for command in commands:
        p = Popen(command.get_pipeline_from_mode(mode), shell=True)
        ps.append(p)
    signal.signal(signal.SIGINT, handle_sigint)
    while run_flag:
        # block until a signal arrives instead of busy-waiting
        signal.pause()
    for p in ps:
        p.send_signal(signal.SIGINT)
    for p in ps:
        try:
            p.wait(3)
        except TimeoutExpired:
            logger.warning("Command `{}` timeout", p.args)


if __name__ == "__main__":
    main()  # pylint: disable=no-value-for-parameter
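
For reference, `test_filename` above composes the per-port capture path; a quick usage check, assuming the function as defined in this file:

from datetime import datetime

p = test_filename(5601, "output", datetime(2024, 12, 24, 15, 42, 56))
print(p)  # output/video_2024-12-24_15-42-56_5601.mp4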