Lavavel5.5源代码 - RedisQueue是怎么实现
程序员文章站
2022-06-21 16:02:58
队列的基本功能: 1、立即执行;yes 2、延迟执行;yes 3、保证至少执行一次;yes 4、必须执行且最多执行一次;no 用到的数据结构: list、Sorted sets 延迟执行的机制: 1、先把数据放入SortedSets类型的queues:queue_000:delayed中 2、在执行 ......
队列的基本功能:
1、立即执行;yes
2、延迟执行;yes
3、保证至少执行一次;yes
4、必须执行且最多执行一次;no
用到的数据结构:
list、sorted sets
延迟执行的机制:
1、先把数据放入sortedsets类型的queues:queue_000:delayed中
2、在执行pop的时候,执行lua脚本,把sortedsets类型的queues:queue_000:delayed 中可以执行的数据rpush到list类型的queues:queue_000中
保证执行成功的机制:
1、把要执行的数据先放入sortedsets类型的queues:queue_000:reserved中
2、在执行pop的时候,执行lua脚本,把sortedsets类型的queues:queue_000:reserved 中可以执行的数据rpush到list类型的queues:queue_000中
3、任务执行成功,从sortedsets类型的queues:queue_000:reserved中执行删除预存的数据
class redisqueue extends queue implements queuecontract
{
public function pushraw($payload, $queue = null, array $options = [])
{
$this->getconnection()->rpush(
$this->getqueue($queue), // list类型的queues:queue_000
$payload // $payload === "标准化后的数据,进行json格式化"的数据
);
return json_decode($payload, true)['id'] ?? null;
}
protected function laterraw($delay, $payload, $queue = null)
{
$this->getconnection()->zadd(
$this->getqueue($queue).':delayed', // sortedsets类型的queues:queue_000:delayed
$this->availableat($delay), // 延迟执行
$payload // $payload === "标准化后的数据,进行json格式化"的数据
);
return json_decode($payload, true)['id'] ?? null;
}
public function pop($queue = null)
{
// 执行lua脚本,把sortedsets类型的queues:queue_000:delayed 中可以执行的数据rpush到list类型的queues:queue_000中
// 执行lua脚本,把sortedsets类型的queues:queue_000:reserved 中可以执行的数据rpush到list类型的queues:queue_000中
$this->migrate($prefixed = $this->getqueue($queue));
// 执行lua脚本,从list类型的queues:queue_000中lpop出数据,attempts加1,然后设定超时时间并放入结构把sortedsets类型的queues:queue_000:reserved 中
list($job, $reserved) = $this->retrievenextjob($prefixed);
if ($reserved) {
return new redisjob(
$this->container, $this, $job,
$reserved, $this->connectionname, $queue ?: $this->default
);
}
}
}
下一篇: 史上最难的一道Java面试题