from abc import ABC, abstractmethod from typing import List import numpy as np import onnxruntime as ort from tqdm import tqdm # ================================================================================================== class BaseModel(ABC): def __init__( self, model_path: str, warmup: int, usetrt: bool = True, usegpu: bool = True ): self.opt = ort.SessionOptions() providers = ort.get_available_providers() # ort.set_default_logger_severity(1) self.providers = [] if usetrt and "TensorrtExecutionProvider" in providers: self.providers.append("TensorrtExecutionProvider") if usegpu and "CUDAExecutionProvider" in providers: self.providers.append("CUDAExecutionProvider") self.providers.append("CPUExecutionProvider") print("Using providers:", self.providers) self.session = ort.InferenceSession( model_path, providers=self.providers, sess_options=self.opt ) self.input_names = [input.name for input in self.session.get_inputs()] self.input_shapes = [input.shape for input in self.session.get_inputs()] input_types = [input.type for input in self.session.get_inputs()] self.input_types = [] for i in range(len(input_types)): input_type = input_types[i] if input_type == "tensor(float16)": itype = np.float16 elif input_type == "tensor(uint8)": itype = np.uint8 elif input_type == "tensor(int32)": itype = np.int32 else: itype = np.float32 self.input_types.append(itype) if warmup > 0: self.warmup(warmup) @abstractmethod def preprocess(self, image: np.ndarray, *args, **kwargs): pass @abstractmethod def postprocess(self, tensor: List[np.ndarray], *args, **kwargs): pass def warmup(self, epoch: int): print("Running warmup for '{}' ...".format(self.__class__.__name__)) for _ in tqdm(range(epoch)): inputs = {} for i in range(len(self.input_names)): iname = self.input_names[i] if "image" in iname: ishape = self.input_shapes[i] if "batch_size" in ishape: if "TensorrtExecutionProvider" in self.providers: # Using different images sizes for TensorRT warmup takes too long ishape = [1, 1000, 1000, 3] else: ishape = [ 1, np.random.randint(300, 1000), np.random.randint(300, 1000), 3, ] tensor = np.random.random(ishape) tensor = tensor * 255 elif "bbox" in iname: tensor = np.array( [ [ np.random.randint(30, 100), np.random.randint(30, 100), np.random.randint(200, 300), np.random.randint(200, 300), ] ] ) else: raise ValueError("Undefined input type:", iname) tensor = tensor.astype(self.input_types[i]) inputs[iname] = tensor self.session.run(None, inputs) def __call__(self, image: np.ndarray, *args, **kwargs): tensor = self.preprocess(image, *args, **kwargs) inputs = {} for i in range(len(self.input_names)): iname = self.input_names[i] inputs[iname] = tensor[i] result = self.session.run(None, inputs) output = self.postprocess(result, *args, **kwargs) return output # ================================================================================================== class RTMDet(BaseModel): def __init__( self, model_path: str, conf_threshold: float, warmup: int = 30, ): super(RTMDet, self).__init__(model_path, warmup) self.conf_threshold = conf_threshold def preprocess(self, image: np.ndarray): tensor = np.asarray(image).astype(self.input_types[0], copy=False) tensor = np.expand_dims(tensor, axis=0) tensor = [tensor] return tensor def postprocess(self, tensor: List[np.ndarray]): boxes = np.squeeze(tensor[1], axis=0) classes = np.squeeze(tensor[0], axis=0) human_class = classes[:] == 0 boxes = boxes[human_class] keep = boxes[:, 4] > self.conf_threshold boxes = boxes[keep] return boxes # ================================================================================================== class RTMPose(BaseModel): def __init__(self, model_path: str, warmup: int = 30): super(RTMPose, self).__init__(model_path, warmup) self.bbox = None def preprocess(self, image: np.ndarray, bbox: np.ndarray): tensor = np.asarray(image).astype(self.input_types[0], copy=False) tensor = np.expand_dims(tensor, axis=0) bbox = np.asarray(bbox)[0:4] bbox += np.array([-0.5, -0.5, 0.5 - 1e-8, 0.5 - 1e-8]) bbox = bbox.round().astype(np.int32) bbox = np.expand_dims(bbox, axis=0) tensor = [tensor, bbox] return tensor def postprocess(self, tensor: List[np.ndarray], **kwargs): scores = np.clip(tensor[0][0], 0, 1) kp = np.concatenate([tensor[1][0], np.expand_dims(scores, axis=-1)], axis=-1) return kp # ================================================================================================== class TopDown: def __init__( self, det_model_path, pose_model_path, box_conf_threshold=0.6, warmup=30, ): if (not det_model_path.endswith(".onnx")) or ( not pose_model_path.endswith(".onnx") ): raise ValueError("Only ONNX models are supported.") self.det_model = RTMDet(det_model_path, box_conf_threshold, warmup) self.pose_model = RTMPose(pose_model_path, warmup) def predict(self, image): boxes = self.det_model(image) results = [] for i in range(boxes.shape[0]): kp = self.pose_model(image, bbox=boxes[i]) results.append(kp) return results # ================================================================================================== def load_model(): print("Loading onnx model ...") model = TopDown( # "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_320x320_extra-steps.onnx", "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_320x320_fp16_extra-steps.onnx", # "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-m_384x288_extra-steps.onnx", "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-m_384x288_fp16_extra-steps.onnx", box_conf_threshold=0.3, warmup=30, ) print("Loaded onnx model") return model def load_wb_model(): print("Loading mmpose whole body model ...") model = None print("Loaded mmpose model") return model # ================================================================================================== def get_2d_pose(model, imgs, num_joints=17): new_poses = [] for i in range(len(imgs)): img = imgs[i] poses = [] dets = model.predict(img) for pose in dets: pose = np.asarray(pose) poses.append(pose) if len(poses) == 0: poses.append(np.zeros([num_joints, 3])) poses = np.array(poses) new_poses.append(poses) return new_poses