"""Evaluate RapidPoseTriangulation on multi-view 3D human-pose datasets.

Pipeline per frame: load all camera images, run a 2D keypoint model on each
view, triangulate the 2D detections into 3D poses with the `rpt` module, and
finally report timing statistics plus MPJPE / PCP metrics via `skelda.evals`.
"""

import json
import os
import sys
import time

import cv2
import matplotlib
import numpy as np
import tqdm

import test_triangulate
import utils_2d_pose
from skelda import evals

# The rpt triangulator is a SWIG-generated binding that lives outside the
# normal package path, so it must be appended manually before importing.
sys.path.append("/RapidPoseTriangulation/swig/")
import rpt

# ==================================================================================================

# Select the dataset to evaluate by uncommenting exactly one entry.
# dataset_use = "panoptic"
dataset_use = "human36m"
# dataset_use = "mvor"
# dataset_use = "shelf"
# dataset_use = "campus"
# dataset_use = "ikeaasm"
# dataset_use = "chi3d"
# dataset_use = "tsinghua"
# dataset_use = "human36m_wb"
# dataset_use = "egohumans"

# Per-dataset configuration: label file path, optional camera subset,
# subsampling interval, and dataset-specific filters.
datasets = {
    "panoptic": {
        "path": "/datasets/panoptic/skelda/test.json",
        "cams": ["00_03", "00_06", "00_12", "00_13", "00_23"],
        # "cams": ["00_03", "00_06", "00_12"],
        # "cams": ["00_03", "00_06", "00_12", "00_13", "00_23", "00_15", "00_10", "00_21", "00_09", "00_01"],
        "take_interval": 3,
        "use_scenes": ["160906_pizza1", "160422_haggling1", "160906_ian5"],
    },
    "human36m": {
        "path": "/datasets/human36m/skelda/pose_test.json",
        "take_interval": 5,
    },
    "mvor": {
        "path": "/datasets/mvor/skelda/all.json",
        "take_interval": 1,
        "with_depth": False,
    },
    "campus": {
        "path": "/datasets/campus/skelda/test.json",
        "take_interval": 1,
    },
    "shelf": {
        "path": "/datasets/shelf/skelda/test.json",
        "take_interval": 1,
    },
    "ikeaasm": {
        "path": "/datasets/ikeaasm/skelda/test.json",
        "take_interval": 2,
    },
    "chi3d": {
        "path": "/datasets/chi3d/skelda/all.json",
        "take_interval": 5,
    },
    "tsinghua": {
        "path": "/datasets/tsinghua/skelda/test.json",
        "take_interval": 3,
    },
    "human36m_wb": {
        "path": "/datasets/human36m/skelda/wb/test.json",
        "take_interval": 100,
    },
    "egohumans": {
        "path": "/datasets/egohumans/skelda/all.json",
        "take_interval": 2,
        # "subset": "tagging",
        "subset": "legoassemble",
        # "subset": "fencing",
        # "subset": "basketball",
        # "subset": "volleyball",
        # "subset": "badminton",
        # "subset": "tennis",
    },
}

# 3D joint layout mirrors the 2D keypoint layout of the detector.
joint_names_2d = test_triangulate.joint_names_2d
joint_names_3d = list(joint_names_2d)

# Joints used for the metric evaluation (subset of the full skeleton).
eval_joints = [
    "head",
    "shoulder_left",
    "shoulder_right",
    "elbow_left",
    "elbow_right",
    "wrist_left",
    "wrist_right",
    "hip_left",
    "hip_right",
    "knee_left",
    "knee_right",
    "ankle_left",
    "ankle_right",
]
# These datasets annotate the nose rather than a head point.
if dataset_use in ["human36m", "panoptic"]:
    eval_joints[eval_joints.index("head")] = "nose"
# Whole-body datasets are evaluated on all detector keypoints.
if dataset_use.endswith("_wb"):
    # eval_joints[eval_joints.index("head")] = "nose"
    eval_joints = list(joint_names_2d)

# Directory for per-frame error images; empty string disables saving.
# output_dir = "/RapidPoseTriangulation/data/testoutput/"
output_dir = ""

# ==================================================================================================


def load_json(path: str):
    """Read a UTF-8 JSON file and return the parsed object."""
    with open(path, "r", encoding="utf-8") as file:
        data = json.load(file)
    return data


# ==================================================================================================


