import copy import math import cv2 import numpy as np from skelda import utils_pose # ================================================================================================== core_joints = [ "shoulder_left", "shoulder_right", "hip_left", "hip_right", "elbow_left", "elbow_right", "knee_left", "knee_right", "wrist_left", "wrist_right", "ankle_left", "ankle_right", ] # ================================================================================================== def undistort_points(points: np.ndarray, caminfo: dict): """Undistorts 2D pixel coordinates""" K = np.asarray(caminfo["K"], dtype=np.float32) DC = np.asarray(caminfo["DC"][0:5], dtype=np.float32) # Undistort camera matrix w = caminfo["width"] h = caminfo["height"] newK, _ = cv2.getOptimalNewCameraMatrix(K, DC, (w, h), 1, (w, h)) caminfo["K"] = newK caminfo["DC"] = np.array([0.0, 0.0, 0.0, 0.0, 0.0]) # Undistort points pshape = points.shape points = np.reshape(points, (-1, 1, 2)) points = cv2.undistortPoints(points, K, DC, P=newK) points = points.reshape(pshape) return points, caminfo # ================================================================================================== def get_camera_P(cam): """Calculate opencv-style projection matrix""" R = np.asarray(cam["R"]) T = np.asarray(cam["T"]) K = np.asarray(cam["K"]) Tr = R @ (T * -1) P = K @ np.hstack([R, Tr]) return P # ================================================================================================== def calc_pose_score(pose1, pose2, dist1, cam1, joint_names, use_joints): """Calculates the score between two poses""" # Select core joints jids = [joint_names.index(j) for j in use_joints] pose1 = pose1[jids] pose2 = pose2[jids] dist1 = dist1[jids] mask = (pose1[:, 2] > 0.1) & (pose2[:, 2] > 0.1) if np.sum(mask) < 3: return 0.0 iscale = (cam1["width"] + cam1["height"]) / 2 scores = score_projection(pose1, pose2, dist1, mask, iscale) score = np.mean(scores) return score # ================================================================================================== def calc_pair_score(pair, poses_2d, camparams, roomparams, joint_names_2d, use_joints): """Triangulates a pair of persons and scores them based on the reprojection error""" cam1 = camparams[pair[0][0]] cam2 = camparams[pair[0][1]] pose1 = poses_2d[pair[0][0]][pair[0][2]] pose2 = poses_2d[pair[0][1]][pair[0][3]] # Select core joints jids = [joint_names_2d.index(j) for j in use_joints] pose1 = pose1[jids] pose2 = pose2[jids] poses_3d, score = triangulate_and_score(pose1, pose2, cam1, cam2, roomparams) return poses_3d, score # ================================================================================================== def score_projection(pose1, repro1, dists1, mask, iscale): min_score = 0.1 error1 = np.linalg.norm(pose1[mask, 0:2] - repro1[mask, 0:2], axis=1) # Set errors of invisible reprojections to a high value penalty = iscale mask1b = (repro1[:, 2] < min_score)[mask] error1[mask1b] = penalty # Scale error by image size and distance to the camera error1 = error1.clip(0, iscale / 4) / iscale dscale1 = np.sqrt(np.mean(dists1[mask]) / 3.5) error1 = error1 * dscale1 # Convert errors to a score score1 = 1.0 / (1.0 + error1 * 10) return score1 # ================================================================================================== def triangulate_and_score(pose1, pose2, cam1, cam2, roomparams): """Triangulates a pair of persons and scores them based on the reprojection error""" # Mask out invisible joints min_score = 0.1 mask1a = pose1[:, 2] >= min_score mask2a = pose2[:, 2] >= min_score mask = mask1a & mask2a # If too few joints are visible return a low score if np.sum(mask) < 3: pose3d = np.zeros([len(pose1), 4]) score = 0.0 return pose3d, score # Triangulate points points1 = pose1[mask, 0:2].T points2 = pose2[mask, 0:2].T points3d = cv2.triangulatePoints(cam1["P"], cam2["P"], points1, points2) points3d = (points3d / points3d[3, :])[0:3, :].T pose3d = np.zeros([len(pose1), 4]) pose3d[mask] = np.concatenate([points3d, np.ones([points3d.shape[0], 1])], axis=-1) # If the triangulated points are outside the room drop it mean = np.mean(pose3d[mask][:, 0:3], axis=0) mins = np.min(pose3d[mask][:, 0:3], axis=0) maxs = np.max(pose3d[mask][:, 0:3], axis=0) rsize = np.array(roomparams["room_size"]) / 2 rcent = np.array(roomparams["room_center"]) wdist = 0.1 center_outside = np.any((mean > rsize + rcent) | (mean < -rsize + rcent)) limb_outside = np.any( (maxs > rsize + rcent + wdist) | (mins < -rsize + rcent - wdist) ) if center_outside or limb_outside: pose3d[:, 3] = 0.001 score = 0.001 return pose3d, score # Calculate reprojection error poses_3d = np.expand_dims(pose3d, axis=0) repro1, dists1 = utils_pose.project_poses(poses_3d, cam1, calc_dists=True) repro2, dists2 = utils_pose.project_poses(poses_3d, cam2, calc_dists=True) repro1, dists1 = repro1[0], dists1[0] repro2, dists2 = repro2[0], dists2[0] # Calculate scores for each view iscale = (cam1["width"] + cam1["height"]) / 2 score1 = score_projection(pose1, repro1, dists1, mask, iscale) score2 = score_projection(pose2, repro2, dists2, mask, iscale) # Combine scores scores = (score1 + score2) / 2 # Drop lowest scores drop_k = math.floor(len(pose1) * 0.2) score = np.mean(np.sort(scores, axis=-1)[drop_k:]) # Add score to 3D pose full_scores = np.zeros([poses_3d.shape[1], 1]) full_scores[mask] = np.expand_dims(scores, axis=-1) pose3d[:, 3] = full_scores[:, 0] return pose3d, score # ================================================================================================== def calc_grouping(all_pairs, min_score: float): """Groups pairs that share a person""" # Calculate the pose center for each pair for i in range(len(all_pairs)): pair = all_pairs[i] pose_3d = pair[2][0] mask = pose_3d[:, 3] > min_score center = np.mean(pose_3d[mask, 0:3], axis=0) all_pairs[i] = all_pairs[i] + [center] groups = [] for i in range(len(all_pairs)): pair = all_pairs[i] # Create new group if non exists if len(groups) == 0: groups.append([pair[3], pair[2][0], [pair]]) continue # Check if the pair matches to an existing group max_center_dist = 0.9 max_joint_avg_dist = 0.3 best_dist = math.inf best_group = -1 for j in range(len(groups)): g0 = groups[j] center0 = g0[0] center1 = pair[3] if np.linalg.norm(center0 - center1) < max_center_dist: pose0 = g0[1] pose1 = pair[2][0] # Calculate the distance between the two poses mask0 = pose0[:, 3] > min_score mask1 = pose1[:, 3] > min_score mask = mask0 & mask1 dists = np.linalg.norm(pose0[mask, 0:3] - pose1[mask, 0:3], axis=1) dist = np.mean(dists) if dist < max_joint_avg_dist: if dist < best_dist: best_dist = dist best_group = j if best_group >= 0: # Add pair to existing group and update the mean positions group = groups[best_group] new_center = (group[0] * len(group[1]) + pair[3]) / (len(group[1]) + 1) new_pose = (group[1] * len(group[1]) + pair[2][0]) / (len(group[1]) + 1) group[2].append(pair) group[0] = new_center group[1] = new_pose else: # Create new group if no match was found groups.append([pair[3], pair[2][0], [pair]]) return groups # ================================================================================================== def merge_group(poses_3d: np.ndarray, min_score: float): """Merges a group of poses into a single pose""" # Merge poses to create initial pose # Use only those triangulations with a high score imask = poses_3d[:, :, 3:4] > min_score sum_poses = np.sum(poses_3d * imask, axis=0) sum_mask = np.sum(imask, axis=0) initial_pose_3d = np.divide( sum_poses, sum_mask, where=(sum_mask > 0), out=np.zeros_like(sum_poses) ) # Use center as default if the initial pose is empty jmask = initial_pose_3d[:, 3] > 0.0 sum_joints = np.sum(initial_pose_3d[jmask, 0:3], axis=0) sum_mask = np.sum(jmask) center = np.divide( sum_joints, sum_mask, where=(sum_mask > 0), out=np.zeros_like(sum_joints) ) initial_pose_3d[~jmask, 0:3] = center # Drop joints with low scores offset = 0.1 mask = poses_3d[:, :, 3:4] > (min_score - offset) # Drop outliers that are far away from the other proposals max_dist = 1.2 distances = np.linalg.norm( poses_3d[:, :, :3] - initial_pose_3d[np.newaxis, :, :3], axis=2 ) dmask = distances <= max_dist mask = mask & np.expand_dims(dmask, axis=-1) # Select the best-k proposals for each joint that are closest to the initial pose keep_best = 3 sorted_indices = np.argsort(distances, axis=0) best_k_mask = np.zeros_like(mask, dtype=bool) num_joints = poses_3d.shape[1] for i in range(num_joints): valid_indices = sorted_indices[:, i][mask[sorted_indices[:, i], i, 0]] best_k_mask[valid_indices[:keep_best], i, 0] = True mask = mask & best_k_mask # Final pose computation with combined masks sum_poses = np.sum(poses_3d * mask, axis=0) sum_mask = np.sum(mask, axis=0) final_pose_3d = np.divide( sum_poses, sum_mask, where=(sum_mask > 0), out=np.zeros_like(sum_poses) ) return final_pose_3d # ================================================================================================== def get_3d_pose( poses_2d, camparams, roomparams, joint_names_2d, last_poses_3d=np.array([]), min_score=0.95, ): """Triangulates 3D poses from 2D poses of multiple views""" # Convert poses and camparams to numpy arrays camparams = copy.deepcopy(camparams) for i in range(len(camparams)): poses_2d[i] = np.asarray(poses_2d[i]) camparams[i]["K"] = np.array(camparams[i]["K"]) camparams[i]["R"] = np.array(camparams[i]["R"]) camparams[i]["T"] = np.array(camparams[i]["T"]) camparams[i]["DC"] = np.array(camparams[i]["DC"]) # Undistort 2D points for i in range(len(camparams)): poses = poses_2d[i] cam = camparams[i] poses[:, :, 0:2], cam = undistort_points(poses[:, :, 0:2], cam) # Mask out points that are far outside the image (points slightly outside are still valid) offset = (cam["width"] + cam["height"]) / 40 mask = ( (poses[:, :, 0] >= 0 - offset) & (poses[:, :, 0] < cam["width"] + offset) & (poses[:, :, 1] >= 0 - offset) & (poses[:, :, 1] < cam["height"] + offset) ) poses = poses * np.expand_dims(mask, axis=-1) poses_2d[i] = poses # Calc projection matrix with updated camera parameters cam["P"] = get_camera_P(cam) camparams[i] = cam # Project last 3D poses to 2D last_poses_2d = [] last_poses_3d = np.asarray(last_poses_3d) if last_poses_3d.size > 0: for i in range(len(camparams)): poses2d, dists = utils_pose.project_poses(last_poses_3d, camparams[i]) last_poses_2d.append((poses2d, dists)) # Check matches to old poses threshold = min_score - 0.2 scored_pasts = {} if last_poses_3d.size > 0: for i in range(len(camparams)): scored_pasts[i] = {} poses = poses_2d[i] last_poses, dists = last_poses_2d[i] for j in range(len(last_poses)): scored_pasts[i][j] = [] for k in range(len(poses)): score = calc_pose_score( poses[k], last_poses[j], dists[j], camparams[i], joint_names_2d, core_joints, ) if score > threshold: scored_pasts[i][j].append(k) # Create pairs of persons # Checks if the person was already matched to the last frame and if so only creates pairs with those # Else it creates all possible pairs num_persons = [len(p) for p in poses_2d] all_pairs = [] for i in range(len(camparams)): poses = poses_2d[i] for j in range(i + 1, len(poses_2d)): poses2 = poses_2d[j] for k in range(len(poses)): for l in range(len(poses2)): pid1 = sum(num_persons[:i]) + k pid2 = sum(num_persons[:j]) + l match = False if last_poses_3d.size > 0: for m in range(len(last_poses_3d)): if k in scored_pasts[i][m] and l in scored_pasts[j][m]: match = True all_pairs.append([(i, j, k, l), (pid1, pid2)]) elif k in scored_pasts[i][m] or l in scored_pasts[j][m]: match = True if not match: all_pairs.append([(i, j, k, l), (pid1, pid2)]) # Calculate pair scores for i in range(len(all_pairs)): pair = all_pairs[i] pose_3d, score = calc_pair_score( pair, poses_2d, camparams, roomparams, joint_names_2d, core_joints ) all_pairs[i].append((pose_3d, score)) # import draw_utils # poses3D = np.array([pose_3d]) # _ = draw_utils.utils_view.show_poses3d( # poses3D, core_joints, {}, camparams # ) # draw_utils.utils_view.show_plots() # Drop pairs with low scores all_pairs = [p for p in all_pairs if p[2][1] > min_score] # Group pairs that share a person groups = calc_grouping(all_pairs, min_score) # Calculate full 3D poses poses_3d = [] for pair in all_pairs: cam1 = camparams[pair[0][0]] cam2 = camparams[pair[0][1]] pose1 = poses_2d[pair[0][0]][pair[0][2]] pose2 = poses_2d[pair[0][1]][pair[0][3]] pose_3d, _ = triangulate_and_score(pose1, pose2, cam1, cam2, roomparams) pair.append(pose_3d) # Merge groups poses_3d = [] for group in groups: poses = np.array([p[4] for p in group[2]]) pose_3d = merge_group(poses, min_score) poses_3d.append(pose_3d) if len(poses_3d) > 0: poses3D = np.array(poses_3d) else: poses3D = np.zeros([1, len(joint_names_2d), 4]) return poses3D