
mmMOT Code Reading Notes


1. mmMOT System Framework Diagram

[Figure: mmMOT system framework]

2. The Framework as I Understand It

[Figure: the framework as I understand it]

3. Code Structure

[Figure: code structure]

4. Code Walkthrough

Top-down order: start from the main file and get progressively more detailed.

main.py

The program's main file, used to train and validate the model.

  1. Load the config file and initialize the tracking module

    global args, config, best_mota
    args = parser.parse_args()

    with open(args.config) as f:
        config = yaml.load(f, Loader=yaml.FullLoader)

    config = EasyDict(config['common'])
    config.save_path = os.path.dirname(args.config)

    model = build_model(config)
    optimizer = build_optim(model, config)
    criterion = build_criterion(config.loss)

    tracking_module = TrackingModule(model, optimizer, criterion, config.det_type)
    
  2. Data loading (the dataset folder)

        # Data loading code
        train_transform, valid_transform = build_augmentation(config.augmentation)
    
        # train
        train_dataset = build_dataset(
            config,
            set_source='train',
            evaluate=False,
            train_transform=train_transform)
        trainval_dataset = build_dataset(
            config,
            set_source='train',
            evaluate=True,
            valid_transform=valid_transform)
        val_dataset = build_dataset(
            config,
            set_source='val',
            evaluate=True,
            valid_transform=valid_transform)
    
        train_sampler = DistributedGivenIterationSampler(
            train_dataset,
            config.lr_scheduler.max_iter,
            config.batch_size,
            world_size=1,
            rank=0,
            last_iter=last_iter)
    
        train_loader = DataLoader(
            train_dataset,
            batch_size=config.batch_size,
            shuffle=False,
            num_workers=config.workers,
            pin_memory=True,
            sampler=train_sampler)
    
  3. Training

    def train(train_loader, val_loader, trainval_loader, tracking_module, lr_scheduler, start_iter, tb_logger):
    
    # forward pass
    loss = tracking_module.step(input.squeeze(0), det_info, det_id, det_cls, det_split)
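
    For context, here is a minimal sketch of how the loop inside train() might
    consume the loader and call step() (my reconstruction from the calls above,
    not the repo's exact code; curr_step and config.val_freq are assumed names):

    # hedged sketch: iterate the sampler-driven loader, step the model,
    # and periodically validate
    for i, (input, det_info, det_id, det_cls, det_split) in enumerate(train_loader):
        curr_step = start_iter + i
        lr_scheduler.step(curr_step)  # assumed: iteration-based LR schedule
        loss = tracking_module.step(input.squeeze(0), det_info, det_id,
                                    det_cls, det_split)
        if curr_step % config.val_freq == 0:
            validate(val_loader, tracking_module, curr_step, part='val')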
    
  4. Validation

    def validate(val_loader,
                 tracking_module,
                 step,
                 part='train',
                 fusion_list=None,
                 fuse_prob=False):
    

tracking_model.py

Main file of the tracking module: mode switching, the training step, and the prediction step.

  1. Mode switching

    Training mode or evaluation mode.

    # evaluation mode
    def eval(self):
        if isinstance(self.model, list):
            for i in range(len(self.model)):
                self.model[i].eval()
        else:
            self.model.eval()
        self.clear_mem()
        return
    
    # training mode
    def train(self):
        if isinstance(self.model, list):
            for i in range(len(self.model)):
                self.model[i].train()
        else:
            self.model.train()
        self.clear_mem()
        return
    
  2. step(): the training function; the model produces the 4 kinds of scores, then the loss is computed and backpropagated

    def step(self, det_img, det_info, det_id, det_cls, det_split):
        # self.model is the full tracking network assembled in tracking_net; it outputs the 4 kinds of scores
        det_score, link_score, new_score, end_score, trans = self.model(
            det_img, det_info, det_split)
        # generate gt_y
        gt_det, gt_link, gt_new, gt_end = self.generate_gt(
            det_score[0], det_cls, det_id, det_split)
    
        # calculate loss
        # the loss value is computed by the criterion defined in cost.py
        loss = self.criterion(det_split, gt_det, gt_link, gt_new, gt_end,
                              det_score, link_score, new_score, end_score,
                              trans)
    
        # backpropagate and optimize
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
    
        return loss
    
  3. predict(): produces the tracking result. step() backpropagates from the scores directly and never needs the final result; predict() takes the 4 kinds of scores and solves for the final association with linear programming

    def predict(self, det_imgs, det_info, dets, det_split):
        # self.model is the full tracking network assembled in tracking_net; it outputs the 4 kinds of scores
        det_score, link_score, new_score, end_score, _ = self.model(
            det_imgs, det_info, det_split)
    
        # ortools_solve, in solvers.py, is the linear-programming module that
        # associates detections based on the 4 kinds of scores
        # I think this part could also be replaced by greedy matching, the
        # Hungarian algorithm, or a neural network
        assign_det, assign_link, assign_new, assign_end = ortools_solve(
            det_score[self.test_mode],
            [link_score[0][self.test_mode:self.test_mode + 1]],
            new_score[self.test_mode], end_score[self.test_mode], det_split)
    
        assign_id, assign_bbox = self.assign_det_id(assign_det, assign_link,
                                                        assign_new, assign_end,
                                                        det_split, dets)
        aligned_ids, aligned_dets, frame_start = self.align_id(
            assign_id, assign_bbox)
    
        return aligned_ids, aligned_dets, frame_start
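
    As the comment above suggests, the LP step could be swapped for a simpler
    matcher. A hedged sketch (mine, not repo code) of a Hungarian-style
    replacement for one frame pair, assuming link_score is an NxM affinity
    matrix between the N previous and M current detections:

    import numpy as np
    from scipy.optimize import linear_sum_assignment

    def hungarian_associate(link_score, threshold=0.5):
        # maximizing total affinity == minimizing its negative
        cost = -np.asarray(link_score)
        rows, cols = linear_sum_assignment(cost)
        # reject weak pairs: unmatched current detections start new tracks,
        # unmatched previous detections end theirs
        return [(r, c) for r, c in zip(rows, cols)
                if link_score[r][c] >= threshold]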
    

solvers.py

Linear-programming module: takes the scores from the 4 estimators as input and produces the final association result.

# Function signature. The inputs are the estimators' outputs; the constraints require that every detection in the current frame either starts a new trajectory or links to a previous one, and either ends an old trajectory or links to a later one.
def ortools_solve(det_score,
                  link_score,
                  new_score,
                  end_score,
                  det_split,
                  gt=None):
    ......

# the return values are the association results
    return assign_det, assign_link, assign_new, assign_end
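
To make the constraints above concrete, here is a hedged toy version (mine, not
the repo's ortools_solve; a single frame pair, with scores given as plain
lists) using the same or-tools backend:

    from ortools.linear_solver import pywraplp

    def toy_ortools_solve(det_prev, det_next, link, new, end):
        # det_prev: N scores, det_next: M scores, link: NxM, new: M, end: N
        solver = pywraplp.Solver.CreateSolver('CBC')
        N, M = len(det_prev), len(det_next)
        y_dp = [solver.BoolVar(f'dp{i}') for i in range(N)]
        y_dn = [solver.BoolVar(f'dn{j}') for j in range(M)]
        y_l = [[solver.BoolVar(f'l{i}_{j}') for j in range(M)] for i in range(N)]
        y_new = [solver.BoolVar(f'n{j}') for j in range(M)]
        y_end = [solver.BoolVar(f'e{i}') for i in range(N)]
        # each kept detection in the previous frame either ends its trajectory
        # or links to exactly one detection in the next frame
        for i in range(N):
            solver.Add(y_dp[i] == y_end[i] + sum(y_l[i][j] for j in range(M)))
        # each kept detection in the next frame either starts a new trajectory
        # or is linked from exactly one previous detection
        for j in range(M):
            solver.Add(y_dn[j] == y_new[j] + sum(y_l[i][j] for i in range(N)))
        solver.Maximize(
            sum(det_prev[i] * y_dp[i] for i in range(N)) +
            sum(det_next[j] * y_dn[j] for j in range(M)) +
            sum(link[i][j] * y_l[i][j] for i in range(N) for j in range(M)) +
            sum(new[j] * y_new[j] for j in range(M)) +
            sum(end[i] * y_end[i] for i in range(N)))
        if solver.Solve() == pywraplp.Solver.OPTIMAL:
            return [[y_l[i][j].solution_value() for j in range(M)]
                    for i in range(N)]
        return None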

cost.py

Loss-definition module. The total loss is computed as:

loss = det_ratio * L_det + 0.4 * L_new + 0.4 * L_end + L_link (+ optional transform terms; cf. TrackingLoss.forward below, det_ratio defaults to 0.4)

  • CostLoss, NoDistanceLoss, and DistanceLoss (I suspect these are used for ablation experiments);

  • DetLoss, which computes the loss of the detection score, new score, and end score; it supports binary cross entropy, L1, L2, and GHM losses, where the GHM loss is defined in modules/ghm_loss;

  • LinkLoss, which computes the association loss using an L1 or L2 loss;

  • TrackingLoss, which computes the loss of the whole framework and is used for backpropagation during training.

# LinkLoss: initialization of the two loss variants
if 'l2' in loss_type:
    self.l2_loss = nn.MSELoss()
if 'l1' in loss_type:
    print("Use smooth l1 loss for link")
    self.l1_loss = nn.SmoothL1Loss()
    
# compute the link loss (annotated excerpt: the repo accumulates the loss over
# adjacent frame pairs i; the mask construction is elided here)
def forward(self, det_split, gt_det, link_score, gt_link):
    loss = 0
    for i in range(len(link_score)):
        # mask zeroes out links involving false detections (construction elided)
        if 'l2' in self.loss_type:
            loss += self.l2_loss(link_score[i].mul(mask),
                                 gt_link[i].repeat(mask.size(0), 1, 1))
        if 'l1' in self.loss_type:
            loss += self.l1_loss(link_score[i].mul(mask),
                                 gt_link[i].repeat(mask.size(0), 1, 1))
    return loss
# DetLoss: computes the det loss, new loss, and end loss; 4 loss types are supported
def forward(self, det_score, gt_score):
    """

    :param det_score: 3xL
    :param gt_score: L
    :return: loss
    """
    gt_score = gt_score.unsqueeze(0).repeat(det_score.size(0), 1)
    if 'bce' in self.loss_type:
        loss = F.binary_cross_entropy_with_logits(det_score, gt_score)
    if 'l2' in self.loss_type:
        mask = 1 - gt_score.eq(self.ignore_index)
        loss = F.mse_loss(det_score.mul(mask.float()), gt_score)
    if 'l1' in self.loss_type:
        mask = 1 - gt_score.eq(self.ignore_index)
        loss = F.smooth_l1_loss(det_score.mul(mask.float()), gt_score)
    # from modules.ghm_loss import GHMC_Loss
    # self.GHMC_Loss = GHMC_Loss(bins=30, momentum=0.75)
    if 'ghm' in self.loss_type:
        mask = 1 - gt_score.eq(self.ignore_index)
        loss = self.GHMC_Loss(det_score, gt_score, mask) 
    return loss
# TrackingLoss: computes the full loss and sets the weights of the different terms
# initialization: the det, new, and end weights are all set to 0.4
def __init__(self,
             smooth_ratio=0,
             detloss_type='bce',
             endloss_type='l2',
             det_ratio=0.4,
             trans_ratio=0.4,
             trans_last=False,
             linkloss_type='l2_softmax'):

# the full loss
def forward(self,
            det_split,
            gt_det,
            gt_link,
            gt_new,
            gt_end,
            det_score,
            link_score,
            new_score,
            end_score,
            trans=None):

    loss = self.det_loss(det_score, gt_det) * self.det_ratio
    loss += self.end_loss(new_score, gt_new[det_split[0]:]) * 0.4
    loss += self.end_loss(end_score, gt_end[:-det_split[-1]]) * 0.4
    loss += self.link_loss(det_split, gt_det, link_score, gt_link)
	
    # there is also some handling of the transform (trans) here, omitted from this excerpt
    
    return loss

tracking_net.py

The complete tracking network: integrates all the modules into the full tracking framework.

  • def __init__

    Parameters: configuration options such as feature-vector lengths, architecture choices, and other settings.

    Initializes:

    • the appearance model for image-feature extraction (VGG/ResNet)
    • PointNet for point-cloud feature extraction
    • the fusion module (A/B/C)
    • the new indicator & end indicator (V1/V2)
    • affinity_module, the adjacency-matrix model
    • the negative rejection module, which computes the detection score
  • def associate

    Fuses the per-modality association scores produced by affinity_module according to softmax_mode, with 4 modes (pass-through, multiply, add, max), yielding the final link_score/new_score/end_score.

  • def feature

    Feature extraction: obtains the appearance features and the point-cloud features separately, then fuses them with the chosen fusion module.

  • def determine_det

    Computes the detection score.

  • def forward

    For the given split of the data, computes each frame pair's det_scores/link_scores/new_scores/end_scores.

class TrackingNet
    # constructor
    def __init__(self,
                 seq_len,
                 appear_len=512,
                 appear_skippool=False,
                 appear_fpn=False,
                 score_arch='vgg',
                 score_fusion_arch='C',
                 appear_arch='vgg',
                 point_arch='v1',
                 point_len=512,
                 softmax_mode='single',
                 test_mode=0,
                 affinity_op='multiply',
                 dropblock=5,
                 end_arch='v2',
                 end_mode='avg',
                 without_reflectivity=True,
                 neg_threshold=0,
                 use_dropout=False):
        # build new end indicator
        if end_arch in ['v1', 'v2']:
            new_end = partial(
                eval("NewEndIndicator_%s" % end_arch),
                kernel_size=5,
                reduction=4,
                mode=end_mode)

        # build point net
        if point_len == 0:
            print("No point cloud used")
            self.point_net = None
        elif point_arch in ['v1']:
            point_net = eval("PointNet_%s" % point_arch)
            self.point_net = point_net(
                point_in_channels,
                out_channels=point_len,
                use_dropout=use_dropout)
        else:
            print("Not implemented!!")

        # build affinity matrix module
        assert in_channels != 0
        self.w_link = affinity_module(
            in_channels, new_end=new_end, affinity_op=affinity_op)

        # build negative rejection module
        if score_arch in ['branch_cls', 'branch_reg']:
            self.w_det = nn.Sequential(
                nn.Conv1d(in_channels, in_channels, 1, 1),
                nn.BatchNorm1d(in_channels),
                nn.ReLU(inplace=True),
                nn.Conv1d(in_channels, in_channels // 2, 1, 1),
                nn.BatchNorm1d(in_channels // 2),
                nn.ReLU(inplace=True),
                nn.Conv1d(in_channels // 2, 1, 1, 1),
            )
        else:
            print("Not implement yet")
            
    # association: fuses the per-modality link scores; 4 modes are supported:
    # pass-through (with a single modality), multiply, add, and max
    def associate(self, objs, dets):
        link_mat, new_score, end_score = self.w_link(objs, dets)

        if self.softmax_mode == 'single':
            link_score = F.softmax(link_mat, dim=-1)
        elif self.softmax_mode == 'dual':
            link_score_prev = F.softmax(link_mat, dim=-1)
            link_score_next = F.softmax(link_mat, dim=-2)
            link_score = link_score_prev.mul(link_score_next)
        elif self.softmax_mode == 'dual_add':
            link_score_prev = F.softmax(link_mat, dim=-1)
            link_score_next = F.softmax(link_mat, dim=-2)
            link_score = (link_score_prev + link_score_next) / 2
        elif self.softmax_mode == 'dual_max':
            link_score_prev = F.softmax(link_mat, dim=-1)
            link_score_next = F.softmax(link_mat, dim=-2)
            link_score = torch.max(link_score_prev, link_score_next)
        else:
            link_score = link_mat

        return link_score, new_score, end_score
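
    # Aside (my toy check, not repo code): in the 'dual' modes, multiplying the
    # row softmax (dim=-1) by the column softmax (dim=-2) keeps only links that
    # are preferred in both directions, e.g.
    #   link_mat = torch.tensor([[[5., 1.], [1., 5.]]])
    #   F.softmax(link_mat, -1) * F.softmax(link_mat, -2)  # strong diagonal only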

    # feature extraction
    def feature(self, dets, det_info):
        feats = []

        # appearance features
        if self.appearance is not None:
            appear = self.appearance(dets)
            feats.append(appear)

        # point-cloud features; trans is the transformation matrix of the T-Net, designed in PointNet for rotation invariance
        trans = None
        if self.point_net is not None:
            points, trans = self.point_net(
                det_info['points'].transpose(-1, -2),
                det_info['points_split'].long().squeeze(0))
            feats.append(points)

        # feature fusion; there are 3 fusion modules (A/B/C), described in detail below
        feats = torch.cat(feats, dim=-1).t().unsqueeze(0)  # LxD->1xDxL
        if self.fusion_module is not None:
            feats = self.fusion_module(feats)
            return feats, trans

        return feats, trans

    # compute det_scores
    def determine_det(self, dets, feats):
        det_scores = self.w_det(feats).squeeze(1)  # Bx1xL -> BxL

        if not self.training:
            # add mask
            if 'cls' in self.score_arch:
                det_scores = det_scores.sigmoid()

            mask = det_scores.lt(self.neg_threshold)
            det_scores -= mask.float()
        return det_scores

    def forward(self, dets, det_info, dets_split):
        feats, trans = self.feature(dets, det_info)
        det_scores = self.determine_det(dets, feats)

        start = 0
        link_scores = []
        new_scores = []
        end_scores = []
        for i in range(len(dets_split) - 1):
            prev_end = start + dets_split[i].item()
            end = prev_end + dets_split[i + 1].item()
            link_score, new_score, end_score = self.associate(
                feats[:, :, start:prev_end], feats[:, :, prev_end:end])
            link_scores.append(link_score.squeeze(1))
            new_scores.append(new_score)
            end_scores.append(end_score)
            start = prev_end

        if not self.training:
            fake_new = det_scores.new_zeros(
                (det_scores.size(0), link_scores[0].size(-2)))
            fake_end = det_scores.new_zeros(
                (det_scores.size(0), link_scores[-1].size(-1)))
            new_scores = torch.cat([fake_new] + new_scores, dim=1)
            end_scores = torch.cat(end_scores + [fake_end], dim=1)
        else:
            new_scores = torch.cat(new_scores, dim=1)
            end_scores = torch.cat(end_scores, dim=1)
        return det_scores, link_scores, new_scores, end_scores, trans
 

appear_net.py

Extracts appearance features from images; either a VGG or a ResNet backbone can be used.

During feature extraction you can optionally use Skip Pooling (fewer layers) or an FPN (feature pyramid, higher accuracy); both options trade off speed against accuracy.

class AppearanceNet(nn.Module):

    def __init__(self,
                 arch='vgg',
                 out_channels=512,
                 skippool=True,
                 fpn=False,
                 dropblock=5):

        if arch == 'vgg':
            base_channel = 64 // reduction
            vgg_net = eval("vgg16_bn_%s" % str(out_channels))
            loaded_model = vgg_net()
            if skippool:
                print("use Skip Pooling in appearance model")
                self.layers, self.global_pool = self._parse_vgg_layers(
                    loaded_model)
        elif arch == 'resnet50':
            loaded_model = torchvision.models.resnet50(pretrained=True)
            base_channel = 256
            self.layers = Resnet(loaded_model)
            if skippool:
                print("use Skip Pooling in appearance model")
                self.global_pool = self._parse_res_layers(4)
        elif arch == 'resnet101':
            print("use resnet101")
            loaded_model = torchvision.models.resnet101(pretrained=True)
            base_channel = 256
            self.layers = Resnet(loaded_model)
            if skippool:
                print("use Skip Pooling in appearance model")
                self.global_pool = self._parse_res_layers(4)
        elif arch == 'resnet152':
            print("use resnet152")
            loaded_model = torchvision.models.resnet152(pretrained=True)
            base_channel = 256
            self.layers = Resnet(loaded_model)
            if skippool:
                print("use Skip Pooling in appearance model")
                self.global_pool = self._parse_res_layers(4)
        if fpn:
            print("use FPN in appearance model")
            # FPN Module
            ...

        if not skippool and not fpn:
            ...
        
    def forward(self, x):
        if self.arch == 'vgg':
            feats = self.vgg_forward(x)
        else:
            feats = self.res_forward(x)

        if self.skippool:
            ...

        if self.fpn:
            ...
        else:
            ...

        out = self.conv_last(out).squeeze(-1).squeeze(-1)  # NxCx1x1 -> N*C
        return out

point_net.py

Extracts features from the point cloud, reusing part of PointNet: only the global feature is needed, not the final classification output;

the original PointNet obtains the global feature with max pooling, whereas this code uses average pooling;

the return value has two parts: out, the resulting global feature vector, and trans, the transformation matrix used for point-cloud (rotation) invariance.

class PointNet_v1(nn.Module):

    def __init__(self, in_channels, out_channels=512, use_dropout=False):
        super(PointNet_v1, self).__init__()
        # initialize the feature-extraction backbone
        self.feat = PointNetfeatGN(in_channels, out_channels)
        reduction = 512 // out_channels
        self.reduction = reduction
        self.conv1 = torch.nn.Conv1d(1088 // reduction, 512 // reduction, 1)
        self.conv2 = torch.nn.Conv1d(512 // reduction, out_channels, 1)
        self.bn1 = nn.GroupNorm(512 // reduction, 512 // reduction)
        self.bn2 = nn.GroupNorm(16 // reduction, out_channels)
        self.out_channels = out_channels
        self.relu = nn.ReLU(inplace=True)
        self.avg_pool = nn.AdaptiveAvgPool1d(1)
        self.avg_bn = nn.GroupNorm(512 // reduction, 512 // reduction)
        self.dropout = None
        if use_dropout:
            print("Use dropout in pointnet")
            self.dropout = nn.Dropout(p=0.5)

    def forward(self, x, point_split):
        x, trans = self.feat(x, point_split)
        x = torch.cat(x, dim=1)
        x = self.relu(self.bn1(self.conv1(x)))
        if self.dropout is not None:
            x = self.dropout(x)

        max_feats = []
        for i in range(len(point_split) - 1):
            start = point_split[i].item()
            end = point_split[i + 1].item()
            max_feat = self.avg_pool(x[:, :, start:end])  # average pooling, despite the variable name
            max_feats.append(max_feat.view(-1, 512 // self.reduction, 1))

        max_feats = torch.cat(max_feats, dim=-1)
        out = self.relu(self.bn2(self.conv2(max_feats))).transpose(
            -1, -2).squeeze(0)
        assert out.size(0) == len(point_split) - 1

        return out, trans
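
# Aside (my reading of the loop above): point_split holds cumulative point
# boundaries, so x[:, :, point_split[i]:point_split[i+1]] selects the points of
# detection i, and the pooled output is one feature vector per detection.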

# spatial transformer (T-Net) for point-cloud invariance
class STN3d(nn.Module):
    #...

# feature-extraction network
class PointNetfeatGN(nn.Module):

    def __init__(self, in_channels=3, out_channels=512, global_feat=True):
        #...

    def forward(self, x, point_split):
        #...
        max_feats = []
        for i in range(len(point_split) - 1):
            start = point_split[i].item()
            end = point_split[i + 1].item()
            # average pooling for the global feature
            max_feat = self.avg_pool(x[:, :, start:end])
            max_feats.append(
                max_feat.view(-1, 1024 // self.reduction,
                              1).repeat(1, 1, end - start))

        max_feats = torch.cat(max_feats, dim=-1)

        assert max_feats.size(-1) == x.size(-1)
        conv_out.append(max_feats)

        return conv_out, trans

fusion_net.py

Three fusion modules are provided; each returns the fused feature together with the original per-modality features. The three modules are:

[Figure: the three fusion modules]

  • Fusion module C: adds an "attention" mechanism. Different sensors contribute differently in different scenes (e.g., the image's weight should drop in the dark), so an attention value is first computed for each modality's features, and fusion happens afterwards.

    “sigmoid weight gated point,image fusion”
    [Figure: fusion module C]

  • Fusion module B: like C but without the attention-weighting step; each modality's features are first transformed, then summed directly to give the final feature.

    [Figure: fusion module B]

  • Fusion module A: first concatenates the features of the different modalities (which doubles the feature length), then uses a linear map W to project back to the original length.

    [Figure: fusion module A]

# Common fusion module
# fusion module C
class fusion_module_C(nn.Module):

    def __init__(self, appear_len, point_len, out_channels):
        super(fusion_module_C, self).__init__()
        print(
            "Fusion Module C: split sigmoid weight gated point, image fusion")
        self.appear_len = appear_len
        self.point_len = point_len
        # attention (gate) networks
        self.gate_p = nn.Sequential(
            nn.Conv1d(point_len, point_len, 1, 1),
            nn.Sigmoid(),
        )
        self.gate_i = nn.Sequential(
            nn.Conv1d(appear_len, appear_len, 1, 1),
            nn.Sigmoid(),
        )
        # per-modality feature transforms
        self.input_p = nn.Sequential(
            nn.Conv1d(point_len, out_channels, 1, 1),
            nn.GroupNorm(out_channels, out_channels),
        )
        self.input_i = nn.Sequential(
            nn.Conv1d(appear_len, out_channels, 1, 1),
            nn.GroupNorm(out_channels, out_channels),
        )

    def forward(self, objs):
        """
            objs : 1xDxN
        """
        feats = objs.view(2, -1, objs.size(-1))  # 1x2DxL -> 2xDxL
        # compute the attention of each modality
        gate_p = self.gate_p(feats[:1])  # 1xDxL
        gate_i = self.gate_i(feats[1:])  # 1xDxL
        # gated feature fusion
        obj_fused = gate_p.mul(self.input_p(feats[:1])) + gate_i.mul(
            self.input_i(feats[1:]))

        obj_feats = torch.cat([feats, obj_fused.div(gate_p + gate_i)], dim=0)
        return obj_feats
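
# Aside (hedged shape check, mine): with appear_len == point_len == 512,
# objs is 1 x 1024 x L (the two modality features stacked along D) and the
# output is 3 x 512 x L: both raw modality features plus the fused one,
# normalized by (gate_p + gate_i).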


# fusion module B
class fusion_module_B(nn.Module):

    def __init__(self, appear_len, point_len, out_channels):
        super(fusion_module_B, self).__init__()
        print("Fusion Module B: point, weighted image"
              "& linear fusion, with split input w")
        self.appear_len = appear_len
        self.point_len = point_len
        self.input_p = nn.Sequential(
            nn.Conv1d(out_channels, out_channels, 1, 1),
            nn.GroupNorm(out_channels, out_channels),
        )
        self.input_i = nn.Sequential(
            nn.Conv1d(out_channels, out_channels, 1, 1),
            nn.GroupNorm(out_channels, out_channels),
        )

    def forward(self, objs):
        """
            objs : 1xDxN
        """

        feats = objs.view(2, -1, objs.size(-1))  # 1x2DxL -> 2xDxL
        obj_fused = self.input_p(feats[:1]) + self.input_i(feats[1:])
        obj_feats = torch.cat([feats, obj_fused], dim=0)
        return obj_feats

# fusion module A
class fusion_module_A(nn.Module):

    def __init__(self, appear_len, point_len, out_channels):
        super(fusion_module_A, self).__init__()
        print("Fusion Module A: concatenate point, image & linear fusion")
        self.appear_len = appear_len
        self.point_len = point_len
        self.input_w = nn.Sequential(
            nn.Conv1d(out_channels * 2, out_channels, 1, 1),
            nn.GroupNorm(out_channels, out_channels),
        )

    def forward(self, objs):
        """
            objs : 1xDxN
        """
        feats = objs.view(2, -1, objs.size(-1))  # 1x2DxL -> 2xDxL
        obj_fused = self.input_w(objs)  # 1x2DxL -> 1xDxL
        obj_feats = torch.cat([feats, obj_fused], dim=0)
        return obj_feats

gcn.py

Graph convolutional network.

  1. Compute the correlation between the fused features, i.e. the correlation feature; I think this can also be read as computing edge weights

[Figure: correlation feature computation]

  • multiply, with 1×1 convolutions on top; this is the method the paper uses
  • minus: plain subtraction
  • minus_abs: subtraction followed by absolute value
  2. Similarity-matrix computation: the so-called graph convolution

    • the "edge weights" from the previous step: x = self.affinity(objs, dets)
    • graph convolution: out = self.conv1(x), which can be seen as convolving over a bipartite graph
# Similarity function
# edge weights via multiplication: a per-channel outer product (BxDxN, BxDxM -> BxDxNxM)
def batch_multiply(objs, dets):
    x = torch.einsum('bci,bcj->bcij', objs, dets)
    return x

# edge weights via subtraction followed by absolute value
def batch_minus_abs(objs, dets):
    obj_mat = objs.unsqueeze(-1).repeat(1, 1, 1, dets.size(-1))  # BxDxNxM
    det_mat = dets.unsqueeze(-2).repeat(1, 1, objs.size(-1), 1)  # BxDxNxM
    related_pos = (obj_mat - det_mat) / 2  # BxDxNxM
    x = related_pos.abs()  # BxDxNxM
    return x

# edge weights via plain subtraction
def batch_minus(objs, dets):
    obj_mat = objs.unsqueeze(-1).repeat(1, 1, 1, dets.size(-1))  # BxDxNxM
    det_mat = dets.unsqueeze(-2).repeat(1, 1, objs.size(-1), 1)  # BxDxNxM
    related_pos = (obj_mat - det_mat) / 2  # BxDxNxM
    return related_pos


# GCN
class affinity_module(nn.Module):

    def __init__(self, in_channels, new_end, affinity_op='multiply'):
        super(affinity_module, self).__init__()
        print(f"Use {affinity_op} similarity with fusion module")
        self.in_channels = in_channels
        expansion = 1

        if affinity_op in ['multiply', 'minus', 'minus_abs']:
            self.affinity = eval(f"batch_{affinity_op}")
        else:
            print("Not Implement!!")

        # new/end score computation; new_end here is the NewEndIndicator constructed in new_end.py (see below)
        self.w_new_end = new_end(in_channels * expansion)
        # graph-convolution stack
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels * expansion, in_channels, 1, 1),
            nn.GroupNorm(in_channels, in_channels), nn.ReLU(inplace=True),
            nn.Conv2d(in_channels, in_channels, 1, 1),
            nn.GroupNorm(in_channels, in_channels), nn.ReLU(inplace=True),
            nn.Conv2d(in_channels, in_channels // 4, 1, 1),
            nn.GroupNorm(in_channels // 4, in_channels // 4),
            nn.ReLU(inplace=True), nn.Conv2d(in_channels // 4, 1, 1, 1))

    def forward(self, objs, dets):
        """
        objs : 1xDxN
        dets : 1xDxM
        obj_feats: 3xDxN
        det_feats: 3xDxN
        """
        # if self.fusion_net is not None:
        #     objs = self.fusion_net(objs)
        #     dets = self.fusion_net(dets)
        # compute edge weights
        x = self.affinity(objs, dets)
        # compute the new and end scores
        new_score, end_score = self.w_new_end(x)
        # graph convolution
        out = self.conv1(x)

        return out, new_score, end_score

new_end.py

Two variants are provided for computing the new score and the end score.

Variant 1 (v1): first obtain the vectors new_vec and end_vec via average or max pooling, then compute the new score and end score with separate conv branches.

Variant 2 (v2): first convolve the full feature tensor x, then take the mean or max of the result to obtain new_vec and end_vec, and finally compute both scores with a shared conv branch.

The two variants differ in the dimensions of their outputs.

class NewEndIndicator_v1(nn.Module):

    def __init__(self, in_channels, kernel_size, reduction, mode='avg'):
        super(NewEndIndicator_v1, self).__init__()
        self.mode = mode
        self.w_end_conv = nn.Sequential(
            nn.GroupNorm(1, in_channels),
            nn.Conv2d(in_channels, in_channels // reduction, 1, 1),
            nn.GroupNorm(1, in_channels // reduction),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels // reduction, 1, 1, 1),
        )
        self.w_new_conv = nn.Sequential(
            nn.GroupNorm(1, in_channels),
            nn.Conv2d(in_channels, in_channels // reduction, 1, 1),
            nn.GroupNorm(1, in_channels // reduction),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels // reduction, 1, 1, 1),
        )

    def forward(self, x):
        """
        x: 1xCxNxM
        w_new: Mx1
        w_end: Nx1
        """
        if self.mode == 'avg':
            new_vec = F.adaptive_avg_pool2d(x, (1, x.size(-1)))
            end_vec = F.adaptive_avg_pool2d(x, (x.size(-2), 1))
        else:
            new_vec = F.adaptive_max_pool2d(x, (1, x.size(-1)))
            end_vec = F.adaptive_max_pool2d(x, (x.size(-2), 1))
        w_new = 1 - self.w_new_conv(new_vec).view((new_vec.size(-1), -1))
        w_end = 1 - self.w_end_conv(end_vec).view((end_vec.size(-2), -1))

        return w_new, w_end


class NewEndIndicator_v2(nn.Module):

    def __init__(self, in_channels, kernel_size, reduction, mode='avg'):
        super(NewEndIndicator_v2, self).__init__()
        self.mode = mode
        self.conv0 = nn.Sequential(
            nn.Conv2d(in_channels, in_channels, 1, 1),
            nn.GroupNorm(1, in_channels),
            nn.ReLU(inplace=True),
        )
        self.conv1 = nn.Sequential(
            nn.Conv1d(in_channels, min(in_channels, 512), 1, 1),
            nn.GroupNorm(1, min(in_channels, 512)), nn.ReLU(inplace=True),
            nn.Conv1d(min(in_channels, 512), in_channels // reduction, 1, 1),
            nn.GroupNorm(1, in_channels // reduction), nn.ReLU(inplace=True),
            nn.Conv1d(in_channels // reduction, 1, 1, 1), nn.Sigmoid())
        print(f"End version V2 by {mode}")
        print(self)

    def forward(self, x):
        """
        x: BxCxNxM
        w_new: BxM
        w_end: BxN
        """
        x = self.conv0(x)
        if self.mode == 'avg':
            new_vec = x.mean(dim=-2, keepdim=False)  # 1xCxM
            end_vec = x.mean(dim=-1, keepdim=False)  # 1xCxN
        else:
            new_vec = x.max(dim=-2, keepdim=False)[0]  # 1xCxM
            end_vec = x.max(dim=-1, keepdim=False)[0]  # 1xCxN
        w_new = self.conv1(new_vec).squeeze(1)  # BxCxM->Bx1xM->BxM
        w_end = self.conv1(end_vec).squeeze(1)  # BxCxN->Bx1xN->BxN
        
        return w_new, w_end

score_net.py

Computes the detection score; not covered in detail here.

ghm_loss.py

The GHM loss for the detection score, used in cost.py; not covered in detail here.

dropblock.py

DropBlock regularization, used in the appearance model; not covered in detail here.

5. Summary

The overall architecture of this framework is quite clear, and it looks feasible to add new modules: each module only performs fairly simple convolutions, and the final linear-programming module could also be replaced with other models. It would be worth trying to bring in the prediction-and-association structure from FANTrack, adding motion and bounding-box features, and adding an attention mechanism that aggregates information from other vehicles.
