Add DeepGaitV2 model and supporting backbone blocks

This commit is contained in:
Dongyang Jin
2023-12-10 13:54:59 +08:00
parent 501e15d6f0
commit 7fe5d1b26a
8 changed files with 860 additions and 1 deletions
+129
View File
@@ -0,0 +1,129 @@
import torch
import torch.nn as nn
import os
import numpy as np
import os.path as osp
import matplotlib.pyplot as plt
from ..base_model import BaseModel
from ..modules import SetBlockWrapper, HorizontalPoolingPyramid, PackSequenceWrapper, SeparateFCs, SeparateBNNecks, conv1x1, conv3x3, BasicBlock2D, BasicBlockP3D, BasicBlock3D
from einops import rearrange
# Lookup table translating the backbone `mode` string from the config
# into the corresponding residual-block class.
blocks_map = {
    '2d': BasicBlock2D,
    'p3d': BasicBlockP3D,
    '3d': BasicBlock3D,
}
class DeepGaitV2(BaseModel):
    """DeepGaitV2 silhouette-based gait recognition model.

    Builds a residual backbone whose block type is selected by
    ``Backbone.mode`` ('2d', 'p3d' or '3d'), followed by temporal max
    pooling, horizontal pooling, part-wise FCs and a BNNeck head.
    """

    def build_network(self, model_cfg):
        """Construct the backbone and embedding heads from `model_cfg`."""
        mode = model_cfg['Backbone']['mode']
        assert mode in blocks_map.keys()
        block = blocks_map[mode]

        in_channels = model_cfg['Backbone']['in_channels']
        layers = model_cfg['Backbone']['layers']
        channels = model_cfg['Backbone']['channels']

        # Per-stage strides: 3D blocks take [t, h, w]; 2D/P3D take [h, w].
        if mode == '3d':
            strides = [
                [1, 1],
                [1, 2, 2],
                [1, 2, 2],
                [1, 1, 1]
            ]
        else:
            strides = [
                [1, 1],
                [2, 2],
                [2, 2],
                [1, 1]
            ]

        self.inplanes = channels[0]
        self.layer0 = SetBlockWrapper(nn.Sequential(
            conv3x3(in_channels, self.inplanes, 1),
            nn.BatchNorm2d(self.inplanes),
            nn.ReLU(inplace=True)
        ))
        # BUGFIX: stage 1 always uses BasicBlock2D, so it must be built in
        # '2d' mode. Forwarding `mode` here (as before) made make_layer hand
        # a 3-element stride to a 2D conv when mode == '3d' and
        # layers[0] > 1, crashing nn.Conv2d construction.
        self.layer1 = SetBlockWrapper(self.make_layer(
            BasicBlock2D, channels[0], strides[0], blocks_num=layers[0], mode='2d'))
        self.layer2 = self.make_layer(block, channels[1], strides[1], blocks_num=layers[1], mode=mode)
        self.layer3 = self.make_layer(block, channels[2], strides[2], blocks_num=layers[2], mode=mode)
        self.layer4 = self.make_layer(block, channels[3], strides[3], blocks_num=layers[3], mode=mode)

        if mode == '2d':
            # Pure-2D stages operate per frame; wrap them so they accept
            # sequence tensors shaped [n, c, s, h, w].
            self.layer2 = SetBlockWrapper(self.layer2)
            self.layer3 = SetBlockWrapper(self.layer3)
            self.layer4 = SetBlockWrapper(self.layer4)

        self.FCs = SeparateFCs(16, channels[3], channels[2])
        self.BNNecks = SeparateBNNecks(16, channels[2], class_num=model_cfg['SeparateBNNecks']['class_num'])
        self.TP = PackSequenceWrapper(torch.max)
        self.HPP = HorizontalPoolingPyramid(bin_num=[16])

    def make_layer(self, block, planes, stride, blocks_num, mode='2d'):
        """Stack `blocks_num` residual blocks; only the first one strides.

        Updates ``self.inplanes`` as a side effect so stages can be chained.
        """
        if max(stride) > 1 or self.inplanes != planes * block.expansion:
            # Projection shortcut to match spatial size / channel count.
            if mode == '3d':
                downsample = nn.Sequential(
                    nn.Conv3d(self.inplanes, planes * block.expansion,
                              kernel_size=[1, 1, 1], stride=stride,
                              padding=[0, 0, 0], bias=False),
                    nn.BatchNorm3d(planes * block.expansion))
            elif mode == '2d':
                downsample = nn.Sequential(
                    conv1x1(self.inplanes, planes * block.expansion, stride=stride),
                    nn.BatchNorm2d(planes * block.expansion))
            elif mode == 'p3d':
                # P3D strides spatially only; temporal stride stays 1.
                downsample = nn.Sequential(
                    nn.Conv3d(self.inplanes, planes * block.expansion,
                              kernel_size=[1, 1, 1], stride=[1, *stride],
                              padding=[0, 0, 0], bias=False),
                    nn.BatchNorm3d(planes * block.expansion))
            else:
                raise TypeError("Unsupported mode %r, expected '2d', 'p3d' or '3d'" % mode)
        else:
            downsample = lambda x: x

        layers = [block(self.inplanes, planes, stride=stride, downsample=downsample)]
        self.inplanes = planes * block.expansion
        # Remaining blocks keep resolution: unit stride in every dimension.
        s = [1, 1] if mode in ['2d', 'p3d'] else [1, 1, 1]
        for i in range(1, blocks_num):
            layers.append(
                block(self.inplanes, planes, stride=s)
            )
        return nn.Sequential(*layers)

    def forward(self, inputs):
        """Run the model on a packed batch.

        Returns a dict with training features (triplet/softmax), a
        visual summary of the input silhouettes, and inference embeddings.
        """
        ipts, labs, typs, vies, seqL = inputs

        sils = ipts[0].unsqueeze(1)  # [n, 1, s, h, w] silhouette sequences
        assert sils.size(-1) in [44, 88]

        del ipts
        out0 = self.layer0(sils)
        out1 = self.layer1(out0)
        out2 = self.layer2(out1)
        out3 = self.layer3(out2)
        out4 = self.layer4(out3)  # [n, c, s, h, w]

        # Temporal Pooling, TP: max over the sequence dimension.
        outs = self.TP(out4, seqL, options={"dim": 2})[0]  # [n, c, h, w]
        # Horizontal Pooling Matching, HPM
        feat = self.HPP(outs)  # [n, c, p]

        embed_1 = self.FCs(feat)  # [n, c, p]
        embed_2, logits = self.BNNecks(embed_1)  # [n, c, p]
        # Inference uses the pre-BNNeck embedding (embed_1), matching the
        # triplet-loss feature space.
        embed = embed_1

        retval = {
            'training_feat': {
                'triplet': {'embeddings': embed_1, 'labels': labs},
                'softmax': {'logits': logits, 'labels': labs}
            },
            'visual_summary': {
                'image/sils': rearrange(sils, 'n c s h w -> (n s) c h w'),
            },
            'inference_feat': {
                'embeddings': embed
            }
        }
        return retval
