python_分布式进程中遇到的问题
程序员文章站
2023-02-24 14:54:36
看文档学习分布式进程中遇到了一下问题,文档里面例题是python2.X,我用的python3.x,就出现了一下莫名奇妙的问题,最终版代码先呈上: taskManager.py taskWorker.py 先运行 taskManager.py 服务器端代码,再快速运行 taskWorker.py 客户 ......
看文档学习分布式进程中遇到了一下问题,文档里面例题是python2.x,我用的python3.x,就出现了一下莫名奇妙的问题,最终版代码先呈上:
taskmanager.py
1 # coding:utf-8 2 # taskmanager.py for windows 服务器端 3 4 import queue 5 from multiprocessing.managers import basemanager 6 from multiprocessing import freeze_support 7 # 任务个数 8 task_number = 10 9 # 定义收发队列 10 task_queue = queue.queue(task_number); 11 result_queue = queue.queue(task_number); 12 def get_task(): 13 return task_queue 14 def get_result(): 15 return result_queue 16 #创建类似的queuemanager: 17 class queuemanager(basemanager): 18 pass 19 def win_run(): 20 # windows 下绑定调用接口不能使用lambda,所以只能先定义函数再绑定 21 queuemanager.register('get_task_queue',callable = get_task) 22 queuemanager.register('get_result_queue',callable=get_result) 23 #绑定端口并设置验证口令,windows下需要填写ip地址,linux下不填默认为本地 ip地址为本地ip地址 24 manager = queuemanager(address = ('192.xxx.xx.xxx',8001),authkey =b'qiye') 25 # 启动 26 manager.start() 27 try: 28 # 通过网络获取任务队列和结果队列 29 task = manager.get_task_queue() 30 result = manager.get_result_queue() 31 #添加任务 32 for url in ["imageurl_"+str(i) for i in range(10)]: 33 print('put task %s ...'%url) 34 task.put(url) 35 print('try get result...') 36 for i in range(10): 37 print('result is %s' %result.get(timeout=10)) 38 except: 39 print('manager error') 40 finally: 41 # 一定要关闭,否则会报管道未关闭的错误 42 manager.shutdown() 43 44 if __name__=='__main__': 45 # windows 下多进程可能会有问题,添加这句可以缓解 46 freeze_support() 47 win_run()
taskworker.py
1 # coding:utf-8 2 import time 3 from multiprocessing.managers import basemanager 4 # 创建类似的 queuemanager: 5 class queuemanager(basemanager): 6 pass 7 #第一步: 使用queuemanager注册用于获取queue的方法名称 8 queuemanager.register('get_task_queue') 9 queuemanager.register('get_result_queue') 10 #第二步:连接到服务器: 11 server_addr = '192.xxx.xx.xxx' 12 print('connect to server %s...'%server_addr) 13 #端口和验证口令注意保持与服务器进程保持一致: 14 m = queuemanager(address=(server_addr,8001),authkey=b'qiye') 15 #从网络连接 16 m.connect() 17 #第三部:获取queue的对象: 18 task = m.get_task_queue() 19 result = m.get_result_queue() 20 #第四步:从task队列获取任务,并把结果写入result队列: 21 while(not task.empty()): 22 image_url = task.get(true,timeout = 5) 23 print('run task download %s ...'%image_url) 24 time.sleep(1) 25 result.put('%s--->success'%image_url) 26 #处理结束 27 print('worker exit.')
先运行 taskmanager.py 服务器端代码,再快速运行 taskworker.py 客户端代码 运行结果依次如下:
上一篇: shell 简单脚本编程
下一篇: 针对初学PHP者的疑难问答(2)