import json
import os
import sys
import time

import cv2
import matplotlib
import numpy as np
import tqdm

import utils_2d_pose
import utils_pipeline
from skelda import evals

sys.path.append("/RapidPoseTriangulation/swig/")
import rpt

# ==================================================================================================

whole_body = {
    "foots": False,
    "face": False,
    "hands": False,
}

dataset_use = "human36m"
# dataset_use = "panoptic"
# dataset_use = "mvor"
# dataset_use = "shelf"
# dataset_use = "campus"
# dataset_use = "ikeaasm"
# dataset_use = "chi3d"
# dataset_use = "tsinghua"
# dataset_use = "human36m_wb"
# dataset_use = "egohumans_tagging"
# dataset_use = "egohumans_legoassemble"
# dataset_use = "egohumans_fencing"
# dataset_use = "egohumans_basketball"
# dataset_use = "egohumans_volleyball"
# dataset_use = "egohumans_badminton"
# dataset_use = "egohumans_tennis"
# dataset_use = "ntu"
# dataset_use = "koarob"

# Describes the minimum area, as a fraction of the image size, for a 2D bounding box to be considered
# If the persons are small in the image, use a lower value
default_min_bbox_area = 0.1 * 0.1

# Describes how confident a 2D bounding box needs to be to be considered
# If the persons are small in the image, or poorly recognizable, use a lower value
default_min_bbox_score = 0.3

# Describes how well two 2D poses need to match each other to create a valid triangulation
# If the quality of the 2D detections is poor, use a lower value
default_min_match_score = 0.94

# Describes the minimum number of camera pairs that need to detect the same person
# If the number of cameras is high, and the views are not occluded, use a higher value
default_min_group_size = 1

# Batch poses per image for faster processing
# If most of the time only one person is in an image, disable it, because batching is then slightly slower
default_batch_poses = True

datasets = {
    "human36m": {
        "path": "/datasets/human36m/skelda/pose_test.json",
        "take_interval": 5,
        "min_match_score": 0.95,
        "min_group_size": 1,
        "min_bbox_score": 0.4,
        "min_bbox_area": 0.1 * 0.1,
        "batch_poses": False,
    },
    "panoptic": {
        "path": "/datasets/panoptic/skelda/test.json",
        "cams": ["00_03", "00_06", "00_12", "00_13", "00_23"],
        # "cams": ["00_03", "00_06", "00_12"],
        # "cams": ["00_03", "00_06", "00_12", "00_13", "00_23", "00_15", "00_10", "00_21", "00_09", "00_01"],
        "take_interval": 3,
        "min_match_score": 0.95,
        "use_scenes": ["160906_pizza1", "160422_haggling1", "160906_ian5"],
        "min_group_size": 1,
        # "min_group_size": 4,
        "min_bbox_area": 0.05 * 0.05,
    },
    "mvor": {
        "path": "/datasets/mvor/skelda/all.json",
        "take_interval": 1,
        "with_depth": False,
        "min_match_score": 0.85,
        "min_bbox_score": 0.25,
    },
    "campus": {
        "path": "/datasets/campus/skelda/test.json",
        "take_interval": 1,
        "min_match_score": 0.90,
        "min_bbox_score": 0.5,
    },
    "shelf": {
        "path": "/datasets/shelf/skelda/test.json",
        "take_interval": 1,
        "min_match_score": 0.96,
        "min_group_size": 2,
    },
    "ikeaasm": {
        "path": "/datasets/ikeaasm/skelda/test.json",
        "take_interval": 2,
        "min_match_score": 0.92,
        "min_bbox_score": 0.20,
    },
    "chi3d": {
        "path": "/datasets/chi3d/skelda/all.json",
        "take_interval": 5,
    },
    "tsinghua": {
        "path": "/datasets/tsinghua/skelda/test.json",
        "take_interval": 3,
        "min_match_score": 0.95,
        "min_group_size": 2,
    },
    "human36m_wb": {
        "path": "/datasets/human36m/skelda/wb/test.json",
        "take_interval": 100,
        "min_bbox_score": 0.4,
        "batch_poses": False,
    },
    "egohumans_tagging": {
        "path": "/datasets/egohumans/skelda/all.json",
        "take_interval": 2,
        "subset": "tagging",
        "min_group_size": 2,
        "min_bbox_score": 0.2,
        "min_bbox_area": 0.05 * 0.05,
    },
    "egohumans_legoassemble": {
        "path": "/datasets/egohumans/skelda/all.json",
        "take_interval": 2,
        "subset": "legoassemble",
        "min_group_size": 2,
    },
    "egohumans_fencing": {
        "path": "/datasets/egohumans/skelda/all.json",
        "take_interval": 2,
        "subset": "fencing",
        "min_group_size": 7,
        "min_bbox_score": 0.5,
        "min_bbox_area": 0.05 * 0.05,
    },
    "egohumans_basketball": {
        "path": "/datasets/egohumans/skelda/all.json",
        "take_interval": 2,
        "subset": "basketball",
        "min_group_size": 7,
        "min_bbox_score": 0.25,
        "min_bbox_area": 0.025 * 0.025,
    },
    "egohumans_volleyball": {
        "path": "/datasets/egohumans/skelda/all.json",
        "take_interval": 2,
        "subset": "volleyball",
        "min_group_size": 11,
        "min_bbox_score": 0.25,
        "min_bbox_area": 0.05 * 0.05,
    },
    "egohumans_badminton": {
        "path": "/datasets/egohumans/skelda/all.json",
        "take_interval": 2,
        "subset": "badminton",
        "min_group_size": 7,
        "min_bbox_score": 0.25,
        "min_bbox_area": 0.05 * 0.05,
    },
    "egohumans_tennis": {
        "path": "/datasets/egohumans/skelda/all.json",
        "take_interval": 2,
        "subset": "tennis",
        "min_group_size": 11,
        "min_bbox_area": 0.025 * 0.025,
    },
}

