欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

分布式计算--(分布式+多进程+多线程+多协程)

程序员文章站 2022-04-04 08:25:59
先来个最简单的例子: 把1-10000每个数求平方 服务器server: 用两个队列存储任务、结果 定义两个函数 要实现分布式得继承multiprocessing.managers.BaseManager 在主函数里multiprocessing.freeze_support()开启分布式支持 注册 ......

先来个最简单的例子:

把1-10000每个数求平方

服务器server:

用两个队列存储任务、结果

定义两个函数

要实现分布式得继承multiprocessing.managers.BaseManager

在主函数里multiprocessing.freeze_support()开启分布式支持

注册两个函数给客户端调用

创建管理器,设置ip地址和开启端口、链接密码。

用两个队列加任务、收结果。用刚刚注册的函数

把1-10000压入队列,

把结果压入队列

最后完成关闭服务器

客户端client:

也需要继承multiprocessing.managers.BaseManager

定义一个协程处理一个数据,同时把结果压入结果队列

定义一个线程处理10个数据,开启10个协程

定义一个进程,进程驱动10个线程

主函数:同客户端注册两个函数

同客户端创建管理器,设置ip地址和开启端口、链接密码。

链接服务器

同客户端调用注册的函数,两个队列

 

套四层循环:10个进程、100个线程、1000个协程

循环进程函数

 

上代码:

服务器server:

#coding:utf-8
import multiprocessing  #分布式进程
import multiprocessing.managers #分布式进程管理器
import random,time  #随机数,时间
import Queue #队列

task_queue=Queue.Queue() #任务
result_queue=Queue.Queue() #结果

def  return_task(): #返回任务队列
    return task_queue
def return_result(): #返回结果队列
    return   result_queue

class  QueueManger(multiprocessing.managers.BaseManager):#继承,进程管理共享数据
    pass

if __name__=="__main__":
    multiprocessing.freeze_support()#开启分布式支持
    QueueManger.register("get_task",callable=return_task)#注册函数给客户端调用
    QueueManger.register("get_result", callable=return_result)
    manger=QueueManger(address=("192.168.112.11",8848),authkey="123456") #创建一个管理器,设置地址与密码
    manger.start() #开启
    task,result=manger.get_task(),manger.get_result() #任务,结果
    for  i  in range(10000):
        print "task add data",i
        task.put(i)
    print "waitting for------"
    for  i  in range(10000):
        res=result.get(timeout=100)
        print "get data",res

    manger.shutdown()#关闭服务器

客户端client:

#coding:utf-8
import multiprocessing  #分布式进程
import multiprocessing.managers  # 分布式进程管理器
import random,time  #随机数,时间
import Queue #队列
import threading
import gevent
import gevent.monkey


class  QueueManger(multiprocessing.managers.BaseManager):# 继承,进程管理共享数据
    pass
def  gevetygo(num ,result): #协程处理一个数据
    print num*num
    result.put(num*num)

def  threadgo(datalist,result): # 线程处理10个数据,开启10个协程
    tasklist=[]
    for  data  in datalist:
        tasklist.append(gevent.spawn(gevetygo, data,result))
    gevent.joinall(tasklist)

def  processgo(ddatalist,result): # [[1,2,3],[4,5,6]] 进程驱动了10个线程
    threadlist=[]
    for  datalist in ddatalist:
        mythread=threading.Thread(target=threadgo,args=(datalist,result))
        mythread.start()
        threadlist.append(mythread)
    for mythread in threadlist:
        mythread.join()

if __name__=="__main__":
    QueueManger.register("get_task")  # 注册函数调用服务器
    QueueManger.register("get_result")
    manger=QueueManger(address=("192.168.112.11",8848),authkey="123456")
    manger.connect()  # 链接服务器
    task= manger.get_task()
    result =manger.get_result()  # 任务,结果

    # 1000
    # 10个进程
    # 100个线程
    # 1000个协程

    for  i  in range(10):
        cubelist = []  # [[[1],[2]]]
        for j in range(10):
            arealist = []
            for k in range(10):
                linelist = []
                for l in range(10):
                    data = task.get()
                    linelist.append(data)
                arealist.append(linelist)
            cubelist.append(arealist)

        processlist = []
        for myarealist in cubelist:
            process = multiprocessing.Process(target=processgo, args=(myarealist, result))
            process.start()
            processlist.append(process)
        for process in processlist:
            process.join()

 

遇到的坑:一个月之前弄分布式的时候写ip地址怎么都开启不了,后来换了台电脑就支持了= =。

如果只是在自己电脑上弄的话,写127.0.0.1也可以运行,如果你也遇到ip地址怎么都开启不了的情况