mmMOT Code Reading Notes
1. mmMOT system framework diagram
2. The framework as I understand it
3. Code structure
4. Code walkthrough
This walkthrough goes top-down, starting from the main file and getting progressively more detailed.
main.py
The program entry point, used to train and validate the model.
- Parse the config file and initialize the tracking model:
global args, config, best_mota
args = parser.parse_args()
with open(args.config) as f:
    config = yaml.load(f, Loader=yaml.FullLoader)
config = EasyDict(config['common'])
config.save_path = os.path.dirname(args.config)
model = build_model(config)
optimizer = build_optim(model, config)
criterion = build_criterion(config.loss)
tracking_module = TrackingModule(model, optimizer, criterion, config.det_type)
- Data loading (the dataset folder):
# Data loading code
train_transform, valid_transform = build_augmentation(config.augmentation)
# train
train_dataset = build_dataset(
    config,
    set_source='train',
    evaluate=False,
    train_transform=train_transform)
trainval_dataset = build_dataset(
    config,
    set_source='train',
    evaluate=True,
    valid_transform=valid_transform)
val_dataset = build_dataset(
    config,
    set_source='val',
    evaluate=True,
    valid_transform=valid_transform)
train_sampler = DistributedGivenIterationSampler(
    train_dataset,
    config.lr_scheduler.max_iter,
    config.batch_size,
    world_size=1,
    rank=0,
    last_iter=last_iter)
train_loader = DataLoader(
    train_dataset,
    batch_size=config.batch_size,
    shuffle=False,
    num_workers=config.workers,
    pin_memory=True,
    sampler=train_sampler)
- Training:
def train(train_loader, val_loader, trainval_loader, tracking_module, lr_scheduler, start_iter, tb_logger):
    # forward pass
    loss = tracking_module.step(input.squeeze(0), det_info, det_id, det_cls, det_split)
- Validation:
def validate(val_loader, tracking_module, step, part='train', fusion_list=None, fuse_prob=False):
tracking_model.py
Main file of the tracking model: mode switching, the training step, and the prediction step.
- Mode switching: training mode vs. evaluation mode.
# evaluation mode
def eval(self):
    if isinstance(self.model, list):
        for i in range(len(self.model)):
            self.model[i].eval()
    else:
        self.model.eval()
    self.clear_mem()
    return

# training mode
def train(self):
    if isinstance(self.model, list):
        for i in range(len(self.model)):
            self.model[i].train()
    else:
        self.model.train()
    self.clear_mem()
    return
- step(): the training function. After the model produces the four kinds of scores, it computes the loss and backpropagates.
def step(self, det_img, det_info, det_id, det_cls, det_split):
    # self.model is the full tracking model assembled in tracking_net.py;
    # a forward pass yields the four kinds of scores
    det_score, link_score, new_score, end_score, trans = self.model(
        det_img, det_info, det_split)
    # generate gt_y
    gt_det, gt_link, gt_new, gt_end = self.generate_gt(
        det_score[0], det_cls, det_id, det_split)
    # calculate loss (the criterion is defined in cost.py)
    loss = self.criterion(det_split, gt_det, gt_link, gt_new, gt_end,
                          det_score, link_score, new_score, end_score, trans)
    # backpropagate and optimize
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()
    return loss
- predict(): produces the tracking result. step() backpropagates right after the scores are computed and never needs the final tracks, whereas predict() feeds the four kinds of scores into a linear-programming solver to obtain the final result.
def predict(self, det_imgs, det_info, dets, det_split):
    # self.model is the full tracking model assembled in tracking_net.py;
    # a forward pass yields the four kinds of scores
    det_score, link_score, new_score, end_score, _ = self.model(
        det_imgs, det_info, det_split)
    # ortools_solve is the linear-programming solver in solvers.py; it turns
    # the four kinds of scores into the final association. I think this part
    # could also be replaced by greedy matching, the Hungarian algorithm, or
    # a neural network (a toy sketch follows this snippet).
    assign_det, assign_link, assign_new, assign_end = ortools_solve(
        det_score[self.test_mode],
        [link_score[0][self.test_mode:self.test_mode + 1]],
        new_score[self.test_mode],
        end_score[self.test_mode],
        det_split)
    assign_id, assign_bbox = self.assign_det_id(assign_det, assign_link,
                                                assign_new, assign_end,
                                                det_split, dets)
    aligned_ids, aligned_dets, frame_start = self.align_id(
        assign_id, assign_bbox)
    return aligned_ids, aligned_dets, frame_start
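To make the replacement idea concrete, here is a minimal sketch (my own illustration, not mmMOT code) of associating one frame pair with the Hungarian algorithm instead of the LP; it only uses a single N x M link-score matrix and ignores the det/new/end scores:

import numpy as np
from scipy.optimize import linear_sum_assignment

link = np.random.rand(3, 4)                # toy N_prev x M_cur link scores
rows, cols = linear_sum_assignment(-link)  # negate: maximize total link score
matches = [(i, j) for i, j in zip(rows, cols) if link[i, j] > 0.5]
# unmatched previous objects would become track ends,
# unmatched current detections would start new tracks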
solvers.py
The linear-programming module: takes the scores produced by the four estimators as input and outputs the final association.
# Function signature: the inputs are the scores from the estimators. The
# constraints (formalized after this snippet) say that every detection in the
# current frame either starts a new trajectory or links to a previous one, and
# either ends an old trajectory or links to a later one.
def ortools_solve(det_score,
link_score,
new_score,
end_score,
det_split,
gt=None):
......
# the return values are the association results
return assign_det, assign_link, assign_new, assign_end
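Formalizing the constraints above (my reconstruction from the code and the mmMOT paper, so treat the notation as approximate): with binary variables $y^{det}$, $y^{link}$, $y^{new}$, $y^{end}$, the solver maximizes the total score

$$\max \sum_j s^{det}_j y^{det}_j + \sum_{i,j} s^{link}_{ij} y^{link}_{ij} + \sum_j s^{new}_j y^{new}_j + \sum_i s^{end}_i y^{end}_i$$

subject to the flow-conservation constraints

$$y^{new}_j + \sum_i y^{link}_{ij} = y^{det}_j, \qquad y^{end}_i + \sum_j y^{link}_{ij} = y^{det}_i,$$

i.e., every kept detection in the current frame is either new or linked backward, and every kept detection in the previous frame either ends or links forward.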
cost.py
The module defining the loss functions. Judging from TrackingLoss.forward below, the total loss is roughly: loss = det_ratio * det_loss + 0.4 * new_loss + 0.4 * end_loss + link_loss.
- CostLoss, NoDistanceLoss, and DistanceLoss classes (I suspect these exist for ablation experiments);
- DetLoss class: computes the losses for the detection score, new score, and end score; it supports binary cross-entropy, L1, L2, and GHM losses, where the GHM loss is defined in modules/ghm_loss.py;
- LinkLoss class: computes the association loss, using L1 or L2 loss;
- TrackingLoss class: computes the loss of the complete framework, used for backpropagation during training.
# LinkLoss class: initializing the two loss functions
if 'l2' in loss_type:
self.l2_loss = nn.MSELoss()
if 'l1' in loss_type:
print("Use smooth l1 loss for link")
self.l1_loss = nn.SmoothL1Loss()
# computing the LinkLoss
def forward(self, det_split, gt_det, link_score, gt_link):
    # (the per-frame loop over i and the construction of mask are elided)
if 'l2' in self.loss_type:
loss += self.l2_loss(link_score[i].mul(mask),gt_link[i].repeat(mask.size(0), 1, 1))
if 'l1' in self.loss_type:
loss += self.l1_loss(link_score[i].mul(mask),gt_link[i].repeat(mask.size(0), 1, 1))
return loss
# DetLoss class: computes det loss, new loss, and end loss; four loss functions are available
def forward(self, det_score, gt_score):
"""
:param det_score: 3xL
:param gt_score: L
:return: loss
"""
gt_score = gt_score.unsqueeze(0).repeat(det_score.size(0), 1)
if 'bce' in self.loss_type:
loss = F.binary_cross_entropy_with_logits(det_score, gt_score)
if 'l2' in self.loss_type:
mask = 1 - gt_score.eq(self.ignore_index)
loss = F.mse_loss(det_score.mul(mask.float()), gt_score)
if 'l1' in self.loss_type:
mask = 1 - gt_score.eq(self.ignore_index)
loss = F.smooth_l1_loss(det_score.mul(mask.float()), gt_score)
# from modules.ghm_loss import GHMC_Loss
# self.GHMC_Loss = GHMC_Loss(bins=30, momentum=0.75)
if 'ghm' in self.loss_type:
mask = 1 - gt_score.eq(self.ignore_index)
loss = self.GHMC_Loss(det_score, gt_score, mask)
return loss
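A side note on the masking lines above: on recent PyTorch versions gt_score.eq(...) returns a bool tensor, and 1 - bool_tensor raises a RuntimeError (subtraction is not defined for bool tensors). An equivalent modern spelling would be:

mask = (~gt_score.eq(self.ignore_index)).float()  # logical NOT instead of 1 - eq()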
# TrackingLoss class: computes the complete loss and sets the weight of each term
# init: the det weight (det_ratio) and the hard-coded new/end weights are all 0.4
def __init__(self,
smooth_ratio=0,
detloss_type='bce',
endloss_type='l2',
det_ratio=0.4,
trans_ratio=0.4,
trans_last=False,
linkloss_type='l2_softmax'):
# the loss computation
def forward(self,
det_split,
gt_det,
gt_link,
gt_new,
gt_end,
det_score,
link_score,
new_score,
end_score,
trans=None):
loss = self.det_loss(det_score, gt_det) * self.det_ratio
loss += self.end_loss(new_score, gt_new[det_split[0]:]) * 0.4
loss += self.end_loss(end_score, gt_end[:-det_split[-1]]) * 0.4
loss += self.link_loss(det_split, gt_det, link_score, gt_link)
# there is also some handling of trans here, not copied over
return loss
tracking_net.py
The complete tracking network: integrates all the modules into the full tracking framework.
- def __init__
  Arguments: various options such as the feature-vector length, model variants, and other settings.
  Initializes:
  - the appearance model for image feature extraction (VGG/ResNet)
  - PointNet for point-cloud feature extraction
  - the fusion module (A/B/C)
  - the new indicator & end indicator (V1/V2)
  - affinity_module, the adjacency-matrix model
  - the negative rejection module, which computes the detection score
- def associate
  Fuses the per-modality association scores from affinity_module according to softmax_mode; the four options are direct output, element-wise multiplication, addition, and max, yielding the final link_score/new_score/end_score (a toy check of the dual mode follows the associate code below).
- def feature
  Feature extraction: obtains the appearance and point-cloud features separately, then fuses them with the chosen fusion module.
- def determine_det
  Computes the detection score.
- def forward
  Given the frame split, computes the det_scores/link_scores/new_scores/end_scores for each frame.
class TrackingNet
# constructor
def __init__(self,
seq_len,
appear_len=512,
appear_skippool=False,
appear_fpn=False,
score_arch='vgg',
score_fusion_arch='C',
appear_arch='vgg',
point_arch='v1',
point_len=512,
softmax_mode='single',
test_mode=0,
affinity_op='multiply',
dropblock=5,
end_arch='v2',
end_mode='avg',
without_reflectivity=True,
neg_threshold=0,
use_dropout=False):
# build new end indicator
if end_arch in ['v1', 'v2']:
new_end = partial(
eval("NewEndIndicator_%s" % end_arch),
kernel_size=5,
reduction=4,
mode=end_mode)
# build point net
if point_len == 0:
print("No point cloud used")
self.point_net = None
elif point_arch in ['v1']:
point_net = eval("PointNet_%s" % point_arch)
self.point_net = point_net(
point_in_channels,
out_channels=point_len,
use_dropout=use_dropout)
else:
print("Not implemented!!")
# build affinity matrix module
assert in_channels != 0
self.w_link = affinity_module(
in_channels, new_end=new_end, affinity_op=affinity_op)
# build negative rejection module
if score_arch in ['branch_cls', 'branch_reg']:
self.w_det = nn.Sequential(
nn.Conv1d(in_channels, in_channels, 1, 1),
nn.BatchNorm1d(in_channels),
nn.ReLU(inplace=True),
nn.Conv1d(in_channels, in_channels // 2, 1, 1),
nn.BatchNorm1d(in_channels // 2),
nn.ReLU(inplace=True),
nn.Conv1d(in_channels // 2, 1, 1, 1),
)
else:
print("Not implement yet")
# association: fuses the per-modality link scores; the four options are direct
# output (single modality), multiply, add, and max
def associate(self, objs, dets):
link_mat, new_score, end_score = self.w_link(objs, dets)
if self.softmax_mode == 'single':
link_score = F.softmax(link_mat, dim=-1)
elif self.softmax_mode == 'dual':
link_score_prev = F.softmax(link_mat, dim=-1)
link_score_next = F.softmax(link_mat, dim=-2)
link_score = link_score_prev.mul(link_score_next)
elif self.softmax_mode == 'dual_add':
link_score_prev = F.softmax(link_mat, dim=-1)
link_score_next = F.softmax(link_mat, dim=-2)
link_score = (link_score_prev + link_score_next) / 2
elif self.softmax_mode == 'dual_max':
link_score_prev = F.softmax(link_mat, dim=-1)
link_score_next = F.softmax(link_mat, dim=-2)
link_score = torch.max(link_score_prev, link_score_next)
else:
link_score = link_mat
return link_score, new_score, end_score
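A toy check of the dual mode (my own snippet, not repo code): multiplying the row-wise and column-wise softmaxes means a link only scores high if it wins in both directions:

import torch
import torch.nn.functional as F

link_mat = torch.randn(1, 1, 3, 4)   # B x 1 x N_prev x M_cur
prev = F.softmax(link_mat, dim=-1)   # each previous object over current dets
nxt = F.softmax(link_mat, dim=-2)    # each current det over previous objects
dual = prev.mul(nxt)                 # mutual agreement gate
print(dual.max().item() <= 1.0)      # True: entries are products of two probs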
# feature extraction
def feature(self, dets, det_info):
feats = []
# appearance features
if self.appearance is not None:
appear = self.appearance(dets)
feats.append(appear)
# point-cloud features; trans is the transformation matrix from PointNet's
# T-net, designed for rotation invariance
trans = None
if self.point_net is not None:
points, trans = self.point_net(
det_info['points'].transpose(-1, -2),
det_info['points_split'].long().squeeze(0))
feats.append(points)
# feature fusion; there are three fusion modules (A/B/C), detailed later
feats = torch.cat(feats, dim=-1).t().unsqueeze(0) # LxD->1xDxL
if self.fusion_module is not None:
feats = self.fusion_module(feats)
return feats, trans
return feats, trans
# compute det_scores
def determine_det(self, dets, feats):
det_scores = self.w_det(feats).squeeze(1) # Bx1xL -> BxL
if not self.training:
# add mask
if 'cls' in self.score_arch:
det_scores = det_scores.sigmoid()
mask = det_scores.lt(self.neg_threshold)
det_scores -= mask.float()
return det_scores
def forward(self, dets, det_info, dets_split):
feats, trans = self.feature(dets, det_info)
det_scores = self.determine_det(dets, feats)
start = 0
link_scores = []
new_scores = []
end_scores = []
for i in range(len(dets_split) - 1):
prev_end = start + dets_split[i].item()
end = prev_end + dets_split[i + 1].item()
link_score, new_score, end_score = self.associate(
feats[:, :, start:prev_end], feats[:, :, prev_end:end])
link_scores.append(link_score.squeeze(1))
new_scores.append(new_score)
end_scores.append(end_score)
start = prev_end
if not self.training:
fake_new = det_scores.new_zeros(
(det_scores.size(0), link_scores[0].size(-2)))
fake_end = det_scores.new_zeros(
(det_scores.size(0), link_scores[-1].size(-1)))
new_scores = torch.cat([fake_new] + new_scores, dim=1)
end_scores = torch.cat(end_scores + [fake_end], dim=1)
else:
new_scores = torch.cat(new_scores, dim=1)
end_scores = torch.cat(end_scores, dim=1)
return det_scores, link_scores, new_scores, end_scores, trans
appear_net.py
Extracts appearance features from images, with a choice of a VGG or ResNet backbone.
During feature extraction you can optionally enable skip pooling (which reduces the number of layers used) or an FPN (feature pyramid, for higher accuracy); the two options trade speed against accuracy (a rough sketch of the skip-pooling idea follows).
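A rough sketch of skip pooling as I read it (a hypothetical simplification; the real _parse_vgg_layers splits the pretrained backbone into stages): intermediate stages are globally pooled and combined, rather than using only the final layer's output:

import torch
import torch.nn as nn

# two toy "stages" standing in for chunks of the VGG/ResNet backbone
stage1 = nn.Sequential(nn.Conv2d(3, 16, 3, padding=1), nn.ReLU())
stage2 = nn.Sequential(nn.Conv2d(16, 32, 3, stride=2, padding=1), nn.ReLU())
gap = nn.AdaptiveAvgPool2d(1)

x = torch.randn(2, 3, 64, 64)
f1 = stage1(x)
f2 = stage2(f1)
feat = torch.cat([gap(f1).flatten(1), gap(f2).flatten(1)], dim=1)  # 2 x 48
# the repo pools every stage like this and maps the result to out_channels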
class AppearanceNet(nn.Module):
def __init__(self,
arch='vgg',
out_channels=512,
skippool=True,
fpn=False,
dropblock=5):
if arch == 'vgg':
base_channel = 64 // reduction
vgg_net = eval("vgg16_bn_%s" % str(out_channels))
loaded_model = vgg_net()
if skippool:
print("use Skip Pooling in appearance model")
self.layers, self.global_pool = self._parse_vgg_layers(
loaded_model)
elif arch == 'resnet50':
loaded_model = torchvision.models.resnet50(pretrained=True)
base_channel = 256
self.layers = Resnet(loaded_model)
if skippool:
print("use Skip Pooling in appearance model")
self.global_pool = self._parse_res_layers(4)
elif arch == 'resnet101':
print("use resnet101")
loaded_model = torchvision.models.resnet101(pretrained=True)
base_channel = 256
self.layers = Resnet(loaded_model)
if skippool:
print("use Skip Pooling in appearance model")
self.global_pool = self._parse_res_layers(4)
elif arch == 'resnet152':
print("use resnet152")
loaded_model = torchvision.models.resnet152(pretrained=True)
base_channel = 256
self.layers = Resnet(loaded_model)
if skippool:
print("use Skip Pooling in appearance model")
self.global_pool = self._parse_res_layers(4)
if fpn:
print("use FPN in appearance model")
# FPN Module
...
if not skippool and not fpn:
...
def forward(self, x):
if self.arch == 'vgg':
feats = self.vgg_forward(x)
else:
feats = self.res_forward(x)
if self.skippool:
...
if self.fpn:
...
else:
...
out = self.conv_last(out).squeeze(-1).squeeze(-1) # NxCx1x1 -> N*C
return out
point_net.py
Extracts features from the point cloud, reusing part of PointNet: only the global feature is needed, not the final classification output.
The original PointNet obtains the global feature by max pooling; this code uses average pooling instead.
The return value has two parts: out, the resulting global feature vector, and trans, the transformation matrix used for point-cloud invariance (a toy example of the per-object pooling follows).
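One detail worth a toy example: the point clouds of all objects in a frame are concatenated into one tensor, and point_split marks the object boundaries, so the pooling in the code below is done per object (my own illustration):

import torch

x = torch.randn(1, 64, 10)               # 1 x C x total_points of all objects
point_split = torch.tensor([0, 4, 10])   # object 0: points 0-3, object 1: 4-9
feats = [x[:, :, point_split[i].item():point_split[i + 1].item()].mean(dim=-1)
         for i in range(len(point_split) - 1)]
feats = torch.stack(feats, dim=-1)       # 1 x C x num_objects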
class PointNet_v1(nn.Module):
def __init__(self, in_channels, out_channels=512, use_dropout=False):
super(PointNet_v1, self).__init__()
# initialize the feature-extraction network
self.feat = PointNetfeatGN(in_channels, out_channels)
reduction = 512 // out_channels
self.reduction = reduction
self.conv1 = torch.nn.Conv1d(1088 // reduction, 512 // reduction, 1)
self.conv2 = torch.nn.Conv1d(512 // reduction, out_channels, 1)
self.bn1 = nn.GroupNorm(512 // reduction, 512 // reduction)
self.bn2 = nn.GroupNorm(16 // reduction, out_channels)
self.out_channels = out_channels
self.relu = nn.ReLU(inplace=True)
self.avg_pool = nn.AdaptiveAvgPool1d(1)
self.avg_bn = nn.GroupNorm(512 // reduction, 512 // reduction)
self.dropout = None
if use_dropout:
print("Use dropout in pointnet")
self.dropout = nn.Dropout(p=0.5)
def forward(self, x, point_split):
x, trans = self.feat(x, point_split)
x = torch.cat(x, dim=1)
x = self.relu(self.bn1(self.conv1(x)))
if self.dropout is not None:
x = self.dropout(x)
max_feats = []
for i in range(len(point_split) - 1):
start = point_split[i].item()
end = point_split[i + 1].item()
max_feat = self.avg_pool(x[:, :, start:end])
max_feats.append(max_feat.view(-1, 512 // self.reduction, 1))
max_feats = torch.cat(max_feats, dim=-1)
out = self.relu(self.bn2(self.conv2(max_feats))).transpose(
-1, -2).squeeze(0)
assert out.size(0) == len(point_split) - 1
return out, trans
# T-net: the transformation network for point-cloud invariance
class STN3d(nn.Module):
#...
# the feature-extraction network
class PointNetfeatGN(nn.Module):
def __init__(self, in_channels=3, out_channels=512, global_feat=True):
#...
def forward(self, x, point_split):
#...
max_feats = []
for i in range(len(point_split) - 1):
start = point_split[i].item()
end = point_split[i + 1].item()
# average pooling to obtain the global feature
max_feat = self.avg_pool(x[:, :, start:end])
max_feats.append(
max_feat.view(-1, 1024 // self.reduction,
1).repeat(1, 1, end - start))
max_feats = torch.cat(max_feats, dim=-1)
assert max_feats.size(-1) == x.size(-1)
conv_out.append(max_feats)
return conv_out, trans
fusion_net.py
Three fusion modules are proposed; each returns both the fused feature and the original per-modality features (a toy shape check appears after the three classes below). The three modules:
(Figure: diagrams of fusion modules A, B, and C; image not recovered.)
- Fusion module C: adds an "attention" mechanism. Different sensors should contribute with different weights in different scenes (e.g., the image weight should drop in the dark), so an attention value is first computed for each modality's features and then the fusion is performed: "sigmoid weight gated point, image fusion".
- Fusion module B: module C without the attention-weighting step; each modality's features are transformed and then summed directly to give the final feature.
- Fusion module A: first concatenates the features of the different modalities (which doubles the feature length), then uses a learned W to map the feature back to its original length.
# Common fusion module
# Fusion module C
class fusion_module_C(nn.Module):
def __init__(self, appear_len, point_len, out_channels):
super(fusion_module_C, self).__init__()
print(
"Fusion Module C: split sigmoid weight gated point, image fusion")
self.appear_len = appear_len
self.point_len = point_len
# attention (gate) networks
self.gate_p = nn.Sequential(
nn.Conv1d(point_len, point_len, 1, 1),
nn.Sigmoid(),
)
self.gate_i = nn.Sequential(
nn.Conv1d(appear_len, appear_len, 1, 1),
nn.Sigmoid(),
)
# per-modality feature transforms
self.input_p = nn.Sequential(
nn.Conv1d(point_len, out_channels, 1, 1),
nn.GroupNorm(out_channels, out_channels),
)
self.input_i = nn.Sequential(
nn.Conv1d(appear_len, out_channels, 1, 1),
nn.GroupNorm(out_channels, out_channels),
)
def forward(self, objs):
"""
objs : 1xDxN
"""
feats = objs.view(2, -1, objs.size(-1)) # 1x2DxL -> 2xDxL
# compute each modality's attention gate
gate_p = self.gate_p(feats[:1])  # 1xDxL
gate_i = self.gate_i(feats[1:])  # 1xDxL
# fuse the features
obj_fused = gate_p.mul(self.input_p(feats[:1])) + gate_i.mul(
self.input_i(feats[1:]))
obj_feats = torch.cat([feats, obj_fused.div(gate_p + gate_i)], dim=0)
return obj_feats
# Fusion module B
class fusion_module_B(nn.Module):
def __init__(self, appear_len, point_len, out_channels):
super(fusion_module_B, self).__init__()
print("Fusion Module B: point, weighted image"
"& linear fusion, with split input w")
self.appear_len = appear_len
self.point_len = point_len
self.input_p = nn.Sequential(
nn.Conv1d(out_channels, out_channels, 1, 1),
nn.GroupNorm(out_channels, out_channels),
)
self.input_i = nn.Sequential(
nn.Conv1d(out_channels, out_channels, 1, 1),
nn.GroupNorm(out_channels, out_channels),
)
def forward(self, objs):
"""
objs : 1xDxN
"""
feats = objs.view(2, -1, objs.size(-1)) # 1x2DxL -> 2xDxL
obj_fused = self.input_p(feats[:1]) + self.input_i(feats[1:])
obj_feats = torch.cat([feats, obj_fused], dim=0)
return obj_feats
# Fusion module A
class fusion_module_A(nn.Module):
def __init__(self, appear_len, point_len, out_channels):
super(fusion_module_A, self).__init__()
print("Fusion Module A: concatenate point, image & linear fusion")
self.appear_len = appear_len
self.point_len = point_len
self.input_w = nn.Sequential(
nn.Conv1d(out_channels * 2, out_channels, 1, 1),
nn.GroupNorm(out_channels, out_channels),
)
def forward(self, objs):
"""
objs : 1xDxN
"""
feats = objs.view(2, -1, objs.size(-1)) # 1x2DxL -> 2xDxL
obj_fused = self.input_w(objs) # 1x2DxL -> 1xDxL
obj_feats = torch.cat([feats, obj_fused], dim=0)
return obj_feats
gcn.py
Graph convolutional network.
- Computes the correlation feature between the fused features, i.e., how strongly they are related; I read this as computing edge weights. Three options:
  - multiply, followed by 1x1 convolutions (the method used in the paper; a toy shape check follows this list)
  - minus
  - minus followed by an absolute value
- Affinity-matrix computation, the so-called graph convolution:
  - the "edge weights" from the previous step: x = self.affinity(objs, dets)
  - graph convolution: out = self.conv1(x), which can be viewed as a convolution over the bipartite graph
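A toy shape check of the multiply option (my own snippet): the einsum is an outer product over the object and detection axes, producing a D-channel "edge feature" for every (object, detection) pair:

import torch

objs = torch.randn(1, 8, 3)   # B x D x N: 3 tracked objects
dets = torch.randn(1, 8, 4)   # B x D x M: 4 current detections
x = torch.einsum('bci,bcj->bcij', objs, dets)
print(x.shape)                # torch.Size([1, 8, 3, 4]) = B x D x N x M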
# Similarity function
# edge weights via element-wise multiplication (the option used in the paper,
# combined with the 1x1 convolutions below)
def batch_multiply(objs, dets):
x = torch.einsum('bci,bcj->bcij', objs, dets)
return x
# edge weights via subtraction plus absolute value
def batch_minus_abs(objs, dets):
obj_mat = objs.unsqueeze(-1).repeat(1, 1, 1, dets.size(-1)) # BxDxNxM
det_mat = dets.unsqueeze(-2).repeat(1, 1, objs.size(-1), 1) # BxDxNxM
related_pos = (obj_mat - det_mat) / 2 # BxDxNxM
x = related_pos.abs()  # BxDxNxM
return x
# edge weights via subtraction
def batch_minus(objs, dets):
obj_mat = objs.unsqueeze(-1).repeat(1, 1, 1, dets.size(-1)) # BxDxNxM
det_mat = dets.unsqueeze(-2).repeat(1, 1, objs.size(-1), 1) # BxDxNxM
related_pos = (obj_mat - det_mat) / 2 # BxDxNxM
return related_pos
# GCN
class affinity_module(nn.Module):
def __init__(self, in_channels, new_end, affinity_op='multiply'):
super(affinity_module, self).__init__()
print(f"Use {affinity_op} similarity with fusion module")
self.in_channels = in_channels
expansion = 1
if affinity_op in ['multiply', 'minus', 'minus_abs']:
self.affinity = eval(f"batch_{affinity_op}")
else:
print("Not Implement!!")
# computes the new score and end score; w_new_end is the NewEndIndicator built
# in new_end.py and passed in as new_end, so the two files implement one and
# the same computation
self.w_new_end = new_end(in_channels * expansion)
# the graph-convolution layers
self.conv1 = nn.Sequential(
nn.Conv2d(in_channels * expansion, in_channels, 1, 1),
nn.GroupNorm(in_channels, in_channels), nn.ReLU(inplace=True),
nn.Conv2d(in_channels, in_channels, 1, 1),
nn.GroupNorm(in_channels, in_channels), nn.ReLU(inplace=True),
nn.Conv2d(in_channels, in_channels // 4, 1, 1),
nn.GroupNorm(in_channels // 4, in_channels // 4),
nn.ReLU(inplace=True), nn.Conv2d(in_channels // 4, 1, 1, 1))
def forward(self, objs, dets):
"""
objs : 1xDxN
dets : 1xDxM
obj_feats: 3xDxN
det_feats: 3xDxM
"""
# if self.fusion_net is not None:
# objs = self.fusion_net(objs)
# dets = self.fusion_net(dets)
# compute the edge weights
x = self.affinity(objs, dets)
# compute the new score and end score
new_score, end_score = self.w_new_end(x)
# graph convolution
out = self.conv1(x)
return out, new_score, end_score
new_end.py
Two methods are provided for computing the new score and end score.
Method 1 (v1): first average- or max-pool the affinity tensor to obtain the vectors new_vec and end_vec used for scoring, then compute the new score and end score with separate conv heads.
Method 2 (v2): first convolve the full affinity tensor x, then take the mean or max of the convolved x to obtain new_vec and end_vec, and finally compute the new score and end score with a shared conv head.
The two methods produce outputs of different dimensionality.
class NewEndIndicator_v1(nn.Module):
def __init__(self, in_channels, kernel_size, reduction, mode='avg'):
super(NewEndIndicator_v1, self).__init__()
self.mode = mode
self.w_end_conv = nn.Sequential(
nn.GroupNorm(1, in_channels),
nn.Conv2d(in_channels, in_channels // reduction, 1, 1),
nn.GroupNorm(1, in_channels // reduction),
nn.ReLU(inplace=True),
nn.Conv2d(in_channels // reduction, 1, 1, 1),
)
self.w_new_conv = nn.Sequential(
nn.GroupNorm(1, in_channels),
nn.Conv2d(in_channels, in_channels // reduction, 1, 1),
nn.GroupNorm(1, in_channels // reduction),
nn.ReLU(inplace=True),
nn.Conv2d(in_channels // reduction, 1, 1, 1),
)
def forward(self, x):
"""
x: 1xCxNxM
w_new: Mx1
w_end: Nx1
"""
if self.mode == 'avg':
new_vec = F.adaptive_avg_pool2d(x, (1, x.size(-1)))
end_vec = F.adaptive_avg_pool2d(x, (x.size(-2), 1))
else:
new_vec = F.adaptive_max_pool2d(x, (1, x.size(-1)))
end_vec = F.adaptive_max_pool2d(x, (x.size(-2), 1))
w_new = 1 - self.w_new_conv(new_vec).view((new_vec.size(-1), -1))
w_end = 1 - self.w_end_conv(end_vec).view((end_vec.size(-2), -1))
return w_new, w_end
class NewEndIndicator_v2(nn.Module):
def __init__(self, in_channels, kernel_size, reduction, mode='avg'):
super(NewEndIndicator_v2, self).__init__()
self.mode = mode
self.conv0 = nn.Sequential(
nn.Conv2d(in_channels, in_channels, 1, 1),
nn.GroupNorm(1, in_channels),
nn.ReLU(inplace=True),
)
self.conv1 = nn.Sequential(
nn.Conv1d(in_channels, min(in_channels, 512), 1, 1),
nn.GroupNorm(1, min(in_channels, 512)), nn.ReLU(inplace=True),
nn.Conv1d(min(in_channels, 512), in_channels // reduction, 1, 1),
nn.GroupNorm(1, in_channels // reduction), nn.ReLU(inplace=True),
nn.Conv1d(in_channels // reduction, 1, 1, 1), nn.Sigmoid())
print(f"End version V2 by {mode}")
print(self)
def forward(self, x):
"""
x: BxCxNxM
w_new: BxM
w_end: BxN
"""
x = self.conv0(x)
if self.mode == 'avg':
new_vec = x.mean(dim=-2, keepdim=False) # 1xCxM
end_vec = x.mean(dim=-1, keepdim=False) # 1xCxN
else:
new_vec = x.max(dim=-2, keepdim=False)[0] # 1xCxM
end_vec = x.max(dim=-1, keepdim=False)[0] # 1xCxN
w_new = self.conv1(new_vec).squeeze(1) # BxCxM->Bx1xM->BxM
w_end = self.conv1(end_vec).squeeze(1) # BxCxN->Bx1xN->BxN
return w_new, w_end
score_net.py
Computes the detection score; not covered in detail.
ghm_loss.py
Computes the loss for the detection score; used in cost.py; not covered in detail.
dropblock.py
Implements the DropBlock operation (used by the appearance model); not covered in detail.
5. Summary
The overall architecture of this framework is clear, and it looks feasible to add new modules: every module boils down to fairly simple convolution operations, and the final linear-programming solver could be replaced by another model. It would be worth trying to bring in the prediction-and-association structure from FANTrack, to add motion and bbox features, and to add an attention mechanism that aggregates information from other vehicles.