import math
import os
from abc import ABC, abstractmethod
from typing import List

import cv2
import numpy as np
import onnxruntime as ort
import pycuda.autoinit  # noqa: F401
import pycuda.driver as cuda
import tensorrt as trt
from tqdm import tqdm


# ==================================================================================================
class BaseModel(ABC):
    def __init__(self, model_path: str, warmup: int):
        self.model_path = model_path
        self.runtime = ""

        if not os.path.exists(model_path):
            raise FileNotFoundError("File not found: {}".format(model_path))

        if model_path.endswith(".engine"):
            self.init_trt_engine(model_path)
            self.runtime = "trt"
        elif model_path.endswith(".onnx"):
            self.init_onnxruntime(model_path)
            self.runtime = "ort"
        else:
            raise ValueError("Unsupported model format: {}".format(model_path))

        if warmup > 0:
            print("Running warmup for '{}' ...".format(self.__class__.__name__))
            self.warmup(warmup // 2)
            self.warmup(warmup // 2)

    def init_onnxruntime(self, model_path):
        usetrt = True
        usegpu = True

        self.opt = ort.SessionOptions()
        providers = ort.get_available_providers()
        # ort.set_default_logger_severity(1)

        self.providers = []
        if usetrt and "TensorrtExecutionProvider" in providers:
            self.providers.append("TensorrtExecutionProvider")
        if usegpu and "CUDAExecutionProvider" in providers:
            self.providers.append("CUDAExecutionProvider")
        self.providers.append("CPUExecutionProvider")
        print("Using providers:", self.providers)

        self.session = ort.InferenceSession(
            model_path, providers=self.providers, sess_options=self.opt
        )

        self.input_names = [input.name for input in self.session.get_inputs()]
        self.input_shapes = [input.shape for input in self.session.get_inputs()]
        input_types = [input.type for input in self.session.get_inputs()]
        self.input_types = []
        for i in range(len(input_types)):
            input_type = input_types[i]
            # Note: ONNX Runtime reports float32 inputs as "tensor(float)"
            if input_type == "tensor(float)":
                itype = np.float32
            elif input_type == "tensor(float16)":
                itype = np.float16
            elif input_type == "tensor(int32)":
                itype = np.int32
            elif input_type == "tensor(uint8)":
                itype = np.uint8
            else:
                raise ValueError("Undefined input type: {}".format(input_type))
            self.input_types.append(itype)

    def init_trt_engine(self, engine_path):
        # https://docs.nvidia.com/deeplearning/tensorrt/developer-guide/index.html#python_topics
        # https://stackoverflow.com/a/79076885
        self.trt_logger = trt.Logger(trt.Logger.WARNING)
        with open(engine_path, "rb") as f:
            runtime = trt.Runtime(self.trt_logger)
            self.engine = runtime.deserialize_cuda_engine(f.read())
        self.context = self.engine.create_execution_context()
        self.stream = cuda.Stream()

        self.inputs, self.outputs, self.bindings = [], [], []
        self.input_names = []
        self.input_shapes = []
        self.input_types = []
        for i in range(self.engine.num_io_tensors):
            tensor_name = self.engine.get_tensor_name(i)
            shape = self.engine.get_tensor_shape(tensor_name)
            dtype = trt.nptype(self.engine.get_tensor_dtype(tensor_name))
            if -1 in shape:
                print("WARNING: Replacing dynamic shape with fixed for:", tensor_name)
                shape[list(shape).index(-1)] = 10

            # Allocate host and device buffers
            size = trt.volume(shape)
            host_mem = cuda.pagelocked_empty(size, dtype)
            device_mem = cuda.mem_alloc(host_mem.nbytes)
            self.bindings.append(int(device_mem))

            # Append to the appropriate input/output list
            if self.engine.get_tensor_mode(tensor_name) == trt.TensorIOMode.INPUT:
                self.inputs.append((host_mem, device_mem, shape))
                self.input_names.append(tensor_name)
                self.input_shapes.append(shape)
                self.input_types.append(dtype)
            else:
                self.outputs.append((host_mem, device_mem, shape))

            # Set tensor address
            self.context.set_tensor_address(tensor_name, self.bindings[i])

    @abstractmethod
    def preprocess(self, **kwargs):
        pass

    @abstractmethod
    def postprocess(self, **kwargs):
        pass

    def warmup(self, epoch: int):
        np.random.seed(42)
        for _ in tqdm(range(epoch)):
            inputs = {}
            for i in range(len(self.input_names)):
                iname = self.input_names[i]
                if "image" in iname:
                    ishape = list(self.input_shapes[i])
                    if "batch_size" in ishape:
                        # self.providers only exists for the ONNX Runtime path
                        if "TensorrtExecutionProvider" in getattr(self, "providers", []):
                            # Using different image sizes for TensorRT warmup takes too long
                            ishape = [1, 1000, 1000, 3]
                        else:
                            ishape = [
                                1,
                                np.random.randint(300, 1000),
                                np.random.randint(300, 1000),
                                3,
                            ]
                    tensor = np.random.random(ishape)
                    tensor = tensor * 255
                elif "bbox" in iname:
                    tensor = np.array(
                        [
                            [
                                np.random.randint(30, 100),
                                np.random.randint(30, 100),
                                np.random.randint(200, 300),
                                np.random.randint(200, 300),
                            ]
                        ]
                    )
                else:
                    raise ValueError("Undefined input name: {}".format(iname))
                tensor = tensor.astype(self.input_types[i])
                inputs[iname] = tensor
            self.call_model(list(inputs.values()))

    def call_model_ort(self, tensor):
        inputs = {}
        for i in range(len(self.input_names)):
            iname = self.input_names[i]
            inputs[iname] = tensor[i]
        result = self.session.run(None, inputs)
        return result

    def call_model_trt(self, tensor):
        # Transfer input data to device
        for i, input_data in enumerate(tensor):
            np.copyto(self.inputs[i][0], input_data.ravel())
            cuda.memcpy_htod_async(self.inputs[i][1], self.inputs[i][0], self.stream)

        # Empty the output buffers
        for i in range(len(self.outputs)):
            self.outputs[i][0].fill(0)
            cuda.memcpy_htod_async(self.outputs[i][1], self.outputs[i][0], self.stream)

        # Run inference
        self.context.execute_async_v3(stream_handle=self.stream.handle)

        # Transfer predictions back
        for i in range(len(self.outputs)):
            cuda.memcpy_dtoh_async(self.outputs[i][0], self.outputs[i][1], self.stream)

        # Synchronize the stream
        self.stream.synchronize()

        # Un-flatten the outputs
        outputs = []
        for i in range(len(self.outputs)):
            output = self.outputs[i][0].reshape(self.outputs[i][2])
            outputs.append(output)
        return outputs

    def call_model(self, tensor):
        if self.runtime == "trt":
            result = self.call_model_trt(tensor)
        elif self.runtime == "ort":
            result = self.call_model_ort(tensor)
        else:
            raise ValueError("Unknown runtime: {}".format(self.runtime))
        return result

    def __call__(self, **kwargs):
        tensor = self.preprocess(**kwargs)
        result = self.call_model(tensor)
        output = self.postprocess(result=result, **kwargs)
        return output
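
# A minimal subclassing sketch (illustration only: "IdentityModel" and its single
# "image" input are hypothetical, not part of this module). Concrete models implement
# preprocess() to build the list of input tensors and postprocess() to map the raw
# outputs back to image space; BaseModel.__call__ chains the two around call_model().
#
#   class IdentityModel(BaseModel):
#       def preprocess(self, image: np.ndarray):
#           tensor = np.expand_dims(image, axis=0).astype(self.input_types[0])
#           return [tensor]
#
#       def postprocess(self, result: List[np.ndarray], image: np.ndarray):
#           return result[0]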


# ==================================================================================================
class LetterBox:
    def __init__(self, target_size, fill_value=0):
        self.target_size = target_size
        self.fill_value = fill_value

    def calc_params(self, ishape):
        img_h, img_w = ishape[:2]
        target_h, target_w = self.target_size

        scale = min(target_w / img_w, target_h / img_h)
        new_w = round(img_w * scale)
        new_h = round(img_h * scale)

        pad_w = target_w - new_w
        pad_h = target_h - new_h
        pad_left = pad_w // 2
        pad_top = pad_h // 2
        pad_right = pad_w - pad_left
        pad_bottom = pad_h - pad_top

        paddings = (pad_left, pad_right, pad_top, pad_bottom)
        return paddings, scale, (new_w, new_h)

    def resize_image(self, image):
        paddings, _, new_size = self.calc_params(image.shape)

        # Resize the image
        new_w, new_h = new_size
        resized_img = cv2.resize(
            image,
            (new_w, new_h),
            interpolation=cv2.INTER_NEAREST,
        )

        # Optionally pad the image
        pad_left, pad_right, pad_top, pad_bottom = paddings
        if pad_left == 0 and pad_right == 0 and pad_top == 0 and pad_bottom == 0:
            final_img = resized_img
        else:
            final_img = cv2.copyMakeBorder(
                resized_img,
                pad_top,
                pad_bottom,
                pad_left,
                pad_right,
                borderType=cv2.BORDER_CONSTANT,
                value=[self.fill_value, self.fill_value, self.fill_value],
            )
        return final_img
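
# Worked example of the letterbox mapping (the 1280x720 input size is only an
# illustration): fitting a 1280x720 image into 320x320 scales it by
# min(320 / 1280, 320 / 720) = 0.25 to 320x180, and the remaining 140 rows are
# split as 70 top / 70 bottom. calc_params() returns exactly these values, which
# RTMDet.postprocess later uses to invert the transform.
#
#   lb = LetterBox((320, 320), fill_value=114)
#   paddings, scale, new_size = lb.calc_params((720, 1280, 3))
#   # paddings == (0, 0, 70, 70), scale == 0.25, new_size == (320, 180)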


# ==================================================================================================
class BoxCrop:
    def __init__(self, target_size, padding_scale=1.0, fill_value=0):
        self.target_size = target_size
        self.padding_scale = padding_scale
        self.fill_value = fill_value

    def calc_params(self, ishape, bbox):
        start_x, start_y, end_x, end_y = bbox[0], bbox[1], bbox[2], bbox[3]
        target_h, target_w = self.target_size

        # Calculate original bounding box center
        center_x = (start_x + end_x) / 2.0
        center_y = (start_y + end_y) / 2.0

        # Scale the bounding box by the padding_scale
        bbox_w = end_x - start_x
        bbox_h = end_y - start_y
        scaled_w = bbox_w * self.padding_scale
        scaled_h = bbox_h * self.padding_scale

        # Calculate the aspect ratios
        bbox_aspect = scaled_w / scaled_h
        target_aspect = target_w / target_h

        # Adjust the scaled bounding box to match the target aspect ratio
        if bbox_aspect > target_aspect:
            adjusted_h = scaled_w / target_aspect
            adjusted_w = scaled_w
        else:
            adjusted_w = scaled_h * target_aspect
            adjusted_h = scaled_h

        # Calculate scaled bounding box coordinates
        bbox_w = adjusted_w
        bbox_h = adjusted_h
        new_start_x = center_x - bbox_w / 2.0
        new_start_y = center_y - bbox_h / 2.0
        new_end_x = center_x + bbox_w / 2.0
        new_end_y = center_y + bbox_h / 2.0

        # Round the box coordinates
        start_x = int(math.floor(new_start_x))
        start_y = int(math.floor(new_start_y))
        end_x = int(math.ceil(new_end_x))
        end_y = int(math.ceil(new_end_y))

        # Define the new box coordinates
        new_start_x = max(0, start_x)
        new_start_y = max(0, start_y)
        new_end_x = min(ishape[1] - 1, end_x)
        new_end_y = min(ishape[0] - 1, end_y)
        new_box = [new_start_x, new_start_y, new_end_x, new_end_y]

        # Calculate resized crop size
        bbox_w = new_box[2] - new_box[0]
        bbox_h = new_box[3] - new_box[1]
        scale = min(target_w / bbox_w, target_h / bbox_h)
        new_w = round(bbox_w * scale)
        new_h = round(bbox_h * scale)

        # Calculate paddings
        pad_w = target_w - new_w
        pad_h = target_h - new_h
        pad_left, pad_right, pad_top, pad_bottom = 0, 0, 0, 0
        if pad_w > 0:
            if start_x < 0:
                pad_left = pad_w
                pad_right = 0
            elif end_x > ishape[1]:
                pad_left = 0
                pad_right = pad_w
            else:
                # Can be caused by bbox rounding
                pad_left = pad_w // 2
                pad_right = pad_w - pad_left
        if pad_h > 0:
            if start_y < 0:
                pad_top = pad_h
                pad_bottom = 0
            elif end_y > ishape[0]:
                pad_top = 0
                pad_bottom = pad_h
            else:
                # Can be caused by bbox rounding
                pad_top = pad_h // 2
                pad_bottom = pad_h - pad_top

        paddings = (pad_left, pad_right, pad_top, pad_bottom)
        return paddings, scale, new_box, (new_w, new_h)

    def crop_resize_box(self, image, bbox):
        paddings, _, new_box, new_size = self.calc_params(image.shape, bbox)

        # Extract the bounding box
        cropped_img = image[new_box[1] : new_box[3], new_box[0] : new_box[2]]

        # Resize the image
        new_w, new_h = new_size
        resized_img = cv2.resize(
            cropped_img,
            (new_w, new_h),
            interpolation=cv2.INTER_NEAREST,
        )

        # Optionally pad the image
        pad_left, pad_right, pad_top, pad_bottom = paddings
        if pad_left == 0 and pad_right == 0 and pad_top == 0 and pad_bottom == 0:
            final_img = resized_img
        else:
            final_img = cv2.copyMakeBorder(
                resized_img,
                pad_top,
                pad_bottom,
                pad_left,
                pad_right,
                borderType=cv2.BORDER_CONSTANT,
                value=[self.fill_value, self.fill_value, self.fill_value],
            )
        return final_img
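
# Usage sketch for BoxCrop (the bbox values are only an illustration; real boxes
# come from the detector). crop_resize_box() expands the box by padding_scale,
# adjusts its aspect ratio to the target, then crops, resizes, and pads to the
# target size, while calc_params() exposes the paddings, scale, and clipped box
# needed to map keypoints back to the original image afterwards.
#
#   bc = BoxCrop((384, 288), padding_scale=1.25)
#   crop = bc.crop_resize_box(image, [100, 100, 250, 300])
#   # crop.shape == (384, 288, 3) for a 3-channel image
#   paddings, scale, new_box, new_size = bc.calc_params(image.shape, [100, 100, 250, 300])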


# ==================================================================================================
class RTMDet(BaseModel):
    def __init__(
        self,
        model_path: str,
        conf_threshold: float,
        min_area_fraction: float,
        warmup: int = 30,
    ):
        super(RTMDet, self).__init__(model_path, warmup)

        self.target_size = (320, 320)
        self.conf_threshold = conf_threshold
        self.letterbox = LetterBox(self.target_size, fill_value=114)

        img_area = self.target_size[0] * self.target_size[1]
        self.min_area = img_area * min_area_fraction

    def preprocess(self, image: np.ndarray):
        image = self.letterbox.resize_image(image)
        tensor = np.asarray(image).astype(self.input_types[0], copy=False)
        tensor = np.expand_dims(tensor, axis=0)
        tensor = [tensor]
        return tensor

    def postprocess(self, result: List[np.ndarray], image: np.ndarray):
        boxes = np.squeeze(result[0], axis=0)
        classes = np.squeeze(result[1], axis=0)

        # Keep only the "person" class (class id 0) above the confidence threshold
        human_class = classes == 0
        boxes = boxes[human_class]
        keep = boxes[:, 4] > self.conf_threshold
        boxes = boxes[keep]
        if len(boxes) == 0:
            return np.array([])

        # Drop boxes with too small area
        boxes = boxes.astype(np.float32)
        areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
        keep = areas >= self.min_area
        boxes = boxes[keep]
        if len(boxes) == 0:
            return np.array([])

        # Undo the letterbox: remove the paddings, clip to the image, scale back
        paddings, scale, _ = self.letterbox.calc_params(image.shape)
        boxes[:, 0] -= paddings[0]
        boxes[:, 2] -= paddings[0]
        boxes[:, 1] -= paddings[2]
        boxes[:, 3] -= paddings[2]
        boxes = np.maximum(boxes, 0)
        th, tw = self.target_size
        pad_w = paddings[0] + paddings[1]
        pad_h = paddings[2] + paddings[3]
        max_w = tw - pad_w - 1
        max_h = th - pad_h - 1
        boxes[:, 0] = np.minimum(boxes[:, 0], max_w)
        boxes[:, 1] = np.minimum(boxes[:, 1], max_h)
        boxes[:, 2] = np.minimum(boxes[:, 2], max_w)
        boxes[:, 3] = np.minimum(boxes[:, 3], max_h)
        boxes[:, 0:4] /= scale

        return boxes


# ==================================================================================================
class RTMPose(BaseModel):
    def __init__(self, model_path: str, warmup: int = 30):
        super(RTMPose, self).__init__(model_path, warmup)

        self.target_size = (384, 288)
        self.boxcrop = BoxCrop(self.target_size, padding_scale=1.25, fill_value=0)

    def preprocess(self, image: np.ndarray, bbox: np.ndarray):
        # Half-pixel adjustment; avoid in-place "+=" so the caller's bbox is not mutated
        bbox = np.asarray(bbox)[0:4] + np.array([-0.5, -0.5, 0.5 - 1e-8, 0.5 - 1e-8])
        bbox = bbox.round().astype(np.int32)
        region = self.boxcrop.crop_resize_box(image, bbox)
        tensor = np.asarray(region).astype(self.input_types[0], copy=False)
        tensor = np.expand_dims(tensor, axis=0)
        tensor = [tensor]
        return tensor

    def postprocess(
        self, result: List[np.ndarray], image: np.ndarray, bbox: np.ndarray
    ):
        scores = np.clip(result[1][0], 0, 1)
        kp = np.concatenate([result[0][0], np.expand_dims(scores, axis=-1)], axis=-1)

        # Apply the same adjustment and rounding as preprocess so the inverse
        # mapping matches the region the model actually saw
        bbox = np.asarray(bbox)[0:4] + np.array([-0.5, -0.5, 0.5 - 1e-8, 0.5 - 1e-8])
        bbox = bbox.round().astype(np.int32)

        paddings, scale, bbox, _ = self.boxcrop.calc_params(image.shape, bbox)
        kp[:, 0] -= paddings[0]
        kp[:, 1] -= paddings[2]
        kp[:, 0:2] /= scale
        kp[:, 0] += bbox[0]
        kp[:, 1] += bbox[1]
        kp[:, 0:2] = np.maximum(kp[:, 0:2], 0)
        max_w = image.shape[1] - 1
        max_h = image.shape[0] - 1
        kp[:, 0] = np.minimum(kp[:, 0], max_w)
        kp[:, 1] = np.minimum(kp[:, 1], max_h)

        return kp


# ==================================================================================================
class TopDown:
    def __init__(
        self,
        det_model_path: str,
        pose_model_path: str,
        box_conf_threshold: float,
        box_min_area: float,
        warmup: int = 30,
    ):
        self.det_model = RTMDet(
            det_model_path, box_conf_threshold, box_min_area, warmup
        )
        self.pose_model = RTMPose(pose_model_path, warmup)

    def predict(self, image):
        boxes = self.det_model(image=image)
        results = []
        for i in range(boxes.shape[0]):
            kp = self.pose_model(image=image, bbox=boxes[i])
            results.append(kp)
        return results
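
# End-to-end usage sketch (the paths and "sample.jpg" are placeholders, not files
# shipped with this module): TopDown runs the detector once per frame and the pose
# model once per detected person, returning one (num_joints, 3) array of
# (x, y, score) rows per person, in original image coordinates.
#
#   model = TopDown(
#       "rtmdet.engine",
#       "rtmpose.engine",
#       box_conf_threshold=0.3,
#       box_min_area=0.01,
#       warmup=0,
#   )
#   poses = model.predict(cv2.imread("sample.jpg"))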


# ==================================================================================================
def load_model(min_bbox_score=0.3, min_bbox_area=0.1 * 0.1):
    print("Loading 2D model ...")
    model = TopDown(
        # "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_320x320_fp16_extra-steps.onnx",
        # "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-m_384x288_fp16_extra-steps.onnx",
        "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_1x320x320x3_fp16_extra-steps.engine",
        "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-m_1x384x288x3_fp16_extra-steps.engine",
        box_conf_threshold=min_bbox_score,
        box_min_area=min_bbox_area,
        warmup=30,
    )
    print("Loaded 2D model")
    return model


def load_wb_model():
    # Stub: the whole-body model is not wired up yet and this returns None
    print("Loading mmpose whole body model ...")
    model = None
    print("Loaded mmpose model")
    return model


# ==================================================================================================
def get_2d_pose(model, imgs, num_joints=17):
    new_poses = []
    for i in range(len(imgs)):
        img = imgs[i]
        poses = []
        dets = model.predict(img)
        for pose in dets:
            pose = np.asarray(pose)
            poses.append(pose)
        if len(poses) == 0:
            # No detections: return a single all-zero pose so downstream shapes stay valid
            poses.append(np.zeros([num_joints, 3]))
        poses = np.array(poses)
        new_poses.append(poses)
    return new_poses
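

# ==================================================================================================
if __name__ == "__main__":
    # Minimal smoke test (assumes the engine paths baked into load_model() exist on
    # this machine and that "sample.jpg" is any readable test image).
    model = load_model()
    image = cv2.imread("sample.jpg")
    all_poses = get_2d_pose(model, [image])
    print("Poses for first image:", all_poses[0].shape)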