欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

异步网络爬虫的Python实现(1)

程序员文章站 2022-06-05 17:30:23
...

本文翻译自500L系列文章,原文链接, 原文作者A. Jesse Jiryu Davis 和 Guido van Rossum.

A. Jesse Jiryu Davis是纽约MongoDB的一名工程师,他是MongoDB Python 驱动Motor的主要作者,同时他也是MongoDB C语言驱动项目的重要成员。他的个人博客地址

Guido van Rossum是Python的创建者,Python社区称他为BDFL(Benevolent Dictator For Life), 他的个人博客地址

简介

传统的计算机和程序设计主要目的是使程序可以更快地运行,然而很多网络程序的设计,更多的时间花费在等待而非计算,因为网络中常用到的是缓慢的连接。在设计这些程序的时候,我们的挑战在于如何有效地处理这些延时,常用的算法是采用异步IO处理。

这一章节我们主要实现一个简单的网络爬虫。这个网络爬虫采用异步算法,其中只有少量的运算,它可以同时等待多个网络响应,十分高效。爬虫抓取的网页越多,用时越短。它会为每个等待中人连接提供一个进程,因此它有可能在抓取过程中耗尽内存,或者其它的一些资源。

我们将分三步来阐述这个例子。首先,我们会先看一下异步事件循环,并实现一个带回调的异步事件循环:它很高效,但是如果在它基础上对程序进行扩展就变得十分麻烦;之后,我们会实现一个高效又易于扩展的方案,这个方案主要基于生成器实现;最后,我们会基于异步函数库和异步队列实现一个更为好的方案。

主要任务

网络爬虫的主要任务就是实现对网页内容的爬取,也有可能会对其进行排序或压缩。它一般由一个网址作为开始,抓取网页内容并进行分析,从中找到没有爬取的链接,并将这些链接放到一个队列中,当找不到新的链接的时候,爬虫就会停止。

这个过程中我们可以同时处理多个网页以提高效率,也就是当爬虫抓到一些网页链接的时候,它可以同时通过多个进程和多个Socket抓取网页,不过有时候链接过多时,我们就需要控制同时抓取的数量。

传统方法

传统的并行抓取的方法是创建一个线程池,每一个线程池通用一个Socket下载一个网页,比如下载xkcd.com网页可以通用以下代码实现。

def fetch(url):
    sock = socket.socket()
    sock.connect(('xkcd.com', 80))
    request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
    sock.send(request.encode('ascii'))
    reponse = b''
    chunk = sock.rev(4096)
    while chunk:
        reponse += chunk
        chunk = sock.rev(4096)

    # page is now downloaded.
    links = parse_links(response)
    q.add(links)

默认情况下,socket的操作是阻塞的,也就是在程序调用connectrecv的时候,程序会一直等待操作完成,这样,如果我们需要同时下载多个链接的话,我们就需要很多线程。一个更好一点的做法是将这些空闲的线程放到一个线程池,在需要的时候把它们拿出来执行新的任务,这也类似于socket的连接池。

从目前我们的程序来看线程依然是十分宝贵的资源,在我(作者)的电脑上一个python的线程大致会耗费50k的内存。如果同时开启成千上万的线程的话就会导致系统崩溃,因此这些线程的数量便成为程序的一个瓶颈。

在Dan Kegel的文章”The C10K Problem”中,作者就阐述了这样的问题,他在文章中写道:

现在的服务器应该有同时处理上万个客户端的访问了,毕竟互联网的世界很大。

Kegel在1999年也提到了这个问题,虽然上万的数据量现在听起来并不大,但是科技在进步,相同类型的问题依然存在,只是规模上有些许差异。即使线程消耗的问题可以解决,我们还将面临socket链接用尽的问题。

异步算法

异步IO框架可以在一个线程上通过非阻塞Socket连接实现类似的并发操作。在我们的异步爬虫算法里,我们将连接到服务器的连接设置为非阻塞的方式:

sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect(('xkcd.com', 80))
except BlockingIOError:
    pass

不过非阻塞的连接在建立的时候会抛出一个异常,这个异常和C语言里将errno设置为EINPROGERSS类似,目的就是通知程序连接已经开始创建。

现在我们还需要知道连接什么时间创建完成,因为只有连接正常建立我们才能向服务器发送数据,这个查询我们就通过一个简单的循环实现就好。

request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
encoded = request.encode('ascii')

while True:
    try:
        sock.send(encoded)
        break # Done
    except OSError as e:
        pass

print('Sent')

不过这种方法虽然简单,却并不适合多线程。实际在BSD Unix中对于这种问题早就有相应的解决方法了,select就是Unix中通过C语言实现的一个专门用于等待类似事件的对象。如今网络应用中这样的需求越来越多,也就有了新的解决方法,比如poll, BSD中的kqueue,linux中的epoll等等。它们的API与select类似,不过更适合管理大规模的连接。

Python3.4中原生库DefaultSelector会自动选择系统中相应的select类函数。比如使用这个函数库注册相应的非阻塞网络IO代码如下:

from selectors import DefaultSelector, EVENT_WRITE

selector = DefaultSelector()

sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect(('xkcd.com', 80))
except BlockingIOError:
    pass

def connected():
    selector.unregister(sock.fileno())
    print('connected')

selector.register(sock.fileno(), EVENT_WIRTE, connected)

在上述代码中,我们忽略了产生的BlockingIOError异常,并且通过selector.register函数将socket文件的ID与我们期望的写事件相绑定。这样一旦socket文件变得可以写,系统就会调用已经注册的回调函数,这里我们注册的回调函数是connected

之后我们在一个循环中处理收到的通知:

def loop():
    while True:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

connected回调函数存储在event_key.data中,这样我们就可就可以一次执行一个非阻塞socket的数据接收和处理。

通过这种方式程序就可以集中处理已经接收的数据,而那些尚未完成的事件将一直处于等待状态。

在上面这片例子中,我们通过一个异步框架实现了一个线程中在并了操作。现在我们虽然实现了并发操作,但并不是真正意义上的并行处理,因为在涉及IO的操作依然没有并行化。现在的问题就在于IO的瓶颈。

不过相比于之前多线程的操作,我们没有线程上的开销,这是我们还要纠正一个错误理念:异步要比多线程快。实际很多情况下并非如此,比如在python中,当处理少量活跃连接的时候一个事件循环就没有多线程快。如果没有线程锁,实际多线程一般要比异步操作快。异步操作的优势在于处理并不活跃的连接。

上一篇: mysql(一)

下一篇: MySQL入门(三)