"""Evaluate 2D keypoint estimation plus multi-view triangulation on skelda test sets (MPJPE and PCP metrics)."""

import json
import os
import time

import cv2
import matplotlib
import numpy as np
import tqdm

import test_triangulate
import triangulate_poses
import utils_2d_pose
from skelda import evals, utils_pose

# ==================================================================================================

# dataset_use = "panoptic"
dataset_use = "human36m"
# dataset_use = "mvor"
# dataset_use = "shelf"
# dataset_use = "campus"
# dataset_use = "ikeaasm"
# dataset_use = "tsinghua"
# dataset_use = "human36m_wb"

datasets = {
    "panoptic": {
        "path": "/datasets/panoptic/skelda/test.json",
        "cams": ["00_03", "00_06", "00_12", "00_13", "00_23"],
        "take_interval": 3,
        "use_scenes": ["160906_pizza1", "160422_haggling1", "160906_ian5"],
    },
    "human36m": {
        "path": "/datasets/human36m/skelda/pose_test.json",
        "take_interval": 5,
    },
    "mvor": {
        "path": "/datasets/mvor/skelda/all.json",
        "take_interval": 1,
        "with_depth": False,
    },
    "ikeaasm": {
        "path": "/datasets/ikeaasm/skelda/test.json",
        "take_interval": 2,
    },
    "campus": {
        "path": "/datasets/campus/skelda/test.json",
        "take_interval": 1,
    },
    "shelf": {
        "path": "/datasets/shelf/skelda/test.json",
        "take_interval": 1,
    },
    "tsinghua": {
        "path": "/datasets/tsinghua/skelda/test.json",
        "take_interval": 3,
    },
    "human36m_wb": {
        "path": "/datasets/human36m/skelda/wb/test.json",
        "take_interval": 100,
    },
}

joint_names_2d = test_triangulate.joint_names_2d
joint_names_3d = list(joint_names_2d)

eval_joints = [
    "head",
    "shoulder_left",
    "shoulder_right",
    "elbow_left",
    "elbow_right",
    "wrist_left",
    "wrist_right",
    "hip_left",
    "hip_right",
    "knee_left",
    "knee_right",
    "ankle_left",
    "ankle_right",
]
if dataset_use in ["human36m", "panoptic"]:
    eval_joints[eval_joints.index("head")] = "nose"
if dataset_use.endswith("_wb"):
    # eval_joints[eval_joints.index("head")] = "nose"
    eval_joints = list(joint_names_2d)

# output_dir = "/SimplePoseTriangulation/data/testoutput/"
output_dir = ""

# ==================================================================================================


def load_json(path: str):
    with open(path, "r", encoding="utf-8") as file:
        data = json.load(file)
    return data


# ==================================================================================================


def load_labels(dataset: dict):
    """Load labels by dataset description"""

    if "panoptic" in dataset:
        labels = load_json(dataset["panoptic"]["path"])
        labels = [lb for i, lb in enumerate(labels) if i % 1500 < 90]

        # Filter by maximum number of persons
        labels = [l for l in labels if len(l["bodies3D"]) <= 10]

        # Filter scenes
        if "use_scenes" in dataset["panoptic"]:
            labels = [
                l for l in labels if l["scene"] in dataset["panoptic"]["use_scenes"]
            ]

        # Filter cameras
        if "cameras_depth" not in labels[0]:
            for label in labels:
                for i, cam in reversed(list(enumerate(label["cameras"]))):
                    if cam["name"] not in dataset["panoptic"]["cams"]:
                        label["cameras"].pop(i)
                        label["imgpaths"].pop(i)

    elif "human36m" in dataset:
        labels = load_json(dataset["human36m"]["path"])
        labels = [lb for lb in labels if lb["subject"] == "S9"]
        labels = [lb for i, lb in enumerate(labels) if i % 4000 < 150]
        for label in labels:
            label.pop("action")
            label.pop("frame")

    elif "mvor" in dataset:
        labels = load_json(dataset["mvor"]["path"])

        # Rename keys
        for label in labels:
            label["cameras_color"] = label["cameras"]
            label["imgpaths_color"] = label["imgpaths"]

    elif "ikeaasm" in dataset:
        labels = load_json(dataset["ikeaasm"]["path"])
        labels = [lb for i, lb in enumerate(labels) if i % 300 < 72]

    elif "shelf" in dataset:
        labels = load_json(dataset["shelf"]["path"])
        labels = [lb for lb in labels if "test" in lb["splits"]]
    elif "campus" in dataset:
        labels = load_json(dataset["campus"]["path"])
        labels = [lb for lb in labels if "test" in lb["splits"]]

    elif "tsinghua" in dataset:
        labels = load_json(dataset["tsinghua"]["path"])
        labels = [lb for lb in labels if "test" in lb["splits"]]
        labels = [lb for i, lb in enumerate(labels) if i % 800 < 90]
        for label in labels:
            label["bodyids"] = list(range(len(label["bodies3D"])))

    elif "human36m_wb" in dataset:
        labels = load_json(dataset["human36m_wb"]["path"])

    else:
        raise ValueError("Dataset not available")

    # Optionally drop samples to speed up train/eval
    if "take_interval" in dataset:
        take_interval = dataset["take_interval"]
        if take_interval > 1:
            labels = [l for i, l in enumerate(labels) if i % take_interval == 0]

    # Filter joints
    fj_func = lambda x: utils_pose.filter_joints_3d(x, eval_joints)
    labels = list(map(fj_func, labels))

    return labels


