关于 Laravel Redis 多个进程同时取队列问题详解
程序员文章站
2022-04-29 12:10:59
前言
最近在工作中遇到了一个问题,开启多个进程处理队列会重复读取 redis 中队列吗?是否因此导致重复执行任务?下面就来通过示例代码详细介绍下。
使用 supervi...
前言
最近在工作中遇到了一个问题,开启多个进程处理队列会重复读取 redis 中队列吗?是否因此导致重复执行任务?下面就来通过示例代码详细介绍下。
使用 supervisor 监听 laravel 队列任务,其中 supervisor 的配置如下:
[program:laravel-worker] process_name=%(program_name)s_%(process_num)02d command=php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon autostart=true autorestart=true numprocs=8 redirect_stderr=true stdout_logfile=/var/www/xxx.cn/worker.log
注意: numprocs = 8
,代表开启 8 个进程来执行 command 中的命令。
如下:
ps c:\users\tanteng\website\laradock> docker-compose exec php-worker sh /etc/supervisor/conf.d # ps -ef | grep php 7 root 0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 8 root 0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 9 root 0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 10 root 0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 11 root 0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 12 root 0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 13 root 0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 14 root 0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon 44 root 0:00 grep php
laravel 多进程读取队列内容是否会重复
在 laravel 的某个控制器方法,一次放入多个任务队列:
public function index(request $request) { $this->dispatch((new sendfile3())->onqueue('sendfile')); $this->dispatch((new sendfile3())->onqueue('sendfile')); $this->dispatch((new sendfile3())->onqueue('sendfile')); $this->dispatch((new sendfile3())->onqueue('sendfile')); $this->dispatch((new sendfile3())->onqueue('sendfile')); $this->dispatch((new sendfile3())->onqueue('sendfile')); $this->dispatch((new sendfile3())->onqueue('sendfile')); $this->dispatch((new sendfile3())->onqueue('sendfile')); $this->dispatch((new sendfile3())->onqueue('sendfile')); $this->dispatch((new sendfile3())->onqueue('sendfile')); $this->dispatch((new sendfile3())->onqueue('sendfile')); $this->dispatch((new sendfile3())->onqueue('sendfile')); $this->dispatch((new sendfile3())->onqueue('sendfile')); $this->dispatch((new sendfile3())->onqueue('sendfile')); $this->dispatch((new sendfile3())->onqueue('sendfile')); $this->dispatch((new sendfile3())->onqueue('sendfile')); $this->dispatch((new sendfile3())->onqueue('sendfile')); $this->dispatch((new sendfile3())->onqueue('sendfile')); }
在队列处理的方法打印日志,打印处理的队列的 id:
app/jobs/sendfile3.php
public function handle() { info('invoke sendfile3'); dump('invoke handle'); $rawbody = $this->job->getrawbody(); $info = json_decode($rawbody, true); info('queue id:' . $info['id']); }
laravel 使用 redis 的 list 作为队列的数据结构,并会为每个队列分配一个 id,数据结构如下:
{ "job": "illuminate\\queue\\callqueuedhandler@call", "data": { "commandname": "app\\jobs\\sendfile3", "command": "o:18:\"app\\jobs\\sendfile3\":4:{s:6:\"\u0000*\u0000job\";n;s:10:\"connection\";n;s:5:\"queue\";s:8:\"sendfile\";s:5:\"delay\";n;}" }, "id": "hadbcy3ipnsnoofqqdhohsa451okqs88", "attempts": 1 }
请求这个控制器路由(或者命令行方式),就可以看到 redis 中多了很多队列任务了,如图:
这个时候开启 supervisor 处理队列任务,并查看日志:
[2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: queue id:jacljzhdevntzlcriz6urqkcvlbe8y9c [2017-12-23 19:01:01] local.info: queue id:ukhv0li4p2vgpa55qu6yeojm27mo5ywj [2017-12-23 19:01:01] local.info: queue id:obmpwdtmnavebuku7aan5abt3agyt90l [2017-12-23 19:01:01] local.info: queue id:fo2qzn2ftsdqtdnkocimk7ijb4qlhrge [2017-12-23 19:01:01] local.info: queue id:uljfmoou7wk7boad4zphb3ccrmjhbtr6 [2017-12-23 19:01:01] local.info: queue id:87ulqpbobfmgr16nl5wxfvoi71zgcerm [2017-12-23 19:01:01] local.info: queue id:9uvl0muqlzbqlri99rchgw2elxwvemie [2017-12-23 19:01:01] local.info: queue id:a0vgyzuz9htmh7dghepxqesftcqu3qaf [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: queue id:2cxuxxoppkgyiv4wo8gv9cj6cwxektyl [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: queue id:9actaya8cxpjx6q3gb1sulokotp8reqz [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: queue id:bphqvbbochlv4gr2i0vylvyw9bijttyj [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: queue id:fm6tnajdxyktdqbdmydmwwjflnnikryg [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: queue id:nyabcvskbvpbah3e2itqkoljlp1ficib [2017-12-23 19:01:01] local.info: queue id:wbhssvztp43569uopxxflljcvympw7cp [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: queue id:blipnkcrsdapwvmklnxehakelhm0rdey [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: queue id:eoaoquceiwrz9uz64xm6idkgiqj9xc3w [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: queue id:lzise9eiqqqinrhalbmai4qng7qylpb2 [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: queue id:wxykvcfohs1ppnwowutsenomv5l5euxe [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: queue id:xth5jiwlgnrwwzi02oyi70pihaokujud [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: queue id:9ehme5himlpnubpy0xwn8uvrozxemqws [2017-12-23 19:01:01] local.info: queue id:c1sk87cpzl47edla0zhfo7pj9miecoyx [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: queue id:2kwl51oh4lyyrrljcregucknijrdl7oe [2017-12-23 19:01:01] local.info: queue id:obrpoqrytpyiyv2delmloxu3sappwjln [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: queue id:6qgu6w3tapljsrt688yv9hrxvddlxntz [2017-12-23 19:01:01] local.info: queue id:witlerhwn7s9cqkfuf9lllnadpxjknci [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: queue id:zslw0vlfbdpl4wjtjzu3yb3v45pne807 [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: queue id:qhzlxlgfgwrluienm7vbllmtjzyb2h5n [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: queue id:lux1ibyd3l2psnl9bzwhhk2knxyrpzw6 [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: queue id:m2respjyo5hpafxxl0eqbwwsuq4jpmwn [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: queue id:husgaiiaoo6zfgqc5kghgpsv5rporpyo [2017-12-23 19:01:01] local.info: queue id:cehjsoy6blez4nbncpziahqlarmeyyef [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: queue id:w4bkfijkmu5saqg2xkn3zrl5byxgatmk [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: queue id:0zbuwbxlrehhxkfybkvyty4z35f154si [2017-12-23 19:01:01] local.info: queue id:mvozvydpvq4tcpjey9g7pmth3mwpkpik [2017-12-23 19:01:01] local.info: invoke sendfile3 [2017-12-23 19:01:01] local.info: queue id:tlvf74eeidecwktjzqwvw03ujtrptl9r [2017-12-23 19:01:01] local.info: queue id:me8wypfgcz0nf9xvcxz0hf2xvxqa1ffs
这 8 个进程并发处理队列,但从打印的日志看,没有出现同样的 id. 我们再看一下 laravel 如何使用 redis 处理队列的。
分析一下 laravel 队列的处理
laravel 中入队列方法
public function pushraw($payload, $queue = null, array $options = []) { $this->getconnection()->rpush($this->getqueue($queue), $payload); return arr::get(json_decode($payload, true), 'id'); }
用的是 redis 的 rpush 命令。
laravel 中取队列方法
public function pop($queue = null) { $original = $queue ?: $this->default; $queue = $this->getqueue($queue); $this->migrateexpiredjobs($queue.':delayed', $queue); if (! is_null($this->expire)) { $this->migrateexpiredjobs($queue.':reserved', $queue); } list($job, $reserved) = $this->getconnection()->eval( luascripts::pop(), 2, $queue, $queue.':reserved', $this->gettime() + $this->expire ); if ($reserved) { return new redisjob($this->container, $this, $job, $reserved, $original); } }
这里用的是 lua 脚本取队列,如下:
public static function pop() { return <<<'lua' local job = redis.call('lpop', keys[1]) local reserved = false if(job ~= false) then reserved = cjson.decode(job) reserved['attempts'] = reserved['attempts'] + 1 reserved = cjson.encode(reserved) redis.call('zadd', keys[2], argv[1], reserved) end return {job, reserved} lua; }
那么结论是:从 laravel 的处理方式和打印的日志结果看,即使多个进程读取同一个队列,也不会读取到一样的数据。
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对的支持。