Concurrent Programming with concurrent.futures
Concurrent programming in Python is generally done with the threading and multiprocessing modules, but most concurrent workloads follow the same pattern: spawn a batch of threads, pull work from a queue, and collect the results in another queue. Tasks like these usually call for a thread pool. The concurrent.futures module wraps threading and multiprocessing and makes it straightforward to set up such pools.
Installation
In Python 3, concurrent.futures is part of the standard library; on Python 2 you have to install the futures backport yourself:
pip install futures
Executor and Future
concurrent.futures provides two classes, ThreadPoolExecutor and ProcessPoolExecutor. Both inherit from Executor and are used to create thread pools and process pools respectively, and both accept a max_workers argument giving the number of threads or processes to create. For ProcessPoolExecutor, max_workers may be omitted, in which case the pool defaults to the number of CPUs on the machine.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests

def load_url(url):
    return requests.get(url)

url = 'http://httpbin.org'
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(load_url, url)
Executor defines a submit() method, which submits a callable task for execution and returns a Future instance. The Future's done() method reports whether the task has finished and does not block; its result() method returns the task's return value and blocks until the task completes.
print(future.done())
print(future.result().status_code)
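A Future offers more than done() and result(): result() takes an optional timeout, exception() reports a failure without re-raising it, and add_done_callback() registers a completion hook. A minimal sketch of the first two, reusing the load_url helper above:

from concurrent.futures import ThreadPoolExecutor, TimeoutError
import requests

def load_url(url):
    return requests.get(url)

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(load_url, 'http://httpbin.org')

try:
    # result() accepts a timeout in seconds; TimeoutError is raised
    # if the task has not finished by then.
    print(future.result(timeout=5).status_code)
except TimeoutError:
    print('task did not finish within 5 seconds')

# exception() waits for the task and returns the exception it raised,
# or None if it succeeded.
if future.exception() is not None:
    print('task failed:', future.exception())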
submit() schedules only one task at a time; to run several tasks concurrently, use map() or as_completed().
map
urls = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']

def load_url(url):
    return requests.get(url)

with ThreadPoolExecutor(max_workers=3) as executor:
    for url, data in zip(urls, executor.map(load_url, urls)):
        print('%r page status_code %s' % (url, data.status_code))
Output:
'http://httpbin.org' page status_code 200
'http://example.com/' page status_code 200
'https://api.github.com/' page status_code 200
map() takes two arguments: a function to run and a sequence. It applies the function to every element of the sequence and returns a generator over the results.
As the output shows, the results come back in the same order as the input sequence.
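Two details of map() not shown above: it also accepts an optional timeout, and an exception raised inside a task is re-raised in the main thread at the moment that task's slot in the result generator is consumed. A sketch, assuming a deliberately unreachable hypothetical host:

from concurrent.futures import ThreadPoolExecutor
import requests

urls = ['http://httpbin.org', 'http://no-such-host.invalid']

def load_url(url):
    return requests.get(url)

with ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(load_url, urls, timeout=10)
    try:
        for resp in results:
            print(resp.status_code)
    except requests.ConnectionError as exc:
        # The worker's exception surfaces here, at the point where
        # its result is pulled from the generator.
        print('request failed:', exc)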
as_completed
as_completed() returns a generator over the Futures. It blocks while no task has finished, yields each task as soon as it completes, and stops once every task is done.
from concurrent.futures import ThreadPoolExecutor, as_completed

def load_url(url):
    return url, requests.get(url).status_code

with ThreadPoolExecutor(max_workers=3) as executor:
    tasks = [executor.submit(load_url, url) for url in urls]
    for future in as_completed(tasks):
        print(future.result())
Output:
('http://example.com/', 200)
('http://httpbin.org', 200)
('https://api.github.com/', 200)
Note that the results no longer match the order of the input sequence: whichever task finishes first reaches the main thread first.
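as_completed() also takes an optional timeout; if the whole set of futures has not finished within that many seconds, iterating the generator raises TimeoutError. A minimal sketch under that assumption:

from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError
import requests

urls = ['http://httpbin.org', 'http://example.com/']

def load_url(url):
    return url, requests.get(url).status_code

with ThreadPoolExecutor(max_workers=2) as executor:
    tasks = [executor.submit(load_url, url) for url in urls]
    try:
        for future in as_completed(tasks, timeout=5):
            print(future.result())
    except TimeoutError:
        # Raised if the remaining futures are still running after 5 seconds.
        print('some tasks did not finish in time')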
wait
wait() blocks the main thread until a given condition is met. Three conditions are available: ALL_COMPLETED, FIRST_COMPLETED, and FIRST_EXCEPTION.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
from concurrent.futures import as_completed
import requests

urls = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']

def load_url(url):
    requests.get(url)
    print(url)

with ThreadPoolExecutor(max_workers=3) as executor:
    tasks = [executor.submit(load_url, url) for url in urls]
    wait(tasks, return_when=ALL_COMPLETED)
    print('all_done')
Output:
http://example.com/
http://httpbin.org
https://api.github.com/
all_done
As the output shows, the call blocks until every task has completed.
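wait() also returns a pair of sets, (done, not_done), which is especially useful with FIRST_COMPLETED when you want to handle early finishers while the rest keep running. A minimal sketch:

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import requests

urls = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']

def load_url(url):
    return url, requests.get(url).status_code

with ThreadPoolExecutor(max_workers=3) as executor:
    tasks = [executor.submit(load_url, url) for url in urls]
    # Block only until the first task finishes.
    done, not_done = wait(tasks, return_when=FIRST_COMPLETED)
    print('%d done, %d still running' % (len(done), len(not_done)))
    for future in done:
        print(future.result())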
ProcessPoolExecutor
ProcessPoolExecutor is used in the same way as ThreadPoolExecutor, but note this sentence from the documentation:
The __main__ module must be importable by worker subprocesses. This means that ProcessPoolExecutor will not work in the interactive interpreter.
In other words, the code needs an importable __main__ module, so guard the entry point:
from concurrent.futures import ProcessPoolExecutor, as_completed
import requests

urls = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']

def load_url(url):
    return requests.get(url)

def main():
    with ProcessPoolExecutor() as executor:
        tasks = [executor.submit(load_url, url) for url in urls]
        for f in as_completed(tasks):
            if f.done():
                print(f.result().status_code)

if __name__ == '__main__':
    main()
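The HTTP examples above are I/O-bound, where threads already work well. Process pools pay off on CPU-bound work, because each worker runs in its own interpreter and is not limited by the GIL. A hedged sketch with a hypothetical is_prime function:

from concurrent.futures import ProcessPoolExecutor

NUMBERS = [112272535095293, 112582705942171, 115280095190773, 1099726899285419]

def is_prime(n):
    # Naive trial division; deliberately CPU-heavy.
    if n < 2:
        return False
    i = 2
    while i * i <= n:
        if n % i == 0:
            return False
        i += 1
    return True

def main():
    # max_workers omitted: the pool defaults to the machine's CPU count.
    with ProcessPoolExecutor() as executor:
        for n, prime in zip(NUMBERS, executor.map(is_prime, NUMBERS)):
            print('%d is prime: %s' % (n, prime))

if __name__ == '__main__':
    main()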