import json
import os
import sys
import time

import cv2
import matplotlib
import numpy as np
import tqdm

import test_triangulate
import utils_2d_pose
from skelda import evals, utils_pose

sys.path.append("/SimplePoseTriangulation/swig/")
import spt

# ==================================================================================================

# dataset_use = "panoptic"
dataset_use = "human36m"
# dataset_use = "mvor"
# dataset_use = "shelf"
# dataset_use = "campus"
# dataset_use = "ikeaasm"
# dataset_use = "tsinghua"
# dataset_use = "human36m_wb"

datasets = {
    "panoptic": {
        "path": "/datasets/panoptic/skelda/test.json",
        "cams": ["00_03", "00_06", "00_12", "00_13", "00_23"],
        # "cams": ["00_03", "00_06", "00_12"],
        # "cams": ["00_03", "00_06", "00_12", "00_13", "00_23", "00_15", "00_10", "00_21", "00_09", "00_01"],
        "take_interval": 3,
        "use_scenes": ["160906_pizza1", "160422_haggling1", "160906_ian5"],
    },
    "human36m": {
        "path": "/datasets/human36m/skelda/pose_test.json",
        "take_interval": 5,
    },
    "mvor": {
        "path": "/datasets/mvor/skelda/all.json",
        "take_interval": 1,
        "with_depth": False,
    },
    "ikeaasm": {
        "path": "/datasets/ikeaasm/skelda/test.json",
        "take_interval": 2,
    },
    "campus": {
        "path": "/datasets/campus/skelda/test.json",
        "take_interval": 1,
    },
    "shelf": {
        "path": "/datasets/shelf/skelda/test.json",
        "take_interval": 1,
    },
    "tsinghua": {
        "path": "/datasets/tsinghua/skelda/test.json",
        "take_interval": 3,
    },
    "human36m_wb": {
        "path": "/datasets/human36m/skelda/wb/test.json",
        "take_interval": 100,
    },
}

joint_names_2d = test_triangulate.joint_names_2d
joint_names_3d = list(joint_names_2d)

eval_joints = [
    "head",
    "shoulder_left", "shoulder_right",
    "elbow_left", "elbow_right",
    "wrist_left", "wrist_right",
    "hip_left", "hip_right",
    "knee_left", "knee_right",
    "ankle_left", "ankle_right",
]
if dataset_use in ["human36m", "panoptic"]:
    eval_joints[eval_joints.index("head")] = "nose"
if dataset_use.endswith("_wb"):
    # eval_joints[eval_joints.index("head")] = "nose"
    eval_joints = list(joint_names_2d)

# output_dir = "/SimplePoseTriangulation/data/testoutput/"
output_dir = ""

# ==================================================================================================


def load_json(path: str):
    with open(path, "r", encoding="utf-8") as file:
        data = json.load(file)
    return data


# ==================================================================================================


def load_labels(dataset: dict):
    """Load labels by dataset description"""

    if "panoptic" in dataset:
        labels = load_json(dataset["panoptic"]["path"])
        labels = [lb for i, lb in enumerate(labels) if i % 1500 < 90]

        # Filter by maximum number of persons
        labels = [l for l in labels if len(l["bodies3D"]) <= 10]

        # Filter scenes
        if "use_scenes" in dataset["panoptic"]:
            labels = [
                l for l in labels if l["scene"] in dataset["panoptic"]["use_scenes"]
            ]

        # Filter cameras
        if "cameras_depth" not in labels[0]:
            for label in labels:
                for i, cam in reversed(list(enumerate(label["cameras"]))):
                    if cam["name"] not in dataset["panoptic"]["cams"]:
                        label["cameras"].pop(i)
                        label["imgpaths"].pop(i)

    elif "human36m" in dataset:
        labels = load_json(dataset["human36m"]["path"])
        labels = [lb for lb in labels if lb["subject"] == "S9"]
        labels = [lb for i, lb in enumerate(labels) if i % 4000 < 150]
        for label in labels:
            label.pop("action")
            label.pop("frame")

    elif "mvor" in dataset:
        labels = load_json(dataset["mvor"]["path"])

        # Rename keys
        for label in labels:
            label["cameras_color"] = label["cameras"]
            label["imgpaths_color"] = label["imgpaths"]

    elif "ikeaasm" in dataset:
