欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

使用协程抓取空气质量指数

程序员文章站 2022-05-19 13:16:19
...

介绍

这次爬取的是实时空气污染指数(AQI)。关于这个项目的介绍可以参考联系世界的空气质量指数项目团队,对我而言,它是一个能够比较准确的提供空气污染指数。这个网站也提供了API用来获得数据,但是请求数量有限制,不得超过16PRS。后面发现中国的监测点有2534个,因此16PRS是远远不够的,所以我选择将监测点的目录抓下来,然后自己访问每个监测点页面并抓取数据。

监测点目录获取

监测点目录在URL: http://aqicn.org/city/all/cn/上,通过正则抓取,不赘述了。

import re
from bs4 import BeautifulSoup

import ohSqlite3 as sqlite
import ohRequests as requests

def db_init():
    req = requests.ohRequests()
    content = req.get("http://aqicn.org/city/all/cn/")
    
    pattern = re.compile("中国</div><br>(.*?)五家渠农水大厦</a>", re.S)
    data = pattern.findall(content)[0] + "五家渠农水大厦</a>"

    soup = BeautifulSoup(data, 'lxml')

    links = soup.find_all('a')

    with sqlite.ohSqlite3(DB_NAME) as db:
        db.execute("CREATE TABLE aqicn (location text, url text)")
        for link in links:
            db.execute("INSERT INTO aqicn VALUES (?,?)", (link.text, link.get('href'),))
        db.execute("DELETE FROM aqicn WHERE location = ' '")   

if __name__ == "__main__":
    db_init()

单线程抓取

上一节提到的监测点总共有2534个,对应了2534条URL,如果通过遍历的方式抓个抓取,抓取函数如下:

import re
import time
from bs4 import BeautifulSoup

import ohSqlite3 as sqlite
import ohRequests as requests

def parser_single(location, url):
    req = requests.ohRequests()
    content = req.get(url)
 
    pattern = re.compile('<table class=\'api\'(.*?)</table>', re.S)
    data = pattern.findall(content)

    if data:
        data = "<table class='api' {} </table>".format(data[0])
    soup = BeautifulSoup(data, 'lxml')

    aqi = soup.find(id='aqiwgtvalue').text

    if aqi == '-':
        return None

    t = soup.find(id='aqiwgtutime').get('val')

    t = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(t)))

    return [location, aqi, t]

最后程序跑了1463s,将近24min。

使用协程

理论上使用多进程也是可以的,这里我使用了协程,代码如下:

import re
import time
import aiohttp
import asyncio
from bs4 import BeautifulSoup

import ohSqlite3 as sqlite
import ohRequests as requests

async def parser():
    req = requests.ohRequests()
    while URLS_LIST:
        url = URLS_LIST.pop(0)
        header = {'user-agent': req.faker_user_agent()}

        async with aiohttp.ClientSession() as session:
            async with session.get(url[1], headers=header) as response:
                content = await response.text()
        
        pattern = re.compile('<table class=\'api\'(.*?)</table>', re.S)
        data = pattern.findall(content)

        if not data:
            print ("Something is wrong. Might be station removed:[{}]({})".format(url[0], url[1]))
            continue

        data = "<table class='api' {} </table>".format(data[0])
        soup = BeautifulSoup(data, 'lxml')

        aqi = soup.find(id='aqiwgtvalue').text

        if aqi == '-':
            print ("No Data:[{}]({})".format(url[0], url[1]))
            continue

        t = soup.find(id='aqiwgtutime').get('val')

        t = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(t)))

        print ([url[0], aqi, t])


def main():
    global URLS_LIST
    req = requests.ohRequests()

    with sqlite.ohSqlite3(DB_NAME) as db:
        URLS_LIST = db.execute("select * from aqicn")


    coroutine_cnts = 10
    t = time.time()
    
    coros = []
    loop = asyncio.get_event_loop()
    for i in range(coroutine_cnts):
        coros.append(parser())

    loop.run_until_complete(asyncio.gather(*coros))

    print ("Total {}s".format(time.time()-t))

if __name__ == "__main__":
    main()

协程的数量不能太高,虽然IP不会被封,但是会导致大量的失败请求。我猜想这个网站其实用的就是它们提供的数据API,因此对它的访问是受到16PRS的限制的。

使用10条协程时,执行的时间为1160s;50条协程时,时间为321s。效果还是很明显的。

一个不难理解的事实是,协程不断增加不代表时间会越来越短,达到某个阈值的时候,时间就不会再变了。这是因为当第一个协程收到响应时,本来应该醒来继续执行,但是因为协程数量过多,导致其不会立刻苏醒,就等于继续阻塞了。如果无限增加协程数量,一方面会导致资源消耗增大,另一方面也会导致性能下降。

结果比较

序号 使用方法 执行时间(s)
1 单线程 1463
2 10条协程 1160
3 50条协程 321