欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Swoole Task 的应用

程序员文章站 2022-04-15 17:05:52
[TOC] 概述 Swoole 异步Task,主要实现调用异步任务的执行。 常用的场景:异步支付处理、异步订单处理、异步日志处理、异步发送邮件/短信等。 Swoole 的实现方式是 worker 进程处理数据请求,分配给 task 进程执行。 官方介绍: task 底层使用Unix Socket管道 ......

目录

概述

swoole 异步task,主要实现调用异步任务的执行。

常用的场景:异步支付处理、异步订单处理、异步日志处理、异步发送邮件/短信等。

swoole 的实现方式是 worker 进程处理数据请求,分配给 task 进程执行。

官方介绍:

task 底层使用unix socket管道通信,是全内存的,没有io消耗。单进程读写性能可达100万/s,不同的进程使用不同的管道通信,可以最大化利用多核。

本地版本:php 7.2.6、swoole 4.3.1。

不多说,先看效果图:

Swoole Task 的应用

代码

server.php

<?php

class server
{
    private $serv;

    public function __construct() {
        $this->serv = new swoole_server('0.0.0.0', 9501);
        $this->serv->set([
            'worker_num'      => 2, //开启2个worker进程
            'max_request'     => 4, //每个worker进程 max_request设置为4次
            'task_worker_num' => 4, //开启4个task进程
            'dispatch_mode'   => 2, //数据包分发策略 - 固定模式
        ]);

        $this->serv->on('start', [$this, 'onstart']);
        $this->serv->on('connect', [$this, 'onconnect']);
        $this->serv->on("receive", [$this, 'onreceive']);
        $this->serv->on("close", [$this, 'onclose']);
        $this->serv->on("task", [$this, 'ontask']);
        $this->serv->on("finish", [$this, 'onfinish']);

        $this->serv->start();
    }

    public function onstart($serv) {
        echo "#### onstart ####".php_eol;
        echo "swoole ".swoole_version . " 服务已启动".php_eol;
        echo "master_pid: {$serv->master_pid}".php_eol;
        echo "manager_pid: {$serv->manager_pid}".php_eol;
        echo "########".php_eol.php_eol;
    }

    public function onconnect($serv, $fd) {
        echo "#### onconnect ####".php_eol;
        echo "客户端:".$fd." 已连接".php_eol;
        echo "########".php_eol.php_eol;
    }

    public function onreceive($serv, $fd, $from_id, $data) {
        echo "#### onreceive ####".php_eol;
        echo "worker_pid: {$serv->worker_pid}".php_eol;
        echo "客户端:{$fd} 发来的email:{$data}".php_eol;
        $param = [
            'fd'    => $fd,
            'email' => $data
        ];
        $rs = $serv->task(json_encode($param));
        if ($rs === false) {
            echo "任务分配失败 task ".$rs.php_eol;
        } else {
            echo "任务分配成功 task ".$rs.php_eol;
        }
        echo "########".php_eol.php_eol;
    }

    public function ontask($serv, $task_id, $from_id, $data) {
        echo "#### ontask ####".php_eol;
        echo "#{$serv->worker_id} ontask: [pid={$serv->worker_pid}]: task_id={$task_id}".php_eol;

        //业务代码
        for($i = 1 ; $i <= 5 ; $i ++ ) {
            sleep(2);
            echo "task {$task_id} 已完成了 {$i}/5 的任务".php_eol;
        }

        $data_arr = json_decode($data, true);
        $serv->send($data_arr['fd'] , 'email:'.$data_arr['email'].',发送成功');
        $serv->finish($data);
        echo "########".php_eol.php_eol;
    }

    public function onfinish($serv,$task_id, $data) {
        echo "#### onfinish ####".php_eol;
        echo "task {$task_id} 已完成".php_eol;
        echo "########".php_eol.php_eol;
    }

    public function onclose($serv, $fd) {
        echo "client close.".php_eol;
    }
}

$server = new server();

client.php

<?php

class client
{
    private $client;

    public function __construct() {
        $this->client = new swoole_client(swoole_sock_tcp, swoole_sock_async);

        $this->client->on('connect', [$this, 'onconnect']);
        $this->client->on('receive', [$this, 'onreceive']);
        $this->client->on('close', [$this, 'onclose']);
        $this->client->on('error', [$this, 'onerror']);
    }

    public function connect() {
        if(!$fp = $this->client->connect("127.0.0.1", 9501 , 1)) {
            echo "error: {$fp->errmsg}[{$fp->errcode}]".php_eol;
            return;
        }
    }

    public function onconnect($cli) {
        fwrite(stdout, "输入email:");
        swoole_event_add(stdin, function() {
            fwrite(stdout, "输入email:");
            $msg = trim(fgets(stdin));
            $this->send($msg);
        });
    }

    public function onreceive($cli, $data) {
        echo php_eol."received: ".$data.php_eol;
    }

    public function send($data) {
        $this->client->send($data);
    }

    public function onclose($cli) {
        echo "client close connection".php_eol;
    }

    public function onerror() {

    }
}

$client = new client();
$client->connect();

小结

一、上面的配置总共开启了几个进程?

总共8个进程(1个master进程、1个manager进程、4个task进程、2个worker进程)

重新运行的可能与上图进程号不一致:

Swoole Task 的应用

master进程:22481

manager进程:22485

task进程:22488、22489、22490、22491

worker进程:22492、22493

参考官方提供的进程图:

Swoole Task 的应用

二、为什么执行了5次后,worker进程号发生了改变?

因为我们设了置worker进程的max_request=4,一个worker进程在完成最大请求次数任务后将自动退出,进程退出会释放所有的内存和资源,这样的机制主要是解决php进程内存溢出的问题。

三、当task执行任务异常,我们kill一个task进程,会再新增一个吗?

会。

四、如何设置 task_worker_num ?

最大值不得超过 swoole_cpu_num * 1000。

查看本机 cpu 核数:

echo "swoole_cpu_num:".swoole_cpu_num().php_eol;

根据项目的任务量决定的,比如:1秒会产生200个任务,执行每个任务需要500ms。

想在1s中执行完成200个任务,需要100个task进程。

100 = 200/(1/0.5)

五、如何设置 worker_num ?

默认设置为本机的cpu核数,最大不得超过 swoole_cpu_num * 1000。

比如:1个请求耗时10ms,要提供1000qps的处理能力,那就必须配置10个进程。

10 = 0.01*1000

假设每个进程占用40m内存,10个进程就需要占用400m的内存。

扩展

  • server->taskwait
  • server->taskwaitmulti
  • server->taskco

参考文档

本文欢迎转发,转发请注明作者和出处,谢谢!