欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  互联网

线程进程计算之多任务同步进行

程序员文章站 2022-04-01 10:36:11
hi各位大佬好,前面一篇介绍了多进程中传值的问题,这里要进行一个线程问题,即,当目前任务进行中一部分数据要用来做另一个任务(new plan),当前任务又不能停下,于是就要开一个线程执行新的任务。For Video Recommendation in Deep learning QQ Group 277356808For Visual in deep learning QQ Group629530787I'm here waiting for you别加那么多,没必要,另外,不接受这个网页....

hi各位大佬好,前面一篇介绍了多进程中传值的问题,这里要进行一个线程问题,即,当目前任务进行中一部分数据要用来做另一个任务(new plan),当前任务又不能停下,于是就要开一个线程执行新的任务。

For Video Recommendation in Deep learning QQ Group 277356808

For Visual in deep learning QQ Group 629530787

I'm here waiting for you

别加那么多,没必要,另外,不接受这个网页的私聊/私信!!!

 与其说是随手笔记,不如说是工作记录更恰当。今天本大佬节日,不知道有没有一起过的。韶华不为少年留,恨悠悠,几时休,便作春江都是泪,流不尽,许多愁。

【注,我的很多博文并不都是当天完成的,有的都是几天前就开始写了,只是发表那天提交,当然同时可能开了很多博文】

我想说,线程里面能开进程吗??不是Process这种,是Pool或者ThreadPool,可以试试。先实现demo

from threading import Thread
from multiprocessing.pool import ThreadPool
from multiprocessing import Process,Pool

有计数的快慢的问题,我先尝试下,看是手写的快还是熊猫库快。没想到还是我手写的快啊,小明哥厉害了。但是数据长度上来了还是熊猫库厉害啊,均是执行10次的累计时间,第一个是我手写的,第二个是熊猫库。鉴于我召回的items的量级,所以采用我自己写的没错了。

#data length 1000
time 1 : 0.007994,  time 2 : 0.029980

#10000
time 1 : 0.079951,  time 2 : 0.033979

#100000
time 1 : 0.959261,  time 2 : 0.067959

#百万
time 1 : 8.516236,  time 2 : 0.410749

鉴于时间要求,今天暂不进行线程中加多进程,先搞个线程整上。

推进中发现线程是变量进不去,,,,,尴尬,之前的进程是变量出不去(上个博文),解决红字问题,那就是传参,这个很容易,进不去是不想传参,有点懒。

今天不搞个博文都对不起粉丝。。。。明天更这个博文,不要说脏话,文明人。

拜拜

【20201112补充】

鉴于数据计算量大,只有一个线程根本不行,数据直接淹没了,这个线程是否活着也不得而知(因为主进程结束任务后,该线程的过程日志也不会打印在log上)。准备搞16个进程试试看看。昨天的demo线程代码在此

    k=12
    p=11
    def update_model(k,p):
        k+=1
        print("p: %d, k: %d\n"%(p,k))    


    thread1=Thread(target= update_model,args=(k,p,))          
    thread1.setDaemon(True)
    thread1.start()

    print("out: %d\n"%k)

按照上面的修改后直接报错,这是线程中的经典错误。这就是说我昨天的程序根本就没跑起来啊。除却五侯歌舞地,人间何处不相随。

Fatal Python error: could not acquire lock for <_io.BufferedWriter name='<stdout>'> at interpreter shutdown, possibly due to daemon threads

Thread 0x00007ff69068b700 (most recent call first):
  File "thread_pool_.py", line 92 in update_model
  File "/data/logs/xulm1/myconda/lib/python3.7/threading.py", line 870 in run
  File "/data/logs/xulm1/myconda/lib/python3.7/threading.py", line 926 in _bootstrap_inner
  File "/data/logs/xulm1/myconda/lib/python3.7/threading.py", line 890 in _bootstrap

Current thread 0x00007ff6a9243700 (most recent call first):
已放弃

一个博主说,将此线程函数写一个文件,也就是说线程只需要能执行那个文件即可,不知道能不能行,试试

   thread1.start()
   thread1.join()

改成这种是不行的,我还以为在执行了,top命令发现有python3的程序,后来发现是之前别人的服务,我都想给他kill了,下面是查运行的程序的主要方法

$ ps aux | grep python

我先试试直接运行函数可以不,直接运行32ThreadPool,10万用户要2min,改下其他参数看影响大不大,16个反而时间更少了。。。。吃惊,后来发现大于16就没啥用了

