欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

小白对异步IO的理解

程序员文章站 2022-06-22 11:11:47
前言 看到越来越多的大佬都在使用python的异步IO,协程等概念来实现高效的IO处理过程,可是我对这些概念还不太懂,就学习了一下。 因为是初学者,在理解上有很多不到位的地方,如果有错误,还希望能够有人积极帮我指出。 下面就使用一个简单的爬虫的例子,通过一步一步的改进,最后来用异步IO的方式实现。 ......

前言

看到越来越多的大佬都在使用python的异步io,协程等概念来实现高效的io处理过程,可是我对这些概念还不太懂,就学习了一下。 因为是初学者,在理解上有很多不到位的地方,如果有错误,还希望能够有人积极帮我指出。

下面就使用一个简单的爬虫的例子,通过一步一步的改进,最后来用异步io的方式实现。

1. 阻塞的io

我们要实现一个爬虫,去爬百度首页n次,最简单的想法就是依次下载,从建立socket连接到发送网络请求再到读取响应数据,顺序进行。

代码如下:

 1 import time 
 2 import socket
 3 import sys
 4  
 5 def dorequest():
 6     sock = socket.socket()
 7     sock.connect(('www.baidu.com',80))
 8     sock.send("get / http/1.1\r\nhost: www.baidu.com\r\nconnection: close\r\n\r\n".encode("utf-8"))
 9     response = sock.recv(1024)
10     return response
11  
12 def main():
13     start = time.time()
14     for i in range(int(sys.argv[1])):
15         dorequest()
16     print("spend time : %s" %(time.time()-start))
17  
18 main()

因为socket是阻塞方式调用的,所以cpu执行到sock.connect(),sock.recv(),就会一直卡在那里直到socket的状态就绪,所以浪费了很多的cpu时间。

 

请求10次和20次的时间分别如下所示:

1 ➜ python3 1.py  10
2 spend time : 0.9282660484313965
3 ➜ python3 1.py  20
4 spend time : 1.732438087463379

可以看到,速度慢的跟蜗牛一样。

2. 改进1-并发

为了加快请求的速度,很容易想到我们可以使用并发的方式进行,那么最好的方式就是使用多线程了。修改后的代码如下:

 1 # 多线程
 2  
 3 import time 
 4 import socket
 5 import sys
 6 import threading 
 7  
 8 def dorequest():
 9     sock = socket.socket()
10     sock.connect(('www.baidu.com',80))
11     sock.send("get / http/1.1\r\nhost: www.baidu.com\r\nconnection: close\r\n\r\n".encode("utf-8"))
12     response = sock.recv(1024)
13     return response
14  
15 def main():
16     start = time.time()
17     threads = []
18     for i in range(int(sys.argv[1])):
19         # dorequest()
20         threads.append(threading.thread(target=dorequest,args=()))
21     for i in threads:
22         i.start()
23     for i in threads:
24         i.join()
25     print("spend time : %s" %(time.time()-start))

使用线程之后,看一下请求10次和20次的时间:

1 ➜  python3 2.py  10
2 spend time : 0.1124269962310791
3 ➜ python3 2.py  20
4 spend time : 0.15438294410705566

速度明显快了很多,几乎是刚才的10倍了。

但是python的线程是有问题的,因为一个python进程中,同一时刻只允许一个线程运行,正在执行的线程会获取到gpl。做阻塞的系统调用时,例如sock.connect(),sock.recv()时,当前线程会释放gil,让别的线程有机会获取gpl,然后执行。但是这种获取gpl的调度策略是抢占式的,以保证同等优先级的线程都有均等的执行机会,那带来的问题是:并不知道下一时刻是哪个线程被运行,也不知道它正要执行的代码是什么。所以就可能存在竞态条件。这种竞争有可能使某些线程处于劣势,导致一直获取不到gpl

比如如下的情况,线程1执行的代码如下:

1 flag = true
2 while flag:
3     .....  # 这里省略一些复杂的操作,会调用多次io操作
4     time.sleep(1)

可以看到,线程1的任务非常简单,而线程2的任务非常复杂,这就会导致cpu不停地去执行线程1,而真正做实际工作的线程2却很少被调度到,导致了浪费了大量的cpu资源。

3. 改进2-非阻塞方式

在第一个例子中,我们意识到浪费了大量的时间,是因为我们用了阻塞的io,导致cpu在卡在那里等待io的就绪,那使用非阻塞的io,是不是就可以解决这个问题了。