def load_labels(dataset: dict):
    """Load labels by dataset description

    The key of `dataset` selects which loader branch runs; each branch applies
    dataset-specific filtering (splits, scenes, cameras, frame subsampling).
    Raises ValueError for unknown dataset keys.
    """

    if "panoptic" in dataset:
        labels = load_json(dataset["panoptic"]["path"])
        labels = [lb for i, lb in enumerate(labels) if i % 1500 < 90]

        # Filter by maximum number of persons
        labels = [l for l in labels if len(l["bodies3D"]) <= 10]

        # Filter scenes
        if "use_scenes" in dataset["panoptic"]:
            labels = [
                l for l in labels if l["scene"] in dataset["panoptic"]["use_scenes"]
            ]

        # Filter cameras (iterate reversed so pop() indices stay valid)
        if "cameras_depth" not in labels[0]:
            for label in labels:
                for i, cam in reversed(list(enumerate(label["cameras"]))):
                    if cam["name"] not in dataset["panoptic"]["cams"]:
                        label["cameras"].pop(i)
                        label["imgpaths"].pop(i)

    elif "human36m" in dataset:
        labels = load_json(dataset["human36m"]["path"])
        labels = [lb for lb in labels if lb["subject"] == "S9"]
        labels = [lb for i, lb in enumerate(labels) if i % 4000 < 150]
        for label in labels:
            label.pop("action")
            label.pop("frame")

    elif "mvor" in dataset:
        labels = load_json(dataset["mvor"]["path"])
        # Rename keys
        for label in labels:
            label["cameras_color"] = label["cameras"]
            label["imgpaths_color"] = label["imgpaths"]

    elif "ikeaasm" in dataset:
        labels = load_json(dataset["ikeaasm"]["path"])
        # Keep only samples recorded with the same camera setup as sample 0.
        cams0 = str(labels[0]["cameras"])
        labels = [lb for lb in labels if str(lb["cameras"]) == cams0]

    elif "shelf" in dataset:
        labels = load_json(dataset["shelf"]["path"])
        labels = [lb for lb in labels if "test" in lb["splits"]]

    elif "campus" in dataset:
        labels = load_json(dataset["campus"]["path"])
        labels = [lb for lb in labels if "test" in lb["splits"]]

    elif "tsinghua" in dataset:
        labels = load_json(dataset["tsinghua"]["path"])
        labels = [lb for lb in labels if "test" in lb["splits"]]
        labels = [lb for lb in labels if lb["seq"] == "seq_1"]
        labels = [lb for i, lb in enumerate(labels) if i % 300 < 90]
        # The dataset has no person ids, so enumerate the bodies per frame.
        for label in labels:
            label["bodyids"] = list(range(len(label["bodies3D"])))

    elif "chi3d" in dataset:
        labels = load_json(dataset["chi3d"]["path"])
        labels = [lb for lb in labels if lb["setup"] == "s03"]
        labels = [lb for i, lb in enumerate(labels) if i % 2000 < 150]

    elif "human36m_wb" in dataset:
        labels = load_json(dataset["human36m_wb"]["path"])

    elif "egohumans" in dataset:
        labels = load_json(dataset["egohumans"]["path"])
        labels = [lb for lb in labels if "test" in lb["splits"]]
        labels = [lb for lb in labels if dataset["egohumans"]["subset"] in lb["seq"]]
        if dataset["egohumans"]["subset"] in ["volleyball", "tennis"]:
            labels = [lb for i, lb in enumerate(labels) if i % 150 < 60]

    else:
        raise ValueError("Dataset not available")

    # Optionally drop samples to speed up train/eval
    if "take_interval" in dataset:
        take_interval = dataset["take_interval"]
        if take_interval > 1:
            labels = [l for i, l in enumerate(labels) if i % take_interval == 0]

    return labels


# ==================================================================================================


