欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

程序如何在两个gpu卡上并行运行_单机多卡并行训练的程序编写

程序员文章站 2022-06-12 17:12:57
...

程序如何在两个gpu卡上并行运行_单机多卡并行训练的程序编写

本文采用的方法为作者独自设计,未参考其他框架的思路。如有雷同纯属巧合。

注意事项

神经网络的多卡并行训练通常有两个思路,一是数据并行,二是网络并行。数据并行即多张显卡上使用的是同一个网络和权重,将训练集拆分为两个部分,各自计算之后,将权重变化取平均值并同时更新。网络并行即考虑网络本身规模很大,因此需要将网络分成多个部分存在多张显卡上。因为这个并行设计与网络结构相关,需要针对具体问题,通用性不高,本文只分析数据并行的情况。

本文也不讨论多机并行,因为多机并行通常延迟较高,提速效果有限。

线程的分配来说,按Nvidia的推荐,一个线程控制一个显卡比较适合。因此线程设计采取1+N的模式,即主线程+N个副线程,其中N为显卡的个数。主线程负责调度和处理数据集,每个副线程对应一个显卡,控制GPU的运行。

多显卡的程序在调试过程中相对比较困难,因为一旦跨显卡调用错误,可能导致整个系统卡死,后续的调试异常困难。因此,有几个cuda设备相关的函数在这里非常关键,例如:cudaSetDevice,cudaMemcpyPeer等。它们用于设置本线程使用的显卡和跨显卡复制数据,使用时必须非常谨慎!

Nvidia旧版的驱动还存在一个诡异的问题。驱动附带的nvml可以得到显卡的更多信息,但是nvml的编号与cuda的编号竟然是不同的。新版驱动已经改为相同了。

深度学习中,cublas和cudnn库的使用会相当频繁,这两个库的函数调用时都会首先传入一个预先初始化的handler。此处需注意,如有多个显卡,必须在每张卡上都初始化。如果传入计算函数的handler与显存不在同一显卡,会导致系统立刻崩溃。

因此每一块在显卡上分配的显存,都应该同时记录下所使用的设备编号,以及cublas和cudnn的handler的位置(直接或间接记录),应使用类或者结构体来包装。此处不具体讨论类的设计。

流程和代码

在计算过程中,为了提高速度,所有降低速度的因素应尽量避免。在计算开始后,除了将处理过的数据从内存复制到显存,所有操作应在显存中进行。在数据并行中,每个显卡中保存的网络是相同的,而一个网络中的权重可能包含多个矩阵和偏置值,如将这些值合成到同一块内存中,则可以降低跨显卡复制的次数。

训练过程中,在每次分别训练之后,应收集所有显卡上的权重,取平均值并再次分发到各个显卡,因此线程的同步至关重要。此处使用原子变量来进行同步,而不是通常使用锁的方式。

副线程的部分c++代码如下。此处nets是一个网络的vector,0号为主网络,负责收集权重、计算平均值和分发。train_info结构体中的变量均为原子变量。需注意几个计数相关的部分。把这些代码进行必要的处理之后,使用std::thread运行就可以了。

#define WAIT_UNTIL(condition) { while(!(condition)) { std::this_thread::sleep_for(std::chrono::nanoseconds(100)); } }
    auto net = nets_[net_id];
    while (epoch_count < epoch0 + epoches)
    {
        epoch_count++;
        //等待数据准备完成
        WAIT_UNTIL(train_info->data_prepared == 1 || train_info->stop == 1);
        train_data_gpu.copyPartFrom(train_data_cpu_, net_id * max_batch_ / MP_count_, 0, max_batch_ / MP_count_);
        //发出拷贝数据结束信号
        train_info->data_distributed++;    //该变量表示已复制完的设备的个数
        for (int iter = 0; iter < train_data_gpu.getNumber() / train_data_sub.getNumber(); iter++)
        {
            iter_count++;
            train_data_sub.shareData(train_data_gpu, iter * train_data_sub.getNumber());
            //同步未完成
            WAIT_UNTIL(train_info->parameters_collected == 0 || train_info->stop == 1);
            //训练各自网络
            net->train(train_data_sub.X(), train_data_sub.Y());
            //发出网络训练结束信号
            train_info->trained++;    //该变量表示已训练完的设备的个数

            if (net_id == 0)
            {
                //主网络等待所有网络训练完成
                WAIT_UNTIL(train_info->trained == MP_count_ || train_info->stop == 1);
                train_info->trained = 0;
                //同步
                if (MP_count_ > 1)
                {
                    for (int i = 1; i < nets.size(); i++)
                    {
                        Matrix::copyDataAcrossDevice(nets[i]->getParameters(), net->getWorkspace());
                        Matrix::add(net->getParameters(), net->getWorkspace(), net->getParameters());
                    }
                    net->getParameters()->scale(1.0 / MP_count_);
                }
                //发布同步完成信号
                train_info->parameters_collected = MP_count_ - 1;    //主网络权值已更新,因此此处减一
            }
            else
            {
                //非主网络等待同步结束
                WAIT_UNTIL(train_info->parameters_collected > 0 || train_info->stop == 1);
                train_info->parameters_collected--;    //该变量表示未收集完权重的设备的个数
                //分发到各个网络
                Matrix::copyDataAcrossDevice(nets[0]->getParameters(), net->getParameters());
            }
        }
        if (train_info->stop == 1)
        {
            break;
        }
    }

主线程的部分代码如下:

    //创建训练进程
    std::vector<std::thread*> net_threads(nets.size());
    for (int i = 0; i < net_threads.size(); i++)
    {
        net_threads[i] = new std::thread{ &Brain::trainOneNet, this, nets, i, &train_info, epoch_count_, epoches };
    }

    train_info.stop = 0;
    int epoch0 = epoch_count_;
    for (int epoch_count = epoch0; epoch_count < epoch0 + epoches; epoch_count++)
    {
        data_preparer->prepareData(epoch_count, train_data_origin_, train_data_cpu_);    //处理数据
        train_info.data_prepared = 1;
        WAIT_UNTIL(train_info.data_distributed == MP_count_ || train_info.stop == 1);
        train_info.data_prepared = 0;
        train_info.data_distributed = 0;
        iter_count_ += iter_per_epoch;
        epoch_count_++;
        if (train_info.stop == 1) { break; }
    }
    train_info.stop = 1;

    for (int i = 0; i < net_threads.size(); i++)
    {
        net_threads[i]->join();
    }
    safe_delete(net_threads);

代码的解说待更新。