From 1cc71cce5790ecef26b22177d4d1ce72cad2a7bb Mon Sep 17 00:00:00 2001 From: Daniel Date: Tue, 28 Jan 2025 11:50:50 +0100 Subject: [PATCH] Dropping more old python code. --- scripts/test_skelda_dataset.cpp | 12 +- scripts/test_skelda_dataset.py | 38 +-- scripts/test_triangulate.py | 137 +++++---- scripts/utils_2d_pose.py | 514 -------------------------------- scripts/utils_pipeline.py | 100 ++----- 5 files changed, 102 insertions(+), 699 deletions(-) delete mode 100644 scripts/utils_2d_pose.py diff --git a/scripts/test_skelda_dataset.cpp b/scripts/test_skelda_dataset.cpp index b15246d..b2dfe08 100644 --- a/scripts/test_skelda_dataset.cpp +++ b/scripts/test_skelda_dataset.cpp @@ -146,7 +146,7 @@ int main(int argc, char **argv) print_steps = std::max((size_t)1, print_steps); std::cout << "Running predictions: |"; - size_t bar_width = (size_t)std::ceil((float)time_count / (float)print_steps) - 2; + size_t bar_width = (size_t)std::ceil((float)time_count / (float)print_steps); for (size_t i = 0; i < bar_width; i++) { std::cout << "-"; @@ -155,7 +155,7 @@ int main(int argc, char **argv) // Calculate 2D poses [items, views, persons, joints, 3] std::vector>>>> all_poses_2d; - std::cout << "Calculating 2D poses: "; + std::cout << "Calculating 2D poses: |"; for (size_t i = 0; i < dataset.size(); i++) { if (i % print_steps == 0) @@ -192,14 +192,14 @@ int main(int argc, char **argv) all_poses_2d.push_back(std::move(poses_2d_upd)); } - std::cout << std::endl; + std::cout << "|" << std::endl; // Calculate 3D poses [items, persons, joints, 4] std::vector>>> all_poses_3d; std::vector all_ids; std::string old_scene = ""; int old_id = -1; - std::cout << "Calculating 3D poses: "; + std::cout << "Calculating 3D poses: |"; for (size_t i = 0; i < dataset.size(); i++) { if (i % print_steps == 0) @@ -245,11 +245,11 @@ int main(int argc, char **argv) all_ids.push_back(item["id"].get()); old_id = item["index"]; } - std::cout << std::endl; + std::cout << "|" << std::endl; // Print timing stats std::cout << "\nMetrics:" << std::endl; - size_t warmup = 10; + size_t warmup = std::min((size_t)10, time_count - 1); double time_image = 0.0; double time_debayer = 0.0; double time_pose2d = 0.0; diff --git a/scripts/test_skelda_dataset.py b/scripts/test_skelda_dataset.py index 38d1a13..837b602 100644 --- a/scripts/test_skelda_dataset.py +++ b/scripts/test_skelda_dataset.py @@ -203,25 +203,11 @@ output_dir = "" # ================================================================================================== -def load_json(path: str): - with open(path, "r", encoding="utf-8") as file: - data = json.load(file) - return data - - -def save_json(data: dict, path: str): - with open(path, "w+", encoding="utf-8") as file: - json.dump(data, file, indent=0) - - -# ================================================================================================== - - def load_labels(dataset: dict): """Load labels by dataset description""" if "panoptic" in dataset: - labels = load_json(dataset["panoptic"]["path"]) + labels = utils_pipeline.load_json(dataset["panoptic"]["path"]) labels = [lb for i, lb in enumerate(labels) if i % 1500 < 90] # Filter by maximum number of persons @@ -242,7 +228,7 @@ def load_labels(dataset: dict): label["imgpaths"].pop(i) elif "human36m" in dataset: - labels = load_json(dataset["human36m"]["path"]) + labels = utils_pipeline.load_json(dataset["human36m"]["path"]) labels = [lb for lb in labels if lb["subject"] == "S9"] labels = [lb for i, lb in enumerate(labels) if i % 4000 < 150] @@ -251,7 +237,7 @@ def 
load_labels(dataset: dict): label.pop("frame") elif "mvor" in dataset: - labels = load_json(dataset["mvor"]["path"]) + labels = utils_pipeline.load_json(dataset["mvor"]["path"]) # Rename keys for label in labels: @@ -259,20 +245,20 @@ def load_labels(dataset: dict): label["imgpaths_color"] = label["imgpaths"] elif "ikeaasm" in dataset: - labels = load_json(dataset["ikeaasm"]["path"]) + labels = utils_pipeline.load_json(dataset["ikeaasm"]["path"]) cams0 = str(labels[0]["cameras"]) labels = [lb for lb in labels if str(lb["cameras"]) == cams0] elif "shelf" in dataset: - labels = load_json(dataset["shelf"]["path"]) + labels = utils_pipeline.load_json(dataset["shelf"]["path"]) labels = [lb for lb in labels if "test" in lb["splits"]] elif "campus" in dataset: - labels = load_json(dataset["campus"]["path"]) + labels = utils_pipeline.load_json(dataset["campus"]["path"]) labels = [lb for lb in labels if "test" in lb["splits"]] elif "tsinghua" in dataset: - labels = load_json(dataset["tsinghua"]["path"]) + labels = utils_pipeline.load_json(dataset["tsinghua"]["path"]) labels = [lb for lb in labels if "test" in lb["splits"]] labels = [lb for lb in labels if lb["seq"] == "seq_1"] labels = [lb for i, lb in enumerate(labels) if i % 300 < 90] @@ -281,15 +267,15 @@ def load_labels(dataset: dict): label["bodyids"] = list(range(len(label["bodies3D"]))) elif "chi3d" in dataset: - labels = load_json(dataset["chi3d"]["path"]) + labels = utils_pipeline.load_json(dataset["chi3d"]["path"]) labels = [lb for lb in labels if lb["setup"] == "s03"] labels = [lb for i, lb in enumerate(labels) if i % 2000 < 150] elif "human36m_wb" in dataset: - labels = load_json(dataset["human36m_wb"]["path"]) + labels = utils_pipeline.load_json(dataset["human36m_wb"]["path"]) elif any(("egohumans" in key for key in dataset)): - labels = load_json(dataset[dataset_use]["path"]) + labels = utils_pipeline.load_json(dataset[dataset_use]["path"]) labels = [lb for lb in labels if "test" in lb["splits"]] labels = [lb for lb in labels if dataset[dataset_use]["subset"] in lb["seq"]] if dataset[dataset_use]["subset"] in ["volleyball", "tennis"]: @@ -359,7 +345,7 @@ def main(): "whole_body": whole_body, "take_interval": datasets[dataset_use]["take_interval"], } - save_json(config, config_path) + utils_pipeline.save_json(config, config_path) # Call the CPP binary os.system("/RapidPoseTriangulation/scripts/test_skelda_dataset.bin") @@ -367,7 +353,7 @@ def main(): # Load the results print("Loading exports ...") res_path = tmp_export_dir + "results.json" - results = load_json(res_path) + results = utils_pipeline.load_json(res_path) all_poses_3d = results["all_poses_3d"] all_poses_2d = results["all_poses_2d"] all_ids = results["all_ids"] diff --git a/scripts/test_triangulate.py b/scripts/test_triangulate.py index 10547e8..af85cae 100644 --- a/scripts/test_triangulate.py +++ b/scripts/test_triangulate.py @@ -1,28 +1,33 @@ import copy import json import os -import sys -import time import matplotlib import numpy as np -import utils_2d_pose import utils_pipeline from skelda import utils_pose, utils_view - -sys.path.append("/RapidPoseTriangulation/swig/") -import rpt +from skelda.writers import json_writer # ================================================================================================== filepath = os.path.dirname(os.path.realpath(__file__)) + "/" test_img_dir = filepath + "../data/" + whole_body = { "foots": False, "face": False, "hands": False, } +config = { + "min_match_score": 0.94, + "min_group_size": 1, + "min_bbox_score": 0.3, + 
"min_bbox_area": 0.1 * 0.1, + "batch_poses": True, + "whole_body": whole_body, + "take_interval": 1, +} joint_names_2d = utils_pipeline.get_joint_names(whole_body) joint_names_3d = list(joint_names_2d) @@ -40,9 +45,15 @@ def update_sample(sample, new_dir=""): ] # Add placeholders for missing keys - sample["cameras_color"] = sample["cameras"] - sample["imgpaths_color"] = sample["imgpaths"] - sample["cameras_depth"] = [] + if not "scene" in sample: + sample["scene"] = "default" + if not "id" in sample: + sample["id"] = "0" + if not "index" in sample: + sample["index"] = 0 + for cam in sample["cameras"]: + if not "type" in cam: + cam["type"] = "pinhole" return sample @@ -51,10 +62,6 @@ def update_sample(sample, new_dir=""): def main(): - if any((whole_body[k] for k in whole_body)): - kpt_model = utils_2d_pose.load_wb_model() - else: - kpt_model = utils_2d_pose.load_model(min_bbox_score=0.3) # Manually set matplotlib backend matplotlib.use("TkAgg") @@ -74,68 +81,58 @@ def main(): sample = json.load(file) sample = update_sample(sample, dirpath) - camparams = sample["cameras_color"] + if len(sample["imgpaths"]) == 1: + # At least two images are required + continue + + # Save dataset + labels = [sample] + tmp_export_dir = "/tmp/rpt/" + for label in labels: + if "splits" in label: + label.pop("splits") + json_writer.save_dataset(labels, tmp_export_dir) + + # Save config + config_path = tmp_export_dir + "config.json" + utils_pipeline.save_json(config, config_path) + + # Call the CPP binary + os.system("/RapidPoseTriangulation/scripts/test_skelda_dataset.bin") + + # Load the results + print("Loading exports ...") + res_path = tmp_export_dir + "results.json" + results = utils_pipeline.load_json(res_path) + poses_3d = results["all_poses_3d"][0] + poses_2d = results["all_poses_2d"][0] + joint_names_3d = results["joint_names_3d"] + + # Visualize the 2D results + fig1 = utils_view.draw_many_images( + sample["imgpaths"], [], [], poses_2d, joint_names_2d, "2D detections" + ) + fig1.savefig(os.path.join(dirpath, "2d-k.png"), dpi=fig1.dpi) + + # Visualize the 3D results + print("Detected 3D poses:") + poses_3d = np.array(poses_3d) + print(poses_3d.round(3)) + if len(poses_3d) == 0: + utils_view.show_plots() + continue + camparams = sample["cameras"] roomparams = { "room_size": sample["room_size"], "room_center": sample["room_center"], } - - # Load color images - images_2d = [] - for i in range(len(sample["cameras_color"])): - imgpath = sample["imgpaths_color"][i] - img = utils_pipeline.load_image(imgpath) - img = utils_pipeline.rgb2bayer(img) - img = utils_pipeline.bayer2rgb(img) - images_2d.append(img) - - # Get 2D poses - stime = time.time() - poses_2d = utils_2d_pose.get_2d_pose(kpt_model, images_2d) - poses_2d = utils_pipeline.update_keypoints(poses_2d, joint_names_2d, whole_body) - print("2D time:", time.time() - stime) - # print([np.array(p).round(6).tolist() for p in poses_2d]) - - fig1 = utils_view.draw_many_images( - sample["imgpaths_color"], [], [], poses_2d, joint_names_2d, "2D detections" - ) - fig1.savefig(os.path.join(dirpath, "2d-k.png"), dpi=fig1.dpi) - # draw_utils.utils_view.show_plots() - - if len(images_2d) == 1: - utils_view.show_plots() - continue - - # Get 3D poses - if sum(np.sum(p) for p in poses_2d) == 0: - poses3D = np.zeros([1, len(joint_names_3d), 4]) - poses2D = np.zeros([len(images_2d), 1, len(joint_names_3d), 3]) - else: - cameras = rpt.convert_cameras(camparams) - roomp = [roomparams["room_size"], roomparams["room_center"]] - triangulator = 
rpt.Triangulator(min_match_score=0.94) - - stime = time.time() - poses_3d = triangulator.triangulate_poses( - poses_2d, cameras, roomp, joint_names_2d - ) - poses3D = np.array(poses_3d) - if len(poses3D) == 0: - poses3D = np.zeros([1, len(joint_names_3d), 4]) - print("3D time:", time.time() - stime) - - poses2D = [] - for cam in camparams: - poses_2d, _ = utils_pose.project_poses(poses3D, cam) - poses2D.append(poses_2d) - - print(poses3D) - # print(poses2D) - # print(poses3D.round(3).tolist()) - - fig2 = utils_view.draw_poses3d(poses3D, joint_names_3d, roomparams, camparams) + poses_2d_proj = [] + for cam in camparams: + poses_2d_cam, _ = utils_pose.project_poses(poses_3d, cam) + poses_2d_proj.append(poses_2d_cam) + fig2 = utils_view.draw_poses3d(poses_3d, joint_names_3d, roomparams, camparams) fig3 = utils_view.draw_many_images( - sample["imgpaths_color"], [], [], poses2D, joint_names_3d, "2D projections" + sample["imgpaths"], [], [], poses_2d_proj, joint_names_3d, "2D projections" ) fig2.savefig(os.path.join(dirpath, "3d-p.png"), dpi=fig2.dpi) fig3.savefig(os.path.join(dirpath, "2d-p.png"), dpi=fig3.dpi) diff --git a/scripts/utils_2d_pose.py b/scripts/utils_2d_pose.py deleted file mode 100644 index 601518d..0000000 --- a/scripts/utils_2d_pose.py +++ /dev/null @@ -1,514 +0,0 @@ -import math -import os -from abc import ABC, abstractmethod -from typing import List - -import cv2 -import numpy as np -import onnxruntime as ort -from tqdm import tqdm - -# ================================================================================================== - - -class BaseModel(ABC): - def __init__(self, model_path: str, warmup: int): - self.model_path = model_path - self.runtime = "" - - if not os.path.exists(model_path): - raise FileNotFoundError("File not found:", model_path) - - if model_path.endswith(".onnx"): - print("Loading model:", model_path) - self.init_onnxruntime(model_path) - self.runtime = "ort" - else: - raise ValueError("Unsupported model format:", model_path) - - if warmup > 0: - print("Running warmup for '{}' ...".format(self.__class__.__name__)) - self.warmup(warmup // 2) - self.warmup(warmup // 2) - - def init_onnxruntime(self, model_path): - usetrt = True - usegpu = True - - self.opt = ort.SessionOptions() - providers = ort.get_available_providers() - # ort.set_default_logger_severity(1) - - self.providers = [] - if usetrt and "TensorrtExecutionProvider" in providers: - self.providers.append( - ( - "TensorrtExecutionProvider", - { - "trt_engine_cache_enable": True, - "trt_engine_cache_path": "/RapidPoseTriangulation/data/trt_cache/", - }, - ) - ) - elif usegpu and "CUDAExecutionProvider" in providers: - self.providers.append("CUDAExecutionProvider") - else: - self.providers.append("CPUExecutionProvider") - print("Using providers:", self.providers) - - self.session = ort.InferenceSession( - model_path, providers=self.providers, sess_options=self.opt - ) - - self.input_names = [input.name for input in self.session.get_inputs()] - self.input_shapes = [input.shape for input in self.session.get_inputs()] - - input_types = [input.type for input in self.session.get_inputs()] - self.input_types = [] - for i in range(len(input_types)): - input_type = input_types[i] - if input_type == "tensor(float32)": - itype = np.float32 - elif input_type == "tensor(float16)": - itype = np.float16 - elif input_type == "tensor(int32)": - itype = np.int32 - elif input_type == "tensor(uint8)": - itype = np.uint8 - else: - raise ValueError("Undefined input type:", input_type) - 
self.input_types.append(itype) - - @abstractmethod - def preprocess(self, **kwargs): - pass - - @abstractmethod - def postprocess(self, **kwargs): - pass - - def warmup(self, epoch: int): - np.random.seed(42) - - for _ in tqdm(range(epoch)): - inputs = {} - for i in range(len(self.input_names)): - iname = self.input_names[i] - - if "image" in iname: - ishape = list(self.input_shapes[i]) - if "batch_size" in ishape: - max_batch_size = 10 - ishape[0] = np.random.choice(list(range(1, max_batch_size + 1))) - tensor = np.random.random(ishape) - tensor = tensor * 255 - else: - raise ValueError("Undefined input type:", iname) - - tensor = tensor.astype(self.input_types[i]) - inputs[iname] = tensor - - self.call_model_ort(list(inputs.values())) - - def call_model_ort(self, tensor): - inputs = {} - for i in range(len(self.input_names)): - iname = self.input_names[i] - inputs[iname] = tensor[i] - result = self.session.run(None, inputs) - return result - - def __call__(self, **kwargs): - tensor = self.preprocess(**kwargs) - result = self.call_model_ort(tensor) - output = self.postprocess(result=result, **kwargs) - return output - - -# ================================================================================================== - - -class LetterBox: - def __init__(self, target_size, fill_value=0): - self.target_size = target_size - self.fill_value = fill_value - - def calc_params(self, ishape): - img_h, img_w = ishape[:2] - target_h, target_w = self.target_size - - scale = min(target_w / img_w, target_h / img_h) - new_w = round(img_w * scale) - new_h = round(img_h * scale) - - pad_w = target_w - new_w - pad_h = target_h - new_h - pad_left = pad_w // 2 - pad_top = pad_h // 2 - pad_right = pad_w - pad_left - pad_bottom = pad_h - pad_top - paddings = (pad_left, pad_right, pad_top, pad_bottom) - - return paddings, scale, (new_w, new_h) - - def resize_image(self, image): - paddings, _, new_size = self.calc_params(image.shape) - - # Resize the image - new_w, new_h = new_size - resized_img = cv2.resize( - image, - (new_w, new_h), - interpolation=cv2.INTER_NEAREST, - ) - - # Optionally pad the image - pad_left, pad_right, pad_top, pad_bottom = paddings - if pad_left == 0 and pad_right == 0 and pad_top == 0 and pad_bottom == 0: - final_img = resized_img - else: - final_img = cv2.copyMakeBorder( - resized_img, - pad_top, - pad_bottom, - pad_left, - pad_right, - borderType=cv2.BORDER_CONSTANT, - value=[self.fill_value, self.fill_value, self.fill_value], - ) - - return final_img - - -# ================================================================================================== - - -class BoxCrop: - def __init__(self, target_size, padding_scale=1.0, fill_value=0): - self.target_size = target_size - self.padding_scale = padding_scale - self.fill_value = fill_value - - def calc_params(self, ishape, bbox): - img_h, img_w = ishape[:2] - target_h, target_w = self.target_size - - # Round the bounding box coordinates - start_x = math.floor(bbox[0]) - start_y = math.floor(bbox[1]) - end_x = math.ceil(bbox[2]) - end_y = math.ceil(bbox[3]) - - # Calculate original bounding box center - center_x = (start_x + end_x) / 2.0 - center_y = (start_y + end_y) / 2.0 - - # Scale the bounding box by the padding_scale - bbox_w = end_x - start_x - bbox_h = end_y - start_y - scaled_w = bbox_w * self.padding_scale - scaled_h = bbox_h * self.padding_scale - - # Calculate the aspect ratios - bbox_aspect = scaled_w / scaled_h - target_aspect = target_w / target_h - - # Adjust the scaled bounding box to match the target aspect ratio 
- if bbox_aspect > target_aspect: - adjusted_h = scaled_w / target_aspect - adjusted_w = scaled_w - else: - adjusted_w = scaled_h * target_aspect - adjusted_h = scaled_h - - # Calculate scaled bounding box coordinates - bbox_w = adjusted_w - bbox_h = adjusted_h - new_start_x = center_x - bbox_w / 2.0 - new_start_y = center_y - bbox_h / 2.0 - new_end_x = center_x + bbox_w / 2.0 - new_end_y = center_y + bbox_h / 2.0 - - # Round the box coordinates - start_x = int(math.floor(new_start_x)) - start_y = int(math.floor(new_start_y)) - end_x = int(math.ceil(new_end_x)) - end_y = int(math.ceil(new_end_y)) - - # Define the new box coordinates - new_start_x = max(0, start_x) - new_start_y = max(0, start_y) - new_end_x = min(img_w - 1, end_x) - new_end_y = min(img_h - 1, end_y) - new_box = [new_start_x, new_start_y, new_end_x, new_end_y] - - # Calculate resized crop size - bbox_w = new_box[2] - new_box[0] - bbox_h = new_box[3] - new_box[1] - scale = min(target_w / bbox_w, target_h / bbox_h) - new_w = round(bbox_w * scale) - new_h = round(bbox_h * scale) - - # Calculate paddings - pad_w = target_w - new_w - pad_h = target_h - new_h - pad_left, pad_right, pad_top, pad_bottom = 0, 0, 0, 0 - if pad_w > 0: - if start_x < 0: - pad_left = pad_w - pad_right = 0 - elif end_x > ishape[1]: - pad_left = 0 - pad_right = pad_w - else: - # Can be caused by bbox rounding - pad_left = pad_w // 2 - pad_right = pad_w - pad_left - if pad_h > 0: - if start_y < 0: - pad_top = pad_h - pad_bottom = 0 - elif end_y > ishape[0]: - pad_top = 0 - pad_bottom = pad_h - else: - # Can be caused by bbox rounding - pad_top = pad_h // 2 - pad_bottom = pad_h - pad_top - paddings = (pad_left, pad_right, pad_top, pad_bottom) - - return paddings, scale, new_box, (new_w, new_h) - - def crop_resize_box(self, image, bbox): - paddings, _, new_box, new_size = self.calc_params(image.shape, bbox) - - # Extract the bounding box - cropped_img = image[new_box[1] : new_box[3], new_box[0] : new_box[2]] - - # Resize the image - new_w, new_h = new_size - resized_img = cv2.resize( - cropped_img, - (new_w, new_h), - interpolation=cv2.INTER_NEAREST, - ) - - # Optionally pad the image - pad_left, pad_right, pad_top, pad_bottom = paddings - if pad_left == 0 and pad_right == 0 and pad_top == 0 and pad_bottom == 0: - final_img = resized_img - else: - final_img = cv2.copyMakeBorder( - resized_img, - pad_top, - pad_bottom, - pad_left, - pad_right, - borderType=cv2.BORDER_CONSTANT, - value=[self.fill_value, self.fill_value, self.fill_value], - ) - - return final_img - - -# ================================================================================================== - - -class RTMDet(BaseModel): - def __init__( - self, - model_path: str, - conf_threshold: float, - min_area_fraction: float, - warmup: int = 30, - ): - super(RTMDet, self).__init__(model_path, warmup) - self.target_size = (320, 320) - self.conf_threshold = conf_threshold - self.letterbox = LetterBox(self.target_size, fill_value=114) - - img_area = self.target_size[0] * self.target_size[1] - self.min_area = img_area * min_area_fraction - - def preprocess(self, image: np.ndarray): - image = self.letterbox.resize_image(image) - tensor = np.asarray(image).astype(self.input_types[0], copy=False) - tensor = np.expand_dims(tensor, axis=0) - tensor = [tensor] - return tensor - - def postprocess(self, result: List[np.ndarray], image: np.ndarray): - boxes = np.squeeze(result[0], axis=0) - - human_class = boxes[:, 5] == 0 - boxes = boxes[human_class] - - keep = boxes[:, 4] > self.conf_threshold - boxes = 
boxes[keep] - - if len(boxes) == 0: - return np.array([]) - - # Drop boxes with too small area - areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1]) - keep = areas >= self.min_area - boxes = boxes[keep] - - if len(boxes) == 0: - return np.array([]) - - paddings, scale, _ = self.letterbox.calc_params(image.shape) - - boxes[:, 0] -= paddings[0] - boxes[:, 2] -= paddings[0] - boxes[:, 1] -= paddings[2] - boxes[:, 3] -= paddings[2] - - boxes = np.maximum(boxes, 0) - th, tw = self.target_size - pad_w = paddings[0] + paddings[1] - pad_h = paddings[2] + paddings[3] - max_w = tw - pad_w - 1 - max_h = th - pad_h - 1 - boxes[:, 0] = np.minimum(boxes[:, 0], max_w) - boxes[:, 1] = np.minimum(boxes[:, 1], max_h) - boxes[:, 2] = np.minimum(boxes[:, 2], max_w) - boxes[:, 3] = np.minimum(boxes[:, 3], max_h) - - boxes[:, 0:4] /= scale - return boxes - - -# ================================================================================================== - - -class RTMPose(BaseModel): - def __init__(self, model_path: str, warmup: int = 30): - super(RTMPose, self).__init__(model_path, warmup) - self.target_size = (384, 288) - self.boxcrop = BoxCrop(self.target_size, padding_scale=1.25, fill_value=0) - - def preprocess(self, image: np.ndarray, bboxes: np.ndarray): - cutouts = [] - for i in range(len(bboxes)): - region = self.boxcrop.crop_resize_box(image, bboxes[i]) - tensor = np.asarray(region).astype(self.input_types[0], copy=False) - cutouts.append(tensor) - - if len(bboxes) == 1: - cutouts = np.expand_dims(cutouts[0], axis=0) - else: - cutouts = np.stack(cutouts, axis=0) - - tensor = [cutouts] - return tensor - - def postprocess( - self, result: List[np.ndarray], image: np.ndarray, bboxes: np.ndarray - ): - kpts = [] - for i in range(len(bboxes)): - kp = result[0][i] - - paddings, scale, bbox, _ = self.boxcrop.calc_params(image.shape, bboxes[i]) - kp[:, 0] -= paddings[0] - kp[:, 1] -= paddings[2] - kp[:, 0:2] /= scale - kp[:, 0] += bbox[0] - kp[:, 1] += bbox[1] - kp[:, 0:2] = np.maximum(kp[:, 0:2], 0) - max_w = image.shape[1] - 1 - max_h = image.shape[0] - 1 - kp[:, 0] = np.minimum(kp[:, 0], max_w) - kp[:, 1] = np.minimum(kp[:, 1], max_h) - kpts.append(kp) - - return kpts - - -# ================================================================================================== - - -class TopDown: - def __init__( - self, - det_model_path: str, - pose_model_path: str, - box_conf_threshold: float, - box_min_area: float, - warmup: int = 30, - ): - self.batch_poses = bool("Bx" in pose_model_path) - - self.det_model = RTMDet( - det_model_path, box_conf_threshold, box_min_area, warmup - ) - self.pose_model = RTMPose(pose_model_path, warmup) - - def predict(self, image): - boxes = self.det_model(image=image) - if len(boxes) == 0: - return [] - - results = [] - if self.batch_poses: - results = self.pose_model(image=image, bboxes=boxes) - else: - for i in range(boxes.shape[0]): - kp = self.pose_model(image=image, bboxes=[boxes[i]]) - results.append(kp[0]) - - return results - - -# ================================================================================================== - - -def load_model(min_bbox_score=0.3, min_bbox_area=0.1 * 0.1, batch_poses=False): - print("Loading 2D model ...") - - model = TopDown( - "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_1x320x320x3_fp16_extra-steps.onnx", - f"/RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-m_{'B' if batch_poses else '1'}x384x288x3_fp16_extra-steps.onnx", - box_conf_threshold=min_bbox_score, - box_min_area=min_bbox_area, - 
warmup=30, - ) - - print("Loaded 2D model") - return model - - -def load_wb_model(min_bbox_score=0.3, min_bbox_area=0.1 * 0.1, batch_poses=False): - print("Loading 2D-WB model ...") - - # The FP16 pose model is much worse than the FP32 for whole-body keypoints - model = TopDown( - "/RapidPoseTriangulation/extras/mmdeploy/exports/rtmdet-nano_1x320x320x3_fp16_extra-steps.onnx", - f"/RapidPoseTriangulation/extras/mmdeploy/exports/rtmpose-l_wb_{'B' if batch_poses else '1'}x384x288x3_extra-steps.onnx", - box_conf_threshold=min_bbox_score, - box_min_area=min_bbox_area, - warmup=30, - ) - - print("Loaded 2D-WB model") - return model - - -# ================================================================================================== - - -def get_2d_pose(model, imgs, num_joints=17): - - new_poses = [] - for i in range(len(imgs)): - img = imgs[i] - dets = model.predict(img) - - if len(dets) == 0: - poses = np.zeros([1, num_joints, 3], dtype=float) - else: - poses = np.asarray(dets, dtype=float) - new_poses.append(poses) - - return new_poses diff --git a/scripts/utils_pipeline.py b/scripts/utils_pipeline.py index b586617..8e6c0cf 100644 --- a/scripts/utils_pipeline.py +++ b/scripts/utils_pipeline.py @@ -1,13 +1,26 @@ -from typing import List - -import cv2 -import numpy as np +import json # ================================================================================================== + +def load_json(path: str): + with open(path, "r", encoding="utf-8") as file: + data = json.load(file) + return data + + +def save_json(data: dict, path: str): + with open(path, "w+", encoding="utf-8") as file: + json.dump(data, file, indent=0) + + +# ================================================================================================== + + def use_whole_body(whole_body: dict) -> bool: return any((whole_body[k] for k in whole_body)) + # ================================================================================================== @@ -174,82 +187,3 @@ def get_joint_names(whole_body: dict): ) return joint_names_2d - - -# ================================================================================================== - - -def load_image(path: str): - image = cv2.imread(path, 3) - image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) - image = np.asarray(image, dtype=np.uint8) - return image - - -# ================================================================================================== - - -def rgb2bayer(img): - bayer = np.zeros((img.shape[0], img.shape[1]), dtype=img.dtype) - bayer[0::2, 0::2] = img[0::2, 0::2, 0] - bayer[0::2, 1::2] = img[0::2, 1::2, 1] - bayer[1::2, 0::2] = img[1::2, 0::2, 1] - bayer[1::2, 1::2] = img[1::2, 1::2, 2] - return bayer - - -def bayer2rgb(bayer): - img = cv2.cvtColor(bayer, cv2.COLOR_BayerBG2RGB) - return img - - -# ================================================================================================== - - -def update_keypoints(poses_2d: list, joint_names: List[str], whole_body: dict) -> list: - new_views = [] - for view in poses_2d: - new_bodies = [] - for body in view: - body = body.tolist() - - new_body = body[:17] - if whole_body["foots"]: - new_body.extend(body[17:23]) - if whole_body["face"]: - new_body.extend(body[23:91]) - if whole_body["hands"]: - new_body.extend(body[91:]) - body = new_body - - hlid = joint_names.index("hip_left") - hrid = joint_names.index("hip_right") - mid_hip = [ - float(((body[hlid][0] + body[hrid][0]) / 2.0)), - float(((body[hlid][1] + body[hrid][1]) / 2.0)), - min(body[hlid][2], body[hrid][2]), - ] - body.append(mid_hip) 
- - slid = joint_names.index("shoulder_left") - srid = joint_names.index("shoulder_right") - mid_shoulder = [ - float(((body[slid][0] + body[srid][0]) / 2.0)), - float(((body[slid][1] + body[srid][1]) / 2.0)), - min(body[slid][2], body[srid][2]), - ] - body.append(mid_shoulder) - - elid = joint_names.index("ear_left") - erid = joint_names.index("ear_right") - head = [ - float(((body[elid][0] + body[erid][0]) / 2.0)), - float(((body[elid][1] + body[erid][1]) / 2.0)), - min(body[elid][2], body[erid][2]), - ] - body.append(head) - - new_bodies.append(body) - new_views.append(new_bodies) - - return new_views
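Note: after this change the Python scripts no longer run 2D inference themselves; they export the dataset plus a config.json for the C++ binary and read its results.json back. Below is a minimal sketch of that file-based handoff, assuming the /tmp/rpt/ paths and result keys used in test_triangulate.py above; the subprocess.run call (instead of os.system) and the os.makedirs guard are illustrative additions, not part of this patch:

    import json
    import os
    import subprocess

    tmp_export_dir = "/tmp/rpt/"
    os.makedirs(tmp_export_dir, exist_ok=True)

    # Same tunables as the config dict in test_triangulate.py.
    config = {
        "min_match_score": 0.94,
        "min_group_size": 1,
        "min_bbox_score": 0.3,
        "min_bbox_area": 0.1 * 0.1,
        "batch_poses": True,
        "whole_body": {"foots": False, "face": False, "hands": False},
        "take_interval": 1,
    }
    with open(tmp_export_dir + "config.json", "w", encoding="utf-8") as file:
        json.dump(config, file, indent=0)

    # The binary reads the exported dataset plus config.json and writes
    # results.json next to them. check=True surfaces a non-zero exit
    # status, which os.system would silently ignore.
    subprocess.run(
        ["/RapidPoseTriangulation/scripts/test_skelda_dataset.bin"], check=True
    )

    with open(tmp_export_dir + "results.json", "r", encoding="utf-8") as file:
        results = json.load(file)
    poses_3d = results["all_poses_3d"]  # [items, persons, joints, 4]
    poses_2d = results["all_poses_2d"]  # [items, views, persons, joints, 3]

Exchanging JSON files keeps the Python/C++ boundary free of bindings (the old swig rpt module is gone), at the cost of one filesystem round trip per run.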