From 9746a03bb3b36ffbb54942ad052a1b3024f5a252 Mon Sep 17 00:00:00 2001 From: Daniel Date: Mon, 26 May 2025 14:17:39 +0200 Subject: [PATCH] Deleted some outdated code. --- extras/easypose/README.md | 18 -- extras/easypose/base_model.py | 65 ----- extras/easypose/detection.py | 100 -------- extras/easypose/dockerfile | 10 - extras/easypose/pipeline.py | 362 ---------------------------- extras/easypose/pose.py | 52 ---- extras/easypose/run_container.sh | 16 -- extras/easypose/utils.py | 259 -------------------- extras/easypose/utils_2d_pose_ep.py | 68 ------ 9 files changed, 950 deletions(-) delete mode 100644 extras/easypose/README.md delete mode 100644 extras/easypose/base_model.py delete mode 100644 extras/easypose/detection.py delete mode 100644 extras/easypose/dockerfile delete mode 100644 extras/easypose/pipeline.py delete mode 100644 extras/easypose/pose.py delete mode 100755 extras/easypose/run_container.sh delete mode 100644 extras/easypose/utils.py delete mode 100644 extras/easypose/utils_2d_pose_ep.py diff --git a/extras/easypose/README.md b/extras/easypose/README.md deleted file mode 100644 index 2c08ccd..0000000 --- a/extras/easypose/README.md +++ /dev/null @@ -1,18 +0,0 @@ -# Test ONNX with EasyPose - -Code files originally from: - -
- -```bash -docker build --progress=plain -f extras/easypose/dockerfile -t rpt_easypose . - -./extras/easypose/run_container.sh -``` - -```bash -export CUDA_VISIBLE_DEVICES=0 - -python3 /RapidPoseTriangulation/scripts/test_triangulate.py -python3 /RapidPoseTriangulation/scripts/test_skelda_dataset.py -``` diff --git a/extras/easypose/base_model.py b/extras/easypose/base_model.py deleted file mode 100644 index 4eab24d..0000000 --- a/extras/easypose/base_model.py +++ /dev/null @@ -1,65 +0,0 @@ -import warnings -from abc import ABC, abstractmethod -from typing import List -import time -import numpy as np -import onnxruntime as ort -from tqdm import tqdm - - -class BaseModel(ABC): - def __init__(self, model_path: str, device: str = 'CUDA', warmup: int = 30): - self.opt = ort.SessionOptions() - - if device == 'CUDA': - provider = 'CUDAExecutionProvider' - if provider not in ort.get_available_providers(): - warnings.warn("No CUDAExecutionProvider found, switched to CPUExecutionProvider.", UserWarning) - provider = 'CPUExecutionProvider' - elif device == 'CPU': - provider = 'CPUExecutionProvider' - else: - raise ValueError('Provider {} does not exist.'.format(device)) - - self.session = ort.InferenceSession(model_path, - providers=[provider], - sess_options=self.opt) - - self.input_name = self.session.get_inputs()[0].name - self.input_shape = self.session.get_inputs()[0].shape - - input_type = self.session.get_inputs()[0].type - if input_type == 'tensor(float32)': - self.input_type = np.float32 - elif input_type == 'tensor(float16)': - self.input_type = np.float16 - elif input_type == 'tensor(uint8)': - self.input_type = np.uint8 - else: - raise ValueError('Unknown input type: ', input_type) - - if warmup > 0: - self.warmup(warmup) - - @abstractmethod - def preprocess(self, image: np.ndarray): - pass - - @abstractmethod - def postprocess(self, tensor: List[np.ndarray]): - pass - - def forward(self, image: np.ndarray): - tensor = self.preprocess(image) - result = self.session.run(None, {self.input_name: tensor}) - output = self.postprocess(result) - return output - - def warmup(self, epoch: int = 30): - print('{} start warmup!'.format(self.__class__.__name__)) - tensor = np.random.random(self.input_shape).astype(self.input_type) - for _ in tqdm(range(epoch)): - self.session.run(None, {self.input_name: tensor}) - - def __call__(self, image: np.ndarray, *args, **kwargs): - return self.forward(image) diff --git a/extras/easypose/detection.py b/extras/easypose/detection.py deleted file mode 100644 index e8cf15a..0000000 --- a/extras/easypose/detection.py +++ /dev/null @@ -1,100 +0,0 @@ -import numpy as np -from typing import List - -from .base_model import BaseModel -from .utils import letterbox, nms_optimized, xywh2xyxy - - -class RTMDet(BaseModel): - def __init__(self, - model_path: str, - conf_threshold: float, - iou_threshold: float, - device: str = 'CUDA', - warmup: int = 30): - super(RTMDet, self).__init__(model_path, device, warmup) - self.conf_threshold = conf_threshold - self.iou_threshold = iou_threshold - self.dx = 0 - self.dy = 0 - self.scale = 0 - - def preprocess(self, image: np.ndarray): - th, tw = self.input_shape[1:3] - image, self.dx, self.dy, self.scale = letterbox( - image, (tw, th), fill_value=114 - ) - tensor = np.asarray(image).astype(self.input_type, copy=False)[..., ::-1] - tensor = np.expand_dims(tensor, axis=0) - return tensor - - def postprocess(self, tensor: List[np.ndarray]): - boxes = np.squeeze(tensor[0], axis=0) - classes = np.expand_dims(np.squeeze(tensor[1], axis=0), axis=-1) - boxes = np.concatenate([boxes, classes], axis=-1) - - boxes = nms_optimized(boxes, self.iou_threshold, self.conf_threshold) - - if boxes.shape[0] == 0: - return boxes - - human_class = boxes[..., -1] == 0 - boxes = boxes[human_class][..., :4] - - boxes[:, 0] -= self.dx - boxes[:, 2] -= self.dx - boxes[:, 1] -= self.dy - boxes[:, 3] -= self.dy - - boxes = np.clip(boxes, a_min=0, a_max=None) - boxes[:, :4] /= self.scale - - return boxes - - -class Yolov8(BaseModel): - def __init__(self, - model_path: str, - conf_threshold: float, - iou_threshold: float, - device: str = 'CUDA', - warmup: int = 30): - super(Yolov8, self).__init__(model_path, device, warmup) - self.conf_threshold = conf_threshold - self.iou_threshold = iou_threshold - self.dx = 0 - self.dy = 0 - self.scale = 0 - - def preprocess(self, image): - th, tw = self.input_shape[2:] - image, self.dx, self.dy, self.scale = letterbox(image, (tw, th)) - tensor = image / 255. - tensor = np.expand_dims(tensor, axis=0).transpose((0, 3, 1, 2)).astype(np.float32) - return tensor - - def postprocess(self, tensor): - feature_map = tensor[0] - feature_map = np.squeeze(feature_map, axis=0).transpose((1, 0)) - - pred_class = feature_map[..., 4:] - pred_conf = np.max(pred_class, axis=-1, keepdims=True) - pred_class = np.argmax(pred_class, axis=-1, keepdims=True) - boxes = np.concatenate([feature_map[..., :4], pred_conf, pred_class], axis=-1) - - boxes = xywh2xyxy(boxes) - boxes = nms(boxes, self.iou_threshold, self.conf_threshold) - - if boxes.shape[0] == 0: - return boxes - - human_class = boxes[..., -1] == 0 - boxes = boxes[human_class][..., :4] - - boxes[:, 0] -= self.dx - boxes[:, 2] -= self.dx - boxes[:, 1] -= self.dy - boxes[:, 3] -= self.dy - boxes = np.clip(boxes, a_min=0, a_max=None) - boxes[:, :4] /= self.scale - return boxes diff --git a/extras/easypose/dockerfile b/extras/easypose/dockerfile deleted file mode 100644 index 91dd978..0000000 --- a/extras/easypose/dockerfile +++ /dev/null @@ -1,10 +0,0 @@ -FROM rapidposetriangulation - -WORKDIR / - -RUN pip3 install --upgrade --no-cache-dir onnxruntime-gpu -RUN git clone https://github.com/Dominic23331/EasyPose.git --depth=1 -RUN cd /EasyPose/; pip install -v -e . - -WORKDIR /RapidPoseTriangulation/ -CMD ["/bin/bash"] diff --git a/extras/easypose/pipeline.py b/extras/easypose/pipeline.py deleted file mode 100644 index 333e46f..0000000 --- a/extras/easypose/pipeline.py +++ /dev/null @@ -1,362 +0,0 @@ -import os - -import cv2 -import numpy as np - -from easypose import model -from easypose.model import detection -from easypose.model import pose -from .download import get_url, get_model_path, download -from .consts import AvailablePoseModels, AvailableDetModels -from .common import Person, region_of_interest, restore_keypoints - - -def get_pose_model(pose_model_path, pose_model_decoder, device, warmup): - if pose_model_decoder == 'Dark': - pose_model = pose.Heatmap(pose_model_path, dark=True, device=device, warmup=warmup) - else: - pose_model = getattr(pose, pose_model_decoder)(pose_model_path, device=device, warmup=warmup) - return pose_model - - -def get_det_model(det_model_path, model_type, conf_thre, iou_thre, device, warmup): - det_model = getattr(detection, model_type)(det_model_path, conf_thre, iou_thre, device, warmup) - return det_model - - -def region_of_interest_warped( - image: np.ndarray, - box: np.ndarray, - target_size=(288, 384), - padding_scale: float = 1.25, -): - start_x, start_y, end_x, end_y = box - target_w, target_h = target_size - - # Calculate original bounding box width and height - bbox_w = end_x - start_x - bbox_h = end_y - start_y - - if bbox_w <= 0 or bbox_h <= 0: - raise ValueError("Invalid bounding box!") - - # Calculate the aspect ratios - bbox_aspect = bbox_w / bbox_h - target_aspect = target_w / target_h - - # Adjust the scaled bounding box to match the target aspect ratio - if bbox_aspect > target_aspect: - adjusted_h = bbox_w / target_aspect - adjusted_w = bbox_w - else: - adjusted_w = bbox_h * target_aspect - adjusted_h = bbox_h - - # Scale the bounding box by the padding_scale - scaled_bbox_w = adjusted_w * padding_scale - scaled_bbox_h = adjusted_h * padding_scale - - # Calculate the center of the original box - center_x = (start_x + end_x) / 2.0 - center_y = (start_y + end_y) / 2.0 - - # Calculate scaled bounding box coordinates - new_start_x = center_x - scaled_bbox_w / 2.0 - new_start_y = center_y - scaled_bbox_h / 2.0 - new_end_x = center_x + scaled_bbox_w / 2.0 - new_end_y = center_y + scaled_bbox_h / 2.0 - - # Define the new box coordinates - new_box = np.array( - [new_start_x, new_start_y, new_end_x, new_end_y], dtype=np.float32 - ) - scale = target_w / scaled_bbox_w - - # Define source and destination points for affine transformation - # See: /mmpose/structures/bbox/transforms.py - src_pts = np.array( - [ - [center_x, center_y], - [new_start_x, center_y], - [new_start_x, center_y + (center_x - new_start_x)], - ], - dtype=np.float32, - ) - dst_pts = np.array( - [ - [target_w * 0.5, target_h * 0.5], - [0, target_h * 0.5], - [0, target_h * 0.5 + (target_w * 0.5 - 0)], - ], - dtype=np.float32, - ) - - # Compute the affine transformation matrix - M = cv2.getAffineTransform(src_pts, dst_pts) - - # Apply affine transformation with border filling - extracted_region = cv2.warpAffine( - image, - M, - target_size, - flags=cv2.INTER_LINEAR, - ) - - return extracted_region, new_box, scale - - -class TopDown: - def __init__(self, - pose_model_name, - pose_model_decoder, - det_model_name, - conf_threshold=0.6, - iou_threshold=0.6, - device='CUDA', - warmup=30): - if not pose_model_name.endswith('.onnx') and pose_model_name not in AvailablePoseModels.POSE_MODELS: - raise ValueError( - 'The {} human pose estimation model is not in the model repository.'.format(pose_model_name)) - if not pose_model_name.endswith('.onnx') and pose_model_decoder not in AvailablePoseModels.POSE_MODELS[pose_model_name]: - raise ValueError( - 'No {} decoding head for the {} model was found in the model repository.'.format(pose_model_decoder, - pose_model_name)) - if not pose_model_name.endswith('.onnx') and det_model_name not in AvailableDetModels.DET_MODELS: - raise ValueError( - 'The {} detection model is not in the model repository.'.format(det_model_name)) - - if not pose_model_name.endswith('.onnx'): - pose_model_dir = get_model_path(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder], - detection_model=False) - pose_model_path = os.path.join(pose_model_dir, - AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder]) - else: - pose_model_path = pose_model_name - - if os.path.exists(pose_model_path): - try: - self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup) - except Exception: - url = get_url(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder], - detection_model=False) - download(url, pose_model_dir) - self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup) - else: - url = get_url(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder], - detection_model=False) - download(url, pose_model_dir) - self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup) - - if not det_model_name.endswith('.onnx'): - det_model_dir = get_model_path(AvailableDetModels.DET_MODELS[det_model_name]['file_name'], - detection_model=True) - det_model_path = os.path.join(det_model_dir, - AvailableDetModels.DET_MODELS[det_model_name]['file_name']) - det_model_type = AvailableDetModels.DET_MODELS[det_model_name]['model_type'] - else: - det_model_path = det_model_name - if "rtmdet" in det_model_name: - det_model_type = 'RTMDet' - - if os.path.exists(det_model_path): - try: - self.det_model = get_det_model(det_model_path, - det_model_type, - conf_threshold, - iou_threshold, - device, - warmup) - except Exception: - url = get_url(AvailableDetModels.DET_MODELS[det_model_name]['file_name'], - detection_model=True) - download(url, det_model_dir) - self.det_model = get_det_model(det_model_path, - det_model_type, - conf_threshold, - iou_threshold, - device, - warmup) - else: - url = get_url(AvailableDetModels.DET_MODELS[det_model_name]['file_name'], - detection_model=True) - download(url, det_model_dir) - self.det_model = get_det_model(det_model_path, - det_model_type, - conf_threshold, - iou_threshold, - device, - warmup) - - def predict(self, image): - boxes = self.det_model(image) - results = [] - for i in range(boxes.shape[0]): - p = Person() - p.box = boxes[i] - region, p.box, _ = region_of_interest_warped(image, p.box) - kp = self.pose_model(region) - - # See: /mmpose/models/pose_estimators/topdown.py - add_pred_to_datasample() - th, tw = region.shape[:2] - bw, bh = [p.box[2] - p.box[0], p.box[3] - p.box[1]] - kp[:, :2] /= np.array([tw, th]) - kp[:, :2] *= np.array([bw, bh]) - kp[:, :2] += np.array([p.box[0] + bw / 2, p.box[1] + bh / 2]) - kp[:, :2] -= 0.5 * np.array([bw, bh]) - - p.keypoints = kp - results.append(p) - return results - - -class Pose: - def __init__(self, - pose_model_name, - pose_model_decoder, - device='CUDA', - warmup=30): - if pose_model_name not in AvailablePoseModels.POSE_MODELS: - raise ValueError( - 'The {} human pose estimation model is not in the model repository.'.format(pose_model_name)) - if pose_model_decoder not in AvailablePoseModels.POSE_MODELS[pose_model_name]: - raise ValueError( - 'No {} decoding head for the {} model was found in the model repository.'.format(pose_model_decoder, - pose_model_name)) - - pose_model_dir = get_model_path(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder], - detection_model=False) - pose_model_path = os.path.join(pose_model_dir, - AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder]) - - if os.path.exists(pose_model_path): - try: - self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup) - except Exception: - url = get_url(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder], - detection_model=False) - download(url, pose_model_dir) - self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup) - else: - url = get_url(AvailablePoseModels.POSE_MODELS[pose_model_name][pose_model_decoder], - detection_model=False) - download(url, pose_model_dir) - self.pose_model = get_pose_model(pose_model_path, pose_model_decoder, device, warmup) - - def predict(self, image): - p = Person() - box = np.array([0, 0, image.shape[3], image.shape[2], 1, 0]) - p.box = box - p.keypoints = self.pose_model(image) - return p - - -class CustomTopDown: - def __init__(self, - pose_model, - det_model, - pose_decoder=None, - device='CUDA', - iou_threshold=0.6, - conf_threshold=0.6, - warmup=30): - if isinstance(pose_model, model.BaseModel): - self.pose_model = pose_model - elif isinstance(pose_model, str): - if pose_model not in AvailablePoseModels.POSE_MODELS: - raise ValueError( - 'The {} human pose estimation model is not in the model repository.'.format(pose_model)) - if pose_model not in AvailablePoseModels.POSE_MODELS[pose_model]: - raise ValueError( - 'No {} decoding head for the {} model was found in the model repository.'.format(pose_decoder, - pose_model)) - - pose_model_dir = get_model_path(AvailablePoseModels.POSE_MODELS[pose_model][pose_decoder], - detection_model=False) - pose_model_path = os.path.join(pose_model_dir, - AvailablePoseModels.POSE_MODELS[pose_model][pose_decoder]) - - if os.path.exists(pose_model_path): - try: - self.pose_model = get_pose_model(pose_model_path, pose_decoder, device, warmup) - except Exception: - url = get_url(AvailablePoseModels.POSE_MODELS[pose_model][pose_decoder], - detection_model=False) - download(url, pose_model_dir) - self.pose_model = get_pose_model(pose_model_path, pose_decoder, device, warmup) - else: - url = get_url(AvailablePoseModels.POSE_MODELS[pose_model][pose_decoder], - detection_model=False) - download(url, pose_model_dir) - self.pose_model = get_pose_model(pose_model_path, pose_decoder, device, warmup) - else: - raise TypeError("Invalid type for pose model, Please write a custom model based on 'BaseModel'.") - - if isinstance(det_model, model.BaseModel): - self.det_model = det_model - elif isinstance(det_model, str): - if det_model not in AvailableDetModels.DET_MODELS: - raise ValueError( - 'The {} detection model is not in the model repository.'.format(det_model)) - - det_model_dir = get_model_path(AvailableDetModels.DET_MODELS[det_model]['file_name'], - detection_model=True) - det_model_path = os.path.join(det_model_dir, - AvailableDetModels.DET_MODELS[det_model]['file_name']) - det_model_type = AvailableDetModels.DET_MODELS[det_model]['model_type'] - if os.path.exists(det_model_path): - try: - self.det_model = get_det_model(det_model_path, - det_model_type, - conf_threshold, - iou_threshold, - device, - warmup) - except Exception: - url = get_url(AvailableDetModels.DET_MODELS[det_model]['file_name'], - detection_model=True) - download(url, det_model_dir) - self.det_model = get_det_model(det_model_path, - det_model_type, - conf_threshold, - iou_threshold, - device, - warmup) - else: - url = get_url(AvailableDetModels.DET_MODELS[det_model]['file_name'], - detection_model=True) - download(url, det_model_dir) - self.det_model = get_det_model(det_model_path, - det_model_type, - conf_threshold, - iou_threshold, - device, - warmup) - else: - raise TypeError("Invalid type for detection model, Please write a custom model based on 'BaseModel'.") - - def predict(self, image): - boxes = self.det_model(image) - results = [] - for i in range(boxes.shape[0]): - p = Person() - p.box = boxes[i] - region = region_of_interest(image, p.box) - kp = self.pose_model(region) - p.keypoints = restore_keypoints(p.box, kp) - results.append(p) - return results - - -class CustomSinglePose: - def __init__(self, pose_model): - if isinstance(pose_model, model.BaseModel): - self.pose_model = pose_model - else: - raise TypeError("Invalid type for pose model, Please write a custom model based on 'BaseModel'.") - - def predict(self, image): - p = Person() - box = np.array([0, 0, image.shape[3], image.shape[2], 1, 0]) - p.box = box - p.keypoints = self.pose_model(image) - return p diff --git a/extras/easypose/pose.py b/extras/easypose/pose.py deleted file mode 100644 index b0e6327..0000000 --- a/extras/easypose/pose.py +++ /dev/null @@ -1,52 +0,0 @@ -import numpy as np -from typing import List - -from .base_model import BaseModel -from .utils import letterbox, get_heatmap_points, \ - get_real_keypoints, refine_keypoints_dark, refine_keypoints, simcc_decoder - - -class Heatmap(BaseModel): - def __init__(self, - model_path: str, - dark: bool = False, - device: str = 'CUDA', - warmup: int = 30): - super(Heatmap, self).__init__(model_path, device, warmup) - self.use_dark = dark - self.img_size = () - - def preprocess(self, image: np.ndarray): - th, tw = self.input_shape[2:] - self.img_size = image.shape[:2] - image, _, _, _ = letterbox(image, (tw, th)) - tensor = (image - np.array((103.53, 116.28, 123.675))) / np.array((57.375, 57.12, 58.395)) - tensor = np.expand_dims(tensor, axis=0).transpose((0, 3, 1, 2)).astype(np.float32) - return tensor - - def postprocess(self, tensor: List[np.ndarray]): - heatmaps = tensor[0] - heatmaps = np.squeeze(heatmaps, axis=0) - keypoints = get_heatmap_points(heatmaps) - if self.use_dark: - keypoints = refine_keypoints_dark(keypoints, heatmaps, 11) - else: - keypoints = refine_keypoints(keypoints, heatmaps) - keypoints = get_real_keypoints(keypoints, heatmaps, self.img_size) - return keypoints - - -class SimCC(BaseModel): - def __init__(self, model_path: str, device: str = 'CUDA', warmup: int = 30): - super(SimCC, self).__init__(model_path, device, warmup) - - def preprocess(self, image: np.ndarray): - tensor = np.asarray(image).astype(self.input_type, copy=False) - tensor = np.expand_dims(tensor, axis=0) - return tensor - - def postprocess(self, tensor: List[np.ndarray]): - keypoints = np.concatenate( - [tensor[0][0], np.expand_dims(tensor[1][0], axis=-1)], axis=-1 - ) - return keypoints diff --git a/extras/easypose/run_container.sh b/extras/easypose/run_container.sh deleted file mode 100755 index e08d6bf..0000000 --- a/extras/easypose/run_container.sh +++ /dev/null @@ -1,16 +0,0 @@ -#! /bin/bash - -xhost + -docker run --privileged --rm --network host -it \ - --gpus all --shm-size=16g --ulimit memlock=-1 --ulimit stack=67108864 \ - --volume "$(pwd)"/:/RapidPoseTriangulation/ \ - --volume "$(pwd)"/extras/easypose/pipeline.py:/EasyPose/easypose/pipeline.py \ - --volume "$(pwd)"/extras/easypose/base_model.py:/EasyPose/easypose/model/base_model.py \ - --volume "$(pwd)"/extras/easypose/detection.py:/EasyPose/easypose/model/detection.py \ - --volume "$(pwd)"/extras/easypose/pose.py:/EasyPose/easypose/model/pose.py \ - --volume "$(pwd)"/extras/easypose/utils.py:/EasyPose/easypose/model/utils.py \ - --volume "$(pwd)"/../datasets/:/datasets/ \ - --volume "$(pwd)"/skelda/:/skelda/ \ - --volume /tmp/.X11-unix:/tmp/.X11-unix \ - --env DISPLAY --env QT_X11_NO_MITSHM=1 \ - rpt_easypose diff --git a/extras/easypose/utils.py b/extras/easypose/utils.py deleted file mode 100644 index cac76b0..0000000 --- a/extras/easypose/utils.py +++ /dev/null @@ -1,259 +0,0 @@ -from itertools import product -from typing import Sequence - -import cv2 -import numpy as np - - -def letterbox(img: np.ndarray, target_size: Sequence[int], fill_value: int = 128): - h, w = img.shape[:2] - tw, th = target_size - - scale = min(tw / w, th / h) - nw, nh = int(w * scale), int(h * scale) - dx, dy = (tw - nw) // 2, (th - nh) // 2 - - canvas = np.full((th, tw, img.shape[2]), fill_value, dtype=img.dtype) - canvas[dy:dy + nh, dx:dx + nw, :] = cv2.resize(img, (nw, nh)) - - return canvas, dx, dy, scale - - -def intersection_over_union(box1: np.ndarray, box2: np.ndarray): - area1 = (box1[2] - box1[0]) * (box1[3] - box1[1]) - area2 = (box2[2] - box2[0]) * (box2[3] - box2[1]) - - x1 = max(box1[0], box2[0]) - y1 = max(box1[1], box2[1]) - x2 = min(box1[2], box2[2]) - y2 = min(box1[3], box2[3]) - - intersection = (x2 - x1) * (y2 - y1) - union = area1 + area2 - intersection - iou = intersection / (union + 1e-6) - - return iou - - -def xywh2xyxy(boxes): - boxes[:, 0] -= boxes[:, 2] / 2 - boxes[:, 1] -= boxes[:, 3] / 2 - boxes[:, 2] += boxes[:, 0] - boxes[:, 3] += boxes[:, 1] - return boxes - -def nms(boxes: np.ndarray, iou_threshold: float, conf_threshold: float): - conf = boxes[..., 4] > conf_threshold - boxes = boxes[conf] - boxes = list(boxes) - boxes.sort(reverse=True, key=lambda x: x[4]) - - result = [] - while boxes: - chosen_box = boxes.pop() - - b = [] - for box in boxes: - if box[-1] != chosen_box[-1] or \ - intersection_over_union(chosen_box, box) \ - < iou_threshold: - b.append(box) - - result.append(chosen_box) - boxes = b - - return np.array(result) - - -def nms_optimized(boxes: np.ndarray, iou_threshold: float, conf_threshold: float): - """ - Perform Non-Maximum Suppression (NMS) on bounding boxes for a single class. - """ - - # Filter out boxes with low confidence scores - scores = boxes[:, 4] - keep = scores > conf_threshold - boxes = boxes[keep] - scores = scores[keep] - - if boxes.shape[0] == 0: - return np.empty((0, 5), dtype=boxes.dtype) - - # Compute the area of the bounding boxes - x1 = boxes[:, 0] - y1 = boxes[:, 1] - x2 = boxes[:, 2] - y2 = boxes[:, 3] - areas = (x2 - x1 + 1) * (y2 - y1 + 1) - - # Sort the boxes by scores in descending order - order = scores.argsort()[::-1] - - keep_indices = [] - while order.size > 0: - i = order[0] - keep_indices.append(i) - - # Compute IoU of the current box with the rest - xx1 = np.maximum(x1[i], x1[order[1:]]) - yy1 = np.maximum(y1[i], y1[order[1:]]) - xx2 = np.minimum(x2[i], x2[order[1:]]) - yy2 = np.minimum(y2[i], y2[order[1:]]) - - # Compute width and height of the overlapping area - w = np.maximum(0.0, xx2 - xx1 + 1) - h = np.maximum(0.0, yy2 - yy1 + 1) - - # Compute the area of the intersection - inter = w * h - - # Compute the IoU - iou = inter / (areas[i] + areas[order[1:]] - inter) - - # Keep boxes with IoU less than the threshold - inds = np.where(iou <= iou_threshold)[0] - - # Update the order array - order = order[inds + 1] - - # Return the boxes that are kept - return boxes[keep_indices] - - -def get_heatmap_points(heatmap: np.ndarray): - keypoints = np.zeros([1, heatmap.shape[0], 3], dtype=np.float32) - for i in range(heatmap.shape[0]): - h, w = np.nonzero(heatmap[i] == heatmap[i].max()) - h, w = h[0], w[0] - h_fixed = h + 0.5 - w_fixed = w + 0.5 - score = heatmap[i][h][w] - keypoints[0][i][0] = w_fixed - keypoints[0][i][1] = h_fixed - keypoints[0][i][2] = score - return keypoints - - -def gaussian_blur(heatmaps: np.ndarray, kernel: int = 11): - assert kernel % 2 == 1 - - border = (kernel - 1) // 2 - K, H, W = heatmaps.shape - - for k in range(K): - origin_max = np.max(heatmaps[k]) - dr = np.zeros((H + 2 * border, W + 2 * border), dtype=np.float32) - dr[border:-border, border:-border] = heatmaps[k].copy() - dr = cv2.GaussianBlur(dr, (kernel, kernel), 0) - heatmaps[k] = dr[border:-border, border:-border].copy() - heatmaps[k] *= origin_max / np.max(heatmaps[k]) - return heatmaps - - -def refine_keypoints(keypoints: np.ndarray, heatmaps: np.ndarray): - N, K = keypoints.shape[:2] - H, W = heatmaps.shape[:2] - - for n, k in product(range(N), range(K)): - x, y = keypoints[n, k, :2].astype(int) - - if 1 < x < W - 1 and 0 < y < H: - dx = heatmaps[k, y, x + 1] - heatmaps[k, y, x - 1] - else: - dx = 0. - - if 1 < y < H - 1 and 0 < x < W: - dy = heatmaps[k, y + 1, x] - heatmaps[k, y - 1, x] - else: - dy = 0. - - keypoints[n, k] += np.sign([dx, dy, 0], dtype=np.float32) * 0.25 - - return keypoints - - -def refine_keypoints_dark(keypoints: np.ndarray, heatmaps: np.ndarray, blur_kernel_size: int = 11): - N, K = keypoints.shape[:2] - H, W = heatmaps.shape[1:] - - # modulate heatmaps - heatmaps = gaussian_blur(heatmaps, blur_kernel_size) - np.maximum(heatmaps, 1e-10, heatmaps) - np.log(heatmaps, heatmaps) - - for n, k in product(range(N), range(K)): - x, y = keypoints[n, k, :2].astype(int) - if 1 < x < W - 2 and 1 < y < H - 2: - dx = 0.5 * (heatmaps[k, y, x + 1] - heatmaps[k, y, x - 1]) - dy = 0.5 * (heatmaps[k, y + 1, x] - heatmaps[k, y - 1, x]) - - dxx = 0.25 * ( - heatmaps[k, y, x + 2] - 2 * heatmaps[k, y, x] + - heatmaps[k, y, x - 2]) - dxy = 0.25 * ( - heatmaps[k, y + 1, x + 1] - heatmaps[k, y - 1, x + 1] - - heatmaps[k, y + 1, x - 1] + heatmaps[k, y - 1, x - 1]) - dyy = 0.25 * ( - heatmaps[k, y + 2, x] - 2 * heatmaps[k, y, x] + - heatmaps[k, y - 2, x]) - derivative = np.array([[dx], [dy]]) - hessian = np.array([[dxx, dxy], [dxy, dyy]]) - if dxx * dyy - dxy ** 2 != 0: - hessianinv = np.linalg.inv(hessian) - offset = -hessianinv @ derivative - offset = np.squeeze(np.array(offset.T), axis=0) - keypoints[n, k, :2] += offset - return keypoints - - -def get_real_keypoints(keypoints: np.ndarray, heatmaps: np.ndarray, img_size: Sequence[int]): - img_h, img_w = img_size - heatmap_h, heatmap_w = heatmaps.shape[1:] - heatmap_ratio = heatmaps.shape[1] / heatmaps.shape[2] - img_ratio = img_h / img_w - if heatmap_ratio > img_ratio: - resize_w = img_w - resize_h = int(img_w * heatmap_ratio) - elif heatmap_ratio < img_ratio: - resize_h = img_h - resize_w = int(img_h / heatmap_ratio) - else: - resize_w = img_w - resize_h = img_h - - keypoints[:, :, 0] = (keypoints[:, :, 0] / heatmap_w) * resize_w - (resize_w - img_w) / 2 - keypoints[:, :, 1] = (keypoints[:, :, 1] / heatmap_h) * resize_h - (resize_h - img_h) / 2 - - keypoints = np.squeeze(keypoints, axis=0) - - return keypoints - - -def simcc_decoder( - simcc_x: np.ndarray, - simcc_y: np.ndarray, - input_size: Sequence[int], - dx: int, - dy: int, - scale: float, -): - # See: /mmpose/codecs/utils/post_processing.py - get_simcc_maximum() - - x = np.argmax(simcc_x, axis=-1, keepdims=True).astype(np.float32) - y = np.argmax(simcc_y, axis=-1, keepdims=True).astype(np.float32) - - x_conf = np.max(simcc_x, axis=-1, keepdims=True) - y_conf = np.max(simcc_y, axis=-1, keepdims=True) - conf = np.minimum(x_conf, y_conf) - - x /= simcc_x.shape[-1] - y /= simcc_y.shape[-1] - x *= input_size[1] - y *= input_size[0] - - keypoints = np.concatenate([x, y, conf], axis=-1) - keypoints[..., 0] -= dx - keypoints[..., 1] -= dy - keypoints[..., :2] /= scale - - return keypoints diff --git a/extras/easypose/utils_2d_pose_ep.py b/extras/easypose/utils_2d_pose_ep.py deleted file mode 100644 index 7b95251..0000000 --- a/extras/easypose/utils_2d_pose_ep.py +++ /dev/null @@ -1,68 +0,0 @@ -import os - -import cv2 -import easypose as ep -import numpy as np - -# ================================================================================================== - -filepath = os.path.dirname(os.path.realpath(__file__)) + "/" - -# ================================================================================================== - - -def load_model(): - print("Loading mmpose model ...") - - model = ep.TopDown( - "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-m_384x288_fp16_extra-steps.onnx", - "SimCC", - "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_320x320_fp16_extra-steps.onnx", - conf_threshold=0.3, - iou_threshold=0.3, - warmup=10, - ) - - print("Loaded mmpose model") - return model - - -def load_wb_model(): - print("Loading mmpose whole body model ...") - - model = None - - print("Loaded mmpose model") - return model - - -# ================================================================================================== - - -def get_2d_pose(model, imgs, num_joints=17): - """See: https://mmpose.readthedocs.io/en/latest/user_guides/inference.html#basic-usage""" - - new_poses = [] - for i in range(len(imgs)): - img = imgs[i] - img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) - - poses = [] - dets = model.predict(img) - for pose in dets: - pose = pose.keypoints - pose = np.asarray(pose) - - scores = pose[:, 2].reshape(-1, 1) - scores = np.clip(scores, 0, 1) - pose = np.concatenate((pose[:, :2], scores), axis=-1) - - poses.append(pose) - - if len(poses) == 0: - poses.append(np.zeros([num_joints, 3])) - - poses = np.array(poses) - new_poses.append(poses) - - return new_poses