欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

使用Beanstalk搭建队列服务

程序员文章站 2022-05-18 14:47:21
...

使用Beanstalk搭建队列服务

Beanstalkd介绍

一个高性能、轻量级的分布式内存队列系统。高性能离不开异步,异步离不开队列,而其内部都是Producer-Consumer模式的原理。

组成部分

使用Beanstalk搭建队列服务

组件 说明
管道(tube) 一个有名称的任务队列,用来存储统一类型的job,是producer和consumer的操作对象
任务(job) 一个需要异步处理的任务,需要放在tube中
生产者(producer) job的生产者,通过put命令来将一个job放到一个tube中
消费者(consumer) job的消费者,通过reserve、release、bury、delete命令来获取job或改变job的状态

特性

  • 优先级: 可以设置任务的优先级
  • 延迟: 设置任务多少秒后才允许被消费者使用
  • 持久化: 定时刷新数据到文件,服务器挂掉后数据依旧存在
  • 超时控制: 消费者必须在指定时间内完成任务,否则就会重新放入管道任务预留:消费者先暂时跳过任务不处理
  • 分布式容错: 分布式设计和Memcached类似,beanstalkd各个server之间并不知道彼此的存在,都是通过client来实现分布式以及根据tube名称去特定server获取job。

任务状态

状态 说明
ready 已经准备好的任务,可以给消费者获取
delayed 延迟执行的任务,设置时候设置了延迟时间
reserved 已被消费者获取,正在执行的任务,Beanstalkd服务负责检查任务是否 在TTR(time-to-run)内完成
buried 保留的任务,任务不会被执行,也不会消失
delete 消息被彻底删除,Beanstalkd不再维护这些消息

适用场景

  • 用作延时队列: 比如可以用于如果用户30分钟内不操作,任务关闭。
  • 用作循环队列: 用release命令可以循环执行任务,比如可以做负载均衡任务分发。
  • 用作兜底机制: 比如一个请求有失败的概率,可以用Beanstalk不断重试,设定超时时间,时间内尝试到成功为止。
  • 用作定时任务: 比如可以用于专门的后台任务。
  • 用作异步操作: 这是所有消息队列都最常用的,先将任务仍进去,顺序执行。

服务安装

  1. 下载beanstalkd-1.11
wget https://codeload.github.com/beanstalkd/beanstalkd/tar.gz/v1.11
  1. 安装
tar xzvf beanstalkd-1.11.tar.gz
cd  beanstalkd-1.11
make & make install
beanstalkd -v
  1. 启动服务
beanstalkd -l 0.0.0.0 -p 11300 -b /log/beanstalkd/binlog -F

队列应用(PHP)

composer安装 Pheanstalk 类库

//PHP版本要求 7.1+
composer require pda/pheanstalk:~4.0

执行composer后,在项目composer.json配置文件中将增加pda/pheanstalk依赖包

"require": {
       "pda/pheanstalk": "~4.0"
}

Producer添加任务

//创建实例
$client = Pheanstalk::create($host, $port, $timeout);

//设置使用的tube,添加任务数据
//$data 任务数据
//$priority 任务优先级.小优先级数值的job将会排在大优先级 数值的job前面执行。
//最高优先级是0,最低优先级是4,294,967,295
//$delay 任务延迟执行秒数
//$ttr 允许一个消费者执行该job的秒数
$client->useTube($tube)->put($data, $priority, $delay, $ttr);

Consumer消费任务

ini_set('default_socket_timeout', 24*60*60);

$client = Pheanstalk::create($host, $port, $timeout);
$client->watchOnly($tube);
while (true) {    
       
    //阻塞获取任务    
    $job = $client->reserve();   
    if (is_null($job)) {        
        continue;   
    }    
    //设置重新计算ttr
    $client->touch($job);    
    //获取任务数据
    $data = $job->getData();
    
    //开始执行任务
    
    //任务执行逻辑
    $res = true
    
    //结束任务执行 
    
    //删除任务    
    $client->delete($job); 
    
    if ($res === true) { 
        //任务执行成功,删除任务
        $client->delete($job);    
    } else {        
        //否则将任务重新放回队列
        $client->release($job, 1024, 10);   
    }
 }

