A Redis-Based Solution for Delayed Tasks
I recently ran into a delayed-task requirement in production, surveyed the mainstream options, weighed their pros and cons and settled on a final approach. This article records that investigation and a first-cut implementation of the chosen design.
Comparison of candidate solutions
Below are the approaches to delayed tasks that came to mind, with a summary of their respective strengths and weaknesses.
| Scheme | Pros | Cons | Suitable scenarios |
|---|---|---|---|
| JDK built-in DelayQueue | Simple to implement | Data lives only in memory, not reliable | Scenarios with relatively low consistency requirements |
| Scheduling framework + MySQL short-interval polling | Simple to implement, highly reliable | Obvious performance bottleneck | Small data volumes with relatively low real-time requirements |
| RabbitMQ DLX + TTL (the "dead letter queue" approach) | Asynchronous interaction, helps shave traffic peaks | The delay duration is hard to control; persisting the messages reduces performance | - |
| Scheduling framework + Redis short-interval polling | Persistent data, high performance | Harder to implement | Commonly seen in payment result callback designs |
| Time wheel | High real-time precision | Harder to implement, high memory consumption | Scenarios requiring high real-time precision |
If the data volume is low and the real-time requirement is modest, short-interval polling of MySQL with a scheduling framework is the best option. In my case, however, the data volume is relatively large while the real-time requirement is still modest, and sweeping the table would put considerable pressure on the MySQL instance. A while ago I came across a slide deck called 《盒子科技聚合支付系统演进》, and one of its diagrams gave me some inspiration:
it happens to use a scheduling framework plus Redis short-interval polling to implement delayed tasks, and to spread the application load the design in that diagram also shards the data. Given the pressing business deadline, sharding is left out of this first iteration and only a simplified version is implemented.
Since the slides contain no code or framework details, several technical points had to be worked out independently; the rest of this article walks through the whole implementation in detail.
Scenario design
The actual production scenario: a system I am responsible for integrates with an external funding partner, and every funding order must push its attachments 30 minutes after the order is placed. Here it is simplified into a delayed order-processing scenario: every order produces one order message (call it OrderMessage for now), and each order message must be processed asynchronously 5 to 15 seconds later.
Implementation sketches of the rejected candidates
The following sections walk through the four candidates that were not chosen, with some sample code and a brief analysis of how each would be implemented.
JDK built-in DelayQueue

DelayQueue is a blocking queue implementation whose elements must implement the Delayed interface. A simple example:
```java
public class DelayQueueMain {

    private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueMain.class);

    public static void main(String[] args) throws Exception {
        DelayQueue<OrderMessage> queue = new DelayQueue<>();
        // default delay of 5 seconds
        OrderMessage message = new OrderMessage("order_id_10086");
        queue.add(message);
        // delay of 6 seconds
        message = new OrderMessage("order_id_10087", 6);
        queue.add(message);
        // delay of 10 seconds
        message = new OrderMessage("order_id_10088", 10);
        queue.add(message);
        ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
            Thread thread = new Thread(r);
            thread.setName("DelayWorker");
            thread.setDaemon(true);
            return thread;
        });
        LOGGER.info("开始执行调度线程...");
        executorService.execute(() -> {
            while (true) {
                try {
                    OrderMessage task = queue.take();
                    LOGGER.info("延迟处理订单消息,{}", task.getDescription());
                } catch (Exception e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
        });
        Thread.sleep(Integer.MAX_VALUE);
    }

    private static class OrderMessage implements Delayed {

        private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        /**
         * default delay of 5000 milliseconds
         */
        private static final long DELAY_MS = 1000L * 5;
        /**
         * order id
         */
        private final String orderId;
        /**
         * creation timestamp
         */
        private final long timestamp;
        /**
         * expiry timestamp
         */
        private final long expire;
        /**
         * description
         */
        private final String description;

        public OrderMessage(String orderId, long expireSeconds) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
            this.expire = this.timestamp + expireSeconds * 1000L;
            this.description = String.format("订单[%s]-创建时间为:%s,超时时间为:%s", orderId,
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
        }

        public OrderMessage(String orderId) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
            this.expire = this.timestamp + DELAY_MS;
            this.description = String.format("订单[%s]-创建时间为:%s,超时时间为:%s", orderId,
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
        }

        public String getOrderId() {
            return orderId;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public long getExpire() {
            return expire;
        }

        public String getDescription() {
            return description;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
        }
    }
}
```
Note that OrderMessage implements the Delayed interface; the key points are the implementations of Delayed#getDelay() and Delayed#compareTo(). Running the main() method:
10:16:08.240 [main] info club.throwable.delay.delayqueuemain - 开始执行调度线程... 10:16:13.224 [delayworker] info club.throwable.delay.delayqueuemain - 延迟处理订单消息,订单[order_id_10086]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:13 10:16:14.237 [delayworker] info club.throwable.delay.delayqueuemain - 延迟处理订单消息,订单[order_id_10087]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:14 10:16:18.237 [delayworker] info club.throwable.delay.delayqueuemain - 延迟处理订单消息,订单[order_id_10088]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:18
Scheduling framework + MySQL
Short-interval polling of a MySQL table driven by a scheduling framework is the least difficult option to implement. When a service has just gone live, with little table data and low real-time requirements, it should usually be the first choice; a few caveats are noted after the example below. Introduce quartz, the MySQL Java driver and spring-boot-starter-jdbc (relatively lightweight choices picked here purely for convenience; in production pick whatever frameworks fit the scenario):
```xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.48</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
    <version>2.1.7.RELEASE</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.1</version>
    <scope>test</scope>
</dependency>
```
Assume the table is designed as follows:
```sql
create database `delayTask` character set utf8mb4 collate utf8mb4_unicode_520_ci;

use `delayTask`;

create table `t_order_message`
(
    id           bigint unsigned primary key auto_increment,
    order_id     varchar(50) not null comment '订单ID',
    create_time  datetime    not null default current_timestamp comment '创建日期时间',
    edit_time    datetime    not null default current_timestamp comment '修改日期时间',
    retry_times  tinyint     not null default 0 comment '重试次数',
    order_status tinyint     not null default 0 comment '订单状态',
    index idx_order_id (order_id),
    index idx_create_time (create_time)
) comment '订单信息表';

# insert two rows of test data
insert into t_order_message(order_id) values ('10086'), ('10087');
```
The code:
```java
// constants
public class OrderConstants {

    public static final int MAX_RETRY_TIMES = 5;
    public static final int PENDING = 0;
    public static final int SUCCESS = 1;
    public static final int FAIL = -1;
    public static final int LIMIT = 10;
}

// entity
@Builder
@Data
public class OrderMessage {

    private Long id;
    private String orderId;
    private LocalDateTime createTime;
    private LocalDateTime editTime;
    private Integer retryTimes;
    private Integer orderStatus;
}

// DAO
@RequiredArgsConstructor
public class OrderMessageDao {

    private final JdbcTemplate jdbcTemplate;

    private static final ResultSetExtractor<List<OrderMessage>> M = r -> {
        List<OrderMessage> list = Lists.newArrayList();
        while (r.next()) {
            list.add(OrderMessage.builder()
                    .id(r.getLong("id"))
                    .orderId(r.getString("order_id"))
                    .createTime(r.getTimestamp("create_time").toLocalDateTime())
                    .editTime(r.getTimestamp("edit_time").toLocalDateTime())
                    .retryTimes(r.getInt("retry_times"))
                    .orderStatus(r.getInt("order_status"))
                    .build());
        }
        return list;
    };

    public List<OrderMessage> selectPendingRecords(LocalDateTime start, LocalDateTime end, List<Integer> statusList,
                                                   int maxRetryTimes, int limit) {
        StringJoiner joiner = new StringJoiner(",");
        statusList.forEach(s -> joiner.add(String.valueOf(s)));
        // binding the joined status list as a single string is a simplification - it triggers
        // the SQLWarning visible in the log further below
        return jdbcTemplate.query("select * from t_order_message where create_time >= ? and create_time <= ? " +
                "and order_status in (?) and retry_times < ? limit ?", p -> {
            p.setTimestamp(1, Timestamp.valueOf(start));
            p.setTimestamp(2, Timestamp.valueOf(end));
            p.setString(3, joiner.toString());
            p.setInt(4, maxRetryTimes);
            p.setInt(5, limit);
        }, M);
    }

    public int updateOrderStatus(Long id, int status) {
        return jdbcTemplate.update("update t_order_message set order_status = ?,edit_time = ? where id =?", p -> {
            p.setInt(1, status);
            p.setTimestamp(2, Timestamp.valueOf(LocalDateTime.now()));
            p.setLong(3, id);
        });
    }
}

// service
@RequiredArgsConstructor
public class OrderMessageService {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageService.class);

    private final OrderMessageDao orderMessageDao;

    private static final List<Integer> STATUS = Lists.newArrayList();

    static {
        STATUS.add(OrderConstants.PENDING);
        STATUS.add(OrderConstants.FAIL);
    }

    public void executeDelayJob() {
        LOGGER.info("订单处理定时任务开始执行......");
        LocalDateTime end = LocalDateTime.now();
        // one day ago
        LocalDateTime start = end.minusDays(1);
        List<OrderMessage> list = orderMessageDao.selectPendingRecords(start, end, STATUS,
                OrderConstants.MAX_RETRY_TIMES, OrderConstants.LIMIT);
        if (!list.isEmpty()) {
            for (OrderMessage m : list) {
                LOGGER.info("处理订单[{}],状态由{}更新为{}", m.getOrderId(), m.getOrderStatus(), OrderConstants.SUCCESS);
                // this could be optimised into a batch update
                orderMessageDao.updateOrderStatus(m.getId(), OrderConstants.SUCCESS);
            }
        }
        LOGGER.info("订单处理定时任务开始完毕......");
    }
}

// job
@DisallowConcurrentExecution
public class OrderMessageDelayJob implements Job {

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        OrderMessageService service = (OrderMessageService) jobExecutionContext.getMergedJobDataMap()
                .get("orderMessageService");
        service.executeDelayJob();
    }

    public static void main(String[] args) throws Exception {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/delayTask?useSSL=false&characterEncoding=utf8");
        config.setDriverClassName(Driver.class.getName());
        config.setUsername("root");
        config.setPassword("root");
        HikariDataSource dataSource = new HikariDataSource(config);
        OrderMessageDao orderMessageDao = new OrderMessageDao(new JdbcTemplate(dataSource));
        OrderMessageService service = new OrderMessageService(orderMessageDao);
        // scheduler in in-memory mode
        StdSchedulerFactory factory = new StdSchedulerFactory();
        Scheduler scheduler = factory.getScheduler();
        // no IoC container here; the service reference is passed through the Quartz JobDataMap
        JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.put("orderMessageService", service);
        // create the job
        JobDetail job = JobBuilder.newJob(OrderMessageDelayJob.class)
                .withIdentity("orderMessageDelayJob", "delayJob")
                .usingJobData(jobDataMap)
                .build();
        // create the trigger, firing every 10 seconds
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("orderMessageDelayTrigger", "delayJob")
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
                .build();
        scheduler.scheduleJob(job, trigger);
        // start the scheduler
        scheduler.start();
        Thread.sleep(Integer.MAX_VALUE);
    }
}
```
This example polls on create_time. In practice it is better to add a dedicated schedule_time column and poll on that, which makes it much easier to tailor the scheduling policy for idle and busy periods; a hedged sketch of such a query follows the log output below. Running the example produces:
11:58:27.202 [main] info org.quartz.core.quartzscheduler - scheduler meta-data: quartz scheduler (v2.3.1) 'defaultquartzscheduler' with instanceid 'non_clustered' scheduler class: 'org.quartz.core.quartzscheduler' - running locally. not started. currently in standby mode. number of jobs executed: 0 using thread pool 'org.quartz.simpl.simplethreadpool' - with 10 threads. using job-store 'org.quartz.simpl.ramjobstore' - which does not support persistence. and is not clustered. 11:58:27.202 [main] info org.quartz.impl.stdschedulerfactory - quartz scheduler 'defaultquartzscheduler' initialized from default resource file in quartz package: 'quartz.properties' 11:58:27.202 [main] info org.quartz.impl.stdschedulerfactory - quartz scheduler version: 2.3.1 11:58:27.209 [main] info org.quartz.core.quartzscheduler - scheduler defaultquartzscheduler_$_non_clustered started. 11:58:27.212 [defaultquartzscheduler_quartzschedulerthread] debug org.quartz.core.quartzschedulerthread - batch acquisition of 1 triggers 11:58:27.217 [defaultquartzscheduler_quartzschedulerthread] debug org.quartz.simpl.propertysettingjobfactory - producing instance of job 'delayjob.ordermessagedelayjob', class=club.throwable.jdbc.ordermessagedelayjob 11:58:27.219 [hikaripool-1 connection adder] debug com.zaxxer.hikari.pool.hikaripool - hikaripool-1 - added connection com.mysql.jdbc.jdbc4connection@10eb8c53 11:58:27.220 [defaultquartzscheduler_quartzschedulerthread] debug org.quartz.core.quartzschedulerthread - batch acquisition of 0 triggers 11:58:27.221 [defaultquartzscheduler_worker-1] debug org.quartz.core.jobrunshell - calling execute on job delayjob.ordermessagedelayjob 11:58:34.440 [defaultquartzscheduler_worker-1] info club.throwable.jdbc.ordermessageservice - 订单处理定时任务开始执行...... 11:58:34.451 [hikaripool-1 connection adder] debug com.zaxxer.hikari.pool.hikaripool - hikaripool-1 - added connection com.mysql.jdbc.jdbc4connection@3d27ece4 11:58:34.459 [hikaripool-1 connection adder] debug com.zaxxer.hikari.pool.hikaripool - hikaripool-1 - added connection com.mysql.jdbc.jdbc4connection@64e808af 11:58:34.470 [hikaripool-1 connection adder] debug com.zaxxer.hikari.pool.hikaripool - hikaripool-1 - added connection com.mysql.jdbc.jdbc4connection@79c8c2b7 11:58:34.477 [hikaripool-1 connection adder] debug com.zaxxer.hikari.pool.hikaripool - hikaripool-1 - added connection com.mysql.jdbc.jdbc4connection@19a62369 11:58:34.485 [hikaripool-1 connection adder] debug com.zaxxer.hikari.pool.hikaripool - hikaripool-1 - added connection com.mysql.jdbc.jdbc4connection@1673d017 11:58:34.485 [hikaripool-1 connection adder] debug com.zaxxer.hikari.pool.hikaripool - hikaripool-1 - after adding stats (total=10, active=0, idle=10, waiting=0) 11:58:34.559 [defaultquartzscheduler_worker-1] debug org.springframework.jdbc.core.jdbctemplate - executing prepared sql query 11:58:34.565 [defaultquartzscheduler_worker-1] debug org.springframework.jdbc.core.jdbctemplate - executing prepared sql statement [select * from t_order_message where create_time >= ? and create_time <= ? and order_status in (?) and retry_times < ? limit ?] 
11:58:34.645 [defaultquartzscheduler_worker-1] debug org.springframework.jdbc.datasource.datasourceutils - fetching jdbc connection from datasource 11:58:35.210 [defaultquartzscheduler_worker-1] debug org.springframework.jdbc.core.jdbctemplate - sqlwarning ignored: sql state '22007', error code '1292', message [truncated incorrect double value: '0,-1'] 11:58:35.335 [defaultquartzscheduler_worker-1] info club.throwable.jdbc.ordermessageservice - 处理订单[10086],状态由0更新为1 11:58:35.342 [defaultquartzscheduler_worker-1] debug org.springframework.jdbc.core.jdbctemplate - executing prepared sql update 11:58:35.346 [defaultquartzscheduler_worker-1] debug org.springframework.jdbc.core.jdbctemplate - executing prepared sql statement [update t_order_message set order_status = ?,edit_time = ? where id =?] 11:58:35.347 [defaultquartzscheduler_worker-1] debug org.springframework.jdbc.datasource.datasourceutils - fetching jdbc connection from datasource 11:58:35.354 [defaultquartzscheduler_worker-1] info club.throwable.jdbc.ordermessageservice - 处理订单[10087],状态由0更新为1 11:58:35.355 [defaultquartzscheduler_worker-1] debug org.springframework.jdbc.core.jdbctemplate - executing prepared sql update 11:58:35.355 [defaultquartzscheduler_worker-1] debug org.springframework.jdbc.core.jdbctemplate - executing prepared sql statement [update t_order_message set order_status = ?,edit_time = ? where id =?] 11:58:35.355 [defaultquartzscheduler_worker-1] debug org.springframework.jdbc.datasource.datasourceutils - fetching jdbc connection from datasource 11:58:35.361 [defaultquartzscheduler_worker-1] info club.throwable.jdbc.ordermessageservice - 订单处理定时任务开始完毕...... 11:58:35.363 [defaultquartzscheduler_quartzschedulerthread] debug org.quartz.core.quartzschedulerthread - batch acquisition of 1 triggers 11:58:37.206 [defaultquartzscheduler_quartzschedulerthread] debug org.quartz.simpl.propertysettingjobfactory - producing instance of job 'delayjob.ordermessagedelayjob', class=club.throwable.jdbc.ordermessagedelayjob 11:58:37.206 [defaultquartzscheduler_quartzschedulerthread] debug org.quartz.core.quartzschedulerthread - batch acquisition of 0 triggers
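As mentioned above, a schedule_time column makes the polling policy easier to tune. A minimal sketch of what that query could look like, added alongside selectPendingRecords in OrderMessageDao; the schedule_time column and the selectDueRecords method are assumptions, not part of the schema and code shown above:

```java
// Hypothetical DAO method, assuming t_order_message gains a schedule_time DATETIME column
// holding the moment at which each record becomes due.
public List<OrderMessage> selectDueRecords(LocalDateTime now, List<Integer> statusList,
                                           int maxRetryTimes, int limit) {
    StringJoiner joiner = new StringJoiner(",");
    statusList.forEach(s -> joiner.add(String.valueOf(s)));
    // a record is due once schedule_time has passed, regardless of when it was created;
    // the IN clause is concatenated here, which also avoids the SQLWarning seen in the log above
    return jdbcTemplate.query("select * from t_order_message where schedule_time <= ? " +
            "and order_status in (" + joiner + ") and retry_times < ? limit ?", p -> {
        p.setTimestamp(1, Timestamp.valueOf(now));
        p.setInt(2, maxRetryTimes);
        p.setInt(3, limit);
    }, M);
}
```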
RabbitMQ dead letter queue

Using a RabbitMQ dead letter queue relies on two RabbitMQ features: TTL and DLX.

- TTL: time to live, the lifetime of a message, which comes in two flavours: a TTL configured on the queue and a TTL carried by the message itself.
- DLX: dead letter exchange.

The two combine as follows: a message published to the business queue sits there until its TTL expires, is then routed through the DLX with the configured dead-letter routing key into the dead letter queue, and is finally picked up by that queue's consumer, which produces the delay effect. For simplicity the example below applies the TTL at the queue level (a sketch of the per-message variant follows the run output). Introduce the RabbitMQ Java client:
```xml
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.3</version>
    <scope>test</scope>
</dependency>
```
The code:
```java
public class DlxMain {

    private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final Logger LOGGER = LoggerFactory.getLogger(DlxMain.class);

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel producerChannel = connection.createChannel();
        Channel consumerChannel = connection.createChannel();
        // DLX exchange named dlx.exchange, type direct, binding key dlx.key, queue named dlx.queue
        producerChannel.exchangeDeclare("dlx.exchange", "direct");
        producerChannel.queueDeclare("dlx.queue", false, false, false, null);
        producerChannel.queueBind("dlx.queue", "dlx.exchange", "dlx.key");
        Map<String, Object> queueArgs = new HashMap<>();
        // queue-level message TTL: 5 seconds
        queueArgs.put("x-message-ttl", 5000);
        // DLX-related arguments
        queueArgs.put("x-dead-letter-exchange", "dlx.exchange");
        queueArgs.put("x-dead-letter-routing-key", "dlx.key");
        // declare the business queue
        producerChannel.queueDeclare("business.queue", false, false, false, queueArgs);
        ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("DlxConsumer");
            return thread;
        });
        // start the consumer
        executorService.execute(() -> {
            try {
                consumerChannel.basicConsume("dlx.queue", true, new DlxConsumer(consumerChannel));
            } catch (IOException e) {
                LOGGER.error(e.getMessage(), e);
            }
        });
        OrderMessage message = new OrderMessage("10086");
        producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
                message.getDescription().getBytes(StandardCharsets.UTF_8));
        LOGGER.info("发送消息成功,订单id:{}", message.getOrderId());
        message = new OrderMessage("10087");
        producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
                message.getDescription().getBytes(StandardCharsets.UTF_8));
        LOGGER.info("发送消息成功,订单id:{}", message.getOrderId());
        message = new OrderMessage("10088");
        producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
                message.getDescription().getBytes(StandardCharsets.UTF_8));
        LOGGER.info("发送消息成功,订单id:{}", message.getOrderId());
        Thread.sleep(Integer.MAX_VALUE);
    }

    private static class DlxConsumer extends DefaultConsumer {

        DlxConsumer(Channel channel) {
            super(channel);
        }

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                   byte[] body) throws IOException {
            LOGGER.info("处理消息成功:{}", new String(body, StandardCharsets.UTF_8));
        }
    }

    private static class OrderMessage {

        private final String orderId;
        private final long timestamp;
        private final String description;

        OrderMessage(String orderId) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
            this.description = String.format("订单[%s],订单创建时间为:%s", orderId,
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F));
        }

        public String getOrderId() {
            return orderId;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public String getDescription() {
            return description;
        }
    }
}
```
Running the main() method gives:
16:35:58.638 [main] info club.throwable.dlx.dlxmain - 发送消息成功,订单id:10086 16:35:58.641 [main] info club.throwable.dlx.dlxmain - 发送消息成功,订单id:10087 16:35:58.641 [main] info club.throwable.dlx.dlxmain - 发送消息成功,订单id:10088 16:36:03.646 [pool-1-thread-4] info club.throwable.dlx.dlxmain - 处理消息成功:订单[10086],订单创建时间为:2019-08-20 16:35:58 16:36:03.670 [pool-1-thread-5] info club.throwable.dlx.dlxmain - 处理消息成功:订单[10087],订单创建时间为:2019-08-20 16:35:58 16:36:03.670 [pool-1-thread-6] info club.throwable.dlx.dlxmain - 处理消息成功:订单[10088],订单创建时间为:2019-08-20 16:35:58
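For completeness, a minimal sketch of the other TTL dimension: carrying the TTL on each message instead of on the queue. It reuses the business.queue declared in the example above; the helper name and the concrete TTL value are assumptions:

```java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class PerMessageTtlExample {

    // Publishes a message whose TTL is carried by the message itself (milliseconds, as a string).
    static void publishWithTtl(Channel channel, String body, long ttlMillis) throws IOException {
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .expiration(String.valueOf(ttlMillis))
                .build();
        channel.basicPublish("", "business.queue", props, body.getBytes(StandardCharsets.UTF_8));
    }
}
```

Note that per-message TTL is only evaluated when the message reaches the head of the queue, so a long-TTL message in front can delay the dead-lettering of shorter-TTL messages behind it; this is part of why the delay duration is listed as hard to control in the comparison table.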
Time wheel

A time wheel (TimingWheel) is an efficient, low-latency scheduling data structure: a circular queue, backed by an array, whose slots hold lists of tasks. This article does not analyse the time wheel or its implementation; it only shows how to use one for delayed tasks, using Netty's HashedWheelTimer. Introduce the dependency:
```xml
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-common</artifactId>
    <version>4.1.39.Final</version>
</dependency>
```
The code:
```java
public class HashedWheelTimerMain {

    private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    public static void main(String[] args) throws Exception {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("HashedWheelTimerWorker-" + counter.getAndIncrement());
            return thread;
        };
        // tickDuration - the interval between ticks; each tick moves to the next slot
        // unit - the time unit of tickDuration
        // ticksPerWheel - the number of slots in the wheel
        Timer timer = new HashedWheelTimer(factory, 1, TimeUnit.SECONDS, 60);
        TimerTask timerTask = new DefaultTimerTask("10086");
        timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);
        timerTask = new DefaultTimerTask("10087");
        timer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
        timerTask = new DefaultTimerTask("10088");
        timer.newTimeout(timerTask, 15, TimeUnit.SECONDS);
        Thread.sleep(Integer.MAX_VALUE);
    }

    private static class DefaultTimerTask implements TimerTask {

        private final String orderId;
        private final long timestamp;

        public DefaultTimerTask(String orderId) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
        }

        @Override
        public void run(Timeout timeout) throws Exception {
            System.out.println(String.format("任务执行时间:%s,订单创建时间:%s,订单id:%s",
                    LocalDateTime.now().format(F),
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
                    orderId));
        }
    }
}
```
The result:
任务执行时间:2019-08-20 17:19:49.310,订单创建时间:2019-08-20 17:19:43.294,订单id:10086
任务执行时间:2019-08-20 17:19:54.297,订单创建时间:2019-08-20 17:19:43.301,订单id:10087
任务执行时间:2019-08-20 17:19:59.297,订单创建时间:2019-08-20 17:19:43.301,订单id:10088
In general, the actual task work should be handed off to a separate business thread pool, so that slow tasks do not block the ticking of the wheel itself; a minimal sketch follows.
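A minimal sketch of that hand-off, assuming a fixed-size pool whose size is a made-up value; the wheel thread only dispatches to the pool and returns immediately:

```java
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class HashedWheelTimerWithBusinessPool {

    public static void main(String[] args) throws Exception {
        // the real work runs on this pool, so the wheel thread stays free to keep ticking
        ExecutorService businessPool = Executors.newFixedThreadPool(4);
        Timer timer = new HashedWheelTimer();
        timer.newTimeout(timeout -> businessPool.execute(() -> {
            // potentially slow business logic lives here, off the wheel thread
            System.out.println("processing order 10086 on " + Thread.currentThread().getName());
        }), 5, TimeUnit.SECONDS);
        Thread.sleep(10_000);
        timer.stop();
        businessPool.shutdown();
    }
}
```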
Implementation of the chosen solution

The final choice is a Redis sorted set combined with Quartz short-interval polling. Concretely:
1. When an order is created, its order id and the current timestamp are added to the order queue sorted set as member and score.
2. When an order is created, its order id and the push payload (a JSON string) are added to the order detail hash as field and value.
3. Steps 1 and 2 are executed in a single Lua script so that they are atomic.
4. An asynchronous thread uses the sorted set command ZREVRANGEBYSCORE to pop a bounded number of order ids, reads the corresponding push payloads from the order detail hash and processes them.
There are two options for step 4:

- Option 1: delete the data at the moment it is popped, i.e. ZREVRANGEBYSCORE, ZREM and HDEL all run in the same Lua script. The script is harder to write, and because the popped data has already been removed from Redis, a failed handler may have to re-query the database to compensate.
- Option 2: pop the data first and delete the corresponding entries in the order queue sorted set and the order detail hash only after processing has finished. This requires concurrency control, and processing may be repeated.
Option 1 is used for now: the order ids are popped from the sorted set and, as soon as the push payloads have been read from the hash, the corresponding entries in both structures are deleted. The rough flow: the message is enqueued when the order is created, and a scheduled job then polls at a short interval, pops the due messages and processes them. First, the Redis commands involved, in detail.
Sorted set commands

ZADD - adds one or more members with their scores to a sorted set.

zadd key score1 value1 ... scoreN valueN

ZREVRANGEBYSCORE - returns the members whose scores fall within the given range, ordered by score from high to low.

zrevrangebyscore key max min [withscores] [limit offset count]

- max: the upper bound of the score range.
- min: the lower bound of the score range.
- withscores: optional; if present, the scores are returned together with the members.
- limit: optional; offset and count behave like MySQL's limit offset, size; without it the whole matching range is returned.

ZREM - removes one or more members from a sorted set; members that do not exist are ignored.

zrem key member [member ...]
Hash commands

HMSET - sets multiple field-value pairs on a hash at once.

hmset key_name field1 value1 ... fieldN valueN

HDEL - deletes one or more fields from a hash; fields that do not exist are ignored.

hdel key_name field1 ... fieldN
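Put together, a minimal Jedis sketch of how these commands cooperate in this design, assuming a local Redis on the default port; the key names match the implementation later in the article, while the order id and payload are made-up test values:

```java
import redis.clients.jedis.Jedis;

import java.util.Set;

public class RedisCommandWalkthrough {

    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            long now = System.currentTimeMillis();
            // enqueue: order id + creation timestamp into the sorted set, payload into the hash
            jedis.zadd("order_queue", now, "order_id_10086");
            jedis.hset("order_detail_queue", "order_id_10086", "{\"orderId\":\"order_id_10086\"}");

            // dequeue: members whose score is no greater than the cutoff (here "everything up to now"), at most 10
            Set<String> due = jedis.zrevrangeByScore("order_queue", now, 0, 0, 10);
            for (String orderId : due) {
                String payload = jedis.hget("order_detail_queue", orderId);
                System.out.println("due order " + orderId + " -> " + payload);
                // clean up both structures once the payload has been handled
                jedis.zrem("order_queue", orderId);
                jedis.hdel("order_detail_queue", orderId);
            }
        }
    }
}
```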
Lua

Loading a Lua script returns its SHA-1 digest: script load script. Executing a loaded script: evalsha sha1 numkeys key [key ...] arg [arg ...].

The unpack function expands a table into a variable-length argument list. Note that unpack only expands fully when it is the last argument of the call; used anywhere else, only the first element is passed on - see the question "table.unpack() only returns the first element" for details.

PS: if you are not familiar with Lua, it is worth learning it properly, because you can hardly get the most out of Redis without it.
Introduce the dependencies:
```xml
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.1.7.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
        <version>2.3.1</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
        <version>5.1.9.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.8</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.59</version>
    </dependency>
</dependencies>
```
Write the Lua scripts /lua/enqueue.lua and /lua/dequeue.lua:
```lua
-- /lua/enqueue.lua
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local zset_value = ARGV[1]
local zset_score = ARGV[2]
local hash_field = ARGV[3]
local hash_value = ARGV[4]
redis.call('zadd', zset_key, zset_score, zset_value)
redis.call('hset', hash_key, hash_field, hash_value)
return nil

-- /lua/dequeue.lua
-- partly modelled on the Lua scripts in Jesque
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- the type command returns a table like {'ok':'zset'}, so next() is used for one iteration
local status, type = next(redis.call('type', zset_key))
if status ~= nil and status == 'ok' then
    if type == 'zset' then
        local list = redis.call('zrevrangebyscore', zset_key, max_score, min_score, 'limit', offset, limit)
        if list ~= nil and #list > 0 then
            -- unpack expands a table into a variable argument list
            redis.call('zrem', zset_key, unpack(list))
            local result = redis.call('hmget', hash_key, unpack(list))
            redis.call('hdel', hash_key, unpack(list))
            return result
        end
    end
end
return nil
```
The core API code:
```java
// Jedis provider
@Component
public class JedisProvider implements InitializingBean {

    private JedisPool jedisPool;

    @Override
    public void afterPropertiesSet() throws Exception {
        jedisPool = new JedisPool();
    }

    public Jedis provide() {
        return jedisPool.getResource();
    }
}

// OrderMessage
@Data
public class OrderMessage {

    private String orderId;
    private BigDecimal amount;
    private long userId;
    // used by main() below and in the prepared test data
    private String timestamp;
}

// delay queue interface
public interface OrderDelayQueue {

    void enqueue(OrderMessage message);

    List<OrderMessage> dequeue(String min, String max, String offset, String limit);

    List<OrderMessage> dequeue();

    String enqueueSha();

    String dequeueSha();
}

// delay queue implementation
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    private static final String ORDER_QUEUE = "order_queue";
    private static final String ORDER_DETAIL_QUEUE = "order_detail_queue";
    private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
    private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
    private static final AtomicReference<String> ENQUEUE_LUA_SHA = new AtomicReference<>();
    private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>();
    private static final List<String> KEYS = Lists.newArrayList();

    private final JedisProvider jedisProvider;

    static {
        KEYS.add(ORDER_QUEUE);
        KEYS.add(ORDER_DETAIL_QUEUE);
    }

    @Override
    public void enqueue(OrderMessage message) {
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        args.add(String.valueOf(System.currentTimeMillis()));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        try (Jedis jedis = jedisProvider.provide()) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args);
        }
    }

    @Override
    public List<OrderMessage> dequeue() {
        // 30 minutes ago
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT);
    }

    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit) {
        // the argument order must match the ARGV order expected by dequeue.lua: min, max, offset, limit
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        try (Jedis jedis = jedisProvider.provide()) {
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(), KEYS, args);
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }

    @Override
    public String enqueueSha() {
        return ENQUEUE_LUA_SHA.get();
    }

    @Override
    public String dequeueSha() {
        return DEQUEUE_LUA_SHA.get();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // load the Lua scripts
        loadLuaScript();
    }

    private void loadLuaScript() throws Exception {
        try (Jedis jedis = jedisProvider.provide()) {
            ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
            String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            String sha = jedis.scriptLoad(luaContent);
            ENQUEUE_LUA_SHA.compareAndSet(null, sha);
            resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
            luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            sha = jedis.scriptLoad(luaContent);
            DEQUEUE_LUA_SHA.compareAndSet(null, sha);
        }
    }

    public static void main(String[] as) throws Exception {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        JedisProvider jedisProvider = new JedisProvider();
        jedisProvider.afterPropertiesSet();
        RedisOrderDelayQueue queue = new RedisOrderDelayQueue(jedisProvider);
        queue.afterPropertiesSet();
        // write some test data
        OrderMessage message = new OrderMessage();
        message.setAmount(BigDecimal.valueOf(10086));
        message.setOrderId("order_id_10086");
        message.setUserId(10086L);
        message.setTimestamp(LocalDateTime.now().format(f));
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        // for testing, the score is set to 30 minutes ago
        args.add(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        try (Jedis jedis = jedisProvider.provide()) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args);
        }
        List<OrderMessage> dequeue = queue.dequeue();
        System.out.println(dequeue);
    }
}
```
Run the main() method once to verify that the delay queue works:
[ordermessage(orderid=order_id_10086, amount=10086, userid=10086, timestamp=2019-08-21 08:32:22.885)]
With the delay queue confirmed to work, the next step is a consumer implemented as a Quartz Job, OrderMessageConsumer:
```java
@DisallowConcurrentExecution
@Component
public class OrderMessageConsumer implements Job {

    private static final AtomicInteger COUNTER = new AtomicInteger();
    private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors(), r -> {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
                return thread;
            });
    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);

    @Autowired
    private OrderDelayQueue orderDelayQueue;

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        LOGGER.info("订单消息处理定时任务开始执行......");
        List<OrderMessage> messages = orderDelayQueue.dequeue();
        if (!messages.isEmpty()) {
            // naively split the list into equal chunks and hand them to the thread pool
            List<List<OrderMessage>> partition = Lists.partition(messages, 2);
            int size = partition.size();
            final CountDownLatch latch = new CountDownLatch(size);
            for (List<OrderMessage> p : partition) {
                BUSINESS_WORKER_POOL.execute(new ConsumeTask(p, latch));
            }
            try {
                latch.await();
            } catch (InterruptedException ignore) {
                // ignore
            }
        }
        stopWatch.stop();
        LOGGER.info("订单消息处理定时任务执行完毕,耗时:{} ms......", stopWatch.getTotalTimeMillis());
    }

    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {

        private final List<OrderMessage> messages;
        private final CountDownLatch latch;

        @Override
        public void run() {
            try {
                // in real code each message should be handled inside its own try/catch
                for (OrderMessage message : messages) {
                    LOGGER.info("处理订单信息,内容:{}", message);
                }
            } finally {
                latch.countDown();
            }
        }
    }
}
```
A few design considerations for this consumer:

- @DisallowConcurrentExecution forbids concurrent executions of the Job. Running several of these jobs in parallel would gain little anyway: the polling interval is short and Redis executes commands on a single thread, so multi-threading on the client side does not help much here.
- The capacity (or queue) of the BUSINESS_WORKER_POOL thread pool should be sized against the LIMIT value, the partition size used when splitting the order list, and the actual execution time inside ConsumeTask; a fixed-size pool is used here purely for convenience.
- ConsumeTask should catch and swallow exceptions per order message, or wrap the handling of a single message in a method that never throws, so that one bad message does not fail the whole batch; a minimal sketch follows this list.
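A minimal sketch of the third point, as a drop-in replacement for ConsumeTask#run() above; handleSingleMessage is a hypothetical placeholder for the real push logic:

```java
@Override
public void run() {
    try {
        for (OrderMessage message : messages) {
            try {
                // handleSingleMessage is assumed to contain the actual push / business logic
                handleSingleMessage(message);
            } catch (Exception e) {
                // swallow and log, so one bad message does not break the rest of the batch
                LOGGER.error("failed to process order {}", message.getOrderId(), e);
            }
        }
    } finally {
        latch.countDown();
    }
}
```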
The remaining Quartz-related code:
```java
// Quartz configuration class
@Configuration
public class QuartzAutoConfiguration {

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(QuartzAutowiredJobFactory quartzAutowiredJobFactory) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setAutoStartup(true);
        factory.setJobFactory(quartzAutowiredJobFactory);
        return factory;
    }

    @Bean
    public QuartzAutowiredJobFactory quartzAutowiredJobFactory() {
        return new QuartzAutowiredJobFactory();
    }

    public static class QuartzAutowiredJobFactory extends AdaptableJobFactory implements BeanFactoryAware {

        private AutowireCapableBeanFactory autowireCapableBeanFactory;

        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory;
        }

        @Override
        protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
            Object jobInstance = super.createJobInstance(bundle);
            // autowire the freshly created Job instance through AutowireCapableBeanFactory,
            // yielding a prototype-scoped job bean
            autowireCapableBeanFactory.autowireBean(jobInstance);
            return jobInstance;
        }
    }
}
```
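For the production setup mentioned just below (replacing the in-memory store with the JDBC JobStoreTX and clustering the scheduler), a hedged sketch of the Quartz configuration; the property keys are the standard Quartz ones, while the instance name, data source name and connection details are assumptions. With the Spring setup above, the same properties would be handed over via SchedulerFactoryBean#setQuartzProperties:

```java
import java.util.Properties;

import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.impl.StdSchedulerFactory;

public class ClusteredSchedulerFactory {

    static Scheduler buildClusteredScheduler() throws SchedulerException {
        Properties props = new Properties();
        props.put("org.quartz.scheduler.instanceName", "delayTaskScheduler");
        props.put("org.quartz.scheduler.instanceId", "AUTO");
        props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
        props.put("org.quartz.threadPool.threadCount", "10");
        // JDBC job store, clustered
        props.put("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
        props.put("org.quartz.jobStore.driverDelegateClass", "org.quartz.impl.jdbcjobstore.StdJDBCDelegate");
        props.put("org.quartz.jobStore.isClustered", "true");
        props.put("org.quartz.jobStore.dataSource", "quartzDS");
        // quartzDS is an assumed data source name; fill in the real connection details
        props.put("org.quartz.dataSource.quartzDS.driver", "com.mysql.jdbc.Driver");
        props.put("org.quartz.dataSource.quartzDS.URL", "jdbc:mysql://localhost:3306/quartz?useSSL=false");
        props.put("org.quartz.dataSource.quartzDS.user", "root");
        props.put("org.quartz.dataSource.quartzDS.password", "root");
        return new StdSchedulerFactory(props).getScheduler();
    }
}
```

The standard Quartz tables (qrtz_*) must already exist in that database; the DDL ships with the Quartz distribution.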
This example keeps jobs and triggers in the in-memory RAMJobStore; in production it is better to switch to the MySQL-backed JobStoreTX (as sketched above) and run the scheduler clustered. Finally, the startup class and the CommandLineRunner implementation:
```java
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, TransactionAutoConfiguration.class})
public class Application implements CommandLineRunner {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private JedisProvider jedisProvider;

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // prepare some test data
        prepareOrderMessageData();
        JobDetail job = JobBuilder.newJob(OrderMessageConsumer.class)
                .withIdentity("orderMessageConsumer", "delayTask")
                .build();
        // the trigger fires every 5 seconds
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("orderMessageConsumerTrigger", "delayTask")
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever())
                .build();
        scheduler.scheduleJob(job, trigger);
    }

    private void prepareOrderMessageData() throws Exception {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        try (Jedis jedis = jedisProvider.provide()) {
            List<OrderMessage> messages = Lists.newArrayList();
            for (int i = 0; i < 100; i++) {
                OrderMessage message = new OrderMessage();
                message.setAmount(BigDecimal.valueOf(i));
                message.setOrderId("order_id_" + i);
                message.setUserId((long) i);
                message.setTimestamp(LocalDateTime.now().format(f));
                messages.add(message);
            }
            // Lua is not used here, for simplicity
            Map<String, Double> map = Maps.newHashMap();
            Map<String, String> hash = Maps.newHashMap();
            for (OrderMessage message : messages) {
                // deliberately set the score to 30 minutes ago so the data is due immediately
                map.put(message.getOrderId(), Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)));
                hash.put(message.getOrderId(), JSON.toJSONString(message));
            }
            jedis.zadd("order_queue", map);
            jedis.hmset("order_detail_queue", hash);
        }
    }
}
```
The output:
2019-08-21 22:45:59.518 info 33000 --- [rybean_worker-1] club.throwable.ordermessageconsumer : 订单消息处理定时任务开始执行......
2019-08-21 22:45:59.525 info 33000 --- [onsumerworker-4] club.throwable.ordermessageconsumer : 处理订单信息,内容:ordermessage(orderid=order_id_91, amount=91, userid=91, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525 info 33000 --- [onsumerworker-2] club.throwable.ordermessageconsumer : 处理订单信息,内容:ordermessage(orderid=order_id_95, amount=95, userid=95, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525 info 33000 --- [onsumerworker-1] club.throwable.ordermessageconsumer : 处理订单信息,内容:ordermessage(orderid=order_id_97, amount=97, userid=97, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525 info 33000 --- [onsumerworker-0] club.throwable.ordermessageconsumer : 处理订单信息,内容:ordermessage(orderid=order_id_99, amount=99, userid=99, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525 info 33000 --- [onsumerworker-3] club.throwable.ordermessageconsumer : 处理订单信息,内容:ordermessage(orderid=order_id_93, amount=93, userid=93, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 info 33000 --- [onsumerworker-2] club.throwable.ordermessageconsumer : 处理订单信息,内容:ordermessage(orderid=order_id_94, amount=94, userid=94, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 info 33000 --- [onsumerworker-1] club.throwable.ordermessageconsumer : 处理订单信息,内容:ordermessage(orderid=order_id_96, amount=96, userid=96, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 info 33000 --- [onsumerworker-3] club.throwable.ordermessageconsumer : 处理订单信息,内容:ordermessage(orderid=order_id_92, amount=92, userid=92, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 info 33000 --- [onsumerworker-0] club.throwable.ordermessageconsumer : 处理订单信息,内容:ordermessage(orderid=order_id_98, amount=98, userid=98, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 info 33000 --- [onsumerworker-4] club.throwable.ordermessageconsumer : 处理订单信息,内容:ordermessage(orderid=order_id_90, amount=90, userid=90, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.540 info 33000 --- [rybean_worker-1] club.throwable.ordermessageconsumer : 订单消息处理定时任务执行完毕,耗时:22 ms......
2019-08-21 22:46:04.515 info 33000 --- [rybean_worker-2] club.throwable.ordermessageconsumer : 订单消息处理定时任务开始执行......
2019-08-21 22:46:04.516 info 33000 --- [onsumerworker-5] club.throwable.ordermessageconsumer : 处理订单信息,内容:ordermessage(orderid=order_id_89, amount=89, userid=89, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 info 33000 --- [onsumerworker-6] club.throwable.ordermessageconsumer : 处理订单信息,内容:ordermessage(orderid=order_id_87, amount=87, userid=87, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 info 33000 --- [onsumerworker-7] club.throwable.ordermessageconsumer : 处理订单信息,内容:ordermessage(orderid=order_id_85, amount=85, userid=85, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 info 33000 --- [onsumerworker-5] club.throwable.ordermessageconsumer : 处理订单信息,内容:ordermessage(orderid=order_id_88, amount=88, userid=88, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 info 33000 --- [onsumerworker-2] club.throwable.ordermessageconsumer : 处理订单信息,内容:ordermessage(orderid=order_id_83, amount=83, userid=83, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 info 33000 --- [onsumerworker-1] club.throwable.ordermessageconsumer : 处理订单信息,内容:ordermessage(orderid=order_id_81, amount=81, userid=81, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 info 33000 --- [onsumerworker-6] club.throwable.ordermessageconsumer : 处理订单信息,内容:ordermessage(orderid=order_id_86, amount=86, userid=86, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 info 33000 --- [onsumerworker-2] club.throwable.ordermessageconsumer : 处理订单信息,内容:ordermessage(orderid=order_id_82, amount=82, userid=82, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 info 33000 --- [onsumerworker-7] club.throwable.ordermessageconsumer : 处理订单信息,内容:ordermessage(orderid=order_id_84, amount=84, userid=84, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 info 33000 --- [onsumerworker-1] club.throwable.ordermessageconsumer : 处理订单信息,内容:ordermessage(orderid=order_id_80, amount=80, userid=80, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 info 33000 --- [rybean_worker-2] club.throwable.ordermessageconsumer : 订单消息处理定时任务执行完毕,耗时:1 ms......
......
The first run is slower because several components are being initialised; after that, since the job only logs the order payloads, each run finishes quickly. If the current architecture is kept as-is, a few things deserve attention in production:

- Switch the JobStore to JDBC mode; the official Quartz documentation has a complete walkthrough (or see my earlier translation of the Quartz docs).
- Monitor or collect the execution status of the jobs, add alerting, and so on.
There is also a latent performance concern: ZREVRANGEBYSCORE runs in O(log(N) + M), where N is the number of elements in the sorted set and M the number of elements returned. Because every order goes into the same sorted set (order_queue), N keeps growing as long as new orders arrive, so the dequeue script stays relatively expensive, and this spot will become a bottleneck once order volume rises. A solution is given later.
Summary

Starting from a simulated version of a real production case, this article analysed the common ways of implementing delayed tasks and gave a complete example based on Redis and Quartz. The example is merely in a runnable state and several problems remain open. The next article focuses on two of them:

- Sharding.
- Monitoring.

One more point: architecture evolves out of the shape of the business. Many decisions have to be designed and refined against the concrete scenario, so treat the ideas here as a reference only and do not copy the code blindly.