Testing onnx runtime with easypose.

This commit is contained in:
Daniel
2024-11-29 14:21:43 +01:00
parent ae21a3cf29
commit f6d13ea5a7
8 changed files with 718 additions and 0 deletions

View File

@ -0,0 +1,9 @@
# Test ONNX with EasyPose
Code files originally from: https://github.com/Dominic23331/EasyPose.git
```bash
docker build --progress=plain -f extras/easypose/dockerfile -t rpt_easypose .
./extras/easypose/run_container.sh
```

View File

@ -0,0 +1,98 @@
import numpy as np
from typing import List
from .base_model import BaseModel
from .utils import letterbox, nms, xywh2xyxy
class RTMDet(BaseModel):
def __init__(self,
model_path: str,
conf_threshold: float,
iou_threshold: float,
device: str = 'CUDA',
warmup: int = 30):
super(RTMDet, self).__init__(model_path, device, warmup)
self.conf_threshold = conf_threshold
self.iou_threshold = iou_threshold
self.dx = 0
self.dy = 0
self.scale = 0
def preprocess(self, image: np.ndarray):
th, tw = self.input_shape[2:]
image, self.dx, self.dy, self.scale = letterbox(image, (tw, th))
tensor = (image - np.array((103.53, 116.28, 123.675))) / np.array((57.375, 57.12, 58.395))
tensor = np.expand_dims(tensor, axis=0).transpose((0, 3, 1, 2)).astype(np.float32)
return tensor
def postprocess(self, tensor: List[np.ndarray]):
boxes = tensor[0]
boxes = np.squeeze(boxes, axis=0)
boxes[..., [4, 5]] = boxes[..., [5, 4]]
boxes = nms(boxes, self.iou_threshold, self.conf_threshold)
if boxes.shape[0] == 0:
return boxes
human_class = boxes[..., -1] == 0
boxes = boxes[human_class][..., :4]
boxes[:, 0] -= self.dx
boxes[:, 2] -= self.dx
boxes[:, 1] -= self.dy
boxes[:, 3] -= self.dy
boxes = np.clip(boxes, a_min=0, a_max=None)
boxes[:, :4] /= self.scale
return boxes
class Yolov8(BaseModel):
def __init__(self,
model_path: str,
conf_threshold: float,
iou_threshold: float,
device: str = 'CUDA',
warmup: int = 30):
super(Yolov8, self).__init__(model_path, device, warmup)
self.conf_threshold = conf_threshold
self.iou_threshold = iou_threshold
self.dx = 0
self.dy = 0
self.scale = 0
def preprocess(self, image):
th, tw = self.input_shape[2:]
image, self.dx, self.dy, self.scale = letterbox(image, (tw, th))
tensor = image / 255.
tensor = np.expand_dims(tensor, axis=0).transpose((0, 3, 1, 2)).astype(np.float32)
return tensor
def postprocess(self, tensor):
feature_map = tensor[0]
feature_map = np.squeeze(feature_map, axis=0).transpose((1, 0))
pred_class = feature_map[..., 4:]
pred_conf = np.max(pred_class, axis=-1, keepdims=True)
pred_class = np.argmax(pred_class, axis=-1, keepdims=True)
boxes = np.concatenate([feature_map[..., :4], pred_conf, pred_class], axis=-1)
boxes = xywh2xyxy(boxes)
boxes = nms(boxes, self.iou_threshold, self.conf_threshold)
if boxes.shape[0] == 0:
return boxes
human_class = boxes[..., -1] == 0
boxes = boxes[human_class][..., :4]
boxes[:, 0] -= self.dx
boxes[:, 2] -= self.dx
boxes[:, 1] -= self.dy
boxes[:, 3] -= self.dy
boxes = np.clip(boxes, a_min=0, a_max=None)
boxes[:, :4] /= self.scale
return boxes

View File

@ -0,0 +1,10 @@
FROM rapidposetriangulation
WORKDIR /
RUN pip3 install --upgrade --no-cache-dir onnxruntime-gpu
RUN git clone https://github.com/Dominic23331/EasyPose.git --depth=1
RUN cd /EasyPose/; pip install -v -e .
WORKDIR /RapidPoseTriangulation/
CMD ["/bin/bash"]

262
extras/easypose/pipeline.py Normal file
View File