代码如下:

 1 import time 
 2 import socket
 3 import sys
 4  
 5 def dorequest():
 6     sock = socket.socket()
 7     sock.setblocking(false)
 8     try:
 9         sock.connect(('www.baidu.com',80))
10     except blockingioerror:
11         pass   
12  
13     # 因为设置为非阻塞模式了,不知道何时socket就绪,需要不停的监控socket的状态
14     while true:
15         try:
16             sock.send("get / http/1.1\r\nhost: www.baidu.com\r\nconnection: close\r\n\r\n".encode("utf-8"))
17             # 直到send 不抛出异常,就发送成功了 
18             break
19         except oserror:
20             pass
21     while true:
22         try:
23             response = sock.recv(1024)
24             break
25         except oserror:
26             pass
27     return response
28 def main():
29     start = time.time()
30     for i in range(int(sys.argv[1])):
31         dorequest()
32     print("spend time : %s" %(time.time()-start))
33  
34 main()

sock.setblocking(false)把socket设置为非阻塞式的,也就是说执行完sock.connect()sock.recv()之后,cpu不再等待io了,会继续往下执行,来看一下执行时间:

1 ➜  python3 3.py  10
2 spend time : 1.0597507953643799
3 ➜  python3 3.py  20
4 spend time : 2.0327329635620117

感觉被骗了,速度还是跟第一个一样啊,看来非阻塞io并没有什么速度上的提升啊,问题出在哪里呢?看代码发现多了两个while循环:

 1 while true:
 2     try:
 3         sock.send("get / http/1.1\r\nhost: www.baidu.com\r\nconnection: close\r\n\r\n".encode("utf-8"))
 4         # 直到send 不抛出异常,就发送成功了 
 5         break
 6     except oserror:
 7         pass
 8 while true:
 9     try:
10         response = sock.recv(1024)
11         break
12     except oserror:
13         pass

因为把socket设置为非阻塞模式了,所以cpu并不知道io什么时候就绪,所以必须在这里不停的尝试,直到io可以使用了为止。

虽然 connect() 和 recv() 不再阻塞主程序,空出来的时间段cpu没有空闲着,但并没有利用好这空闲去做其他有意义的事情,而是在循环尝试读写 socket (不停判断非阻塞调用的状态是否就绪)。

有没有办法让cpu空闲出来的时间,不用来不停的询问io,而是干别的更有意义的事情呢,等io就绪后再通知cpu回来处理呢?当然有了,那就是回调。

4. 改进3-回调

操作系统已经把io状态的改变封装成了事件,如可读事件、可写事件。并且可以为这些事件绑定处理函数。所以我们可以使用这种方式,为socket的io状态的变化绑定处理函数,交给系统进行调动,这样就是回调方式。python的select模块支持这样的操作。

代码如下:

 1 import socket
 2 from selectors import defaultselector, event_write, event_read
 3 import sys
 4 selector = defaultselector()
 5 stopped = false
 6 urls_todo = []
 7  
 8 class crawler:
 9     def __init__(self, url):
10         self.url = url
11         self.sock = none
12         self.response = b''
13  
14     def fetch(self):
15         self.sock = socket.socket()
16         self.sock.setblocking(false)
17         try:
18             self.sock.connect(('www.baidu.com', 80))
19         except blockingioerror:
20             pass
21         selector.register(self.sock.fileno(), event_write, self.connected)
22  
23     def connected(self, key, mask):
24         selector.unregister(key.fd)
25         get = 'get {0} http/1.0\r\nhost: www.baidu.com\r\n\r\n'.format(self.url)
26         self.sock.send(get.encode('ascii'))
27         selector.register(key.fd, event_read, self.read_response)
28  
29     def read_response(self, key, mask):
30         global stopped
31         # 如果响应大于4kb,下一次循环会继续读
32         chunk = self.sock.recv(4096)
33         if chunk:
34             self.response += chunk
35         else:
36             selector.unregister(key.fd)
37             urls_todo.remove(self.url)
38             if not urls_todo:
39                 stopped = true
40  
41 def loop():
42     while not stopped:
43         # 阻塞, 直到一个事件发生
44         events = selector.select()
45         for event_key, event_mask in events:
46             callback = event_key.data
47             callback(event_key, event_mask)
48  
49 if __name__ == '__main__':
50     import time
51     start = time.time()
52     for i in range(int(sys.argv[1])):
53         urls_todo.append("/"+str(i))
54         crawler = crawler("/"+str(i))
55         crawler.fetch()
56     loop()
57     print("spend time : %s" %(time.time()-start))  

监控socket的状态,如果变为可写的,就往里面写数据

