"""Transforms and augmentations for image sequences and pose (joint) sequences."""
import math
import random

import cv2
import numpy as np
import torchvision.transforms as T

from data import transform as base_transform
from utils import is_list, is_dict, get_valid_args


class NoOperation():
    """Identity transform: returns its input unchanged."""

    def __call__(self, x):
        return x


class BaseSilTransform():
    """Optionally reshape the input to ``(x.shape[0], *img_shape)`` and divide by ``divsor``."""

    def __init__(self, divsor=255.0, img_shape=None):
        self.divsor = divsor
        self.img_shape = img_shape

    def __call__(self, x):
        if self.img_shape is not None:
            # Keep the leading axis, reshape the remainder to img_shape.
            x = x.reshape(x.shape[0], *self.img_shape)
        return x / self.divsor


class BaseSilCuttingTransform():
    """Crop ``cutting`` columns from both ends of the last axis, then divide by ``divsor``.

    When ``cutting`` is None it is derived from the input width as
    ``int(width // 64) * 10``.
    """

    def __init__(self, divsor=255.0, cutting=None):
        self.divsor = divsor
        self.cutting = cutting

    def __call__(self, x):
        if self.cutting is not None:
            cutting = self.cutting
        else:
            cutting = int(x.shape[-1] // 64) * 10
        # A zero margin must be skipped: ``x[..., 0:-0]`` is an empty slice,
        # which would silently drop the whole width.
        if cutting != 0:
            x = x[..., cutting:-cutting]
        return x / self.divsor


class BaseRgbTransform():
    """Normalize with per-channel ``mean`` / ``std``.

    The statistics are reshaped to ``(1, 3, 1, 1)`` so they broadcast over
    axis 1 of a 4-D input.
    """

    def __init__(self, mean=None, std=None):
        if mean is None:
            mean = [0.485*255, 0.456*255, 0.406*255]
        if std is None:
            std = [0.229*255, 0.224*255, 0.225*255]
        self.mean = np.array(mean).reshape((1, 3, 1, 1))
        self.std = np.array(std).reshape((1, 3, 1, 1))

    def __call__(self, x):
        return (x - self.mean) / self.std


# **************** Data Augmentation ****************


class RandomHorizontalFlip(object):
    """With probability ``prob``, reverse the last axis of the sequence."""

    def __init__(self, prob=0.5):
        self.prob = prob

    def __call__(self, seq):
        if random.uniform(0, 1) >= self.prob:
            return seq
        # Note: this is a reversed view of the input, not a copy.
        return seq[..., ::-1]


class RandomErasing(object):
    """With probability ``prob``, zero out a random rectangle over axes 1 and 2.

    Args:
        prob: probability of applying the erasing.
        sl, sh: lower / upper bound of the erased area as a fraction of the
            ``shape[1] * shape[2]`` area.
        r1: lower bound of the aspect ratio; the upper bound is ``1 / r1``.
        per_frame: if True, every frame (index along axis 0) draws its own
            probability and rectangle; otherwise one rectangle is shared by
            all frames.

    The input array is modified in place.
    """

    def __init__(self, prob=0.5, sl=0.05, sh=0.2, r1=0.3, per_frame=False):
        self.prob = prob
        self.sl = sl
        self.sh = sh
        self.r1 = r1
        self.per_frame = per_frame

    def _erase(self, seq):
        """Erase one rectangle (shared by every frame of ``seq``) in place."""
        if random.uniform(0, 1) >= self.prob:
            return seq
        height, width = seq.shape[1], seq.shape[2]
        area = height * width
        # Rejection sampling: retry until the rectangle fits, at most 100 times.
        for _ in range(100):
            target_area = random.uniform(self.sl, self.sh) * area
            aspect_ratio = random.uniform(self.r1, 1 / self.r1)
            h = int(round(math.sqrt(target_area * aspect_ratio)))
            w = int(round(math.sqrt(target_area / aspect_ratio)))
            if w < width and h < height:
                top = random.randint(0, height - h)
                left = random.randint(0, width - w)
                seq[:, top:top+h, left:left+w] = 0.
                return seq
        return seq

    def __call__(self, seq):
        if not self.per_frame:
            return self._erase(seq)
        # Per-frame mode: run the shared-rectangle routine on single-frame
        # slices instead of temporarily flipping ``self.per_frame``, so the
        # instance is never left in a modified state.
        erased = [self._erase(seq[k:k+1]) for k in range(seq.shape[0])]
        return np.concatenate(erased, 0)


class RandomRotate(object):
    """With probability ``prob``, rotate every frame by one random angle.

    The angle is drawn uniformly from ``[-degree, degree]`` and shared by all
    frames of the sequence. Expects a 3-D ``(frames, height, width)`` input.
    """

    def __init__(self, prob=0.5, degree=10):
        self.prob = prob
        self.degree = degree

    def __call__(self, seq):
        if random.uniform(0, 1) >= self.prob:
            return seq
        _, dh, dw = seq.shape
        # rotation
        degree = random.uniform(-self.degree, self.degree)
        # OpenCV takes the centre as (x, y) = (column, row), so width comes
        # first; the previous (dh // 2, dw // 2) was off-centre for
        # non-square frames.
        M1 = cv2.getRotationMatrix2D((dw // 2, dh // 2), degree, 1)
        # affine
        return np.stack(
            [cv2.warpAffine(frame, M1, (dw, dh)) for frame in seq], 0)


class RandomPerspective(object):
    """With probability ``prob``, apply one random perspective warp to every frame.

    The four source corners are picked from a left / right margin of
    ``int(w // 44) * 10`` columns and mapped onto the full canvas.
    NOTE(review): a width below 44 yields an empty margin and
    ``random.choice`` raises IndexError - confirm inputs are wide enough.
    """

    def __init__(self, prob=0.5):
        self.prob = prob

    def __call__(self, seq):
        if random.uniform(0, 1) >= self.prob:
            return seq
        _, h, w = seq.shape
        cutting = int(w // 44) * 10
        x_left = range(0, cutting)
        x_right = range(w - cutting, w)
        # Draw order (TL, TR, BL, BR) is kept so seeded runs stay reproducible.
        TL = (random.choice(x_left), 0)
        TR = (random.choice(x_right), 0)
        BL = (random.choice(x_left), h)
        BR = (random.choice(x_right), h)
        srcPoints = np.float32([TL, TR, BR, BL])
        canvasPoints = np.float32([[0, 0], [w, 0], [w, h], [0, h]])
        perspectiveMatrix = cv2.getPerspectiveTransform(srcPoints, canvasPoints)
        return np.stack(
            [cv2.warpPerspective(frame, perspectiveMatrix, (w, h))
             for frame in seq], 0)


class RandomAffine(object):
    """With probability ``prob``, apply one random affine warp to every frame.

    Two randomly jittered triangles are drawn and the affine map between them
    is applied. ``degree`` is stored but not used by ``__call__``.
    NOTE(review): the triangle points use ``dh`` for the first coordinate and
    ``dw`` for the second, while OpenCV points are (x, y) - confirm intent.
    NOTE(review): a height below 64 yields an empty shift range and
    ``random.choice`` raises IndexError.
    """

    def __init__(self, prob=0.5, degree=10):
        self.prob = prob
        self.degree = degree

    def __call__(self, seq):
        if random.uniform(0, 1) >= self.prob:
            return seq
        _, dh, dw = seq.shape
        max_shift = int(dh // 64 * 10)
        shift_range = range(0, max_shift)

        def jitter():
            return random.choice(shift_range)

        def triangle():
            # Three corner-ish points, each pulled inwards by a random shift.
            return np.float32([[jitter(), jitter()],
                               [dh - jitter(), jitter()],
                               [jitter(), dw - jitter()]])

        pts1 = triangle()
        pts2 = triangle()
        M1 = cv2.getAffineTransform(pts1, pts2)
        # affine
        return np.stack(
            [cv2.warpAffine(frame, M1, (dw, dh)) for frame in seq], 0)


# ******************************************


def Compose(trf_cfg):
    """Build a ``torchvision.transforms.Compose`` from a list of transform configs."""
    assert is_list(trf_cfg)
    transform = T.Compose([get_transform(cfg) for cfg in trf_cfg])
    return transform


def get_transform(trf_cfg=None):
    """Instantiate transform(s) from a config.

    Args:
        trf_cfg: a dict with a ``'type'`` key naming a class in this module
            (remaining keys are filtered through ``get_valid_args`` and passed
            to the constructor), a list of such configs, or None.

    Returns:
        A transform instance for a dict, an identity transform for None, or a
        list of transforms for a list.

    Raises:
        TypeError: if ``trf_cfg`` is none of the supported kinds.
    """
    if is_dict(trf_cfg):
        transform = getattr(base_transform, trf_cfg['type'])
        valid_trf_arg = get_valid_args(transform, trf_cfg, ['type'])
        return transform(**valid_trf_arg)
    if trf_cfg is None:
        # Same behaviour as ``lambda x: x`` but picklable.
        return NoOperation()
    if is_list(trf_cfg):
        transform = [get_transform(cfg) for cfg in trf_cfg]
        return transform
    # Raising a bare string is itself a TypeError in Python 3 and loses the
    # message; raise a proper exception instead.
    raise TypeError("Error type for -Transform-Cfg-")


# **************** For pose ****************


class RandomSelectSequence(object):
    """
    Randomly select different subsequences
    """

    def __init__(self, sequence_length=10):
        self.sequence_length = sequence_length

    def __call__(self, data):
        """Return ``sequence_length`` consecutive items starting at a random index.

        Raises:
            ValueError: if ``data`` has fewer than ``sequence_length`` items.
        """
        last_start = data.shape[0] - self.sequence_length
        if last_start < 0:
            raise ValueError("The sequence length of data is too short, which does not meet the requirements.")
        # ``randint``'s upper bound is exclusive, hence the + 1: every valid
        # window (including the last one, and the only one when the input is
        # exactly ``sequence_length`` long) can be selected.
        start = np.random.randint(0, last_start + 1)
        end = start + self.sequence_length
        return data[start:end]


class SelectSequenceCenter(object):
    """
    Select center subsequence
    """

    def __init__(self, sequence_length=10):
        self.sequence_length = sequence_length

    def __call__(self, data):
        """Return the ``sequence_length`` items centred in ``data``.

        Raises:
            ValueError: if ``data`` has fewer than ``sequence_length`` items.
        """
        # The arithmetic below never raises, so a too-short input has to be
        # rejected explicitly; otherwise a negative start would silently
        # produce a truncated, off-centre slice.
        if data.shape[0] < self.sequence_length:
            raise ValueError("The sequence length of data is too short, which does not meet the requirements.")
        start = int((data.shape[0]/2) - (self.sequence_length / 2))
        end = start + self.sequence_length
        return data[start:end]


class MirrorPoses(object):
    """
    Performing Mirror Operations
    """

    def __init__(self, prob=0.5):
        self.prob = prob

    def __call__(self, data):
        """With probability ``prob``, reflect channel 0 about its per-frame mean (in place)."""
        if np.random.random() <= self.prob:
            center = np.mean(data[:, :, 0], axis=1, keepdims=True)
            data[:, :, 0] = center - data[:, :, 0] + center
        return data


class NormalizeEmpty(object):
    """
    Normalize Empty Joint
    """

    def __call__(self, data):
        """Replace joints whose channel 0 equals 0 by the frame mean (in place).

        Channels 0 and 1 are set to the mean over the frame's joints and
        channel 2 is set to 0.
        """
        frames, joints = np.where(data[:, :, 0] == 0)
        # Kept as a sequential loop: each mean is computed after the previous
        # replacement in the same frame has been written.
        for frame, joint in zip(frames, joints):
            center_of_gravity = np.mean(data[frame], axis=0)
            data[frame, joint, 0] = center_of_gravity[0]
            data[frame, joint, 1] = center_of_gravity[1]
            data[frame, joint, 2] = 0
        return data


class RandomMove(object):
    """
    Move: add Random Movement to each joint
    """

    def __init__(self, random_r=(4, 1)):
        # Tuple default avoids sharing one mutable list between instances.
        self.random_r = random_r

    def __call__(self, data):
        """Add one random offset to channels 0 and 1 of every joint (in place).

        The offsets are uniform in ``[-random_r[0], random_r[0]]`` and
        ``[-random_r[1], random_r[1]]``; channel 2 is left unchanged.
        """
        noise = np.zeros(3)
        noise[0] = np.random.uniform(-self.random_r[0], self.random_r[0])
        noise[1] = np.random.uniform(-self.random_r[1], self.random_r[1])
        data += np.tile(noise, (data.shape[0], data.shape[1], 1))
        return data


class PointNoise(object):
    """
    Add Gaussian noise to pose points
    std: standard deviation
    """

    def __init__(self, std=0.01):
        self.std = std

    def __call__(self, data):
        noise = np.random.normal(0, self.std, data.shape).astype(np.float32)
        return data + noise


class FlipSequence(object):
    """
    Temporal Flipping
    """

    def __init__(self, probability=0.5):
        self.probability = probability

    def __call__(self, data):
        """With probability ``probability``, return a reversed copy along axis 0."""
        if np.random.random() <= self.probability:
            return np.flip(data, axis=0).copy()
        return data


class InversePosesPre(object):
    '''
    Left-right flip of skeletons
    '''

    def __init__(self, probability=0.5, joint_format='coco'):
        self.probability = probability
        # Permutation applied along the joint axis; which entries are
        # left/right pairs depends on the joint format.
        if joint_format == 'coco':
            self.invers_arr = [0, 2, 1, 4, 3, 6, 5, 8, 7, 10, 9, 12, 11, 14, 13, 16, 15]
        elif joint_format in ['alphapose', 'openpose']:
            self.invers_arr = [0, 1, 5, 6, 7, 2, 3, 4, 11, 12, 13, 8, 9, 10, 15, 14, 17, 16]
        else:
            raise ValueError("Invalid joint_format.")

    def __call__(self, data):
        """Permute the joints of each frame independently with probability ``probability`` (in place)."""
        for i in range(len(data)):
            if np.random.random() <= self.probability:
                data[i] = data[i, self.invers_arr, :]
        return data


class JointNoise(object):
    """
    Add Gaussian noise to joint
    std: standard deviation
    """

    def __init__(self, std=0.25):
        self.std = std

    def __call__(self, data):
        # T, V, C
        # One noise vector per joint (first two channels only, third is 0),
        # repeated unchanged over every frame.
        noise = np.hstack((
            np.random.normal(0, self.std, (data.shape[1], 2)),
            np.zeros((data.shape[1], 1))
        )).astype(np.float32)
        return data + np.repeat(noise[np.newaxis, ...], data.shape[0], axis=0)


class GaitTRMultiInput(object):
    """Build five 2-channel streams from a joint sequence.

    The streams are: raw joints, joints relative to joint 0, one-step
    differences, two-step differences, and bone vectors towards the joint
    given by ``connect_joint``.
    """

    def __init__(self, joint_format='coco',):
        if joint_format == 'coco':
            self.connect_joint = np.array([5,0,0,1,2,0,0,5,6,7,8,5,6,11,12,13,14])
        elif joint_format in ['alphapose', 'openpose']:
            self.connect_joint = np.array([1,1,1,2,3,1,5,6,2,8,9,5,11,12,0,0,14,15])
        else:
            raise ValueError("Invalid joint_format.")

    def __call__(self, data):
        """Map a ``(T, V, C)`` array to ``(T, V, 10)`` (5 streams x 2 channels)."""
        # (T, V, C) -> (C, T, V), keeping only the first two channels.
        data = np.transpose(data, (2, 0, 1))
        data = data[:2, :, :]

        # Local names deliberately avoid ``T``, which is the module alias for
        # torchvision.transforms.
        channels, frames, joints = data.shape
        data_new = np.zeros((5, channels, frames, joints))
        # Joints
        data_new[0, :channels, :, :] = data
        for i in range(joints):
            data_new[1, :, :, i] = data[:, :, i] - data[:, :, 0]
        # Velocity (the last two frames stay zero)
        for i in range(frames - 2):
            data_new[2, :, i, :] = data[:, i + 1, :] - data[:, i, :]
            data_new[3, :, i, :] = data[:, i + 2, :] - data[:, i, :]
        # Bones
        for i in range(len(self.connect_joint)):
            data_new[4, :, :, i] = data[:, :, i] - data[:, :, self.connect_joint[i]]

        streams, channels, frames, joints = data_new.shape
        data_new = data_new.reshape(streams * channels, frames, joints)
        # (C T V) -> (T V C)
        data_new = np.transpose(data_new, (1, 2, 0))

        return data_new


class GaitGraphMultiInput(object):
    """Build joint / velocity / bone branches from a ``(T, V, C)`` joint sequence.

    The output has shape ``(T, V, 3, C + 2)``.
    NOTE(review): the ``3:`` slice assignments below only fit when C == 3.
    """

    def __init__(self, center=0, joint_format='coco'):
        self.center = center
        if joint_format == 'coco':
            self.connect_joint = np.array([5,0,0,1,2,0,0,5,6,7,8,5,6,11,12,13,14])
        elif joint_format in ['alphapose', 'openpose']:
            self.connect_joint = np.array([1,1,1,2,3,1,5,6,2,8,9,5,11,12,0,0,14,15])
        else:
            raise ValueError("Invalid joint_format.")

    def __call__(self, data):
        # Local names deliberately avoid ``T``, which is the module alias for
        # torchvision.transforms.
        frames, joints, channels = data.shape
        x = data
        x_new = np.zeros((frames, joints, 3, channels + 2))
        # Joints: raw channels plus position relative to the center joint
        x_new[:, :, 0, :channels] = x
        for i in range(joints):
            x_new[:, i, 0, channels:] = x[:, i, :2] - x[:, self.center, :2]
        # Velocity (the last two frames stay zero)
        for i in range(frames - 2):
            x_new[i, :, 1, :2] = x[i + 1, :, :2] - x[i, :, :2]
            x_new[i, :, 1, 3:] = x[i + 2, :, :2] - x[i, :, :2]
        # Column 3 is overwritten with input channel 2 after the loop.
        x_new[:, :, 1, 3] = x[:, :, 2]
        # Bones
        for i in range(joints):
            x_new[:, i, 2, :2] = x[:, i, :2] - x[:, self.connect_joint[i], :2]
        # Angles
        bone_length = 0
        for i in range(channels - 1):
            bone_length += np.power(x_new[:, :, 2, i], 2)
        # Small epsilon avoids division by zero for zero-length bones.
        bone_length = np.sqrt(bone_length) + 0.0001
        for i in range(channels - 1):
            x_new[:, :, 2, channels + i] = np.arccos(x_new[:, :, 2, i] / bone_length)
        # Column 3 is overwritten with input channel 2 here as well.
        x_new[:, :, 2, 3] = x[:, :, 2]
        return x_new


class GaitGraph1Input(object):
    '''
    Transpose the input
    '''

    def __call__(self, data):
        # (T V C) -> (C T V), then append a trailing singleton axis
        data = np.transpose(data, (2, 0, 1))
        return data[..., np.newaxis]


class SkeletonInput(object):
    '''
    Transpose the input
    '''

    def __call__(self, data):
        # (T V C) -> (T C V), then append a trailing singleton axis
        data = np.transpose(data, (0, 2, 1))
        return data[..., np.newaxis]


class TwoView(object):
    """Apply the same transform pipeline twice and concatenate the results on axis 1."""

    def __init__(self, trf_cfg):
        assert is_list(trf_cfg)
        self.transform = T.Compose([get_transform(cfg) for cfg in trf_cfg])

    def __call__(self, data):
        # The pipeline runs twice on the same input; with random transforms
        # the two halves generally differ.
        return np.concatenate([self.transform(data), self.transform(data)], axis=1)


class MSGGTransform():
    """Select a fixed subset of 12 joints along the second-to-last axis."""

    def __init__(self, joint_format="coco"):
        if joint_format == "coco":  # 17
            self.mask = [6, 8, 14, 12, 7, 13, 5, 10, 16, 11, 9, 15]
        elif joint_format in ['alphapose', 'openpose']:  # 18
            self.mask = [2, 3, 9, 8, 6, 12, 5, 4, 10, 11, 7, 13]
        else:
            raise ValueError("Invalid joint_format.")

    def __call__(self, x):
        result = x[..., self.mask, :].copy()
        return result