joint_names_2d = utils_pipeline.get_joint_names(whole_body)
joint_names_3d = list(joint_names_2d)

eval_joints = [
    "head",
    "shoulder_left",
    "shoulder_right",
    "elbow_left",
    "elbow_right",
    "wrist_left",
    "wrist_right",
    "hip_left",
    "hip_right",
    "knee_left",
    "knee_right",
    "ankle_left",
    "ankle_right",
]
if dataset_use == "human36m":
    eval_joints[eval_joints.index("head")] = "nose"
if dataset_use == "panoptic":
    eval_joints[eval_joints.index("head")] = "nose"
if dataset_use == "human36m_wb":
    if utils_pipeline.use_whole_body(whole_body):
        eval_joints = list(joint_names_2d)
    else:
        eval_joints[eval_joints.index("head")] = "nose"

# output_dir = "/RapidPoseTriangulation/data/testoutput/"
output_dir = ""

# ==================================================================================================


def load_json(path: str):
    with open(path, "r", encoding="utf-8") as file:
        data = json.load(file)
    return data


# ==================================================================================================


def load_labels(dataset: dict):
    """Load labels by dataset description"""

    if "panoptic" in dataset:
        labels = load_json(dataset["panoptic"]["path"])
        labels = [lb for i, lb in enumerate(labels) if i % 1500 < 90]

        # Filter by maximum number of persons
        labels = [l for l in labels if len(l["bodies3D"]) <= 10]

        # Filter scenes
        if "use_scenes" in dataset["panoptic"]:
            labels = [
                l for l in labels if l["scene"] in dataset["panoptic"]["use_scenes"]
            ]

        # Filter cameras
        if "cameras_depth" not in labels[0]:
            for label in labels:
                for i, cam in reversed(list(enumerate(label["cameras"]))):
                    if cam["name"] not in dataset["panoptic"]["cams"]:
                        label["cameras"].pop(i)
                        label["imgpaths"].pop(i)

    elif "human36m" in dataset:
        labels = load_json(dataset["human36m"]["path"])
        labels = [lb for lb in labels if lb["subject"] == "S9"]
        labels = [lb for i, lb in enumerate(labels) if i % 4000 < 150]
        for label in labels:
            label.pop("action")
            label.pop("frame")

    elif "mvor" in dataset:
        labels = load_json(dataset["mvor"]["path"])

        # Rename keys
        for label in labels:
            label["cameras_color"] = label["cameras"]
            label["imgpaths_color"] = label["imgpaths"]

    elif "ikeaasm" in dataset:
        labels = load_json(dataset["ikeaasm"]["path"])
        cams0 = str(labels[0]["cameras"])
        labels = [lb for lb in labels if str(lb["cameras"]) == cams0]

    elif "shelf" in dataset:
        labels = load_json(dataset["shelf"]["path"])
        labels = [lb for lb in labels if "test" in lb["splits"]]

    elif "campus" in dataset:
        labels = load_json(dataset["campus"]["path"])
        labels = [lb for lb in labels if "test" in lb["splits"]]

    elif "tsinghua" in dataset:
        labels = load_json(dataset["tsinghua"]["path"])
        labels = [lb for lb in labels if "test" in lb["splits"]]
        labels = [lb for lb in labels if lb["seq"] == "seq_1"]
        labels = [lb for i, lb in enumerate(labels) if i % 300 < 90]
        for label in labels:
            label["bodyids"] = list(range(len(label["bodies3D"])))

    elif "chi3d" in dataset:
        labels = load_json(dataset["chi3d"]["path"])
        labels = [lb for lb in labels if lb["setup"] == "s03"]
        labels = [lb for i, lb in enumerate(labels) if i % 2000 < 150]

    elif "human36m_wb" in dataset:
        labels = load_json(dataset["human36m_wb"]["path"])

    elif any("egohumans" in key for key in dataset):
        labels = load_json(dataset[dataset_use]["path"])
        labels = [lb for lb in labels if "test" in lb["splits"]]
        labels = [lb for lb in labels if dataset[dataset_use]["subset"] in lb["seq"]]
        if dataset[dataset_use]["subset"] in ["volleyball", "tennis"]:
            labels = [lb for i, lb in enumerate(labels) if i % 150 < 60]

    else:
        raise ValueError("Dataset not available")

    # Optionally drop samples to speed up train/eval
    if "take_interval" in dataset:
        take_interval = dataset["take_interval"]
        if take_interval > 1:
            labels = [l for i, l in enumerate(labels) if i % take_interval == 0]

    return labels


