Moved pose pre/post-processing into the ONNX graph.
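
BaseModel now discovers all session inputs (names, shapes, and dtypes) instead of assuming a single image tensor, and warmup() and __call__() feed the session a dict of named inputs. RTMPose no longer crops and warps the region of interest with OpenCV or rescales keypoints back to image space in NumPy; it passes the full image together with the rounded bounding box into the graph and reads back keypoints that are already in image coordinates.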
@@ -1,7 +1,6 @@
 from abc import ABC, abstractmethod
 from typing import List
 
-import cv2
 import numpy as np
 import onnxruntime as ort
 from tqdm import tqdm
@@ -16,12 +15,11 @@ class BaseModel(ABC):
         # ort.set_default_logger_severity(1)
 
         provider = ""
-        if "TensorrtExecutionProvider" in providers:
-            provider = "TensorrtExecutionProvider"
-        elif "CUDAExecutionProvider" in providers:
+        if "CUDAExecutionProvider" in providers:
             provider = "CUDAExecutionProvider"
         else:
             provider = "CPUExecutionProvider"
+        self.provider = provider
         print("Found providers:", providers)
         print("Using:", provider)
 
@@ -29,18 +27,22 @@ class BaseModel(ABC):
             model_path, providers=[provider], sess_options=self.opt
         )
 
-        self.input_name = self.session.get_inputs()[0].name
-        self.input_shape = self.session.get_inputs()[0].shape
-        if "batch_size" in self.input_shape:
-            self.input_shape = [1, 500, 500, 3]
+        self.input_names = [input.name for input in self.session.get_inputs()]
+        self.input_shapes = [input.shape for input in self.session.get_inputs()]
 
-        input_type = self.session.get_inputs()[0].type
-        if input_type == "tensor(float16)":
-            self.input_type = np.float16
-        elif input_type == "tensor(uint8)":
-            self.input_type = np.uint8
-        else:
-            self.input_type = np.float32
+        input_types = [input.type for input in self.session.get_inputs()]
+        self.input_types = []
+        for i in range(len(input_types)):
+            input_type = input_types[i]
+            if input_type == "tensor(float16)":
+                itype = np.float16
+            elif input_type == "tensor(uint8)":
+                itype = np.uint8
+            elif input_type == "tensor(int32)":
+                itype = np.int32
+            else:
+                itype = np.float32
+            self.input_types.append(itype)
 
         if warmup > 0:
             self.warmup(warmup)
@@ -56,12 +58,51 @@ class BaseModel(ABC):
     def warmup(self, epoch: int):
         print("Running warmup for '{}' ...".format(self.__class__.__name__))
         for _ in tqdm(range(epoch)):
-            tensor = np.random.random(self.input_shape).astype(self.input_type)
-            self.session.run(None, {self.input_name: tensor})
+            inputs = {}
+            for i in range(len(self.input_names)):
+                iname = self.input_names[i]
+
+                if "image" in iname:
+                    ishape = self.input_shapes[i]
+                    if "batch_size" in ishape:
+                        if self.provider == "TensorrtExecutionProvider":
+                            # Using different image sizes for TensorRT warmup takes too long
+                            ishape = [1, 1000, 1000, 3]
+                        else:
+                            ishape = [
+                                1,
+                                np.random.randint(300, 1000),
+                                np.random.randint(300, 1000),
+                                3,
+                            ]
+                    tensor = np.random.random(ishape)
+                    tensor = tensor * 255
+                elif "bbox" in iname:
+                    tensor = np.array(
+                        [
+                            [
+                                np.random.randint(30, 100),
+                                np.random.randint(30, 100),
+                                np.random.randint(200, 300),
+                                np.random.randint(200, 300),
+                            ]
+                        ]
+                    )
+                else:
+                    raise ValueError("Undefined input type")
+
+                tensor = tensor.astype(self.input_types[i])
+                inputs[iname] = tensor
+
+            self.session.run(None, inputs)
 
     def __call__(self, image: np.ndarray, *args, **kwargs):
         tensor = self.preprocess(image, *args, **kwargs)
-        result = self.session.run(None, {self.input_name: tensor})
+        inputs = {}
+        for i in range(len(self.input_names)):
+            iname = self.input_names[i]
+            inputs[iname] = tensor[i]
+        result = self.session.run(None, inputs)
         output = self.postprocess(result, *args, **kwargs)
         return output
 
@@ -80,8 +121,9 @@ class RTMDet(BaseModel):
         self.conf_threshold = conf_threshold
 
     def preprocess(self, image: np.ndarray):
-        tensor = np.asarray(image).astype(self.input_type, copy=False)
+        tensor = np.asarray(image).astype(self.input_types[0], copy=False)
         tensor = np.expand_dims(tensor, axis=0)
+        tensor = [tensor]
         return tensor
 
     def postprocess(self, tensor: List[np.ndarray]):
@@ -105,106 +147,19 @@ class RTMPose(BaseModel):
         super(RTMPose, self).__init__(model_path, warmup)
-        self.bbox = None
-
-    def region_of_interest_warped(
-        self,
-        image: np.ndarray,
-        box: np.ndarray,
-        target_size: List[int],
-        padding_scale: float = 1.25,
-    ):
-        start_x, start_y, end_x, end_y = box[0:4]
-        target_w, target_h = target_size
-
-        # Calculate original bounding box width and height
-        bbox_w = end_x - start_x
-        bbox_h = end_y - start_y
-
-        if bbox_w <= 0 or bbox_h <= 0:
-            raise ValueError("Invalid bounding box!")
-
-        # Calculate the aspect ratios
-        bbox_aspect = bbox_w / bbox_h
-        target_aspect = target_w / target_h
-
-        # Adjust the scaled bounding box to match the target aspect ratio
-        if bbox_aspect > target_aspect:
-            adjusted_h = bbox_w / target_aspect
-            adjusted_w = bbox_w
-        else:
-            adjusted_w = bbox_h * target_aspect
-            adjusted_h = bbox_h
-
-        # Scale the bounding box by the padding_scale
-        scaled_bbox_w = adjusted_w * padding_scale
-        scaled_bbox_h = adjusted_h * padding_scale
-
-        # Calculate the center of the original box
-        center_x = (start_x + end_x) / 2.0
-        center_y = (start_y + end_y) / 2.0
-
-        # Calculate scaled bounding box coordinates
-        new_start_x = center_x - scaled_bbox_w / 2.0
-        new_start_y = center_y - scaled_bbox_h / 2.0
-        new_end_x = center_x + scaled_bbox_w / 2.0
-        new_end_y = center_y + scaled_bbox_h / 2.0
-
-        # Define the new box coordinates
-        new_box = np.array(
-            [new_start_x, new_start_y, new_end_x, new_end_y], dtype=np.float32
-        )
-        scale = target_w / scaled_bbox_w
-
-        # Define source and destination points for affine transformation
-        # See: /mmpose/structures/bbox/transforms.py
-        src_pts = np.array(
-            [
-                [center_x, center_y],
-                [new_start_x, center_y],
-                [new_start_x, center_y + (center_x - new_start_x)],
-            ],
-            dtype=np.float32,
-        )
-        dst_pts = np.array(
-            [
-                [target_w * 0.5, target_h * 0.5],
-                [0, target_h * 0.5],
-                [0, target_h * 0.5 + (target_w * 0.5 - 0)],
-            ],
-            dtype=np.float32,
-        )
-
-        # Compute the affine transformation matrix
-        M = cv2.getAffineTransform(src_pts, dst_pts)
-
-        # Apply affine transformation with border filling
-        extracted_region = cv2.warpAffine(
-            image,
-            M,
-            target_size,
-            flags=cv2.INTER_LINEAR,
-        )
-
-        return extracted_region, new_box, scale
 
     def preprocess(self, image: np.ndarray, bbox: np.ndarray):
-        th, tw = self.input_shape[1:3]
-        region, self.bbox, _ = self.region_of_interest_warped(image, bbox, (tw, th))
-        tensor = np.asarray(region).astype(self.input_type, copy=False)
+        tensor = np.asarray(image).astype(self.input_types[0], copy=False)
         tensor = np.expand_dims(tensor, axis=0)
+        bbox = np.asarray(bbox)[0:4]
+        bbox += np.array([-0.5, -0.5, 0.5 - 1e-8, 0.5 - 1e-8])
+        bbox = bbox.round().astype(np.int32)
+        bbox = np.expand_dims(bbox, axis=0)
+        tensor = [tensor, bbox]
         return tensor
 
     def postprocess(self, tensor: List[np.ndarray], **kwargs):
-        scores = np.clip(tensor[1][0], 0, 1)
-        kp = np.concatenate([tensor[0][0], np.expand_dims(scores, axis=-1)], axis=-1)
-
-        # See: /mmpose/models/pose_estimators/topdown.py - add_pred_to_datasample()
-        th, tw = self.input_shape[1:3]
-        bw, bh = [self.bbox[2] - self.bbox[0], self.bbox[3] - self.bbox[1]]
-        kp[:, :2] /= np.array([tw, th])
-        kp[:, :2] *= np.array([bw, bh])
-        kp[:, :2] += np.array([self.bbox[0] + bw / 2, self.bbox[1] + bh / 2])
-        kp[:, :2] -= 0.5 * np.array([bw, bh])
-
+        scores = np.clip(tensor[0][0], 0, 1)
+        kp = np.concatenate([tensor[1][0], np.expand_dims(scores, axis=-1)], axis=-1)
         return kp
 
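For orientation, a minimal usage sketch of the refactored interface. This is not part of the commit: the class names and call signatures come from the diff above, but the model paths, the confidence threshold, the warmup count, and the assumption that the detector yields one [x1, y1, x2, y2] box per row are illustrative guesses.

    # Hedged sketch; assumes RTMDet and RTMPose are importable from the patched module.
    import cv2  # the caller still handles image I/O; only the models dropped cv2

    det = RTMDet("rtmdet.onnx", conf_threshold=0.5)  # assumed path and threshold
    pose = RTMPose("rtmpose.onnx", warmup=10)        # assumed path; warmup as in __init__

    image = cv2.imread("frame.jpg")                  # HWC uint8 image
    for bbox in det(image):                          # pre/post-processing now runs in-graph
        kp = pose(image, bbox)                       # (K, 3) keypoints: x, y, score,
                                                     # already mapped to image coordinates

Because the bbox is consumed by the graph itself, the RTMPose instance no longer carries per-call state (the old self.bbox), so the keypoint rescaling in postprocess is gone along with the cv2 crop-and-warp.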