线程进程计算之多任务同步进行
hi各位大佬好,前面一篇介绍了多进程中传值的问题,这里要进行一个线程问题,即,当目前任务进行中一部分数据要用来做另一个任务(new plan),当前任务又不能停下,于是就要开一个线程执行新的任务。
For Video Recommendation in Deep learning QQ Group 277356808
For Visual in deep learning QQ Group 629530787
与其说是随手笔记,不如说是工作记录更恰当。今天本大佬节日,不知道有没有一起过的。韶华不为少年留,恨悠悠,几时休,便作春江都是泪,流不尽,许多愁。
【注,我的很多博文并不都是当天完成的,有的都是几天前就开始写了,只是发表那天提交,当然同时可能开了很多博文】
我想说,线程里面能开进程吗??不是Process这种,是Pool或者ThreadPool,可以试试。先实现demo
from threading import Thread
from multiprocessing.pool import ThreadPool
from multiprocessing import Process,Pool
有计数的快慢的问题,我先尝试下,看是手写的快还是熊猫库快。没想到还是我手写的快啊,小明哥厉害了。但是数据长度上来了还是熊猫库厉害啊,均是执行10次的累计时间,第一个是我手写的,第二个是熊猫库。鉴于我召回的items的量级,所以采用我自己写的没错了。
#data length 1000
time 1 : 0.007994, time 2 : 0.029980
#10000
time 1 : 0.079951, time 2 : 0.033979
#100000
time 1 : 0.959261, time 2 : 0.067959
#百万
time 1 : 8.516236, time 2 : 0.410749
鉴于时间要求,今天暂不进行线程中加多进程,先搞个线程整上。
推进中发现线程是变量进不去,,,,,尴尬,之前的进程是变量出不去(上个博文),解决红字问题,那就是传参,这个很容易,进不去是不想传参,有点懒。
今天不搞个博文都对不起粉丝。。。。明天更这个博文,不要说脏话,文明人。
拜拜
【20201112补充】
鉴于数据计算量大,只有一个线程根本不行,数据直接淹没了,这个线程是否活着也不得而知(因为主进程结束任务后,该线程的过程日志也不会打印在log上)。准备搞16个进程试试看看。昨天的demo线程代码在此
k=12
p=11
def update_model(k,p):
k+=1
print("p: %d, k: %d\n"%(p,k))
thread1=Thread(target= update_model,args=(k,p,))
thread1.setDaemon(True)
thread1.start()
print("out: %d\n"%k)
按照上面的修改后直接报错,这是线程中的经典错误。这就是说我昨天的程序根本就没跑起来啊。除却五侯歌舞地,人间何处不相随。
Fatal Python error: could not acquire lock for <_io.BufferedWriter name='<stdout>'> at interpreter shutdown, possibly due to daemon threads
Thread 0x00007ff69068b700 (most recent call first):
File "thread_pool_.py", line 92 in update_model
File "/data/logs/xulm1/myconda/lib/python3.7/threading.py", line 870 in run
File "/data/logs/xulm1/myconda/lib/python3.7/threading.py", line 926 in _bootstrap_inner
File "/data/logs/xulm1/myconda/lib/python3.7/threading.py", line 890 in _bootstrap
Current thread 0x00007ff6a9243700 (most recent call first):
已放弃
一个博主说,将此线程函数写一个文件,也就是说线程只需要能执行那个文件即可,不知道能不能行,试试
thread1.start()
thread1.join()
改成这种是不行的,我还以为在执行了,top命令发现有python3的程序,后来发现是之前别人的服务,我都想给他kill了,下面是查运行的程序的主要方法
$ ps aux | grep python
我先试试直接运行函数可以不,直接运行32ThreadPool,10万用户要2min,改下其他参数看影响大不大,16个反而时间更少了。。。。吃惊,后来发现大于16就没啥用了
修改其他参数影响不大,增加用户数为100万则时间成倍增加,近20min
下面要采用别人的方法要面临一个问题,如何给文件传参数,argparse/sys的格式能不能行?这种也就传个字符串,数字吧,没见过传一个大的数组,下面是常见两种方式,还有其他方式多了,
os.system("python3 main.py --task train")
os.popen(cmd)
这种是不行的,所以说,那个博主的原本意思是在其他py文件中写个函数,然后导入,这样就不会出上面的“已放弃”错误了(这个已放弃不是我加的)
下面再次直接试试,因为我加了global变量,为了能够看到效果,因为即使打印运行过程,线程中的在主程序结束后的日志是无法打印在日志中的,所以我将处理结果存储在redis集群,2h有效期,当然也可为了测试考虑,整个time.sleep,这样打印的日志能看到。
增加global后没有报错,现在就是等一个小时,趁此时间搞下GCEGNN,这个issue还是没解决啊,欲说还休,欲说还休,却道天凉好个秋。
事实证明,添加sleep是可以看到打印的日志,而且也已经存上redis了(如下),但我还是想证明下,如果主程序结束了,这个线程会不会活着?
1) "173"
2) "214"
3) "244"
4) "282"
5) "298"
6) "331"
7) "506"
8) "680"
9) "765"
10) "779"
11) "784"
12) "810"
13) "816"
14) "995"
15) "1050"
服务器只有用了才是自己的,赶紧搞起来啊!!不然服务器都没了。都是共同申请的,啥时候有我专用的呢?GNN与DGCF同时进行,后者还没看paper,who care?
主程序运行完后,线程在top及我上面提及的方法中查询不到是否在运行,只能通过查看redis是否存储了。
然而过了近两个小时,我发现redis没有存储啊。。。。而且,主程序结束时也没有那么大的内存,这就是说明,主程序结束,线程就死了??下面拿小demo做试验证实或证伪(那就需要改程序了)
def update_model():
results=[[0,[1,2,3]],[1,[2,3,4]]]#redis保存这个
#redis写入省略
。。。
#线程同最上,后面加
time.sleep(100)
#保证能存上
修改为如下,程序结束等了三分钟也没存入redis,说明程序真的死了,证明了假设,红字假设也是网上常见的问题。
def update_model():
results=[[0,[1,2,3]],[1,[2,3,4]]]#redis保存这个
time.sleep(100)
#redis写入省略
。。。
#线程同最上,后面加
time.sleep(1)
#保证能存上
既然搞个线程不能解决,那就来两个进程可好,同时进行,也就是说添加了一个进程
直接加个Process即可解决,也就是说原来的程序可以不变,加个Process能跳过吗???(像线程那样,能够同时进行吗?)证实可以跳过,可在start后加个标志打印下即可,如下示例:进程代码参考上一篇(不要join),不再赘述
print("main")
#time.sleep(10)
#。。进程
print("main 2")
p.start()
print("main 3")
#打印结果
main
main 2
main 3
starting @ 2020-11-12 19:37:58.584438s
bk=1
saving time 10.072225
saving finished @ 2020-11-12 19:38:08.656873s
#后面的是update_model中的打印日志,说明跳过了start
今天任务完成了,这是完整版的,希望粉丝多多捧场。
来何容易去何迟,半在心头半在眉。
【20201113补充】
昨天的没有执行好,因为熊猫库处理太慢了,整成字典很快。给占用GPU的程序加了进程,只是start,后来发现不行,要有join,难道说没有join,整个流程结束了也不会终止吗?下面测试demo(参考上一篇)
欲上高楼去避愁,愁还随我上高楼。
Process MyProcess-1:
Traceback (most recent call last):
File "/data/logs/xulm1/myconda/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "test_update_rec_items.py", line 405, in run
next_task = self.task_queue.get()
File "/data/logs/xulm1/myconda/lib/python3.7/multiprocessing/queues.py", line 94, in get
res = self._recv_bytes()
File "/data/logs/xulm1/myconda/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/data/logs/xulm1/myconda/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
buf = self._recv(4)
File "/data/logs/xulm1/myconda/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
chunk = read(handle, remaining)
KeyboardInterrupt
强行终止后发现这种,于是增加task为None放最后,不然while死循环了【不要问我在说啥子,真正了解了就不会这么问了,不懂就自己实践下】
再等1h,顺便搞下GNN的破事,
搞定了,拜拜。
谁念西风独自凉,萧萧黄叶闭疏窗。
本文地址:https://blog.csdn.net/SPESEG/article/details/109601661
上一篇: ArrayList 源码解析和设计思路
下一篇: Java基础总结01拓展