# RapidPoseTriangulation/scripts/utils_2d_pose_ort.py
from abc import ABC, abstractmethod
from typing import List
import cv2
import numpy as np
import onnxruntime as ort
from tqdm import tqdm
# ==================================================================================================
class BaseModel(ABC):
    def __init__(self, model_path: str, warmup: int):
        self.opt = ort.SessionOptions()
        providers = ort.get_available_providers()
        # ort.set_default_logger_severity(1)

        # Prefer TensorRT, then CUDA, and fall back to CPU execution.
        if "TensorrtExecutionProvider" in providers:
            provider = "TensorrtExecutionProvider"
        elif "CUDAExecutionProvider" in providers:
            provider = "CUDAExecutionProvider"
        else:
            provider = "CPUExecutionProvider"
        print("Found providers:", providers)
        print("Using:", provider)

        self.session = ort.InferenceSession(
            model_path, providers=[provider], sess_options=self.opt
        )
        self.input_name = self.session.get_inputs()[0].name
        self.input_shape = self.session.get_inputs()[0].shape
        # Models exported with a dynamic batch axis report the symbolic
        # dimension "batch_size"; substitute a fixed shape for warmup runs.
        if "batch_size" in self.input_shape:
            self.input_shape = [1, 500, 500, 3]

        input_type = self.session.get_inputs()[0].type
        if input_type == "tensor(float16)":
            self.input_type = np.float16
        elif input_type == "tensor(uint8)":
            self.input_type = np.uint8
        else:
            self.input_type = np.float32

        if warmup > 0:
            self.warmup(warmup)

    @abstractmethod
    def preprocess(self, image: np.ndarray, *args, **kwargs):
        pass

    @abstractmethod
    def postprocess(self, tensor: List[np.ndarray], *args, **kwargs):
        pass

    def warmup(self, epoch: int):
        print("Running warmup for '{}' ...".format(self.__class__.__name__))
        for _ in tqdm(range(epoch)):
            tensor = np.random.random(self.input_shape).astype(self.input_type)
            self.session.run(None, {self.input_name: tensor})

    def __call__(self, image: np.ndarray, *args, **kwargs):
        tensor = self.preprocess(image, *args, **kwargs)
        result = self.session.run(None, {self.input_name: tensor})
        output = self.postprocess(result, *args, **kwargs)
        return output
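
# A minimal sketch (not part of the original pipeline) of the contract a
# BaseModel subclass fulfils: preprocess() builds the session's input tensor,
# postprocess() unpacks the raw outputs. "_EchoModel" is a hypothetical name
# used for illustration only.
class _EchoModel(BaseModel):
    def preprocess(self, image: np.ndarray):
        # Cast to the dtype the session expects and add a batch dimension.
        return np.expand_dims(image.astype(self.input_type, copy=False), axis=0)

    def postprocess(self, tensor: List[np.ndarray]):
        # Return the first output with the batch dimension stripped.
        return np.squeeze(tensor[0], axis=0)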
# ==================================================================================================
class RTMDet(BaseModel):
    def __init__(
        self,
        model_path: str,
        conf_threshold: float,
        warmup: int = 30,
    ):
        super().__init__(model_path, warmup)
        self.conf_threshold = conf_threshold

    def preprocess(self, image: np.ndarray):
        tensor = np.asarray(image).astype(self.input_type, copy=False)
        tensor = np.expand_dims(tensor, axis=0)
        return tensor

    def postprocess(self, tensor: List[np.ndarray]):
        # tensor[0]: per-detection class ids, tensor[1]: [x1, y1, x2, y2, score].
        boxes = np.squeeze(tensor[1], axis=0)
        classes = np.squeeze(tensor[0], axis=0)
        # Keep only the "person" class (COCO class id 0) above the threshold.
        human_class = classes == 0
        boxes = boxes[human_class]
        keep = boxes[:, 4] > self.conf_threshold
        boxes = boxes[keep]
        return boxes
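
# A small self-check (assumption: the exported detector emits class ids in
# tensor[0] and [x1, y1, x2, y2, score] rows in tensor[1], as postprocess()
# above implies). It exercises postprocess() on synthetic data via __new__,
# so no ONNX session is needed; the dummy values are illustrative only.
def _demo_rtmdet_postprocess():
    det = RTMDet.__new__(RTMDet)  # bypass __init__ (no model file required)
    det.conf_threshold = 0.5
    classes = np.array([[0, 56, 0]])  # person, chair, person
    boxes = np.array(
        [[[10, 20, 110, 220, 0.9], [0, 0, 30, 30, 0.8], [5, 5, 50, 50, 0.2]]],
        dtype=np.float32,
    )
    out = det.postprocess([classes, boxes])
    assert out.shape == (1, 5)  # only the confident person box survives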
# ==================================================================================================
class RTMPose(BaseModel):
    def __init__(self, model_path: str, warmup: int = 30):
        super().__init__(model_path, warmup)
        self.bbox = None

    def region_of_interest_warped(
        self,
        image: np.ndarray,
        box: np.ndarray,
        target_size: List[int],
        padding_scale: float = 1.25,
    ):
        start_x, start_y, end_x, end_y = box[0:4]
        target_w, target_h = target_size

        # Calculate original bounding box width and height
        bbox_w = end_x - start_x
        bbox_h = end_y - start_y
        if bbox_w <= 0 or bbox_h <= 0:
            raise ValueError("Invalid bounding box!")

        # Calculate the aspect ratios
        bbox_aspect = bbox_w / bbox_h
        target_aspect = target_w / target_h

        # Adjust the scaled bounding box to match the target aspect ratio
        if bbox_aspect > target_aspect:
            adjusted_h = bbox_w / target_aspect
            adjusted_w = bbox_w
        else:
            adjusted_w = bbox_h * target_aspect
            adjusted_h = bbox_h

        # Scale the bounding box by the padding_scale
        scaled_bbox_w = adjusted_w * padding_scale
        scaled_bbox_h = adjusted_h * padding_scale

        # Calculate the center of the original box
        center_x = (start_x + end_x) / 2.0
        center_y = (start_y + end_y) / 2.0

        # Calculate scaled bounding box coordinates
        new_start_x = center_x - scaled_bbox_w / 2.0
        new_start_y = center_y - scaled_bbox_h / 2.0
        new_end_x = center_x + scaled_bbox_w / 2.0
        new_end_y = center_y + scaled_bbox_h / 2.0

        # Define the new box coordinates
        new_box = np.array(
            [new_start_x, new_start_y, new_end_x, new_end_y], dtype=np.float32
        )
        scale = target_w / scaled_bbox_w

        # Define source and destination points for affine transformation
        # See: /mmpose/structures/bbox/transforms.py
        src_pts = np.array(
            [
                [center_x, center_y],
                [new_start_x, center_y],
                [new_start_x, center_y + (center_x - new_start_x)],
            ],
            dtype=np.float32,
        )
        dst_pts = np.array(
            [
                [target_w * 0.5, target_h * 0.5],
                [0, target_h * 0.5],
                [0, target_h * 0.5 + (target_w * 0.5 - 0)],
            ],
            dtype=np.float32,
        )

        # Compute the affine transformation matrix
        M = cv2.getAffineTransform(src_pts, dst_pts)

        # Apply affine transformation with border filling
        extracted_region = cv2.warpAffine(
            image,
            M,
            target_size,
            flags=cv2.INTER_LINEAR,
        )
        return extracted_region, new_box, scale

    def preprocess(self, image: np.ndarray, bbox: np.ndarray):
        th, tw = self.input_shape[1:3]
        region, self.bbox, _ = self.region_of_interest_warped(image, bbox, (tw, th))
        tensor = np.asarray(region).astype(self.input_type, copy=False)
        tensor = np.expand_dims(tensor, axis=0)
        return tensor

    def postprocess(self, tensor: List[np.ndarray], **kwargs):
        scores = np.clip(tensor[1][0], 0, 1)
        kp = np.concatenate([tensor[0][0], np.expand_dims(scores, axis=-1)], axis=-1)

        # Map keypoints from model input space back into original image space.
        # See: /mmpose/models/pose_estimators/topdown.py - add_pred_to_datasample()
        th, tw = self.input_shape[1:3]
        bw, bh = self.bbox[2] - self.bbox[0], self.bbox[3] - self.bbox[1]
        kp[:, :2] /= np.array([tw, th])
        kp[:, :2] *= np.array([bw, bh])
        kp[:, :2] += np.array([self.bbox[0] + bw / 2, self.bbox[1] + bh / 2])
        kp[:, :2] -= 0.5 * np.array([bw, bh])
        return kp
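
# A quick sanity check (illustrative, mirroring the arithmetic in postprocess()
# above): a keypoint predicted at the centre of the model input should map back
# to the centre of the padded box. The shapes below assume the 384x288 RTMPose
# export used in load_model().
def _demo_rtmpose_mapping():
    pose = RTMPose.__new__(RTMPose)  # bypass __init__ (no model file required)
    pose.input_shape = [1, 384, 288, 3]  # NHWC
    pose.bbox = np.array([100.0, 50.0, 300.0, 450.0])  # padded box x1, y1, x2, y2
    coords = np.array([[[288 / 2, 384 / 2]]], dtype=np.float32)  # input centre
    scores = np.array([[1.0]], dtype=np.float32)
    kp = pose.postprocess([coords, scores])
    assert np.allclose(kp[0, :2], [200.0, 250.0])  # centre of the padded box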
# ==================================================================================================
class TopDown:
    def __init__(
        self,
        det_model_path,
        pose_model_path,
        box_conf_threshold=0.6,
        warmup=30,
    ):
        if (not det_model_path.endswith(".onnx")) or (
            not pose_model_path.endswith(".onnx")
        ):
            raise ValueError("Only ONNX models are supported.")
        self.det_model = RTMDet(det_model_path, box_conf_threshold, warmup)
        self.pose_model = RTMPose(pose_model_path, warmup)

    def predict(self, image):
        boxes = self.det_model(image)
        results = []
        for box in boxes:
            kp = self.pose_model(image, bbox=box)
            results.append(kp)
        return results
# ==================================================================================================
def load_model():
    print("Loading onnx model ...")
    model = TopDown(
        # "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_320x320_extra-steps.onnx",
        "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_320x320_fp16_extra-steps.onnx",
        # "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-m_384x288_extra-steps.onnx",
        "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-m_384x288_fp16_extra-steps.onnx",
        box_conf_threshold=0.3,
        warmup=30,
    )
    print("Loaded onnx model")
    return model
def load_wb_model():
    print("Loading mmpose whole body model ...")
    model = None  # Stub: whole-body model loading is not implemented here.
    print("Loaded mmpose model")
    return model
# ==================================================================================================
def get_2d_pose(model, imgs, num_joints=17):
    new_poses = []
    for img in imgs:
        poses = []
        dets = model.predict(img)
        for pose in dets:
            pose = np.asarray(pose)
            poses.append(pose)
        # Guarantee at least one (all-zero) pose per image so downstream code
        # always receives an array of shape (num_persons, num_joints, 3).
        if len(poses) == 0:
            poses.append(np.zeros([num_joints, 3]))
        poses = np.array(poses)
        new_poses.append(poses)
    return new_poses
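
# ==================================================================================================
# A hypothetical end-to-end usage sketch; "example.jpg" is a placeholder path,
# and load_model() expects the exported ONNX files at the paths listed above.
if __name__ == "__main__":
    model = load_model()
    image = cv2.imread("example.jpg")  # placeholder image path
    poses = get_2d_pose(model, [image])
    # poses[0] has shape (num_persons, num_joints, 3): x, y, score per joint.
    print(poses[0].shape)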