default_socket_timeout 这个参数是一定要加的,php 默认一般是 60s,假如您没有在代码里面设置,采用默认的话(60s),60s 之内如果没有 job 产生,脚本就会报 socket 错误。

客户端操作类

以下基于pda/pheanstalk依赖包实现的Beanstalk操作类,供参考。。

<?php

namespace App\Libs;
use Pheanstalk\Pheanstalk;

/**
 * Beanstalk工具类
 * @since 2020-02-26
 */
class Beanstalk {
    
	/**
	 * Beanstalk配置信息
	 * @var array
	 */
	protected $configs = [];
	
	/**
	 * client实例
	 * @var array
	 */
	protected $clients = [];
	
	/**
	 * 当前连接的服务端的配置名称
	 * @var string
	 */
	protected $connection = 'default';
	
	/**
	 * 连接超时时间
	 * @var int
	 */
	protected $clientTimeOut = 3000;
	
	/**
	 * 初始化配置信息
	 * @param array $configs
	 * @return void
	 */
	public function __construct(array $configs = []) {
	    $this->configs = $configs;
	}

    /**
     * 添加任务
     * @param string $tube 队列管道
     * @param array $parameters
     * @param int $priority 优先级
     * @param int $delay 延迟执行时间
     * @param int $ttr 任务超时时间
     * @return bool|mixed
     */
	public function addTask($tube, array $parameters,
                            $priority = Pheanstalk::DEFAULT_PRIORITY,
                            $delay = Pheanstalk::DEFAULT_DELAY,
                            $ttr = Pheanstalk::DEFAULT_TTR) {

        $client = $this->createClient();
        $stream = serialize($parameters);
        $client->useTube($tube)->put($stream, $priority, $delay, $ttr);
	}

    /**
     * 创建客户端连接
     * @return mixed
     */
	public function createClient() {
	    
	    if (! isset($this->clients[$this->connection])) {
            $client = Pheanstalk::create($this->configs[$this->connection]['host'], $this->configs[$this->connection]['port'], $this->clientTimeOut);
	        $this->clients[$this->connection] = $client;
	    }
	    
	    return $this->clients[$this->connection];
	}

    /**
     * 创建后台工作进程
     * @param $tube
     * @param array $service
     * @return mixed
     * @author huangweizhang
     * @throws \Exception
     */
	public function createWorker($tube, array $service) {

        ini_set('default_socket_timeout', 24*60*60);
        $client = Pheanstalk::create($this->configs[$this->connection]['host'], $this->configs[$this->connection]['port'], $this->clientTimeOut);
        $client->watchOnly($tube);
        while (true) {

            if (count($service) != 2) {
                throw new \Exception('parameter service error.');
            }

            list($classname, $method) = $service;

            if (! class_exists($classname)) {
                throw new \Exception('worker service class not exists.');
            }
            if (! method_exists($classname, $method)) {
                throw new \Exception('worker service method not exists.');
            }

            //获取任务
            $job = $client->reserve();
            if (is_null($job)) {
                continue;
            }
            $client->touch($job);

            //获取任务参数
            $stream = $job->getData();
            $parameters = unserialize($stream);

            try {
                //执行任务
                $class = new $classname();
                call_user_func_array(array($class, $method), array($parameters));
                unset($class);

                //删除任务
                $client->delete($job);
            } catch (\Exception $e) {
                //任务执行失败操作
            }
        }
	}
	
	/**
	 * 设置连接
	 * @param string $connection
	 */
	public function setConnection($connection) {
	    
	    $this->connection = $connection;
	}
	
	/**
	 * 设置客户端连接超时时间
	 * @param int $clientTimeOut
	 */
	public function setClientTimeOut($clientTimeOut) {
	    
	    $this->clientTimeOut = $clientTimeOut;
	}
}