欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

利用Redis keyspace notification(键空间通知)实现过期提醒

程序员文章站 2022-06-23 23:34:17
一、序言: 本文所说的定时任务或者说计划任务并不是很多人想象中的那样,比如说每天凌晨三点自动运行起来跑一个脚本。这种都已经烂大街了,随便一个 Crontab 就能搞定了。 这里所说的定时任务可以说是计时器任务,比如说用户触发了某个动作,那么从这个点开始过二十四小时我们要对这个动作做点什么。那么如果有 ......

一、序言:

本文所说的定时任务或者说计划任务并不是很多人想象中的那样,比如说每天凌晨三点自动运行起来跑一个脚本。这种都已经烂大街了,随便一个 crontab 就能搞定了。

这里所说的定时任务可以说是计时器任务,比如说用户触发了某个动作,那么从这个点开始过二十四小时我们要对这个动作做点什么。那么如果有 1000 个用户触发了这个动作,就会有 1000 个定时任务。于是这就不是 cron 范畴里面的内容了。

举个最简单的例子,一个用户推荐了另一个用户,我们定一个二十四小时之后的任务,看看被推荐的用户有没有来注册,如果没注册就给他搞一条短信过去

二、需求分析:

  1. 设置了生存时间的key,在过期时能不能有所提示?

  2. 如果能对过期key有个监听,如何对过期key进行一个回调处理?

  3. 如何使用 redis 来实现定时任务?

  4. 更具体需求:

    现在需要做一个拍卖活动,如何在拍卖结束那一刻,就执行任务进行相关逻辑;

    如何在订单交易有效期时间结束的那一刻,进行相关逻辑

三、redis介绍

在 redis 的 2.8.0 版本之后,其推出了一个新的特性——键空间消息(redis keyspace notifications),它配合 2.0.0 版本之后的 subscribe 就能完成这个定时任务

的操作了,不过定时的单位是秒。

(1)publish / subscribe

redis 在 2.0.0 之后推出了 pub / sub 的指令,大致就是说一边给 redis 的特定频道发送消息,另一边从 redis 的特定频道取值——形成了一个简易的消息队列。

(2)redis keyspace notifications

在 redis 里面有一些事件,比如键到期、键被删除等。然后我们可以通过配置一些东西来让 redis 一旦触发这些事件的时候就往特定的 channel 推一条消息。

大致的流程就是我们给 redis 的某一个 db 设置过期事件,使其键一旦过期就会往特定频道推消息,我在自己的客户端这边就一直消费这个频道就好了。

以后一来一条定时任务,我们就把这个任务状态压缩成一个键,并且过期时间为距这个任务执行的时间差。那么当键一旦到期,就到了任务该执行的时间,redis 自然会把过期消息推去,我们的客户端就能接收到了。这样一来就起到了定时任务的作用。

配置

因为开启键空间通知功能需要消耗一些 cpu , 所以在默认配置下, 该功能处于关闭状态。

可以通过修改 redis.conf 文件, 或者直接使用 config set 命令来开启或关闭键空间通知功能:

  • notify-keyspace-events 选项的参数为空字符串时,功能关闭。

  • 另一方面,当参数不是空字符串时,功能开启。

notify-keyspace-events 的参数可以是以下字符的任意组合, 它指定了服务器该发送哪些类型的通知:

字符 发送的通知
  k 键空间通知,所有通知以 __keyspace@<db>__ 为前缀
  e 键事件通知,所有通知以 __keyevent@<db>__ 为前缀
  g delexpirerename 等类型无关的通用命令的通知
  $ 字符串命令的通知
  l 列表命令的通知
  s 集合命令的通知
  h 哈希命令的通知
  z 有序集合命令的通知
  x 过期事件:每当有过期键被删除时发送
  e 驱逐(evict)事件:每当有键因为 maxmemory 政策而被删除时发送
  a 参数 g$lshzxe 的别名

输入的参数中至少要有一个 k 或者 e , 否则的话, 不管其余的参数是什么, 都不会有任何通知被分发。

举个例子, 如果只想订阅键空间中和列表相关的通知, 那么参数就应该设为 kl , 诸如此类。

将参数设为字符串 "ake" 表示发送所有类型的通知。

监听过期事件需要设置redis 配置文件

notify-keyspace-events "ex"

命令产生的通知

