欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  科技

python_分布式进程中遇到的问题

程序员文章站 2023-02-24 14:54:36
看文档学习分布式进程中遇到了一下问题,文档里面例题是python2.X,我用的python3.x,就出现了一下莫名奇妙的问题,最终版代码先呈上: taskManager.py taskWorker.py 先运行 taskManager.py 服务器端代码,再快速运行 taskWorker.py 客户 ......

看文档学习分布式进程中遇到了一下问题,文档里面例题是python2.x,我用的python3.x,就出现了一下莫名奇妙的问题,最终版代码先呈上:

taskmanager.py

 1 # coding:utf-8
 2 # taskmanager.py  for  windows  服务器端
 3 
 4 import queue
 5 from multiprocessing.managers import basemanager
 6 from multiprocessing import freeze_support
 7 # 任务个数
 8 task_number = 10
 9 # 定义收发队列
10 task_queue = queue.queue(task_number);
11 result_queue = queue.queue(task_number);
12 def get_task():
13     return task_queue
14 def get_result():
15     return result_queue
16 #创建类似的queuemanager:
17 class queuemanager(basemanager):
18     pass
19 def win_run():
20     # windows 下绑定调用接口不能使用lambda,所以只能先定义函数再绑定
21     queuemanager.register('get_task_queue',callable = get_task)
22     queuemanager.register('get_result_queue',callable=get_result)
23     #绑定端口并设置验证口令,windows下需要填写ip地址,linux下不填默认为本地 ip地址为本地ip地址
24     manager = queuemanager(address = ('192.xxx.xx.xxx',8001),authkey =b'qiye')
25     # 启动
26     manager.start()
27     try:
28         # 通过网络获取任务队列和结果队列
29         task = manager.get_task_queue()
30         result = manager.get_result_queue()
31         #添加任务
32         for url in ["imageurl_"+str(i) for i in range(10)]:
33             print('put task %s ...'%url)
34             task.put(url)
35         print('try get result...')
36         for i in range(10):
37             print('result is %s' %result.get(timeout=10))
38     except:
39         print('manager error')
40     finally:
41         # 一定要关闭,否则会报管道未关闭的错误
42         manager.shutdown()
43 
44 if __name__=='__main__':
45     # windows 下多进程可能会有问题,添加这句可以缓解
46     freeze_support()
47     win_run()

taskworker.py

 1 # coding:utf-8
 2 import time
 3 from multiprocessing.managers import basemanager
 4 # 创建类似的 queuemanager:
 5 class queuemanager(basemanager):
 6     pass
 7 #第一步: 使用queuemanager注册用于获取queue的方法名称
 8 queuemanager.register('get_task_queue')
 9 queuemanager.register('get_result_queue')
10 #第二步:连接到服务器:
11 server_addr = '192.xxx.xx.xxx'
12 print('connect to server %s...'%server_addr)
13 #端口和验证口令注意保持与服务器进程保持一致:
14 m = queuemanager(address=(server_addr,8001),authkey=b'qiye')
15 #从网络连接
16 m.connect()
17 #第三部:获取queue的对象:
18 task = m.get_task_queue()
19 result = m.get_result_queue()
20 #第四步:从task队列获取任务,并把结果写入result队列:
21 while(not task.empty()):
22     image_url = task.get(true,timeout = 5)
23     print('run task download %s ...'%image_url)
24     time.sleep(1)
25     result.put('%s--->success'%image_url)
26 #处理结束
27 print('worker exit.')

先运行 taskmanager.py 服务器端代码,再快速运行 taskworker.py 客户端代码 运行结果依次如下:

python_分布式进程中遇到的问题

python_分布式进程中遇到的问题