Merge remote-tracking branch 'origin/trt' into jetson

Author: Isse
Date: 2024-12-19 15:16:16 +01:00
35 changed files with 5482 additions and 3285 deletions


@ -27,6 +27,11 @@ Fast triangulation of multiple persons from multiple camera views.
./run_container.sh
```
+- Build triangulator:
+```bash
+cd /RapidPoseTriangulation/swig/ && make all && cd ../tests/ && python3 test_interface.py && cd ..
+```
- Test with samples:
```bash
python3 /RapidPoseTriangulation/scripts/test_triangulate.py
@ -37,11 +42,3 @@ Fast triangulation of multiple persons from multiple camera views.
export CUDA_VISIBLE_DEVICES=0
python3 /RapidPoseTriangulation/scripts/test_skelda_dataset.py
```
-<br>
-## Debugging
-```bash
-cd /RapidPoseTriangulation/swig/ && make all && cd ../tests/ && python3 test_interface.py
-```
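For reference, a minimal sketch of how the SWIG-built module can be driven from Python once `make all` has run. The class and argument names mirror the C++ API renamed later in this commit (`Triangulator(min_match_score, min_group_size)`, `triangulate_poses`); the exact Python-side signatures and the camera-calibration setup are assumptions, not taken from this diff.

```python
# Hypothetical usage sketch of the rpt SWIG bindings; binding names are assumptions.
import numpy as np
import rpt

triangulator = rpt.Triangulator(0.94, 1)  # min_match_score, min_group_size

# One list per camera; each person is a (num_joints, 3) array of x, y, score.
poses_2d = [
    [np.random.rand(17, 3).tolist()],  # camera 0
    [np.random.rand(17, 3).tolist()],  # camera 1
]

# Cameras must be configured (calibration / projection matrices) before this call.
poses_3d = triangulator.triangulate_poses(poses_2d)
print(len(poses_3d), "triangulated persons")
```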

data/.gitignore

@ -5,3 +5,4 @@
*.json
!*/*.json
testoutput/
+trt_cache/


@ -1,4 +1,4 @@
-FROM nvcr.io/nvidia/pytorch:23.02-py3
+FROM nvcr.io/nvidia/tensorrt:24.10-py3
ARG DEBIAN_FRONTEND=noninteractive
ENV LANG=C.UTF-8
@ -15,31 +15,16 @@ RUN apt-get update && apt-get install -y --no-install-recommends python3-tk
# Update pip to allow installation of skelda in editable mode
RUN pip3 install --upgrade --no-cache-dir pip
-# Install MMPose
-ENV FORCE_CUDA="1"
-ENV MMCV_WITH_OPS=1
-RUN pip3 install --upgrade --no-cache-dir openmim
-RUN mim install mmengine
-RUN mim install "mmcv>=2,<2.2.0"
-RUN mim install "mmdet>=3"
-RUN mim install "mmpose>=1.1.0"
-# Fix an error when importing mmpose
-RUN pip3 install --upgrade --no-cache-dir numpy scipy
-RUN git clone --depth=1 --branch=main https://github.com/open-mmlab/mmpose.git
-# Download pretrained model
-COPY scripts/utils_2d_pose.py /
-RUN python3 -c "from utils_2d_pose import load_model; load_model();"
-RUN python3 -c "from utils_2d_pose import load_wb_model; load_wb_model();"
-# Fix an undefined symbol error with ompi
-RUN echo "ldconfig" >> ~/.bashrc
# Install swig and later dependencies
RUN apt-get update && apt-get install -y --no-install-recommends build-essential
RUN apt-get update && apt-get install -y --no-install-recommends swig
RUN apt-get update && apt-get install -y --no-install-recommends libopencv-dev
+# Install ONNX runtime
+RUN pip3 install --upgrade --no-cache-dir onnxruntime-gpu
+# Install skelda
+RUN pip3 install --upgrade --no-cache-dir scipy
COPY ./skelda/ /skelda/
RUN pip3 install --no-cache-dir -e /skelda/

extras/easypose/README.md

@ -0,0 +1,18 @@
# Test ONNX with EasyPose
Code files originally from: https://github.com/Dominic23331/EasyPose.git
<br>
```bash
docker build --progress=plain -f extras/easypose/dockerfile -t rpt_easypose .
./extras/easypose/run_container.sh
```
```bash
export CUDA_VISIBLE_DEVICES=0
python3 /RapidPoseTriangulation/scripts/test_triangulate.py
python3 /RapidPoseTriangulation/scripts/test_skelda_dataset.py
```
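The scripts above drive the patched EasyPose pipeline. A minimal sketch of what they do per image, based on `scripts/utils_2d_pose.py` in this commit (the input image path is a placeholder):

```python
# Sketch of the patched EasyPose top-down pipeline; model paths follow the
# extras/mmdeploy/README.md exports, the image path is a placeholder.
import cv2
import easypose as ep

model = ep.TopDown(
    "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-m_384x288_fp16_extra-steps.onnx",
    "SimCC",
    "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_320x320_fp16_extra-steps.onnx",
    conf_threshold=0.3,
    iou_threshold=0.3,
    warmup=10,
)

img = cv2.cvtColor(cv2.imread("example.jpg"), cv2.COLOR_BGR2RGB)
for person in model.predict(img):
    print(person.box, person.keypoints.shape)  # keypoints: (num_joints, 3) with x, y, score
```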


@ -0,0 +1,65 @@
import warnings
from abc import ABC, abstractmethod
from typing import List
import time
import numpy as np
import onnxruntime as ort
from tqdm import tqdm
class BaseModel(ABC):
def __init__(self, model_path: str, device: str = 'CUDA', warmup: int = 30):
self.opt = ort.SessionOptions()
if device == 'CUDA':
provider = 'CUDAExecutionProvider'
if provider not in ort.get_available_providers():
warnings.warn("No CUDAExecutionProvider found, switched to CPUExecutionProvider.", UserWarning)
provider = 'CPUExecutionProvider'
elif device == 'CPU':
provider = 'CPUExecutionProvider'
else:
raise ValueError('Provider {} does not exist.'.format(device))
self.session = ort.InferenceSession(model_path,
providers=[provider],
sess_options=self.opt)
self.input_name = self.session.get_inputs()[0].name
self.input_shape = self.session.get_inputs()[0].shape
input_type = self.session.get_inputs()[0].type
        if input_type in ('tensor(float)', 'tensor(float32)'):
self.input_type = np.float32
elif input_type == 'tensor(float16)':
self.input_type = np.float16
elif input_type == 'tensor(uint8)':
self.input_type = np.uint8
else:
raise ValueError('Unknown input type: ', input_type)
if warmup > 0:
self.warmup(warmup)
@abstractmethod
def preprocess(self, image: np.ndarray):
pass
@abstractmethod
def postprocess(self, tensor: List[np.ndarray]):
pass
def forward(self, image: np.ndarray):
tensor = self.preprocess(image)
result = self.session.run(None, {self.input_name: tensor})
output = self.postprocess(result)
return output
def warmup(self, epoch: int = 30):
print('{} start warmup!'.format(self.__class__.__name__))
tensor = np.random.random(self.input_shape).astype(self.input_type)
for _ in tqdm(range(epoch)):
self.session.run(None, {self.input_name: tensor})
def __call__(self, image: np.ndarray, *args, **kwargs):
return self.forward(image)
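Since `preprocess` and `postprocess` are abstract, custom networks reuse the session handling, dtype detection and warmup above by subclassing. A minimal illustrative subclass follows; the model path in the commented usage is a placeholder.

```python
# Illustrative BaseModel subclass for a network that already takes a float32 NCHW tensor.
from typing import List

import numpy as np

from easypose.model.base_model import BaseModel  # module path as mounted in run_container.sh


class PassthroughModel(BaseModel):
    def preprocess(self, image: np.ndarray):
        # Add a batch dimension and match the session's expected dtype.
        return np.expand_dims(image, axis=0).astype(self.input_type)

    def postprocess(self, tensor: List[np.ndarray]):
        # Return the first output unchanged.
        return tensor[0]


# model = PassthroughModel("model.onnx", device="CPU", warmup=0)  # placeholder path
# output = model(np.zeros(model.input_shape[1:], dtype=np.float32))
```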


@ -0,0 +1,100 @@
import numpy as np
from typing import List
from .base_model import BaseModel
from .utils import letterbox, nms, nms_optimized, xywh2xyxy
class RTMDet(BaseModel):
def __init__(self,
model_path: str,
conf_threshold: float,
iou_threshold: float,
device: str = 'CUDA',
warmup: int = 30):
super(RTMDet, self).__init__(model_path, device, warmup)
self.conf_threshold = conf_threshold
self.iou_threshold = iou_threshold
self.dx = 0
self.dy = 0
self.scale = 0
def preprocess(self, image: np.ndarray):
th, tw = self.input_shape[1:3]
image, self.dx, self.dy, self.scale = letterbox(
image, (tw, th), fill_value=114
)
tensor = np.asarray(image).astype(self.input_type, copy=False)[..., ::-1]
tensor = np.expand_dims(tensor, axis=0)
return tensor
def postprocess(self, tensor: List[np.ndarray]):
boxes = np.squeeze(tensor[0], axis=0)
classes = np.expand_dims(np.squeeze(tensor[1], axis=0), axis=-1)
boxes = np.concatenate([boxes, classes], axis=-1)
boxes = nms_optimized(boxes, self.iou_threshold, self.conf_threshold)
if boxes.shape[0] == 0:
return boxes
human_class = boxes[..., -1] == 0
boxes = boxes[human_class][..., :4]
boxes[:, 0] -= self.dx
boxes[:, 2] -= self.dx
boxes[:, 1] -= self.dy
boxes[:, 3] -= self.dy
boxes = np.clip(boxes, a_min=0, a_max=None)
boxes[:, :4] /= self.scale
return boxes
class Yolov8(BaseModel):
def __init__(self,
model_path: str,
conf_threshold: float,
iou_threshold: float,
device: str = 'CUDA',
warmup: int = 30):
super(Yolov8, self).__init__(model_path, device, warmup)
self.conf_threshold = conf_threshold
self.iou_threshold = iou_threshold
self.dx = 0
self.dy = 0
self.scale = 0
def preprocess(self, image):
th, tw = self.input_shape[2:]
image, self.dx, self.dy, self.scale = letterbox(image, (tw, th))
tensor = image / 255.
tensor = np.expand_dims(tensor, axis=0).transpose((0, 3, 1, 2)).astype(np.float32)
return tensor
def postprocess(self, tensor):
feature_map = tensor[0]
feature_map = np.squeeze(feature_map, axis=0).transpose((1, 0))
pred_class = feature_map[..., 4:]
pred_conf = np.max(pred_class, axis=-1, keepdims=True)
pred_class = np.argmax(pred_class, axis=-1, keepdims=True)
boxes = np.concatenate([feature_map[..., :4], pred_conf, pred_class], axis=-1)
boxes = xywh2xyxy(boxes)
boxes = nms(boxes, self.iou_threshold, self.conf_threshold)
if boxes.shape[0] == 0:
return boxes
human_class = boxes[..., -1] == 0
boxes = boxes[human_class][..., :4]
boxes[:, 0] -= self.dx
boxes[:, 2] -= self.dx
boxes[:, 1] -= self.dy
boxes[:, 3] -= self.dy
boxes = np.clip(boxes, a_min=0, a_max=None)
boxes[:, :4] /= self.scale
return boxes


@ -0,0 +1,10 @@
FROM rapidposetriangulation
WORKDIR /
RUN pip3 install --upgrade --no-cache-dir onnxruntime-gpu
RUN git clone https://github.com/Dominic23331/EasyPose.git --depth=1
RUN cd /EasyPose/; pip install -v -e .
WORKDIR /RapidPoseTriangulation/
CMD ["/bin/bash"]

extras/easypose/pipeline.py

@ -0,0 +1,362 @@
import os
import cv2
import numpy as np
from easypose import model
from easypose.model import detection
from easypose.model import pose
from .download import get_url, get_model_path, download
from .consts import AvailablePoseModels, AvailableDetModels
from .common import Person, region_of_interest, restore_keypoints
def get_pose_model(pose_model_path, pose_model_decoder, device, warmup):
if pose_model_decoder == 'Dark':
pose_model = pose.Heatmap(pose_model_path, dark=True, device=device, warmup=warmup)
else:
pose_model = getattr(pose, pose_model_decoder)(pose_model_path, device=device, warmup=warmup)
return pose_model
def get_det_model(det_model_path, model_type, conf_thre, iou_thre, device, warmup):
det_model = getattr(detection, model_type)(det_model_path, conf_thre, iou_thre, device, warmup)
return det_model
def region_of_interest_warped(
image: np.ndarray,
box: np.ndarray,
target_size=(288, 384),
padding_scale: float = 1.25,
):
start_x, start_y, end_x, end_y = box
target_w, target_h = target_size
# Calculate original bounding box width and height
bbox_w = end_x - start_x
bbox_h = end_y - start_y
if bbox_w <= 0 or bbox_h <= 0:
raise ValueError("Invalid bounding box!")
# Calculate the aspect ratios
bbox_aspect = bbox_w / bbox_h
target_aspect = target_w / target_h
# Adjust the scaled bounding box to match the target aspect ratio
if bbox_aspect > target_aspect:
adjusted_h = bbox_w / target_aspect
adjusted_w = bbox_w
else:
adjusted_w = bbox_h * target_aspect
adjusted_h = bbox_h
# Scale the bounding box by the padding_scale
scaled_bbox_w = adjusted_w * padding_scale
scaled_bbox_h = adjusted_h * padding_scale
# Calculate the center of the original box
center_x = (start_x + end_x) / 2.0
center_y = (start_y + end_y) / 2.0
# Calculate scaled bounding box coordinates
new_start_x = center_x - scaled_bbox_w / 2.0
new_start_y = center_y - scaled_bbox_h / 2.0
new_end_x = center_x + scaled_bbox_w / 2.0
new_end_y = center_y + scaled_bbox_h / 2.0
# Define the new box coordinates
new_box = np.array(
[new_start_x, new_start_y, new_end_x, new_end_y], dtype=np.float32
)
scale = target_w / scaled_bbox_w
# Define source and destination points for affine transformation
# See: /mmpose/structures/bbox/transforms.py
src_pts = np.array(
[
[center_x, center_y],
[new_start_x, center_y],
[new_start_x, center_y + (center_x - new_start_x)],
],
dtype=np.float32,
)
dst_pts = np.array(
[
[target_w * 0.5, target_h * 0.5],
[0, target_h * 0.5],
[0, target_h * 0.5 + (target_w * 0.5 - 0)],
],
dtype=np.float32,
)
# Compute the affine transformation matrix
M = cv2.getAffineTransform(src_pts, dst_pts)
# Apply affine transformation with border filling
extracted_region = cv2.warpAffine(
image,
M,
target_size,
flags=cv2.INTER_LINEAR,
)
return extracted_region, new_box, scale
class TopDown:
def __init__(self,
pose_model_name,
pose_model_decoder,
det_model_name,
conf_threshold=0.6,
iou_threshold=0.6,
device='CUDA',
warmup=30):
if not pose_model_name.endswith('.onnx') and pose_model_name not in AvailablePoseModels.POSE_MODELS:
raise ValueError(
'The {} human pose estimation model is not in the model repository.'.format(pose_model_name))
if not pose_model_name.endswith('.onnx') and pose_model_decoder not in AvailablePoseModels.POSE_MODELS[pose_model_name]:
raise ValueError(
'No {} decoding head for the {} model was found in the model repository.'.format(pose_model_decoder,
pose_model_name))
if not pose_model_name.endswith('.onnx') and det_model_name not in AvailableDetModels.DET_MODELS:
raise ValueError(
'The {} detection model is not in the model repository.'.format(det_model_name))
if not pose_model_name.endswith('.onnx'):
pose_model_dir = get_model_path(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder],
detection_model=False)
pose_model_path = os.path.join(pose_model_dir,
AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder])
else:
pose_model_path = pose_model_name
if os.path.exists(pose_model_path):
try:
self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup)
except Exception:
url = get_url(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder],
detection_model=False)
download(url, pose_model_dir)
self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup)
else:
url = get_url(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder],
detection_model=False)
download(url, pose_model_dir)
self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup)
if not det_model_name.endswith('.onnx'):
det_model_dir = get_model_path(AvailableDetModels.DET_MODELS[det_model_name]['file_name'],
detection_model=True)
det_model_path = os.path.join(det_model_dir,
AvailableDetModels.DET_MODELS[det_model_name]['file_name'])
det_model_type = AvailableDetModels.DET_MODELS[det_model_name]['model_type']
else:
det_model_path = det_model_name
if "rtmdet" in det_model_name:
det_model_type = 'RTMDet'
if os.path.exists(det_model_path):
try:
self.det_model = get_det_model(det_model_path,
det_model_type,
conf_threshold,
iou_threshold,
device,
warmup)
except Exception:
url = get_url(AvailableDetModels.DET_MODELS[det_model_name]['file_name'],
detection_model=True)
download(url, det_model_dir)
self.det_model = get_det_model(det_model_path,
det_model_type,
conf_threshold,
iou_threshold,
device,
warmup)
else:
url = get_url(AvailableDetModels.DET_MODELS[det_model_name]['file_name'],
detection_model=True)
download(url, det_model_dir)
self.det_model = get_det_model(det_model_path,
det_model_type,
conf_threshold,
iou_threshold,
device,
warmup)
def predict(self, image):
boxes = self.det_model(image)
results = []
for i in range(boxes.shape[0]):
p = Person()
p.box = boxes[i]
region, p.box, _ = region_of_interest_warped(image, p.box)
kp = self.pose_model(region)
# See: /mmpose/models/pose_estimators/topdown.py - add_pred_to_datasample()
th, tw = region.shape[:2]
bw, bh = [p.box[2] - p.box[0], p.box[3] - p.box[1]]
kp[:, :2] /= np.array([tw, th])
kp[:, :2] *= np.array([bw, bh])
kp[:, :2] += np.array([p.box[0] + bw / 2, p.box[1] + bh / 2])
kp[:, :2] -= 0.5 * np.array([bw, bh])
p.keypoints = kp
results.append(p)
return results
class Pose:
def __init__(self,
pose_model_name,
pose_model_decoder,
device='CUDA',
warmup=30):
if pose_model_name not in AvailablePoseModels.POSE_MODELS:
raise ValueError(
'The {} human pose estimation model is not in the model repository.'.format(pose_model_name))
if pose_model_decoder not in AvailablePoseModels.POSE_MODELS[pose_model_name]:
raise ValueError(
'No {} decoding head for the {} model was found in the model repository.'.format(pose_model_decoder,
pose_model_name))
pose_model_dir = get_model_path(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder],
detection_model=False)
pose_model_path = os.path.join(pose_model_dir,
AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder])
if os.path.exists(pose_model_path):
try:
self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup)
except Exception:
url = get_url(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder],
detection_model=False)
download(url, pose_model_dir)
self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup)
else:
url = get_url(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder],
detection_model=False)
download(url, pose_model_dir)
self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup)
def predict(self, image):
p = Person()
box = np.array([0, 0, image.shape[3], image.shape[2], 1, 0])
p.box = box
p.keypoints = self.pose_model(image)
return p
class CustomTopDown:
def __init__(self,
pose_model,
det_model,
pose_decoder=None,
device='CUDA',
iou_threshold=0.6,
conf_threshold=0.6,
warmup=30):
if isinstance(pose_model, model.BaseModel):
self.pose_model = pose_model
elif isinstance(pose_model, str):
if pose_model not in AvailablePoseModels.POSE_MODELS:
raise ValueError(
'The {} human pose estimation model is not in the model repository.'.format(pose_model))
            if pose_decoder not in AvailablePoseModels.POSE_MODELS[pose_model]:
raise ValueError(
'No {} decoding head for the {} model was found in the model repository.'.format(pose_decoder,
pose_model))
pose_model_dir = get_model_path(AvailablePoseModels.POSE_MODELS[pose_model][pose_decoder],
detection_model=False)
pose_model_path = os.path.join(pose_model_dir,
AvailablePoseModels.POSE_MODELS[pose_model][pose_decoder])
if os.path.exists(pose_model_path):
try:
self.pose_model = get_pose_model(pose_model_path, pose_decoder, device, warmup)
except Exception:
url = get_url(AvailablePoseModels.POSE_MODELS[pose_model][pose_decoder],
detection_model=False)
download(url, pose_model_dir)
self.pose_model = get_pose_model(pose_model_path, pose_decoder, device, warmup)
else:
url = get_url(AvailablePoseModels.POSE_MODELS[pose_model][pose_decoder],
detection_model=False)
download(url, pose_model_dir)
self.pose_model = get_pose_model(pose_model_path, pose_decoder, device, warmup)
else:
raise TypeError("Invalid type for pose model, Please write a custom model based on 'BaseModel'.")
if isinstance(det_model, model.BaseModel):
self.det_model = det_model
elif isinstance(det_model, str):
if det_model not in AvailableDetModels.DET_MODELS:
raise ValueError(
'The {} detection model is not in the model repository.'.format(det_model))
det_model_dir = get_model_path(AvailableDetModels.DET_MODELS[det_model]['file_name'],
detection_model=True)
det_model_path = os.path.join(det_model_dir,
AvailableDetModels.DET_MODELS[det_model]['file_name'])
det_model_type = AvailableDetModels.DET_MODELS[det_model]['model_type']
if os.path.exists(det_model_path):
try:
self.det_model = get_det_model(det_model_path,
det_model_type,
conf_threshold,
iou_threshold,
device,
warmup)
except Exception:
url = get_url(AvailableDetModels.DET_MODELS[det_model]['file_name'],
detection_model=True)
download(url, det_model_dir)
self.det_model = get_det_model(det_model_path,
det_model_type,
conf_threshold,
iou_threshold,
device,
warmup)
else:
url = get_url(AvailableDetModels.DET_MODELS[det_model]['file_name'],
detection_model=True)
download(url, det_model_dir)
self.det_model = get_det_model(det_model_path,
det_model_type,
conf_threshold,
iou_threshold,
device,
warmup)
else:
raise TypeError("Invalid type for detection model, Please write a custom model based on 'BaseModel'.")
def predict(self, image):
boxes = self.det_model(image)
results = []
for i in range(boxes.shape[0]):
p = Person()
p.box = boxes[i]
region = region_of_interest(image, p.box)
kp = self.pose_model(region)
p.keypoints = restore_keypoints(p.box, kp)
results.append(p)
return results
class CustomSinglePose:
def __init__(self, pose_model):
if isinstance(pose_model, model.BaseModel):
self.pose_model = pose_model
else:
raise TypeError("Invalid type for pose model, Please write a custom model based on 'BaseModel'.")
def predict(self, image):
p = Person()
box = np.array([0, 0, image.shape[3], image.shape[2], 1, 0])
p.box = box
p.keypoints = self.pose_model(image)
return p

extras/easypose/pose.py

@ -0,0 +1,52 @@
import numpy as np
from typing import List
from .base_model import BaseModel
from .utils import letterbox, get_heatmap_points, \
get_real_keypoints, refine_keypoints_dark, refine_keypoints, simcc_decoder
class Heatmap(BaseModel):
def __init__(self,
model_path: str,
dark: bool = False,
device: str = 'CUDA',
warmup: int = 30):
super(Heatmap, self).__init__(model_path, device, warmup)
self.use_dark = dark
self.img_size = ()
def preprocess(self, image: np.ndarray):
th, tw = self.input_shape[2:]
self.img_size = image.shape[:2]
image, _, _, _ = letterbox(image, (tw, th))
tensor = (image - np.array((103.53, 116.28, 123.675))) / np.array((57.375, 57.12, 58.395))
tensor = np.expand_dims(tensor, axis=0).transpose((0, 3, 1, 2)).astype(np.float32)
return tensor
def postprocess(self, tensor: List[np.ndarray]):
heatmaps = tensor[0]
heatmaps = np.squeeze(heatmaps, axis=0)
keypoints = get_heatmap_points(heatmaps)
if self.use_dark:
keypoints = refine_keypoints_dark(keypoints, heatmaps, 11)
else:
keypoints = refine_keypoints(keypoints, heatmaps)
keypoints = get_real_keypoints(keypoints, heatmaps, self.img_size)
return keypoints
class SimCC(BaseModel):
def __init__(self, model_path: str, device: str = 'CUDA', warmup: int = 30):
super(SimCC, self).__init__(model_path, device, warmup)
def preprocess(self, image: np.ndarray):
tensor = np.asarray(image).astype(self.input_type, copy=False)
tensor = np.expand_dims(tensor, axis=0)
return tensor
def postprocess(self, tensor: List[np.ndarray]):
keypoints = np.concatenate(
[tensor[0][0], np.expand_dims(tensor[1][0], axis=-1)], axis=-1
)
return keypoints


@ -0,0 +1,16 @@
#! /bin/bash
xhost +
docker run --privileged --rm --network host -it \
--gpus all --shm-size=16g --ulimit memlock=-1 --ulimit stack=67108864 \
--volume "$(pwd)"/:/RapidPoseTriangulation/ \
--volume "$(pwd)"/extras/easypose/pipeline.py:/EasyPose/easypose/pipeline.py \
--volume "$(pwd)"/extras/easypose/base_model.py:/EasyPose/easypose/model/base_model.py \
--volume "$(pwd)"/extras/easypose/detection.py:/EasyPose/easypose/model/detection.py \
--volume "$(pwd)"/extras/easypose/pose.py:/EasyPose/easypose/model/pose.py \
--volume "$(pwd)"/extras/easypose/utils.py:/EasyPose/easypose/model/utils.py \
--volume "$(pwd)"/../datasets/:/datasets/ \
--volume "$(pwd)"/skelda/:/skelda/ \
--volume /tmp/.X11-unix:/tmp/.X11-unix \
--env DISPLAY --env QT_X11_NO_MITSHM=1 \
rpt_easypose

extras/easypose/utils.py

@ -0,0 +1,259 @@
from itertools import product
from typing import Sequence
import cv2
import numpy as np
def letterbox(img: np.ndarray, target_size: Sequence[int], fill_value: int = 128):
h, w = img.shape[:2]
tw, th = target_size
scale = min(tw / w, th / h)
nw, nh = int(w * scale), int(h * scale)
dx, dy = (tw - nw) // 2, (th - nh) // 2
canvas = np.full((th, tw, img.shape[2]), fill_value, dtype=img.dtype)
canvas[dy:dy + nh, dx:dx + nw, :] = cv2.resize(img, (nw, nh))
return canvas, dx, dy, scale
def intersection_over_union(box1: np.ndarray, box2: np.ndarray):
area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
x1 = max(box1[0], box2[0])
y1 = max(box1[1], box2[1])
x2 = min(box1[2], box2[2])
y2 = min(box1[3], box2[3])
    intersection = max(0, x2 - x1) * max(0, y2 - y1)
union = area1 + area2 - intersection
iou = intersection / (union + 1e-6)
return iou
def xywh2xyxy(boxes):
boxes[:, 0] -= boxes[:, 2] / 2
boxes[:, 1] -= boxes[:, 3] / 2
boxes[:, 2] += boxes[:, 0]
boxes[:, 3] += boxes[:, 1]
return boxes
def nms(boxes: np.ndarray, iou_threshold: float, conf_threshold: float):
conf = boxes[..., 4] > conf_threshold
boxes = boxes[conf]
boxes = list(boxes)
boxes.sort(reverse=True, key=lambda x: x[4])
result = []
while boxes:
chosen_box = boxes.pop()
b = []
for box in boxes:
if box[-1] != chosen_box[-1] or \
intersection_over_union(chosen_box, box) \
< iou_threshold:
b.append(box)
result.append(chosen_box)
boxes = b
return np.array(result)
def nms_optimized(boxes: np.ndarray, iou_threshold: float, conf_threshold: float):
"""
Perform Non-Maximum Suppression (NMS) on bounding boxes for a single class.
"""
# Filter out boxes with low confidence scores
scores = boxes[:, 4]
keep = scores > conf_threshold
boxes = boxes[keep]
scores = scores[keep]
if boxes.shape[0] == 0:
return np.empty((0, 5), dtype=boxes.dtype)
# Compute the area of the bounding boxes
x1 = boxes[:, 0]
y1 = boxes[:, 1]
x2 = boxes[:, 2]
y2 = boxes[:, 3]
areas = (x2 - x1 + 1) * (y2 - y1 + 1)
# Sort the boxes by scores in descending order
order = scores.argsort()[::-1]
keep_indices = []
while order.size > 0:
i = order[0]
keep_indices.append(i)
# Compute IoU of the current box with the rest
xx1 = np.maximum(x1[i], x1[order[1:]])
yy1 = np.maximum(y1[i], y1[order[1:]])
xx2 = np.minimum(x2[i], x2[order[1:]])
yy2 = np.minimum(y2[i], y2[order[1:]])
# Compute width and height of the overlapping area
w = np.maximum(0.0, xx2 - xx1 + 1)
h = np.maximum(0.0, yy2 - yy1 + 1)
# Compute the area of the intersection
inter = w * h
# Compute the IoU
iou = inter / (areas[i] + areas[order[1:]] - inter)
# Keep boxes with IoU less than the threshold
inds = np.where(iou <= iou_threshold)[0]
# Update the order array
order = order[inds + 1]
# Return the boxes that are kept
return boxes[keep_indices]
def get_heatmap_points(heatmap: np.ndarray):
keypoints = np.zeros([1, heatmap.shape[0], 3], dtype=np.float32)
for i in range(heatmap.shape[0]):
h, w = np.nonzero(heatmap[i] == heatmap[i].max())
h, w = h[0], w[0]
h_fixed = h + 0.5
w_fixed = w + 0.5
score = heatmap[i][h][w]
keypoints[0][i][0] = w_fixed
keypoints[0][i][1] = h_fixed
keypoints[0][i][2] = score
return keypoints
def gaussian_blur(heatmaps: np.ndarray, kernel: int = 11):
assert kernel % 2 == 1
border = (kernel - 1) // 2
K, H, W = heatmaps.shape
for k in range(K):
origin_max = np.max(heatmaps[k])
dr = np.zeros((H + 2 * border, W + 2 * border), dtype=np.float32)
dr[border:-border, border:-border] = heatmaps[k].copy()
dr = cv2.GaussianBlur(dr, (kernel, kernel), 0)
heatmaps[k] = dr[border:-border, border:-border].copy()
heatmaps[k] *= origin_max / np.max(heatmaps[k])
return heatmaps
def refine_keypoints(keypoints: np.ndarray, heatmaps: np.ndarray):
N, K = keypoints.shape[:2]
    H, W = heatmaps.shape[1:]
for n, k in product(range(N), range(K)):
x, y = keypoints[n, k, :2].astype(int)
if 1 < x < W - 1 and 0 < y < H:
dx = heatmaps[k, y, x + 1] - heatmaps[k, y, x - 1]
else:
dx = 0.
if 1 < y < H - 1 and 0 < x < W:
dy = heatmaps[k, y + 1, x] - heatmaps[k, y - 1, x]
else:
dy = 0.
keypoints[n, k] += np.sign([dx, dy, 0], dtype=np.float32) * 0.25
return keypoints
def refine_keypoints_dark(keypoints: np.ndarray, heatmaps: np.ndarray, blur_kernel_size: int = 11):
N, K = keypoints.shape[:2]
H, W = heatmaps.shape[1:]
# modulate heatmaps
heatmaps = gaussian_blur(heatmaps, blur_kernel_size)
np.maximum(heatmaps, 1e-10, heatmaps)
np.log(heatmaps, heatmaps)
for n, k in product(range(N), range(K)):
x, y = keypoints[n, k, :2].astype(int)
if 1 < x < W - 2 and 1 < y < H - 2:
dx = 0.5 * (heatmaps[k, y, x + 1] - heatmaps[k, y, x - 1])
dy = 0.5 * (heatmaps[k, y + 1, x] - heatmaps[k, y - 1, x])
dxx = 0.25 * (
heatmaps[k, y, x + 2] - 2 * heatmaps[k, y, x] +
heatmaps[k, y, x - 2])
dxy = 0.25 * (
heatmaps[k, y + 1, x + 1] - heatmaps[k, y - 1, x + 1] -
heatmaps[k, y + 1, x - 1] + heatmaps[k, y - 1, x - 1])
dyy = 0.25 * (
heatmaps[k, y + 2, x] - 2 * heatmaps[k, y, x] +
heatmaps[k, y - 2, x])
derivative = np.array([[dx], [dy]])
hessian = np.array([[dxx, dxy], [dxy, dyy]])
if dxx * dyy - dxy ** 2 != 0:
hessianinv = np.linalg.inv(hessian)
offset = -hessianinv @ derivative
offset = np.squeeze(np.array(offset.T), axis=0)
keypoints[n, k, :2] += offset
return keypoints
def get_real_keypoints(keypoints: np.ndarray, heatmaps: np.ndarray, img_size: Sequence[int]):
img_h, img_w = img_size
heatmap_h, heatmap_w = heatmaps.shape[1:]
heatmap_ratio = heatmaps.shape[1] / heatmaps.shape[2]
img_ratio = img_h / img_w
if heatmap_ratio > img_ratio:
resize_w = img_w
resize_h = int(img_w * heatmap_ratio)
elif heatmap_ratio < img_ratio:
resize_h = img_h
resize_w = int(img_h / heatmap_ratio)
else:
resize_w = img_w
resize_h = img_h
keypoints[:, :, 0] = (keypoints[:, :, 0] / heatmap_w) * resize_w - (resize_w - img_w) / 2
keypoints[:, :, 1] = (keypoints[:, :, 1] / heatmap_h) * resize_h - (resize_h - img_h) / 2
keypoints = np.squeeze(keypoints, axis=0)
return keypoints
def simcc_decoder(
simcc_x: np.ndarray,
simcc_y: np.ndarray,
input_size: Sequence[int],
dx: int,
dy: int,
scale: float,
):
# See: /mmpose/codecs/utils/post_processing.py - get_simcc_maximum()
x = np.argmax(simcc_x, axis=-1, keepdims=True).astype(np.float32)
y = np.argmax(simcc_y, axis=-1, keepdims=True).astype(np.float32)
x_conf = np.max(simcc_x, axis=-1, keepdims=True)
y_conf = np.max(simcc_y, axis=-1, keepdims=True)
conf = np.minimum(x_conf, y_conf)
x /= simcc_x.shape[-1]
y /= simcc_y.shape[-1]
x *= input_size[1]
y *= input_size[0]
keypoints = np.concatenate([x, y, conf], axis=-1)
keypoints[..., 0] -= dx
keypoints[..., 1] -= dy
keypoints[..., :2] /= scale
return keypoints
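As a concrete check of the geometry in `letterbox` at the top of this module: a 640x480 frame fitted into a 320x320 canvas is scaled by 0.5 and padded by 40 px on top and bottom, which is exactly what `dx`, `dy` and `scale` report (and what the detection and pose postprocessing later undo).

```python
# Worked example for letterbox(); assumes the function above is in scope.
import numpy as np

frame = np.zeros((480, 640, 3), dtype=np.uint8)
canvas, dx, dy, scale = letterbox(frame, (320, 320), fill_value=114)
print(canvas.shape, dx, dy, scale)  # (320, 320, 3) 0 40 0.5
```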


@ -0,0 +1,68 @@
import os
import cv2
import easypose as ep
import numpy as np
# ==================================================================================================
filepath = os.path.dirname(os.path.realpath(__file__)) + "/"
# ==================================================================================================
def load_model():
print("Loading mmpose model ...")
model = ep.TopDown(
"/RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-m_384x288_fp16_extra-steps.onnx",
"SimCC",
"/RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_320x320_fp16_extra-steps.onnx",
conf_threshold=0.3,
iou_threshold=0.3,
warmup=10,
)
print("Loaded mmpose model")
return model
def load_wb_model():
print("Loading mmpose whole body model ...")
model = None
print("Loaded mmpose model")
return model
# ==================================================================================================
def get_2d_pose(model, imgs, num_joints=17):
"""See: https://mmpose.readthedocs.io/en/latest/user_guides/inference.html#basic-usage"""
new_poses = []
for i in range(len(imgs)):
img = imgs[i]
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
poses = []
dets = model.predict(img)
for pose in dets:
pose = pose.keypoints
pose = np.asarray(pose)
scores = pose[:, 2].reshape(-1, 1)
scores = np.clip(scores, 0, 1)
pose = np.concatenate((pose[:, :2], scores), axis=-1)
poses.append(pose)
if len(poses) == 0:
poses.append(np.zeros([num_joints, 3]))
poses = np.array(poses)
new_poses.append(poses)
return new_poses

extras/mmdeploy/README.md

@ -0,0 +1,122 @@
# Exporting MMPose models
```bash
docker build --progress=plain -f extras/mmdeploy/dockerfile -t rpt_mmdeploy .
./extras/mmdeploy/run_container.sh
```
<br>
## ONNX
```bash
cd /mmdeploy/
export withFP16="_fp16"
cp /RapidPoseTriangulation/extras/mmdeploy/configs/detection_onnxruntime_static-320x320"$withFP16".py configs/mmdet/detection/
python3 ./tools/deploy.py \
configs/mmdet/detection/detection_onnxruntime_static-320x320"$withFP16".py \
/mmpose/projects/rtmpose/rtmdet/person/rtmdet_nano_320-8xb32_coco-person.py \
https://download.openmmlab.com/mmpose/v1/projects/rtmpose/rtmdet_nano_8xb32-100e_coco-obj365-person-05d8511e.pth \
/mmpose/projects/rtmpose/examples/onnxruntime/human-pose.jpeg \
--work-dir work_dir \
--show
mv /mmdeploy/work_dir/end2end.onnx /RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_1x3x320x320"$withFP16".onnx
```
```bash
cd /mmdeploy/
export withFP16="_fp16"
cp /RapidPoseTriangulation/extras/mmdeploy/configs/pose-detection_simcc_onnxruntime_static-384x288"$withFP16".py configs/mmpose/
cp /RapidPoseTriangulation/extras/mmdeploy/configs/pose-detection_simcc_onnxruntime_dynamic-384x288"$withFP16".py configs/mmpose/
python3 ./tools/deploy.py \
configs/mmpose/pose-detection_simcc_onnxruntime_static-384x288"$withFP16".py \
/mmpose/projects/rtmpose/rtmpose/body_2d_keypoint/rtmpose-m_8xb256-420e_coco-384x288.py \
https://download.openmmlab.com/mmpose/v1/projects/rtmposev1/rtmpose-m_simcc-body7_pt-body7_420e-384x288-65e718c4_20230504.pth \
/mmpose/projects/rtmpose/examples/onnxruntime/human-pose.jpeg \
--work-dir work_dir \
--show
mv /mmdeploy/work_dir/end2end.onnx /RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-m_1x3x384x288"$withFP16".onnx
python3 ./tools/deploy.py \
configs/mmpose/pose-detection_simcc_onnxruntime_dynamic-384x288"$withFP16".py \
/mmpose/projects/rtmpose/rtmpose/body_2d_keypoint/rtmpose-m_8xb256-420e_coco-384x288.py \
https://download.openmmlab.com/mmpose/v1/projects/rtmposev1/rtmpose-m_simcc-body7_pt-body7_420e-384x288-65e718c4_20230504.pth \
/mmpose/projects/rtmpose/examples/onnxruntime/human-pose.jpeg \
--work-dir work_dir \
--show
mv /mmdeploy/work_dir/end2end.onnx /RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-m_Bx3x384x288"$withFP16".onnx
python3 ./tools/deploy.py \
configs/mmpose/pose-detection_simcc_onnxruntime_static-384x288"$withFP16".py \
/mmpose/projects/rtmpose/rtmpose/wholebody_2d_keypoint/rtmpose-l_8xb32-270e_coco-wholebody-384x288.py \
https://download.openmmlab.com/mmpose/v1/projects/rtmposev1/rtmpose-l_simcc-coco-wholebody_pt-aic-coco_270e-384x288-eaeb96c8_20230125.pth \
/mmpose/projects/rtmpose/examples/onnxruntime/human-pose.jpeg \
--work-dir work_dir \
--show
mv /mmdeploy/work_dir/end2end.onnx /RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-l_wb_1x3x384x288"$withFP16".onnx
python3 ./tools/deploy.py \
configs/mmpose/pose-detection_simcc_onnxruntime_dynamic-384x288"$withFP16".py \
/mmpose/projects/rtmpose/rtmpose/wholebody_2d_keypoint/rtmpose-l_8xb32-270e_coco-wholebody-384x288.py \
https://download.openmmlab.com/mmpose/v1/projects/rtmposev1/rtmpose-l_simcc-coco-wholebody_pt-aic-coco_270e-384x288-eaeb96c8_20230125.pth \
/mmpose/projects/rtmpose/examples/onnxruntime/human-pose.jpeg \
--work-dir work_dir \
--show
mv /mmdeploy/work_dir/end2end.onnx /RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-l_wb_Bx3x384x288"$withFP16".onnx
```
```bash
python3 /RapidPoseTriangulation/extras/mmdeploy/make_extra_graphs.py
```
```bash
python3 /RapidPoseTriangulation/extras/mmdeploy/add_extra_steps.py
```
<br>
## TensorRT
Run this directly in the inference container (the engine must be built with the same TensorRT version that is used for inference)
```bash
export withFP16="_fp16"
trtexec --fp16 \
--onnx=/RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_1x320x320x3"$withFP16"_extra-steps.onnx \
--saveEngine=end2end.engine
mv ./end2end.engine /RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_1x320x320x3"$withFP16"_extra-steps.engine
trtexec --fp16 \
--onnx=/RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-m_Bx384x288x3"$withFP16"_extra-steps.onnx \
--saveEngine=end2end.engine \
--minShapes=image_input:1x384x288x3 \
--optShapes=image_input:1x384x288x3 \
--maxShapes=image_input:1x384x288x3
mv ./end2end.engine /RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-m_1x384x288x3"$withFP16"_extra-steps.engine
```
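The resulting engines are consumed by the inference code in the main container. A minimal sketch of loading one and listing its I/O tensors with the TensorRT Python API (assuming the TensorRT >= 8.5 tensor-name API available in the `tensorrt:24.10` image):

```python
# Sketch: deserialize a trtexec-built engine and inspect its I/O tensors.
import tensorrt as trt

engine_path = "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_1x320x320x3_fp16_extra-steps.engine"
logger = trt.Logger(trt.Logger.WARNING)
with open(engine_path, "rb") as f, trt.Runtime(logger) as runtime:
    engine = runtime.deserialize_cuda_engine(f.read())

for i in range(engine.num_io_tensors):
    name = engine.get_tensor_name(i)
    print(name, engine.get_tensor_mode(name), engine.get_tensor_shape(name), engine.get_tensor_dtype(name))
```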
<br>
## Benchmark
```bash
cd /mmdeploy/
export withFP16="_fp16"
python3 ./tools/profiler.py \
configs/mmpose/pose-detection_simcc_onnxruntime_static-384x288"$withFP16".py \
/mmpose/projects/rtmpose/rtmpose/body_2d_keypoint/rtmpose-m_8xb256-420e_coco-384x288.py \
/RapidPoseTriangulation/extras/mmdeploy/testimages/ \
--model /RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-m_1x3x384x288"$withFP16".onnx \
--shape 384x288 \
--device cuda \
--warmup 50 \
--num-iter 200
```


@ -0,0 +1,145 @@
import re
import numpy as np
import onnx
from onnx import TensorProto, helper, numpy_helper
# ==================================================================================================
base_path = "/RapidPoseTriangulation/extras/mmdeploy/exports/"
det_model_path = base_path + "rtmdet-nano_1x3x320x320.onnx"
pose_model_path1 = base_path + "rtmpose-m_Bx3x384x288.onnx"
pose_model_path2 = base_path + "rtmpose-m_1x3x384x288.onnx"
pose_model_path3 = base_path + "rtmpose-l_wb_Bx3x384x288.onnx"
pose_model_path4 = base_path + "rtmpose-l_wb_1x3x384x288.onnx"
norm_mean = -1 * (np.array([0.485, 0.456, 0.406]) * 255)
norm_std = 1.0 / (np.array([0.229, 0.224, 0.225]) * 255)
# ==================================================================================================
def add_steps_to_onnx(model_path):
# Load existing model
model = onnx.load(model_path)
graph = model.graph
mean = norm_mean.astype(np.float32)
std = norm_std.astype(np.float32)
mean = np.reshape(mean, (1, 3, 1, 1)).astype(np.float32)
std = np.reshape(std, (1, 3, 1, 1)).astype(np.float32)
use_fp16 = bool("fp16" in model_path)
if use_fp16:
mean = mean.astype(np.float16)
std = std.astype(np.float16)
# Add the initializers to the graph
mean_initializer = numpy_helper.from_array(mean, name="norm_mean")
std_initializer = numpy_helper.from_array(std, name="norm_std")
graph.initializer.extend([mean_initializer, std_initializer])
# Define layer names, assuming the first input is the image tensor
input_name = graph.input[0].name
# Cast to internal type
# This has to be the first node, because tensorrt does not support uint8 layers
cast_type = 10 if use_fp16 else 1
casted_output = "casted_output"
cast_node = helper.make_node(
"Cast",
inputs=[input_name],
outputs=[casted_output],
to=cast_type,
)
# Node to transpose
transpose_output = "transpose_output"
transpose_node = helper.make_node(
"Transpose",
inputs=[casted_output],
outputs=[transpose_output],
perm=[0, 3, 1, 2],
name="Transpose",
)
# Node to add mean
mean_added_output = "mean_added_output"
mean_add_node = helper.make_node(
"Add",
inputs=[transpose_output, "norm_mean"],
outputs=[mean_added_output],
name="Mean_Addition",
)
# Node to multiply by std
std_mult_output = "std_mult_output"
std_mul_node = helper.make_node(
"Mul",
inputs=[mean_added_output, "norm_std"],
outputs=[std_mult_output],
name="Std_Multiplication",
)
# Replace original input of the model with the output of normalization
for node in graph.node:
for idx, input_name_in_node in enumerate(node.input):
if input_name_in_node == input_name:
node.input[idx] = std_mult_output
# Add the new nodes to the graph
graph.node.insert(0, cast_node)
graph.node.insert(1, transpose_node)
graph.node.insert(2, mean_add_node)
graph.node.insert(3, std_mul_node)
# Transpose the input shape
input_shape = graph.input[0].type.tensor_type.shape.dim
dims = [dim.dim_value for dim in input_shape]
for i, j in enumerate([0, 3, 1, 2]):
input_shape[j].dim_value = dims[i]
# Set the batch size to a defined string
input_shape = graph.input[0].type.tensor_type.shape.dim
if input_shape[0].dim_value == 0:
input_shape[0].dim_param = "batch_size"
# Rename the input tensor
main_input_image_name = model.graph.input[0].name
for node in model.graph.node:
for idx, name in enumerate(node.input):
if name == main_input_image_name:
node.input[idx] = "image_input"
model.graph.input[0].name = "image_input"
    # Set input image type to uint8
model.graph.input[0].type.tensor_type.elem_type = TensorProto.UINT8
path = re.sub(r"(x)(\d+)x(\d+)x(\d+)", r"\1\3x\4x\2", model_path)
path = path.replace(".onnx", "_extra-steps.onnx")
onnx.save(model, path)
# ==================================================================================================
def main():
add_steps_to_onnx(det_model_path)
add_steps_to_onnx(pose_model_path1)
add_steps_to_onnx(pose_model_path2)
add_steps_to_onnx(pose_model_path3)
add_steps_to_onnx(pose_model_path4)
add_steps_to_onnx(det_model_path.replace(".onnx", "_fp16.onnx"))
add_steps_to_onnx(pose_model_path1.replace(".onnx", "_fp16.onnx"))
add_steps_to_onnx(pose_model_path2.replace(".onnx", "_fp16.onnx"))
add_steps_to_onnx(pose_model_path3.replace(".onnx", "_fp16.onnx"))
add_steps_to_onnx(pose_model_path4.replace(".onnx", "_fp16.onnx"))
# ==================================================================================================
if __name__ == "__main__":
main()
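A quick way to confirm a rewritten graph now takes raw uint8 NHWC images under the renamed `image_input` tensor (an illustrative check, not part of this commit):

```python
# Sanity check for a graph produced by add_extra_steps.py: input should be uint8 NHWC.
import numpy as np
import onnxruntime as ort

path = "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_1x320x320x3_fp16_extra-steps.onnx"
sess = ort.InferenceSession(path, providers=["CPUExecutionProvider"])

inp = sess.get_inputs()[0]
print(inp.name, inp.shape, inp.type)  # expected: image_input [1, 320, 320, 3] tensor(uint8)

image = np.random.randint(0, 255, size=(1, 320, 320, 3), dtype=np.uint8)
outputs = sess.run(None, {inp.name: image})
print([o.shape for o in outputs])
```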


@ -0,0 +1,18 @@
_base_ = ["../_base_/base_static.py", "../../_base_/backends/onnxruntime.py"]
onnx_config = dict(
input_shape=[320, 320],
)
codebase_config = dict(
# For later TensorRT inference, the number of output boxes needs to be as stable as possible,
# because a drop in the box count leads to a re-optimization which takes a lot of time,
    # therefore reduce the maximum number of output boxes to the smallest usable value and filter out
    # low-confidence boxes outside the model.
post_processing=dict(
score_threshold=0.0,
confidence_threshold=0.0,
iou_threshold=0.5,
max_output_boxes_per_class=10,
),
)
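Because `score_threshold` is 0.0, the exported detector always emits up to `max_output_boxes_per_class` boxes and keeps a stable output shape for TensorRT; low-confidence boxes are then dropped outside the model. A sketch of that outside filtering (the `[x1, y1, x2, y2, score]` layout and the threshold value are assumptions, in the spirit of the defaults in `scripts/test_skelda_dataset.py`):

```python
# Sketch: drop low-confidence boxes after inference; the box layout is an assumption.
import numpy as np

def filter_boxes(boxes: np.ndarray, min_bbox_score: float = 0.3) -> np.ndarray:
    return boxes[boxes[:, 4] > min_bbox_score]

dets = np.array([[10.0, 20.0, 110.0, 220.0, 0.92],
                 [ 0.0,  0.0, 320.0, 320.0, 0.01]])  # second box is padding/noise
print(filter_boxes(dets))  # keeps only the confident detection
```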


@ -0,0 +1,18 @@
_base_ = ["../_base_/base_static.py", "../../_base_/backends/onnxruntime-fp16.py"]
onnx_config = dict(
input_shape=[320, 320],
)
codebase_config = dict(
# For later TensorRT inference, the number of output boxes needs to be as stable as possible,
# because a drop in the box count leads to a re-optimization which takes a lot of time,
    # therefore reduce the maximum number of output boxes to the smallest usable value and filter out
    # low-confidence boxes outside the model.
post_processing=dict(
score_threshold=0.0,
confidence_threshold=0.0,
iou_threshold=0.5,
max_output_boxes_per_class=10,
),
)


@ -0,0 +1,19 @@
_base_ = ["./pose-detection_static.py", "../_base_/backends/onnxruntime.py"]
onnx_config = dict(
input_shape=[288, 384],
output_names=["kpts", "scores"],
dynamic_axes={
"input": {
0: "batch",
},
"kpts": {
0: "batch",
},
"scores": {
0: "batch",
},
},
)
codebase_config = dict(export_postprocess=True) # export get_simcc_maximum


@ -0,0 +1,19 @@
_base_ = ["./pose-detection_static.py", "../_base_/backends/onnxruntime-fp16.py"]
onnx_config = dict(
input_shape=[288, 384],
output_names=["kpts", "scores"],
dynamic_axes={
"input": {
0: "batch",
},
"kpts": {
0: "batch",
},
"scores": {
0: "batch",
},
},
)
codebase_config = dict(export_postprocess=True) # export get_simcc_maximum


@ -0,0 +1,8 @@
_base_ = ["./pose-detection_static.py", "../_base_/backends/onnxruntime.py"]
onnx_config = dict(
input_shape=[288, 384],
output_names=["kpts", "scores"],
)
codebase_config = dict(export_postprocess=True) # export get_simcc_maximum


@ -0,0 +1,8 @@
_base_ = ["./pose-detection_static.py", "../_base_/backends/onnxruntime-fp16.py"]
onnx_config = dict(
input_shape=[288, 384],
output_names=["kpts", "scores"],
)
codebase_config = dict(export_postprocess=True) # export get_simcc_maximum


@ -0,0 +1,38 @@
FROM openmmlab/mmdeploy:ubuntu20.04-cuda11.8-mmdeploy1.3.1
ARG DEBIAN_FRONTEND=noninteractive
ENV LANG=C.UTF-8
ENV LC_ALL=C.UTF-8
WORKDIR /
RUN apt-get update && apt-get install -y --no-install-recommends feh
RUN git clone https://github.com/open-mmlab/mmdeploy.git --depth=1
RUN cd mmdeploy/; python3 tools/scripts/build_ubuntu_x64_ort.py
# Install MMPose
ENV FORCE_CUDA="1"
ENV MMCV_WITH_OPS=1
RUN pip3 install --upgrade --no-cache-dir openmim
RUN mim install mmengine
RUN mim install "mmcv>=2,<2.2.0"
RUN mim install "mmdet>=3"
RUN mim install "mmpose>=1.1.0"
# Fix an error when importing mmpose
RUN pip3 install --upgrade --no-cache-dir "numpy<2" scipy
RUN git clone --depth=1 --branch=main https://github.com/open-mmlab/mmpose.git
RUN echo 'export PYTHONPATH=/mmdeploy/build/lib:$PYTHONPATH' >> ~/.bashrc
RUN echo 'export LD_LIBRARY_PATH=/mmdeploy/../mmdeploy-dep/onnxruntime-linux-x64-1.8.1/lib/:$LD_LIBRARY_PATH' >> ~/.bashrc
# Show images
RUN apt-get update && apt-get install -y --no-install-recommends python3-tk
# Tool for fp16 conversion
RUN pip3 install --upgrade --no-cache-dir onnxconverter_common
# Fix an error when profiling
RUN pip3 install --upgrade --no-cache-dir "onnxruntime-gpu<1.17"
WORKDIR /mmdeploy/
CMD ["/bin/bash"]

extras/mmdeploy/exports/.gitignore

@ -0,0 +1,2 @@
*
!.gitignore


@ -0,0 +1,338 @@
import cv2
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision.ops import roi_align
# ==================================================================================================
base_path = "/RapidPoseTriangulation/extras/mmdeploy/exports/"
det_target_size = (320, 320)
pose_target_size = (384, 288)
# ==================================================================================================
class Letterbox(nn.Module):
def __init__(self, target_size, fill_value=128):
"""Resize and pad image while keeping aspect ratio"""
super(Letterbox, self).__init__()
self.target_size = target_size
self.fill_value = fill_value
def calc_params(self, ishape):
ih, iw = ishape[1], ishape[2]
th, tw = self.target_size
scale = torch.min(tw / iw, th / ih)
nw = torch.round(iw * scale)
nh = torch.round(ih * scale)
pad_w = tw - nw
pad_h = th - nh
pad_left = pad_w // 2
pad_top = pad_h // 2
pad_right = pad_w - pad_left
pad_bottom = pad_h - pad_top
paddings = (pad_left, pad_right, pad_top, pad_bottom)
return paddings, scale, (nw, nh)
def forward(self, img):
paddings, _, (nw, nh) = self.calc_params(img.shape)
# Resize the image
img = img.to(torch.float32)
img = img.permute(0, 3, 1, 2)
img = F.interpolate(
img,
size=(nh, nw),
mode="bilinear",
align_corners=False,
)
img = img.permute(0, 2, 3, 1)
img = img.round()
# Pad the image
img = F.pad(
img.permute(0, 3, 1, 2),
pad=paddings,
mode="constant",
value=self.fill_value,
)
img = img.permute(0, 2, 3, 1)
return img
# ==================================================================================================
class BoxCrop(nn.Module):
def __init__(self, target_size):
"""Crop bounding box from image"""
super(BoxCrop, self).__init__()
self.target_size = target_size
self.padding_scale = 1.25
def calc_params(self, bbox):
start_x, start_y, end_x, end_y = bbox[0, 0], bbox[0, 1], bbox[0, 2], bbox[0, 3]
target_h, target_w = self.target_size
# Calculate original bounding box width, height and center
bbox_w = end_x - start_x
bbox_h = end_y - start_y
center_x = (start_x + end_x) / 2.0
center_y = (start_y + end_y) / 2.0
# Calculate the aspect ratios
bbox_aspect = bbox_w / bbox_h
target_aspect = target_w / target_h
# Adjust the scaled bounding box to match the target aspect ratio
if bbox_aspect > target_aspect:
adjusted_h = bbox_w / target_aspect
adjusted_w = bbox_w
else:
adjusted_w = bbox_h * target_aspect
adjusted_h = bbox_h
# Scale the bounding box by the padding_scale
scaled_bbox_w = adjusted_w * self.padding_scale
scaled_bbox_h = adjusted_h * self.padding_scale
# Calculate scaled bounding box coordinates
new_start_x = center_x - scaled_bbox_w / 2.0
new_start_y = center_y - scaled_bbox_h / 2.0
new_end_x = center_x + scaled_bbox_w / 2.0
new_end_y = center_y + scaled_bbox_h / 2.0
# Define the new box coordinates
new_box = torch.stack((new_start_x, new_start_y, new_end_x, new_end_y), dim=0)
new_box = new_box.unsqueeze(0)
scale = torch.stack(
((target_w / scaled_bbox_w), (target_h / scaled_bbox_h)), dim=0
)
return scale, new_box
def forward(self, img, bbox):
_, bbox = self.calc_params(bbox)
batch_indices = torch.zeros(bbox.shape[0], 1)
rois = torch.cat([batch_indices, bbox], dim=1)
# Resize and crop
img = img.to(torch.float32)
img = img.permute(0, 3, 1, 2)
img = roi_align(
img,
rois,
output_size=self.target_size,
spatial_scale=1.0,
sampling_ratio=0,
)
img = img.permute(0, 2, 3, 1)
img = img.round()
return img
# ==================================================================================================
class DetPreprocess(nn.Module):
def __init__(self, target_size, fill_value=114):
super(DetPreprocess, self).__init__()
self.letterbox = Letterbox(target_size, fill_value)
def forward(self, img):
# img: torch.Tensor of shape [batch, H, W, C], dtype=torch.uint8
img = self.letterbox(img)
return img
# ==================================================================================================
class DetPostprocess(nn.Module):
def __init__(self, target_size):
super(DetPostprocess, self).__init__()
self.target_size = target_size
self.letterbox = Letterbox(target_size)
def forward(self, img, boxes):
paddings, scale, _ = self.letterbox.calc_params(img.shape)
boxes = boxes.float()
boxes[:, :, 0] -= paddings[0]
boxes[:, :, 2] -= paddings[0]
boxes[:, :, 1] -= paddings[2]
boxes[:, :, 3] -= paddings[2]
zero = torch.tensor(0)
boxes = torch.max(boxes, zero)
th, tw = self.target_size
pad_w = paddings[0] + paddings[1]
pad_h = paddings[2] + paddings[3]
max_w = tw - pad_w - 1
max_h = th - pad_h - 1
b0 = boxes[:, :, 0]
b1 = boxes[:, :, 1]
b2 = boxes[:, :, 2]
b3 = boxes[:, :, 3]
b0 = torch.min(b0, max_w)
b1 = torch.min(b1, max_h)
b2 = torch.min(b2, max_w)
b3 = torch.min(b3, max_h)
boxes[:, :, 0] = b0
boxes[:, :, 1] = b1
boxes[:, :, 2] = b2
boxes[:, :, 3] = b3
boxes[:, :, 0:4] /= scale
return boxes
# ==================================================================================================
class PosePreprocess(nn.Module):
def __init__(self, target_size, fill_value=114):
super(PosePreprocess, self).__init__()
self.boxcrop = BoxCrop(target_size)
def forward(self, img, bbox):
# img: torch.Tensor of shape [1, H, W, C], dtype=torch.uint8
# bbox: torch.Tensor of shape [1, 4], dtype=torch.float32
img = self.boxcrop(img, bbox)
return img
# ==================================================================================================
class PosePostprocess(nn.Module):
def __init__(self, target_size):
super(PosePostprocess, self).__init__()
self.boxcrop = BoxCrop(target_size)
self.target_size = target_size
def forward(self, img, bbox, keypoints):
scale, bbox = self.boxcrop.calc_params(bbox)
kp = keypoints.float()
kp[:, :, 0:2] /= scale
kp[:, :, 0] += bbox[0, 0]
kp[:, :, 1] += bbox[0, 1]
zero = torch.tensor(0)
kp = torch.max(kp, zero)
max_w = img.shape[2] - 1
max_h = img.shape[1] - 1
k0 = kp[:, :, 0]
k1 = kp[:, :, 1]
k0 = torch.min(k0, max_w)
k1 = torch.min(k1, max_h)
kp[:, :, 0] = k0
kp[:, :, 1] = k1
return kp
# ==================================================================================================
def main():
img_path = "/RapidPoseTriangulation/scripts/../data/h1/54138969-img_003201.jpg"
image = cv2.imread(img_path, 3)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
# Initialize the DetPreprocess module
preprocess_model = DetPreprocess(target_size=det_target_size)
det_dummy_input_a0 = torch.from_numpy(image).unsqueeze(0)
# Export to ONNX
torch.onnx.export(
preprocess_model,
det_dummy_input_a0,
base_path + "det_preprocess.onnx",
opset_version=11,
input_names=["input_image"],
output_names=["preprocessed_image"],
dynamic_axes={
"input_image": {0: "batch_size", 1: "height", 2: "width"},
"preprocessed_image": {0: "batch_size"},
},
)
# Initialize the DetPostprocess module
postprocess_model = DetPostprocess(target_size=det_target_size)
det_dummy_input_b0 = torch.from_numpy(image).unsqueeze(0)
det_dummy_input_b1 = torch.rand(1, 10, 5)
# Export to ONNX
torch.onnx.export(
postprocess_model,
(det_dummy_input_b0, det_dummy_input_b1),
base_path + "det_postprocess.onnx",
opset_version=11,
input_names=["input_image", "boxes"],
output_names=["output_boxes"],
dynamic_axes={
"input_image": {0: "batch_size", 1: "height", 2: "width"},
"boxes": {0: "batch_size", 1: "num_boxes"},
"output_boxes": {0: "batch_size", 1: "num_boxes"},
},
)
# Initialize the PosePreprocess module
preprocess_model = PosePreprocess(target_size=pose_target_size)
det_dummy_input_c0 = torch.from_numpy(image).unsqueeze(0)
det_dummy_input_c1 = torch.tensor([[352, 339, 518, 594]]).to(torch.int32)
# Export to ONNX
torch.onnx.export(
preprocess_model,
(det_dummy_input_c0, det_dummy_input_c1),
base_path + "pose_preprocess.onnx",
opset_version=11,
input_names=["input_image", "bbox"],
output_names=["preprocessed_image"],
dynamic_axes={
"input_image": {0: "batch_size", 1: "height", 2: "width"},
"preprocessed_image": {0: "batch_size"},
},
)
# Initialize the PosePostprocess module
postprocess_model = PosePostprocess(target_size=pose_target_size)
det_dummy_input_d0 = torch.from_numpy(image).unsqueeze(0)
det_dummy_input_d1 = torch.tensor([[352, 339, 518, 594]]).to(torch.int32)
det_dummy_input_d2 = torch.rand(1, 17, 2)
# Export to ONNX
torch.onnx.export(
postprocess_model,
(det_dummy_input_d0, det_dummy_input_d1, det_dummy_input_d2),
base_path + "pose_postprocess.onnx",
opset_version=11,
input_names=["input_image", "bbox", "keypoints"],
output_names=["output_keypoints"],
dynamic_axes={
"input_image": {0: "batch_size", 1: "height", 2: "width"},
"output_keypoints": {0: "batch_size"},
},
)
# ==================================================================================================
if __name__ == "__main__":
main()


@ -0,0 +1,9 @@
#! /bin/bash
xhost +
docker run --privileged --rm --network host -it \
--gpus all --shm-size=16g --ulimit memlock=-1 --ulimit stack=67108864 \
--volume "$(pwd)"/:/RapidPoseTriangulation/ \
--volume /tmp/.X11-unix:/tmp/.X11-unix \
--env DISPLAY --env QT_X11_NO_MITSHM=1 \
rpt_mmdeploy

(binary image file not shown; 38 KiB)

(file diff suppressed because it is too large)


@ -4,9 +4,9 @@
// =================================================================================================
// =================================================================================================
-Triangulator::Triangulator(float min_score, size_t min_group_size)
+Triangulator::Triangulator(float min_match_score, size_t min_group_size)
{
-    this->triangulator = new TriangulatorInternal(min_score, min_group_size);
+    this->triangulator = new TriangulatorInternal(min_match_score, min_group_size);
}
// ================================================================================================= // =================================================================================================


@ -19,11 +19,11 @@ public:
     * Triangulator to predict poses from multiple views.
     *
     *
-     * @param min_score Minimum score to consider a triangulated joint as valid.
+     * @param min_match_score Minimum score to consider a triangulated joint as valid.
     * @param min_group_size Minimum number of camera pairs that need to see a person.
     */
    Triangulator(
-        float min_score = 0.95,
+        float min_match_score = 0.95,
        size_t min_group_size = 1);

    /**


@ -102,9 +102,9 @@ void CameraInternal::update_projection_matrix()
// =================================================================================================
// =================================================================================================
-TriangulatorInternal::TriangulatorInternal(float min_score, size_t min_group_size)
+TriangulatorInternal::TriangulatorInternal(float min_match_score, size_t min_group_size)
{
-    this->min_score = min_score;
+    this->min_match_score = min_match_score;
    this->min_group_size = min_group_size;
}
@ -241,7 +241,7 @@ std::vector<std::vector<std::array<float, 4>>> TriangulatorInternal::triangulate
    stime = std::chrono::high_resolution_clock::now();
    // Check matches to old poses
-    float threshold = min_score - 0.2;
+    float threshold = min_match_score - 0.2;
    std::map<size_t, std::map<size_t, std::vector<size_t>>> scored_pasts;
    if (!last_poses_3d.empty())
    {
@ -447,7 +447,7 @@ std::vector<std::vector<std::array<float, 4>>> TriangulatorInternal::triangulate
    size_t num_poses = all_scored_poses.size();
    for (size_t i = num_poses; i > 0; --i)
    {
-        if (all_scored_poses[i - 1].second < min_score)
+        if (all_scored_poses[i - 1].second < min_match_score)
        {
            all_scored_poses.erase(all_scored_poses.begin() + i - 1);
            all_pairs.erase(all_pairs.begin() + i - 1);
@ -456,7 +456,7 @@ std::vector<std::vector<std::array<float, 4>>> TriangulatorInternal::triangulate
    // Group pairs that share a person
    std::vector<std::tuple<cv::Point3f, cv::Mat, std::vector<int>>> groups;
-    groups = calc_grouping(all_pairs, all_scored_poses, min_score);
+    groups = calc_grouping(all_pairs, all_scored_poses, min_match_score);
    // Drop groups with too few matches
    size_t num_groups = groups.size();
@ -508,7 +508,7 @@ std::vector<std::vector<std::array<float, 4>>> TriangulatorInternal::triangulate
            poses.push_back(all_full_poses[idx]);
        }
-        auto merged_pose = merge_group(poses, min_score);
+        auto merged_pose = merge_group(poses, min_match_score);
        all_merged_poses[i] = (merged_pose);
    }
@ -548,7 +548,7 @@ std::vector<std::vector<std::array<float, 4>>> TriangulatorInternal::triangulate
} }
pose.push_back(point); pose.push_back(point);
if (point[3] > min_score) if (point[3] > min_match_score)
{ {
num_valid++; num_valid++;
} }
@ -1860,7 +1860,7 @@ void TriangulatorInternal::add_missing_joints(
for (size_t j = 0; j < num_joints; ++j) for (size_t j = 0; j < num_joints; ++j)
{ {
float *pose_ptr = pose.ptr<float>(j); float *pose_ptr = pose.ptr<float>(j);
if (pose_ptr[3] > min_score) if (pose_ptr[3] > min_match_score)
{ {
valid_joint_idx.push_back(j); valid_joint_idx.push_back(j);
} }
View File
@ -31,7 +31,7 @@ public:
class TriangulatorInternal class TriangulatorInternal
{ {
public: public:
TriangulatorInternal(float min_score, size_t min_group_size); TriangulatorInternal(float min_match_score, size_t min_group_size);
std::vector<std::vector<std::array<float, 4>>> triangulate_poses( std::vector<std::vector<std::array<float, 4>>> triangulate_poses(
const std::vector<std::vector<std::vector<std::array<float, 3>>>> &poses_2d, const std::vector<std::vector<std::vector<std::array<float, 3>>>> &poses_2d,
@ -43,7 +43,7 @@ public:
void print_stats(); void print_stats();
private: private:
float min_score; float min_match_score;
float min_group_size; float min_group_size;
const std::vector<std::string> core_joints = { const std::vector<std::string> core_joints = {
View File
@ -17,8 +17,8 @@ import rpt
# ================================================================================================== # ==================================================================================================
# dataset_use = "panoptic"
dataset_use = "human36m" dataset_use = "human36m"
# dataset_use = "panoptic"
# dataset_use = "mvor" # dataset_use = "mvor"
# dataset_use = "shelf" # dataset_use = "shelf"
# dataset_use = "campus" # dataset_use = "campus"
@ -26,8 +26,47 @@ dataset_use = "human36m"
# dataset_use = "chi3d" # dataset_use = "chi3d"
# dataset_use = "tsinghua" # dataset_use = "tsinghua"
# dataset_use = "human36m_wb" # dataset_use = "human36m_wb"
# dataset_use = "egohumans" # dataset_use = "egohumans_tagging"
# dataset_use = "egohumans_legoassemble"
# dataset_use = "egohumans_fencing"
# dataset_use = "egohumans_basketball"
# dataset_use = "egohumans_volleyball"
# dataset_use = "egohumans_badminton"
# dataset_use = "egohumans_tennis"
# dataset_use = "ntu"
# dataset_use = "koarob"
# Describes the minimum area, as a fraction of the image size, for a 2D bounding box to be considered
# If the persons are small in the image, use a lower value
default_min_bbox_area = 0.1 * 0.1
# Describes how confident a 2D bounding box needs to be to be considered
# If the persons are small in the image, or poorly recognizable, use a lower value
default_min_bbox_score = 0.3
# Describes how well two 2D poses need to match each other to create a valid triangulation
# If the quality of the 2D detections is poor, use a lower value
default_min_match_score = 0.94
# Describes the minimum number of camera pairs that need to detect the same person
# If the number of cameras is high, and the views are not occluded, use a higher value
default_min_group_size = 1
# Batch poses per image for faster processing
# If most of the time only one person is in an image, disable it, because it is slightly slower in that case
default_batch_poses = True
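# Illustrative examples (numbers are not taken from any specific dataset below):
# - default_min_bbox_area = 0.1 * 0.1 keeps only boxes covering at least 1 % of the image
#   area, roughly a box spanning 10 % of the width and 10 % of the height; 0.05 * 0.05
#   admits boxes with a quarter of that area.
# - With 5 cameras there are 5 * 4 / 2 = 10 camera pairs, so min_group_size = 2 requires
#   at least 2 of those pairs to see the same person.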
datasets = { datasets = {
"human36m": {
"path": "/datasets/human36m/skelda/pose_test.json",
"take_interval": 5,
"min_match_score": 0.94,
"min_group_size": 1,
"min_bbox_score": 0.4,
"min_bbox_area": 0.1 * 0.1,
"batch_poses": False,
},
"panoptic": { "panoptic": {
"path": "/datasets/panoptic/skelda/test.json", "path": "/datasets/panoptic/skelda/test.json",
"cams": ["00_03", "00_06", "00_12", "00_13", "00_23"], "cams": ["00_03", "00_06", "00_12", "00_13", "00_23"],
@ -35,27 +74,33 @@ datasets = {
# "cams": ["00_03", "00_06", "00_12", "00_13", "00_23", "00_15", "00_10", "00_21", "00_09", "00_01"], # "cams": ["00_03", "00_06", "00_12", "00_13", "00_23", "00_15", "00_10", "00_21", "00_09", "00_01"],
"take_interval": 3, "take_interval": 3,
"use_scenes": ["160906_pizza1", "160422_haggling1", "160906_ian5"], "use_scenes": ["160906_pizza1", "160422_haggling1", "160906_ian5"],
}, "min_group_size": 1,
"human36m": { # "min_group_size": 4,
"path": "/datasets/human36m/skelda/pose_test.json", "min_bbox_area": 0.05 * 0.05,
"take_interval": 5,
}, },
"mvor": { "mvor": {
"path": "/datasets/mvor/skelda/all.json", "path": "/datasets/mvor/skelda/all.json",
"take_interval": 1, "take_interval": 1,
"with_depth": False, "with_depth": False,
"min_match_score": 0.85,
"min_bbox_score": 0.25,
}, },
"campus": { "campus": {
"path": "/datasets/campus/skelda/test.json", "path": "/datasets/campus/skelda/test.json",
"take_interval": 1, "take_interval": 1,
"min_bbox_score": 0.5,
}, },
"shelf": { "shelf": {
"path": "/datasets/shelf/skelda/test.json", "path": "/datasets/shelf/skelda/test.json",
"take_interval": 1, "take_interval": 1,
"min_match_score": 0.96,
"min_group_size": 2,
}, },
"ikeaasm": { "ikeaasm": {
"path": "/datasets/ikeaasm/skelda/test.json", "path": "/datasets/ikeaasm/skelda/test.json",
"take_interval": 2, "take_interval": 2,
"min_match_score": 0.92,
"min_bbox_score": 0.20,
}, },
"chi3d": { "chi3d": {
"path": "/datasets/chi3d/skelda/all.json", "path": "/datasets/chi3d/skelda/all.json",
@ -64,21 +109,66 @@ datasets = {
"tsinghua": { "tsinghua": {
"path": "/datasets/tsinghua/skelda/test.json", "path": "/datasets/tsinghua/skelda/test.json",
"take_interval": 3, "take_interval": 3,
"min_group_size": 2,
}, },
"human36m_wb": { "human36m_wb": {
"path": "/datasets/human36m/skelda/wb/test.json", "path": "/datasets/human36m/skelda/wb/test.json",
"take_interval": 100, "take_interval": 100,
"min_bbox_score": 0.4,
"batch_poses": False,
}, },
"egohumans": { "egohumans_tagging": {
"path": "/datasets/egohumans/skelda/all.json",
"take_interval": 2,
"subset": "tagging",
"min_group_size": 2,
"min_bbox_score": 0.25,
"min_bbox_area": 0.05 * 0.05,
},
"egohumans_legoassemble": {
"path": "/datasets/egohumans/skelda/all.json", "path": "/datasets/egohumans/skelda/all.json",
"take_interval": 2, "take_interval": 2,
# "subset": "tagging",
"subset": "legoassemble", "subset": "legoassemble",
# "subset": "fencing", "min_group_size": 2,
# "subset": "basketball", },
# "subset": "volleyball", "egohumans_fencing": {
# "subset": "badminton", "path": "/datasets/egohumans/skelda/all.json",
# "subset": "tennis", "take_interval": 2,
"subset": "fencing",
"min_group_size": 7,
"min_bbox_score": 0.5,
"min_bbox_area": 0.05 * 0.05,
},
"egohumans_basketball": {
"path": "/datasets/egohumans/skelda/all.json",
"take_interval": 2,
"subset": "basketball",
"min_group_size": 7,
"min_bbox_score": 0.25,
"min_bbox_area": 0.025 * 0.025,
},
"egohumans_volleyball": {
"path": "/datasets/egohumans/skelda/all.json",
"take_interval": 2,
"subset": "volleyball",
"min_group_size": 11,
"min_bbox_score": 0.25,
"min_bbox_area": 0.05 * 0.05,
},
"egohumans_badminton": {
"path": "/datasets/egohumans/skelda/all.json",
"take_interval": 2,
"subset": "badminton",
"min_group_size": 7,
"min_bbox_score": 0.25,
"min_bbox_area": 0.05 * 0.05,
},
"egohumans_tennis": {
"path": "/datasets/egohumans/skelda/all.json",
"take_interval": 2,
"subset": "tennis",
"min_group_size": 11,
"min_bbox_area": 0.025 * 0.025,
}, },
} }
@ -99,11 +189,15 @@ eval_joints = [
"ankle_left", "ankle_left",
"ankle_right", "ankle_right",
] ]
if dataset_use in ["human36m", "panoptic"]: if dataset_use == "human36m":
eval_joints[eval_joints.index("head")] = "nose" eval_joints[eval_joints.index("head")] = "nose"
if dataset_use.endswith("_wb"): if dataset_use == "panoptic":
# eval_joints[eval_joints.index("head")] = "nose" eval_joints[eval_joints.index("head")] = "nose"
eval_joints = list(joint_names_2d) if dataset_use == "human36m_wb":
if any((test_triangulate.whole_body.values())):
eval_joints = list(joint_names_2d)
else:
eval_joints[eval_joints.index("head")] = "nose"
# output_dir = "/RapidPoseTriangulation/data/testoutput/" # output_dir = "/RapidPoseTriangulation/data/testoutput/"
output_dir = "" output_dir = ""
@ -191,11 +285,11 @@ def load_labels(dataset: dict):
elif "human36m_wb" in dataset: elif "human36m_wb" in dataset:
labels = load_json(dataset["human36m_wb"]["path"]) labels = load_json(dataset["human36m_wb"]["path"])
elif "egohumans" in dataset: elif any(("egohumans" in key for key in dataset)):
labels = load_json(dataset["egohumans"]["path"]) labels = load_json(dataset[dataset_use]["path"])
labels = [lb for lb in labels if "test" in lb["splits"]] labels = [lb for lb in labels if "test" in lb["splits"]]
labels = [lb for lb in labels if dataset["egohumans"]["subset"] in lb["seq"]] labels = [lb for lb in labels if dataset[dataset_use]["subset"] in lb["seq"]]
if dataset["egohumans"]["subset"] in ["volleyball", "tennis"]: if dataset[dataset_use]["subset"] in ["volleyball", "tennis"]:
labels = [lb for i, lb in enumerate(labels) if i % 150 < 60] labels = [lb for i, lb in enumerate(labels) if i % 150 < 60]
else: else:
@ -216,11 +310,21 @@ def load_labels(dataset: dict):
def main(): def main():
global joint_names_3d, eval_joints global joint_names_3d, eval_joints
# Load dataset specific parameters
min_match_score = datasets[dataset_use].get(
"min_match_score", default_min_match_score
)
min_group_size = datasets[dataset_use].get("min_group_size", default_min_group_size)
min_bbox_score = datasets[dataset_use].get("min_bbox_score", default_min_bbox_score)
min_bbox_area = datasets[dataset_use].get("min_bbox_area", default_min_bbox_area)
batch_poses = datasets[dataset_use].get("batch_poses", default_batch_poses)
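    # Example of the fallback behaviour (values from the dicts above): for dataset_use == "shelf",
    # min_match_score resolves to 0.96 and min_group_size to 2, while min_bbox_score and
    # min_bbox_area fall back to the defaults (0.3 and 0.1 * 0.1).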
# Load 2D pose model
whole_body = test_triangulate.whole_body whole_body = test_triangulate.whole_body
if any((whole_body[k] for k in whole_body)): if any((whole_body[k] for k in whole_body)):
kpt_model = utils_2d_pose.load_wb_model() kpt_model = utils_2d_pose.load_wb_model()
else: else:
kpt_model = utils_2d_pose.load_model() kpt_model = utils_2d_pose.load_model(min_bbox_score, min_bbox_area, batch_poses)
# Manually set matplotlib backend # Manually set matplotlib backend
try: try:
@ -239,68 +343,19 @@ def main():
# Print a dataset sample for debugging # Print a dataset sample for debugging
print(labels[0]) print(labels[0])
minscores = { print("\nCalculating 2D predictions ...")
# Describes how good two 2D poses need to match each other to create a valid triangulation all_poses_2d = []
# If the quality of the 2D detections is poor, use a lower value
"panoptic": 0.94,
"human36m": 0.94,
"mvor": 0.86,
"campus": 0.96,
"shelf": 0.96,
"ikeaasm": 0.89,
"chi3d": 0.94,
"tsinghua": 0.96,
"egohumans": 0.95,
"human36m_wb": 0.94,
}
minscore = minscores.get(dataset_use, 0.95)
min_group_sizes = {
# Describes the minimum number of camera pairs that need to detect the same person
# If the number of cameras is high, and the views are not occluded, use a higher value
"panoptic": 1,
"shelf": 2,
"chi3d": 1,
"tsinghua": 2,
"egohumans": 4,
}
min_group_size = min_group_sizes.get(dataset_use, 1)
if dataset_use == "panoptic" and len(datasets["panoptic"]["cams"]) == 10:
min_group_size = 4
if dataset_use == "egohumans" and (
"lego" in labels[0]["seq"] or "tagging" in labels[0]["seq"]
):
min_group_size = 2
if dataset_use == "egohumans" and (
"volleyball" in labels[0]["seq"] or "badminton" in labels[0]["seq"]
):
min_group_size = 7
if dataset_use == "egohumans" and "tennis" in labels[0]["seq"]:
min_group_size = 11
print("\nRunning predictions ...")
all_poses = []
all_ids = []
times = [] times = []
triangulator = rpt.Triangulator(min_score=minscore, min_group_size=min_group_size)
old_scene = ""
old_index = -1
for label in tqdm.tqdm(labels): for label in tqdm.tqdm(labels):
images_2d = [] images_2d = []
if old_scene != label.get("scene", "") or (
old_index + datasets[dataset_use]["take_interval"] < label["index"]
):
# Reset last poses if scene changes
old_scene = label.get("scene", "")
triangulator.reset()
try: try:
start = time.time() start = time.time()
for i in range(len(label["imgpaths"])): for i in range(len(label["imgpaths"])):
imgpath = label["imgpaths"][i] imgpath = label["imgpaths"][i]
img = test_triangulate.load_image(imgpath) img = test_triangulate.load_image(imgpath)
images_2d.append(img) images_2d.append(img)
print("IMG time:", time.time() - start) time_imgs = time.time() - start
except cv2.error: except cv2.error:
print("One of the paths not found:", label["imgpaths"]) print("One of the paths not found:", label["imgpaths"])
continue continue
@ -322,7 +377,28 @@ def main():
poses_2d = utils_2d_pose.get_2d_pose(kpt_model, images_2d) poses_2d = utils_2d_pose.get_2d_pose(kpt_model, images_2d)
poses_2d = test_triangulate.update_keypoints(poses_2d, joint_names_2d) poses_2d = test_triangulate.update_keypoints(poses_2d, joint_names_2d)
time_2d = time.time() - start time_2d = time.time() - start
print("2D time:", time_2d)
all_poses_2d.append(poses_2d)
times.append([time_imgs, time_2d, 0])
print("\nCalculating 3D predictions ...")
all_poses_3d = []
all_ids = []
triangulator = rpt.Triangulator(
min_match_score=min_match_score, min_group_size=min_group_size
)
old_scene = ""
old_index = -1
for i in tqdm.tqdm(range(len(labels))):
label = labels[i]
poses_2d = all_poses_2d[i]
if old_scene != label.get("scene", "") or (
old_index + datasets[dataset_use]["take_interval"] < label["index"]
):
# Reset last poses if scene changes
old_scene = label.get("scene", "")
triangulator.reset()
start = time.time() start = time.time()
if sum(np.sum(p) for p in poses_2d) == 0: if sum(np.sum(p) for p in poses_2d) == 0:
@ -333,14 +409,12 @@ def main():
poses3D = triangulator.triangulate_poses( poses3D = triangulator.triangulate_poses(
poses_2d, rpt_cameras, roomparams, joint_names_2d poses_2d, rpt_cameras, roomparams, joint_names_2d
) )
time_3d = time.time() - start time_3d = time.time() - start
print("3D time:", time_3d)
old_index = label["index"] old_index = label["index"]
all_poses.append(np.array(poses3D).tolist()) all_poses_3d.append(np.array(poses3D).tolist())
all_ids.append(label["id"]) all_ids.append(label["id"])
times.append((time_2d, time_3d)) times[i][2] = time_3d
# Print per-step triangulation timings # Print per-step triangulation timings
print("") print("")
@ -349,9 +423,11 @@ def main():
warmup_iters = 10 warmup_iters = 10
if len(times) > warmup_iters: if len(times) > warmup_iters:
times = times[warmup_iters:] times = times[warmup_iters:]
avg_time_2d = np.mean([t[0] for t in times]) avg_time_im = np.mean([t[0] for t in times])
avg_time_3d = np.mean([t[1] for t in times]) avg_time_2d = np.mean([t[1] for t in times])
avg_time_3d = np.mean([t[2] for t in times])
tstats = { tstats = {
"img_loading": avg_time_im,
"avg_time_2d": avg_time_2d, "avg_time_2d": avg_time_2d,
"avg_time_3d": avg_time_3d, "avg_time_3d": avg_time_3d,
"avg_fps": 1.0 / (avg_time_2d + avg_time_3d), "avg_fps": 1.0 / (avg_time_2d + avg_time_3d),
@ -361,7 +437,7 @@ def main():
_ = evals.mpjpe.run_eval( _ = evals.mpjpe.run_eval(
labels, labels,
all_poses, all_poses_3d,
all_ids, all_ids,
joint_names_net=joint_names_3d, joint_names_net=joint_names_3d,
joint_names_use=eval_joints, joint_names_use=eval_joints,
@ -369,7 +445,7 @@ def main():
) )
_ = evals.pcp.run_eval( _ = evals.pcp.run_eval(
labels, labels,
all_poses, all_poses_3d,
all_ids, all_ids,
joint_names_net=joint_names_3d, joint_names_net=joint_names_3d,
joint_names_use=eval_joints, joint_names_use=eval_joints,
View File
@ -220,7 +220,7 @@ def update_sample(sample, new_dir=""):
def load_image(path: str): def load_image(path: str):
image = cv2.imread(path, 3) image = cv2.imread(path, 3)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
image = np.array(image, dtype=np.float32) image = np.asarray(image, dtype=np.uint8)
return image return image
@ -283,7 +283,7 @@ def main():
if any((whole_body[k] for k in whole_body)): if any((whole_body[k] for k in whole_body)):
kpt_model = utils_2d_pose.load_wb_model() kpt_model = utils_2d_pose.load_wb_model()
else: else:
kpt_model = utils_2d_pose.load_model() kpt_model = utils_2d_pose.load_model(min_bbox_score=0.3)
# Manually set matplotlib backend # Manually set matplotlib backend
matplotlib.use("TkAgg") matplotlib.use("TkAgg")
@ -340,7 +340,7 @@ def main():
else: else:
cameras = rpt.convert_cameras(camparams) cameras = rpt.convert_cameras(camparams)
roomp = [roomparams["room_size"], roomparams["room_center"]] roomp = [roomparams["room_size"], roomparams["room_center"]]
triangulator = rpt.Triangulator(min_score=0.95) triangulator = rpt.Triangulator(min_match_score=0.94)
stime = time.time() stime = time.time()
poses_3d = triangulator.triangulate_poses( poses_3d = triangulator.triangulate_poses(
View File
@ -1,42 +1,498 @@
import math
import os import os
from abc import ABC, abstractmethod
from typing import List
import cv2
import numpy as np import numpy as np
from mmpose.apis import MMPoseInferencer import onnxruntime as ort
from tqdm import tqdm
# ==================================================================================================
filepath = os.path.dirname(os.path.realpath(__file__)) + "/"
# ================================================================================================== # ==================================================================================================
def load_model(): class BaseModel(ABC):
print("Loading mmpose model ...") def __init__(self, model_path: str, warmup: int):
self.model_path = model_path
self.runtime = ""
model = MMPoseInferencer( if not os.path.exists(model_path):
pose2d="/mmpose/projects/rtmpose/rtmpose/body_2d_keypoint/rtmpose-m_8xb256-420e_coco-384x288.py", raise FileNotFoundError("File not found:", model_path)
pose2d_weights="https://download.openmmlab.com/mmpose/v1/projects/rtmposev1/rtmpose-m_simcc-body7_pt-body7_420e-384x288-65e718c4_20230504.pth",
det_model="/mmpose/projects/rtmpose/rtmdet/person/rtmdet_nano_320-8xb32_coco-person.py", if model_path.endswith(".onnx"):
det_weights="https://download.openmmlab.com/mmpose/v1/projects/rtmpose/rtmdet_nano_8xb32-100e_coco-obj365-person-05d8511e.pth", self.init_onnxruntime(model_path)
det_cat_ids=[0], self.runtime = "ort"
else:
raise ValueError("Unsupported model format:", model_path)
if warmup > 0:
print("Running warmup for '{}' ...".format(self.__class__.__name__))
self.warmup(warmup // 2)
self.warmup(warmup // 2)
def init_onnxruntime(self, model_path):
usetrt = True
usegpu = True
self.opt = ort.SessionOptions()
providers = ort.get_available_providers()
# ort.set_default_logger_severity(1)
self.providers = []
if usetrt and "TensorrtExecutionProvider" in providers:
self.providers.append(
(
"TensorrtExecutionProvider",
{
"trt_engine_cache_enable": True,
"trt_engine_cache_path": "/RapidPoseTriangulation/data/trt_cache/",
},
)
)
elif usegpu and "CUDAExecutionProvider" in providers:
self.providers.append("CUDAExecutionProvider")
else:
self.providers.append("CPUExecutionProvider")
print("Using providers:", self.providers)
self.session = ort.InferenceSession(
model_path, providers=self.providers, sess_options=self.opt
)
self.input_names = [input.name for input in self.session.get_inputs()]
self.input_shapes = [input.shape for input in self.session.get_inputs()]
input_types = [input.type for input in self.session.get_inputs()]
self.input_types = []
for i in range(len(input_types)):
input_type = input_types[i]
if input_type == "tensor(float32)":
itype = np.float32
elif input_type == "tensor(float16)":
itype = np.float16
elif input_type == "tensor(int32)":
itype = np.int32
elif input_type == "tensor(uint8)":
itype = np.uint8
else:
raise ValueError("Undefined input type:", input_type)
self.input_types.append(itype)
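# Note on the provider setup above (descriptive, assuming standard onnxruntime behaviour):
# the providers list is priority-ordered, so TensorRT is used when available, otherwise CUDA,
# otherwise CPU. With trt_engine_cache_enable, the first inference builds the TensorRT engine
# and stores it under trt_engine_cache_path, so only the first run pays the engine-build cost.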
@abstractmethod
def preprocess(self, **kwargs):
pass
@abstractmethod
def postprocess(self, **kwargs):
pass
def warmup(self, epoch: int):
np.random.seed(42)
for _ in tqdm(range(epoch)):
inputs = {}
for i in range(len(self.input_names)):
iname = self.input_names[i]
if "image" in iname:
ishape = list(self.input_shapes[i])
if "batch_size" in ishape:
max_batch_size = 10
ishape[0] = np.random.choice(list(range(1, max_batch_size + 1)))
tensor = np.random.random(ishape)
tensor = tensor * 255
else:
raise ValueError("Undefined input type:", iname)
tensor = tensor.astype(self.input_types[i])
inputs[iname] = tensor
self.call_model_ort(list(inputs.values()))
def call_model_ort(self, tensor):
inputs = {}
for i in range(len(self.input_names)):
iname = self.input_names[i]
inputs[iname] = tensor[i]
result = self.session.run(None, inputs)
return result
def __call__(self, **kwargs):
tensor = self.preprocess(**kwargs)
result = self.call_model_ort(tensor)
output = self.postprocess(result=result, **kwargs)
return output
# ==================================================================================================
class LetterBox:
def __init__(self, target_size, fill_value=0):
self.target_size = target_size
self.fill_value = fill_value
def calc_params(self, ishape):
img_h, img_w = ishape[:2]
target_h, target_w = self.target_size
scale = min(target_w / img_w, target_h / img_h)
new_w = round(img_w * scale)
new_h = round(img_h * scale)
pad_w = target_w - new_w
pad_h = target_h - new_h
pad_left = pad_w // 2
pad_top = pad_h // 2
pad_right = pad_w - pad_left
pad_bottom = pad_h - pad_top
paddings = (pad_left, pad_right, pad_top, pad_bottom)
return paddings, scale, (new_w, new_h)
def resize_image(self, image):
paddings, _, new_size = self.calc_params(image.shape)
# Resize the image
new_w, new_h = new_size
resized_img = cv2.resize(
image,
(new_w, new_h),
interpolation=cv2.INTER_NEAREST,
)
# Optionally pad the image
pad_left, pad_right, pad_top, pad_bottom = paddings
if pad_left == 0 and pad_right == 0 and pad_top == 0 and pad_bottom == 0:
final_img = resized_img
else:
final_img = cv2.copyMakeBorder(
resized_img,
pad_top,
pad_bottom,
pad_left,
pad_right,
borderType=cv2.BORDER_CONSTANT,
value=[self.fill_value, self.fill_value, self.fill_value],
)
return final_img
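# Worked example (illustrative): letterboxing a 1920x1080 image to target_size (320, 320)
# gives scale = min(320 / 1920, 320 / 1080) = 1 / 6, a resized size of 320x180, and
# paddings (left, right, top, bottom) = (0, 0, 70, 70), so the frame sits vertically
# centered with 70 px of fill above and below.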
# ==================================================================================================
class BoxCrop:
def __init__(self, target_size, padding_scale=1.0, fill_value=0):
self.target_size = target_size
self.padding_scale = padding_scale
self.fill_value = fill_value
def calc_params(self, ishape, bbox):
start_x, start_y, end_x, end_y = bbox[0], bbox[1], bbox[2], bbox[3]
target_h, target_w = self.target_size
# Calculate original bounding box center
center_x = (start_x + end_x) / 2.0
center_y = (start_y + end_y) / 2.0
# Scale the bounding box by the padding_scale
bbox_w = end_x - start_x
bbox_h = end_y - start_y
scaled_w = bbox_w * self.padding_scale
scaled_h = bbox_h * self.padding_scale
# Calculate the aspect ratios
bbox_aspect = scaled_w / scaled_h
target_aspect = target_w / target_h
# Adjust the scaled bounding box to match the target aspect ratio
if bbox_aspect > target_aspect:
adjusted_h = scaled_w / target_aspect
adjusted_w = scaled_w
else:
adjusted_w = scaled_h * target_aspect
adjusted_h = scaled_h
# Calculate scaled bounding box coordinates
bbox_w = adjusted_w
bbox_h = adjusted_h
new_start_x = center_x - bbox_w / 2.0
new_start_y = center_y - bbox_h / 2.0
new_end_x = center_x + bbox_w / 2.0
new_end_y = center_y + bbox_h / 2.0
# Round the box coordinates
start_x = int(math.floor(new_start_x))
start_y = int(math.floor(new_start_y))
end_x = int(math.ceil(new_end_x))
end_y = int(math.ceil(new_end_y))
# Define the new box coordinates
new_start_x = max(0, start_x)
new_start_y = max(0, start_y)
new_end_x = min(ishape[1] - 1, end_x)
new_end_y = min(ishape[0] - 1, end_y)
new_box = [new_start_x, new_start_y, new_end_x, new_end_y]
# Calculate resized crop size
bbox_w = new_box[2] - new_box[0]
bbox_h = new_box[3] - new_box[1]
scale = min(target_w / bbox_w, target_h / bbox_h)
new_w = round(bbox_w * scale)
new_h = round(bbox_h * scale)
# Calculate paddings
pad_w = target_w - new_w
pad_h = target_h - new_h
pad_left, pad_right, pad_top, pad_bottom = 0, 0, 0, 0
if pad_w > 0:
if start_x < 0:
pad_left = pad_w
pad_right = 0
elif end_x > ishape[1]:
pad_left = 0
pad_right = pad_w
else:
# Can be caused by bbox rounding
pad_left = pad_w // 2
pad_right = pad_w - pad_left
if pad_h > 0:
if start_y < 0:
pad_top = pad_h
pad_bottom = 0
elif end_y > ishape[0]:
pad_top = 0
pad_bottom = pad_h
else:
# Can be caused by bbox rounding
pad_top = pad_h // 2
pad_bottom = pad_h - pad_top
paddings = (pad_left, pad_right, pad_top, pad_bottom)
return paddings, scale, new_box, (new_w, new_h)
def crop_resize_box(self, image, bbox):
paddings, _, new_box, new_size = self.calc_params(image.shape, bbox)
# Extract the bounding box
cropped_img = image[new_box[1] : new_box[3], new_box[0] : new_box[2]]
# Resize the image
new_w, new_h = new_size
resized_img = cv2.resize(
cropped_img,
(new_w, new_h),
interpolation=cv2.INTER_NEAREST,
)
# Optionally pad the image
pad_left, pad_right, pad_top, pad_bottom = paddings
if pad_left == 0 and pad_right == 0 and pad_top == 0 and pad_bottom == 0:
final_img = resized_img
else:
final_img = cv2.copyMakeBorder(
resized_img,
pad_top,
pad_bottom,
pad_left,
pad_right,
borderType=cv2.BORDER_CONSTANT,
value=[self.fill_value, self.fill_value, self.fill_value],
)
return final_img
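# Worked example (illustrative): with target_size (384, 288) the target aspect is
# 288 / 384 = 0.75. A 100x300 px box scaled by padding_scale 1.25 becomes 125x375
# (aspect ~0.33 < 0.75), so its width is widened to 375 * 0.75 = 281.25 px while the
# height stays 375 px, and that crop is then resized to the 288x384 model input.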
# ==================================================================================================
class RTMDet(BaseModel):
def __init__(
self,
model_path: str,
conf_threshold: float,
min_area_fraction: float,
warmup: int = 30,
):
super(RTMDet, self).__init__(model_path, warmup)
self.target_size = (320, 320)
self.conf_threshold = conf_threshold
self.letterbox = LetterBox(self.target_size, fill_value=114)
img_area = self.target_size[0] * self.target_size[1]
self.min_area = img_area * min_area_fraction
def preprocess(self, image: np.ndarray):
image = self.letterbox.resize_image(image)
tensor = np.asarray(image).astype(self.input_types[0], copy=False)
tensor = np.expand_dims(tensor, axis=0)
tensor = [tensor]
return tensor
def postprocess(self, result: List[np.ndarray], image: np.ndarray):
boxes = np.squeeze(result[0], axis=0)
classes = np.squeeze(result[1], axis=0)
human_class = classes[:] == 0
boxes = boxes[human_class]
keep = boxes[:, 4] > self.conf_threshold
boxes = boxes[keep]
if len(boxes) == 0:
return np.array([])
# Drop boxes with too small area
boxes = boxes.astype(np.float32)
areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
keep = areas >= self.min_area
boxes = boxes[keep]
if len(boxes) == 0:
return np.array([])
paddings, scale, _ = self.letterbox.calc_params(image.shape)
boxes[:, 0] -= paddings[0]
boxes[:, 2] -= paddings[0]
boxes[:, 1] -= paddings[2]
boxes[:, 3] -= paddings[2]
boxes = np.maximum(boxes, 0)
th, tw = self.target_size
pad_w = paddings[0] + paddings[1]
pad_h = paddings[2] + paddings[3]
max_w = tw - pad_w - 1
max_h = th - pad_h - 1
boxes[:, 0] = np.minimum(boxes[:, 0], max_w)
boxes[:, 1] = np.minimum(boxes[:, 1], max_h)
boxes[:, 2] = np.minimum(boxes[:, 2], max_w)
boxes[:, 3] = np.minimum(boxes[:, 3], max_h)
boxes[:, 0:4] /= scale
return boxes
# ==================================================================================================
class RTMPose(BaseModel):
def __init__(self, model_path: str, warmup: int = 30):
super(RTMPose, self).__init__(model_path, warmup)
self.target_size = (384, 288)
self.boxcrop = BoxCrop(self.target_size, padding_scale=1.25, fill_value=0)
def preprocess(self, image: np.ndarray, bboxes: np.ndarray):
cutouts = []
for i in range(len(bboxes)):
bbox = np.asarray(bboxes[i])[0:4]
bbox += np.array([-0.5, -0.5, 0.5 - 1e-8, 0.5 - 1e-8])
bbox = bbox.round().astype(np.int32)
region = self.boxcrop.crop_resize_box(image, bbox)
tensor = np.asarray(region).astype(self.input_types[0], copy=False)
cutouts.append(tensor)
if len(bboxes) == 1:
cutouts = np.expand_dims(cutouts[0], axis=0)
else:
cutouts = np.stack(cutouts, axis=0)
tensor = [cutouts]
return tensor
def postprocess(
self, result: List[np.ndarray], image: np.ndarray, bboxes: np.ndarray
):
kpts = []
for i in range(len(bboxes)):
scores = np.clip(result[1][i], 0, 1)
kp = np.concatenate(
[result[0][i], np.expand_dims(scores, axis=-1)], axis=-1
)
paddings, scale, bbox, _ = self.boxcrop.calc_params(image.shape, bboxes[i])
kp[:, 0] -= paddings[0]
kp[:, 1] -= paddings[2]
kp[:, 0:2] /= scale
kp[:, 0] += bbox[0]
kp[:, 1] += bbox[1]
kp[:, 0:2] = np.maximum(kp[:, 0:2], 0)
max_w = image.shape[1] - 1
max_h = image.shape[0] - 1
kp[:, 0] = np.minimum(kp[:, 0], max_w)
kp[:, 1] = np.minimum(kp[:, 1], max_h)
kpts.append(kp)
return kpts
# ==================================================================================================
class TopDown:
def __init__(
self,
det_model_path: str,
pose_model_path: str,
box_conf_threshold: float,
box_min_area: float,
warmup: int = 30,
):
self.batch_poses = bool("Bx" in pose_model_path)
self.det_model = RTMDet(
det_model_path, box_conf_threshold, box_min_area, warmup
)
self.pose_model = RTMPose(pose_model_path, warmup)
def predict(self, image):
boxes = self.det_model(image=image)
if len(boxes) == 0:
return []
results = []
if self.batch_poses:
results = self.pose_model(image=image, bboxes=boxes)
else:
for i in range(boxes.shape[0]):
kp = self.pose_model(image=image, bboxes=[boxes[i]])
results.append(kp[0])
return results
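# Minimal usage sketch (paths are placeholders; load_model below wires up the real exports):
#   model = TopDown("det_1x320x320x3.onnx", "pose_Bx384x288x3.onnx",
#                   box_conf_threshold=0.3, box_min_area=0.1 * 0.1)
#   poses = model.predict(image)  # list with one (num_joints, 3) array per person: x, y, score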
# ==================================================================================================
def load_model(min_bbox_score=0.3, min_bbox_area=0.1 * 0.1, batch_poses=False):
print("Loading 2D model ...")
model = TopDown(
"/RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_1x320x320x3_fp16_extra-steps.onnx",
f"/RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-m_{'B' if batch_poses else '1'}x384x288x3_fp16_extra-steps.onnx",
box_conf_threshold=min_bbox_score,
box_min_area=min_bbox_area,
warmup=30,
) )
print("Loaded mmpose model") print("Loaded 2D model")
return model return model
def load_wb_model(): def load_wb_model(min_bbox_score=0.3, min_bbox_area=0.1 * 0.1, batch_poses=False):
print("Loading mmpose whole body model ...") print("Loading 2D-WB model ...")
model = MMPoseInferencer( # The FP16 pose model is much worse than the FP32 for whole-body keypoints
pose2d="/mmpose/projects/rtmpose/rtmpose/wholebody_2d_keypoint/rtmpose-l_8xb32-270e_coco-wholebody-384x288.py", model = TopDown(
pose2d_weights="https://download.openmmlab.com/mmpose/v1/projects/rtmposev1/rtmpose-l_simcc-coco-wholebody_pt-aic-coco_270e-384x288-eaeb96c8_20230125.pth", "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_1x320x320x3_fp16_extra-steps.onnx",
det_model="/mmpose/projects/rtmpose/rtmdet/person/rtmdet_nano_320-8xb32_coco-person.py", f"/RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-l_wb_{'B' if batch_poses else '1'}x384x288x3_extra-steps.onnx",
det_weights="https://download.openmmlab.com/mmpose/v1/projects/rtmpose/rtmdet_nano_8xb32-100e_coco-obj365-person-05d8511e.pth", box_conf_threshold=min_bbox_score,
det_cat_ids=[0], box_min_area=min_bbox_area,
warmup=30,
) )
print("Loaded mmpose model") print("Loaded 2D-WB model")
return model return model
@ -44,28 +500,16 @@ def load_wb_model():
def get_2d_pose(model, imgs, num_joints=17): def get_2d_pose(model, imgs, num_joints=17):
"""See: https://mmpose.readthedocs.io/en/latest/user_guides/inference.html#basic-usage"""
result_generator = model(imgs, show=False)
new_poses = [] new_poses = []
for _ in range(len(imgs)): for i in range(len(imgs)):
result = next(result_generator) img = imgs[i]
dets = model.predict(img)
poses = [] if len(dets) == 0:
for i in range(len(result["predictions"][0])): poses = np.zeros([1, num_joints, 3], dtype=float)
kpts = result["predictions"][0][i]["keypoints"] else:
scores = result["predictions"][0][i]["keypoint_scores"] poses = np.asarray(dets, dtype=float)
kpts = np.array(kpts)
scores = np.array(scores).reshape(-1, 1)
scores = np.clip(scores, 0, 1)
pose = np.concatenate((kpts, scores), axis=-1)
poses.append(pose)
if len(poses) == 0:
poses.append(np.zeros([num_joints, 3]))
poses = np.array(poses)
new_poses.append(poses) new_poses.append(poses)
return new_poses return new_poses
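# Shape note (added for clarity): get_2d_pose returns one array per input image with shape
# (num_persons, num_joints, 3); when nothing is detected it returns a single all-zero person
# so downstream indexing stays consistent.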
View File
@ -1,5 +1,5 @@
# Standard compile options for the C++ executable # Standard compile options for the C++ executable
FLAGS = -fPIC -O3 -march=native -Wall -Werror -flto -fopenmp -fopenmp-simd FLAGS = -fPIC -O3 -march=native -Wall -Werror -flto=auto -fopenmp -fopenmp-simd
# The Python interface through SWIG # The Python interface through SWIG
PYTHON_VERSION = $(shell python3 -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}");') PYTHON_VERSION = $(shell python3 -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}");')
View File
@ -60,7 +60,7 @@ def main():
cameras = rpt.convert_cameras(cams) cameras = rpt.convert_cameras(cams)
# Run triangulation # Run triangulation
triangulator = rpt.Triangulator(min_score=0.95) triangulator = rpt.Triangulator(min_match_score=0.95)
stime = time.time() stime = time.time()
poses_3d = triangulator.triangulate_poses( poses_3d = triangulator.triangulate_poses(
poses_2d, cameras, roomparams, joint_names poses_2d, cameras, roomparams, joint_names