欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

SGD平行算法 - Downpour SGD (单机python多线程版)

程序员文章站 2022-04-01 16:52:13
...

SGD 被广泛运用到机器学习(machine learning)中最优化等问题中,学术界一直热衷于提升SGD在优化问题中的收敛速率,并行计算也是热点研究的方向(包括Hogwild! [1], Delay-tolerant Algorithm for SGD [2], Elastic Average SGD [3]),本篇实现了现在比较火的Downpour SGD [4]的样例代码 (选择这个的原因引用量最大)。

理论思想

核心思想,将数据随机划分成数个子数据集sub-data, 将模型变量划分数个机器/进程/线程,基于子集合数据更新各个机器/进程/线程内的变量n次,然后到master 节点更新模型变量,各个机器/进程/线程训练独立互不干扰, 而且各个机器/进程/线程内的模型变量在训练中也互不干扰。引用原文原话 [4]:

We divide the training data into a number of subsets and run a copy of the model on
each of these subsets. The models communicate updates through a centralized parameter server,
which keeps the current state of all parameters for the model, sharded across many machines (e.g.,
if we have 10 parameter server shards, each shard is responsible for storing and applying updates
to 1/10th of the model parameters) (Figure 2). This approach is asynchronous in two distinct aspects:
the model replicas run independently of each other, and the parameter server shards also run
independently of one another

代码部分

# -*- encoding: utf-8 -*-
import re
import sys
import numpy as np
import copy
import time
import threading

def timeConsumption(func):
    def func_wrapper(x):
        start_time = time.time()
        func(x)
        end_time = time.time()
        print "[Function-Time-Consumption] ", end_time - start_time
    return func_wrapper

def initialize(length=100):
    """
    """
    global X
    global Y
    X = []
    Y = []
    mu, sigma = 0, 0.1
    V = 100
    # here we assume x is two-dimesion matrix 
    for i in np.random.random(length):
        a = i * V
        for j in np.random.random(length): 
            b = j * V
            X.append([a**2, b**2, a, b, 1, 1])

    # white noise
    noise = np.random.normal(mu, sigma, size=length * length)

    # a * x**2 + b * x + c
    function = lambda x: np.dot([103.0, -22.5, 35.5, -28, 43, 19.0], x)
    Y = [function(X[i]) + noise[i] for i in range(length * length)] 

    X = np.array(X)
    Y = np.array(Y)
    return X, Y

class HogwildThreading(threading.Thread):
    """
    """

    def __init__(self, threadID, name, dIndex, pIndex, n, param, gama, function, eplison):
        """
        """
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.dIndex = dIndex
        self.pIndex = pIndex
        self.n = n
        self.param = copy.copy(param)
        self.gama = gama
        self.function = function
        self.eplison = eplison

    def run(self):
        """
        In each threading, 
            update relevant parameters with each sub-dataset for each n step
        """
        global X
        global Y
        global derative
        while self.n > 0:
            local_y = [self.function(self.param, x) for x in X[self.dIndex]]
            diff = np.mean(np.subtract(local_y, Y[self.dIndex]))
            if abs(diff) < self.eplison: break
            # print self.name + "-" + self.threadID + " : " + str(diff)
            for i in self.pIndex:
                self.param[i] -= self.gama * derative[i] * diff
            self.n -= 1

class HogwildSGD(object):
    """
    """

    def __init__(self, X, Y, eplison=0.000001, gama=0.01, iter_num=1000, thread_num=10):
        """
        ref: https://arxiv.org/abs/1106.5730
        """
        _d = X.shape[-1]

        # set up number of threads
        if _d < thread_num: self.thread_num = _d
        else: self.thread_num = thread_num

        # parameter initailization
        self.a = np.random.normal(0, 1, size=_d)
        print self.a
        self.eplison = eplison
        self.gama = (1.0 / max(Y)) * 1.0 / len(X)
        self.iter_num = iter_num

    def function(self, a, x):
        """
        Do we have prior knowledge about the function?
            - quadratic ?
            - exponential ?
            - linear ?
        """
        return np.dot(a, x)

    def chunkIt(self, seq, num):
        avg = len(seq) / float(num)
        out = []
        last = 0.0

        while last < len(seq):
            out.append(seq[int(last):int(last + avg)])
            last += avg

        return out

    @timeConsumption
    def run(self):
        """
        """
        global X
        global Y
        global derative

        derative = []
        for i in range(len(self.a)):
            derative.append(np.mean([x[i] for x in X]))

        dIndex = range(len(X))
        np.random.shuffle(dIndex)
        dIndex = self.chunkIt(dIndex, self.thread_num)

        diff = 1
        while(abs(diff) > self.eplison and self.iter_num > 0):

            pIndex = range(len(self.a))
            np.random.shuffle(pIndex)
            pIndex = self.chunkIt(pIndex, self.thread_num)

            threads = []
            for i in range(self.thread_num):
                instance = HogwildThreading(str(i), "HogwildThreading", dIndex[i], pIndex[i], 10, self.a, self.gama, self.function, self.eplison)
                instance.start()
                threads.append(instance)
            for i in range(self.thread_num):
                threads[i].join()
            # update parameter from each data-shards
            for i in range(self.thread_num):
                index = threads[i].pIndex
                for j in index:
                    self.a[j] = threads[i].param[j]

            local_y = [self.function(self.a, x) for x in X]
            diff = np.mean(np.subtract(local_y, Y)) 
            print diff
            self.iter_num -= 1
        print self.a

def main():
    """
    """
    X, Y = initialize()

    instance = HogwildSGD(X, Y)
    instance.run()


if __name__ == "__main__":
    reload(sys)
    sys.setdefaultencoding("utf-8")

    main()