# ==================================================================================================


def main():
    global joint_names_3d, eval_joints

    # Load dataset-specific parameters
    min_match_score = datasets[dataset_use].get(
        "min_match_score", default_min_match_score
    )
    min_group_size = datasets[dataset_use].get("min_group_size", default_min_group_size)
    min_bbox_score = datasets[dataset_use].get("min_bbox_score", default_min_bbox_score)
    min_bbox_area = datasets[dataset_use].get("min_bbox_area", default_min_bbox_area)
    batch_poses = datasets[dataset_use].get("batch_poses", default_batch_poses)

    # Load 2D pose model
    if utils_pipeline.use_whole_body(whole_body):
        kpt_model = utils_2d_pose.load_wb_model(
            min_bbox_score, min_bbox_area, batch_poses
        )
    else:
        kpt_model = utils_2d_pose.load_model(min_bbox_score, min_bbox_area, batch_poses)

    # Manually set matplotlib backend
    try:
        matplotlib.use("TkAgg")
    except ImportError:
        print("WARNING: Using headless mode, no visualizations will be shown.")

    print("Loading dataset ...")
    labels = load_labels(
        {
            dataset_use: datasets[dataset_use],
            "take_interval": datasets[dataset_use]["take_interval"],
        }
    )

    # Print a dataset sample for debugging
    print(labels[0])

    print("\nPrefetching images ...")
    for label in tqdm.tqdm(labels):
        # If the images are stored on an HDD, it sometimes takes a while to load them
        # Prefetching them results in more stable timings of the following steps
        # To prevent memory overflow, the code only loads the images, but does not store them
        try:
            for i in range(len(label["imgpaths"])):
                imgpath = label["imgpaths"][i]
                img = utils_pipeline.load_image(imgpath)
        except cv2.error:
            print("One of the paths not found:", label["imgpaths"])
            continue
    time.sleep(3)

    print("\nCalculating 2D predictions ...")
    all_poses_2d = []
    times = []
    for label in tqdm.tqdm(labels):
        images_2d = []
        start = time.time()
        try:
            for i in range(len(label["imgpaths"])):
                imgpath = label["imgpaths"][i]
                img = utils_pipeline.load_image(imgpath)
                images_2d.append(img)
        except cv2.error:
            print("One of the paths not found:", label["imgpaths"])
            continue

        if dataset_use == "human36m":
            # Since the images don't have the same shape, rescale some of them
            # and scale the camera intrinsics by the same factors
            for i in range(len(images_2d)):
                img = images_2d[i]
                ishape = img.shape
                if ishape != (1000, 1000, 3):
                    cam = label["cameras"][i]
                    cam["K"][1][1] = cam["K"][1][1] * (1000 / ishape[0])
                    cam["K"][1][2] = cam["K"][1][2] * (1000 / ishape[0])
                    cam["K"][0][0] = cam["K"][0][0] * (1000 / ishape[1])
                    cam["K"][0][2] = cam["K"][0][2] * (1000 / ishape[1])
                    images_2d[i] = cv2.resize(img, (1000, 1000))

        # Convert image format to Bayer encoding to simulate real camera input
        # This also resulted in notably better MPJPE results in most cases, presumably since the
        # demosaicing algorithm from OpenCV is better than the default one from the cameras
        for i in range(len(images_2d)):
            images_2d[i] = utils_pipeline.rgb2bayer(images_2d[i])
        time_imgs = time.time() - start

        start = time.time()
        for i in range(len(images_2d)):
            images_2d[i] = utils_pipeline.bayer2rgb(images_2d[i])
        poses_2d = utils_2d_pose.get_2d_pose(kpt_model, images_2d)
        poses_2d = utils_pipeline.update_keypoints(poses_2d, joint_names_2d, whole_body)
        time_2d = time.time() - start

        all_poses_2d.append(poses_2d)
        times.append([time_imgs, time_2d, 0])

    print("\nCalculating 3D predictions ...")
    all_poses_3d = []
    all_ids = []
    triangulator = rpt.Triangulator(
        min_match_score=min_match_score, min_group_size=min_group_size
    )
    old_scene = ""
    old_index = -1
    for i in tqdm.tqdm(range(len(labels))):
        label = labels[i]
        poses_2d = all_poses_2d[i]

        if old_scene != label.get("scene", "") or (
            old_index + datasets[dataset_use]["take_interval"] < label["index"]
        ):
            # Reset last poses if the scene changes or frames were skipped
            old_scene = label.get("scene", "")
            triangulator.reset()

        start = time.time()
        if sum(np.sum(p) for p in poses_2d) == 0:
            # No 2D detections in any view, so output a single dummy pose
            poses3D = np.zeros([1, len(joint_names_3d), 4]).tolist()
        else:
            rpt_cameras = rpt.convert_cameras(label["cameras"])
            roomparams = [label["room_size"], label["room_center"]]
            poses3D = triangulator.triangulate_poses(
                poses_2d, rpt_cameras, roomparams, joint_names_2d
            )
        time_3d = time.time() - start
        old_index = label["index"]

        all_poses_3d.append(np.array(poses3D).tolist())
        all_ids.append(label["id"])
        times[i][2] = time_3d

    # Print per-step timings
    warmup_iters = 10
    if len(times) > warmup_iters:
        times = times[warmup_iters:]
    avg_time_im = np.mean([t[0] for t in times])
    avg_time_2d = np.mean([t[1] for t in times])
    avg_time_3d = np.mean([t[2] for t in times])
    tstats = {
        "img_loading": avg_time_im,
        "avg_time_2d": avg_time_2d,
        "avg_time_3d": avg_time_3d,
        "avg_fps": 1.0 / (avg_time_2d + avg_time_3d),
    }
    print("\nMetrics:")
    print(json.dumps(tstats, indent=2))
    triangulator.print_stats()

    _ = evals.mpjpe.run_eval(
        labels,
        all_poses_3d,
        all_ids,
        joint_names_net=joint_names_3d,
        joint_names_use=eval_joints,
        save_error_imgs=output_dir,
    )
    _ = evals.pcp.run_eval(
        labels,
        all_poses_3d,
        all_ids,
        joint_names_net=joint_names_3d,
        joint_names_use=eval_joints,
        replace_head_with_nose=True,
    )


# ==================================================================================================

if __name__ == "__main__":
    main()