终结python协程----从yield到actor模型的实现
把应用程序的代码分为多个代码块,正常情况代码自上而下顺序执行。如果代码块A运行过程中,能够切换执行代码块B,又能够从代码块B再切换回去继续执行代码块A,这就实现了协程
我们知道线程的调度(线程上下文切换)是由操作系统决定的,当一个线程启动后,什么时候占用CPU、什么时候让出CPU,程序员都无法干涉。假设现在启动4个线程,CPU线程时间片为 5 毫秒,也就是说,每个线程每隔5ms就让出CPU,让其他线程抢占CPU。可想而知,等4个线程运行结束,要进行多少次切换?
如果我们能够自行调度自己写的程序,让一些代码块遇到IO操作时,切换去执行另外一些需要CPU操作的代码块,是不是节约了很多无畏的上下文切换呢?是的,协程就是针对这一情况而生的。我们把写好的一个应用程序分为很多个代码块,如下图所示:
把应用程序的代码分为多个代码块,正常情况代码自上而下顺序执行。如果代码块A运行过程中,能够切换执行代码块B,又能够从代码块B再切换回去继续执行代码块A,这就实现了协程(通常是遇到IO操作时切换才有意义)。示意图如下:
所以,关于协程可以总结以下两点:
(1)线程的调度是由操作系统负责,协程调度是程序自行负责。
(2)与线程相比,协程减少了无畏的操作系统切换。
实际上当遇到IO操作时做切换才更有意义,(因为IO操作不用占用CPU),如果没遇到IO操作,按照时间片切换,无意义。
python中的yield 关键字用来实现生成器,但是生成器在一定的程度上与协程其实也是差不多。我们来看个例子:
def sayHello(n): while n > 0: print("hello~", n) yield n n -= 1 print('say hello') if __name__ == "__main__": sayHello(5) # 测试1 # next(sayHello(5)) # 测试2 # 测试3 # for i in sayHello(5): # pass
挨个测试,你会发现第一个测试是不能通过的,什么都不会输出,这就是我们的生成器特性了,一旦函数内部有yield关键字,此函数就是生成器,只有调用next 或是 for之类的能够迭代的才能够使得生成器执行。那么这与我们的协程有什么关系呢?请看代码:
from collections import deque def sayHello(n): while n > 0: print("hello~", n) yield n n -= 1 print('say hello') def sayHi(n): x = 0 while x < n: print('hi~', x) yield x += 1 print("say hi") # 使用yield语句,实现简单任务调度器 class TaskScheduler(object): def __init__(self): self._task_queue = deque() def new_task(self, task): ''' 向调度队列添加新的任务 ''' self._task_queue.append(task) def run(self): ''' 不断运行,直到队列中没有任务 ''' while self._task_queue: task = self._task_queue.popleft() try: next(task) self._task_queue.append(task) except StopIteration: # 生成器结束 pass if __name__ == "__main__": sched = TaskScheduler() sched.new_task(sayHello(10)) sched.new_task(sayHi(15)) sched.run()
代码运行下,你就发现了,这就是我们对协程的定义了。接下来我们说下actor模型。actor模式是一种最古老的也是最简单的并行和分布式计算解决方案。下面我们通过yield来实现:
from collections import deque class ActorScheduler: def __init__(self): self._actors = {} self._msg_queue = deque() def new_actor(self, name, actor): self._msg_queue.append((actor, None)) self._actors[name] = actor def send(self, name, msg): actor = self._actors.get(name) if actor: self._msg_queue.append((actor, msg)) def run(self): while self._msg_queue: # print("队列:", self._msg_queue) actor, msg = self._msg_queue.popleft() # print("actor", actor) # print("msg", msg) try: actor.send(msg) except StopIteration: pass if __name__ == '__main__': def say_hello(): while True: msg = yield print("say hello", msg) def say_hi(): while True: msg = yield print("say hi", msg) def counter(sched): while True: n = yield print("counter:", n) if n == 0: break sched.send('say_hello', n) sched.send('say_hi', n) sched.send('counter', n-1) sched = ActorScheduler() # 创建初始化 actors sched.new_actor('say_hello', say_hello()) sched.new_actor('say_hi', say_hi()) sched.new_actor('counter', counter(sched)) sched.send('counter', 10) sched.run()
(1) ActorScheduler 负责事件循环
(2) counter() 负责控制终止
(3) say_hello() / say_hi() 相当于切换的协程,当程序运行到这些函数内部的yield处,就开始切换。
所以,当执行时,我们能够看到say_hello() / say_hi()不断交替切换执行,直到counter满足终止条件之后,协程终止。看懂上例可能需要花费一些时间。实际上我们已经实现了一个“操作系统”的最小核心部分。 生成器函数(含有yield的函数)就是认为,而yield语句是任务挂起的信号。 调度器循环检查任务列表直到没有任务要执行为止。
而这就是廖雪峰的python官网教程里面的协程代码的最好解释,这也是之前一直在思考的问题,请看代码:
def consumer(): r = '' while True: n = yield r if not n: return print('[CONSUMER] Consuming %s...' % n) r = '200 OK' def produce(c): c.send(None) n = 0 while n < 5: n = n + 1 print('[PRODUCER] Producing %s...' % n) r = c.send(n) print('[PRODUCER] Consumer return: %s' % r) c.close() c = consumer() produce(c)
我之前一直纳闷send()函数是如何激活生成器的,原来是实现了actor模型的协程!