欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

异步网络爬虫的Python实现(2)

程序员文章站 2022-05-04 11:28:48
...

本文继续上一节的话题:异步网络爬虫的实现。

回调函数的编写

就目前我们写的异步框架,实现一个网络爬虫还是十分困难的,即使是实现一个简单的读取一个页面都很难。

不过我们还是来尝试写一下抓取URL的代码,首先我们定义两个全局的集合来保存需要爬取的网址

urls_todo = set(['/'])
seen_urls = set(['/'])

这里seen_urlsurls_todo和已经抓取过的网址的一个合集,两个集合都首先初始化为一个根地址”/”。

我们的程序中抓取一个网页需要一系列的回调函数。在前面的例子中一旦网络连接建立,就会调用回调函数connected,在这个回调函数中我们会向服务器发送一个GET请求。不过在这之后,程序收到服务器的返回信息之后还需要回调一个Fetcher的对象。这个对象需要网址,连接对象,保存地址三个参数:

class Fetcher:
    def __init__(self, url):
        self.response = b'' # Empty array of bytes.
        self.url = url
        self.sock = None

它的fetch方法:

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('xkcd.com', 80))
        except BlockingIOError:
            pass

        # Register next callback.
        selector.register(  self.sock.fileno(),
                            EVENT_WRITE, 
                            self.connected)

程序通过fetch方法创建一个socket连接。但是这个方法在连接尚未建立的时候就返回了。因此它必须返回到等待连接的循环:

fetcher = Fetcher('/353/')
fetcher.fetch()

while True:
    events = selector.select()
    for event_key, event_mask in events:
        callback = event_key.data
        callback(event_key, event_mask)

所有的事件都在这个循环中处理。也就是fetch控制这个循环,程序也是通过select状态得到什么时候连接正常建立。只有连接正常建立时,程序才会调用connected回调函数,这个函数也在fetch里面绑定。

下面是connected函数的实现:

        # Method on Fetcher class.
        def connected(self, key, mask):
            print('connected!')
            selector.unregister(key.fd)
            request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url)
            self.sock.send(request.encode('ascii'))

            # Register the next callback.
            selector.register(key.fd,
                              ENENT_READ,
                              self.read_response)

这个方法向服务器发送一个GET请求。好的程序实际应该在发送这个请求后检查send的状态,以防止send没有正常发送。但是这里因为我们的程序比较小,就简单地直接调用send方法,然后等待响应。当然它还需要注册一个处理返回数据的函数,这个read_response的函数负责处理来自服务器的数据:

        # Method on Fether class.
        def read_response(self, key, mask):
            global stopped

            chunk = self.sock.recv(4096) # 4K chunk size
            if chunk:
                self.response += chunk
            else:
                selector.unregister(key.fd)
                links = self.parse_links()

                # Python set-logic:
                for link in links.difference(seen_urls):
                    urls_todo.add(link)
                    Fetcher(link).fetch() # <- New Fetcher.

                seen_urls.update(links)
                urls_todo.remove(self.url)
                if not urls_todo:
                    stopped = True

这样在socket连接可以读的时候,程序就会调用这个回调函数,不过这个可以读的信号还可能表示这个连接关闭。

这个回调函数每次请求4K的数据,如果的收到的数据没有4K,那么chunk中就会包含已经读取到的数据;而如果数据多于4K,那么连接依然是可以读的,程序就会继续读取数据,直至所有数据都读完。当程序读完所有数据的时候连接就会关闭,chunk也会清除掉已经存储的数据。

parse_links方法主要用于处理读到的内容,到这里还没有实现,不过可以知道它返回的是读到数据中的一系列url。这样我们就可以抓取新的连接的内容,这个过程中是没有并发损耗的。注意,好的异步程序一定要注意数据共享,比如这里我们用seen_links保存url,因为没有实际意义上的并发扣件,所以我们不必担心程序会被中断。

我们还在程序中添加了一个全局的stopped标志,用以控制我们何时中断我们的循环。

stopped = False

