异步网络爬虫的Python实现(4)
本文继续上一节的话题:异步网络爬虫的实现。
开启协程
现在我们正式回到我们之前实现网络爬虫的话题。
一个网络爬虫的实现主要有以下几个步骤:抓取一个页面,分析页面链接,加载链接到一个队列。这样直到整个网站页面抓取完成,但是这里受限于客户端和服务器,我们只抓取一定数量的页面。为了保证效率,当一个页面抓取完成后,程序要立即从队列中获取下一个页面的链接,并进行抓取。如果队列中的链接没有那么多的话,程序需要暂停一部分worker
, 不过一个抓到一个含有很多链接的页面,那么链接队列一下就会添加很多链接,这就需要暂停的worker
迅速唤醒,以保证快速抓取,最后一旦所有的链接抓取完成,程序应该可以自动停止运行。
如果采用线程的方式实现,我们来大概看一下具体的实现过程:首先我们需要用标准的Python库创建一个同步队列,每次向这个队列中添加一个新的链接,队列增加一个任务。当一个Worker
抓取完一个页面的时候,调用一个task_done
的方法。主线程会同步队列的操作,直到所有的队列任务完成,程序退出。
实际协程用到类似的工作方式,首先我们创建一个异步队列:
try:
from asyncio import JoinableQueue as Queue
except ImportError:
# In Python 3.5, asyncio.JoinableQueue is
# merged into Queue.
from asyncio import Queue
之后在crawler
类中收集所有worker
的状态,并在crawler
的方法中编写主程序,最后我们通过协程的方式开启主循环直到所有的任务执行完毕。
loop = asyncio.get_event_loop()
crawler = crawling.Crawler('http://xkcd.com',
max_redirect=10)
loop.run_until_complete(crawler.crawl())
爬虫通过上述方式开始抓取操作,两个参数,一个是最开始抓取的网址,另一个是最大跳转数量,这个后面会讲到。
class Crawler:
def __init__(self, root_url, max_redirect):
self.max_tasks = 10
self.max_redirect = max_redirect
self.q = Queue()
self.seen_urls = set()
# aiohttp's ClientSession does connection pooling and
# HTTP keep-alives for us.
self.session = aiohttp.ClientSession(loop=loop)
# Put (URL, max_redirect) in the queue.
self.q.put((root_url, self.max_redirect))
现在队列中尚未抓取的链接数目是1,也就是刚刚添加的链接,回到之前的爬虫开始运行的代码:
loop.run_until_complete(crawler.crawl())
爬虫的协程相当于一个主线程:直到所有任务完成之前,它会阻止新的任务,而任务的运行是在后台的。
@asyncio.coroutine
def crawl(self):
"""Run the crawler until all work is done."""
workers = [asyncio.Task(self.work())
for _ in range(self.max_tasks)]
# When all work is done, exit.
yield from self.q.join()
for w in workers:
w.cancel()
如果任务是以线程的方式执行的,那么我们不会同时开启所有的线程。我们应当尽量避免不必要的线程开销。不过要是任务是以协程的方式执行的,那么我们可以直接开启尽量多的协程,因为它的开销很小。
还有一点需要仔细考虑一下,这些开启的线程怎样停止运行呢?想象一下,如果队列中没有足够的任务,那么这些worker
都会处于等待状态,而实际已经没有新的链接产生了,它们就这样一直等待下去吗?显然这并不是我们需要的结果。因此需要在主程序中停止这些不再需要的协程,否则,如果通过Python来强制停止这些线程则会出现意外:
ERROR:asyncio:Task was destroyed but it is pending!
那么怎样才能停止worker
呢?这就需要用到一个我们还没提到的一个特性:你可以通过抛出一个异常来停止一个协程:
>>> gen = gen_fn()
>>> gen.send(None) # Start the generator as usual.
1
>>> gen.throw(Exception('error'))
Traceback (most recent call last):
File "<input>", line 3, in <module>
File "<input>", line 2, in gen_fn
Exception: error
这里,生成器通过throw
唤醒,现在协程直接抛出一个异常,如果生成器中没有捕获这个异常的代码,那么这个异常就会一直到达最顶端的程序,这样顶端程序也可以停止这个协程。因此结束一个任务协程可以通过以下方式实现:
# Method of Task class.
def cancel(self):
self.coro.throw(CancelledError)
这里当停止一个协程会抛出一个异常,我们可以在step
方法中处理这个异常:
# Method of Task class.
def step(self, future):
try:
next_future = self.coro.send(future.result)
except CancelledError:
self.cancelled = True
return
except StopIteration:
return
next_future.add_done_callback(self.step)
现在子协程将知道何时当停止工作,当它在停止工作的时候它应当关闭它占有的一些资源。当爬虫关闭所有worker
,那么可以认为所有页面都已抓取完成,总的大循环看到所有协程退出时,这个循环也应退出:
loop.run_until_complete(crawler.crawl())
crawl
方法中包含爬虫的所有操作步骤:从队列中获取新的即将抓取的链接,抓取链接内容,解析链接内容并提取新的链接,添加新的链接到抓取队列。而每个work
会独立地执行页面抓取操作。
@asyncio.coroutine
def work(self):
while True:
url, max_redirect = yield from self.q.get()
# Download page and add new links to self.q.
yield from self.fetch(url, max_redirect)
self.q.task_done()
上一篇: go 中recover捕获异常
下一篇: 打印的日志哪里去了?