Using thread pools and process pools from the concurrent.futures module
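Thread pools and process pools
Take thread pools as the example. When a program starts a new thread for every task, it ends up creating and destroying a large number of threads. That creation and destruction has a cost, and in extreme cases it can exhaust system resources. A thread pool is a good way to deal with this.
The word "pool" tells you that more than one thread is kept around. A pool maintains a fixed maximum number of worker threads (many implementations create them up front; Python's ThreadPoolExecutor starts them on demand, up to max_workers). You submit tasks to the pool, and the pool hands each one to an idle thread. When a task finishes, its thread is not destroyed; it stays in the pool and waits for the next task. When every thread in the pool is busy, newly submitted tasks wait in a queue, so the pool caps the number of threads running at once.
Process pools work on the same principle.
The concurrent.futures module provides ThreadPoolExecutor and ProcessPoolExecutor for working with thread pools and process pools.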
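To make the reuse of threads described above concrete, here is a minimal sketch. The `work` function and the sleep duration are made up for illustration; the point is only that six tasks get handled by at most two threads.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def work(task_id):
    # Hypothetical task: sleep briefly, then report which thread ran it
    time.sleep(0.05)
    return task_id, threading.current_thread().name

# Two workers handle six tasks, so threads have to be reused
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(work, range(6)))

thread_names = {name for _, name in results}
print(len(results), "tasks ran on", len(thread_names), "thread(s)")
```

The `with` block shuts the pool down on exit and waits for outstanding tasks, which is usually the simplest way to manage a pool's lifetime.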
ThreadPoolExecutor
Here is a simple example:
    from concurrent.futures import ThreadPoolExecutor
    import time

    import requests

    url_list = ['https://www.cnblogs.com/', 'https://www.csdn.net/', 'https://github.com/']

    def get_url(url):
        # Download the page; the decoded body is not used in this demo
        content = requests.get(url).content.decode()
        print(url + ' fetched')

    pool = ThreadPoolExecutor(max_workers=3)
    start = time.time()
    for url in url_list:
        future = pool.submit(get_url, url)
        # print(future)
    end = time.time()
    print(end - start)
The output is:
0.0016434192657470703
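In this example, max_workers sets the maximum number of threads in the pool, and pool.submit submits a task to the pool: get_url is the callable to run and url is the argument passed to it.
The elapsed time is printed before any of the "fetched" lines, which shows that submitting work to the pool does not block the main thread.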
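The same behaviour can be checked without any network access. The sketch below uses a made-up `slow_add` task that sleeps instead of downloading, and measures how long `submit` itself takes compared with the whole task. The names and the 0.2 second delay are assumptions for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor

results = []

def slow_add(a, b, delay=0.2):
    # Hypothetical task: pretend to do slow I/O, then record the sum
    time.sleep(delay)
    results.append(a + b)

pool = ThreadPoolExecutor(max_workers=2)
start = time.monotonic()
# Positional and keyword arguments after the callable are forwarded to it
pool.submit(slow_add, 1, 2, delay=0.2)
submit_elapsed = time.monotonic() - start   # submit() has already returned

pool.shutdown(wait=True)                    # block until the task is done
total_elapsed = time.monotonic() - start
print(f"submit: {submit_elapsed:.4f}s, total: {total_elapsed:.2f}s, results: {results}")
```

`submit` returns almost immediately, while the total time is dominated by the sleep inside the task.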
print(future) was commented out above. Uncomment it and run again:
<Future at 0x7ff6cfaa8860 state=running>
<Future at 0x7ff6ce965860 state=running>
<Future at 0x7ff6ce96e278 state=running>
0.006175518035888672
Every call to submit returns a Future object, which lets you check on the task. state=running means the task is currently executing.
Future objects have several other methods:
future.done()
    from concurrent.futures import ThreadPoolExecutor
    import time

    import requests

    url_list = ['https://www.cnblogs.com/', 'https://www.csdn.net/', 'https://github.com/']

    def get_url(url):
        content = requests.get(url).content.decode()
        print(url + ' fetched')

    pool = ThreadPoolExecutor(max_workers=3)
    future_list = []
    start = time.time()
    for url in url_list:
        future = pool.submit(get_url, url)
        print(future.done())  # checked right after submitting
        future_list.append(future)
    end = time.time()
    print(end - start)

    time.sleep(5)  # give the downloads time to finish
    for future in future_list:
        print(future.done())
This version adds future_list and, to make the effect visible, a sleep in the middle. The result is:
False
False
False
0.001546621322631836
True
True
True
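future.done() reports whether the task is over: it returns True once the call has finished or been cancelled, and False while it is still pending or running.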
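The example above relies on sleep and network timing. As a deterministic sketch of the same idea, the snippet below uses two `threading.Event` objects to hold a task in the running state, and also shows two related Future methods, `running()` and `cancel()`. The `hold` function and the event names are made up for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()

def hold():
    started.set()     # signal that a worker has picked this task up
    release.wait()    # block until the main thread lets the task finish
    return "finished"

pool = ThreadPoolExecutor(max_workers=1)
first = pool.submit(hold)
started.wait()                 # from here on, first is definitely running
second = pool.submit(hold)     # queued, because the only worker is busy

first_state = (first.running(), first.done())
second_state = (second.running(), second.done())
cancelled = second.cancel()    # a task that has not started can be cancelled
second_done_after_cancel = second.done()

release.set()                  # let the first task finish
pool.shutdown(wait=True)

print(first_state)                      # (True, False)
print(second_state)                     # (False, False)
print(cancelled, second_done_after_cancel)   # True True
print(first.done(), first.result())     # True finished
```

Note that `done()` is True for the cancelled task even though it never ran, so `done()` alone does not tell you whether a result is available.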
future.result()
    from concurrent.futures import ThreadPoolExecutor
    import time

    import requests

    url_list = ['https://www.cnblogs.com/', 'https://www.csdn.net/', 'https://github.com/']

    def get_url(url):
        content = requests.get(url).content.decode()
        print(url + ' fetched')
        return url

    pool = ThreadPoolExecutor(max_workers=3)
    future_list = []
    start = time.time()
    for url in url_list:
        future = pool.submit(get_url, url)
        print(future.result())  # blocks until this task has finished
        future_list.append(future)
    end = time.time()
    print(end - start)

    for future in future_list:
        print(future.result())  # already finished, returns immediately
The result is:
2.0975613594055176
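As you can see, result() gives you the task's return value, but it blocks until the task has finished; there is no return value to hand back before then. Because this loop calls result() right after each submit, the three downloads effectively run one after another, which is why the elapsed time jumps to about two seconds.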
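One common way to keep the tasks concurrent is to submit everything first and only then call `result()`. The sketch below assumes a made-up `fake_fetch` function that sleeps for 0.3 seconds in place of a real download.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_fetch(name, delay=0.3):
    # Hypothetical stand-in for a network request
    time.sleep(delay)
    return name.upper()

names = ["a", "b", "c"]
start = time.monotonic()
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(fake_fetch, n) for n in names]  # submit everything first
    values = [f.result() for f in futures]                 # then block on the results
elapsed = time.monotonic() - start

print(values)             # ['A', 'B', 'C']
print(f"{elapsed:.2f}s")
```

With three workers the three sleeps overlap, so the elapsed time should be close to a single delay rather than the sum of all three.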
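The module offers a few more tools as well:
Using the map method
    from concurrent.futures import ThreadPoolExecutor
    import time

    import requests

    url_list = ['https://www.cnblogs.com/', 'https://www.csdn.net/', 'https://github.com/']

    def get_url(url):
        content = requests.get(url).content.decode()
        print(url + ' fetched')
        return url

    pool = ThreadPoolExecutor(max_workers=3)
    # Schedules get_url(url) for every url in url_list
    pool.map(get_url, url_list)
It is used much like the built-in map function: it calls get_url on every item of url_list and returns an iterator that yields the results in the order of the inputs.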
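A small sketch of what the returned iterator gives you. The `square_after` function is made up for illustration: larger inputs sleep longer, so the tasks finish in the reverse of the order they were submitted in, yet the results still come back in input order.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def square_after(n):
    # Hypothetical task: larger inputs take longer to finish
    time.sleep(n * 0.05)
    return n * n

with ThreadPoolExecutor(max_workers=3) as pool:
    # Iterating over the result of map() blocks until each value is ready
    squares = list(pool.map(square_after, [3, 2, 1]))

print(squares)   # [9, 4, 1]
```

If one of the calls raises, the exception is re-raised when its value is retrieved from the iterator, so it is worth consuming the results even when only the side effects matter.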
Using the wait function
    from concurrent.futures import ThreadPoolExecutor, wait
    import time

    import requests

    url_list = ['https://www.cnblogs.com/', 'https://www.csdn.net/', 'https://github.com/']

    def get_url(url):
        content = requests.get(url).content.decode()
        print(url + ' fetched')
        return url

    pool = ThreadPoolExecutor(max_workers=3)
    future_list = []
    start = time.time()
    for url in url_list:
        future = pool.submit(get_url, url)
        future_list.append(future)

    print(wait(future_list))  # blocks until every future has finished
    end = time.time()
    print(end - start)
DoneAndNotDoneFutures(done={<Future at 0x7f7506447da0 state=finished returned str>, <Future at 0x7f75074c9828 state=finished returned str>, <Future at 0x7f75064477f0 state=finished returned str>}, not_done=set())
6.678021430969238
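wait returns a named tuple containing two sets: done (futures that have finished) and not_done (futures that have not). Its return_when parameter accepts three options: FIRST_COMPLETED, FIRST_EXCEPTION and ALL_COMPLETED. The default is ALL_COMPLETED, which returns once every future has finished. FIRST_COMPLETED returns as soon as any one future finishes, and FIRST_EXCEPTION returns as soon as any future raises an exception (if none does, it behaves like ALL_COMPLETED).
For example:
    from concurrent.futures import ThreadPoolExecutor, wait, FIRST_EXCEPTION
    import time

    import requests

    url_list = ['https://www.cnblogs.com/', 'https://www.csdn.net/', 'https://github.com/']

    def get_url(url):
        content = requests.get(url).content.decode()
        print(url + ' fetched')
        return url

    def error(url):
        # gg is deliberately undefined, so this task raises NameError
        gg

    pool = ThreadPoolExecutor(max_workers=4)
    future_list = []
    start = time.time()
    future_list.append(pool.submit(error, 'https://www.cnblogs.com/'))
    for url in url_list:
        future = pool.submit(get_url, url)
        future_list.append(future)

    print(wait(future_list, return_when=FIRST_EXCEPTION))
    end = time.time()
    print(end - start)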
DoneAndNotDoneFutures(done={<Future at 0x7fd1a5b95320 state=finished raised NameError>}, not_done={<Future at 0x7fd1a4b11a90 state=running>, <Future at 0x7fd1a4b11a20 state=running>, <Future at 0x7fd1a4c897f0 state=running>})
0.001996755599975586
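FIRST_COMPLETED can be sketched the same way. To keep the outcome deterministic, the example below does not use the network: a made-up `slow` task stays blocked on a `threading.Event` until the main thread releases it, so only the `fast` task can be in the done set when wait returns.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

release = threading.Event()

def fast():
    return "fast"

def slow():
    release.wait()   # stays blocked until the main thread releases it
    return "slow"

with ThreadPoolExecutor(max_workers=2) as pool:
    fast_future = pool.submit(fast)
    slow_future = pool.submit(slow)
    # Returns as soon as at least one future has finished
    done, not_done = wait([fast_future, slow_future], return_when=FIRST_COMPLETED)
    first_results = [f.result() for f in done]
    still_pending = slow_future in not_done
    release.set()    # let the slow task finish so the pool can shut down

print(first_results, still_pending)   # ['fast'] True
```

The returned named tuple unpacks directly into the done and not_done sets, which is often more convenient than printing it.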
ProcessPoolExecutor
A process pool is used in essentially the same way as a thread pool, so the patterns above carry over directly.
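As a rough sketch of what carrying the pattern over looks like, the example below swaps in ProcessPoolExecutor for a CPU-bound function. The `sum_of_squares` and `run_in_processes` names are made up for illustration. Two differences from threads are worth keeping in mind: the callable and its arguments are pickled and sent to the worker processes, so the function should be defined at module level, and the code that starts the pool should sit under an `if __name__ == "__main__":` guard so that worker processes do not re-run it when they import the main module.

```python
from concurrent.futures import ProcessPoolExecutor

def sum_of_squares(n):
    # CPU-bound work, which is where processes tend to help more than threads
    return sum(i * i for i in range(n))

def run_in_processes(inputs, workers=2):
    # Same map() call as with ThreadPoolExecutor, only the executor class differs
    with ProcessPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(sum_of_squares, inputs))

if __name__ == "__main__":
    print(run_in_processes([10, 100, 1000]))   # [285, 328350, 332833500]
```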