欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

TP6.0消息队列处理

程序员文章站 2022-05-18 14:47:27
...

1.简介

thinkphp-queue是thinkphp的一个第三方扩展, 内置了 Redis,Database,Topthink ,Sync这四种驱动,推荐使用redis

2. 下载 和安装

composer require topthink/think-queue

下载好了之后然后去配置文件中配置以下数据

TP6.0消息队列处理

这些好了呢就开始贴代码了,写的越多不如在实践中探索废话不多说代码如下:下面是我的队列处理的栗子,

<?php
namespace app\task\controller;

use think\facade\Queue;
use think\queue\Job;

class Task{

	//接收数据
	public function fire(Job $job, $data)
    {
    	
        $isJobDone = $this->actionJob($data);
        // 如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法
        if ($isJobDone) {
           $job->delete();
        } else {
            //通过这个方法可以检查这个任务已经重试了几次了
            $attempts = $job->attempts();
            if ($attempts == 0 || $attempts == 1) {
                // 重新发布这个任务
                $job->release(2); //$delay为延迟时间,延迟2S后继续执行
            } elseif ($attempts == 2) {
                $job->release(5); // 延迟5S后继续执行
            }
        }
    }
    /**
     * @Desc: 加入的队列任务
     */
    private function actionJob($data=[])
    {
		$class=base64_decode($data['class']);
		$res=$class::{$data['action']}($data['data']);
		if($res['code']==0){
			return true;
		}else{
			return false;
		}
    }
	/**
	* @Desc: 链接处理函数
	*/
	static public function UrlHandle($data=[])
	{
	    // 当前任务归属的队列名称,如果为新队列,会自动创建
	    $queueName = 'TASK';
	    // 将该任务推送到消息队列,等待对应的消费者去执行
	    $isPushed = Queue::push(Task::class, $data, $queueName);
	    // database 驱动时,返回值为 1|false; redis驱动时,返回值为 随机字符串|false
	    if ($isPushed !== false) {
	    	
	       return ['code'=>0,'msg'=>'加入队列成功'];
	    } else {
	    	
	       return ['code'=>1,'msg'=>'队列错误'];
	    }
	}
}

使用的栗子:如下

<?php
namespace app\task\controller;
use think\facade\Db;
class Tesk{
	//发送通知信息
	public function test(){
		//php think queue:listen --queue TASK

		$data=[
			'class'=>base64_encode('\app\task\controller\Tesk'),
			'action'=>'receive',
			'data'=>[
				'username'=>'username'.rand(10,20),
				'phone'=>'158160162'.rand(10,20)
			],
		];
		$url=Task::UrlHandle($data);
		
	}
	//队列接收处理
	static public function receive($data=[]){
		if($data){
			Db::name('redis_data')->insert([
				'content'=>json_encode($data),
			]);
		}
		return ['code'=>0];
	}

}