@ -0,0 +1,262 @@
import os
import numpy as np
from easypose import model
from easypose.model import detection
from easypose.model import pose
from .download import get_url, get_model_path, download
from .consts import AvailablePoseModels, AvailableDetModels
from .common import Person, region_of_interest, restore_keypoints
def get_pose_model(pose_model_path, pose_model_decoder, device, warmup):
if pose_model_decoder == 'Dark':
pose_model = pose.Heatmap(pose_model_path, dark=True, device=device, warmup=warmup)
else:
pose_model = getattr(pose, pose_model_decoder)(pose_model_path, device=device, warmup=warmup)
return pose_model
def get_det_model(det_model_path, model_type, conf_thre, iou_thre, device, warmup):
det_model = getattr(detection, model_type)(det_model_path, conf_thre, iou_thre, device, warmup)
return det_model
class TopDown:
def __init__(self,
pose_model_name,
pose_model_decoder,
det_model_name,
conf_threshold=0.6,
iou_threshold=0.6,
device='CUDA',
warmup=30):
if pose_model_name not in AvailablePoseModels.POSE_MODELS:
raise ValueError(
'The {} human pose estimation model is not in the model repository.'.format(pose_model_name))
if pose_model_decoder not in AvailablePoseModels.POSE_MODELS[pose_model_name]:
raise ValueError(
'No {} decoding head for the {} model was found in the model repository.'.format(pose_model_decoder,
pose_model_name))
if det_model_name not in AvailableDetModels.DET_MODELS:
raise ValueError(
'The {} detection model is not in the model repository.'.format(det_model_name))
pose_model_dir = get_model_path(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder],
detection_model=False)
pose_model_path = os.path.join(pose_model_dir,
AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder])
if os.path.exists(pose_model_path):
try:
self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup)
except Exception:
url = get_url(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder],
detection_model=False)
download(url, pose_model_dir)
self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup)
else:
url = get_url(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder],
detection_model=False)
download(url, pose_model_dir)
self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup)
det_model_dir = get_model_path(AvailableDetModels.DET_MODELS[det_model_name]['file_name'],
detection_model=True)
det_model_path = os.path.join(det_model_dir,
AvailableDetModels.DET_MODELS[det_model_name]['file_name'])
det_model_type = AvailableDetModels.DET_MODELS[det_model_name]['model_type']
if os.path.exists(det_model_path):
try:
self.det_model = get_det_model(det_model_path,
det_model_type,
conf_threshold,
iou_threshold,
device,
warmup)
except Exception:
url = get_url(AvailableDetModels.DET_MODELS[det_model_name]['file_name'],
detection_model=True)
download(url, det_model_dir)
self.det_model = get_det_model(det_model_path,
det_model_type,
conf_threshold,
iou_threshold,
device,
warmup)
else:
url = get_url(AvailableDetModels.DET_MODELS[det_model_name]['file_name'],
detection_model=True)
download(url, det_model_dir)
self.det_model = get_det_model(det_model_path,
det_model_type,
conf_threshold,
iou_threshold,
device,
warmup)
def predict(self, image):
boxes = self.det_model(image)
results = []
for i in range(boxes.shape[0]):
p = Person()
p.box = boxes[i]
region = region_of_interest(image, p.box)
kp = self.pose_model(region)
p.keypoints = restore_keypoints(p.box, kp)
results.append(p)
return results
class Pose:
def __init__(self,
pose_model_name,
pose_model_decoder,
device='CUDA',
warmup=30):
if pose_model_name not in AvailablePoseModels.POSE_MODELS:
raise ValueError(
'The {} human pose estimation model is not in the model repository.'.format(pose_model_name))
if pose_model_decoder not in AvailablePoseModels.POSE_MODELS[pose_model_name]:
raise ValueError(
'No {} decoding head for the {} model was found in the model repository.'.format(pose_model_decoder,
pose_model_name))
pose_model_dir = get_model_path(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder],
detection_model=False)
pose_model_path = os.path.join(pose_model_dir,
AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder])
if os.path.exists(pose_model_path):
try:
self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup)
except Exception:
url = get_url(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder],
detection_model=False)
download(url, pose_model_dir)
self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup)
else:
url = get_url(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder],
detection_model=False)
download(url, pose_model_dir)
self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup)
def predict(self, image):
p = Person()
box = np.array([0, 0, image.shape[3], image.shape[2], 1, 0])
p.box = box
p.keypoints = self.pose_model(image)
return p
class CustomTopDown:
def __init__(self,
pose_model,
det_model,
pose_decoder=None,
device='CUDA',
iou_threshold=0.6,
conf_threshold=0.6,
warmup=30):
if isinstance(pose_model, model.BaseModel):
self.pose_model = pose_model
elif isinstance(pose_model, str):
if pose_model not in AvailablePoseModels.POSE_MODELS:
raise ValueError(
'The {} human pose estimation model is not in the model repository.'.format(pose_model))
if pose_model not in AvailablePoseModels.POSE_MODELS[pose_model]:
raise ValueError(
'No {} decoding head for the {} model was found in the model repository.'.format(pose_decoder,
pose_model))
pose_model_dir = get_model_path(AvailablePoseModels.POSE_MODELS[pose_model][pose_decoder],
detection_model=False)
pose_model_path = os.path.join(pose_model_dir,
AvailablePoseModels.POSE_MODELS[pose_model][pose_decoder])
if os.path.exists(pose_model_path):
try:
self.pose_model = get_pose_model(pose_model_path, pose_decoder, device, warmup)
except Exception:
url = get_url(AvailablePoseModels.POSE_MODELS[pose_model][pose_decoder],
detection_model=False)
download(url, pose_model_dir)
self.pose_model = get_pose_model(pose_model_path, pose_decoder, device, warmup)
else:
url = get_url(AvailablePoseModels.POSE_MODELS[pose_model][pose_decoder],
detection_model=False)
download(url, pose_model_dir)
self.pose_model = get_pose_model(pose_model_path, pose_decoder, device, warmup)
else:
raise TypeError("Invalid type for pose model, Please write a custom model based on 'BaseModel'.")
if isinstance(det_model, model.BaseModel):
self.det_model = det_model
elif isinstance(det_model, str):
if det_model not in AvailableDetModels.DET_MODELS:
raise ValueError(
'The {} detection model is not in the model repository.'.format(det_model))
det_model_dir = get_model_path(AvailableDetModels.DET_MODELS[det_model]['file_name'],
detection_model=True)
det_model_path = os.path.join(det_model_dir,
AvailableDetModels.DET_MODELS[det_model]['file_name'])
det_model_type = AvailableDetModels.DET_MODELS[det_model]['model_type']
if os.path.exists(det_model_path):
try:
self.det_model = get_det_model(det_model_path,
det_model_type,
conf_threshold,
iou_threshold,
device,
warmup)
except Exception:
url = get_url(AvailableDetModels.DET_MODELS[det_model]['file_name'],
detection_model=True)
download(url, det_model_dir)
self.det_model = get_det_model(det_model_path,
det_model_type,
conf_threshold,
iou_threshold,
device,
warmup)
else:
url = get_url(AvailableDetModels.DET_MODELS[det_model]['file_name'],
detection_model=True)
download(url, det_model_dir)
self.det_model = get_det_model(det_model_path,
det_model_type,
conf_threshold,
iou_threshold,
device,
warmup)
else:
raise TypeError("Invalid type for detection model, Please write a custom model based on 'BaseModel'.")
def predict(self, image):
boxes = self.det_model(image)
results = []
for i in range(boxes.shape[0]):
p = Person()
p.box = boxes[i]
region = region_of_interest(image, p.box)
kp = self.pose_model(region)
p.keypoints = restore_keypoints(p.box, kp)
results.append(p)
return results
class CustomSinglePose:
def __init__(self, pose_model):
if isinstance(pose_model, model.BaseModel):
self.pose_model = pose_model
else:
raise TypeError("Invalid type for pose model, Please write a custom model based on 'BaseModel'.")
def predict(self, image):
p = Person()
box = np.array([0, 0, image.shape[3], image.shape[2], 1, 0])
p.box = box
p.keypoints = self.pose_model(image)
return p

