欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Swoole和Redis实现的并发队列处理系统

程序员文章站 2023-11-08 17:59:52
由于PHP不支持多线程,但是作为一个完善的系统,有很多操作都是需要异步完成的。为了完成这些异步操作,我们做了一个基于Redis队列任务系统。 大家知道,一个消息队列处理系统主要分为两大部分:消费者和生产者。 在我们的系统中,主系统作为生产者,任务系统作为消费者。 具体的工作流程如下: 1、主系统将需 ......

由于php不支持多线程,但是作为一个完善的系统,有很多操作都是需要异步完成的。为了完成这些异步操作,我们做了一个基于redis队列任务系统。

大家知道,一个消息队列处理系统主要分为两大部分:消费者和生产者。

在我们的系统中,主系统作为生产者,任务系统作为消费者。

 

具体的工作流程如下:

1、主系统将需要需要处理的任务名称+任务参数push到队列中。

2、任务系统实时的对任务队列进行pop,pop出来一个任务就fork一个子进程,由子进程完成具体的任务逻辑。

 

具体代码如下:

 1 /**
 2  * 启动守护进程
 3  */
 4 public function runaction() {
 5     tools::log_message('error', 'daemon/run' . ' | action: restart', 'daemon-');
 6     while (true) {
 7         $this->fork_process();
 8     }
 9     exit;
10 }
11 
12 /**
13  * 创建子进程
14  */
15 private function fork_process() {
16     $ppid = getmypid();
17     $pid = pcntl_fork();
18     if ($pid == 0) {//子进程
19         $pid = posix_getpid();
20         //echo "* process {$pid} was created \n\n";
21         $this->mq_process();
22         exit;
23     } else {//主进程
24         $pid = pcntl_wait($status, wuntraced); //取得子进程结束状态
25         if (pcntl_wifexited($status)) {
26             //echo "\n\n* sub process: {$pid} exited with {$status}";
27             //tools::log_message('info', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid );
28         } else {
29             tools::log_message('error', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-');
30         }
31     }
32 }
33 
34 /**
35  * 业务任务队列处理
36  */
37 private function mq_process() {
38     $data_pop = $this->masterredis->rpop($this->redis_list_key);
39     $data = json_decode($data_pop, 1);
40     if (!$data) {
41         return false;
42     }
43     $worker = '_task_' . $data['worker'];
44     $class_name = isset($data['class']) ? $data['class'] : 'taskpromodel';
45     $params = $data['params'];
46     $class = new $class_name();
47     $class->$worker($params);
48     return true;
49 }

 

 

这是一个简单的任务处理系统。

通过这个任务系统帮助我们实现了异步,到目前为止已经稳定运行了将近一年。

但很可惜,它是一个单进程的系统。它是一直在不断的fork,如果有任务就处理,没有任务就跳过。

这样很稳定。

但问题有两个:一是不断地fork、pop会浪费服务器资源,二是不支持并发!

第一个问题还好,但第二个问题就很严重。

当主系统 同时 抛过来大量的任务时,任务的处理时间就会无限的拉长。

 

新的设计

为了解决并发的问题,我们计划做一个更加高效强壮的队里处理系统。

因为在php7之前不支持多线程,所以我们采用多进程。

从网上找了不少资料,大多所谓的多进程都是n个进程同时在后台运行。

显然这是不合适的。

我的预想是:每pop出一个任务就fork一个任务,任务执行完成后子进程结束。

 

遇到的问题

1、如何控制最大进程数

这个问题很简单,那就是每fork一个子进程就自增一次。而当子进程执行完成就自减一次。

自增没有问题,我们就在主进程中操作就完了。那么该如何自减呢?

可能你会说,当然是在子进程中啊。但这里你需要注意:当fork的时候是从主进程复制了一份资源给子进程,这就意味着你无法在子进程中操作主进程中的计数器!

所以,这里就需要了解一个知识点:信号。

具体的可以自行google,这里直接看代码。

1 // install signal handler for dead kids
2 pcntl_signal(sigchld, array($this, "sig_handler"));

 

 

这就安装了一个信号处理器。当然还缺少一点。

declare(ticks = 1);

 

 

declare是一个控制结构语句,具体的用法也请去google。

这句代码的意思就是每执行一条低级语句就调用一次信号处理器。

这样,每当子进程结束的时候就会调用信号处理器,我们就可以在信号处理器中进行自减。

 

2、如何解决进程残留

在多进程开发中,如果处理不当就会导致进程残留。

为了解决进程残留,必须得将子进程回收。

那么如何对子进程进行回收就是一个技术点了。

在pcntl的demo中,包括很多博文中都是说在主进程中回收子进程。

但我们是基于redis的brpop的,而brpop是阻塞的。

这就导致一个问题:当执行n个任务之后,任务系统空闲的时候主进程是阻塞的,而在发生阻塞的时候子进程还在执行,所以就无法完成最后几个子进程的进程回收。。。

这里本来一直很纠结,但当我将信号处理器搞定之后就也很简单了。

进程回收也放到信号处理器中去。

 

新系统的评估

pcntl是一个进程处理的扩展,但很可惜它对多进程的支持非常乏力。

所以这里采用swoole扩展中的process。

具体代码如下:

 1 declare(ticks = 1);
 2 class jobdaemoncontroller extends yaf_controller_abstract{
 3 
 4     use trait_redis;
 5 
 6     private $maxprocesses = 800;
 7     private $child;
 8     private $masterredis;
 9     private $redis_task_wing = 'task:wing'; //待处理队列
10 
11     public function init(){
12         // install signal handler for dead kids
13         pcntl_signal(sigchld, array($this, "sig_handler"));
14         set_time_limit(0);
15         ini_set('default_socket_timeout', -1); //队列处理不超时,解决redis报错:read error on connection
16     }
17 
18     private function redis_client(){
19         $rds = new redis();
20         $rds->connect('redis.master.host',6379);
21         return $rds;
22     }
23 
24     public function process(swoole_process $worker){// 第一个处理
25         $globals['worker'] = $worker;
26         swoole_event_add($worker->pipe, function($pipe) {
27             $worker = $globals['worker'];
28             $recv = $worker->read();            //send data to master
29 
30             sleep(rand(1, 3));
31             echo "from master: $recv\n";
32             $worker->exit(0);
33         });
34         exit;
35     }
36 
37     public function testaction(){
38         for ($i = 0; $i < 10000; $i++){
39             $data = [
40                 'abc' => $i,
41                 'timestamp' => time().rand(100,999)
42             ];
43             $this->masterredis->lpush($this->redis_task_wing, json_encode($data));
44         }
45         exit;
46     }
47 
48     public function runaction(){
49         while (1){
50 //            echo "\t now we de have $this->child child processes\n";
51             if ($this->child < $this->maxprocesses){
52                 $rds = $this->redis_client();
53                 $data_pop = $rds->brpop($this->redis_task_wing, 3);//无任务时,阻塞等待
54                 if (!$data_pop){
55                     continue;
56                 }
57                 echo "\t starting new child | now we de have $this->child child processes\n";
58                 $this->child++;
59                 $process = new swoole_process([$this, 'process']);
60                 $process->write(json_encode($data_pop));
61                 $pid = $process->start();
62             }
63         }
64     }
65 
66     private function sig_handler($signo) {
67 //        echo "recive: $signo \r\n";
68         switch ($signo) {
69             case sigchld:
70                 while($ret = swoole_process::wait(false)) {
71 //                    echo "pid={$ret['pid']}\n";
72                     $this->child--;
73                 }
74         }
75     }
76 }

 

最终,经过测试,单核1g的服务器执行1到3秒的任务可以做到800的并发。