使用Beanstalk搭建队列服务
程序员文章站
2022-05-18 14:47:21
...
使用Beanstalk搭建队列服务
Beanstalkd介绍
一个高性能、轻量级的分布式内存队列系统。高性能离不开异步,异步离不开队列,而其内部都是Producer-Consumer模式的原理。
组成部分
组件 | 说明 |
---|---|
管道(tube) | 一个有名称的任务队列,用来存储统一类型的job,是producer和consumer的操作对象 |
任务(job) | 一个需要异步处理的任务,需要放在tube中 |
生产者(producer) | job的生产者,通过put命令来将一个job放到一个tube中 |
消费者(consumer) | job的消费者,通过reserve、release、bury、delete命令来获取job或改变job的状态 |
特性
- 优先级: 可以设置任务的优先级
- 延迟: 设置任务多少秒后才允许被消费者使用
- 持久化: 定时刷新数据到文件,服务器挂掉后数据依旧存在
- 超时控制: 消费者必须在指定时间内完成任务,否则就会重新放入管道任务预留:消费者先暂时跳过任务不处理
- 分布式容错: 分布式设计和Memcached类似,beanstalkd各个server之间并不知道彼此的存在,都是通过client来实现分布式以及根据tube名称去特定server获取job。
任务状态
状态 | 说明 |
---|---|
ready | 已经准备好的任务,可以给消费者获取 |
delayed | 延迟执行的任务,设置时候设置了延迟时间 |
reserved | 已被消费者获取,正在执行的任务,Beanstalkd服务负责检查任务是否 在TTR(time-to-run)内完成 |
buried | 保留的任务,任务不会被执行,也不会消失 |
delete | 消息被彻底删除,Beanstalkd不再维护这些消息 |
适用场景
- 用作延时队列: 比如可以用于如果用户30分钟内不操作,任务关闭。
- 用作循环队列: 用release命令可以循环执行任务,比如可以做负载均衡任务分发。
- 用作兜底机制: 比如一个请求有失败的概率,可以用Beanstalk不断重试,设定超时时间,时间内尝试到成功为止。
- 用作定时任务: 比如可以用于专门的后台任务。
- 用作异步操作: 这是所有消息队列都最常用的,先将任务仍进去,顺序执行。
服务安装
- 下载beanstalkd-1.11
wget https://codeload.github.com/beanstalkd/beanstalkd/tar.gz/v1.11
- 安装
tar xzvf beanstalkd-1.11.tar.gz
cd beanstalkd-1.11
make & make install
beanstalkd -v
- 启动服务
beanstalkd -l 0.0.0.0 -p 11300 -b /log/beanstalkd/binlog -F
队列应用(PHP)
composer安装 Pheanstalk 类库
//PHP版本要求 7.1+
composer require pda/pheanstalk:~4.0
执行composer后,在项目composer.json配置文件中将增加pda/pheanstalk依赖包
"require": {
"pda/pheanstalk": "~4.0"
}
Producer添加任务
//创建实例
$client = Pheanstalk::create($host, $port, $timeout);
//设置使用的tube,添加任务数据
//$data 任务数据
//$priority 任务优先级.小优先级数值的job将会排在大优先级 数值的job前面执行。
//最高优先级是0,最低优先级是4,294,967,295
//$delay 任务延迟执行秒数
//$ttr 允许一个消费者执行该job的秒数
$client->useTube($tube)->put($data, $priority, $delay, $ttr);
Consumer消费任务
ini_set('default_socket_timeout', 24*60*60);
$client = Pheanstalk::create($host, $port, $timeout);
$client->watchOnly($tube);
while (true) {
//阻塞获取任务
$job = $client->reserve();
if (is_null($job)) {
continue;
}
//设置重新计算ttr
$client->touch($job);
//获取任务数据
$data = $job->getData();
//开始执行任务
//任务执行逻辑
$res = true
//结束任务执行
//删除任务
$client->delete($job);
if ($res === true) {
//任务执行成功,删除任务
$client->delete($job);
} else {
//否则将任务重新放回队列
$client->release($job, 1024, 10);
}
}
default_socket_timeout
这个参数是一定要加的,php 默认一般是 60s,假如您没有在代码里面设置,采用默认的话(60s),60s 之内如果没有 job 产生,脚本就会报 socket 错误。
客户端操作类
以下基于pda/pheanstalk依赖包实现的Beanstalk操作类,供参考。。
<?php
namespace App\Libs;
use Pheanstalk\Pheanstalk;
/**
* Beanstalk工具类
* @since 2020-02-26
*/
class Beanstalk {
/**
* Beanstalk配置信息
* @var array
*/
protected $configs = [];
/**
* client实例
* @var array
*/
protected $clients = [];
/**
* 当前连接的服务端的配置名称
* @var string
*/
protected $connection = 'default';
/**
* 连接超时时间
* @var int
*/
protected $clientTimeOut = 3000;
/**
* 初始化配置信息
* @param array $configs
* @return void
*/
public function __construct(array $configs = []) {
$this->configs = $configs;
}
/**
* 添加任务
* @param string $tube 队列管道
* @param array $parameters
* @param int $priority 优先级
* @param int $delay 延迟执行时间
* @param int $ttr 任务超时时间
* @return bool|mixed
*/
public function addTask($tube, array $parameters,
$priority = Pheanstalk::DEFAULT_PRIORITY,
$delay = Pheanstalk::DEFAULT_DELAY,
$ttr = Pheanstalk::DEFAULT_TTR) {
$client = $this->createClient();
$stream = serialize($parameters);
$client->useTube($tube)->put($stream, $priority, $delay, $ttr);
}
/**
* 创建客户端连接
* @return mixed
*/
public function createClient() {
if (! isset($this->clients[$this->connection])) {
$client = Pheanstalk::create($this->configs[$this->connection]['host'], $this->configs[$this->connection]['port'], $this->clientTimeOut);
$this->clients[$this->connection] = $client;
}
return $this->clients[$this->connection];
}
/**
* 创建后台工作进程
* @param $tube
* @param array $service
* @return mixed
* @author huangweizhang
* @throws \Exception
*/
public function createWorker($tube, array $service) {
ini_set('default_socket_timeout', 24*60*60);
$client = Pheanstalk::create($this->configs[$this->connection]['host'], $this->configs[$this->connection]['port'], $this->clientTimeOut);
$client->watchOnly($tube);
while (true) {
if (count($service) != 2) {
throw new \Exception('parameter service error.');
}
list($classname, $method) = $service;
if (! class_exists($classname)) {
throw new \Exception('worker service class not exists.');
}
if (! method_exists($classname, $method)) {
throw new \Exception('worker service method not exists.');
}
//获取任务
$job = $client->reserve();
if (is_null($job)) {
continue;
}
$client->touch($job);
//获取任务参数
$stream = $job->getData();
$parameters = unserialize($stream);
try {
//执行任务
$class = new $classname();
call_user_func_array(array($class, $method), array($parameters));
unset($class);
//删除任务
$client->delete($job);
} catch (\Exception $e) {
//任务执行失败操作
}
}
}
/**
* 设置连接
* @param string $connection
*/
public function setConnection($connection) {
$this->connection = $connection;
}
/**
* 设置客户端连接超时时间
* @param int $clientTimeOut
*/
public function setClientTimeOut($clientTimeOut) {
$this->clientTimeOut = $clientTimeOut;
}
}
上一篇: TP6.0消息队列处理
下一篇: Linux下RabbitMQ的安装及使用