selector.register(self.sock.fileno(), event_write, self.connected)

监控socket的状态,如果变为可读的,就外读数据

selector.register(key.fd, event_read, self.read_response)

测试一下速度:

1 ➜  python3 4.py 10
2 spend time : 0.03910994529724121
3 ➜  python3 4.py 20
4 spend time : 0.04195284843444824

我们看到速度已经有个一个质的飞跃了,但是回调用一些严重的问题,会破坏代码的本来的逻辑结构,造成代码可读性很差。

比如我们有函数 funca,funcb,funcc三个函数,如果funcc处理的结果依赖于funcb的处理结果,funcb的处理结果依赖于funca的处理结果,而funca又是回调的方式调用的,所以就不知道funca什么时候返回,所以只能将后续的处理都作为callback的方式传入funca中,让funca执行完了可以执行funcb,funcb执行完了可以执行funcc,看起来像下面这样:

funca(funcb(funcc()))

这就形成了一个链式的回调,跟最初的代码逻辑完全相反,本来的逻辑应该是这样的。

funcc(funcb(funca()))

因为这样的链式回调的出现,导致了理解代码逻辑困难,并且错误处理困难。

有没有方法避免这种地狱式的链式回调的呢?

5 .改进4-利用生成器

可以利用python的生成器,把发请求的函数写成一个生成器,然后只监控io的状态,当io状态发生改变之后,就给生成器传送值,驱动生成器进行下一步操作,这样就可以避免回调了,具体实现如下:

 1 import select
 2 import socket
 3 import time
 4 import sys
 5  
 6 num = int(sys.argv[1])
 7  
 8 def coroutine():
 9     sock = socket.socket()
10     sock.setblocking(0)
11     address = yield sock
12     try:
13         sock.connect(address)
14     except blockingioerror:
15         pass
16     data = yield
17     size = yield sock.send(data)
18     yield sock.recv(size)
19  
20 def main():
21     inputs = []
22     outputs = []
23     coros = []
24     coro_dict = dict()
25     for i in range(num):
26         coros.append(coroutine())
27         sock = coros[-1].send(none)   # 发送一个none值来启动生成器
28         outputs.append(sock.fileno()) # 
29         # print(outputs)
30         coro_dict[sock.fileno()] = coros[-1]
31         coros[-1].send(('www.baidu.com', 80))
32     while true:
33         r_list,w_list,e_list = select.select(inputs,outputs, ())
34         for i in w_list:
35             # print(type(i))
36             coro_dict[i].send(b'get / http/1.1\r\nhost: www.baidu.com\r\nconnection: close\r\n\r\n')
37             outputs.remove(i)
38             inputs.append(i)
39         for i in r_list:
40             coro_dict[i].send(1024)
41             inputs.remove(i)
42         if len(inputs) == len(outputs) == 0:
43             break  
44     # time.sleep(2)
45     # coro.send(b'get / http/1.1\r\nhost: www.baidu.com\r\nconnection: close\r\n\r\n')
46     # select.select(wait_list, (), ())
47     # print(coro.send(1024))
48  
49 start  = time.time()
50 main()
51 print("spend time : %s" %(time.time()-start))

可以看到把发起请求的函数写成了一个生成器:

 1 def coroutine():
 2     sock = socket.socket()
 3     sock.setblocking(0)
 4     address = yield sock
 5     try:
 6         sock.connect(address)
 7     except blockingioerror:
 8         pass
 9     data = yield
10     size = yield sock.send(data)
11     yield sock.recv(size)

然后监控io状态,当io状态发生改变之后,驱动生成器继续运行。

 1 while true:
 2         r_list,w_list,e_list = select.select(inputs,outputs, ())
 3         for i in w_list:
 4             # print(type(i))
 5             coro_dict[i].send(b'get / http/1.1\r\nhost: www.baidu.com\r\nconnection: close\r\n\r\n')
 6             outputs.remove(i)
 7             inputs.append(i)
 8         for i in r_list:
 9             coro_dict[i].send(1024)
10             inputs.remove(i)

看一下程序执行时间:

1 ➜  python3 5.py 10
2 spend time : 0.058114051818847656
3 ➜  python3 5.py 20
4 spend time : 0.0949699878692627

效果貌似非常的棒啊,执行的太快了,但是当我执行300次请求的时候,我就发现问题了,返回的非常慢,。估计原因可能是select是顺序遍历每一个io描述符的去做状态检查,当io描述符过多的时候,会导致遍历的速度比较慢,所以造成时间花费很大。