labels = load_json(dataset["ikeaasm"]["path"]) cams0 = str(labels[0]["cameras"]) labels = [lb for lb in labels if str(lb["cameras"]) == cams0] elif "shelf" in dataset: labels = load_json(dataset["shelf"]["path"]) labels = [lb for lb in labels if "test" in lb["splits"]] elif "campus" in dataset: labels = load_json(dataset["campus"]["path"]) labels = [lb for lb in labels if "test" in lb["splits"]] elif "tsinghua" in dataset: labels = load_json(dataset["tsinghua"]["path"]) labels = [lb for lb in labels if "test" in lb["splits"]] labels = [lb for lb in labels if lb["seq"] == "seq_1"] labels = [lb for i, lb in enumerate(labels) if i % 300 < 90] for label in labels: label["bodyids"] = list(range(len(label["bodies3D"]))) elif "human36m_wb" in dataset: labels = load_json(dataset["human36m_wb"]["path"]) else: raise ValueError("Dataset not available") # Optionally drop samples to speed up train/eval if "take_interval" in dataset: take_interval = dataset["take_interval"] if take_interval > 1: labels = [l for i, l in enumerate(labels) if i % take_interval == 0] # Filter joints fj_func = lambda x: utils_pose.filter_joints_3d(x, eval_joints) labels = list(map(fj_func, labels)) return labels # ================================================================================================== def main(): global joint_names_3d, eval_joints whole_body = test_triangulate.whole_body if any((whole_body[k] for k in whole_body)): kpt_model = utils_2d_pose.load_wb_model() else: kpt_model = utils_2d_pose.load_model() # Manually set matplotlib backend try: matplotlib.use("TkAgg") except ImportError: print("WARNING: Using headless mode, no visualizations will be shown.") print("Loading dataset ...") labels = load_labels( { dataset_use: datasets[dataset_use], "take_interval": datasets[dataset_use]["take_interval"], } ) # Print a dataset sample for debugging print(labels[0]) minscores = { # Choose this depending on the fraction of invalid/missing persons # A higher value reduces the number of proposals "panoptic": 0.94, "human36m": 0.94, "mvor": 0.86, "campus": 0.96, "shelf": 0.96, "ikeaasm": 0.89, "tsinghua": 0.96, "human36m_wb": 0.94, "koarob": 0.91, } minscore = minscores.get(dataset_use, 0.95) min_group_sizes = { # If the number of cameras is high, and the views are not occluded, use a higher value "panoptic": 1, "shelf": 2, "tsinghua": 2, } min_group_size = min_group_sizes.get(dataset_use, 1) if dataset_use == "panoptic" and len(datasets["panoptic"]["cams"]) == 10: min_group_size = 5 print("\nRunning predictions ...") all_poses = [] all_ids = [] all_paths = [] times = [] triangulator = spt.Triangulator(min_score=minscore, min_group_size=min_group_size) old_scene = "" for label in tqdm.tqdm(labels): images_2d = [] if old_scene != label.get("scene", "") or dataset_use == "human36m_wb": # Reset last poses if scene changes old_scene = label.get("scene", "") triangulator.reset() try: start = time.time() for i in range(len(label["imgpaths"])): imgpath = label["imgpaths"][i] img = test_triangulate.load_image(imgpath) images_2d.append(img) print("IMG time:", time.time() - start) except cv2.error: print("One of the paths not found:", label["imgpaths"]) continue if dataset_use == "human36m": for i in range(len(images_2d)): # Since the images don't have the same shape, rescale some of them img = images_2d[i] ishape = img.shape if ishape != (1000, 1000, 3): cam = label["cameras"][i] cam["K"][1][1] = cam["K"][1][1] * (1000 / ishape[0]) cam["K"][1][2] = cam["K"][1][2] * (1000 / ishape[0]) cam["K"][0][0] = cam["K"][0][0] * (1000 
                    cam["K"][0][2] = cam["K"][0][2] * (1000 / ishape[1])
                    images_2d[i] = cv2.resize(img, (1000, 1000))

        start = time.time()
        poses_2d = utils_2d_pose.get_2d_pose(kpt_model, images_2d)
        poses_2d = test_triangulate.update_keypoints(poses_2d, joint_names_2d)
        time_2d = time.time() - start
        print("2D time:", time_2d)

        start = time.time()
        if sum(np.sum(p) for p in poses_2d) == 0:
            poses3D = np.zeros([1, len(joint_names_3d), 4]).tolist()
        else:
            spt_cameras = spt.convert_cameras(label["cameras"])
            roomparams = [label["room_size"], label["room_center"]]
            poses3D = triangulator.triangulate_poses(
                poses_2d, spt_cameras, roomparams, joint_names_2d
            )
        time_3d = time.time() - start
        print("3D time:", time_3d)

        all_poses.append(np.array(poses3D))
        all_ids.append(label["id"])
        all_paths.append(label["imgpaths"])
        times.append((time_2d, time_3d))

    # Print per-step triangulation timings
    print("")
    triangulator.print_stats()

    warmup_iters = 10
    if len(times) > warmup_iters:
        times = times[warmup_iters:]
    avg_time_2d = np.mean([t[0] for t in times])
    avg_time_3d = np.mean([t[1] for t in times])
    tstats = {
        "avg_time_2d": avg_time_2d,
        "avg_time_3d": avg_time_3d,
        "avg_fps": 1.0 / (avg_time_2d + avg_time_3d),
    }
    print("\nMetrics:")
    print(json.dumps(tstats, indent=2))

    _ = evals.mpjpe.run_eval(
        labels,
        all_poses,
        all_ids,
        joint_names_net=joint_names_3d,
        joint_names_use=eval_joints,
        save_error_imgs=output_dir,
    )
    _ = evals.pcp.run_eval(
        labels,
        all_poses,
        all_ids,
        joint_names_net=joint_names_3d,
        joint_names_use=eval_joints,
        replace_head_with_nose=True,
    )


# ==================================================================================================
if __name__ == "__main__":
    main()