def loop():
    while not stopped:
        events = selector.select()
        for event_key, envent_mask in events:
            callback = event_key.data
            callback()

一旦所有的页面都被抓取完成,这个全局变量就会中止这个循环,程序也会退出。

这种堆砌的的代码虽然是一种异步的解决方法,但是代码还是十分混乱的。我们需要一种连贯的IO操作和处理方式,一种多任务的管理方式。但是如果不采用线程,很难把这一系列的操作集中到一个函数中去:这样的函数一旦开始一个IO操作,就必须明确地IO操作完成之后的操作,然后执行操作并返回,这种基于状态保存的方式值得深思。

为了更好地解释这个问题,我们来看一个基于阻塞线程的抓取网址的方法:

# Blocking version

def fetch(url):
    sock = socket.socket()
    sock.connect(('xkcd.com', 80))
    request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        chunk = sock.recv(4096)

    # Page is now downloaded.
    links = parse_links(response)
    q.add(links)

这种基于阻塞线程的程序是怎么保存程序的状态的呢?在这个函数内部有一个socket对象,一个累加的响应response。程序通过一个在堆栈上的变量保存需要的状态,因为线程已经帮你搞定了函数的切换,所以你不必考虑何时需要切换函数。

然而基于回调的编程方式,程序并不会自动执行这些切换,在等待IO操作的时候,函数必须显式地保存它的状态以及后面要执行的操作,作为局部变量的替代,我们在self属性sockresponse保存所需要的局部数据。并且通过注册不同的回调函数来执行不同的操作。不过一旦程序后面变得复杂,相应的状态也会增加,程序将会变得十分难以维护。

更为糟糕的是,一旦某个回调函数抛出一个异常,下一个回调函数就不会正常执行,程序也就退出了,这并不是我们希望见到的。比如我们在执行parse_links中抛出一个异常:

Traceback (most recent call last):
  File "loop-with-callbacks.py", line 111, in <module>
    loop()
  File "loop-with-callbacks.py", line 106, in loop
    callback(event_key, event_mask)
  File "loop-with-callbacks.py", line 51, in read_response
    links = self.parse_links()
  File "loop-with-callbacks.py", line 67, in parse_links
    raise Exception('parse error')
Exception: parse error

堆栈调试信息只能告诉我们在运行回调函数的时候发生了异常,并不会提示异常究竟是如何发生的,这也对代码的调试增加了难度。

抛开异步方式各多线程方式的效率问题,还有一个需要关注的问题:线程有可能因为线程同步问题而产生数据丢失,异步回调则可能因为堆栈问题而难以处理各种异常。

协程的实现方式

实际我们还有一种更好的实现方式,通过这种方式可以兼具多线程和异步的优点,这种实现方式称为协程。在Python 3.4中,有一个专用的异步的库,其中与http相关的库为aiohttp,通过这个库去访问一个url将十分简洁:

    def fetch(self, url):
        response = yield from self.session.get(url)
        body = yield from response.read()

这样的实现代码上更为简洁,并且相较于多线程的实现,这种实现方式更加节省资源,一般一个线程的开销是50K左右,而这样的协程的实现方式只需要3k内存的额外开销。因此Python可以轻易地实现上万的协程。

协程的概念来自先前的计算机科学,它实际上很简单:一个可以暂停和恢复的子程序。线程的实现是通过操作系统直接实现的,协程则不一样,协程可以自己行选择你何时停止,并在恢复的时候继续运行。

协程的实现也有很多种,实际在python中就有多种实现方式。在异步IO库中是通过内建的生成器产生的,主要是上述代码中的yield from。而在python 3.5中,协程成为python语言的一个组成部分。不管如何,我们还是通过python3.4对协程作一个简单了解,这也是后续使用协程的一个重要基础。

为了更好地理解python中基于协程的实现方式,我们先来看一个例子,并通过这个例子来理解协程的概念。在充分理解协程概念之后,我们将再次回到网络爬虫的例子中来。

未完待续。。。。