Files
RapidPoseTriangulation/scripts/utils_2d_pose_ort.py
2024-12-19 11:12:13 +01:00

515 lines
16 KiB
Python

import math
import os
from abc import ABC, abstractmethod
from typing import List
import cv2
import numpy as np
import onnxruntime as ort
from tqdm import tqdm
# ==================================================================================================
class BaseModel(ABC):
def __init__(self, model_path: str, warmup: int):
self.model_path = model_path
self.runtime = ""
if not os.path.exists(model_path):
raise FileNotFoundError("File not found:", model_path)
if model_path.endswith(".onnx"):
self.init_onnxruntime(model_path)
self.runtime = "ort"
else:
raise ValueError("Unsupported model format:", model_path)
if warmup > 0:
print("Running warmup for '{}' ...".format(self.__class__.__name__))
self.warmup(warmup // 2)
self.warmup(warmup // 2)
def init_onnxruntime(self, model_path):
usetrt = True
usegpu = True
self.opt = ort.SessionOptions()
providers = ort.get_available_providers()
# ort.set_default_logger_severity(1)
self.providers = []
if usetrt and "TensorrtExecutionProvider" in providers:
self.providers.append(
(
"TensorrtExecutionProvider",
{
"trt_engine_cache_enable": True,
"trt_engine_cache_path": "/RapidPoseTriangulation/data/trt_cache/",
},
)
)
if usegpu and "CUDAExecutionProvider" in providers:
self.providers.append("CUDAExecutionProvider")
self.providers.append("CPUExecutionProvider")
print("Using providers:", self.providers)
self.session = ort.InferenceSession(
model_path, providers=self.providers, sess_options=self.opt
)
self.input_names = [input.name for input in self.session.get_inputs()]
self.input_shapes = [input.shape for input in self.session.get_inputs()]
input_types = [input.type for input in self.session.get_inputs()]
self.input_types = []
for i in range(len(input_types)):
input_type = input_types[i]
if input_type == "tensor(float32)":
itype = np.float32
elif input_type == "tensor(float16)":
itype = np.float16
elif input_type == "tensor(int32)":
itype = np.int32
elif input_type == "tensor(uint8)":
itype = np.uint8
else:
raise ValueError("Undefined input type:", input_type)
self.input_types.append(itype)
@abstractmethod
def preprocess(self, **kwargs):
pass
@abstractmethod
def postprocess(self, **kwargs):
pass
def warmup(self, epoch: int):
np.random.seed(42)
for _ in tqdm(range(epoch)):
inputs = {}
for i in range(len(self.input_names)):
iname = self.input_names[i]
if "image" in iname:
ishape = list(self.input_shapes[i])
if "batch_size" in ishape:
max_batch_size = 10
ishape[0] = np.random.choice(list(range(1, max_batch_size + 1)))
tensor = np.random.random(ishape)
tensor = tensor * 255
else:
raise ValueError("Undefined input type:", iname)
tensor = tensor.astype(self.input_types[i])
inputs[iname] = tensor
self.call_model_ort(list(inputs.values()))
def call_model_ort(self, tensor):
inputs = {}
for i in range(len(self.input_names)):
iname = self.input_names[i]
inputs[iname] = tensor[i]
result = self.session.run(None, inputs)
return result
def __call__(self, **kwargs):
tensor = self.preprocess(**kwargs)
result = self.call_model_ort(tensor)
output = self.postprocess(result=result, **kwargs)
return output
# ==================================================================================================
class LetterBox:
def __init__(self, target_size, fill_value=0):
self.target_size = target_size
self.fill_value = fill_value
def calc_params(self, ishape):
img_h, img_w = ishape[:2]
target_h, target_w = self.target_size
scale = min(target_w / img_w, target_h / img_h)
new_w = round(img_w * scale)
new_h = round(img_h * scale)
pad_w = target_w - new_w
pad_h = target_h - new_h
pad_left = pad_w // 2
pad_top = pad_h // 2
pad_right = pad_w - pad_left
pad_bottom = pad_h - pad_top
paddings = (pad_left, pad_right, pad_top, pad_bottom)
return paddings, scale, (new_w, new_h)
def resize_image(self, image):
paddings, _, new_size = self.calc_params(image.shape)
# Resize the image
new_w, new_h = new_size
resized_img = cv2.resize(
image,
(new_w, new_h),
interpolation=cv2.INTER_NEAREST,
)
# Optionally pad the image
pad_left, pad_right, pad_top, pad_bottom = paddings
if pad_left == 0 and pad_right == 0 and pad_top == 0 and pad_bottom == 0:
final_img = resized_img
else:
final_img = cv2.copyMakeBorder(
resized_img,
pad_top,
pad_bottom,
pad_left,
pad_right,
borderType=cv2.BORDER_CONSTANT,
value=[self.fill_value, self.fill_value, self.fill_value],
)
return final_img
# ==================================================================================================
class BoxCrop:
def __init__(self, target_size, padding_scale=1.0, fill_value=0):
self.target_size = target_size
self.padding_scale = padding_scale
self.fill_value = fill_value
def calc_params(self, ishape, bbox):
start_x, start_y, end_x, end_y = bbox[0], bbox[1], bbox[2], bbox[3]
target_h, target_w = self.target_size
# Calculate original bounding box center
center_x = (start_x + end_x) / 2.0
center_y = (start_y + end_y) / 2.0
# Scale the bounding box by the padding_scale
bbox_w = end_x - start_x
bbox_h = end_y - start_y
scaled_w = bbox_w * self.padding_scale
scaled_h = bbox_h * self.padding_scale
# Calculate the aspect ratios
bbox_aspect = scaled_w / scaled_h
target_aspect = target_w / target_h
# Adjust the scaled bounding box to match the target aspect ratio
if bbox_aspect > target_aspect:
adjusted_h = scaled_w / target_aspect
adjusted_w = scaled_w
else:
adjusted_w = scaled_h * target_aspect
adjusted_h = scaled_h
# Calculate scaled bounding box coordinates
bbox_w = adjusted_w
bbox_h = adjusted_h
new_start_x = center_x - bbox_w / 2.0
new_start_y = center_y - bbox_h / 2.0
new_end_x = center_x + bbox_w / 2.0
new_end_y = center_y + bbox_h / 2.0
# Round the box coordinates
start_x = int(math.floor(new_start_x))
start_y = int(math.floor(new_start_y))
end_x = int(math.ceil(new_end_x))
end_y = int(math.ceil(new_end_y))
# Define the new box coordinates
new_start_x = max(0, start_x)
new_start_y = max(0, start_y)
new_end_x = min(ishape[1] - 1, end_x)
new_end_y = min(ishape[0] - 1, end_y)
new_box = [new_start_x, new_start_y, new_end_x, new_end_y]
# Calculate resized crop size
bbox_w = new_box[2] - new_box[0]
bbox_h = new_box[3] - new_box[1]
scale = min(target_w / bbox_w, target_h / bbox_h)
new_w = round(bbox_w * scale)
new_h = round(bbox_h * scale)
# Calculate paddings
pad_w = target_w - new_w
pad_h = target_h - new_h
pad_left, pad_right, pad_top, pad_bottom = 0, 0, 0, 0
if pad_w > 0:
if start_x < 0:
pad_left = pad_w
pad_right = 0
elif end_x > ishape[1]:
pad_left = 0
pad_right = pad_w
else:
# Can be caused by bbox rounding
pad_left = pad_w // 2
pad_right = pad_w - pad_left
if pad_h > 0:
if start_y < 0:
pad_top = pad_h
pad_bottom = 0
elif end_y > ishape[0]:
pad_top = 0
pad_bottom = pad_h
else:
# Can be caused by bbox rounding
pad_top = pad_h // 2
pad_bottom = pad_h - pad_top
paddings = (pad_left, pad_right, pad_top, pad_bottom)
return paddings, scale, new_box, (new_w, new_h)
def crop_resize_box(self, image, bbox):
paddings, _, new_box, new_size = self.calc_params(image.shape, bbox)
# Extract the bounding box
cropped_img = image[new_box[1] : new_box[3], new_box[0] : new_box[2]]
# Resize the image
new_w, new_h = new_size
resized_img = cv2.resize(
cropped_img,
(new_w, new_h),
interpolation=cv2.INTER_NEAREST,
)
# Optionally pad the image
pad_left, pad_right, pad_top, pad_bottom = paddings
if pad_left == 0 and pad_right == 0 and pad_top == 0 and pad_bottom == 0:
final_img = resized_img
else:
final_img = cv2.copyMakeBorder(
resized_img,
pad_top,
pad_bottom,
pad_left,
pad_right,
borderType=cv2.BORDER_CONSTANT,
value=[self.fill_value, self.fill_value, self.fill_value],
)
return final_img
# ==================================================================================================
class RTMDet(BaseModel):
def __init__(
self,
model_path: str,
conf_threshold: float,
min_area_fraction: float,
warmup: int = 30,
):
super(RTMDet, self).__init__(model_path, warmup)
self.target_size = (320, 320)
self.conf_threshold = conf_threshold
self.letterbox = LetterBox(self.target_size, fill_value=114)
img_area = self.target_size[0] * self.target_size[1]
self.min_area = img_area * min_area_fraction
def preprocess(self, image: np.ndarray):
image = self.letterbox.resize_image(image)
tensor = np.asarray(image).astype(self.input_types[0], copy=False)
tensor = np.expand_dims(tensor, axis=0)
tensor = [tensor]
return tensor
def postprocess(self, result: List[np.ndarray], image: np.ndarray):
boxes = np.squeeze(result[0], axis=0)
classes = np.squeeze(result[1], axis=0)
human_class = classes[:] == 0
boxes = boxes[human_class]
keep = boxes[:, 4] > self.conf_threshold
boxes = boxes[keep]
if len(boxes) == 0:
return np.array([])
# Drop boxes with too small area
boxes = boxes.astype(np.float32)
areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
keep = areas >= self.min_area
boxes = boxes[keep]
if len(boxes) == 0:
return np.array([])
paddings, scale, _ = self.letterbox.calc_params(image.shape)
boxes[:, 0] -= paddings[0]
boxes[:, 2] -= paddings[0]
boxes[:, 1] -= paddings[2]
boxes[:, 3] -= paddings[2]
boxes = np.maximum(boxes, 0)
th, tw = self.target_size
pad_w = paddings[0] + paddings[1]
pad_h = paddings[2] + paddings[3]
max_w = tw - pad_w - 1
max_h = th - pad_h - 1
boxes[:, 0] = np.minimum(boxes[:, 0], max_w)
boxes[:, 1] = np.minimum(boxes[:, 1], max_h)
boxes[:, 2] = np.minimum(boxes[:, 2], max_w)
boxes[:, 3] = np.minimum(boxes[:, 3], max_h)
boxes[:, 0:4] /= scale
return boxes
# ==================================================================================================
class RTMPose(BaseModel):
def __init__(self, model_path: str, warmup: int = 30):
super(RTMPose, self).__init__(model_path, warmup)
self.target_size = (384, 288)
self.boxcrop = BoxCrop(self.target_size, padding_scale=1.25, fill_value=0)
def preprocess(self, image: np.ndarray, bboxes: np.ndarray):
cutouts = []
for i in range(len(bboxes)):
bbox = np.asarray(bboxes[i])[0:4]
bbox += np.array([-0.5, -0.5, 0.5 - 1e-8, 0.5 - 1e-8])
bbox = bbox.round().astype(np.int32)
region = self.boxcrop.crop_resize_box(image, bbox)
tensor = np.asarray(region).astype(self.input_types[0], copy=False)
cutouts.append(tensor)
if len(bboxes) == 1:
cutouts = np.expand_dims(cutouts[0], axis=0)
else:
cutouts = np.stack(cutouts, axis=0)
tensor = [cutouts]
return tensor
def postprocess(
self, result: List[np.ndarray], image: np.ndarray, bboxes: np.ndarray
):
kpts = []
for i in range(len(bboxes)):
scores = np.clip(result[1][i], 0, 1)
kp = np.concatenate(
[result[0][i], np.expand_dims(scores, axis=-1)], axis=-1
)
paddings, scale, bbox, _ = self.boxcrop.calc_params(image.shape, bboxes[i])
kp[:, 0] -= paddings[0]
kp[:, 1] -= paddings[2]
kp[:, 0:2] /= scale
kp[:, 0] += bbox[0]
kp[:, 1] += bbox[1]
kp[:, 0:2] = np.maximum(kp[:, 0:2], 0)
max_w = image.shape[1] - 1
max_h = image.shape[0] - 1
kp[:, 0] = np.minimum(kp[:, 0], max_w)
kp[:, 1] = np.minimum(kp[:, 1], max_h)
kpts.append(kp)
return kpts
# ==================================================================================================
class TopDown:
def __init__(
self,
det_model_path: str,
pose_model_path: str,
box_conf_threshold: float,
box_min_area: float,
warmup: int = 30,
):
self.batch_poses = bool("Bx" in pose_model_path)
self.det_model = RTMDet(
det_model_path, box_conf_threshold, box_min_area, warmup
)
self.pose_model = RTMPose(pose_model_path, warmup)
def predict(self, image):
boxes = self.det_model(image=image)
if len(boxes) == 0:
return []
results = []
if self.batch_poses:
results = self.pose_model(image=image, bboxes=boxes)
else:
for i in range(boxes.shape[0]):
kp = self.pose_model(image=image, bboxes=[boxes[i]])
results.append(kp[0])
return results
# ==================================================================================================
def load_model(min_bbox_score=0.3, min_bbox_area=0.1 * 0.1, batch_poses=False):
print("Loading 2D model ...")
model = TopDown(
"/RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_1x320x320x3_fp16_extra-steps.onnx",
f"/RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-m_{'B' if batch_poses else '1'}x384x288x3_fp16_extra-steps.onnx",
box_conf_threshold=min_bbox_score,
box_min_area=min_bbox_area,
warmup=30,
)
print("Loaded 2D model")
return model
def load_wb_model(min_bbox_score=0.3, min_bbox_area=0.1 * 0.1, batch_poses=False):
print("Loading 2D-WB model ...")
# The FP16 pose model is much worse than the FP32 for whole-body keypoints
model = TopDown(
"/RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_1x320x320x3_fp16_extra-steps.onnx",
f"/RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-l_wb_{'B' if batch_poses else '1'}x384x288x3_extra-steps.onnx",
box_conf_threshold=min_bbox_score,
box_min_area=min_bbox_area,
warmup=30,
)
print("Loaded 2D-WB model")
return model
# ==================================================================================================
def get_2d_pose(model, imgs, num_joints=17):
new_poses = []
for i in range(len(imgs)):
img = imgs[i]
dets = model.predict(img)
if len(dets) == 0:
poses = np.zeros([1, num_joints, 3], dtype=float)
else:
poses = np.asarray(dets, dtype=float)
new_poses.append(poses)
return new_poses