def main():
    """Run 2D detection + triangulation over the selected dataset and evaluate."""

    # Choose whole-body or standard keypoint model based on the test config.
    whole_body = test_triangulate.whole_body
    if any(whole_body.values()):
        kpt_model = utils_2d_pose.load_wb_model()
    else:
        kpt_model = utils_2d_pose.load_model()

    # Manually set matplotlib backend
    try:
        matplotlib.use("TkAgg")
    except ImportError:
        print("WARNING: Using headless mode, no visualizations will be shown.")

    print("Loading dataset ...")
    labels = load_labels(
        {
            dataset_use: datasets[dataset_use],
            "take_interval": datasets[dataset_use]["take_interval"],
        }
    )
    # Print a dataset sample for debugging
    print(labels[0])

    minscores = {
        # Describes how good two 2D poses need to match each other to create a valid triangulation
        # If the quality of the 2D detections is poor, use a lower value
        "panoptic": 0.94,
        "human36m": 0.94,
        "mvor": 0.86,
        "campus": 0.96,
        "shelf": 0.96,
        "ikeaasm": 0.89,
        "chi3d": 0.94,
        "tsinghua": 0.96,
        "egohumans": 0.95,
        "human36m_wb": 0.94,
    }
    minscore = minscores.get(dataset_use, 0.95)

    min_group_sizes = {
        # Describes the minimum number of camera pairs that need to detect the same person
        # If the number of cameras is high, and the views are not occluded, use a higher value
        "panoptic": 1,
        "shelf": 2,
        "chi3d": 1,
        "tsinghua": 2,
        "egohumans": 4,
    }
    min_group_size = min_group_sizes.get(dataset_use, 1)
    # Scenario-specific overrides for setups with many cameras / many persons.
    if dataset_use == "panoptic" and len(datasets["panoptic"]["cams"]) == 10:
        min_group_size = 4
    if dataset_use == "egohumans" and (
        "lego" in labels[0]["seq"] or "tagging" in labels[0]["seq"]
    ):
        min_group_size = 2
    if dataset_use == "egohumans" and (
        "volleyball" in labels[0]["seq"] or "badminton" in labels[0]["seq"]
    ):
        min_group_size = 7
    if dataset_use == "egohumans" and "tennis" in labels[0]["seq"]:
        min_group_size = 11

    print("\nRunning predictions ...")
    all_poses = []
    all_ids = []
    times = []
    triangulator = rpt.Triangulator(
        min_match_score=minscore, min_group_size=min_group_size
    )

    old_scene = ""
    old_index = -1
    for label in tqdm.tqdm(labels):
        images_2d = []

        if old_scene != label.get("scene", "") or (
            old_index + datasets[dataset_use]["take_interval"] < label["index"]
        ):
            # Reset last poses if scene changes
            old_scene = label.get("scene", "")
            triangulator.reset()

        # Load all camera views; skip the sample if any image is unreadable.
        try:
            start = time.time()
            for i in range(len(label["imgpaths"])):
                imgpath = label["imgpaths"][i]
                img = test_triangulate.load_image(imgpath)
                images_2d.append(img)
            print("IMG time:", time.time() - start)
        except cv2.error:
            print("One of the paths not found:", label["imgpaths"])
            continue

        if dataset_use == "human36m":
            for i in range(len(images_2d)):
                # Since the images don't have the same shape, rescale some of them
                # and adjust the camera intrinsics (K) to match the new size.
                img = images_2d[i]
                ishape = img.shape
                if ishape != (1000, 1000, 3):
                    cam = label["cameras"][i]
                    cam["K"][1][1] = cam["K"][1][1] * (1000 / ishape[0])
                    cam["K"][1][2] = cam["K"][1][2] * (1000 / ishape[0])
                    cam["K"][0][0] = cam["K"][0][0] * (1000 / ishape[1])
                    cam["K"][0][2] = cam["K"][0][2] * (1000 / ishape[1])
                    images_2d[i] = cv2.resize(img, (1000, 1000))

        # 2D keypoint detection on all views.
        start = time.time()
        poses_2d = utils_2d_pose.get_2d_pose(kpt_model, images_2d)
        poses_2d = test_triangulate.update_keypoints(poses_2d, joint_names_2d)
        time_2d = time.time() - start
        print("2D time:", time_2d)

        # 3D triangulation; emit an all-zero pose if nothing was detected.
        start = time.time()
        if sum(np.sum(p) for p in poses_2d) == 0:
            poses3D = np.zeros([1, len(joint_names_3d), 4]).tolist()
        else:
            rpt_cameras = rpt.convert_cameras(label["cameras"])
            roomparams = [label["room_size"], label["room_center"]]
            poses3D = triangulator.triangulate_poses(
                poses_2d, rpt_cameras, roomparams, joint_names_2d
            )
        time_3d = time.time() - start
        print("3D time:", time_3d)

        old_index = label["index"]
        all_poses.append(np.array(poses3D).tolist())
        all_ids.append(label["id"])
        times.append((time_2d, time_3d))

    # Print per-step triangulation timings
    print("")
    triangulator.print_stats()

    # Drop warm-up iterations before averaging timings.
    warmup_iters = 10
    if len(times) > warmup_iters:
        times = times[warmup_iters:]
    avg_time_2d = np.mean([t[0] for t in times])
    avg_time_3d = np.mean([t[1] for t in times])
    tstats = {
        "avg_time_2d": avg_time_2d,
        "avg_time_3d": avg_time_3d,
        "avg_fps": 1.0 / (avg_time_2d + avg_time_3d),
    }
    print("\nMetrics:")
    print(json.dumps(tstats, indent=2))

    # Accuracy evaluation: mean per-joint position error and percentage of
    # correct parts, computed by the skelda evaluation helpers.
    _ = evals.mpjpe.run_eval(
        labels,
        all_poses,
        all_ids,
        joint_names_net=joint_names_3d,
        joint_names_use=eval_joints,
        save_error_imgs=output_dir,
    )
    _ = evals.pcp.run_eval(
        labels,
        all_poses,
        all_ids,
        joint_names_net=joint_names_3d,
        joint_names_use=eval_joints,
        replace_head_with_nose=True,
    )


# ==================================================================================================

if __name__ == "__main__":
    main()