欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

终结python协程----从yield到actor模型的实现

程序员文章站 2022-03-12 08:05:19
把应用程序的代码分为多个代码块,正常情况代码自上而下顺序执行。如果代码块A运行过程中,能够切换执行代码块B,又能够从代码块B再切换回去继续执行代码块A,这就实现了协程 我们知道线程的调度(线程上下文切换)是由操作系统决定的,当一个线程启动后,什么时候占用CPU、什么时候让出CPU,程序员都无法干涉。 ......

把应用程序的代码分为多个代码块,正常情况代码自上而下顺序执行。如果代码块A运行过程中,能够切换执行代码块B,又能够从代码块B再切换回去继续执行代码块A,这就实现了协程

我们知道线程的调度(线程上下文切换)是由操作系统决定的,当一个线程启动后,什么时候占用CPU、什么时候让出CPU,程序员都无法干涉。假设现在启动4个线程,CPU线程时间片为 5 毫秒,也就是说,每个线程每隔5ms就让出CPU,让其他线程抢占CPU。可想而知,等4个线程运行结束,要进行多少次切换?

如果我们能够自行调度自己写的程序,让一些代码块遇到IO操作时,切换去执行另外一些需要CPU操作的代码块,是不是节约了很多无畏的上下文切换呢?是的,协程就是针对这一情况而生的。我们把写好的一个应用程序分为很多个代码块,如下图所示:

终结python协程----从yield到actor模型的实现

把应用程序的代码分为多个代码块,正常情况代码自上而下顺序执行。如果代码块A运行过程中,能够切换执行代码块B,又能够从代码块B再切换回去继续执行代码块A,这就实现了协程(通常是遇到IO操作时切换才有意义)。示意图如下:

终结python协程----从yield到actor模型的实现

所以,关于协程可以总结以下两点:

(1)线程的调度是由操作系统负责,协程调度是程序自行负责。

(2)与线程相比,协程减少了无畏的操作系统切换。

实际上当遇到IO操作时做切换才更有意义,(因为IO操作不用占用CPU),如果没遇到IO操作,按照时间片切换,无意义。

python中的yield 关键字用来实现生成器,但是生成器在一定的程度上与协程其实也是差不多。我们来看个例子:

def sayHello(n):
    while n > 0:
        print("hello~", n)
        yield n
        n -= 1
    print('say hello')

    
if __name__ == "__main__":
    sayHello(5)  # 测试1
    # next(sayHello(5))  # 测试2
    
    # 测试3
    # for i in sayHello(5):
    #     pass

挨个测试,你会发现第一个测试是不能通过的,什么都不会输出,这就是我们的生成器特性了,一旦函数内部有yield关键字,此函数就是生成器,只有调用next 或是 for之类的能够迭代的才能够使得生成器执行。那么这与我们的协程有什么关系呢?请看代码:

from collections import deque
 
def sayHello(n):
    while n > 0:
        print("hello~", n)
        yield n
        n -= 1
    print('say hello')
 
def sayHi(n):
    x = 0
    while x < n:
        print('hi~', x)
        yield
        x += 1
    print("say hi")
 
# 使用yield语句,实现简单任务调度器
class TaskScheduler(object):
    def __init__(self):
        self._task_queue = deque()
 
    def new_task(self, task):
        '''
        向调度队列添加新的任务
        '''
        self._task_queue.append(task)
 
    def run(self):
        '''
        不断运行,直到队列中没有任务
        '''
        while self._task_queue:
            task = self._task_queue.popleft()
            try:
                next(task)
                self._task_queue.append(task)
            except StopIteration:
                # 生成器结束
                pass


if __name__ == "__main__":
    sched = TaskScheduler()
    sched.new_task(sayHello(10))
    sched.new_task(sayHi(15))
    sched.run()

代码运行下,你就发现了,这就是我们对协程的定义了。接下来我们说下actor模型。actor模式是一种最古老的也是最简单的并行和分布式计算解决方案。下面我们通过yield来实现:

from collections import deque
 
class ActorScheduler:
    def __init__(self):
        self._actors = {}
        self._msg_queue = deque()
 
    def new_actor(self, name, actor):
        self._msg_queue.append((actor, None))
        self._actors[name] = actor
 
    def send(self, name, msg):
        actor = self._actors.get(name)
        if actor:
            self._msg_queue.append((actor, msg))
 
    def run(self):
        while self._msg_queue:
            # print("队列:", self._msg_queue)
            actor, msg = self._msg_queue.popleft()
            # print("actor", actor)
            # print("msg", msg)
            try:
                 actor.send(msg)
            except StopIteration:
                 pass
 
 
if __name__ == '__main__':
    def say_hello():
        while True:
            msg = yield
            print("say hello", msg)
 
    def say_hi():
        while True:
            msg = yield
            print("say hi", msg)
 
    def counter(sched):
        while True:
            n = yield
            print("counter:", n)
            if n == 0:
                break
            sched.send('say_hello', n)
            sched.send('say_hi', n)
            sched.send('counter', n-1)
 
    sched = ActorScheduler()
    # 创建初始化 actors
    sched.new_actor('say_hello', say_hello())
    sched.new_actor('say_hi', say_hi())
    sched.new_actor('counter', counter(sched))
 
    sched.send('counter', 10)
    sched.run()

(1) ActorScheduler 负责事件循环
(2) counter() 负责控制终止
(3) say_hello() / say_hi() 相当于切换的协程,当程序运行到这些函数内部的yield处,就开始切换。

所以,当执行时,我们能够看到say_hello() / say_hi()不断交替切换执行,直到counter满足终止条件之后,协程终止。看懂上例可能需要花费一些时间。实际上我们已经实现了一个“操作系统”的最小核心部分。 生成器函数(含有yield的函数)就是认为,而yield语句是任务挂起的信号。 调度器循环检查任务列表直到没有任务要执行为止。

而这就是廖雪峰的python官网教程里面的协程代码的最好解释,这也是之前一直在思考的问题,请看代码:

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'

def produce(c):
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

c = consumer()
produce(c)

我之前一直纳闷send()函数是如何激活生成器的,原来是实现了actor模型的协程!

相关链接:再议Python协程——从yield到asyncio