# ==================================================================================================


def add_extra_joints(poses3D, poses2D, joint_names_3d):
    """Update the "head" joint as the average of the two "ear" joints if it is missing."""

    idx_h = joint_names_3d.index("head")
    idx_el = joint_names_3d.index("ear_left")
    idx_er = joint_names_3d.index("ear_right")

    for i in range(len(poses3D)):
        if poses3D[i, idx_h, 3] == 0:
            ear_left = poses3D[i, idx_el]
            ear_right = poses3D[i, idx_er]
            if ear_left[3] > 0.1 and ear_right[3] > 0.1:
                head = (ear_left + ear_right) / 2
                head[3] = min(ear_left[3], ear_right[3])
                poses3D[i, idx_h] = head

            # Do the same for the corresponding 2D poses
            for j in range(len(poses2D)):
                ear_left = poses2D[j][i, idx_el]
                ear_right = poses2D[j][i, idx_er]
                if ear_left[2] > 0.1 and ear_right[2] > 0.1:
                    head = (ear_left + ear_right) / 2
                    head[2] = min(ear_left[2], ear_right[2])
                    poses2D[j][i, idx_h] = head

    return poses3D, poses2D


# ==================================================================================================


def add_missing_joints(poses3D, joint_names_3d):
    """Replace missing joints with their nearest adjacent joints"""

    adjacents = {
        "hip_right": ["hip_middle", "hip_left"],
        "hip_left": ["hip_middle", "hip_right"],
        "knee_right": ["hip_right", "knee_left"],
        "knee_left": ["hip_left", "knee_right"],
        "ankle_right": ["knee_right", "ankle_left"],
        "ankle_left": ["knee_left", "ankle_right"],
        "shoulder_right": ["shoulder_middle", "shoulder_left"],
        "shoulder_left": ["shoulder_middle", "shoulder_right"],
        "elbow_right": ["shoulder_right", "hip_right"],
        "elbow_left": ["shoulder_left", "hip_left"],
        "wrist_right": ["elbow_right"],
        "wrist_left": ["elbow_left"],
        "nose": ["shoulder_middle", "shoulder_right", "shoulder_left"],
        "head": ["shoulder_middle", "shoulder_right", "shoulder_left"],
        "foot_*_left_*": ["ankle_left"],
        "foot_*_right_*": ["ankle_right"],
        "face_*": ["nose"],
        "hand_*_left_*": ["wrist_left"],
        "hand_*_right_*": ["wrist_right"],
    }

    for i in range(len(poses3D)):
        valid_joints = np.where(poses3D[i, :, 3] > 0.1)[0]
        body_center = np.mean(poses3D[i, valid_joints, :3], axis=0)

        for j in range(len(joint_names_3d)):
            # Map each joint name to its entry in the adjacency table
            adname = ""
            if joint_names_3d[j][0:5] == "foot_" and "_left" in joint_names_3d[j]:
                adname = "foot_*_left_*"
            elif joint_names_3d[j][0:5] == "foot_" and "_right" in joint_names_3d[j]:
                adname = "foot_*_right_*"
            elif joint_names_3d[j][0:5] == "face_":
                adname = "face_*"
            elif joint_names_3d[j][0:5] == "hand_" and "_left" in joint_names_3d[j]:
                adname = "hand_*_left_*"
            elif joint_names_3d[j][0:5] == "hand_" and "_right" in joint_names_3d[j]:
                adname = "hand_*_right_*"
            elif joint_names_3d[j] in adjacents:
                adname = joint_names_3d[j]
            if adname == "":
                continue

            if poses3D[i, j, 3] == 0:
                if joint_names_3d[j] in adjacents or joint_names_3d[j][0:5] in [
                    "foot_",
                    "face_",
                    "hand_",
                ]:
                    adjacent_joints = [
                        poses3D[i, joint_names_3d.index(a), :]
                        for a in adjacents[adname]
                    ]
                    adjacent_joints = [a[0:3] for a in adjacent_joints if a[3] > 0.1]
                    if len(adjacent_joints) > 0:
                        poses3D[i, j, :3] = np.mean(adjacent_joints, axis=0)
                    else:
                        poses3D[i, j, :3] = body_center
                else:
                    poses3D[i, j, :3] = body_center
                # Mark the filled-in joint with a low confidence score
                poses3D[i, j, 3] = 0.1

    return poses3D


