from abc import ABC, abstractmethod from typing import List import cv2 import numpy as np import onnxruntime as ort from tqdm import tqdm # ================================================================================================== class BaseModel(ABC): def __init__(self, model_path: str, warmup: int): self.opt = ort.SessionOptions() providers = ort.get_available_providers() # ort.set_default_logger_severity(1) provider = "" if "TensorrtExecutionProvider" in providers: provider = "TensorrtExecutionProvider" elif "CUDAExecutionProvider" in providers: provider = "CUDAExecutionProvider" else: provider = "CPUExecutionProvider" print("Found providers:", providers) print("Using:", provider) self.session = ort.InferenceSession( model_path, providers=[provider], sess_options=self.opt ) self.input_name = self.session.get_inputs()[0].name self.input_shape = self.session.get_inputs()[0].shape input_type = self.session.get_inputs()[0].type if input_type == "tensor(float16)": self.input_type = np.float16 elif input_type == "tensor(uint8)": self.input_type = np.uint8 else: self.input_type = np.float32 if warmup > 0: self.warmup(warmup) @abstractmethod def preprocess(self, image: np.ndarray, *args, **kwargs): pass @abstractmethod def postprocess(self, tensor: List[np.ndarray], *args, **kwargs): pass def warmup(self, epoch: int): print("Running warmup for '{}' ...".format(self.__class__.__name__)) for _ in tqdm(range(epoch)): tensor = np.random.random(self.input_shape).astype(self.input_type) self.session.run(None, {self.input_name: tensor}) def __call__(self, image: np.ndarray, *args, **kwargs): tensor = self.preprocess(image, *args, **kwargs) result = self.session.run(None, {self.input_name: tensor}) output = self.postprocess(result, *args, **kwargs) return output # ================================================================================================== class RTMDet(BaseModel): def __init__( self, model_path: str, conf_threshold: float, iou_threshold: float, warmup: int = 30, ): super(RTMDet, self).__init__(model_path, warmup) self.conf_threshold = conf_threshold self.iou_threshold = iou_threshold self.dx = 0 self.dy = 0 self.scale = 0 def letterbox(self, img: np.ndarray, target_size: List[int], fill_value: int = 128): h, w = img.shape[:2] tw, th = target_size scale = min(tw / w, th / h) nw, nh = int(w * scale), int(h * scale) dx, dy = (tw - nw) // 2, (th - nh) // 2 canvas = np.full((th, tw, img.shape[2]), fill_value, dtype=img.dtype) canvas[dy : dy + nh, dx : dx + nw, :] = cv2.resize( img, (nw, nh), interpolation=cv2.INTER_LINEAR ) return canvas, dx, dy, scale def nms_optimized( self, boxes: np.ndarray, iou_threshold: float, conf_threshold: float ): """ Perform Non-Maximum Suppression (NMS) on bounding boxes for a single class. """ # Filter out boxes with low confidence scores scores = boxes[:, 4] keep = scores > conf_threshold boxes = boxes[keep] scores = scores[keep] if boxes.shape[0] == 0: return np.empty((0, 5), dtype=boxes.dtype) # Compute the area of the bounding boxes x1 = boxes[:, 0] y1 = boxes[:, 1] x2 = boxes[:, 2] y2 = boxes[:, 3] areas = (x2 - x1 + 1) * (y2 - y1 + 1) # Sort the boxes by scores in descending order order = scores.argsort()[::-1] keep_indices = [] while order.size > 0: i = order[0] keep_indices.append(i) # Compute IoU of the current box with the rest xx1 = np.maximum(x1[i], x1[order[1:]]) yy1 = np.maximum(y1[i], y1[order[1:]]) xx2 = np.minimum(x2[i], x2[order[1:]]) yy2 = np.minimum(y2[i], y2[order[1:]]) # Compute width and height of the overlapping area w = np.maximum(0.0, xx2 - xx1 + 1) h = np.maximum(0.0, yy2 - yy1 + 1) # Compute the area of the intersection inter = w * h # Compute the IoU iou = inter / (areas[i] + areas[order[1:]] - inter) # Keep boxes with IoU less than the threshold inds = np.where(iou <= iou_threshold)[0] # Update the order array order = order[inds + 1] # Return the boxes that are kept return boxes[keep_indices] def preprocess(self, image: np.ndarray): th, tw = self.input_shape[1:3] image, self.dx, self.dy, self.scale = self.letterbox( image, (tw, th), fill_value=114 ) tensor = np.asarray(image).astype(self.input_type, copy=False)[..., ::-1] tensor = np.expand_dims(tensor, axis=0) return tensor def postprocess(self, tensor: List[np.ndarray]): boxes = np.squeeze(tensor[0], axis=0) classes = np.expand_dims(np.squeeze(tensor[1], axis=0), axis=-1) boxes = np.concatenate([boxes, classes], axis=-1) boxes = self.nms_optimized(boxes, self.iou_threshold, self.conf_threshold) if boxes.shape[0] == 0: return boxes human_class = boxes[..., -1] == 0 boxes = boxes[human_class][..., :4] boxes[:, 0] -= self.dx boxes[:, 2] -= self.dx boxes[:, 1] -= self.dy boxes[:, 3] -= self.dy boxes = np.clip(boxes, a_min=0, a_max=None) boxes[:, :4] /= self.scale return boxes # ================================================================================================== class RTMPose(BaseModel): def __init__(self, model_path: str, warmup: int = 30): super(RTMPose, self).__init__(model_path, warmup) self.bbox = None def region_of_interest_warped( self, image: np.ndarray, box: np.ndarray, target_size: List[int], padding_scale: float = 1.25, ): start_x, start_y, end_x, end_y = box target_w, target_h = target_size # Calculate original bounding box width and height bbox_w = end_x - start_x bbox_h = end_y - start_y if bbox_w <= 0 or bbox_h <= 0: raise ValueError("Invalid bounding box!") # Calculate the aspect ratios bbox_aspect = bbox_w / bbox_h target_aspect = target_w / target_h # Adjust the scaled bounding box to match the target aspect ratio if bbox_aspect > target_aspect: adjusted_h = bbox_w / target_aspect adjusted_w = bbox_w else: adjusted_w = bbox_h * target_aspect adjusted_h = bbox_h # Scale the bounding box by the padding_scale scaled_bbox_w = adjusted_w * padding_scale scaled_bbox_h = adjusted_h * padding_scale # Calculate the center of the original box center_x = (start_x + end_x) / 2.0 center_y = (start_y + end_y) / 2.0 # Calculate scaled bounding box coordinates new_start_x = center_x - scaled_bbox_w / 2.0 new_start_y = center_y - scaled_bbox_h / 2.0 new_end_x = center_x + scaled_bbox_w / 2.0 new_end_y = center_y + scaled_bbox_h / 2.0 # Define the new box coordinates new_box = np.array( [new_start_x, new_start_y, new_end_x, new_end_y], dtype=np.float32 ) scale = target_w / scaled_bbox_w # Define source and destination points for affine transformation # See: /mmpose/structures/bbox/transforms.py src_pts = np.array( [ [center_x, center_y], [new_start_x, center_y], [new_start_x, center_y + (center_x - new_start_x)], ], dtype=np.float32, ) dst_pts = np.array( [ [target_w * 0.5, target_h * 0.5], [0, target_h * 0.5], [0, target_h * 0.5 + (target_w * 0.5 - 0)], ], dtype=np.float32, ) # Compute the affine transformation matrix M = cv2.getAffineTransform(src_pts, dst_pts) # Apply affine transformation with border filling extracted_region = cv2.warpAffine( image, M, target_size, flags=cv2.INTER_LINEAR, ) return extracted_region, new_box, scale def preprocess(self, image: np.ndarray, bbox: np.ndarray): th, tw = self.input_shape[1:3] region, self.bbox, _ = self.region_of_interest_warped(image, bbox, (tw, th)) tensor = np.asarray(region).astype(self.input_type, copy=False) tensor = np.expand_dims(tensor, axis=0) return tensor def postprocess(self, tensor: List[np.ndarray], **kwargs): scores = np.clip(tensor[1][0], 0, 1) kp = np.concatenate([tensor[0][0], np.expand_dims(scores, axis=-1)], axis=-1) # See: /mmpose/models/pose_estimators/topdown.py - add_pred_to_datasample() th, tw = self.input_shape[1:3] bw, bh = [self.bbox[2] - self.bbox[0], self.bbox[3] - self.bbox[1]] kp[:, :2] /= np.array([tw, th]) kp[:, :2] *= np.array([bw, bh]) kp[:, :2] += np.array([self.bbox[0] + bw / 2, self.bbox[1] + bh / 2]) kp[:, :2] -= 0.5 * np.array([bw, bh]) return kp # ================================================================================================== class TopDown: def __init__( self, det_model_path, pose_model_path, conf_threshold=0.6, iou_threshold=0.6, warmup=30, ): if (not det_model_path.endswith(".onnx")) or ( not pose_model_path.endswith(".onnx") ): raise ValueError("Only ONNX models are supported.") self.det_model = RTMDet(det_model_path, conf_threshold, iou_threshold, warmup) self.pose_model = RTMPose(pose_model_path, warmup) def predict(self, image): boxes = self.det_model(image) results = [] for i in range(boxes.shape[0]): kp = self.pose_model(image, bbox=boxes[i]) results.append(kp) return results # ================================================================================================== def load_model(): print("Loading onnx model ...") model = TopDown( # "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_320x320_extra-steps.onnx", "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_320x320_fp16_extra-steps.onnx", # "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-m_384x288_extra-steps.onnx", "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-m_384x288_fp16_extra-steps.onnx", conf_threshold=0.3, iou_threshold=0.3, warmup=30, ) print("Loaded onnx model") return model def load_wb_model(): print("Loading mmpose whole body model ...") model = None print("Loaded mmpose model") return model # ================================================================================================== def get_2d_pose(model, imgs, num_joints=17): new_poses = [] for i in range(len(imgs)): img = imgs[i] img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) poses = [] dets = model.predict(img) for pose in dets: pose = np.asarray(pose) poses.append(pose) if len(poses) == 0: poses.append(np.zeros([num_joints, 3])) poses = np.array(poses) new_poses.append(poses) return new_poses