使用协程抓取空气质量指数
程序员文章站
2022-05-19 13:16:19
...
介绍
这次爬取的是实时空气污染指数(AQI)。关于这个项目的介绍可以参考联系世界的空气质量指数项目团队,对我而言,它是一个能够比较准确的提供空气污染指数。这个网站也提供了API用来获得数据,但是请求数量有限制,不得超过16PRS。后面发现中国的监测点有2534个,因此16PRS是远远不够的,所以我选择将监测点的目录抓下来,然后自己访问每个监测点页面并抓取数据。
监测点目录获取
监测点目录在URL: http://aqicn.org/city/all/cn/上,通过正则抓取,不赘述了。
import re
from bs4 import BeautifulSoup
import ohSqlite3 as sqlite
import ohRequests as requests
def db_init():
req = requests.ohRequests()
content = req.get("http://aqicn.org/city/all/cn/")
pattern = re.compile("中国</div><br>(.*?)五家渠农水大厦</a>", re.S)
data = pattern.findall(content)[0] + "五家渠农水大厦</a>"
soup = BeautifulSoup(data, 'lxml')
links = soup.find_all('a')
with sqlite.ohSqlite3(DB_NAME) as db:
db.execute("CREATE TABLE aqicn (location text, url text)")
for link in links:
db.execute("INSERT INTO aqicn VALUES (?,?)", (link.text, link.get('href'),))
db.execute("DELETE FROM aqicn WHERE location = ' '")
if __name__ == "__main__":
db_init()
单线程抓取
上一节提到的监测点总共有2534个,对应了2534条URL,如果通过遍历的方式抓个抓取,抓取函数如下:
import re
import time
from bs4 import BeautifulSoup
import ohSqlite3 as sqlite
import ohRequests as requests
def parser_single(location, url):
req = requests.ohRequests()
content = req.get(url)
pattern = re.compile('<table class=\'api\'(.*?)</table>', re.S)
data = pattern.findall(content)
if data:
data = "<table class='api' {} </table>".format(data[0])
soup = BeautifulSoup(data, 'lxml')
aqi = soup.find(id='aqiwgtvalue').text
if aqi == '-':
return None
t = soup.find(id='aqiwgtutime').get('val')
t = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(t)))
return [location, aqi, t]
最后程序跑了1463s,将近24min。
使用协程
理论上使用多进程也是可以的,这里我使用了协程,代码如下:
import re
import time
import aiohttp
import asyncio
from bs4 import BeautifulSoup
import ohSqlite3 as sqlite
import ohRequests as requests
async def parser():
req = requests.ohRequests()
while URLS_LIST:
url = URLS_LIST.pop(0)
header = {'user-agent': req.faker_user_agent()}
async with aiohttp.ClientSession() as session:
async with session.get(url[1], headers=header) as response:
content = await response.text()
pattern = re.compile('<table class=\'api\'(.*?)</table>', re.S)
data = pattern.findall(content)
if not data:
print ("Something is wrong. Might be station removed:[{}]({})".format(url[0], url[1]))
continue
data = "<table class='api' {} </table>".format(data[0])
soup = BeautifulSoup(data, 'lxml')
aqi = soup.find(id='aqiwgtvalue').text
if aqi == '-':
print ("No Data:[{}]({})".format(url[0], url[1]))
continue
t = soup.find(id='aqiwgtutime').get('val')
t = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(t)))
print ([url[0], aqi, t])
def main():
global URLS_LIST
req = requests.ohRequests()
with sqlite.ohSqlite3(DB_NAME) as db:
URLS_LIST = db.execute("select * from aqicn")
coroutine_cnts = 10
t = time.time()
coros = []
loop = asyncio.get_event_loop()
for i in range(coroutine_cnts):
coros.append(parser())
loop.run_until_complete(asyncio.gather(*coros))
print ("Total {}s".format(time.time()-t))
if __name__ == "__main__":
main()
协程的数量不能太高,虽然IP不会被封,但是会导致大量的失败请求。我猜想这个网站其实用的就是它们提供的数据API,因此对它的访问是受到16PRS的限制的。
使用10条协程时,执行的时间为1160s;50条协程时,时间为321s。效果还是很明显的。
一个不难理解的事实是,协程不断增加不代表时间会越来越短,达到某个阈值的时候,时间就不会再变了。这是因为当第一个协程收到响应时,本来应该醒来继续执行,但是因为协程数量过多,导致其不会立刻苏醒,就等于继续阻塞了。如果无限增加协程数量,一方面会导致资源消耗增大,另一方面也会导致性能下降。
结果比较
序号 | 使用方法 | 执行时间(s) |
---|---|---|
1 | 单线程 | 1463 |
2 | 10条协程 | 1160 |
3 | 50条协程 | 321 |