欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

swoole与php协程实现异步非阻塞IO开发

程序员文章站 2022-05-09 15:07:02
“协程可以在遇到阻塞的时候中断主动让渡资源,调度程序选择其他的协程运行。从而实现非阻塞IO” 然而php是不支持原生协程的,遇到阻塞时如不交由异步进程来执行是没有任何意义的,代码还是同步执行的,如下所示: function foo() { $db=new Db(); $result=(yield $ ......
“协程可以在遇到阻塞的时候中断主动让渡资源,调度程序选择其他的协程运行。从而实现非阻塞io”
然而php是不支持原生协程的,遇到阻塞时如不交由异步进程来执行是没有任何意义的,代码还是同步执行的,如下所示:
function foo()
{
    $db=new db();
    $result=(yield $db->query());
    yield $result;
}
上面的数据库查询操作是阻塞的,当调度器调度该协程到这一步时发现执行了阻塞操作,此时调度器该怎么办?选择其余协程执行?那该协程的阻塞操作又该何时执行,交由谁执行呢?所以说在php协程中抛开异步调用谈非阻塞io属于耍流氓。
而swoole的异步task提供了一个实现异步的解决方案,关于swoole_task可以参考官方文档
核心功能实现
 
将一次请求形成一个协程
 
首先创建一个swoole_server并设置回调
class httpserver implements server
{
    private $swoolehttpserver;
 
    public function __construct(\swoole_http_server $swoolehttpserver)
    {
        $this->swoolehttpserver = $swoolehttpserver;
    }
 
    public function start()
    {
        $this->swoolehttpserver->on('start', [$this, 'onstart']);
        $this->swoolehttpserver->on('shutdown', [$this, 'onshutdown']);
 
        $this->swoolehttpserver->on('workerstart', [$this, 'onworkerstart']);
        $this->swoolehttpserver->on('workerstop', [$this, 'onworkerstop']);
        $this->swoolehttpserver->on('workererror', [$this, 'onworkererror']);
        $this->swoolehttpserver->on('task', [$this, 'ontask']);
        $this->swoolehttpserver->on('finish', [$this, 'onfinish']);
 
 
        $this->swoolehttpserver->on('request', [$this, 'onrequest']);
 
        $this->swoolehttpserver->start();
    }
onrequest方法:
 public function onrequest(\swoole_http_request $request, \swoole_http_response $response)
    {
        $requesthandler = new requesthandler($request, $response);
        $requesthandler->handle();
    }
在reqeusthandler中执行handle方法,来解析请求的路由,并创建控制器,调用相应的方法,相
 public function handle()
    {
        $this->context = new context($this->request, $this->response, $this->getfd());
        $this->router = new router($this->request);
 
        try {
            if (false === $this->router->parse()) {
                $this->response->output('');
                return;
            }
            $coroutine = $this->dorun();
            $task = new task($coroutine, $this->context);
            $task->run();
        } catch (\exception $e) {
            pcsexceptionhandler::handle($e, $this->response);
        }
    }
    
 private function dorun()
    {
        $ret = (yield $this->dispatch());
        yield $this->response->send($ret);
    }
上面代码中的ret是action()的调用结果,yield $this->response->send($ret);是向对客户端请求的应答。
$coroutine是这一次请求形成的一个协程(genetator对象),包含了整个请求的流程,接下来就要对这个协程进行调度来获取真正的执行结果。
 
协程调度
 
namespace pcs\coroutine;
 
use pcs\network\context\context;
 
class task
{
    private $coroutine;
    private $context;
    private $status;
    private $scheduler;
    private $sendvalue;
 
    public function __construct(\generator $coroutine, context $context)
    {
        $this->coroutine = $coroutine;
        $this->context = $context;
        $this->scheduler = new scheduler($this);
 
    }
 
    public function run()
    {
        while (true) {
            try {
                $this->status = $this->scheduler->schedule();
                switch ($this->status) {
                    case taskstatus::task_wait:
                        echo "task status: task_wait\n";
                        return null;
 
                    case taskstatus::task_done:
                        echo "task status: task_done\n";
                        return null;
 
                    case taskstatus::task_continue;
                        echo "task status: task_continue\n";
                        break;
                }
 
            } catch (\exception $e) {
                $this->scheduler->throwexception($e);
            }
        }
    }
    public function setcoroutine($coroutine)
    {
        $this->coroutine = $coroutine;
    }
 
    public function getcoroutine()
    {
        return $this->coroutine;
    }
 
    public function valid()
    {
        if ($this->coroutine->valid()) {
            return true;
        } else {
            return false;
        }
    }
 
    public function send($value)
    {
        $this->sendvalue = $value;
        $ret = $this->coroutine->send($value);
        return $ret;
    }
 
    public function getsendval()
    {
        return $this->sendvalue;
    }
}
task依赖于generator对象$coroutine,在task类中定义了一些get/set方法,以及一些generator的方法,task::run()方法用来执行对协程的调度,调度行为由schedule来执行,每次调度都会返回当前这次调度的状态。多个协程共用一个调度器,而这里run方法会为每个协程创建一个调度器,原因是每个协程都是一个客户端的请求,使用一个单独的调度器能减少相互间的影响,而且多个协程之间的调度顺序是swoole来处理的,这里的调度器不用关心。下面给出调度的代码:
namespace pcs\coroutine;
 
class scheduler
{
    private $task;
    private $stack;
    const schedule_continue = 10;
 
    public function __construct(task $task)
    {
        $this->task = $task;
        $this->stack = new \splstack();
    }
    
    public function schedule()
    {
        $coroutine = $this->task->getcoroutine();
        $value = $coroutine->current();
 
        $status = $this->handlesystemcall($value);
        if ($status !== self::schedule_continue) return $status;
 
        $status = $this->handlestackpush($value);
        if ($status !== self::schedule_continue) return $status;
 
        $status = $this->handleasyncjob($value);
        if ($status !== self::schedule_continue) return $status;
 
        $status = $this->handelyieldvalue($value);
        if ($status !== self::schedule_continue) return $status;
 
        $status = $this->handelstackpop();
        if ($status !== self::schedule_continue) return $status;
 
 
        return taskstatus::task_done;
    }
 
    public function isstackempty()
    {
        return $this->stack->isempty();
    }
 
    private function handlesystemcall($value)
    {
        if (!$value instanceof systemcall) {
            return self::schedule_continue;
        }
    }
 
    private function handlestackpush($value)
    {
        if (!$value instanceof \generator) {
            return self::schedule_continue;
        }
 
        $coroutine = $this->task->getcoroutine();
        $this->stack->push($coroutine);
        $this->task->setcoroutine($value);
 
        return taskstatus::task_continue;
    }
 
    private function handleasyncjob($value)
    {
        if (!is_subclass_of($value, async::class)) {
            return self::schedule_continue;
        }
 
        $value->execute([$this, 'asynccallback']);
 
        return taskstatus::task_wait;
    }
 
    public function asynccallback($response, $exception = null)
    {
        if ($exception !== null
            && $exception instanceof \exception
        ) {
            $this->throwexception($exception, true);
        } else {
            $this->task->send($response);
            $this->task->run();
        }
    }
 
    private function handelyieldvalue($value)
    {
        if (!$this->task->valid()) {
            return self::schedule_continue;
        }
 
        $ret = $this->task->send($value);
        return taskstatus::task_continue;
    }
 
 
    private function handelstackpop()
    {
        if ($this->isstackempty()) {
            return self::schedule_continue;
        }
 
        $coroutine = $this->stack->pop();
        $this->task->setcoroutine($coroutine);
 
        $value = $this->task->getsendval();
        $this->task->send($value);
 
        return taskstatus::task_continue;
    }
 
    public function throwexception($e, $isfirstcall = false)
    {
        if ($this->isstackempty()) {
            $this->task->getcoroutine()->throw($e);
            return;
        }
 
        try {
            if ($isfirstcall) {
                $coroutine = $this->task->getcoroutine();
            } else {
                $coroutine = $this->stack->pop();
            }
 
            $this->task->setcoroutine($coroutine);
            $coroutine->throw($e);
 
            $this->task->run();
        } catch (\exception $e) {
            $this->throwexception($e);
        }
    }
}
scheduler中的schedule方法会获取当前task的协程,并通过current()方法获取当前中断点的返回值,接着依次调用5个方法来对返回值进行处理。
1:handlesystemcall
如果返回的值是systemcall类型的对象,则执行系统调用,如killtask之类的操作,systemcall是第一优先级。
2:handlestackpush
在a函数中调用b函数,则b函数称为a函数的子例程(子函数),然而在协程中却不能像普通函数那样调用。
function funca()
{
    return funcb();
}
 
function gena()
{
    yield genb();
}
在funca中funcb();会返回funcb的执行结果,但是在gena中,yield genb();会返回一个generator对象,而不是genb的最终执行结果。想得到genb的执行结果需要对genb进行调度,而genb中又可能有genc()gend()的协程嵌套,所以为了让协程像函数一眼正常调用,这里使用协程栈来实现。
swoole与php协程实现异步非阻塞IO开发

如上图,当调度器获取到gena(父协程)的返回值is instance of generator时,调度器会把父协程push到stack中,然后把子协程分配给task,继续调度子协程。如此反复直到最后一个子协程返回,然后开始pop,将stack中的协程依次取出
 
3:handleasyncjob
handleasyncjob是整个协程调度的核心
private function handleasyncjob($value)
    {
        if (!is_subclass_of($value, async::class)) {
            return self::schedule_continue;
        }
 
        $value->execute([$this, 'asynccallback']);
 
        return taskstatus::task_wait;
    }
 
    public function asynccallback($response, $exception = null)
    {
        if ($exception !== null
            && $exception instanceof \exception
        ) {
            $this->throwexception($exception, true);
        } else {
            $this->task->send($response);
            $this->task->run();
        }
    }
当协程调度的返回值是继承了async的子类或者是实现了asycn接口的实例的时候,会执行async的execute方法。这里用mysqli数据库查询类举例。
    public function execute(callable $callback)
    {
        $this->callback = $callback;
        $serv = serverholder::getserver();
        $serv->task($this->sql, -1, [$this, 'queryready']);
 
    }
 
    public function queryready(\swoole_http_server $serv, $task_id, $data)
    {
        $queryresult = unserialize($data);
        $exception = null;
        if ($queryresult->errno != 0) {
 
            $exception = new \exception($queryresult->error);
        }
        call_user_func_array($this->callback, [$queryresult, $exception]);
    }
 
execute方法接收一个函数作为该异步操作完成之后的回调函数,在mysqli类中的execute方法中,启动了一个异步swoole_task,将sql操作交给swoole_task异步执行,在执行结束后会执行queryready方法,该方法在解析异步返回数据之后执行$this->callback()也就是之前在调度器中传入的 asynccallback方法,该方法在检测异常之后会执行send()方法将异步执行的结果发送到中断处,继续执行。
handleasyncjob不会等待异步操作的返回结果,而是直接返回task_wait信号,回到上面的task->run()方法可以看到task_wait信号会导致run()方法返回null,释放当前worker,调度流程图如下图所示,
swoole与php协程实现异步非阻塞IO开发

4:handleyieldvalue
private function handelyieldvalue($value)
    {
        if (!$this->task->valid()) {
            return self::schedule_continue;
        }
 
        $ret = $this->task->send($value);
        return taskstatus::task_continue;
    }
 
如果某次yield的返回值既不是异步调用也不是generator,那么判断当前的generator是否是valid(是否执行完)如果执行完毕,继续调度,执行下面的handlestackpush方法,否则的话返回task_continue继续调度,也就是说在一个generator中多次yield,最后只会取最后一次yield的返回值。
5:handlestackpush
当上一步中判断!$this->task->valid()也就是当前生成器执行完毕的时候,会执行本方法来控制之前的协程stack进行pop操作,首先检查stac是否是非空,非空的话pop出一个父协程,并将当前协程的返回值send()到父协程中断出继续执行。
协程优势在哪里
当一次请求遇到io的时候,同步操作会导致当前请求阻塞在io处等待io返回,体现在swoole上就是一个请求一直占用一个worker。
swoole与php协程实现异步非阻塞IO开发

但是当使用了协程调度之后,用户可以在阻塞的地方通过yield手动中断,交由swoole_task去异步操作,同时释放worker占用来处理其他请求。
当异步处理执行结束后再继续调度。
swoole与php协程实现异步非阻塞IO开发

注意 php的协程只负责中断,异步操作是swoole_task做的