From f6d13ea5a7f8518dd76d314e49bf91ed906bb5e6 Mon Sep 17 00:00:00 2001 From: Daniel Date: Fri, 29 Nov 2024 14:21:43 +0100 Subject: [PATCH] Testing onnx runtime with easypose. --- extras/easypose/README.md | 9 ++ extras/easypose/detection.py | 98 ++++++++++++ extras/easypose/dockerfile | 10 ++ extras/easypose/pipeline.py | 262 +++++++++++++++++++++++++++++++ extras/easypose/pose.py | 64 ++++++++ extras/easypose/run_container.sh | 11 ++ extras/easypose/utils.py | 203 ++++++++++++++++++++++++ scripts/utils_2d_pose_ep.py | 61 +++++++ 8 files changed, 718 insertions(+) create mode 100644 extras/easypose/README.md create mode 100644 extras/easypose/detection.py create mode 100644 extras/easypose/dockerfile create mode 100644 extras/easypose/pipeline.py create mode 100644 extras/easypose/pose.py create mode 100644 extras/easypose/run_container.sh create mode 100644 extras/easypose/utils.py create mode 100644 scripts/utils_2d_pose_ep.py diff --git a/extras/easypose/README.md b/extras/easypose/README.md new file mode 100644 index 0000000..af15c02 --- /dev/null +++ b/extras/easypose/README.md @@ -0,0 +1,9 @@ +# Test ONNX with EasyPose + +Code files originally from: https://github.com/Dominic23331/EasyPose.git + +```bash +docker build --progress=plain -f extras/easypose/dockerfile -t rpt_easypose . + +./extras/easypose/run_container.sh +``` diff --git a/extras/easypose/detection.py b/extras/easypose/detection.py new file mode 100644 index 0000000..d15e229 --- /dev/null +++ b/extras/easypose/detection.py @@ -0,0 +1,98 @@ +import numpy as np +from typing import List + +from .base_model import BaseModel +from .utils import letterbox, nms, xywh2xyxy + + +class RTMDet(BaseModel): + def __init__(self, + model_path: str, + conf_threshold: float, + iou_threshold: float, + device: str = 'CUDA', + warmup: int = 30): + super(RTMDet, self).__init__(model_path, device, warmup) + self.conf_threshold = conf_threshold + self.iou_threshold = iou_threshold + self.dx = 0 + self.dy = 0 + self.scale = 0 + + def preprocess(self, image: np.ndarray): + th, tw = self.input_shape[2:] + image, self.dx, self.dy, self.scale = letterbox(image, (tw, th)) + tensor = (image - np.array((103.53, 116.28, 123.675))) / np.array((57.375, 57.12, 58.395)) + tensor = np.expand_dims(tensor, axis=0).transpose((0, 3, 1, 2)).astype(np.float32) + return tensor + + def postprocess(self, tensor: List[np.ndarray]): + boxes = tensor[0] + boxes = np.squeeze(boxes, axis=0) + boxes[..., [4, 5]] = boxes[..., [5, 4]] + + boxes = nms(boxes, self.iou_threshold, self.conf_threshold) + + if boxes.shape[0] == 0: + return boxes + + human_class = boxes[..., -1] == 0 + boxes = boxes[human_class][..., :4] + + boxes[:, 0] -= self.dx + boxes[:, 2] -= self.dx + boxes[:, 1] -= self.dy + boxes[:, 3] -= self.dy + + boxes = np.clip(boxes, a_min=0, a_max=None) + boxes[:, :4] /= self.scale + + return boxes + + +class Yolov8(BaseModel): + def __init__(self, + model_path: str, + conf_threshold: float, + iou_threshold: float, + device: str = 'CUDA', + warmup: int = 30): + super(Yolov8, self).__init__(model_path, device, warmup) + self.conf_threshold = conf_threshold + self.iou_threshold = iou_threshold + self.dx = 0 + self.dy = 0 + self.scale = 0 + + def preprocess(self, image): + th, tw = self.input_shape[2:] + image, self.dx, self.dy, self.scale = letterbox(image, (tw, th)) + tensor = image / 255. 
+ tensor = np.expand_dims(tensor, axis=0).transpose((0, 3, 1, 2)).astype(np.float32) + return tensor + + def postprocess(self, tensor): + feature_map = tensor[0] + feature_map = np.squeeze(feature_map, axis=0).transpose((1, 0)) + + pred_class = feature_map[..., 4:] + pred_conf = np.max(pred_class, axis=-1, keepdims=True) + pred_class = np.argmax(pred_class, axis=-1, keepdims=True) + boxes = np.concatenate([feature_map[..., :4], pred_conf, pred_class], axis=-1) + + boxes = xywh2xyxy(boxes) + boxes = nms(boxes, self.iou_threshold, self.conf_threshold) + + if boxes.shape[0] == 0: + return boxes + + human_class = boxes[..., -1] == 0 + boxes = boxes[human_class][..., :4] + + boxes[:, 0] -= self.dx + boxes[:, 2] -= self.dx + boxes[:, 1] -= self.dy + boxes[:, 3] -= self.dy + boxes = np.clip(boxes, a_min=0, a_max=None) + boxes[:, :4] /= self.scale + return boxes diff --git a/extras/easypose/dockerfile b/extras/easypose/dockerfile new file mode 100644 index 0000000..91dd978 --- /dev/null +++ b/extras/easypose/dockerfile @@ -0,0 +1,10 @@ +FROM rapidposetriangulation + +WORKDIR / + +RUN pip3 install --upgrade --no-cache-dir onnxruntime-gpu +RUN git clone https://github.com/Dominic23331/EasyPose.git --depth=1 +RUN cd /EasyPose/; pip install -v -e . + +WORKDIR /RapidPoseTriangulation/ +CMD ["/bin/bash"] diff --git a/extras/easypose/pipeline.py b/extras/easypose/pipeline.py new file mode 100644 index 0000000..e661d1b --- /dev/null +++ b/extras/easypose/pipeline.py @@ -0,0 +1,262 @@ +import os + +import numpy as np + +from easypose import model +from easypose.model import detection +from easypose.model import pose +from .download import get_url, get_model_path, download +from .consts import AvailablePoseModels, AvailableDetModels +from .common import Person, region_of_interest, restore_keypoints + + +def get_pose_model(pose_model_path, pose_model_decoder, device, warmup): + if pose_model_decoder == 'Dark': + pose_model = pose.Heatmap(pose_model_path, dark=True, device=device, warmup=warmup) + else: + pose_model = getattr(pose, pose_model_decoder)(pose_model_path, device=device, warmup=warmup) + return pose_model + + +def get_det_model(det_model_path, model_type, conf_thre, iou_thre, device, warmup): + det_model = getattr(detection, model_type)(det_model_path, conf_thre, iou_thre, device, warmup) + return det_model + + +class TopDown: + def __init__(self, + pose_model_name, + pose_model_decoder, + det_model_name, + conf_threshold=0.6, + iou_threshold=0.6, + device='CUDA', + warmup=30): + if pose_model_name not in AvailablePoseModels.POSE_MODELS: + raise ValueError( + 'The {} human pose estimation model is not in the model repository.'.format(pose_model_name)) + if pose_model_decoder not in AvailablePoseModels.POSE_MODELS[pose_model_name]: + raise ValueError( + 'No {} decoding head for the {} model was found in the model repository.'.format(pose_model_decoder, + pose_model_name)) + if det_model_name not in AvailableDetModels.DET_MODELS: + raise ValueError( + 'The {} detection model is not in the model repository.'.format(det_model_name)) + + pose_model_dir = get_model_path(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder], + detection_model=False) + pose_model_path = os.path.join(pose_model_dir, + AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder]) + + if os.path.exists(pose_model_path): + try: + self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup) + except Exception: + url = 
get_url(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder], + detection_model=False) + download(url, pose_model_dir) + self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup) + else: + url = get_url(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder], + detection_model=False) + download(url, pose_model_dir) + self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup) + + det_model_dir = get_model_path(AvailableDetModels.DET_MODELS[det_model_name]['file_name'], + detection_model=True) + det_model_path = os.path.join(det_model_dir, + AvailableDetModels.DET_MODELS[det_model_name]['file_name']) + det_model_type = AvailableDetModels.DET_MODELS[det_model_name]['model_type'] + if os.path.exists(det_model_path): + try: + self.det_model = get_det_model(det_model_path, + det_model_type, + conf_threshold, + iou_threshold, + device, + warmup) + except Exception: + url = get_url(AvailableDetModels.DET_MODELS[det_model_name]['file_name'], + detection_model=True) + download(url, det_model_dir) + self.det_model = get_det_model(det_model_path, + det_model_type, + conf_threshold, + iou_threshold, + device, + warmup) + else: + url = get_url(AvailableDetModels.DET_MODELS[det_model_name]['file_name'], + detection_model=True) + download(url, det_model_dir) + self.det_model = get_det_model(det_model_path, + det_model_type, + conf_threshold, + iou_threshold, + device, + warmup) + + def predict(self, image): + boxes = self.det_model(image) + results = [] + for i in range(boxes.shape[0]): + p = Person() + p.box = boxes[i] + region = region_of_interest(image, p.box) + kp = self.pose_model(region) + p.keypoints = restore_keypoints(p.box, kp) + results.append(p) + return results + + +class Pose: + def __init__(self, + pose_model_name, + pose_model_decoder, + device='CUDA', + warmup=30): + if pose_model_name not in AvailablePoseModels.POSE_MODELS: + raise ValueError( + 'The {} human pose estimation model is not in the model repository.'.format(pose_model_name)) + if pose_model_decoder not in AvailablePoseModels.POSE_MODELS[pose_model_name]: + raise ValueError( + 'No {} decoding head for the {} model was found in the model repository.'.format(pose_model_decoder, + pose_model_name)) + + pose_model_dir = get_model_path(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder], + detection_model=False) + pose_model_path = os.path.join(pose_model_dir, + AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder]) + + if os.path.exists(pose_model_path): + try: + self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup) + except Exception: + url = get_url(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder], + detection_model=False) + download(url, pose_model_dir) + self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup) + else: + url = get_url(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder], + detection_model=False) + download(url, pose_model_dir) + self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup) + + def predict(self, image): + p = Person() + box = np.array([0, 0, image.shape[3], image.shape[2], 1, 0]) + p.box = box + p.keypoints = self.pose_model(image) + return p + + +class CustomTopDown: + def __init__(self, + pose_model, + det_model, + pose_decoder=None, + device='CUDA', + iou_threshold=0.6, + conf_threshold=0.6, + warmup=30): + if isinstance(pose_model, model.BaseModel): + 
self.pose_model = pose_model
+        elif isinstance(pose_model, str):
+            if pose_model not in AvailablePoseModels.POSE_MODELS:
+                raise ValueError(
+                    'The {} human pose estimation model is not in the model repository.'.format(pose_model))
+            if pose_decoder not in AvailablePoseModels.POSE_MODELS[pose_model]:
+                raise ValueError(
+                    'No {} decoding head for the {} model was found in the model repository.'.format(pose_decoder,
+                                                                                                     pose_model))
+
+            pose_model_dir = get_model_path(AvailablePoseModels.POSE_MODELS[pose_model][pose_decoder],
+                                            detection_model=False)
+            pose_model_path = os.path.join(pose_model_dir,
+                                           AvailablePoseModels.POSE_MODELS[pose_model][pose_decoder])
+
+            if os.path.exists(pose_model_path):
+                try:
+                    self.pose_model = get_pose_model(pose_model_path, pose_decoder, device, warmup)
+                except Exception:
+                    url = get_url(AvailablePoseModels.POSE_MODELS[pose_model][pose_decoder],
+                                  detection_model=False)
+                    download(url, pose_model_dir)
+                    self.pose_model = get_pose_model(pose_model_path, pose_decoder, device, warmup)
+            else:
+                url = get_url(AvailablePoseModels.POSE_MODELS[pose_model][pose_decoder],
+                              detection_model=False)
+                download(url, pose_model_dir)
+                self.pose_model = get_pose_model(pose_model_path, pose_decoder, device, warmup)
+        else:
+            raise TypeError("Invalid type for pose model. Please provide a custom model based on 'BaseModel'.")
+
+        if isinstance(det_model, model.BaseModel):
+            self.det_model = det_model
+        elif isinstance(det_model, str):
+            if det_model not in AvailableDetModels.DET_MODELS:
+                raise ValueError(
+                    'The {} detection model is not in the model repository.'.format(det_model))
+
+            det_model_dir = get_model_path(AvailableDetModels.DET_MODELS[det_model]['file_name'],
+                                           detection_model=True)
+            det_model_path = os.path.join(det_model_dir,
+                                          AvailableDetModels.DET_MODELS[det_model]['file_name'])
+            det_model_type = AvailableDetModels.DET_MODELS[det_model]['model_type']
+            if os.path.exists(det_model_path):
+                try:
+                    self.det_model = get_det_model(det_model_path,
+                                                   det_model_type,
+                                                   conf_threshold,
+                                                   iou_threshold,
+                                                   device,
+                                                   warmup)
+                except Exception:
+                    url = get_url(AvailableDetModels.DET_MODELS[det_model]['file_name'],
+                                  detection_model=True)
+                    download(url, det_model_dir)
+                    self.det_model = get_det_model(det_model_path,
+                                                   det_model_type,
+                                                   conf_threshold,
+                                                   iou_threshold,
+                                                   device,
+                                                   warmup)
+            else:
+                url = get_url(AvailableDetModels.DET_MODELS[det_model]['file_name'],
+                              detection_model=True)
+                download(url, det_model_dir)
+                self.det_model = get_det_model(det_model_path,
+                                               det_model_type,
+                                               conf_threshold,
+                                               iou_threshold,
+                                               device,
+                                               warmup)
+        else:
+            raise TypeError("Invalid type for detection model. Please provide a custom model based on 'BaseModel'.")
+
+    def predict(self, image):
+        boxes = self.det_model(image)
+        results = []
+        for i in range(boxes.shape[0]):
+            p = Person()
+            p.box = boxes[i]
+            region = region_of_interest(image, p.box)
+            kp = self.pose_model(region)
+            p.keypoints = restore_keypoints(p.box, kp)
+            results.append(p)
+        return results
+
+
+class CustomSinglePose:
+    def __init__(self, pose_model):
+        if isinstance(pose_model, model.BaseModel):
+            self.pose_model = pose_model
+        else:
+            raise TypeError("Invalid type for pose model. Please provide a custom model based on 'BaseModel'.")
+
+    def predict(self, image):
+        p = Person()
+        box = np.array([0, 0, image.shape[3], image.shape[2], 1, 0])
+        p.box = box
+        p.keypoints = self.pose_model(image)
+        return p
diff --git a/extras/easypose/pose.py b/extras/easypose/pose.py
new file mode 100644
index 0000000..ca33247
--- /dev/null
+++ b/extras/easypose/pose.py @@ -0,0 +1,64 @@ +import numpy as np +from typing import List + +from .base_model import BaseModel +from .utils import letterbox, get_heatmap_points, \ + get_real_keypoints, refine_keypoints_dark, refine_keypoints, simcc_decoder + + +class Heatmap(BaseModel): + def __init__(self, + model_path: str, + dark: bool = False, + device: str = 'CUDA', + warmup: int = 30): + super(Heatmap, self).__init__(model_path, device, warmup) + self.use_dark = dark + self.img_size = () + + def preprocess(self, image: np.ndarray): + th, tw = self.input_shape[2:] + self.img_size = image.shape[:2] + image, _, _, _ = letterbox(image, (tw, th)) + tensor = (image - np.array((103.53, 116.28, 123.675))) / np.array((57.375, 57.12, 58.395)) + tensor = np.expand_dims(tensor, axis=0).transpose((0, 3, 1, 2)).astype(np.float32) + return tensor + + def postprocess(self, tensor: List[np.ndarray]): + heatmaps = tensor[0] + heatmaps = np.squeeze(heatmaps, axis=0) + keypoints = get_heatmap_points(heatmaps) + if self.use_dark: + keypoints = refine_keypoints_dark(keypoints, heatmaps, 11) + else: + keypoints = refine_keypoints(keypoints, heatmaps) + keypoints = get_real_keypoints(keypoints, heatmaps, self.img_size) + return keypoints + + +class SimCC(BaseModel): + def __init__(self, model_path: str, device: str = 'CUDA', warmup: int = 30): + super(SimCC, self).__init__(model_path, device, warmup) + self.dx = 0 + self.dy = 0 + self.scale = 0 + + def preprocess(self, image: np.ndarray): + th, tw = self.input_shape[2:] + image, self.dx, self.dy, self.scale = letterbox(image, (tw, th)) + tensor = (image - np.array((103.53, 116.28, 123.675))) / np.array((57.375, 57.12, 58.395)) + tensor = np.expand_dims(tensor, axis=0).transpose((0, 3, 1, 2)).astype(np.float32) + return tensor + + def postprocess(self, tensor: List[np.ndarray]): + simcc_x, simcc_y = tensor + simcc_x = np.squeeze(simcc_x, axis=0) + simcc_y = np.squeeze(simcc_y, axis=0) + keypoints = simcc_decoder(simcc_x, + simcc_y, + self.input_shape[2:], + self.dx, + self.dy, + self.scale) + + return keypoints diff --git a/extras/easypose/run_container.sh b/extras/easypose/run_container.sh new file mode 100644 index 0000000..f8811f6 --- /dev/null +++ b/extras/easypose/run_container.sh @@ -0,0 +1,11 @@ +#! 
/bin/bash
+
+xhost +
+docker run --privileged --rm --network host -it \
+    --gpus all --shm-size=16g --ulimit memlock=-1 --ulimit stack=67108864 \
+    --volume "$(pwd)"/:/RapidPoseTriangulation/ \
+    --volume "$(pwd)"/../datasets/:/datasets/ \
+    --volume "$(pwd)"/../skelda/:/skelda/ \
+    --volume /tmp/.X11-unix:/tmp/.X11-unix \
+    --env DISPLAY --env QT_X11_NO_MITSHM=1 \
+    rpt_easypose
diff --git a/extras/easypose/utils.py b/extras/easypose/utils.py
new file mode 100644
index 0000000..c268963
--- /dev/null
+++ b/extras/easypose/utils.py
@@ -0,0 +1,203 @@
+from itertools import product
+from typing import Sequence
+
+import cv2
+import numpy as np
+
+
+def letterbox(img: np.ndarray, target_size: Sequence[int], fill_value: int = 128):
+    h, w = img.shape[:2]
+    tw, th = target_size
+
+    scale = min(tw / w, th / h)
+    nw, nh = int(w * scale), int(h * scale)
+
+    resized_img = cv2.resize(img, (nw, nh))
+
+    canvas = np.full((th, tw, img.shape[2]), fill_value, dtype=img.dtype)
+
+    dx, dy = (tw - nw) // 2, (th - nh) // 2
+    canvas[dy:dy + nh, dx:dx + nw, :] = resized_img
+
+    return canvas, dx, dy, scale
+
+
+def intersection_over_union(box1: np.ndarray, box2: np.ndarray):
+    area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
+    area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
+
+    x1 = max(box1[0], box2[0])
+    y1 = max(box1[1], box2[1])
+    x2 = min(box1[2], box2[2])
+    y2 = min(box1[3], box2[3])
+
+    intersection = max(0., x2 - x1) * max(0., y2 - y1)  # clamp so disjoint boxes give zero overlap
+    union = area1 + area2 - intersection
+    iou = intersection / (union + 1e-6)
+
+    return iou
+
+
+def xywh2xyxy(boxes):
+    boxes[:, 0] -= boxes[:, 2] / 2
+    boxes[:, 1] -= boxes[:, 3] / 2
+    boxes[:, 2] += boxes[:, 0]
+    boxes[:, 3] += boxes[:, 1]
+    return boxes
+
+def nms(boxes: np.ndarray, iou_threshold: float, conf_threshold: float):
+    conf = boxes[..., 4] > conf_threshold
+    boxes = boxes[conf]
+    boxes = list(boxes)
+    boxes.sort(reverse=True, key=lambda x: x[4])
+
+    result = []
+    while boxes:
+        chosen_box = boxes.pop(0)  # keep the highest-confidence box first (list is sorted descending)
+
+        b = []
+        for box in boxes:
+            if box[-1] != chosen_box[-1] or \
+                    intersection_over_union(chosen_box, box) \
+                    < iou_threshold:
+                b.append(box)
+
+        result.append(chosen_box)
+        boxes = b
+
+    return np.array(result)
+
+
+def get_heatmap_points(heatmap: np.ndarray):
+    keypoints = np.zeros([1, heatmap.shape[0], 3], dtype=np.float32)
+    for i in range(heatmap.shape[0]):
+        h, w = np.nonzero(heatmap[i] == heatmap[i].max())
+        h, w = h[0], w[0]
+        h_fixed = h + 0.5
+        w_fixed = w + 0.5
+        score = heatmap[i][h][w]
+        keypoints[0][i][0] = w_fixed
+        keypoints[0][i][1] = h_fixed
+        keypoints[0][i][2] = score
+    return keypoints
+
+
+def gaussian_blur(heatmaps: np.ndarray, kernel: int = 11):
+    assert kernel % 2 == 1
+
+    border = (kernel - 1) // 2
+    K, H, W = heatmaps.shape
+
+    for k in range(K):
+        origin_max = np.max(heatmaps[k])
+        dr = np.zeros((H + 2 * border, W + 2 * border), dtype=np.float32)
+        dr[border:-border, border:-border] = heatmaps[k].copy()
+        dr = cv2.GaussianBlur(dr, (kernel, kernel), 0)
+        heatmaps[k] = dr[border:-border, border:-border].copy()
+        heatmaps[k] *= origin_max / np.max(heatmaps[k])
+    return heatmaps
+
+
+def refine_keypoints(keypoints: np.ndarray, heatmaps: np.ndarray):
+    N, K = keypoints.shape[:2]
+    H, W = heatmaps.shape[1:]  # heatmaps have shape (K, H, W)
+
+    for n, k in product(range(N), range(K)):
+        x, y = keypoints[n, k, :2].astype(int)
+
+        if 1 < x < W - 1 and 0 < y < H:
+            dx = heatmaps[k, y, x + 1] - heatmaps[k, y, x - 1]
+        else:
+            dx = 0.
+
+        if 1 < y < H - 1 and 0 < x < W:
+            dy = heatmaps[k, y + 1, x] - heatmaps[k, y - 1, x]
+        else:
+            dy = 0.
+
+        keypoints[n, k] += np.sign([dx, dy, 0], dtype=np.float32) * 0.25
+
+    return keypoints
+
+
+def refine_keypoints_dark(keypoints: np.ndarray, heatmaps: np.ndarray, blur_kernel_size: int = 11):
+    N, K = keypoints.shape[:2]
+    H, W = heatmaps.shape[1:]
+
+    # modulate heatmaps
+    heatmaps = gaussian_blur(heatmaps, blur_kernel_size)
+    np.maximum(heatmaps, 1e-10, heatmaps)
+    np.log(heatmaps, heatmaps)
+
+    for n, k in product(range(N), range(K)):
+        x, y = keypoints[n, k, :2].astype(int)
+        if 1 < x < W - 2 and 1 < y < H - 2:
+            dx = 0.5 * (heatmaps[k, y, x + 1] - heatmaps[k, y, x - 1])
+            dy = 0.5 * (heatmaps[k, y + 1, x] - heatmaps[k, y - 1, x])
+
+            dxx = 0.25 * (
+                heatmaps[k, y, x + 2] - 2 * heatmaps[k, y, x] +
+                heatmaps[k, y, x - 2])
+            dxy = 0.25 * (
+                heatmaps[k, y + 1, x + 1] - heatmaps[k, y - 1, x + 1] -
+                heatmaps[k, y + 1, x - 1] + heatmaps[k, y - 1, x - 1])
+            dyy = 0.25 * (
+                heatmaps[k, y + 2, x] - 2 * heatmaps[k, y, x] +
+                heatmaps[k, y - 2, x])
+            derivative = np.array([[dx], [dy]])
+            hessian = np.array([[dxx, dxy], [dxy, dyy]])
+            if dxx * dyy - dxy ** 2 != 0:
+                hessianinv = np.linalg.inv(hessian)
+                offset = -hessianinv @ derivative
+                offset = np.squeeze(np.array(offset.T), axis=0)
+                keypoints[n, k, :2] += offset
+    return keypoints
+
+
+def get_real_keypoints(keypoints: np.ndarray, heatmaps: np.ndarray, img_size: Sequence[int]):
+    img_h, img_w = img_size
+    heatmap_h, heatmap_w = heatmaps.shape[1:]
+    heatmap_ratio = heatmaps.shape[1] / heatmaps.shape[2]
+    img_ratio = img_h / img_w
+    if heatmap_ratio > img_ratio:
+        resize_w = img_w
+        resize_h = int(img_w * heatmap_ratio)
+    elif heatmap_ratio < img_ratio:
+        resize_h = img_h
+        resize_w = int(img_h / heatmap_ratio)
+    else:
+        resize_w = img_w
+        resize_h = img_h
+
+    keypoints[:, :, 0] = (keypoints[:, :, 0] / heatmap_w) * resize_w - (resize_w - img_w) / 2
+    keypoints[:, :, 1] = (keypoints[:, :, 1] / heatmap_h) * resize_h - (resize_h - img_h) / 2
+
+    keypoints = np.squeeze(keypoints, axis=0)
+
+    return keypoints
+
+
+def simcc_decoder(simcc_x: np.ndarray,
+                  simcc_y: np.ndarray,
+                  input_size: Sequence[int],
+                  dx: int,
+                  dy: int,
+                  scale: float):
+    x = np.argmax(simcc_x, axis=-1, keepdims=True).astype(np.float32)
+    y = np.argmax(simcc_y, axis=-1, keepdims=True).astype(np.float32)
+
+    x_conf = np.max(simcc_x, axis=-1, keepdims=True)
+    y_conf = np.max(simcc_y, axis=-1, keepdims=True)
+    conf = (x_conf + y_conf) / 2
+
+    x /= simcc_x.shape[-1]
+    y /= simcc_y.shape[-1]
+    x *= input_size[1]
+    y *= input_size[0]
+
+    keypoints = np.concatenate([x, y, conf], axis=-1)
+    keypoints[..., 0] -= dx
+    keypoints[..., 1] -= dy
+    keypoints[..., :2] /= scale
+
+    return keypoints
diff --git a/scripts/utils_2d_pose_ep.py b/scripts/utils_2d_pose_ep.py
new file mode 100644
index 0000000..8c4b3d9
--- /dev/null
+++ b/scripts/utils_2d_pose_ep.py
@@ -0,0 +1,61 @@
+import os
+
+import cv2
+import numpy as np
+import easypose as ep
+
+# ==================================================================================================
+
+filepath = os.path.dirname(os.path.realpath(__file__)) + "/"
+
+# ==================================================================================================
+
+
+def load_model():
+    print("Loading easypose model ...")
+
+    model = ep.TopDown("rtmpose_m", "SimCC", "rtmdet_s")
+
+    print("Loaded easypose model")
+    return model
+
+
+def load_wb_model():
+    print("Loading easypose whole body model ...")
+
+    model = None
+
+    print("No easypose whole body model available")
+    return model
+
+
+# ==================================================================================================
+
+
+def get_2d_pose(model, imgs, num_joints=17):
+    """Adapted from the mmpose helper; see: https://mmpose.readthedocs.io/en/latest/user_guides/inference.html#basic-usage"""
+
+    new_poses = []
+    for i in range(len(imgs)):
+        img = imgs[i]
+        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
+
+        poses = []
+        dets = model.predict(img)
+        for pose in dets:
+            pose = pose.keypoints
+            pose = np.asarray(pose)
+
+            scores = pose[:, 2].reshape(-1, 1)
+            scores = np.clip(scores, 0, 1)
+            pose = np.concatenate((pose[:, :2], scores), axis=-1)
+
+            poses.append(pose)
+
+        if len(poses) == 0:
+            poses.append(np.zeros([num_joints, 3]))
+
+        poses = np.array(poses)
+        new_poses.append(poses)
+
+    return new_poses
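
A minimal smoke-test sketch for the helpers this patch adds in `scripts/utils_2d_pose_ep.py`, assuming it runs inside the `rpt_easypose` container with `scripts/` on the import path; `example.jpg` is a placeholder, not part of the patch:

```python
import cv2

# Helpers added by this patch in scripts/utils_2d_pose_ep.py.
from utils_2d_pose_ep import get_2d_pose, load_model

# EasyPose top-down pipeline: rtmdet_s detector + rtmpose_m with SimCC decoding.
model = load_model()

# get_2d_pose expects BGR images (it converts to RGB internally).
img = cv2.imread("example.jpg")  # placeholder path
poses_per_image = get_2d_pose(model, [img])

# One (num_persons, 17, 3) array per image: x, y, confidence clipped to [0, 1];
# a single all-zero pose is returned when nothing is detected.
for poses in poses_per_image:
    print(poses.shape)
```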