TFSEQ Part I: 分布式训练的方案和效率对比
TFSEQ Part I: 分布式训练的方案和效率对比
本文作者:追一科技算法工程师 Tony
1. 前言
TFSEQ 这个系列总结了笔者在使用 tensorflow 进行自然语言处理的一些经验和思考。计划写三篇文章:
- 分布式训练的方案和效率对比
- 序列模型的实现细节
- Batch size大小,优化和泛化
此为第一篇。
在增大数据集的同时增大模型参数量(Scaling)是提高准确率的一个有效方案,见百度这篇文章。但这也意味着计算量和训练时间的快速上升。为了缩减训练时间,我们可以使用分布式/并行训练。
搭建一个大型的分布式系统是一个耗时耗力的大工程。在量级不那么大的训练场景下,通常多卡并行是一个简单、经济且高效的方案。单机多卡系统可以认为是分布式系统的一种简单特例:卡间通信走 PCIe (或者更加土豪的 Nvlink),要比走以太网(Ethernet)快很多。分布式系统通过 Infinteband 连接方案和 Nvidia 的 GPUDirect RDMA 技术,可以实现不同 host 上的卡间直连,也可以达到甚至超过单机 PCIe 的通信速度。
本文假设读者已经有深度学习在自然语言处理应用上的基本知识,并用 Tensorflow 实现过一些序列模型。为了避免翻译带来的歧义,部分术语会直接使用英文表述(使用中文的话会在括号里加上英文术语),所以中英混杂的文风难以避免。为了讨论方便,以下先做一些术语的规定。
min-batch SGD 是一种迭代式优化(iterative optimization)的算法,每一次迭代都包括以下三个步骤:
- 读取 mini-batch,使用模型进行前馈计算(feedforward or forward)
- 计算 loss,并利用 loss 的值进行反向传播(backpropagation or backprop),得到各个参数的梯度(gradient)
- 根据算出的梯度,利用选定的优化算法(tensorflow 中称为 Optimizer),如标准 SGD 或者更加流行的 Adam 对参数进行更新。
每个主机(host)上都有多个设备(device,可以是 GPU 或 CPU)。每个 device 都有对应的内存(memory)。host 和 device 通常抽象成计算节点(node),可以进行运算以及和其他节点通信。本文用 node 做一般性的讨论,但在实现上还是以单机多卡的方案为主,此时 node 即为 device。
在计算时 node 间通常需要相互通信(communication)。Message Passing Model 是常用并行计算的通信模型,其通信操作在 MPI (message passing interface) 中定义。MPI 中的通信方式分两种:点对点通信(Point-to-point communication)和集合通信(collective communication)。点对点通信中只有一个发送者(sender) 和接受者(receiver)。而集合通信中通常有多个发送者和接受者。在分布式训练中比较常用的是集合通信。以下介绍中会用到以下三种操作:
- broadcast,将参数从一个 node 发到多个 node 上
- reduce,将参数从多个 node 收集到一个 node 上,同时对收集到的参数进行归并(求和,求积)。
- allreduce,每个 node 都从其他 node 上面收集参数,同时对收集到的参数进行归并。
这里有集合通信操作的详细图解。
分布式/并行训练的优化目标之一是减少通信对计算的阻塞。
2. Tensorflow 的简单 profiling
为了了解训练过程中 GPU/CPU 的使用情况,我们可以使用 Cuda 提供的 cupti 来对 GPU 运行情况进行 profiling。我们可以从 tensorflow 中的函数调用这个库并打印出 profile 的 log。以下代码可以用来记录一个迭代中 GPU 的占用情况,文档主要来源于这个issue。由于文档的稀缺使得部分内容只能靠猜。google 同时也在开发新的 profiler。
import tensorflow as tf
from tensorflow.python.client import timeline
def profile(fetch_keys, sess, step, output_dir):
run_metadata = tf.RunMetadata()
options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
sess.run(fetch_keys,
options=options,
run_metadata=run_metadata)
trace = timeline.Timeline(step_stats=run_metadata.step_stats)
trace_file = open(os.path.join(output_dir, 'timeline_%d.json' % step), 'w')
trace_file.write(trace.generate_chrome_trace_format())
拿到保存的 timeline.json
后,在 Chrome 的地址栏中输入 chrome://tracing/
,点击 load
。便可以得到这个图:
用 chrome 的 tracing 功能读取单个 step 的计算情况。
其中最底下的 CPU:0, GPU:0,1,2,… 即为对应的 tensorflow 对 CPU/GPU 的占用情况,上面的是 CUDA 内部线程的运行记录。点击图中每个 block 可以看到详细的情况,包括 block 对应的 op(operation) 名字,block 的前后依赖,运行时间等。
每个 block 都是根据一个 tensorflow 后端 kernel(GPU/CPU实现)划分的。点击右上方的 view option
可以看到不同 block 的依赖线图,我们可以根据这些信息找到计算阻塞的原因,并尝试优化它们。常见的计算阻塞原因有:
- 对 input pipleline 的等待,这说明 input pipeline 参数没有调好
- 不必要的变量传输。这可以通过用
tf.device
调整变量放置来优化。 - 对 cpu 计算的依赖。常见的需要 cpu 计算的 kernal 是各种 sampler。我们可以尝试把这一部分计算放到 input pipeline 里。
在图中可以看到在使用 tensorflow 提供的 op 去实现计算图的时候,有大量的时间花在 op 的发起和切换上了。这会极大地降低 GPU 的使用率。所以进一步优化的方案便是自己用 cuda 和 C++ 写 tensorflow kernel,或者使用现成的 kernel(如 tf.contrib.cudnn_rnn
)
3. 数据并行和模型并行
深度学习模型的并行有两种方案:模型并行(model parallel)和数据并行(data parallel)。
假设我们有多个 node:
- 模型并行:不同 node 输入相同数据,运行模型的不同部分
- 数据并行:不同 node 输入不同数据,运行相同的完整的模型。
为了完成一次更新,不同 node 间需要交换 forward 和 backprop 的信息,所以通信数据量是选择这两种并行方案的一个考量因素。另一个考量因素是由数据依赖(data dependency)带来的计算的阻塞(blocking)。最后一个考量因素是内存限制。当模型参数以及计算产生的中间变量无法放入一个 node 的内存时,我们只能使用模型并行。
当参数量巨大的时候,数据通信量会成为模型运行的瓶颈。模型并行的数据通信量可以比数据并行更少。可以看从这里化用的例子。
假设有一个 的数据矩阵 ,一个 的参数矩阵 ,一个 的参数矩阵 。loss 假设为 。另假设我们有两个 node,将数据传输到 node 上的耗时忽略不计(使用第一篇文章描述的input pipeline)。我们需要计算 forward 和 backprop 并更新 和 。
使用模型并行:我们可以把 A 拆成两个 的矩阵 。B 拆成两个 的矩阵 ,则 。forward 时每个 node 需要传输/接收 个浮点数(将 传给其他 node),backprop 不需要传输数据。
使用数据并行:我们可以在两个 node 上放置初始值完全相同的两份 , ,并将 拆成两个 的矩阵 分别在两个 node 上foward 和 backprop。理想上 forward 不需要传输数据,backprop 时每个 node 需要传输/接收 个浮点数(将由 计算出来的 和 的梯度传给其他 node)。
实际的模型比上述的矩阵乘法要复杂得多,为了减少由数据等待引起的阻塞,使用模型并行时需要仔细地将模型的计算进行分解,这无疑增大了工作量,不便于初期的快速迭代。所以当模型能够放入一个 node 的内存时,通常会优先使用数据并行。
4. 数据并行的方法
4.1 概述
在模型的更新中,我们关心的全局状态是模型的参数,假设为一个向量 。我们将执行计算的 node 称作 worker。状态的更新来自于各个 worker 利用各自的数据算出的梯度对 的更新,而各个 worker 又需要拿到最新的模型参数 才能算出梯度。可以看到各个 node 对全局状态十分依赖,需要进行频繁的同步。
由于分布式系统的搭建耗资较大,普通人也没有对应的需求,所以以下的实现都以单机多卡为例。每种方案都提供了多进程(稍作修改就可以多机训练)和单进程(仅限单机训练)的实现。
4.2 Parameter server 模式
以参数 为同步基础,我们可以采用 master-slave 的同步模式:将 node 分成两种角色:parameter server(ps) 负责维护一份最新的参数 ,worker 负责利用从 ps 读到的最新参数计算出梯度(forward 和 backprop),并对 ps 发送梯度和参数更新请求。这被称为 parameter server 的模式,tensorflow 就是围绕这个思路设计的。
Deep learning 之前的大规模模型通常体现在特征量上,模型仍旧是浅层的线性模型。特征量增大(百万到千万级别的特征量)带来的是特征的稀疏,所以每个 worker 平均只需要很少一部分变量就可以完成计算,和 ps 间的数据传输量不会很大。早期每个 host 的算力和资源量不大,所以增大计算规模往往意味着增大主机的数量,这带来的是 host 故障率的提高。 master-slave 的模式可以让系统拥有较好的容错能力。
但是现在随着模型的加深,参数之间的相互依赖增大,需要传输的参数量增大,使得 ps 的传输带宽逐渐成为瓶颈。而随着 host 计算能力的增强,我们需要的 host 数越来越少,故障率变低,容错的需求也越来越少。现在主流的做法更是变成了单机多卡。这使得 parameter server 的设计变得不适用于深度学习的计算。
ps 拿到参数后,视更新的方案不同可以分为同步更新(synchronous update)和异步更新(asynchronous update)。
4.2.1 同步更新
这是最低效的方案,这也是 tensorflow 多卡官方示例介绍的方案。ps 会同时充当 reducer 的角色,等待所有 worker 都发来梯度和参数更新请求后,ps 会对梯度取平均(reduce mean),并用平均过后的梯度更新一次参数。各个 worker 在从 ps 读取最新参数的过程中,以及等待 ps 更新参数的过程中,都是处于空闲状态。
单训练进程的代码实现可以看这里,其中 ps(reducer) 即为变量初次创建对应的 node(device)。多训练进程(分布式)的代码实现可以看这里。
由于所有 worker 都必须和 ps 通信,迭代过程也会被 ps 的通信带宽(bandwidth) 限制住。当模型参数量很大时,所有 node 的计算时间占比会很低。
下面化用百度slides里面的例子对 GPU 计算和通信的耗时做一个粗略的估算,不感兴趣的读者可以直接跳到结论。
假设我们有一个4层隐层为4000的双向LSTM(state of the art on all tasks),假设embedding的维度也是4000。按照tensorflow.nn.rnn_cell.LSTMCell
的实现,将几个 gate 和 memory 对应的参数矩阵拼接后形成的总的参数矩阵为 。
假设我们有 张卡。我们先计算模型参数需要的传输时间。忽略掉 bias ,这个模型的参数大小为
由于 ps 需要 broadcast 参数以及对参数的梯度做 reduce mean,所以每张卡需从 ps 读取 4 GB 的参数数据,往 ps 发送 4GB 的梯度数据。PCIe 3.0 x16 可用的带宽峰值约为 16GB/s,所以由 ps 的数据传输引起的阻塞一共为:
$8 \times N\div 16\textrm{GB/s} = \frac{N}{2}\textrm{s} $
在这段时间里各个 worker 都处于空闲状态。
假设每个 device 拿到数据的 batch size 为 16,每个 batch 的序列最大步长为 100 步,则每步 LSTM cell 的输入为输入 embedding 和隐层拼接起来的数据矩阵 ,
由这篇文章可以知道,矩阵乘法 的 为 。假设 LSTM 的计算只有矩阵乘法,每一步都需要计算 4 个。又因为这是双向LSTM,所以对于整个序列的 forward 计算,我们一共需要
backprop 也需要同样的计算量。常用的 GTX 1080 Ti 的计算吞吐量的峰值为 11.3 TFLOPS,考虑实现上切换的开销,假设在计算的情况下只用了50%左右的性能,即认为其吞吐量约 6TFLOPS,所以耗费在计算的时间约为
所以
对于八张卡的场景大概只有 20% 的时间用在 GPU 计算上。
为了减缓因单个 ps 的有限带宽带来的阻塞,通常会设置多个 ps 对通信进行分流。这又增加了系统的复杂程度。受通信延时以及 worker 异构(hetergeneous,计算/通信时间不同)的影响,迭代过程也会被通信和计算耗时最长的 worker 阻塞住。
node 异构带来的阻塞也是同步更新的通病。
4.2.2 异步更新
谷歌是最早使用 parameter server + 异步更新方案进行深度网络训练的。异步更新是 parameter server 模式的标配,其收敛性已经有证明。与同步更新不同,异步更新中 ps 在收到 worker 的梯度以及更新请求的时候,会立即对参数发起更新,而不等待其他 worker。在完成梯度的计算后,worker 会立刻从 ps 上读取参数,进行下一步的迭代。
多训练进程(分布式)的代码实现可以看这里。
异步更新将各个 work 和 ps 的通信在时间上分散开,使得数据传输的等待时间减少。同时各个 worker 也不需要和其他 worker 同步,减少了阻塞的时间,特别是异构的 worker 产生的阻塞。这增加了模型训练的吞吐量(throughput)。
接着上面 GPU 计算的例子,假设因为其他 worker 的通信,使得 ps-worker 间平均可以使用的 PCIe 带宽只有 80%,这种情况下每个卡因传输而阻塞的时间可以降到
KaTeX parse error: Expected '}', got 'EOF' at end of input: … 0.65\textrm{s}
所以八卡情景下计算时间占比可达到 60% 左右。
但是异步更新的方案会引入两个不稳定性来源:
- 参数和更新用的梯度并不来自同一个迭代。用来更新的梯度可能是几步更新前的参数算出来的。
- 参数的读取并没有加锁。这导致 worker 可能会读到更新一半的参数。
对于落后于当前迭代的梯度(staled gradients),上述实现采取的做法是直接丢掉。这造成了不同 worker racing 的情况,对计算资源和数据的利用效率不高。
上述两个不稳定性来源要求模型采用更小的学习率(learning rate)。而小学习率加上上述的不稳定性会带来收敛速度的显著降低,同时训练发散(divergence)的风险也增大了,这两者抵消了异步训练带来的吞吐量的提高。实际使用中经常会看到 loss 有时候会突然变得很高(overshoot)。所以异步更新目前已经不是主流优化方向了。
4.3 Allreduce 模式
参数 $\theta $ 的变化来自于使用各个 node 算出的梯度对其进行的更新。如果在初始化的时候便同步了所有 node 拿到的参数拷贝 ,并在参数更新之前对梯度进行同步,这样在任何时刻各个 node 的参数拷贝 都是一致的。所以我们可以以梯度为同步基础,间接实现参数 的同步。而梯度的同步则依赖于 allreduce 操作(peer-to-peer 的通信)的高效实现。在 Allreduce 模式中,所有 node 同时充当 ps 和 worker 的角色。
每个 node 可以直接利用本地的参数拷贝 进行 forward 和 backprop 的计算,免除了 parameter server 模式中从 ps 读取参数的阻塞。所有 node 都得到算出梯度后,执行 allreduce 操作,各个 node 都会得到全部 node 的梯度平均。最后各个 node 利用拿到的梯度平均对本地的 进行一次更新。在目前简单的 ring-allreduce 的实现下,各个 node 需要发送和接受一份梯度。
单训练进程的代码实现可以看这里。多训练进程的实现可以使用 Horovod。
Nvidia 开发了高效的 GPU 集合通信库 NCCL (Nvidia collective communication library),tensorflow 从 1.0 开始可以使用 tf.contrib.nccl
进行调用。同时 tensorflow 从 1.4 版本开始也实现了自己的集合通信库 tf.contrib.all_reduce
。只不过正如 tensorflow 一贯的作风,最好的文档就是他们的源码和自己找的其他来源的教程。在分布式/多卡训练这一块,tensorflow 目前还主要专注于 tf.contrib.distribute
的开发,目的是和他们的 tf.estimator
和 tf.keras
相兼容,这意味着如果想在自己改的模型上用上他们的分布式/多卡训练方案,你还得学习tf.estimator
的一整套接口:)
继续上面 GPU 计算的例子。假设各个 node 都是同构的(计算/通信耗时一致),且的传输带宽都为 ,需要传输的数据量为 ,使用下面介绍的 ring-allreduce 操作理论上可以把梯度平均的耗时控制在 ,其中 为 的数据量。对于上述例子而言就是 。GPU 计算占用比率为 80% 左右。
parameter server + 异步更新会因为 node 数目的增加而使得 worker 平均可以使用的 PCIe 带宽减少。Allreduce 模式则没有这个问题:由于使用了 ring-allreduce,传输耗时在一定规模内基本不随 node 数目的增加而变化,计算提速和 node 数目可以大致做到线性关系(linear scaling)。但集群规模变大的时候,由于卡间直连的成本变高,通常会设置多个中间通信节点(switch),从而产生网络阻塞,此时线性关系也不再成立。
同步更新也让我们可以使用比异步更新更大的学习率(learning rate),且训练发散(divergence)的风险和单卡训练一致。但当系统中存在异构的 worker 的时候,更新仍然会被最慢的 worker 阻塞住。所幸随着 host 的计算能力增强,计算需要的 host 数量减少,worker 同构的要求比较容易实现。
Allreduce 模式由于可扩展性强,现在已经渐渐成为主流的多卡/分布式训练方案。
4.3.1 Horovod or Tensorflow native?
Uber 在去年开源了 Horovod 工具,可以很方便地在 Tensorflow/Pytorch 等流行框架上进行基于 Allreduce 模式的分布式训练。在实现上它为每个 node 都创建了一个新进程,每个进程都有自己独立的 input pipeline。各个 node 算出梯度后,通过 Horovod 的 horovod.tensorflow.DistributedOptimizer
和 horovod.tensorflow.BroadcastGlobalVariablesHook
进行进程间的参数/梯度同步。Horovod 还支持配置集合通信(Collective Communication)的不同实现。
但是各个 node 相互独立的 input pipeline 使得 bucketing 带来的速度提升效果大打折扣。由于 allreduce 的操作需要等待所有 node 都算完梯度,所以拿到序列长度最长的 mini-batch 的 node 会阻塞所有 node,这带来 worker 异构的问题。在单机多卡的场景下,我们可以构建一个全局的 input pipeline 进行 bucketing,再将得到的 batch 切分给各个 node。这在单机单训练进程中比较容易实现。
4.3.2 ring-allreduce
ring-allreduce 在并行计算领域提出很久了,但是直到2017年,百度才高效地在 tensorflow 上实现了基于 ring-allreduce 的深度学习分布式训练。这是他们介绍时用的 slides。Nvidia在介绍 NCCL 的 slides 里面也有一个很形象的算法图示。
ring-allreduce 假设各个 node 以一个环排列,这种假设可以适用于很多种拓扑结构,有其工程实现上的方便性。
我们也可以根据拓扑结构去进一步优化 allreduce 算法。索法最近的炫技(财力)的文章便展示了在大规模集群中对 allreduce 算法的优化。
下面简单介绍一下ring-allreduce的算法。
对于 个 node,我们把数据切成 个chunk。 如上图所示,ring-allreduce 分成 scatter reduce 和 all gather 两步。
在 scatter reduce 这一步共需要跑 个 tick。在每个 tick 里面,每个 node 都会向下一个 node 发送一个chunk,并从上一个 node 拿到一个 chunk 和本地的 chunk 做一个 in-place 运算(求和,求积等)。scatter reduce 做完之后,每个 node 都有一个做完 reduce 的chunk。
在 all gather 这一步也需要跑 个 tick。每个 tick 里每个 node 都会把一个做完 reduce 的 chunk 传给下一个 node。
这种将数据分片传输做法可以更有效地利用各个 node 的带宽,防止 node 的相互等待。同时在scatter reduce 的操作里,in-place 运算和通信可以同时进行,掩盖一部分通信时间。在 ring-allreduce 中,每个 node 共需要发送和接受 的数据量。
在单机多卡的场景中,由于 PCIe 是全双工的(可以同时收发),所以数据传输的耗时只有 , 其中 是 PCIe 设备传输的带宽。Nvidia 的 NCCL 对卡间集合通信做了相当的优化,在实现中考虑了 GPU 间的拓扑结构。见这个 slides。
即使 Tensorflow 文档假装很专业地提到
In our experiment, we demonstrate that although NCCL often leads to much faster data aggregation by itself, it doesn’t necessarily lead to faster training. Our hypothesis is that the implicit copies are essentially free since they go to the copy engine on GPU, as long as its latency can be hidden by the main computation itself. Although NCCL can transfer data faster, it takes one SM away, and adds more pressure to the underlying L2 cache. Our results show that for 8-GPUs, NCCL often leads to better performance. However, for fewer GPUs, the implicit copies often perform better.
但为了方(懒)便(癌)省(发)事(作)(不想去拆这个*),直接调用 tf.contrib.nccl
即可。
5. 梯度平均的实现细节
在多卡(分布式)训练中通常采用对梯度进行多级平均的方法:先对每个 device 的 mini-batch 产生的梯度算一个平均,再对 host 的各个 device 算一次平均,最后再对多个 host 算一个平均。下面针对多卡训练的情景进行分析。
假设第 个 device 的 mini-batch 的样本个数为 ,每个样本产生的梯度为 ,device 个数为 ,则多卡算得的平均梯度为
假如各个 device 的样本个数都相等 ,则
此时多卡/单卡对各个样本的梯度的权重是一致的,其中 为一次更新中用到的总样本。而当各个 device 的样本个数不等时, 较大的 device 中,每个样本的权重会低于 ,其中 为所有 的平均。
在训练过程中这种影响通常忽略不计,但是在报告测试结果时,我们需要小心这种差别。
6. FP16 的提速
FP16 也是一个比较流行的提速方案。这会带来两个方面的提速:GPU计算速度(P100, V100, Titan V 中有大量 FP16 计算单元)的提速以及变量传输的提速。但实现上会稍微有些复杂,同时数值精度的降低会带来训练的不稳定。对于 1080Ti 这些游戏卡,受 nvidia 限制,FP16 计算单元很少,总体性能不如 FP32 计算。默认的计算是会将 FP16 转成 FP32 进行计算的,这又带来了数值转换的耗时。
在带宽受限的场景下(通常是大规模集群),利用 FP32 计算完梯度后,可以将梯度转化成 FP16(先将数值转换到 FP16 的有效范围) 再进行卡间同步。即使没有支持 FP16 计算单元的 GPU,这种提速也是很可观的。
但在带宽较大且 GPU 硬件受限的时候(单机多卡 1080ti), FP16 带来的提速仅限一小部分通信的提速,不值得浪费精力。
7. 杂项
- 尽量减少 CPU-GPU 之间的数据依赖。即使 tensorflow 内部的计算图优化会提前发起数据读取,但这种作用仍然是有限的。如果使用了
tf.device
手动放置变量,优先将参数和中间变量放在同一个 device 上。 - 简化 python 中的 training loop。如果你使用了 tensorboard,且在 summary 中定义了一些计算量较大的统计量(如 gradient 的 histogram),可以每隔 n 步写一次 summary,这样可以既监控训练进程又不会拖慢训练速度。
- 如果不需要做 state dropout的话,优先使用
tf.contrib.cudnn_rnn
模块。但需要注意 cudnn 的 cell 实现和 tensorflow 有所不同。如果使用了多层双向 rnn,cudnn 的实现对应的是tf.contrib.rnn.stack_bidirectional_rnn
- 每个卡的 mini-batch 的 batch size 也是限制多卡速度提升的主要原因。如果模型太小,batch size 也很小的话,GPU 的核便没有被充分利用,再考虑通信上的损耗,速度其实远不如在单卡上训练。但是大 batch size 会对模型的泛化性能产生影响,这是第三篇文章要讨论的。