SGD并行算法 - Downpour SGD (单机Python多线程版)
SGD 被广泛应用于机器学习(machine learning)中的最优化等问题。学术界一直热衷于提升SGD在优化问题中的收敛速率,并行计算也是其中的热点研究方向(包括Hogwild! [1], Delay-tolerant Algorithm for SGD [2], Elastic Averaging SGD [3])。本篇给出了目前比较热门的Downpour SGD [4]的样例代码 (选择它的原因是其引用量最大)。
理论思想
核心思想: 将数据随机划分成数个子数据集(sub-data), 并将模型变量划分到数个机器/进程/线程上; 每个机器/进程/线程基于各自的子数据集对其负责的变量更新n次, 然后到master节点更新模型变量。各个机器/进程/线程的训练相互独立、互不干扰, 而且各个机器/进程/线程内的模型变量在训练中也互不干扰。引用原文 [4]:
We divide the training data into a number of subsets and run a copy of the model on
each of these subsets. The models communicate updates through a centralized parameter server,
which keeps the current state of all parameters for the model, sharded across many machines (e.g.,
if we have 10 parameter server shards, each shard is responsible for storing and applying updates
to 1/10th of the model parameters) (Figure 2). This approach is asynchronous in two distinct aspects:
the model replicas run independently of each other, and the parameter server shards also run
independently of one another
代码部分
# -*- encoding: utf-8 -*-
import copy
import functools
import re
import sys
import threading
import time

import numpy as np
def timeConsumption(func):
    """Decorate ``func`` so that every call reports its wall-clock duration.

    The wrapper forwards all positional and keyword arguments to ``func``,
    prints the elapsed time in seconds (measured with ``time.time()``) to
    standard output, and returns whatever ``func`` returned.

    Args:
        func: The callable to time.

    Returns:
        A wrapper that keeps the name and docstring of ``func``.
    """
    @functools.wraps(func)
    def func_wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        elapsed = time.time() - start_time
        # One pre-formatted argument, so the printed line is the same whether
        # ``print`` is the Python 2 statement or the Python 3 function.
        print("[Function-Time-Consumption]  %s" % elapsed)
        return result
    return func_wrapper
def initialize(length=100):
    """Build a synthetic regression data set and publish it as globals.

    ``length`` values are drawn for a first variable ``a`` and, for each of
    them, ``length`` fresh values for a second variable ``b``; both are
    uniform samples from ``np.random.random`` scaled by 100. Every (a, b)
    pair becomes one feature row ``[a**2, b**2, a, b, 1, 1]`` and its target
    is the dot product of that row with fixed coefficients plus Gaussian
    noise (mean 0, standard deviation 0.1).

    The arrays are stored in the module-level globals ``X`` and ``Y`` and are
    also returned.

    Args:
        length: Number of samples per variable; ``length ** 2`` rows result.

    Returns:
        Tuple ``(X, Y)`` where ``X`` is a ``(length ** 2, 6)`` float array and
        ``Y`` is a ``(length ** 2,)`` float array.
    """
    global X
    global Y
    mu, sigma = 0, 0.1
    V = 100
    # a * x**2 + b * x + c, written as one coefficient per feature column
    coefficients = np.array([103.0, -22.5, 35.5, -28, 43, 19.0])

    # Draw in the same order as a nested loop would (one batch for ``a``,
    # then one batch of ``b`` per ``a``, then the noise) so that a seeded run
    # consumes the random stream identically.
    a = np.random.random(length) * V
    b = np.array(
        [np.random.random(length) for _ in range(length)], dtype=float
    ).reshape(length, length) * V
    # white noise
    noise = np.random.normal(mu, sigma, size=length * length)

    # Row k pairs a[k // length] with b[k // length][k % length].
    a = np.repeat(a, length)
    b = b.ravel()
    ones = np.ones(length * length)
    # column_stack keeps the 6-column shape even when there are no rows.
    X = np.column_stack([a ** 2, b ** 2, a, b, ones, ones])
    Y = X.dot(coefficients) + noise
    return X, Y
class HogwildThreading(threading.Thread):
    """Worker thread that refines a private copy of the model parameters.

    The worker is given a list of data-row indices (its shard) and a list of
    parameter indices (the parameters it owns). ``run`` reads the data from
    the module-level globals ``X`` and ``Y`` and the per-parameter scale
    factors from the global ``derative``; the refined values are left in
    ``self.param`` for the caller to collect.
    """

    def __init__(self, threadID, name, dIndex, pIndex, n, param, gama, function, eplison):
        """Store the worker configuration.

        Args:
            threadID: Identifier of this worker.
            name: Thread name.
            dIndex: Indices of the rows of ``X`` / ``Y`` this worker uses.
            pIndex: Indices of the parameters this worker updates.
            n: Maximum number of local update steps.
            param: Current parameter vector; a shallow copy is kept so the
                caller's object is not modified by this worker.
            gama: Step size.
            function: Callable ``function(param, x)`` returning the model
                prediction for one feature row ``x``.
            eplison: Stop early once the absolute mean residual on the shard
                is below this value.
        """
        super(HogwildThreading, self).__init__()
        self.threadID = threadID
        self.name = name
        self.dIndex = dIndex
        self.pIndex = pIndex
        self.n = n
        self.param = copy.copy(param)
        self.gama = gama
        self.function = function
        self.eplison = eplison

    def run(self):
        """Run up to ``n`` local update steps on this worker's shard.

        Each step computes the mean signed residual (prediction minus target)
        over the shard and moves every owned parameter ``i`` by
        ``gama * derative[i] * residual``. The loop ends early when the
        absolute residual drops below ``eplison``. ``self.n`` is decremented
        once per completed step.
        """
        global X
        global Y
        global derative
        # An empty shard has no residual: np.mean would yield NaN and the
        # update below would overwrite every owned parameter with NaN.
        if len(self.dIndex) == 0:
            return
        # The shard does not change between steps, so index it only once.
        shard_x = X[self.dIndex]
        shard_y = Y[self.dIndex]
        while self.n > 0:
            local_y = [self.function(self.param, x) for x in shard_x]
            diff = np.mean(np.subtract(local_y, shard_y))
            if abs(diff) < self.eplison:
                break
            for i in self.pIndex:
                self.param[i] -= self.gama * derative[i] * diff
            self.n -= 1
