异步网络爬虫的Python实现(2)
本文继续上一节的话题:异步网络爬虫的实现。
回调函数的编写
就目前我们写的异步框架,实现一个网络爬虫还是十分困难的,即使是实现一个简单的读取一个页面都很难。
不过我们还是来尝试写一下抓取URL的代码,首先我们定义两个全局的集合来保存需要爬取的网址
urls_todo = set(['/'])
seen_urls = set(['/'])
这里seen_urls
是urls_todo
和已经抓取过的网址的一个合集,两个集合都首先初始化为一个根地址”/”。
我们的程序中抓取一个网页需要一系列的回调函数。在前面的例子中一旦网络连接建立,就会调用回调函数connected
,在这个回调函数中我们会向服务器发送一个GET
请求。不过在这之后,程序收到服务器的返回信息之后还需要回调一个Fetcher
的对象。这个对象需要网址,连接对象,保存地址三个参数:
class Fetcher:
def __init__(self, url):
self.response = b'' # Empty array of bytes.
self.url = url
self.sock = None
它的fetch
方法:
def fetch(self):
self.sock = socket.socket()
self.sock.setblocking(False)
try:
self.sock.connect(('xkcd.com', 80))
except BlockingIOError:
pass
# Register next callback.
selector.register( self.sock.fileno(),
EVENT_WRITE,
self.connected)
程序通过fetch方法创建一个socket连接。但是这个方法在连接尚未建立的时候就返回了。因此它必须返回到等待连接的循环:
fetcher = Fetcher('/353/')
fetcher.fetch()
while True:
events = selector.select()
for event_key, event_mask in events:
callback = event_key.data
callback(event_key, event_mask)
所有的事件都在这个循环中处理。也就是fetch控制这个循环,程序也是通过select状态得到什么时候连接正常建立。只有连接正常建立时,程序才会调用connected
回调函数,这个函数也在fetch里面绑定。
下面是connected
函数的实现:
# Method on Fetcher class.
def connected(self, key, mask):
print('connected!')
selector.unregister(key.fd)
request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url)
self.sock.send(request.encode('ascii'))
# Register the next callback.
selector.register(key.fd,
ENENT_READ,
self.read_response)
这个方法向服务器发送一个GET
请求。好的程序实际应该在发送这个请求后检查send
的状态,以防止send
没有正常发送。但是这里因为我们的程序比较小,就简单地直接调用send
方法,然后等待响应。当然它还需要注册一个处理返回数据的函数,这个read_response
的函数负责处理来自服务器的数据:
# Method on Fether class.
def read_response(self, key, mask):
global stopped
chunk = self.sock.recv(4096) # 4K chunk size
if chunk:
self.response += chunk
else:
selector.unregister(key.fd)
links = self.parse_links()
# Python set-logic:
for link in links.difference(seen_urls):
urls_todo.add(link)
Fetcher(link).fetch() # <- New Fetcher.
seen_urls.update(links)
urls_todo.remove(self.url)
if not urls_todo:
stopped = True
这样在socket连接可以读的时候,程序就会调用这个回调函数,不过这个可以读的信号还可能表示这个连接关闭。
这个回调函数每次请求4K的数据,如果的收到的数据没有4K,那么chunk
中就会包含已经读取到的数据;而如果数据多于4K,那么连接依然是可以读的,程序就会继续读取数据,直至所有数据都读完。当程序读完所有数据的时候连接就会关闭,chunk
也会清除掉已经存储的数据。
parse_links
方法主要用于处理读到的内容,到这里还没有实现,不过可以知道它返回的是读到数据中的一系列url
。这样我们就可以抓取新的连接的内容,这个过程中是没有并发损耗的。注意,好的异步程序一定要注意数据共享,比如这里我们用seen_links
保存url,因为没有实际意义上的并发扣件,所以我们不必担心程序会被中断。
我们还在程序中添加了一个全局的stopped
标志,用以控制我们何时中断我们的循环。
stopped = False
def loop():
while not stopped:
events = selector.select()
for event_key, envent_mask in events:
callback = event_key.data
callback()
一旦所有的页面都被抓取完成,这个全局变量就会中止这个循环,程序也会退出。
这种堆砌的的代码虽然是一种异步的解决方法,但是代码还是十分混乱的。我们需要一种连贯的IO操作和处理方式,一种多任务的管理方式。但是如果不采用线程,很难把这一系列的操作集中到一个函数中去:这样的函数一旦开始一个IO操作,就必须明确地IO操作完成之后的操作,然后执行操作并返回,这种基于状态保存的方式值得深思。
为了更好地解释这个问题,我们来看一个基于阻塞线程的抓取网址的方法:
# Blocking version
def fetch(url):
sock = socket.socket()
sock.connect(('xkcd.com', 80))
request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
sock.send(request.encode('ascii'))
response = b''
chunk = sock.recv(4096)
while chunk:
response += chunk
chunk = sock.recv(4096)
# Page is now downloaded.
links = parse_links(response)
q.add(links)
这种基于阻塞线程的程序是怎么保存程序的状态的呢?在这个函数内部有一个socket对象,一个累加的响应response
。程序通过一个在堆栈上的变量保存需要的状态,因为线程已经帮你搞定了函数的切换,所以你不必考虑何时需要切换函数。
然而基于回调的编程方式,程序并不会自动执行这些切换,在等待IO操作的时候,函数必须显式地保存它的状态以及后面要执行的操作,作为局部变量的替代,我们在self
属性sock
和response
保存所需要的局部数据。并且通过注册不同的回调函数来执行不同的操作。不过一旦程序后面变得复杂,相应的状态也会增加,程序将会变得十分难以维护。
更为糟糕的是,一旦某个回调函数抛出一个异常,下一个回调函数就不会正常执行,程序也就退出了,这并不是我们希望见到的。比如我们在执行parse_links
中抛出一个异常:
Traceback (most recent call last):
File "loop-with-callbacks.py", line 111, in <module>
loop()
File "loop-with-callbacks.py", line 106, in loop
callback(event_key, event_mask)
File "loop-with-callbacks.py", line 51, in read_response
links = self.parse_links()
File "loop-with-callbacks.py", line 67, in parse_links
raise Exception('parse error')
Exception: parse error
堆栈调试信息只能告诉我们在运行回调函数的时候发生了异常,并不会提示异常究竟是如何发生的,这也对代码的调试增加了难度。
抛开异步方式各多线程方式的效率问题,还有一个需要关注的问题:线程有可能因为线程同步问题而产生数据丢失,异步回调则可能因为堆栈问题而难以处理各种异常。
协程的实现方式
实际我们还有一种更好的实现方式,通过这种方式可以兼具多线程和异步的优点,这种实现方式称为协程。在Python 3.4中,有一个专用的异步的库,其中与http相关的库为aiohttp
,通过这个库去访问一个url将十分简洁:
def fetch(self, url):
response = yield from self.session.get(url)
body = yield from response.read()
这样的实现代码上更为简洁,并且相较于多线程的实现,这种实现方式更加节省资源,一般一个线程的开销是50K左右,而这样的协程的实现方式只需要3k内存的额外开销。因此Python可以轻易地实现上万的协程。
协程的概念来自先前的计算机科学,它实际上很简单:一个可以暂停和恢复的子程序。线程的实现是通过操作系统直接实现的,协程则不一样,协程可以自己行选择你何时停止,并在恢复的时候继续运行。
协程的实现也有很多种,实际在python中就有多种实现方式。在异步IO库中是通过内建的生成器产生的,主要是上述代码中的yield from
。而在python 3.5中,协程成为python语言的一个组成部分。不管如何,我们还是通过python3.4对协程作一个简单了解,这也是后续使用协程的一个重要基础。
为了更好地理解python中基于协程的实现方式,我们先来看一个例子,并通过这个例子来理解协程的概念。在充分理解协程概念之后,我们将再次回到网络爬虫的例子中来。
未完待续。。。。
上一篇: Python 2 的网络爬虫
下一篇: python2爬虫的入门知识