史上最全的延迟任务实现方式汇总!附代码(强烈推荐)
这篇文章的诞生要感谢一位读者,是他让这篇优秀的文章有了和大家见面的机会,重点是优秀文章,哈哈。
事情的经过是这样的...
不用谢我,送人玫瑰,手有余香。相信接下来的内容一定不会让你失望,因为它将是目前市面上最好的关于“延迟任务”的文章,这也一直是我写作追求的目标,让我的每一篇文章都比市面上的好那么一点点。
好了,话不多说,直接进入今天的主题,本文的主要内容如下图所示:
什么是延迟任务?
顾明思议,我们把需要延迟执行的任务叫做延迟任务。
延迟任务的使用场景有以下这些:
- 红包 24 小时未被查收,需要延迟执退还业务;
- 每个月账单日,需要给用户发送当月的对账单;
- 订单下单之后 30 分钟后,用户如果没有付钱,系统需要自动取消订单。
等事件都需要使用延迟任务。
延迟任务实现思路分析
延迟任务实现的关键是在某个时间节点执行某个任务。基于这个信息我们可以想到实现延迟任务的手段有以下两个:
- 自己手写一个“死循环”一直判断当前时间节点有没有要执行的任务;
- 借助 jdk 或者第三方提供的工具类来实现延迟任务。
而通过 jdk 实现延迟任务我们能想到的关键词是:delayqueue、scheduledexecutorservice,而第三方提供的延迟任务执行方法就有很多了,例如:redis、netty、mq 等手段。
延迟任务实现
下面我们将结合代码来讲解每种延迟任务的具体实现。
1.无限循环实现延迟任务
此方式我们需要开启一个无限循环一直扫描任务,然后使用一个 map 集合用来存储任务和延迟执行的时间,实现代码如下:
import java.time.instant; import java.time.localdatetime; import java.util.hashmap; import java.util.iterator; import java.util.map; /** * 延迟任务执行方法汇总 */ public class delaytaskexample { // 存放定时任务 private static map<string, long> _taskmap = new hashmap<>(); public static void main(string[] args) { system.out.println("程序启动时间:" + localdatetime.now()); // 添加定时任务 _taskmap.put("task-1", instant.now().plusseconds(3).toepochmilli()); // 延迟 3s // 调用无限循环实现延迟任务 looptask(); } /** * 无限循环实现延迟任务 */ public static void looptask() { long itemlong = 0l; while (true) { iterator it = _taskmap.entryset().iterator(); while (it.hasnext()) { map.entry entry = (map.entry) it.next(); itemlong = (long) entry.getvalue(); // 有任务需要执行 if (instant.now().toepochmilli() >= itemlong) { // 延迟任务,业务逻辑执行 system.out.println("执行任务:" + entry.getkey() + " ,执行时间:" + localdatetime.now()); // 删除任务 _taskmap.remove(entry.getkey()); } } } } }
以上程序执行的结果为:
程序启动时间:2020-04-12t18:51:28.188
执行任务:task-1 ,执行时间:2020-04-12t18:51:31.189
可以看出任务延迟了 3s 钟执行了,符合我们的预期。
2.java api 实现延迟任务
java api 提供了两种实现延迟任务的方法:delayqueue 和 scheduledexecutorservice。
① scheduledexecutorservice 实现延迟任务
我们可以使用 scheduledexecutorservice 来以固定的频率一直执行任务,实现代码如下:
public class delaytaskexample { public static void main(string[] args) { system.out.println("程序启动时间:" + localdatetime.now()); scheduledexecutorservicetask(); } /** * scheduledexecutorservice 实现固定频率一直循环执行任务 */ public static void scheduledexecutorservicetask() { scheduledexecutorservice executor = executors.newscheduledthreadpool(1); executor.schedulewithfixeddelay( new runnable() { @override public void run() { // 执行任务的业务代码 system.out.println("执行任务" + " ,执行时间:" + localdatetime.now()); } }, 2, // 初次执行间隔 2, // 2s 执行一次 timeunit.seconds); } }
以上程序执行的结果为:
程序启动时间:2020-04-12t21:28:10.416
执行任务 ,执行时间:2020-04-12t21:28:12.421
执行任务 ,执行时间:2020-04-12t21:28:14.422
......
可以看出使用 scheduledexecutorservice#schedulewithfixeddelay(...) 方法之后,会以某个频率一直循环执行延迟任务。
② delayqueue 实现延迟任务
delayqueue 是一个支持延时获取元素的*阻塞队列,队列中的元素必须实现 delayed 接口,并重写 getdelay(timeunit) 和 compareto(delayed) 方法,delayqueue 实现延迟队列的完整代码如下:
public class delaytest { public static void main(string[] args) throws interruptedexception { delayqueue delayqueue = new delayqueue(); // 添加延迟任务 delayqueue.put(new delayelement(1000)); delayqueue.put(new delayelement(3000)); delayqueue.put(new delayelement(5000)); system.out.println("开始时间:" + dateformat.getdatetimeinstance().format(new date())); while (!delayqueue.isempty()){ // 执行延迟任务 system.out.println(delayqueue.take()); } system.out.println("结束时间:" + dateformat.getdatetimeinstance().format(new date())); } static class delayelement implements delayed { // 延迟截止时间(单面:毫秒) long delaytime = system.currenttimemillis(); public delayelement(long delaytime) { this.delaytime = (this.delaytime + delaytime); } @override // 获取剩余时间 public long getdelay(timeunit unit) { return unit.convert(delaytime - system.currenttimemillis(), timeunit.milliseconds); } @override // 队列里元素的排序依据 public int compareto(delayed o) { if (this.getdelay(timeunit.milliseconds) > o.getdelay(timeunit.milliseconds)) { return 1; } else if (this.getdelay(timeunit.milliseconds) < o.getdelay(timeunit.milliseconds)) { return -1; } else { return 0; } } @override public string tostring() { return dateformat.getdatetimeinstance().format(new date(delaytime)); } } }
以上程序执行的结果为:
开始时间:2020-4-12 20:40:38
2020-4-12 20:40:39
2020-4-12 20:40:41
2020-4-12 20:40:43
结束时间:2020-4-12 20:40:43
3.redis 实现延迟任务
使用 redis 实现延迟任务的方法大体可分为两类:通过 zset 数据判断的方式,和通过键空间通知的方式。
① 通过数据判断的方式
我们借助 zset 数据类型,把延迟任务存储在此数据集合中,然后在开启一个无线循环查询当前时间的所有任务进行消费,实现代码如下(需要借助 jedis 框架):
import redis.clients.jedis.jedis; import utils.jedisutils; import java.time.instant; import java.util.set; public class delayqueueexample { // zset key private static final string _key = "mydelayqueue"; public static void main(string[] args) throws interruptedexception { jedis jedis = jedisutils.getjedis(); // 延迟 30s 执行(30s 后的时间) long delaytime = instant.now().plusseconds(30).getepochsecond(); jedis.zadd(_key, delaytime, "order_1"); // 继续添加测试数据 jedis.zadd(_key, instant.now().plusseconds(2).getepochsecond(), "order_2"); jedis.zadd(_key, instant.now().plusseconds(2).getepochsecond(), "order_3"); jedis.zadd(_key, instant.now().plusseconds(7).getepochsecond(), "order_4"); jedis.zadd(_key, instant.now().plusseconds(10).getepochsecond(), "order_5"); // 开启延迟队列 dodelayqueue(jedis); } /** * 延迟队列消费 * @param jedis redis 客户端 */ public static void dodelayqueue(jedis jedis) throws interruptedexception { while (true) { // 当前时间 instant nowinstant = instant.now(); long lastsecond = nowinstant.plusseconds(-1).getepochsecond(); // 上一秒时间 long nowsecond = nowinstant.getepochsecond(); // 查询当前时间的所有任务 set<string> data = jedis.zrangebyscore(_key, lastsecond, nowsecond); for (string item : data) { // 消费任务 system.out.println("消费:" + item); } // 删除已经执行的任务 jedis.zremrangebyscore(_key, lastsecond, nowsecond); thread.sleep(1000); // 每秒轮询一次 } } }
② 通过键空间通知
默认情况下 redis 服务器端是不开启键空间通知的,需要我们通过 config set notify-keyspace-events ex
的命令手动开启,开启键空间通知后,我们就可以拿到每个键值过期的事件,我们利用这个机制实现了给每个人开启一个定时任务的功能,实现代码如下:
import redis.clients.jedis.jedis; import redis.clients.jedis.jedispubsub; import utils.jedisutils; public class taskexample { public static final string _topic = "__keyevent@0__:expired"; // 订阅频道名称 public static void main(string[] args) { jedis jedis = jedisutils.getjedis(); // 执行定时任务 dotask(jedis); } /** * 订阅过期消息,执行定时任务 * @param jedis redis 客户端 */ public static void dotask(jedis jedis) { // 订阅过期消息 jedis.psubscribe(new jedispubsub() { @override public void onpmessage(string pattern, string channel, string message) { // 接收到消息,执行定时任务 system.out.println("收到消息:" + message); } }, _topic); } }
4.netty 实现延迟任务
netty 是由 jboss 提供的一个 java 开源框架,它是一个基于 nio 的客户、服务器端的编程框架,使用 netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。netty 相当于简化和流线化了网络应用的编程开发过程,例如:基于 tcp 和 udp 的 socket 服务开发。
可以使用 netty 提供的工具类 hashedwheeltimer 来实现延迟任务,实现代码如下。
首先在项目中添加 netty 引用,配置如下:
<!-- https://mvnrepository.com/artifact/io.netty/netty-common --> <dependency> <groupid>io.netty</groupid> <artifactid>netty-common</artifactid> <version>4.1.48.final</version> </dependency>
netty 实现的完整代码如下:
public class delaytaskexample { public static void main(string[] args) { system.out.println("程序启动时间:" + localdatetime.now()); nettytask(); } /** * 基于 netty 的延迟任务 */ private static void nettytask() { // 创建延迟任务实例 hashedwheeltimer timer = new hashedwheeltimer(3, // 时间间隔 timeunit.seconds, 100); // 时间轮中的槽数 // 创建一个任务 timertask task = new timertask() { @override public void run(timeout timeout) throws exception { system.out.println("执行任务" + " ,执行时间:" + localdatetime.now()); } }; // 将任务添加到延迟队列中 timer.newtimeout(task, 0, timeunit.seconds); } }
以上程序执行的结果为:
程序启动时间:2020-04-13t10:16:23.033
执行任务 ,执行时间:2020-04-13t10:16:26.118
hashedwheeltimer 是使用定时轮实现的,定时轮其实就是一种环型的数据结构,可以把它想象成一个时钟,分成了许多格子,每个格子代表一定的时间,在这个格子上用一个链表来保存要执行的超时任务,同时有一个指针一格一格的走,走到那个格子时就执行格子对应的延迟任务,如下图所示:
(图片来源于网络)
以上的图片可以理解为,时间轮大小为 8,某个时间转一格(例如 1s),每格指向一个链表,保存着待执行的任务。
5.mq 实现延迟任务
如果专门开启一个 mq 中间件来执行延迟任务,就有点杀鸡用宰牛刀般的奢侈了,不过已经有了 mq 环境的话,用它来实现延迟任务的话,还是可取的。
几乎所有的 mq 中间件都可以实现延迟任务,在这里更准确的叫法应该叫延队列。本文就使用 rabbitmq 为例,来看它是如何实现延迟任务的。
rabbitmq 实现延迟队列的方式有两种:
- 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;
- 使用 rabbitmq-delayed-message-exchange 插件实现延迟功能。
注意: 延迟插件 rabbitmq-delayed-message-exchange 是在 rabbitmq 3.5.7 及以上的版本才支持的,依赖 erlang/opt 18.0 及以上运行环境。
由于使用死信交换器比较麻烦,所以推荐使用第二种实现方式 rabbitmq-delayed-message-exchange 插件的方式实现延迟队列的功能。
首先,我们需要下载并安装 rabbitmq-delayed-message-exchange 插件,下载地址:
选择相应的对应的版本进行下载,然后拷贝到 rabbitmq 服务器目录,使用命令 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
开启插件,在使用命令 rabbitmq-plugins list
查询安装的所有插件,安装成功如下图所示:
最后重启 rabbitmq 服务,使插件生效。
首先,我们先要配置消息队列,实现代码如下:
import com.example.rabbitmq.mq.directconfig; import org.springframework.amqp.core.*; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import java.util.hashmap; import java.util.map; @configuration public class delayedconfig { final static string queue_name = "delayed.goods.order"; final static string exchange_name = "delayedec"; @bean public queue queue() { return new queue(delayedconfig.queue_name); } // 配置默认的交换机 @bean customexchange customexchange() { map<string, object> args = new hashmap<>(); args.put("x-delayed-type", "direct"); //参数二为类型:必须是x-delayed-message return new customexchange(delayedconfig.exchange_name, "x-delayed-message", true, false, args); } // 绑定队列到交换器 @bean binding binding(queue queue, customexchange exchange) { return bindingbuilder.bind(queue).to(exchange).with(delayedconfig.queue_name).noargs(); } }
然后添加增加消息的代码,具体实现如下:
import org.springframework.amqp.amqpexception; import org.springframework.amqp.core.amqptemplate; import org.springframework.amqp.core.message; import org.springframework.amqp.core.messagepostprocessor; import org.springframework.beans.factory.annotation.autowired; import org.springframework.stereotype.component; import java.text.simpledateformat; import java.util.date; @component public class delayedsender { @autowired private amqptemplate rabbittemplate; public void send(string msg) { simpledateformat sf = new simpledateformat("yyyy-mm-dd hh:mm:ss"); system.out.println("发送时间:" + sf.format(new date())); rabbittemplate.convertandsend(delayedconfig.exchange_name, delayedconfig.queue_name, msg, new messagepostprocessor() { @override public message postprocessmessage(message message) throws amqpexception { message.getmessageproperties().setheader("x-delay", 3000); return message; } }); } }
再添加消费消息的代码:
import org.springframework.amqp.rabbit.annotation.rabbithandler; import org.springframework.amqp.rabbit.annotation.rabbitlistener; import org.springframework.stereotype.component; import java.text.simpledateformat; import java.util.date; @component @rabbitlistener(queues = "delayed.goods.order") public class delayedreceiver { @rabbithandler public void process(string msg) { simpledateformat sdf = new simpledateformat("yyyy-mm-dd hh:mm:ss"); system.out.println("接收时间:" + sdf.format(new date())); system.out.println("消息内容:" + msg); } }
最后,我们使用代码测试一下:
import com.example.rabbitmq.rabbitmqapplication; import com.example.rabbitmq.mq.delayed.delayedsender; import org.junit.test; import org.junit.runner.runwith; import org.springframework.beans.factory.annotation.autowired; import org.springframework.boot.test.context.springboottest; import org.springframework.test.context.junit4.springrunner; import java.text.simpledateformat; import java.util.date; @runwith(springrunner.class) @springboottest public class delayedtest { @autowired private delayedsender sender; @test public void test() throws interruptedexception { simpledateformat sf = new simpledateformat("yyyy-mm-dd"); sender.send("hi admin."); thread.sleep(5 * 1000); //等待接收程序执行之后,再退出测试 } }
以上程序的执行结果如下:
发送时间:2020-04-13 20:47:51
接收时间:2020-04-13 20:47:54
消息内容:hi admin.
从结果可以看出,以上程序执行符合延迟任务的实现预期。
6.使用 spring 定时任务
如果你使用的是 spring 或 springboot 的项目的话,可以使用借助 scheduled 来实现,本文将使用 springboot 项目来演示 scheduled 的实现,实现我们需要声明开启 scheduled,实现代码如下:
@springbootapplication @enablescheduling public class application { public static void main(string[] args) { springapplication.run(application.class, args); } }
然后添加延迟任务,实现代码如下:
@component public class schedulejobs { @scheduled(fixeddelay = 2 * 1000) public void fixeddelayjob() throws interruptedexception { system.out.println("任务执行,时间:" + localdatetime.now()); } }
此时当我们启动项目之后就可以看到任务以延迟了 2s 的形式一直循环执行,结果如下:
任务执行,时间:2020-04-13t14:07:53.349
任务执行,时间:2020-04-13t14:07:55.350
任务执行,时间:2020-04-13t14:07:57.351
...
我们也可以使用 corn 表达式来定义任务执行的频率,例如使用 @scheduled(cron = "0/4 * * * * ?")
。
7.quartz 实现延迟任务
quartz 是一款功能强大的任务调度器,可以实现较为复杂的调度功能,它还支持分布式的任务调度。
我们使用 quartz 来实现一个延迟任务,首先定义一个执行任务代码如下:
import org.quartz.jobexecutioncontext; import org.quartz.jobexecutionexception; import org.springframework.scheduling.quartz.quartzjobbean; import java.time.localdatetime; public class samplejob extends quartzjobbean { @override protected void executeinternal(jobexecutioncontext jobexecutioncontext) throws jobexecutionexception { system.out.println("任务执行,时间:" + localdatetime.now()); } }
在定义一个 jobdetail 和 trigger 实现代码如下:
import org.quartz.*; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; @configuration public class samplescheduler { @bean public jobdetail samplejobdetail() { return jobbuilder.newjob(samplejob.class).withidentity("samplejob") .storedurably().build(); } @bean public trigger samplejobtrigger() { // 3s 后执行 simpleschedulebuilder schedulebuilder = simpleschedulebuilder.simpleschedule().withintervalinseconds(3).withrepeatcount(1); return triggerbuilder.newtrigger().forjob(samplejobdetail()).withidentity("sampletrigger") .withschedule(schedulebuilder).build(); } }
最后在 springboot 项目启动之后开启延迟任务,实现代码如下:
import org.springframework.beans.factory.annotation.autowired; import org.springframework.boot.commandlinerunner; import org.springframework.scheduling.quartz.schedulerfactorybean; /** * springboot 项目启动后执行 */ public class mystartuprunner implements commandlinerunner { @autowired private schedulerfactorybean schedulerfactorybean; @autowired private samplescheduler samplescheduler; @override public void run(string... args) throws exception { // 启动定时任务 schedulerfactorybean.getscheduler().schedulejob( samplescheduler.samplejobtrigger()); } }
以上程序的执行结果如下:
2020-04-13 19:02:12.331 info 17768 --- [ restartedmain] com.example.demo.demoapplication : started demoapplication in 1.815 seconds (jvm running for 3.088)
任务执行,时间:2020-04-13t19:02:15.019
从结果可以看出在项目启动 3s 之后执行了延迟任务。
总结
本文讲了延迟任务的使用场景,以及延迟任务的 10 种实现方式:
- 手动无线循环;
- scheduledexecutorservice;
- delayqueue;
- redis zset 数据判断的方式;
- redis 键空间通知的方式;
- netty 提供的 hashedwheeltimer 工具类;
- rabbitmq 死信队列;
- rabbitmq 延迟消息插件 rabbitmq-delayed-message-exchange;
- spring scheduled;
- quartz。
最后的话
俗话说:台上一分钟,台下十年功。本文的所有内容皆为作者多年工作积累的结晶,以及熬夜呕心沥血的整理,如果觉得本文有帮助到你,请帮我分享出去,让更多的人看到,谢谢你。
本文由博客一文多发平台 openwrite 发布!