class HogwildSGD(object):
    """Multi-threaded SGD trainer for a model evaluated by ``function``.

    The data rows are shuffled once and split into one fixed shard per
    worker. In every round the parameter indices are shuffled and split among
    the workers; each HogwildThreading worker refines its own copy of the
    parameters, and after all workers have been joined only the parameters a
    worker owned are copied back from it.

    ``run`` reads the data from the module-level globals ``X`` and ``Y`` and
    publishes the per-feature column means as the global ``derative``.
    """

    def __init__(self, X, Y, eplison=0.000001, gama=0.01, iter_num=1000, thread_num=10, local_steps=10):
        """Initialise parameters and hyper-parameters.

        ref: https://arxiv.org/abs/1106.5730

        Args:
            X: Feature array; its last axis length is the parameter count.
            Y: Target values.
            eplison: Training stops once the absolute mean residual is not
                greater than this value.
            gama: Accepted for interface compatibility but NOT used; the step
                size is always derived from the data (see below).
            iter_num: Maximum number of rounds.
            thread_num: Requested number of worker threads.
            local_steps: Maximum update steps each worker performs per round.
        """
        _d = X.shape[-1]
        # Never more workers than parameters, so no worker is left without a
        # parameter to own.
        self.thread_num = min(_d, thread_num)
        # parameter initialization
        self.a = np.random.normal(0, 1, size=_d)
        print(self.a)
        self.eplison = eplison
        # NOTE(review): the ``gama`` argument is ignored here; the step size
        # is 1 / (max(Y) * len(X)). Confirm whether that is intended.
        self.gama = (1.0 / max(Y)) * 1.0 / len(X)
        self.iter_num = iter_num
        self.local_steps = local_steps

    def function(self, a, x):
        """Return the model prediction ``np.dot(a, x)`` for one feature row.

        Open question kept from the original author: is there prior knowledge
        about the shape of the function (quadratic, exponential, linear)?
        """
        return np.dot(a, x)

    def chunkIt(self, seq, num):
        """Split ``seq`` into exactly ``num`` contiguous, near-equal chunks.

        Chunk ``k`` is ``seq[k * len(seq) // num:(k + 1) * len(seq) // num]``.
        Integer boundaries are used so that rounding can never change the
        number of chunks; chunks are empty when ``len(seq) < num``.

        Args:
            seq: Sliceable sequence.
            num: Number of chunks, at least 1.

        Returns:
            A list of ``num`` slices of ``seq`` that concatenate to ``seq``.

        Raises:
            ValueError: If ``num`` is smaller than 1.
        """
        if num < 1:
            raise ValueError("num must be at least 1, got %r" % (num,))
        size = len(seq)
        return [seq[k * size // num:(k + 1) * size // num] for k in range(num)]

    @timeConsumption
    def run(self):
        """Train until the mean residual is small enough or rounds run out.

        After every round the mean signed residual over the whole data set is
        printed; the final parameters are printed at the end. ``iter_num`` is
        decremented in place once per round.
        """
        global X
        global Y
        global derative
        # Per-feature mean over all rows; the workers scale their step by it.
        derative = np.mean(X, axis=0)
        # np.random.shuffle needs a mutable sequence, hence the list().
        dIndex = list(range(len(X)))
        np.random.shuffle(dIndex)
        dIndex = self.chunkIt(dIndex, self.thread_num)
        diff = 1
        while abs(diff) > self.eplison and self.iter_num > 0:
            pIndex = list(range(len(self.a)))
            np.random.shuffle(pIndex)
            pIndex = self.chunkIt(pIndex, self.thread_num)
            threads = []
            for i in range(self.thread_num):
                worker = HogwildThreading(str(i), "HogwildThreading", dIndex[i], pIndex[i], self.local_steps, self.a, self.gama, self.function, self.eplison)
                worker.start()
                threads.append(worker)
            for worker in threads:
                worker.join()
            # Collect from each worker only the parameters it owned.
            for worker in threads:
                for j in worker.pIndex:
                    self.a[j] = worker.param[j]
            local_y = [self.function(self.a, x) for x in X]
            diff = np.mean(np.subtract(local_y, Y))
            print(diff)
            self.iter_num -= 1
        print(self.a)
def main():
    """Generate the synthetic data set and train a HogwildSGD model on it."""
    features, targets = initialize()
    trainer = HogwildSGD(features, targets)
    trainer.run()
if __name__ == "__main__":
    # ``reload`` and ``sys.setdefaultencoding`` exist only on Python 2, where
    # this pair forces the default string encoding to UTF-8. Python 3 has
    # neither and already uses UTF-8 as its default encoding.
    if sys.version_info[0] < 3:
        reload(sys)
        sys.setdefaultencoding("utf-8")
    main()