64
extras/easypose/pose.py Normal file
View File

@ -0,0 +1,64 @@
import numpy as np
from typing import List
from .base_model import BaseModel
from .utils import letterbox, get_heatmap_points, \
get_real_keypoints, refine_keypoints_dark, refine_keypoints, simcc_decoder
class Heatmap(BaseModel):
def __init__(self,
model_path: str,
dark: bool = False,
device: str = 'CUDA',
warmup: int = 30):
super(Heatmap, self).__init__(model_path, device, warmup)
self.use_dark = dark
self.img_size = ()
def preprocess(self, image: np.ndarray):
th, tw = self.input_shape[2:]
self.img_size = image.shape[:2]
image, _, _, _ = letterbox(image, (tw, th))
tensor = (image - np.array((103.53, 116.28, 123.675))) / np.array((57.375, 57.12, 58.395))
tensor = np.expand_dims(tensor, axis=0).transpose((0, 3, 1, 2)).astype(np.float32)
return tensor
def postprocess(self, tensor: List[np.ndarray]):
heatmaps = tensor[0]
heatmaps = np.squeeze(heatmaps, axis=0)
keypoints = get_heatmap_points(heatmaps)
if self.use_dark:
keypoints = refine_keypoints_dark(keypoints, heatmaps, 11)
else:
keypoints = refine_keypoints(keypoints, heatmaps)
keypoints = get_real_keypoints(keypoints, heatmaps, self.img_size)
return keypoints
class SimCC(BaseModel):
def __init__(self, model_path: str, device: str = 'CUDA', warmup: int = 30):
super(SimCC, self).__init__(model_path, device, warmup)
self.dx = 0
self.dy = 0
self.scale = 0
def preprocess(self, image: np.ndarray):
th, tw = self.input_shape[2:]
image, self.dx, self.dy, self.scale = letterbox(image, (tw, th))
tensor = (image - np.array((103.53, 116.28, 123.675))) / np.array((57.375, 57.12, 58.395))
tensor = np.expand_dims(tensor, axis=0).transpose((0, 3, 1, 2)).astype(np.float32)
return tensor
def postprocess(self, tensor: List[np.ndarray]):
simcc_x, simcc_y = tensor
simcc_x = np.squeeze(simcc_x, axis=0)
simcc_y = np.squeeze(simcc_y, axis=0)
keypoints = simcc_decoder(simcc_x,
simcc_y,
self.input_shape[2:],
self.dx,
self.dy,
self.scale)
return keypoints