+156 -1
View File
@@ -705,4 +705,159 @@ class ParallelBN1d(nn.Module):
x = rearrange(x, 'n c p -> n (c p)')
x = self.bn1d(x)
x = rearrange(x, 'n (c p) -> n c p', p=self.parts_num)
return x
return x
def conv3x3(in_planes, out_planes, stride=1, groups=1, dilation=1):
    """3x3 convolution with padding (padding tracks the dilation), no bias."""
    return nn.Conv2d(
        in_planes,
        out_planes,
        kernel_size=3,
        stride=stride,
        padding=dilation,
        groups=groups,
        dilation=dilation,
        bias=False,
    )
def conv1x1(in_planes, out_planes, stride=1):
    """1x1 (pointwise) convolution, no bias."""
    pointwise = nn.Conv2d(
        in_planes, out_planes, kernel_size=1, stride=stride, bias=False)
    return pointwise
class BasicBlock2D(nn.Module):
    """Classic ResNet basic block: two 3x3 convs with batch norm and an
    additive (optionally projected) shortcut."""
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1,
                 base_width=64, dilation=1, norm_layer=None):
        super(BasicBlock2D, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        if groups != 1 or base_width != 64:
            raise ValueError(
                'BasicBlock only supports groups=1 and base_width=64')
        if dilation > 1:
            raise NotImplementedError(
                "Dilation > 1 not supported in BasicBlock")
        # The first conv (and the optional downsample) carries the stride.
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = norm_layer(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = norm_layer(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        # Shortcut branch: projected when a downsample module is supplied.
        shortcut = x if self.downsample is None else self.downsample(x)
        y = self.relu(self.bn1(self.conv1(x)))
        y = self.bn2(self.conv2(y))
        y += shortcut
        return self.relu(y)
class BasicBlockP3D(nn.Module):
    """Pseudo-3D residual block operating on [n, c, s, h, w] tensors.

    Spatial features come from per-frame 2D convolutions (wrapped by
    SetBlockWrapper); temporal mixing comes from a 3x1x1 Conv3d branch
    fused in after the first conv.
    """
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1,
                 base_width=64, dilation=1, norm_layer=None):
        super(BasicBlockP3D, self).__init__()
        if norm_layer is None:
            norm_layer2d = nn.BatchNorm2d
            norm_layer3d = nn.BatchNorm3d
        else:
            # BUGFIX: previously a caller-supplied norm_layer left
            # norm_layer2d / norm_layer3d undefined (NameError at
            # construction). Use the supplied layer for both paths.
            norm_layer2d = norm_layer3d = norm_layer
        if groups != 1 or base_width != 64:
            raise ValueError(
                'BasicBlock only supports groups=1 and base_width=64')
        if dilation > 1:
            raise NotImplementedError(
                "Dilation > 1 not supported in BasicBlock")
        # Both self.conv1 and self.downsample layers downsample the input when stride != 1
        self.relu = nn.ReLU(inplace=True)
        # Per-frame spatial path; SetBlockWrapper folds the sequence
        # dimension into the batch so nn.Conv2d applies frame-wise.
        self.conv1 = SetBlockWrapper(
            nn.Sequential(
                conv3x3(inplanes, planes, stride),
                norm_layer2d(planes),
                nn.ReLU(inplace=True)
            )
        )
        self.conv2 = SetBlockWrapper(
            nn.Sequential(
                conv3x3(planes, planes),
                norm_layer2d(planes),
            )
        )
        # Temporal-only (3x1x1) conv mixing information across frames.
        self.shortcut3d = nn.Conv3d(planes, planes, (3, 1, 1), (1, 1, 1), (1, 0, 0), bias=False)
        self.sbn = norm_layer3d(planes)
        self.downsample = downsample

    def forward(self, x):
        '''
        x: [n, c, s, h, w]
        '''
        identity = x
        out = self.conv1(x)
        # Fuse the temporal branch with the spatial features before conv2.
        out = self.relu(out + self.sbn(self.shortcut3d(out)))
        out = self.conv2(out)
        if self.downsample is not None:
            identity = self.downsample(x)
        out += identity
        out = self.relu(out)
        return out
class BasicBlock3D(nn.Module):
    """ResNet basic block built from 3D convolutions over [n, c, s, h, w].

    Temporal padding of the first conv is chosen from the temporal stride:
    stride 1 or 2 uses padding 1 (preserving/halving the sequence length),
    stride 3 uses padding 0.
    """
    expansion = 1

    def __init__(self, inplanes, planes, stride=(1, 1, 1), downsample=None, groups=1,
                 base_width=64, dilation=1, norm_layer=None):
        # NOTE: the default stride was changed from the mutable list
        # [1, 1, 1] to an equivalent tuple to avoid the shared-mutable-
        # default pitfall; nn.Conv3d accepts either.
        super(BasicBlock3D, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm3d
        if groups != 1 or base_width != 64:
            raise ValueError(
                'BasicBlock only supports groups=1 and base_width=64')
        if dilation > 1:
            raise NotImplementedError(
                "Dilation > 1 not supported in BasicBlock")
        # Both self.conv1 and self.downsample layers downsample the input when stride != 1
        # Validate with an explicit raise: `assert` is stripped under -O.
        if stride[0] not in [1, 2, 3]:
            raise ValueError(
                'BasicBlock3D temporal stride must be 1, 2 or 3, got %r' % (stride[0],))
        if stride[0] in [1, 2]:
            tp = 1
        else:
            tp = 0
        self.conv1 = nn.Conv3d(inplanes, planes, kernel_size=(3, 3, 3), stride=stride, padding=[tp, 1, 1], bias=False)
        self.bn1 = norm_layer(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv3d(planes, planes, kernel_size=(3, 3, 3), stride=[1, 1, 1], padding=[1, 1, 1], bias=False)
        self.bn2 = norm_layer(planes)
        self.downsample = downsample

    def forward(self, x):
        '''
        x: [n, c, s, h, w]
        '''
        identity = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        if self.downsample is not None:
            identity = self.downsample(x)
        out += identity
        out = self.relu(out)
        return out