import math
import os
from abc import ABC, abstractmethod
from typing import List

import cv2
import numpy as np
import onnxruntime as ort
from tqdm import tqdm

# ==================================================================================================


class BaseModel(ABC):
    def __init__(self, model_path: str, warmup: int):
        self.model_path = model_path
        self.runtime = ""

        if not os.path.exists(model_path):
            raise FileNotFoundError("File not found: {}".format(model_path))

        if model_path.endswith(".onnx"):
            self.init_onnxruntime(model_path)
            self.runtime = "ort"
        else:
            raise ValueError("Unsupported model format: {}".format(model_path))

        if warmup > 0:
            print("Running warmup for '{}' ...".format(self.__class__.__name__))
            # Run the warmup in two halves
            self.warmup(warmup // 2)
            self.warmup(warmup // 2)

    def init_onnxruntime(self, model_path):
        usetrt = True
        usegpu = True

        self.opt = ort.SessionOptions()
        providers = ort.get_available_providers()
        # ort.set_default_logger_severity(1)

        # Prefer TensorRT, then CUDA, then fall back to the CPU provider
        self.providers = []
        if usetrt and "TensorrtExecutionProvider" in providers:
            self.providers.append(
                (
                    "TensorrtExecutionProvider",
                    {
                        "trt_engine_cache_enable": True,
                        "trt_engine_cache_path": "/RapidPoseTriangulation/data/trt_cache/",
                    },
                )
            )
        elif usegpu and "CUDAExecutionProvider" in providers:
            self.providers.append("CUDAExecutionProvider")
        else:
            self.providers.append("CPUExecutionProvider")
        print("Using providers:", self.providers)

        self.session = ort.InferenceSession(
            model_path, providers=self.providers, sess_options=self.opt
        )

        self.input_names = [input.name for input in self.session.get_inputs()]
        self.input_shapes = [input.shape for input in self.session.get_inputs()]

        input_types = [input.type for input in self.session.get_inputs()]
        self.input_types = []
        for i in range(len(input_types)):
            input_type = input_types[i]
            # ONNX Runtime reports float32 inputs as "tensor(float)"
            if input_type == "tensor(float)":
                itype = np.float32
            elif input_type == "tensor(float16)":
                itype = np.float16
            elif input_type == "tensor(int32)":
                itype = np.int32
            elif input_type == "tensor(uint8)":
                itype = np.uint8
            else:
                raise ValueError("Undefined input type: {}".format(input_type))
            self.input_types.append(itype)

    @abstractmethod
    def preprocess(self, **kwargs):
        pass

    @abstractmethod
    def postprocess(self, **kwargs):
        pass

    def warmup(self, epoch: int):
        np.random.seed(42)

        for _ in tqdm(range(epoch)):
            inputs = {}
            for i in range(len(self.input_names)):
                iname = self.input_names[i]

                if "image" in iname:
                    ishape = list(self.input_shapes[i])
                    # Dynamic batch dimensions show up as the symbolic name "batch_size"
                    if "batch_size" in ishape:
                        max_batch_size = 10
                        ishape[0] = np.random.choice(list(range(1, max_batch_size + 1)))
                    tensor = np.random.random(ishape)
                    tensor = tensor * 255
                else:
                    raise ValueError("Undefined input: {}".format(iname))

                tensor = tensor.astype(self.input_types[i])
                inputs[iname] = tensor

            self.call_model_ort(list(inputs.values()))

    def call_model_ort(self, tensor):
        inputs = {}
        for i in range(len(self.input_names)):
            iname = self.input_names[i]
            inputs[iname] = tensor[i]
        result = self.session.run(None, inputs)
        return result

    def __call__(self, **kwargs):
        tensor = self.preprocess(**kwargs)
        result = self.call_model_ort(tensor)
        output = self.postprocess(result=result, **kwargs)
        return output
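

# Illustrative sketch only (not part of the pipeline): the contract BaseModel expects
# from its subclasses. "identity.onnx" is a hypothetical single-image-input model;
# the real subclasses below (RTMDet, RTMPose) follow the same pattern.
def _example_basemodel_subclass(model_path: str = "identity.onnx"):
    class PassThroughModel(BaseModel):
        def preprocess(self, image: np.ndarray):
            # Return one tensor per session input, ordered like self.input_names
            return [np.expand_dims(image, axis=0).astype(self.input_types[0])]

        def postprocess(self, result: List[np.ndarray], image: np.ndarray):
            # `result` is the raw list of session outputs
            return result[0]

    model = PassThroughModel(model_path, warmup=0)
    return model(image=np.zeros((320, 320, 3), dtype=np.uint8))

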
# ==================================================================================================


class LetterBox:
    def __init__(self, target_size, fill_value=0):
        self.target_size = target_size
        self.fill_value = fill_value

    def calc_params(self, ishape):
        img_h, img_w = ishape[:2]
        target_h, target_w = self.target_size

        scale = min(target_w / img_w, target_h / img_h)
        new_w = round(img_w * scale)
        new_h = round(img_h * scale)

        pad_w = target_w - new_w
        pad_h = target_h - new_h
        pad_left = pad_w // 2
        pad_top = pad_h // 2
        pad_right = pad_w - pad_left
        pad_bottom = pad_h - pad_top
        paddings = (pad_left, pad_right, pad_top, pad_bottom)

        return paddings, scale, (new_w, new_h)

    def resize_image(self, image):
        paddings, _, new_size = self.calc_params(image.shape)

        # Resize the image
        new_w, new_h = new_size
        resized_img = cv2.resize(
            image,
            (new_w, new_h),
            interpolation=cv2.INTER_NEAREST,
        )

        # Optionally pad the image
        pad_left, pad_right, pad_top, pad_bottom = paddings
        if pad_left == 0 and pad_right == 0 and pad_top == 0 and pad_bottom == 0:
            final_img = resized_img
        else:
            final_img = cv2.copyMakeBorder(
                resized_img,
                pad_top,
                pad_bottom,
                pad_left,
                pad_right,
                borderType=cv2.BORDER_CONSTANT,
                value=[self.fill_value, self.fill_value, self.fill_value],
            )

        return final_img
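

# Illustrative sketch only: the letterbox geometry for one concrete case, a 640x480
# frame scaled into a 320x320 target (the detector configuration used further below).
def _example_letterbox():
    lb = LetterBox(target_size=(320, 320), fill_value=114)
    image = np.full((480, 640, 3), 255, dtype=np.uint8)
    paddings, scale, new_size = lb.calc_params(image.shape)
    # scale == 0.5, new_size == (320, 240), paddings == (0, 0, 40, 40)
    resized = lb.resize_image(image)
    # resized.shape == (320, 320, 3): the frame sits in the middle with 40 px of
    # fill_value above and below
    return resized, paddings, scale

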
# ==================================================================================================


class BoxCrop:
    def __init__(self, target_size, padding_scale=1.0, fill_value=0):
        self.target_size = target_size
        self.padding_scale = padding_scale
        self.fill_value = fill_value

    def calc_params(self, ishape, bbox):
        img_h, img_w = ishape[:2]
        target_h, target_w = self.target_size

        # Round the bounding box coordinates
        start_x = math.floor(bbox[0])
        start_y = math.floor(bbox[1])
        end_x = math.ceil(bbox[2])
        end_y = math.ceil(bbox[3])

        # Calculate original bounding box center
        center_x = (start_x + end_x) / 2.0
        center_y = (start_y + end_y) / 2.0

        # Scale the bounding box by the padding_scale
        bbox_w = end_x - start_x
        bbox_h = end_y - start_y
        scaled_w = bbox_w * self.padding_scale
        scaled_h = bbox_h * self.padding_scale

        # Calculate the aspect ratios
        bbox_aspect = scaled_w / scaled_h
        target_aspect = target_w / target_h

        # Adjust the scaled bounding box to match the target aspect ratio
        if bbox_aspect > target_aspect:
            adjusted_h = scaled_w / target_aspect
            adjusted_w = scaled_w
        else:
            adjusted_w = scaled_h * target_aspect
            adjusted_h = scaled_h

        # Calculate scaled bounding box coordinates
        bbox_w = adjusted_w
        bbox_h = adjusted_h
        new_start_x = center_x - bbox_w / 2.0
        new_start_y = center_y - bbox_h / 2.0
        new_end_x = center_x + bbox_w / 2.0
        new_end_y = center_y + bbox_h / 2.0

        # Round the box coordinates
        start_x = int(math.floor(new_start_x))
        start_y = int(math.floor(new_start_y))
        end_x = int(math.ceil(new_end_x))
        end_y = int(math.ceil(new_end_y))

        # Define the new box coordinates
        new_start_x = max(0, start_x)
        new_start_y = max(0, start_y)
        new_end_x = min(img_w - 1, end_x)
        new_end_y = min(img_h - 1, end_y)
        new_box = [new_start_x, new_start_y, new_end_x, new_end_y]

        # Calculate resized crop size
        bbox_w = new_box[2] - new_box[0]
        bbox_h = new_box[3] - new_box[1]
        scale = min(target_w / bbox_w, target_h / bbox_h)
        new_w = round(bbox_w * scale)
        new_h = round(bbox_h * scale)

        # Calculate paddings
        pad_w = target_w - new_w
        pad_h = target_h - new_h
        pad_left, pad_right, pad_top, pad_bottom = 0, 0, 0, 0
        if pad_w > 0:
            if start_x < 0:
                pad_left = pad_w
                pad_right = 0
            elif end_x > ishape[1]:
                pad_left = 0
                pad_right = pad_w
            else:
                # Can be caused by bbox rounding
                pad_left = pad_w // 2
                pad_right = pad_w - pad_left
        if pad_h > 0:
            if start_y < 0:
                pad_top = pad_h
                pad_bottom = 0
            elif end_y > ishape[0]:
                pad_top = 0
                pad_bottom = pad_h
            else:
                # Can be caused by bbox rounding
                pad_top = pad_h // 2
                pad_bottom = pad_h - pad_top
        paddings = (pad_left, pad_right, pad_top, pad_bottom)

        return paddings, scale, new_box, (new_w, new_h)

    def crop_resize_box(self, image, bbox):
        paddings, _, new_box, new_size = self.calc_params(image.shape, bbox)

        # Extract the bounding box
        cropped_img = image[new_box[1] : new_box[3], new_box[0] : new_box[2]]

        # Resize the image
        new_w, new_h = new_size
        resized_img = cv2.resize(
            cropped_img,
            (new_w, new_h),
            interpolation=cv2.INTER_NEAREST,
        )

        # Optionally pad the image
        pad_left, pad_right, pad_top, pad_bottom = paddings
        if pad_left == 0 and pad_right == 0 and pad_top == 0 and pad_bottom == 0:
            final_img = resized_img
        else:
            final_img = cv2.copyMakeBorder(
                resized_img,
                pad_top,
                pad_bottom,
                pad_left,
                pad_right,
                borderType=cv2.BORDER_CONSTANT,
                value=[self.fill_value, self.fill_value, self.fill_value],
            )

        return final_img
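

# Illustrative sketch only: the crop geometry for one bounding box. The target size
# matches the pose model below, but padding_scale is kept at 1.0 here so the numbers
# stay round; the pipeline itself uses 1.25 to include context around each person.
def _example_boxcrop():
    bc = BoxCrop(target_size=(384, 288), padding_scale=1.0, fill_value=0)
    image = np.zeros((720, 1280, 3), dtype=np.uint8)
    bbox = [100, 100, 250, 300]  # x1, y1, x2, y2
    paddings, scale, new_box, new_size = bc.calc_params(image.shape, bbox)
    # The 150x200 box already matches the 288/384 aspect ratio, so it is only scaled:
    # scale == 1.92, new_box == [100, 100, 250, 300], new_size == (288, 384),
    # paddings == (0, 0, 0, 0)
    crop = bc.crop_resize_box(image, bbox)
    # crop.shape == (384, 288, 3)
    return crop, scale, new_box

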
# ==================================================================================================


class RTMDet(BaseModel):
    def __init__(
        self,
        model_path: str,
        conf_threshold: float,
        min_area_fraction: float,
        warmup: int = 30,
    ):
        super().__init__(model_path, warmup)
        self.target_size = (320, 320)
        self.conf_threshold = conf_threshold
        self.letterbox = LetterBox(self.target_size, fill_value=114)

        img_area = self.target_size[0] * self.target_size[1]
        self.min_area = img_area * min_area_fraction

    def preprocess(self, image: np.ndarray):
        image = self.letterbox.resize_image(image)
        tensor = np.asarray(image).astype(self.input_types[0], copy=False)
        tensor = np.expand_dims(tensor, axis=0)
        tensor = [tensor]
        return tensor

    def postprocess(self, result: List[np.ndarray], image: np.ndarray):
        # boxes: (N, 5) rows of [x1, y1, x2, y2, score] in letterboxed coordinates
        boxes = np.squeeze(result[0], axis=0)
        classes = np.squeeze(result[1], axis=0)

        # Keep only the person class
        human_class = classes == 0
        boxes = boxes[human_class]

        keep = boxes[:, 4] > self.conf_threshold
        boxes = boxes[keep]

        if len(boxes) == 0:
            return np.array([])

        # Drop boxes with too small area
        areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
        keep = areas >= self.min_area
        boxes = boxes[keep]

        if len(boxes) == 0:
            return np.array([])

        # Map the boxes back to original image coordinates: remove the letterbox
        # padding, clamp to the un-padded area, then undo the resize scale
        paddings, scale, _ = self.letterbox.calc_params(image.shape)

        boxes[:, 0] -= paddings[0]
        boxes[:, 2] -= paddings[0]
        boxes[:, 1] -= paddings[2]
        boxes[:, 3] -= paddings[2]

        boxes = np.maximum(boxes, 0)
        th, tw = self.target_size
        pad_w = paddings[0] + paddings[1]
        pad_h = paddings[2] + paddings[3]
        max_w = tw - pad_w - 1
        max_h = th - pad_h - 1
        boxes[:, 0] = np.minimum(boxes[:, 0], max_w)
        boxes[:, 1] = np.minimum(boxes[:, 1], max_h)
        boxes[:, 2] = np.minimum(boxes[:, 2], max_w)
        boxes[:, 3] = np.minimum(boxes[:, 3], max_h)

        boxes[:, 0:4] /= scale
        return boxes
# ==================================================================================================


class RTMPose(BaseModel):
    def __init__(self, model_path: str, warmup: int = 30):
        super().__init__(model_path, warmup)
        self.target_size = (384, 288)
        self.boxcrop = BoxCrop(self.target_size, padding_scale=1.25, fill_value=0)

    def preprocess(self, image: np.ndarray, bboxes: np.ndarray):
        cutouts = []
        for i in range(len(bboxes)):
            region = self.boxcrop.crop_resize_box(image, bboxes[i])
            tensor = np.asarray(region).astype(self.input_types[0], copy=False)
            cutouts.append(tensor)

        if len(bboxes) == 1:
            cutouts = np.expand_dims(cutouts[0], axis=0)
        else:
            cutouts = np.stack(cutouts, axis=0)

        tensor = [cutouts]
        return tensor

    def postprocess(
        self, result: List[np.ndarray], image: np.ndarray, bboxes: np.ndarray
    ):
        # Returns one (num_joints, 3) array of [x, y, score] per box, with the
        # keypoints mapped back to original image coordinates
        kpts = []
        for i in range(len(bboxes)):
            scores = np.clip(result[1][i], 0, 1)
            kp = np.concatenate(
                [result[0][i], np.expand_dims(scores, axis=-1)], axis=-1
            )

            # Undo the crop: remove padding, undo the resize scale, shift by the crop origin
            paddings, scale, bbox, _ = self.boxcrop.calc_params(image.shape, bboxes[i])
            kp[:, 0] -= paddings[0]
            kp[:, 1] -= paddings[2]
            kp[:, 0:2] /= scale
            kp[:, 0] += bbox[0]
            kp[:, 1] += bbox[1]

            # Clamp to the image bounds
            kp[:, 0:2] = np.maximum(kp[:, 0:2], 0)
            max_w = image.shape[1] - 1
            max_h = image.shape[0] - 1
            kp[:, 0] = np.minimum(kp[:, 0], max_w)
            kp[:, 1] = np.minimum(kp[:, 1], max_h)
            kpts.append(kp)

        return kpts
# ==================================================================================================


class TopDown:
    def __init__(
        self,
        det_model_path: str,
        pose_model_path: str,
        box_conf_threshold: float,
        box_min_area: float,
        warmup: int = 30,
    ):
        # Pose models exported with a dynamic batch dimension carry "Bx" in their
        # filename and can process all detected boxes in a single call
        self.batch_poses = "Bx" in pose_model_path

        self.det_model = RTMDet(
            det_model_path, box_conf_threshold, box_min_area, warmup
        )
        self.pose_model = RTMPose(pose_model_path, warmup)

    def predict(self, image):
        boxes = self.det_model(image=image)
        if len(boxes) == 0:
            return []

        results = []
        if self.batch_poses:
            results = self.pose_model(image=image, bboxes=boxes)
        else:
            for i in range(boxes.shape[0]):
                kp = self.pose_model(image=image, bboxes=[boxes[i]])
                results.append(kp[0])

        return results
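

# Illustrative sketch only: the shape of what TopDown.predict returns. Each element is
# a (num_joints, 3) array of [x, y, score] keypoints in original image coordinates, one
# per detected person; an empty list means nobody passed the detector thresholds.
def _example_topdown_output(model, frame):
    persons = model.predict(frame)
    for kp in persons:
        print("person keypoints:", kp.shape)  # e.g. (17, 3) for the body-only model
    return persons

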
# ==================================================================================================


def load_model(min_bbox_score=0.3, min_bbox_area=0.1 * 0.1, batch_poses=False):
    print("Loading 2D model ...")

    model = TopDown(
        "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_1x320x320x3_fp16_extra-steps.onnx",
        f"/RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-m_{'B' if batch_poses else '1'}x384x288x3_fp16_extra-steps.onnx",
        box_conf_threshold=min_bbox_score,
        box_min_area=min_bbox_area,
        warmup=30,
    )

    print("Loaded 2D model")
    return model


def load_wb_model(min_bbox_score=0.3, min_bbox_area=0.1 * 0.1, batch_poses=False):
    print("Loading 2D-WB model ...")

    # The FP16 pose model is much worse than the FP32 for whole-body keypoints
    model = TopDown(
        "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_1x320x320x3_fp16_extra-steps.onnx",
        f"/RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-l_wb_{'B' if batch_poses else '1'}x384x288x3_extra-steps.onnx",
        box_conf_threshold=min_bbox_score,
        box_min_area=min_bbox_area,
        warmup=30,
    )

    print("Loaded 2D-WB model")
    return model
# ==================================================================================================


def get_2d_pose(model, imgs, num_joints=17):
    new_poses = []
    for i in range(len(imgs)):
        img = imgs[i]
        dets = model.predict(img)

        if len(dets) == 0:
            poses = np.zeros([1, num_joints, 3], dtype=float)
        else:
            poses = np.asarray(dets, dtype=float)
        new_poses.append(poses)

    return new_poses
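

# Illustrative sketch only: end-to-end usage of this module. The image paths are
# hypothetical, and the exported ONNX files referenced in load_model() must exist at
# the hard-coded paths above for this to run.
if __name__ == "__main__":
    model = load_model(min_bbox_score=0.3, min_bbox_area=0.1 * 0.1, batch_poses=False)

    frames = [cv2.imread("example_view0.jpg"), cv2.imread("example_view1.jpg")]
    poses = get_2d_pose(model, frames, num_joints=17)

    # One entry per input image, each of shape (num_persons, num_joints, 3) with
    # columns [x, y, score] in original image coordinates
    for i, p in enumerate(poses):
        print("image {}: {} pose(s) with shape {}".format(i, p.shape[0], p.shape[1:]))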