Files
Dongyang Jin 2c29afadf3 Support skeleton (#155)
* pose

* pose

* pose

* pose

* 你的提交消息

* pose

* pose

* Delete train1.sh

* pretreatment

* configs

* pose

* reference

* Update gaittr.py

* naming

* naming

* Update transform.py

* update for datasets

* update README

* update name and README

* update

* Update transform.py
2023-09-27 16:20:00 +08:00

485 lines
20 KiB
Python

import torch
import copy
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from ..base_model import BaseModel
class MultiScaleGaitGraph(BaseModel):
"""
Learning Rich Features for Gait Recognition by Integrating Skeletons and Silhouettes
Github: https://github.com/YunjiePeng/BimodalFusion
"""
def build_network(self, model_cfg):
in_c = model_cfg['in_channels']
out_c = model_cfg['out_channels']
num_id = model_cfg['num_id']
temporal_kernel_size = model_cfg['temporal_kernel_size']
# load spatial graph
self.graph = SpatialGraph(**model_cfg['graph_cfg'])
A_lowSemantic = torch.tensor(self.graph.get_adjacency(semantic_level=0), dtype=torch.float32, requires_grad=False)
A_mediumSemantic = torch.tensor(self.graph.get_adjacency(semantic_level=1), dtype=torch.float32, requires_grad=False)
A_highSemantic = torch.tensor(self.graph.get_adjacency(semantic_level=2), dtype=torch.float32, requires_grad=False)
self.register_buffer('A_lowSemantic', A_lowSemantic)
self.register_buffer('A_mediumSemantic', A_mediumSemantic)
self.register_buffer('A_highSemantic', A_highSemantic)
# build networks
spatial_kernel_size = self.graph.num_A
temporal_kernel_size = temporal_kernel_size
kernel_size = (temporal_kernel_size, spatial_kernel_size)
self.st_gcn_networks_lowSemantic = nn.ModuleList()
self.st_gcn_networks_mediumSemantic = nn.ModuleList()
self.st_gcn_networks_highSemantic = nn.ModuleList()
for i in range(len(in_c)-1):
if i == 0:
self.st_gcn_networks_lowSemantic.append(st_gcn_block(in_c[i], in_c[i+1], kernel_size, 1, residual=False))
self.st_gcn_networks_mediumSemantic.append(st_gcn_block(in_c[i], in_c[i+1], kernel_size, 1, residual=False))
self.st_gcn_networks_highSemantic.append(st_gcn_block(in_c[i], in_c[i+1], kernel_size, 1, residual=False))
else:
self.st_gcn_networks_lowSemantic.append(st_gcn_block(in_c[i], in_c[i+1], kernel_size, 1))
self.st_gcn_networks_mediumSemantic.append(st_gcn_block(in_c[i], in_c[i+1], kernel_size, 1))
self.st_gcn_networks_highSemantic.append(st_gcn_block(in_c[i], in_c[i+1], kernel_size, 1))
self.st_gcn_networks_lowSemantic.append(st_gcn_block(in_c[i+1], in_c[i+1], kernel_size, 1))
self.st_gcn_networks_mediumSemantic.append(st_gcn_block(in_c[i+1], in_c[i+1], kernel_size, 1))
self.st_gcn_networks_highSemantic.append(st_gcn_block(in_c[i+1], in_c[i+1], kernel_size, 1))
self.edge_importance_lowSemantic = nn.ParameterList([
nn.Parameter(torch.ones(self.A_lowSemantic.size()))
for i in self.st_gcn_networks_lowSemantic])
self.edge_importance_mediumSemantic = nn.ParameterList([
nn.Parameter(torch.ones(self.A_mediumSemantic.size()))
for i in self.st_gcn_networks_mediumSemantic])
self.edge_importance_highSemantic = nn.ParameterList([
nn.Parameter(torch.ones(self.A_highSemantic.size()))
for i in self.st_gcn_networks_highSemantic])
self.fc = nn.Linear(in_c[-1], out_c)
self.bn_neck = nn.BatchNorm1d(out_c)
self.encoder_cls = nn.Linear(out_c, num_id, bias=False)
def semantic_pooling(self, x):
cur_node_num = x.size()[-1]
half_x_1, half_x_2 = torch.split(x, int(cur_node_num / 2), dim=-1)
x_sp = torch.add(half_x_1, half_x_2) / 2
return x_sp
def forward(self, inputs):
ipts, labs, _, _, seqL = inputs
x = ipts[0] # [N, T, V, C]
del ipts
"""
N - the number of videos.
T - the number of frames in one video.
V - the number of keypoints.
C - the number of features for one keypoint.
"""
N, T, V, C = x.size()
x = x.permute(0, 3, 1, 2).contiguous()
x = x.view(N, C, T, V)
y = self.semantic_pooling(x)
z = self.semantic_pooling(y)
for gcn_lowSemantic, importance_lowSemantic, gcn_mediumSemantic, importance_mediumSemantic, gcn_highSemantic, importance_highSemantic in zip(self.st_gcn_networks_lowSemantic, self.edge_importance_lowSemantic, self.st_gcn_networks_mediumSemantic, self.edge_importance_mediumSemantic, self.st_gcn_networks_highSemantic, self.edge_importance_highSemantic):
x, _ = gcn_lowSemantic(x, self.A_lowSemantic * importance_lowSemantic)
y, _ = gcn_mediumSemantic(y, self.A_mediumSemantic * importance_mediumSemantic)
z, _ = gcn_highSemantic(z, self.A_highSemantic * importance_highSemantic)
# Cross-scale Message Passing
x_sp = self.semantic_pooling(x)
y = torch.add(y, x_sp)
y_sp = self.semantic_pooling(y)
z = torch.add(z, y_sp)
# global pooling for each layer
x_sp = F.avg_pool2d(x, x.size()[2:])
N, C, T, V = x_sp.size()
x_sp = x_sp.view(N, C, T*V).contiguous()
y_sp = F.avg_pool2d(y, y.size()[2:])
N, C, T, V = y_sp.size()
y_sp = y_sp.view(N, C, T*V).contiguous()
z = F.avg_pool2d(z, z.size()[2:])
N, C, T, V = z.size()
z = z.permute(0, 2, 3, 1).contiguous()
z = z.view(N, T*V, C)
z_fc = self.fc(z.view(N, -1))
bn_z_fc = self.bn_neck(z_fc)
z_cls_score = self.encoder_cls(bn_z_fc)
z_fc = z_fc.unsqueeze(-1).contiguous() # [n, c, p]
z_cls_score = z_cls_score.unsqueeze(-1).contiguous() # [n, c, p]
retval = {
'training_feat': {
'triplet_joints': {'embeddings': x_sp, 'labels': labs},
'triplet_limbs': {'embeddings': y_sp, 'labels': labs},
'triplet_bodyparts': {'embeddings': z_fc, 'labels': labs},
'softmax': {'logits': z_cls_score, 'labels': labs}
},
'visual_summary': {},
'inference_feat': {
'embeddings': z_fc
}
}
return retval
class st_gcn_block(nn.Module):
r"""Applies a spatial temporal graph convolution over an input graph sequence.
Args:
in_channels (int): Number of channels in the input sequence data
out_channels (int): Number of channels produced by the convolution
kernel_size (tuple): Size of the temporal convolving kernel and graph convolving kernel
stride (int, optional): Stride of the temporal convolution. Default: 1
dropout (int, optional): Dropout rate of the final output. Default: 0
residual (bool, optional): If ``True``, applies a residual mechanism. Default: ``True``
Shape:
- Input[0]: Input graph sequence in :math:`(N, in_channels, T_{in}, V)` format
- Input[1]: Input graph adjacency matrix in :math:`(K, V, V)` format
- Output[0]: Outpu graph sequence in :math:`(N, out_channels, T_{out}, V)` format
- Output[1]: Graph adjacency matrix for output data in :math:`(K, V, V)` format
where
:math:`N` is a batch size, i.e. the number of videos.
:math:`K` is the spatial kernel size, as :math:`K == kernel_size[1]`.
:math:`T_{in}/T_{out}` is a length of input/output sequence, i.e. the number of frames in a video.
:math:`V` is the number of graph nodes.
"""
def __init__(self,
in_channels,
out_channels,
kernel_size,
stride=1,
dropout=0,
residual=True):
super().__init__()
assert len(kernel_size) == 2
assert kernel_size[0] % 2 == 1
padding = ((kernel_size[0] - 1) // 2, 0)
self.gcn = SCN(in_channels, out_channels, kernel_size[1])
self.tcn = nn.Sequential(
nn.BatchNorm2d(out_channels),
nn.ReLU(inplace=True),
nn.Conv2d(
out_channels,
out_channels,
(kernel_size[0], 1),
(stride, 1),
padding,
),
nn.BatchNorm2d(out_channels),
nn.Dropout(dropout, inplace=True),
)
if not residual:
self.residual = lambda x: 0
elif (in_channels == out_channels) and (stride == 1):
self.residual = lambda x: x
else:
self.residual = nn.Sequential(
nn.Conv2d(
in_channels,
out_channels,
kernel_size=1,
stride=(stride, 1)),
nn.BatchNorm2d(out_channels),
)
self.relu = nn.ReLU(inplace=True)
def forward(self, x, A):
res = self.residual(x)
x, A = self.gcn(x, A)
x = self.tcn(x) + res
return self.relu(x), A
class SCN(nn.Module):
r"""The basic module for applying a graph convolution.
Args:
in_channels (int): Number of channels in the input sequence data
out_channels (int): Number of channels produced by the convolution
kernel_size (int): Size of the graph convolving kernel
t_kernel_size (int): Size of the temporal convolving kernel
t_stride (int, optional): Stride of the temporal convolution. Default: 1
t_padding (int, optional): Temporal zero-padding added to both sides of
the input. Default: 0
t_dilation (int, optional): Spacing between temporal kernel elements.
Default: 1
bias (bool, optional): If ``True``, adds a learnable bias to the output.
Default: ``True``
Shape:
- Input[0]: Input graph sequence in :math:`(N, in_channels, T_{in}, V)` format
- Input[1]: Input graph adjacency matrix in :math:`(K, V, V)` format
- Output[0]: Output graph sequence in :math:`(N, out_channels, T_{out}, V)` format
- Output[1]: Graph adjacency matrix for output data in :math:`(K, V, V)` format
where
:math:`N` is a batch size,
:math:`K` is the spatial kernel size, as :math:`K == kernel_size[1]`,
:math:`T_{in}/T_{out}` is a length of input/output sequence,
:math:`V` is the number of graph nodes.
"""
def __init__(self,
in_channels,
out_channels,
kernel_size,
t_kernel_size=1,
t_stride=1,
t_padding=0,
t_dilation=1,
bias=True):
super().__init__()
# The defined module SCN are responsible only for the Spacial Graph (i.e. the graph in on frame),
# and the parameter t_kernel_size in this situation is always set to 1.
self.kernel_size = kernel_size
self.conv = nn.Conv2d(in_channels,
out_channels * kernel_size,
kernel_size=(t_kernel_size, 1),
padding=(t_padding, 0),
stride=(t_stride, 1),
dilation=(t_dilation, 1),
bias=bias)
"""
The 1x1 conv operation here stands for the weight metrix W.
The kernel_size here stands for the number of different adjacency matrix,
which are defined according to the partitioning strategy.
Because for neighbor nodes in the same subset (in one adjacency matrix), the weights are shared.
It is reasonable to apply 1x1 conv as the implementation of weight function.
"""
def forward(self, x, A):
assert A.size(0) == self.kernel_size
x = self.conv(x)
n, kc, t, v = x.size()
x = x.view(n, self.kernel_size, kc // self.kernel_size, t, v)
x = torch.einsum('nkctv,kvw->nctw', (x, A))
return x.contiguous(), A
class SpatialGraph():
""" Use skeleton sequences extracted by Openpose/HRNet to construct Spatial-Temporal Graph
Args:
strategy (string): must be one of the follow candidates
- uniform: Uniform Labeling
- distance: Distance Partitioning
- spatial: Spatial Configuration Partitioning
- gait_temporal: Gait Temporal Configuration Partitioning
For more information, please refer to the section 'Partition Strategies' in PGG.
layout (string): must be one of the follow candidates
- body_12: Is consists of 12 joints.
(right shoulder, right elbow, right knee, right hip, left elbow, left knee,
left shoulder, right wrist, right ankle, left hip, left wrist, left ankle).
For more information, please refer to the section 'Data Processing' in PGG.
max_hop (int): the maximal distance between two connected nodes # 1-neighbor
dilation (int): controls the spacing between the kernel points
"""
def __init__(self,
layout='body_12', # Openpose here represents for body_12
strategy='spatial',
semantic_level=0,
max_hop=1,
dilation=1):
self.layout = layout
self.strategy = strategy
self.max_hop = max_hop
self.dilation = dilation
self.num_node, self.neighbor_link_dic = self.get_layout_info(layout)
self.num_A = self.get_A_num(strategy)
def __str__(self):
return self.A
def get_A_num(self, strategy):
if self.strategy == 'uniform':
return 1
elif self.strategy == 'distance':
return 2
elif (self.strategy == 'spatial') or (self.strategy == 'gait_temporal'):
return 3
else:
raise ValueError("Do Not Exist This Strategy")
def get_layout_info(self, layout):
if layout == 'body_12':
num_node = 12
neighbor_link_dic = {
0: [(7, 1), (1, 0), (10, 4), (4, 6),
(8, 2), (2, 3), (11, 5), (5, 9),
(9, 3), (3, 0), (9, 6), (6, 0)],
1: [(1, 0), (4, 0), (0, 3), (2, 3), (5, 3)],
2: [(1, 0), (2, 0)]
}
return num_node, neighbor_link_dic
else:
raise ValueError("Do Not Exist This Layout.")
def get_edge(self, semantic_level):
# edge is a list of [child, parent] pairs, regarding the center node as root node
self_link = [(i, i) for i in range(int(self.num_node / (2 ** semantic_level)))]
neighbor_link = self.neighbor_link_dic[semantic_level]
edge = self_link + neighbor_link
center = []
if self.layout == 'body_12':
if semantic_level == 0:
center = [0, 3, 6, 9]
elif semantic_level == 1:
center = [0, 3]
elif semantic_level == 2:
center = [0]
return edge, center
def get_gait_temporal_partitioning(self, semantic_level):
if semantic_level == 0:
if self.layout == 'body_12':
positive_node = {1, 2, 4, 5, 7, 8, 10, 11}
negative_node = {0, 3, 6, 9}
elif semantic_level == 1:
if self.layout == 'body_12':
positive_node = {1, 2, 4, 5}
negative_node = {0, 3}
elif semantic_level == 2:
if self.layout == 'body_12':
positive_node = {1, 2}
negative_node = {0}
return positive_node, negative_node
def get_adjacency(self, semantic_level):
edge, center = self.get_edge(semantic_level)
num_node = int(self.num_node / (2 ** semantic_level))
hop_dis = get_hop_distance(num_node, edge, max_hop=self.max_hop)
valid_hop = range(0, self.max_hop + 1, self.dilation)
adjacency = np.zeros((num_node, num_node))
for hop in valid_hop:
adjacency[hop_dis == hop] = 1
normalize_adjacency = normalize_digraph(adjacency)
# normalize_adjacency = adjacency # withoutNodeNorm
# normalize_adjacency[a][b] = x
# when x = 0, node b has no connection with node a within valid hop.
# when x ≠ 0, the normalized adjacency from node b to node a is x.
# the value of x is normalized by the number of adjacent neighbor nodes around the node b.
if self.strategy == 'uniform':
A = np.zeros((1, num_node, num_node))
A[0] = normalize_adjacency
return A
elif self.strategy == 'distance':
A = np.zeros((len(valid_hop), num_node, num_node))
for i, hop in enumerate(valid_hop):
A[i][hop_dis == hop] = normalize_adjacency[hop_dis == hop]
return A
elif self.strategy == 'spatial':
A = []
for hop in valid_hop:
a_root = np.zeros((num_node, num_node))
a_close = np.zeros((num_node, num_node))
a_further = np.zeros((num_node, num_node))
for i in range(num_node):
for j in range(num_node):
if hop_dis[j, i] == hop:
j_hop_dis = min([hop_dis[j, _center] for _center in center])
i_hop_dis = min([hop_dis[i, _center] for _center in center])
if j_hop_dis == i_hop_dis:
a_root[j, i] = normalize_adjacency[j, i]
elif j_hop_dis > i_hop_dis:
a_close[j, i] = normalize_adjacency[j, i]
else:
a_further[j, i] = normalize_adjacency[j, i]
if hop == 0:
A.append(a_root)
else:
A.append(a_root + a_close)
A.append(a_further)
A = np.stack(A)
self.A = A
return A
elif self.strategy == 'gait_temporal':
A = []
positive_node, negative_node = self.get_gait_temporal_partitioning(semantic_level)
for hop in valid_hop:
a_root = np.zeros((num_node, num_node))
a_positive = np.zeros((num_node, num_node))
a_negative = np.zeros((num_node, num_node))
for i in range(num_node):
for j in range(num_node):
if hop_dis[j, i] == hop:
if i == j:
a_root[j, i] = normalize_adjacency[j, i]
elif j in positive_node:
a_positive[j, i] = normalize_adjacency[j, i]
else:
a_negative[j, i] = normalize_adjacency[j, i]
if hop == 0:
A.append(a_root)
else:
A.append(a_negative)
A.append(a_positive)
A = np.stack(A)
return A
else:
raise ValueError("Do Not Exist This Strategy")
def get_hop_distance(num_node, edge, max_hop=1):
# Calculate the shortest path between nodes
# i.e. The minimum number of steps needed to walk from one node to another
A = np.zeros((num_node, num_node)) # Ajacent Matrix
for i, j in edge:
A[j, i] = 1
A[i, j] = 1
# compute hop steps
hop_dis = np.zeros((num_node, num_node)) + np.inf
transfer_mat = [np.linalg.matrix_power(A, d) for d in range(max_hop + 1)]
arrive_mat = (np.stack(transfer_mat) > 0)
for d in range(max_hop, -1, -1):
hop_dis[arrive_mat[d]] = d
return hop_dis
def normalize_digraph(A):
Dl = np.sum(A, 0)
num_node = A.shape[0]
Dn = np.zeros((num_node, num_node))
for i in range(num_node):
if Dl[i] > 0:
Dn[i, i] = Dl[i]**(-1)
AD = np.dot(A, Dn)
return AD
def normalize_undigraph(A):
Dl = np.sum(A, 0)
num_node = A.shape[0]
Dn = np.zeros((num_node, num_node))
for i in range(num_node):
if Dl[i] > 0:
Dn[i, i] = Dl[i]**(-0.5)
DAD = np.dot(np.dot(Dn, A), Dn)
return DAD