diff --git a/scripts/test_triangulate.py b/scripts/test_triangulate.py
new file mode 100644
index 0000000..495abfb
--- /dev/null
+++ b/scripts/test_triangulate.py
@@ -0,0 +1,333 @@
+import copy
+import json
+import os
+from typing import List
+
+import cv2
+import matplotlib
+import numpy as np
+
+import draw_utils
+import triangulate_poses
+import utils_2d_pose
+from skelda import utils_pose
+
+# ==================================================================================================
+
+filepath = os.path.dirname(os.path.realpath(__file__)) + "/"
+test_img_dir = filepath + "../data/"
+
+joint_names_2d = [
+    "nose",
+    "eye_left",
+    "eye_right",
+    "ear_left",
+    "ear_right",
+    "shoulder_left",
+    "shoulder_right",
+    "elbow_left",
+    "elbow_right",
+    "wrist_left",
+    "wrist_right",
+    "hip_left",
+    "hip_right",
+    "knee_left",
+    "knee_right",
+    "ankle_left",
+    "ankle_right",
+]
+joint_names_2d.extend(
+    [
+        "hip_middle",
+        "shoulder_middle",
+        "head",
+    ]
+)
+joint_names_3d = list(joint_names_2d)
+
+# Assumed default config: optional whole-body keypoint groups (feet/face/hands)
+# are disabled, so only the 17 base joints plus the computed midpoints are kept.
+# This dict is required by update_keypoints but was missing from the original code.
+whole_body = {"foots": False, "face": False, "hands": False}
+
+main_limbs = [
+    ("shoulder_left", "elbow_left"),
+    ("elbow_left", "wrist_left"),
+    ("shoulder_right", "elbow_right"),
+    ("elbow_right", "wrist_right"),
+    ("hip_left", "knee_left"),
+    ("knee_left", "ankle_left"),
+    ("hip_right", "knee_right"),
+    ("knee_right", "ankle_right"),
+]
+
+# ==================================================================================================
+
+
+def update_sample(sample, new_dir=""):
+    sample = copy.deepcopy(sample)
+
+    # Rename image paths
+    sample["imgpaths"] = [
+        os.path.join(new_dir, os.path.basename(v)) for v in sample["imgpaths"]
+    ]
+
+    # Add placeholders for missing keys
+    sample["cameras_color"] = sample["cameras"]
+    sample["imgpaths_color"] = sample["imgpaths"]
+    sample["cameras_depth"] = []
+
+    return sample
+
+
+# ==================================================================================================
+
+
+def load_image(path: str):
+    image = cv2.imread(path, 3)
+    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
+    image = np.array(image, dtype=np.float32)
+    return image
+
+
+# ==================================================================================================
+
+
+def filter_poses(poses3D, poses2D, roomparams, joint_names, drop_few_limbs=True):
+    drop = []
+    for i, pose in enumerate(poses3D):
+        pose = np.array(pose)
+        valid_joints = [j for j in pose if j[-1] > 0.1]
+
+        # Drop persons with too few joints
+        if np.sum(pose[..., -1] > 0.1) < 5:
+            drop.append(i)
+            continue
+
+        # Drop too large or too small persons
+        mins = np.min(valid_joints, axis=0)
+        maxs = np.max(valid_joints, axis=0)
+        diff = maxs - mins
+        if any(d > 2.3 for d in diff):
+            drop.append(i)
+            continue
+        if all(d < 0.4 for d in diff):
+            drop.append(i)
+            continue
+        if (
+            (diff[0] < 0.2 and diff[1] < 0.2)
+            or (diff[1] < 0.2 and diff[2] < 0.2)
+            or (diff[2] < 0.2 and diff[0] < 0.2)
+        ):
+            drop.append(i)
+            continue
+
+        # Drop persons outside room
+        mean = np.mean(valid_joints, axis=0)
+        rsize = [r / 2 for r in roomparams["room_size"]]
+        rcent = roomparams["room_center"]
+        if any(
+            # Center of mass outside room
+            mean[j] > rsize[j] + rcent[j] or mean[j] < -rsize[j] + rcent[j]
+            for j in range(3)
+        ) or any(
+            # One limb more than 10cm outside room
+            maxs[j] > rsize[j] + rcent[j] + 0.1
+            or mins[j] < -rsize[j] + rcent[j] - 0.1
+            for j in range(3)
+        ):
+            drop.append(i)
+            continue
+
+        if drop_few_limbs:
+            # Drop persons with fewer than 3 limbs
+            found_limbs = 0
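+            # A limb only counts as found if both of its endpoint joints were
+            # detected with sufficient confidence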
+            for limb in main_limbs:
+                start_idx = joint_names.index(limb[0])
+                end_idx = joint_names.index(limb[1])
+                if pose[start_idx, -1] > 0.1 and pose[end_idx, -1] > 0.1:
+                    found_limbs += 1
+            if found_limbs < 3:
+                drop.append(i)
+                continue
+
+        # Drop persons with too small average limb length
+        total_length = 0
+        for limb in main_limbs:
+            start_idx = joint_names.index(limb[0])
+            end_idx = joint_names.index(limb[1])
+            limb_length = np.linalg.norm(pose[end_idx, :3] - pose[start_idx, :3])
+            total_length += limb_length
+        average_length = total_length / len(main_limbs)
+        if average_length < 0.1:
+            drop.append(i)
+            continue
+
+    new_poses3D = []
+    new_poses2D = [[] for _ in range(len(poses2D))]
+    for i in range(len(poses3D)):
+        if len(poses3D[i]) != len(joint_names):
+            # Sometimes some joints of a poor detection are missing
+            continue
+
+        if i not in drop:
+            new_poses3D.append(poses3D[i])
+            for j in range(len(poses2D)):
+                new_poses2D[j].append(poses2D[j][i])
+        else:
+            # Keep dropped persons, but with a near-zero score
+            new_pose = np.array(poses3D[i])
+            new_pose[..., -1] = 0.001
+            new_poses3D.append(new_pose)
+            for j in range(len(poses2D)):
+                new_pose = np.array(poses2D[j][i])
+                new_pose[..., -1] = 0.001
+                new_poses2D[j].append(new_pose)
+
+    new_poses3D = np.array(new_poses3D)
+    new_poses2D = np.array(new_poses2D)
+    if new_poses3D.size == 0:
+        new_poses3D = np.zeros([1, len(joint_names), 4])
+        new_poses2D = np.zeros([len(poses2D), 1, len(joint_names), 3])
+
+    return new_poses3D, new_poses2D
+
+
+# ==================================================================================================
+
+
+def update_keypoints(poses_2d: list, joint_names: List[str]) -> list:
+    new_views = []
+    for view in poses_2d:
+        new_bodies = []
+        for body in view:
+            body = body.tolist()
+
+            # Keep the 17 base joints and any enabled whole-body groups
+            new_body = body[:17]
+            if whole_body["foots"]:
+                new_body.extend(body[17:22])
+            if whole_body["face"]:
+                new_body.extend(body[22:90])
+            if whole_body["hands"]:
+                new_body.extend(body[90:])
+            body = new_body
+
+            # Append computed midpoints: hip_middle, shoulder_middle, head
+            hlid = joint_names.index("hip_left")
+            hrid = joint_names.index("hip_right")
+            mid_hip = [
+                float((body[hlid][0] + body[hrid][0]) / 2.0),
+                float((body[hlid][1] + body[hrid][1]) / 2.0),
+                min(body[hlid][2], body[hrid][2]),
+            ]
+            body.append(mid_hip)
+
+            slid = joint_names.index("shoulder_left")
+            srid = joint_names.index("shoulder_right")
+            mid_shoulder = [
+                float((body[slid][0] + body[srid][0]) / 2.0),
+                float((body[slid][1] + body[srid][1]) / 2.0),
+                min(body[slid][2], body[srid][2]),
+            ]
+            body.append(mid_shoulder)
+
+            elid = joint_names.index("ear_left")
+            erid = joint_names.index("ear_right")
+            head = [
+                float((body[elid][0] + body[erid][0]) / 2.0),
+                float((body[elid][1] + body[erid][1]) / 2.0),
+                min(body[elid][2], body[erid][2]),
+            ]
+            body.append(head)
+
+            new_bodies.append(body)
+        new_views.append(new_bodies)
+
+    return new_views
+
+
+# ==================================================================================================
+
+
+def main():
+    kpt_model = utils_2d_pose.load_model()
+
+    # Manually set matplotlib backend
+    matplotlib.use("TkAgg")
+
+    for dirname in sorted(os.listdir(test_img_dir)):
+        dirpath = os.path.join(test_img_dir, dirname)
+
+        if not os.path.isdir(dirpath):
+            continue
+
+        # Only process two-character directories starting with "p" or "h"
+        if (dirname[0] not in ["p", "h"]) or len(dirname) != 2:
+            continue
+
+        # Load sample info
+        with open(os.path.join(dirpath, "sample.json"), "r", encoding="utf-8") as file:
+            sample = json.load(file)
+        sample = update_sample(sample, dirpath)
+
+        camparams = sample["cameras_color"]
+        roomparams = {
+            "room_size": sample["room_size"],
+            "room_center": sample["room_center"],
+        }
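+        # room_size is the full room extent; filter_poses works with
+        # half-extents around room_center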
sample["room_center"], + } + + # Load color images + images_2d = [] + for i in range(len(sample["cameras_color"])): + imgpath = sample["imgpaths_color"][i] + img = load_image(imgpath) + images_2d.append(img) + + # Get 2D poses + poses_2d = utils_2d_pose.get_2d_pose(kpt_model, images_2d) + poses_2d = update_keypoints(poses_2d, joint_names_2d) + + fig1 = draw_utils.show_poses2d( + poses_2d, np.array(images_2d), joint_names_2d, "2D detections" + ) + fig1.savefig(os.path.join(dirpath, "2d-k.png"), dpi=fig1.dpi) + # draw_utils.utils_view.show_plots() + + if len(images_2d) == 1: + draw_utils.utils_view.show_plots() + continue + + # Get 3D poses + if sum(np.sum(p) for p in poses_2d) == 0: + poses3D = np.zeros([1, len(joint_names_3d), 4]) + poses2D = np.zeros([len(images_2d), 1, len(joint_names_3d), 3]) + else: + poses3D = triangulate_poses.get_3d_pose(poses_2d, camparams, joint_names_2d) + poses2D = [] + for cam in camparams: + poses_2d, _ = utils_pose.project_poses(poses3D, cam) + poses2D.append(poses_2d) + poses3D, poses2D = filter_poses( + poses3D, + poses2D, + roomparams, + joint_names_3d, + ) + + print("\n" + dirpath) + print(poses3D) + # print(poses2D) + + fig2 = draw_utils.utils_view.show_poses3d( + poses3D, joint_names_3d, roomparams, camparams + ) + fig3 = draw_utils.show_poses2d( + poses2D, np.array(images_2d), joint_names_3d, "2D reprojections" + ) + fig2.savefig(os.path.join(dirpath, "3d-p.png"), dpi=fig2.dpi) + fig3.savefig(os.path.join(dirpath, "2d-p.png"), dpi=fig3.dpi) + draw_utils.utils_view.show_plots() + + +# ================================================================================================== + +if __name__ == "__main__": + main() diff --git a/scripts/triangulate_poses.py b/scripts/triangulate_poses.py new file mode 100644 index 0000000..0af75ae --- /dev/null +++ b/scripts/triangulate_poses.py @@ -0,0 +1,255 @@ +import copy + +import cv2 +import numpy as np + +from skelda import utils_pose + +# ================================================================================================== + +core_joints = [ + "shoulder_middle", + "hip_middle", + "shoulder_left", + "shoulder_right", + "hip_left", + "hip_right", + "elbow_left", + "elbow_right", + "knee_left", + "knee_right", + "wrist_left", + "wrist_right", + "ankle_left", + "ankle_right", +] + +# ================================================================================================== + + +def undistort_points(points: np.ndarray, caminfo: dict): + K = np.array(caminfo["K"], dtype=np.float32) + DC = np.array(caminfo["DC"][0:5], dtype=np.float32) + w = caminfo["width"] + h = caminfo["height"] + + # Undistort camera matrix + newK, _ = cv2.getOptimalNewCameraMatrix(K, DC, (w, h), 1, (w, h)) + caminfo = copy.deepcopy(caminfo) + caminfo["K"] = newK.tolist() + caminfo["DC"] = [0.0, 0.0, 0.0, 0.0, 0.0] + + # Undistort points + pshape = points.shape + points = np.reshape(points, (-1, 1, 2)) + points = cv2.undistortPoints(points, K, DC, P=newK) + points = points.reshape(pshape) + + return points, caminfo + + +# ================================================================================================== + + +def get_camera_P(cam): + """Calculate opencv-style projection matrix""" + + R = np.array(cam["R"]) + T = np.array(cam["T"]) + Tr = R @ (T * -1) + P = cam["K"] @ np.hstack([R, Tr]) + return P + + +# ================================================================================================== + + +def calc_pair_score(pair, poses_2d, camparams, joint_names_2d): + """Triangulates a pair of persons and 
+    R = np.array(cam["R"])
+    T = np.array(cam["T"])
+    Tr = R @ (T * -1)
+    P = cam["K"] @ np.hstack([R, Tr])
+    return P
+
+
+# ==================================================================================================
+
+
+def calc_pair_score(pair, poses_2d, camparams, joint_names_2d):
+    """Triangulates the core joints of a person pair and scores them by reprojection error"""
+
+    cam1 = camparams[pair[0][0]]
+    cam2 = camparams[pair[0][1]]
+    pose1 = np.array(poses_2d[pair[0][0]][pair[0][2]])
+    pose2 = np.array(poses_2d[pair[0][1]][pair[0][3]])
+
+    # Select core joints
+    jids = [joint_names_2d.index(j) for j in core_joints]
+    pose1 = pose1[jids]
+    pose2 = pose2[jids]
+
+    poses_3d, score = calc_pose_scored(pose1, pose2, cam1, cam2)
+    return poses_3d, score
+
+
+# ==================================================================================================
+
+
+def calc_pose_scored(pose1, pose2, cam1, cam2):
+    """Triangulates a pair of persons and scores them based on the reprojection error"""
+
+    # Triangulate points
+    points1 = pose1[:, 0:2].T
+    points2 = pose2[:, 0:2].T
+    P1 = get_camera_P(cam1)
+    P2 = get_camera_P(cam2)
+    points3d = cv2.triangulatePoints(P1, P2, points1, points2)
+    points3d = points3d / points3d[3, :]
+    points3d = points3d[0:3, :].T
+
+    # Calculate reprojection error
+    poses_3d = np.expand_dims(points3d, axis=0)
+    poses_3d = np.concatenate([poses_3d, np.ones([1, poses_3d.shape[1], 1])], axis=-1)
+    repro1, _ = utils_pose.project_poses(poses_3d, cam1)
+    repro2, _ = utils_pose.project_poses(poses_3d, cam2)
+    repro1 = repro1[0]
+    repro2 = repro2[0]
+    mask1 = pose1[:, 2] > 0.1
+    mask2 = pose2[:, 2] > 0.1
+    mask = mask1 & mask2
+    if not np.any(mask):
+        # No joint is confident in both views, so the pair cannot be scored
+        pose_3d = np.concatenate([points3d, np.zeros([len(points3d), 1])], axis=-1)
+        return pose_3d, 0.0
+    error1 = np.linalg.norm(pose1[mask, 0:2] - repro1[mask, 0:2], axis=1)
+    error2 = np.linalg.norm(pose2[mask, 0:2] - repro2[mask, 0:2], axis=1)
+
+    # Convert errors to a score
+    error1 = error1 / ((cam1["width"] + cam1["height"]) / 2)
+    error2 = error2 / ((cam2["width"] + cam2["height"]) / 2)
+    error = (error1 + error2) / 2
+    scores = 1.0 / (1.0 + error * 10)
+    score = np.mean(scores)
+
+    # Add score to 3D pose
+    full_scores = np.zeros([poses_3d.shape[1], 1])
+    full_scores[mask] = np.expand_dims(scores, axis=-1)
+    pose_3d = np.concatenate([points3d, full_scores], axis=-1)
+
+    return pose_3d, score
+
+
+# ==================================================================================================
+
+
+def calc_grouping(all_pairs):
+    """Groups pairs that share a person"""
+
+    # Calculate the pose center for each pair
+    for i in range(len(all_pairs)):
+        pair = all_pairs[i]
+        pose_3d = pair[2][0]
+        mask = pose_3d[:, 3] > 0.1  # column 3 is the joint score, columns 0-2 are x/y/z
+        center = np.mean(pose_3d[mask, 0:3], axis=0)
+        all_pairs[i] = all_pairs[i] + (center,)
+
+    groups = []
+    for i in range(len(all_pairs)):
+        pair = all_pairs[i]
+
+        # Create new group if none exists yet
+        if len(groups) == 0:
+            groups.append([pair])
+            continue
+
+        # Check if the pair belongs to an existing group
+        matched = False
+        for j in range(len(groups)):
+            g0 = groups[j][0]
+            center0 = g0[3]
+            center1 = pair[3]
+            if np.linalg.norm(center0 - center1) < 0.5:
+                pose0 = g0[2][0]
+                pose1 = pair[2][0]
+
+                # Calculate the distance between the two poses
+                mask0 = pose0[:, 3] > 0.1
+                mask1 = pose1[:, 3] > 0.1
+                mask = mask0 & mask1
+                dists = np.linalg.norm(pose0[mask, 0:3] - pose1[mask, 0:3], axis=1)
+                dist = np.mean(dists)
+                if dist < 0.3:
+                    groups[j].append(pair)
+                    matched = True
+                    break
+
+        # Create new group if no match was found
+        if not matched:
+            groups.append([pair])
+
+    return groups
+
+
+# ==================================================================================================
+
+
+def merge_group(group, poses_2d, camparams):
+    """Merges a group of poses into a single pose"""
+
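+    # Each pair is re-triangulated with all joints (pair scoring above only
+    # used the core joints), then the per-pair poses are averaged joint-wise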
+    # Calculate full 3D poses
+    poses_3d = []
+    for pair in group:
+        cam1 = camparams[pair[0][0]]
+        cam2 = camparams[pair[0][1]]
+        pose1 = np.array(poses_2d[pair[0][0]][pair[0][2]])
+        pose2 = np.array(poses_2d[pair[0][1]][pair[0][3]])
+
+        pose_3d, _ = calc_pose_scored(pose1, pose2, cam1, cam2)
+        poses_3d.append(pose_3d)
+
+    # Merge poses
+    pose_3d = np.mean(poses_3d, axis=0)
+
+    return pose_3d
+
+
+# ==================================================================================================
+
+
+def get_3d_pose(poses_2d, camparams, joint_names_2d):
+
+    # Undistort 2D points
+    # Note: camparams is updated in place with the undistorted intrinsics
+    for i, cam in enumerate(camparams):
+        poses = np.array(poses_2d[i])
+        poses[:, :, 0:2], camparams[i] = undistort_points(poses[:, :, 0:2], cam)
+        poses_2d[i] = poses.tolist()
+
+    # Create pairs of persons across all camera views
+    num_persons = [len(p) for p in poses_2d]
+    all_pairs = []
+    for i in range(len(poses_2d)):
+        poses = poses_2d[i]
+        for j in range(i + 1, len(poses_2d)):
+            poses2 = poses_2d[j]
+            for k in range(len(poses)):
+                for m in range(len(poses2)):
+                    pid1 = sum(num_persons[:i]) + k
+                    pid2 = sum(num_persons[:j]) + m
+                    all_pairs.append(((i, j, k, m), (pid1, pid2)))
+
+    # Calculate pair scores
+    for i in range(len(all_pairs)):
+        pair = all_pairs[i]
+        pose_3d, score = calc_pair_score(pair, poses_2d, camparams, joint_names_2d)
+        all_pairs[i] = all_pairs[i] + ((pose_3d, score),)
+
+    # import draw_utils
+    # poses3D = np.array([pose_3d])
+    # _ = draw_utils.utils_view.show_poses3d(
+    #     poses3D, core_joints, {}, camparams
+    # )
+    # draw_utils.utils_view.show_plots()
+
+    # Drop pairs with low scores
+    min_score = 0.9
+    all_pairs = [p for p in all_pairs if p[2][1] > min_score]
+
+    # Group pairs that share a person
+    groups = calc_grouping(all_pairs)
+
+    # Merge groups
+    poses_3d = []
+    for group in groups:
+        pose_3d = merge_group(group, poses_2d, camparams)
+        poses_3d.append(pose_3d)
+
+    if len(poses_3d) > 0:
+        poses3D = np.array(poses_3d)
+    else:
+        poses3D = np.zeros([1, len(joint_names_2d), 4])
+    return poses3D