# ==================================================================================================


def main():
    global joint_names_3d, eval_joints

    # Select the 2D keypoint model (whole-body or standard)
    whole_body = test_triangulate.whole_body
    if any(whole_body[k] for k in whole_body):
        kpt_model = utils_2d_pose.load_wb_model()
    else:
        kpt_model = utils_2d_pose.load_model()

    # Manually set matplotlib backend
    try:
        matplotlib.use("TkAgg")
    except ImportError:
        print("WARNING: Using headless mode, no visualizations will be shown.")

    print("Loading dataset ...")
    labels = load_labels(
        {
            dataset_use: datasets[dataset_use],
            "take_interval": datasets[dataset_use]["take_interval"],
        }
    )

    # Print a dataset sample for debugging
    print(labels[0])

    print("\nRunning predictions ...")
    all_poses = []
    all_ids = []
    all_paths = []
    times = []
    for label in tqdm.tqdm(labels):
        # Load all camera views for this sample
        images_2d = []
        try:
            start = time.time()
            for i in range(len(label["imgpaths"])):
                imgpath = label["imgpaths"][i]
                img = test_triangulate.load_image(imgpath)
                images_2d.append(img)
            print("IMG time:", time.time() - start)
        except cv2.error:
            print("One of the paths was not found:", label["imgpaths"])
            continue

        if dataset_use == "human36m":
            # The images don't all have the same shape, so rescale them to
            # 1000x1000 and adjust the camera intrinsics accordingly
            for i in range(len(images_2d)):
                img = images_2d[i]
                ishape = img.shape
                if ishape != (1000, 1000, 3):
                    cam = label["cameras"][i]
                    cam["K"][1][1] = cam["K"][1][1] * (1000 / ishape[0])
                    cam["K"][1][2] = cam["K"][1][2] * (1000 / ishape[0])
                    cam["K"][0][0] = cam["K"][0][0] * (1000 / ishape[1])
                    cam["K"][0][2] = cam["K"][0][2] * (1000 / ishape[1])
                    images_2d[i] = cv2.resize(img, (1000, 1000))

        roomparams = {
            "room_size": label["room_size"],
            "room_center": label["room_center"],
        }

        # 2D keypoint estimation
        start = time.time()
        poses_2d = utils_2d_pose.get_2d_pose(kpt_model, images_2d)
        poses_2d = test_triangulate.update_keypoints(poses_2d, joint_names_2d)
        time_2d = time.time() - start
        print("2D time:", time_2d)

        # Triangulation and post-processing
        start = time.time()
        if sum(np.sum(p) for p in poses_2d) == 0:
            poses3D = np.zeros([1, len(joint_names_3d), 4])
            poses2D = np.zeros([len(images_2d), 1, len(joint_names_3d), 3])
        else:
            poses3D = triangulate_poses.get_3d_pose(
                poses_2d, label["cameras"], roomparams, joint_names_2d
            )
            poses2D = []
            for cam in label["cameras"]:
                poses_2d, _ = utils_pose.project_poses(poses3D, cam)
                poses2D.append(poses_2d)

            poses3D, poses2D = add_extra_joints(poses3D, poses2D, joint_names_3d)
            poses3D, poses2D = test_triangulate.filter_poses(
                poses3D,
                poses2D,
                roomparams,
                joint_names_3d,
                drop_few_limbs=(dataset_use != "mvor"),
            )
            poses3D = add_missing_joints(poses3D, joint_names_3d)
        time_3d = time.time() - start
        print("3D time:", time_3d)

        all_poses.append(poses3D)
        all_ids.append(label["id"])
        all_paths.append(label["imgpaths"])
        times.append((time_2d, time_3d))

    # Discard warm-up iterations before averaging the runtimes
    warmup_iters = 10
    if len(times) > warmup_iters:
        times = times[warmup_iters:]
    avg_time_2d = np.mean([t[0] for t in times])
    avg_time_3d = np.mean([t[1] for t in times])
    tstats = {
        "avg_time_2d": avg_time_2d,
        "avg_time_3d": avg_time_3d,
        "avg_fps": 1.0 / (avg_time_2d + avg_time_3d),
    }
    print("\nMetrics:")
    print(json.dumps(tstats, indent=2))

    _ = evals.mpjpe.run_eval(
        labels,
        all_poses,
        all_ids,
        joint_names_net=joint_names_3d,
        joint_names_use=eval_joints,
        save_error_imgs=output_dir,
    )
    _ = evals.pcp.run_eval(
        labels,
        all_poses,
        all_ids,
        joint_names_net=joint_names_3d,
        joint_names_use=eval_joints,
        replace_head_with_nose=True,
    )


# ==================================================================================================

if __name__ == "__main__":
    main()