以下列表记录了不同命令所产生的不同通知:

  • [del key 命令为每个被删除的键产生一个 del 通知。

  • rename key newkey 产生两个通知:为来源键(source key)产生一个 rename_from 通知,并为目标键(destination key)产生一个 rename_to 通知。

  • expire key secondsexpireat key timestamp 在键被正确设置过期时间时产生一个 expire 通知。当 expireat key timestamp 设置的时间已经过期,或者 expire key seconds传入的时间为负数值时,键被删除,并产生一个 del 通知。

  • [sort key [by pattern] [limit offset count] [get pattern [get pattern …]] [asc | desc] [alpha] [store destination]]() 在命令带有 store 参数时产生一个 sortstore 事件。如果 store 指示的用于保存排序结果的键已经存在,那么程序还会发送一个 del 事件。

  • set key value [ex seconds] [px milliseconds] [nx|xx] 以及它的所有变种(setex key seconds valuesetnx key valuegetset key value)都产生 set 通知。其中 setex key seconds value 还会产生 expire 通知。

  • [mset key value 为每个键产生一个 set 通知。

  • setrange key offset value 产生一个 setrange 通知。

  • incr keydecr keyincrby key incrementdecrby key decrement 都产生 incrby通知。

  • incrbyfloat key increment 产生 incrbyfloat 通知。

  • append key value 产生 append 通知。

  • [lpush key value lpushx key value 都产生单个 lpush 通知,即使有多个输入元素时,也是如此。

  • [rpush key value rpushx key value 都产生单个 rpush 通知,即使有多个输入元素时,也是如此。

  • rpop key 产生 rpop 通知。如果被弹出的元素是列表的最后一个元素,那么还会产生一个 del 通知。

  • lpop key 产生 lpop 通知。如果被弹出的元素是列表的最后一个元素,那么还会产生一个 del 通知。

  • linsert key before|after pivot value 产生一个 linsert 通知。

  • lset key index value 产生一个 lset 通知。

  • ltrim key start stop 产生一个 ltrim 通知。如果 ltrim key start stop 执行之后,列表键被清空,那么还会产生一个 del 通知。

  • rpoplpush source destinationbrpoplpush source destination timeout 产生一个 rpop 通知,以及一个 lpush 通知。两个命令都会保证 rpop 的通知在 lpush 的通知之前分发。如果从键弹出元素之后,被弹出的列表键被清空,那么还会产生一个 del 通知。

  • hset hash field valuehsetnx hash field valuehmset 都只产生一个 hset 通知。

  • hincrby 产生一个 hincrby 通知。

  • hincrbyfloat 产生一个 hincrbyfloat 通知。

  • hdel 产生一个 hdel 通知。如果执行 hdel 之后,哈希键被清空,那么还会产生一个 del 通知。

  • [sadd key member 产生一个 sadd 通知,即使有多个输入元素时,也是如此。

  • [srem key member 产生一个 srem 通知,如果执行 [srem key member 之后,集合键被清空,那么还会产生一个 del 通知。

  • smove source destination member 为来源键(source key)产生一个 srem 通知,并为目标键(destination key)产生一个 sadd 事件。

  • spop key 产生一个 spop 事件。如果执行 spop key 之后,集合键被清空,那么还会产生一个 del 通知。

  • [sinterstore destination key 、 [sunionstore destination key 和 [sdiffstore destination key 分别产生 sinterstoresunionostoresdiffstore三种通知。如果用于保存结果的键已经存在,那么还会产生一个 del 通知。

  • zincrby key increment member 产生一个 zincr 通知。(译注:非对称,请注意。)

  • [zadd key score member [[score member] [score member] …]]() 产生一个 zadd 通知,即使有多个输入元素时,也是如此。

  • [zrem key member 产生一个 zrem 通知,即使有多个输入元素时,也是如此。如果执行 [zrem key member 之后,有序集合键被清空,那么还会产生一个 del 通知。

  • zremrangebyscore key min max 产生一个 zrembyscore 通知。(译注:非对称,请注意。)如果用于保存结果的键已经存在,那么还会产生一个 del 通知。

  • zremrangebyrank key start stop 产生一个 zrembyrank 通知。(译注:非对称,请注意。)如果用于保存结果的键已经存在,那么还会产生一个 del 通知。

  • [zinterstore destination numkeys key [key …] [weights weight [weight …]] [aggregate sum|min|max]]() 和 [zunionstore destination numkeys key [key …] [weights weight [weight …]] [aggregate sum|min|max]]() 分别产生 zinterstorezunionstore 两种通知。如果用于保存结果的键已经存在,那么还会产生一个 del 通知。

  • 每当一个键因为过期而被删除时,产生一个 expired 通知。

  • 每当一个键因为 maxmemory 政策而被删除以回收内存时,产生一个 evicted 通知。

note

所有命令都只在键真的被改动了之后,才会产生通知。

比如说,当 [srem key member 试图删除不存在于集合的元素时,删除操作会执行失败,因为没有真正的改动键,所以这一操作不会发送通知。

如果对命令所产生的通知有疑问, 最好还是使用以下命令, 自己来验证一下:

$ redis-cli config set notify-keyspace-events kea
$ redis-cli --csv psubscribe '__key*__:*'
reading messages... (press ctrl-c to quit)
"psubscribe","__key*__:*",1

然后, 只要在其他终端里用 redis 客户端发送命令, 就可以看到产生的通知了:

"pmessage","__key*__:*","__keyspace@0__:foo","set"
"pmessage","__key*__:*","__keyevent@0__:set","foo"
...

过期通知的发送时间

redis 使用以下两种方式删除过期的键:

  • 当一个键被访问时,程序会对这个键进行检查,如果键已经过期,那么该键将被删除。

  • 底层系统会在后台渐进地查找并删除那些过期的键,从而处理那些已经过期、但是不会被访问到的键。

当过期键被以上两个程序的任意一个发现、 并且将键从数据库中删除时, redis 会产生一个 expired 通知。

redis 并不保证生存时间(ttl)变为 0 的键会立即被删除: 如果程序没有访问这个过期键, 或者带有生存时间的键非常多的话, 那么在键的生存时间变为 0 , 直到键真正被删除这中间, 可能会有一段比较显著的时间间隔。

因此, redis 产生 expired 通知的时间为过期键被删除的时候, 而不是键的生存时间变为 0 的时候。

四、高可用性

因为 redis 目前的订阅与发布功能采取的是发送即忘(fire and forget)策略, 所以如果你的程序需要可靠事件通知(reliable notification of events), 那么目前的键空间通知可能并不适合你:当订阅事件的客户端断线时, 它会丢失所有在断线期间分发给它的事件。并不能确保消息送达。未来有计划允许更可靠的事件传递,但可能这将在更一般的层面上解决,或者为pub / sub本身带来可靠性,或者允许lua脚本拦截pub / sub消息来执行诸如推送将事件列入清单。

事件类型

对于每个修改数据库的操作,键空间通知都会发送两种不同类型的事件消息:keyspace 和 keyevent。以 keyspace 为前缀的频道被称为键空间通知(key-space notification), 而以 keyevent 为前缀的频道则被称为键事件通知(key-event notification)。

事件是用  __keyspace@db__:keypattern 或者  __keyevent@db__:opstype 的格式来发布消息的。
db表示在第几个库;keypattern则是表示需要监控的键模式(可以用通配符,如:__key*__:*);opstype则表示操作类型。因此,如果想要订阅特殊的key上的事件,应该是订阅keyspace。
比如说,对 0 号数据库的键 mykey 执行 del 命令时, 系统将分发两条消息, 相当于执行以下两个 publish 命令:
publish __keyspace@0__:samplekey del
publish __keyevent@0__:del samplekey
订阅第一个频道 __keyspace@0__:mykey 可以接收 0 号数据库中所有修改键 mykey 的事件, 而订阅第二个频道 __keyevent@0__:del 则可以接收 0 号数据库中所有执行 del 命令的键。

五、实现步骤

为了高可用性,为了确保解决过期事件的执行,将 定时事件存入mysql数据库。触发键过期事件后,再查询一次数据库,检查一下过期事件是否全部执行了。

数据表结构

create table `tb_time_limit_task` (
  `id` int(10) unsigned not null auto_increment,
  `key` varchar(255) character set utf8 collate utf8_bin not null comment 'redis键',
  `status` tinyint(3) unsigned not null comment '状态,0未处理,1已处理',
  `start_time` decimal(13,3) unsigned not null comment '开始时间(小数部分为毫秒)',
  `end_time` decimal(13,3) unsigned not null comment '结束时间(小数部分为毫秒)',
  primary key (`id`),
  key `we` (`key`) using btree
) engine=innodb default charset=utf8 comment='这个表用于记录需要时间控制的任务key,配合redis、以及回调脚本使用';
​
key存储规则是 类名@方法名@参数...   (参数可为空,多个参数以@分隔) 
例子: ptcountdown@countdown@218

 

实现思路:

  1. (查询数据库)任务状态检查,执行未正常执行的任务

    任务状态检查

    查询 ”结束时间 < 当前时间“ 的未处理的任务

    如果存在,则执行任务,

    1.先解析key,类名@方法名@参数... 2.然后根据类名去执行相应方法

  2. 连接redis

    • 连接成功

      • (查询数据库)任务状态检查,查看在脚本未运行期间是否有部分任务未处理,可能很长时间才连上redis,需要查看连接时间内的任务状况;

    • 可能会永远连不上,则每10s,尝试重连

  3. 生成订阅消息丢失控制键

    向redis初始新增 10个有效期(900/1800/...)的键

    #silck`1 900
    #silck`2 1800
    #silck`3 2700
    ...
    #silck`10 9000

这一步的目的是 每900秒(15)分钟,查询数据库,检查任务执行情况

  1. 订阅过期事件

    • 正常键过期

      • 执行任务

    • 订阅消息控制键过期

      • 检查任务状态

        • 如果超过一半的控制键都过期了,那么重新生成10个

具体代码:

监听脚本
<?php
/**
 * description:时间结点任务监听
 * created by dong.cx
 * datetime: 2019-03-15 10:58
 */

namespace wladmin\cmd;

\think\loader::addnamespace('wlmis', './wlmis/');

use wlmis\logic\timelimittask\base\timelimittasklogic;
use think\config;
use think\console\input;
use think\console\output;
use think\console\command;
use think\log;
use wlmis\common\redis\redis;
use wlmis\logic\timelimittask\base\logrecord;

class timelimittask extends command
{
    use logrecord;
    /**
     * 订阅信息丢失控制键最大数量
     * @var int
     */
    protected $subscription_info_loss_control_key_max = 10;

    /**
     * 订阅信息丢失控制键最后执行的索引,键的索引从1开始,为0表示未执行过,这个变量用于控制订阅信息控制键自动生成
     * @var int
     */
    protected $subscription_info_loss_control_key_last = 0;

    public function __construct($name = null)
    {
        parent::__construct($name);
        // 日志记录初始化
        log::init([
            'type' => 'file',
            'path' => runtime_path . 'redis-logs/',
            // error和sql日志单独记录
            'apart_level' => ['log', 'error', 'sql', 'debug', 'info', 'notice'],
        ]);
    }

    /**
     * 运行方式 php tp5cornnew.php timelimittask
     * @author dong.cx 2019-04-02 10:59
     */
    protected function configure()
    {
        $this->setname('timelimittask')->setdescription('redis keyspace notification subscription script');
    }

    protected function execute(input $input, output $output)
    {
        // 配置断线重连
        config::set('database.break_reconnect', true);
        $config = config::get('redis_db');
        $reconnect_str = '';
        redisreconnect:
        try {
            $this->logrecord('info', "thinkphp version: " . think_version);
            $this->logrecord('info', $reconnect_str . "redis host: " . $config['host'], true, true);
            // 进行任务状态检查
            $this->taskstatuscheck();
            $redis = new redis(get_class($this), true);
            if ($redis->ping() == '+pong') {
                $this->logrecord('info', 'connection succeeded', true, true);
                // 查看在脚本未运行期间是否有部分任务未处理
                $this->taskstatuscheck();
            }
            // 生成订阅消息丢失控制键
            $this->subscription_info_loss_control(true);
            $this->logrecord('info', 'start listening', true, true);
            // 订阅消息
            $redis->psubscribe(array(
                '__keyevent@' . $config['db'] . '__:expired'
            ), function ($redis, $pattern, $channelname, $message) {
                $msg_split = explode('`', $message);
                if (count($msg_split) == 2 && $msg_split[0] == '#silck' && is_numeric($msg_split[1])) {
                    $this->subscription_info_loss_control_key_last = $msg_split[1];
                    $this->taskstatuscheck();
                    if ($this->subscription_info_loss_control_key_last > ($this->subscription_info_loss_control_key_max / 2)) {
                        $this->subscription_info_loss_control();
                        $this->subscription_info_loss_control_key_last = 0;
                    }
                } else {
                    // 这里代表是redis回调执行
                    $this->task($message);
                }
            });
        } catch (\redisexception $redisthrow) {
            // redis抛出异常,一般的情况是失去连接,执行重新连接
            $this->logrecord('notice', "redis loses connection and is reconnecting...", true, true);
            try {
                $redis->close();
            } catch (\exception $ee) {
            }
            sleep(10);
            $reconnect_str = 'reconnect ';
            goto redisreconnect;
        } catch (\exception $e) {
            // 运行错误,这里抛出错误的原因为这个文件中的代码有误,其他任务执行代码抛出错误,不会导致运行中断 - 执行到这里运行中断
            $this->logrecord('error', 'run-time error' . php_eol . 'file location: ' . $e->getfile() . php_eol . 'line: ' . $e->getline() . php_eol . 'error message: ' . $e->getmessage() . php_eol, true, true);
        }
    }

    /**
     * 任务执行
     * @param string $key 任务键名,记录于redis中的键名
     *                         键名规则:类名@方法名@参数...(后续的多个参数都用@分隔),在时间限制任务基类中有生成键的封装函数
     * @author: dong.cx
     */
    private function task($key)
    {
        try {
            $params = explode('@', $key, 3);
            if (count($params) < 2) {
                return;
            }
            $class = new \reflectionclass('wlmis\\logic\\timelimittask\\' . $params[0]);
            $instance = $class->newinstance();
            $transfer = array();
            if (count($params) == 3) {
                $transfer = explode('@', $params[2]);
            }
            $instance->call_func($params[1], $transfer);
        } catch (\exception $e) {
            $this->logrecord('notice', 'task execution class or method not found! or call the method to throw an error.'
                . php_eol . 'pass key parameter: ' . $key . php_eol . 'file location: ' . get_class($this)
                . php_eol . 'line: ' . $e->getline() . php_eol . 'error message: ' . $e->getmessage() . php_eol . php_eol);
        }
    }

    /**
     * 任务状态检查,执行未正常执行的任务
     * @author dong.cx 2019-04-02 10:57
     */
    private function taskstatuscheck()
    {
        try {
            $result = (new timelimittasklogic())->getnotperformedtask();
            if (!empty($result)) {
                $this->logrecord('info', 'find ' . count($result) . ' unprocessed task:');
                foreach ($result as $value) {
                    $this->task($value['key']);
                }
            }
        } catch (\exception $e) {
            $this->logrecord('notice', 'an exception occurred during task status checking.');
        }
    }

    /**
     * 生成订阅消息丢失控制键
     * @param boolean $always_output_screen 不管不否在调试模式都输出到屏幕
     *
     * @author dong.cx 2019-04-02 10:58
     */
    private function subscription_info_loss_control($always_output_screen = false)
    {
        try {
            $this->logrecord('info', 'generates subscription information loss control keys.', true, $always_output_screen);
            $success = 0;
            $error = 0;
            $redis = new redis();
            for ($i = 1; $i <= $this->subscription_info_loss_control_key_max; $i++) {
                $redis->setex('#silck`' . $i, $i * 900, '') ? $success++ : $error++;
            }
            $this->logrecord('info', 'generates loss control keys: ' . $this->subscription_info_loss_control_key_max . ' total, ' . $success . ' success, ' . $error . ' error', true, $always_output_screen);
            $redis->close();
        } catch (\exception $e) {
            $this->logrecord('notice', 'an exception occurs when the subscription information loss control key is created.', true, $always_output_screen);
        }
    }
}

 

键事件回调操作
<?php
/**
 * description:拍卖倒计时操作
 * created by dong.cx
 * datetime: 2019-03-18 10:04
 */

namespace wlmis\logic\timelimittask;


use think\config;
use think\exception;
use wlmis\common\redis\redis;
use wlmis\dao\addons\auction\auctiongoodsdao;
use wlmis\logic\oper\addons\auction\auctionlogic;
use wlmis\logic\timelimittask\base\timelimitbaselogic;

class auctioncutdownlogic extends timelimitbaselogic
{
    private $auctiongoodsdao;
    public function __construct()
    {
        parent::__construct();
        $this->auctiongoodsdao = new auctiongoodsdao();
    }

    /**
     * 拍卖结束, 更新拍品表/保单表 操作
     * @param $params
     *
     * @author dong.cx 2019-03-18 18:39
     */
    public function auctionendcutdown($params)
    {
        $auctionid = $params[0];
        $auctionlogic = new auctionlogic();
        try {
            if (!$auctionid || !is_numeric($auctionid)) throw new exception('params error');
            $goodsinfo = $this->auctiongoodsdao->load($auctionid, 'final_end_time');
            if (!$goodsinfo) {
                $this->logrecord('notice', 'tb_auction_goods主键:' . $auctionid . '不存在');
            } else {
                parent::starttrans();
                // 拍卖结束
                $result = $auctionlogic->auctionended($auctionid);
                if ($result['code'] == 0) {
                    $this->logrecord('notice', $result['msg']);
                }
                // 更改mysql中键的状态为已处理
                $this->recording_process_mysql($this->key_splice(__function__, [$auctionid]));
                // 删除 redis 当前价
                $redis = new redis();
                $redis->del('auction_gid@' . $auctionid . '@current_bid');

                websocket_send($auctionid . 'bid/index', true, 2, '拍卖结束');
            }
            parent::commit();

        } catch (exception $e) {
            parent::rollback();
            $this->throw_message(__function__, $e);
        }
    }

    /**
     * 拍卖交易结束
     *     无订单/未付款,不释放保证金
     * @param $params
     *
     * @author dong.cx 2019-03-18 20:15
     */
    public function dealcutdown($params)
    {
        $auctionid = $params[0];
        $auctionlogic = new auctionlogic();
        try {
            parent::starttrans();
            if (!$auctionid || !is_numeric($auctionid)) throw new exception('params error');
            $goodsinfo = $this->auctiongoodsdao->load($auctionid, 'final_end_time');
            if (!$goodsinfo) {
                $this->logrecord('notice', 'tb_auction_goods主键:' . $auctionid . '不存在');
            } elseif (!$goodsinfo['final_end_time']) {
                $this->logrecord('notice', 'tb_auction_goods主键:' . $auctionid . '的拍品还未结束或最终结束时间为空');
            } else {
                $result = $auctionlogic->checkstatus($auctionid);
                if ($result['code'] == 0) $this->logrecord('notice', $result['msg']);
                // 更改mysql中键的状态为已处理
                $this->recording_process_mysql($this->key_splice(__function__, [$auctionid]));
            }
            parent::commit();
        } catch (exception $e) {
            parent::rollback();
            $this->throw_message(__function__, $e);
        }
    }

    /**
     * 创建拍卖结束倒计时任务
     * @param $auctionid
     * @param int $ttl
     *
     * @throws exception
     * @author dong.cx 2019-04-01 09:49
     */
    public function auction_end_countdown_create($auctionid, $ttl=0)
    {
        return $this->create('auctionendcutdown', $ttl, [$auctionid]);
    }

    /**
     * 删除拍卖结束倒计时任务
     * @param int $auctionid 拍卖商品表主键
     *
     * @return bool|int
     * @throws exception
     * @author dong.cx 2019-04-01 10:08:49
     */
    public function auction_end_countdown_delete($auctionid)
    {
        return $this->del_key('auctionendcutdown', [$auctionid]);
    }

    /**
     * 创建交易倒计时任务
     * @param int $auctionid 拍卖商品表主键
     * @param int $ttl 生存时间
     *
     * @throws exception
     * 异常代码:
     * 500     redis操作失败
     * @author dong.cx 2019-03-22 15:36
     */
    public function deal_countdown_create($auctionid, $ttl=0)
    {
        $this->create('dealcutdown', $ttl + config::get('auction_deal_limit_time'), [$auctionid]);
    }

    /**
     * 删除交易倒计时任务
     * @param int $auctionid 拍卖商品表主键
     *
     * @return bool|int
     * @throws exception
     * @author dong.cx 2019-03-22 15:36
     */
    public function deal_countdown_delete($auctionid)
    {
        return $this->del_key('countdown', [$auctionid]);
    }
}

 

 

任务基类
<?php
/**
 * created by dong.cx
 * date: 2019/3/27 17:13
 * description: 时间限制任务基类
 *              每一个子类继承这个基类实现时间任务调度
 *              子类中开放给redis调度的函数设置访问权限为protected,防止外部误触发
 *              子类中其他开放给内部调用的访问权限为public
 * ************************************************
 * 存储到redis中的键名规则为:类名@方法名@参数...(参数可为空,多个参数则以@分隔) key_splice 函数可生成键
 * 所有的参数通过一个数组传入方法(一维索引数组,跟存储函数 create 传入参数时一样)
 * 类名、方法名,尽量精简,能节约带宽以及redis查询速度
 * 参数设计也尽量精简,所有操作都在服务端内部完成,所以能用1个条件准确查询数据库的,不要用两个条件查询
 *
 * 存储键直接使用 create 方法,以秒为单位,会自动拼接键键
 * 如果以毫秒为单位则 create_ms 方法
 * ************************************************
 */

namespace wlmis\logic\timelimittask\base;


use think\exception;
use wlmis\common\redis\redis;
use wlmis\model\sys\timelimittaskmodel;
use wlmis\logic\baselogic;

class timelimitbaselogic extends baselogic
{
    use logrecord;

    /**
     * redis连接实例
     * @var redis
     */
    protected $redis;

    /**
     * timelimitbaselogic constructor.
     * @author dong.cx
     */
    public function __construct()
    {
        parent::__construct();
        $this->redis = new redis();
    }

    /**
     * 任务调度入口
     * @param string $funcname 调用方法名
     * @param array $params 传递参数
     * @author: dong.cx
     */
    public function call_func($funcname, $params = array())
    {
        call_user_func(array($this, $funcname), $params);
    }

    /**
     * 键拼接
     * 键用 @ 符号作为分隔符,所以方法名、参数中不可出现
     * 键名规则中的类名会自动生成
     * @param string $funcname 方法名
     * @param array $params 参数(必须传入一维索引数组,请勿传入关联数组,按照顺序生成参数,关联数组不保证顺序)
     * @return string                  返回键
     * @author: dong.cx
     */
    protected function key_splice($funcname, $params = array())
    {
        $class = explode('\\', get_class($this));
        $paramsstr = '';
        foreach ($params as $value) {
            $paramsstr .= '@' . $value;
        }
        return $class[count($class) - 1] . '@' . $funcname . $paramsstr;
    }

    /**
     * 向redis存储键(延时单位秒)
     * 会自动将参数进行拼接,然后存入redis
     * @param string $funcname 调用方法名
     * @param int $ttl 延时(秒)
     * @param array $params 参数(必须传入一维索引数组,请勿传入关联数组,按照顺序生成参数,关联数组不保证顺序)
     * @throws exception
     * *********************
     * 异常代码:
     * 500     redis操作失败
     * *********************
     * @author: dong.cx
     */
    public function create($funcname, $ttl = 0, $params = array())
    {
        $key = $this->key_splice($funcname, $params);
        $this->recording_mysql($key, $ttl);
        if (!($this->redis->setex($key, $ttl, ''))) {
            throw new exception('redis存储失败', 500);
        }
    }

    /**
     * 向redis存储键(延时单位毫秒)
     * 会自动将参数进行拼接,然后存入redis
     * @param string $funcname 调用方法名
     * @param int $ttl 延时(毫秒)
     * @param array $params 参数(必须传入一维索引数组,请勿传入关联数组,按照顺序生成参数,关联数组不保证顺序)
     * @throws exception
     * *********************
     * 异常代码:
     * 500     redis操作失败
     * *********************
     * @author: dong.cx
     */
    public function create_ms($funcname, $ttl = 0, $params = array())
    {
        $key = $this->key_splice($funcname, $params);
        $this->recording_mysql($key, $ttl, true);
        if (!($this->redis->psetex($key, $ttl, ''))) {
            throw new exception('redis存储失败', 500);
        }
    }

    /**
     * 获取指定键的剩余生存时间(秒)
     * @param string $funcname  任务方法名
     * @param array $params 任务参数
     * @return bool|int         如果为false,说明redis连接失败
     *                          如果为-1,说明改键不是定时键
     *                          如果为-2,说明键不存在(已消失)
     *                          其他为剩余生存时间(秒)
     * @throws exception
     * @author: dong.cx
     */
    protected function getttl($funcname, $params = array())
    {
        $key = $this->key_splice($funcname, $params);
        return $this->redis->ttl($key);
    }

    /**
     * 获取指定键的剩余生存时间(毫秒)
     * @param string $funcname  任务方法名
     * @param array $params 任务参数
     * @return bool|int         如果为false,说明redis连接失败
     *                          如果为-1,说明改键不是定时键
     *                          如果为-2,说明键不存在(已消失)
     *                          其他为剩余生存时间(秒)
     * @throws exception
     * @author: dong.cx
     */
    protected function getpttl($funcname, $params = array())
    {
        $key = $this->key_splice($funcname, $params);
        return $this->redis->pttl($key);
    }

    /**
     * 删除指定键
     * ***********************************************
     * 删除不会触发事件,用于无用记录的删除
     * 如生成支付订单二次提交时删除前面一个未处理任务。
     * 一般在设计任务处理流程时需要考虑到无用任务的触发,并进行规避,必要时进行主动删除任务可以减轻服务器负担
     * 任务处理流程应该做到无用记录的触发不会影响到系统正常运行
     * ***********************************************
     * @param $funcname
     * @param array $params
     * @return bool|int         返回false则redis实例获取失败,连接不上,返回int则为影响的记录条数
     * @throws exception
     * @author: dong.cx
     */
    protected function del_key($funcname, $params = array())
    {
        $key = $this->key_splice($funcname, $params);
        timelimittaskmodel::where('key', $key)->update([
            'sts' => 1
        ]);
        return $this->redis->del($key);
    }

    /**
     * 记录键到mysql中,
     * @param string $key 键
     * @param int $ttl 触发时间
     * @param bool $mode 当为false时,触发时间为秒,当为true时,触发时间为毫秒
     * @throws \think\db\exception\datanotfoundexception
     * @throws \think\db\exception\modelnotfoundexception
     * @throws \think\exception\dbexception
     * @author: dong.cx
     */
    private function recording_mysql($key, $ttl, $mode = false)
    {
        if ($mode) {
            // 这里说明 ttl 以毫秒为单位
            $currenttime = bcmul(microtime(true), '1', 3);
            $endtime = bcadd($currenttime, bcdiv($ttl, '1000', 3), 3);
        } else {
            // 这里说明 ttl 以秒为单位
            $currenttime = time();
            $endtime = $currenttime + $ttl;
        }
        if (timelimittaskmodel::field('id')->where('key', $key)->find() !== null) {
            timelimittaskmodel::where('key', $key)->update([
                'status'     => 0,
                'start_time' => $currenttime,
                'end_time'   => $endtime
            ]);
        } else {
            timelimittaskmodel::create([
                'key'        => $key,
                'status'     => 0,
                'start_time' => $currenttime,
                'end_time'   => $endtime,
                'sts'        => 0
            ]);
        }
    }

    /**
     * 更改键在mysql中的状态为已处理
     * @param $key
     * @author: dong.cx
     */
    protected function recording_process_mysql($key)
    {
        $tlm = new timelimittaskmodel();
        $tlm->where('key', $key)->update([
            'status' => 1
        ]);
    }

    /**
     * 抛出错误信息
     * @param string $funcname 出错方法名(__function__)
     * @param \exception $e 错误信息
     * @author: dong.cx
     */
    protected function throw_message($funcname, \exception $e)
    {
        $this->logrecord('error', 'the task logic has made an error:' . php_eol . 'class:' . get_class($this)
            . php_eol . 'method name:' . $funcname . php_eol . 'file:' . $e->getfile() . php_eol . 'line: ' . $e->getline()
            . php_eol . 'error message:' . $e->getmessage() . php_eol);
    }

    /**
     * 析构函数
     * @author dong.cx
     */
    public function __destruct()
    {
        $this->redis->close();
    }

}

 

运行

 ✘  ~/documents/card253  php tp5cornnew.php timelimittask
【2019-04-08 11:40:02】thinkphp version: 5.0.7
【2019-04-08 11:40:02】redis host: 127.0.0.1
【2019-04-08 11:40:02】connection succeeded
【2019-04-08 11:40:02】generates subscription information loss control keys.
【2019-04-08 11:40:02】generates loss control keys: 10 total, 10 success, 0 error
【2019-04-08 11:40:02】start listening

使用:

只需要启动脚本,

在需要的时候,新增任务即可

参考资料:

redis实践操作之—— keyspace notification(键空间通知)