修改其他参数影响不大,增加用户数为100万则时间成倍增加,近20min

下面要采用别人的方法要面临一个问题,如何给文件传参数,argparse/sys的格式能不能行?这种也就传个字符串,数字吧,没见过传一个大的数组,下面是常见两种方式,还有其他方式多了,

os.system("python3 main.py --task train")
os.popen(cmd)

这种是不行的,所以说,那个博主的原本意思是在其他py文件中写个函数,然后导入,这样就不会出上面的“已放弃”错误了(这个已放弃不是我加的)

下面再次直接试试,因为我加了global变量,为了能够看到效果,因为即使打印运行过程,线程中的在主程序结束后的日志是无法打印在日志中的,所以我将处理结果存储在redis集群,2h有效期,当然也可为了测试考虑,整个time.sleep,这样打印的日志能看到。

增加global后没有报错,现在就是等一个小时,趁此时间搞下GCEGNN,这个issue还是没解决啊,欲说还休,欲说还休,却道天凉好个秋。

事实证明,添加sleep是可以看到打印的日志,而且也已经存上redis了(如下),但我还是想证明下,如果主程序结束了,这个线程会不会活着?

  1) "173"
  2) "214"
  3) "244"
  4) "282"
  5) "298"
  6) "331"
  7) "506"
  8) "680"
  9) "765"
 10) "779"
 11) "784"
 12) "810"
 13) "816"
 14) "995"
 15) "1050"

服务器只有用了才是自己的,赶紧搞起来啊!!不然服务器都没了。都是共同申请的,啥时候有我专用的呢?GNN与DGCF同时进行,后者还没看paper,who care?

主程序运行完后,线程在top及我上面提及的方法中查询不到是否在运行,只能通过查看redis是否存储了。

然而过了近两个小时,我发现redis没有存储啊。。。。而且,主程序结束时也没有那么大的内存,这就是说明,主程序结束,线程就死了??下面拿小demo做试验证实或证伪(那就需要改程序了)

def update_model():
    results=[[0,[1,2,3]],[1,[2,3,4]]]#redis保存这个

    #redis写入省略
    。。。

#线程同最上,后面加
time.sleep(100)
#保证能存上

修改为如下,程序结束等了三分钟也没存入redis,说明程序真的死了,证明了假设,红字假设也是网上常见的问题。

def update_model():
    results=[[0,[1,2,3]],[1,[2,3,4]]]#redis保存这个
    time.sleep(100)
    #redis写入省略
    。。。

#线程同最上,后面加
time.sleep(1)
#保证能存上

既然搞个线程不能解决,那就来两个进程可好,同时进行,也就是说添加了一个进程

直接加个Process即可解决,也就是说原来的程序可以不变,加个Process能跳过吗???(像线程那样,能够同时进行吗?)证实可以跳过,可在start后加个标志打印下即可,如下示例:进程代码参考上一篇(不要join),不再赘述

    print("main")
    #time.sleep(10)
    #。。进程
    print("main 2")
    p.start()
    print("main 3")

#打印结果
main
main 2
main 3
starting @ 2020-11-12 19:37:58.584438s

bk=1
saving time 10.072225
saving finished @ 2020-11-12 19:38:08.656873s

#后面的是update_model中的打印日志,说明跳过了start

今天任务完成了,这是完整版的,希望粉丝多多捧场。

来何容易去何迟,半在心头半在眉。

 

【20201113补充】

昨天的没有执行好,因为熊猫库处理太慢了,整成字典很快。给占用GPU的程序加了进程,只是start,后来发现不行,要有join,难道说没有join,整个流程结束了也不会终止吗?下面测试demo(参考上一篇

欲上高楼去避愁,愁还随我上高楼。

Process MyProcess-1:
Traceback (most recent call last):
  File "/data/logs/xulm1/myconda/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "test_update_rec_items.py", line 405, in run
    next_task = self.task_queue.get()
  File "/data/logs/xulm1/myconda/lib/python3.7/multiprocessing/queues.py", line 94, in get
    res = self._recv_bytes()
  File "/data/logs/xulm1/myconda/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/data/logs/xulm1/myconda/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/data/logs/xulm1/myconda/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt

强行终止后发现这种,于是增加task为None放最后,不然while死循环了【不要问我在说啥子,真正了解了就不会这么问了,不懂就自己实践下】

再等1h,顺便搞下GNN的破事

搞定了,拜拜。

 

谁念西风独自凉,萧萧黄叶闭疏窗。

本文地址:https://blog.csdn.net/SPESEG/article/details/109601661