View File

@ -0,0 +1,11 @@
#! /bin/bash
xhost +
docker run --privileged --rm --network host -it \
--gpus all --shm-size=16g --ulimit memlock=-1 --ulimit stack=67108864 \
--volume "$(pwd)"/:/RapidPoseTriangulation/ \
--volume "$(pwd)"/../datasets/:/datasets/ \
--volume "$(pwd)"/../skelda/:/skelda/ \
--volume /tmp/.X11-unix:/tmp/.X11-unix \
--env DISPLAY --env QT_X11_NO_MITSHM=1 \
rpt_easypose

203
extras/easypose/utils.py Normal file
View File

@ -0,0 +1,203 @@
from itertools import product
from typing import Sequence
import cv2
import numpy as np
def letterbox(img: np.ndarray, target_size: Sequence[int], fill_value: int = 128):
h, w = img.shape[:2]
tw, th = target_size
scale = min(tw / w, th / h)
nw, nh = int(w * scale), int(h * scale)
resized_img = cv2.resize(img, (nw, nh))
canvas = np.full((th, tw, img.shape[2]), fill_value, dtype=img.dtype)
dx, dy = (tw - nw) // 2, (th - nh) // 2
canvas[dy:dy + nh, dx:dx + nw, :] = resized_img
return canvas, dx, dy, scale
def intersection_over_union(box1: np.ndarray, box2: np.ndarray):
area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
x1 = max(box1[0], box2[0])
y1 = max(box1[1], box2[1])
x2 = min(box1[2], box2[2])
y2 = min(box1[3], box2[3])
intersection = (x2 - x1) * (y2 - y1)
union = area1 + area2 - intersection
iou = intersection / (union + 1e-6)
return iou
def xywh2xyxy(boxes):
boxes[:, 0] -= boxes[:, 2] / 2
boxes[:, 1] -= boxes[:, 3] / 2
boxes[:, 2] += boxes[:, 0]
boxes[:, 3] += boxes[:, 1]
return boxes
def nms(boxes: np.ndarray, iou_threshold: float, conf_threshold: float):
conf = boxes[..., 4] > conf_threshold
boxes = boxes[conf]
boxes = list(boxes)
boxes.sort(reverse=True, key=lambda x: x[4])
result = []
while boxes:
chosen_box = boxes.pop()
b = []
for box in boxes:
if box[-1] != chosen_box[-1] or \
intersection_over_union(chosen_box, box) \
< iou_threshold:
b.append(box)
result.append(chosen_box)
boxes = b
return np.array(result)
def get_heatmap_points(heatmap: np.ndarray):
keypoints = np.zeros([1, heatmap.shape[0], 3], dtype=np.float32)
for i in range(heatmap.shape[0]):
h, w = np.nonzero(heatmap[i] == heatmap[i].max())
h, w = h[0], w[0]
h_fixed = h + 0.5
w_fixed = w + 0.5
score = heatmap[i][h][w]
keypoints[0][i][0] = w_fixed
keypoints[0][i][1] = h_fixed
keypoints[0][i][2] = score
return keypoints
def gaussian_blur(heatmaps: np.ndarray, kernel: int = 11):
assert kernel % 2 == 1
border = (kernel - 1) // 2
K, H, W = heatmaps.shape
for k in range(K):
origin_max = np.max(heatmaps[k])
dr = np.zeros((H + 2 * border, W + 2 * border), dtype=np.float32)
dr[border:-border, border:-border] = heatmaps[k].copy()
dr = cv2.GaussianBlur(dr, (kernel, kernel), 0)
heatmaps[k] = dr[border:-border, border:-border].copy()
heatmaps[k] *= origin_max / np.max(heatmaps[k])
return heatmaps
def refine_keypoints(keypoints: np.ndarray, heatmaps: np.ndarray):
N, K = keypoints.shape[:2]
H, W = heatmaps.shape[:2]
for n, k in product(range(N), range(K)):
x, y = keypoints[n, k, :2].astype(int)
if 1 < x < W - 1 and 0 < y < H:
dx = heatmaps[k, y, x + 1] - heatmaps[k, y, x - 1]
else:
dx = 0.
if 1 < y < H - 1 and 0 < x < W:
dy = heatmaps[k, y + 1, x] - heatmaps[k, y - 1, x]
else:
dy = 0.
keypoints[n, k] += np.sign([dx, dy, 0], dtype=np.float32) * 0.25
return keypoints
def refine_keypoints_dark(keypoints: np.ndarray, heatmaps: np.ndarray, blur_kernel_size: int = 11):
N, K = keypoints.shape[:2]
H, W = heatmaps.shape[1:]
# modulate heatmaps
heatmaps = gaussian_blur(heatmaps, blur_kernel_size)
np.maximum(heatmaps, 1e-10, heatmaps)
np.log(heatmaps, heatmaps)
for n, k in product(range(N), range(K)):
x, y = keypoints[n, k, :2].astype(int)
if 1 < x < W - 2 and 1 < y < H - 2:
dx = 0.5 * (heatmaps[k, y, x + 1] - heatmaps[k, y, x - 1])
dy = 0.5 * (heatmaps[k, y + 1, x] - heatmaps[k, y - 1, x])
dxx = 0.25 * (
heatmaps[k, y, x + 2] - 2 * heatmaps[k, y, x] +
heatmaps[k, y, x - 2])
dxy = 0.25 * (
heatmaps[k, y + 1, x + 1] - heatmaps[k, y - 1, x + 1] -
heatmaps[k, y + 1, x - 1] + heatmaps[k, y - 1, x - 1])
dyy = 0.25 * (
heatmaps[k, y + 2, x] - 2 * heatmaps[k, y, x] +
heatmaps[k, y - 2, x])
derivative = np.array([[dx], [dy]])
hessian = np.array([[dxx, dxy], [dxy, dyy]])
if dxx * dyy - dxy ** 2 != 0:
hessianinv = np.linalg.inv(hessian)
offset = -hessianinv @ derivative
offset = np.squeeze(np.array(offset.T), axis=0)
keypoints[n, k, :2] += offset
return keypoints
def get_real_keypoints(keypoints: np.ndarray, heatmaps: np.ndarray, img_size: Sequence[int]):
img_h, img_w = img_size
heatmap_h, heatmap_w = heatmaps.shape[1:]
heatmap_ratio = heatmaps.shape[1] / heatmaps.shape[2]
img_ratio = img_h / img_w
if heatmap_ratio > img_ratio:
resize_w = img_w
resize_h = int(img_w * heatmap_ratio)
elif heatmap_ratio < img_ratio:
resize_h = img_h
resize_w = int(img_h / heatmap_ratio)
else:
resize_w = img_w
resize_h = img_h
keypoints[:, :, 0] = (keypoints[:, :, 0] / heatmap_w) * resize_w - (resize_w - img_w) / 2
keypoints[:, :, 1] = (keypoints[:, :, 1] / heatmap_h) * resize_h - (resize_h - img_h) / 2
keypoints = np.squeeze(keypoints, axis=0)
return keypoints
def simcc_decoder(simcc_x: np.ndarray,
simcc_y: np.ndarray,
input_size: Sequence[int],
dx: int,
dy: int,
scale: float):
x = np.argmax(simcc_x, axis=-1, keepdims=True).astype(np.float32)
y = np.argmax(simcc_y, axis=-1, keepdims=True).astype(np.float32)
x_conf = np.max(simcc_x, axis=-1, keepdims=True)
y_conf = np.max(simcc_y, axis=-1, keepdims=True)
conf = (x_conf + y_conf) / 2
x /= simcc_x.shape[-1]
y /= simcc_y.shape[-1]
x *= input_size[1]
y *= input_size[0]
keypoints = np.concatenate([x, y, conf], axis=-1)
keypoints[..., 0] -= dx
keypoints[..., 1] -= dy
keypoints[..., :2] /= scale
return keypoints