欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

史上最全的延迟任务实现方式汇总!附代码(强烈推荐)

程序员文章站 2022-09-27 08:58:20
这篇文章的诞生要感谢一位读者,是他让这篇 优秀的文章 有了和大家见面的机会,重点是 优秀文章 ,哈哈。 事情的经过是这样的... 不用谢我,送人玫瑰,手有余香。 相信接下来的内容一定不会让你失望,因为它将是目前市面上最好的关于“延迟任务”的文章 ,这也一直是我写作追求的目标,让我的每一篇文章都比市面 ......

这篇文章的诞生要感谢一位读者,是他让这篇优秀的文章有了和大家见面的机会,重点是优秀文章,哈哈。

事情的经过是这样的...

史上最全的延迟任务实现方式汇总!附代码(强烈推荐)

不用谢我,送人玫瑰,手有余香。相信接下来的内容一定不会让你失望,因为它将是目前市面上最好的关于“延迟任务”的文章,这也一直是我写作追求的目标,让我的每一篇文章都比市面上的好那么一点点。

好了,话不多说,直接进入今天的主题,本文的主要内容如下图所示:
史上最全的延迟任务实现方式汇总!附代码(强烈推荐)

什么是延迟任务?

顾明思议,我们把需要延迟执行的任务叫做延迟任务

延迟任务的使用场景有以下这些:

  1. 红包 24 小时未被查收,需要延迟执退还业务;
  2. 每个月账单日,需要给用户发送当月的对账单;
  3. 订单下单之后 30 分钟后,用户如果没有付钱,系统需要自动取消订单。

等事件都需要使用延迟任务。

延迟任务实现思路分析

延迟任务实现的关键是在某个时间节点执行某个任务。基于这个信息我们可以想到实现延迟任务的手段有以下两个:

  1. 自己手写一个“死循环”一直判断当前时间节点有没有要执行的任务;
  2. 借助 jdk 或者第三方提供的工具类来实现延迟任务。

而通过 jdk 实现延迟任务我们能想到的关键词是:delayqueue、scheduledexecutorservice,而第三方提供的延迟任务执行方法就有很多了,例如:redis、netty、mq 等手段。

延迟任务实现

下面我们将结合代码来讲解每种延迟任务的具体实现。

1.无限循环实现延迟任务

此方式我们需要开启一个无限循环一直扫描任务,然后使用一个 map 集合用来存储任务和延迟执行的时间,实现代码如下:

import java.time.instant;
import java.time.localdatetime;
import java.util.hashmap;
import java.util.iterator;
import java.util.map;

/**
 * 延迟任务执行方法汇总
 */
public class delaytaskexample {
    // 存放定时任务
    private static map<string, long> _taskmap = new hashmap<>();

    public static void main(string[] args) {
        system.out.println("程序启动时间:" + localdatetime.now());
        // 添加定时任务
        _taskmap.put("task-1", instant.now().plusseconds(3).toepochmilli()); // 延迟 3s

        // 调用无限循环实现延迟任务
        looptask();
    }

    /**
     * 无限循环实现延迟任务
     */
    public static void looptask() {
        long itemlong = 0l;
        while (true) {
            iterator it = _taskmap.entryset().iterator();
            while (it.hasnext()) {
                map.entry entry = (map.entry) it.next();
                itemlong = (long) entry.getvalue();
                // 有任务需要执行
                if (instant.now().toepochmilli() >= itemlong) {
                    // 延迟任务,业务逻辑执行
                    system.out.println("执行任务:" + entry.getkey() +
                            " ,执行时间:" + localdatetime.now());
                    // 删除任务
                    _taskmap.remove(entry.getkey());
                }
            }
        }
    }
}

以上程序执行的结果为:

程序启动时间:2020-04-12t18:51:28.188

执行任务:task-1 ,执行时间:2020-04-12t18:51:31.189

可以看出任务延迟了 3s 钟执行了,符合我们的预期。

2.java api 实现延迟任务

java api 提供了两种实现延迟任务的方法:delayqueue 和 scheduledexecutorservice。

① scheduledexecutorservice 实现延迟任务

我们可以使用 scheduledexecutorservice 来以固定的频率一直执行任务,实现代码如下:

public class delaytaskexample {
    public static void main(string[] args) {
        system.out.println("程序启动时间:" + localdatetime.now());
        scheduledexecutorservicetask();
    }

    /**
     * scheduledexecutorservice 实现固定频率一直循环执行任务
     */
    public static void scheduledexecutorservicetask() {
        scheduledexecutorservice executor = executors.newscheduledthreadpool(1);
        executor.schedulewithfixeddelay(
                new runnable() {
                    @override
                    public void run() {
                        // 执行任务的业务代码
                        system.out.println("执行任务" +
                                " ,执行时间:" + localdatetime.now());
                    }
                },
                2, // 初次执行间隔
                2, // 2s 执行一次
                timeunit.seconds);
    }
}

以上程序执行的结果为:

程序启动时间:2020-04-12t21:28:10.416

执行任务 ,执行时间:2020-04-12t21:28:12.421

执行任务 ,执行时间:2020-04-12t21:28:14.422

......

可以看出使用 scheduledexecutorservice#schedulewithfixeddelay(...) 方法之后,会以某个频率一直循环执行延迟任务。

② delayqueue 实现延迟任务

delayqueue 是一个支持延时获取元素的*阻塞队列,队列中的元素必须实现 delayed 接口,并重写 getdelay(timeunit) 和 compareto(delayed) 方法,delayqueue 实现延迟队列的完整代码如下:

public class delaytest {
    public static void main(string[] args) throws interruptedexception {
        delayqueue delayqueue = new delayqueue();
        // 添加延迟任务
        delayqueue.put(new delayelement(1000));
        delayqueue.put(new delayelement(3000));
        delayqueue.put(new delayelement(5000));
        system.out.println("开始时间:" +  dateformat.getdatetimeinstance().format(new date()));
        while (!delayqueue.isempty()){
            // 执行延迟任务
            system.out.println(delayqueue.take());
        }
        system.out.println("结束时间:" +  dateformat.getdatetimeinstance().format(new date()));
    }

    static class delayelement implements delayed {
        // 延迟截止时间(单面:毫秒)
        long delaytime = system.currenttimemillis();
        public delayelement(long delaytime) {
            this.delaytime = (this.delaytime + delaytime);
        }
        @override
        // 获取剩余时间
        public long getdelay(timeunit unit) {
            return unit.convert(delaytime - system.currenttimemillis(), timeunit.milliseconds);
        }
        @override
        // 队列里元素的排序依据
        public int compareto(delayed o) {
            if (this.getdelay(timeunit.milliseconds) > o.getdelay(timeunit.milliseconds)) {
                return 1;
            } else if (this.getdelay(timeunit.milliseconds) < o.getdelay(timeunit.milliseconds)) {
                return -1;
            } else {
                return 0;
            }
        }
        @override
        public string tostring() {
            return dateformat.getdatetimeinstance().format(new date(delaytime));
        }
    }
}

以上程序执行的结果为:

开始时间:2020-4-12 20:40:38

2020-4-12 20:40:39

2020-4-12 20:40:41

2020-4-12 20:40:43

结束时间:2020-4-12 20:40:43

3.redis 实现延迟任务

使用 redis 实现延迟任务的方法大体可分为两类:通过 zset 数据判断的方式,和通过键空间通知的方式

① 通过数据判断的方式

我们借助 zset 数据类型,把延迟任务存储在此数据集合中,然后在开启一个无线循环查询当前时间的所有任务进行消费,实现代码如下(需要借助 jedis 框架):

import redis.clients.jedis.jedis;
import utils.jedisutils;
import java.time.instant;
import java.util.set;

public class delayqueueexample {
    // zset key
    private static final string _key = "mydelayqueue";
    
    public static void main(string[] args) throws interruptedexception {
        jedis jedis = jedisutils.getjedis();
        // 延迟 30s 执行(30s 后的时间)
        long delaytime = instant.now().plusseconds(30).getepochsecond();
        jedis.zadd(_key, delaytime, "order_1");
        // 继续添加测试数据
        jedis.zadd(_key, instant.now().plusseconds(2).getepochsecond(), "order_2");
        jedis.zadd(_key, instant.now().plusseconds(2).getepochsecond(), "order_3");
        jedis.zadd(_key, instant.now().plusseconds(7).getepochsecond(), "order_4");
        jedis.zadd(_key, instant.now().plusseconds(10).getepochsecond(), "order_5");
        // 开启延迟队列
        dodelayqueue(jedis);
    }

    /**
     * 延迟队列消费
     * @param jedis redis 客户端
     */
    public static void dodelayqueue(jedis jedis) throws interruptedexception {
        while (true) {
            // 当前时间
            instant nowinstant = instant.now();
            long lastsecond = nowinstant.plusseconds(-1).getepochsecond(); // 上一秒时间
            long nowsecond = nowinstant.getepochsecond();
            // 查询当前时间的所有任务
            set<string> data = jedis.zrangebyscore(_key, lastsecond, nowsecond);
            for (string item : data) {
                // 消费任务
                system.out.println("消费:" + item);
            }
            // 删除已经执行的任务
            jedis.zremrangebyscore(_key, lastsecond, nowsecond);
            thread.sleep(1000); // 每秒轮询一次
        }
    }
}

② 通过键空间通知

默认情况下 redis 服务器端是不开启键空间通知的,需要我们通过 config set notify-keyspace-events ex 的命令手动开启,开启键空间通知后,我们就可以拿到每个键值过期的事件,我们利用这个机制实现了给每个人开启一个定时任务的功能,实现代码如下:

import redis.clients.jedis.jedis;
import redis.clients.jedis.jedispubsub;
import utils.jedisutils;

public class taskexample {
    public static final string _topic = "__keyevent@0__:expired"; // 订阅频道名称
    public static void main(string[] args) {
        jedis jedis = jedisutils.getjedis();
        // 执行定时任务
        dotask(jedis);
    }

    /**
     * 订阅过期消息,执行定时任务
     * @param jedis redis 客户端
     */
    public static void dotask(jedis jedis) {
        // 订阅过期消息
        jedis.psubscribe(new jedispubsub() {
            @override
            public void onpmessage(string pattern, string channel, string message) {
                // 接收到消息,执行定时任务
                system.out.println("收到消息:" + message);
            }
        }, _topic);
    }
}

4.netty 实现延迟任务

netty 是由 jboss 提供的一个 java 开源框架,它是一个基于 nio 的客户、服务器端的编程框架,使用 netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。netty 相当于简化和流线化了网络应用的编程开发过程,例如:基于 tcp 和 udp 的 socket 服务开发。

可以使用 netty 提供的工具类 hashedwheeltimer 来实现延迟任务,实现代码如下。

首先在项目中添加 netty 引用,配置如下:

<!-- https://mvnrepository.com/artifact/io.netty/netty-common -->
<dependency>
    <groupid>io.netty</groupid>
    <artifactid>netty-common</artifactid>
    <version>4.1.48.final</version>
</dependency>

netty 实现的完整代码如下:

public class delaytaskexample {
    public static void main(string[] args) {
        system.out.println("程序启动时间:" + localdatetime.now());
        nettytask();
    }

    /**
     * 基于 netty 的延迟任务
     */
    private static void nettytask() {
        // 创建延迟任务实例
        hashedwheeltimer timer = new hashedwheeltimer(3, // 时间间隔
                timeunit.seconds,
                100); // 时间轮中的槽数
        // 创建一个任务
        timertask task = new timertask() {
            @override
            public void run(timeout timeout) throws exception {
                system.out.println("执行任务" +
                        " ,执行时间:" + localdatetime.now());
            }
        };
        // 将任务添加到延迟队列中
        timer.newtimeout(task, 0, timeunit.seconds);

    }
}

以上程序执行的结果为:

程序启动时间:2020-04-13t10:16:23.033

执行任务 ,执行时间:2020-04-13t10:16:26.118

hashedwheeltimer 是使用定时轮实现的,定时轮其实就是一种环型的数据结构,可以把它想象成一个时钟,分成了许多格子,每个格子代表一定的时间,在这个格子上用一个链表来保存要执行的超时任务,同时有一个指针一格一格的走,走到那个格子时就执行格子对应的延迟任务,如下图所示:
史上最全的延迟任务实现方式汇总!附代码(强烈推荐)
(图片来源于网络)

以上的图片可以理解为,时间轮大小为 8,某个时间转一格(例如 1s),每格指向一个链表,保存着待执行的任务。

5.mq 实现延迟任务

如果专门开启一个 mq 中间件来执行延迟任务,就有点杀鸡用宰牛刀般的奢侈了,不过已经有了 mq 环境的话,用它来实现延迟任务的话,还是可取的。

几乎所有的 mq 中间件都可以实现延迟任务,在这里更准确的叫法应该叫延队列。本文就使用 rabbitmq 为例,来看它是如何实现延迟任务的。

rabbitmq 实现延迟队列的方式有两种:

  • 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;
  • 使用 rabbitmq-delayed-message-exchange 插件实现延迟功能。

注意: 延迟插件 rabbitmq-delayed-message-exchange 是在 rabbitmq 3.5.7 及以上的版本才支持的,依赖 erlang/opt 18.0 及以上运行环境。

由于使用死信交换器比较麻烦,所以推荐使用第二种实现方式 rabbitmq-delayed-message-exchange 插件的方式实现延迟队列的功能。

首先,我们需要下载并安装 rabbitmq-delayed-message-exchange 插件,下载地址:

选择相应的对应的版本进行下载,然后拷贝到 rabbitmq 服务器目录,使用命令 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 开启插件,在使用命令 rabbitmq-plugins list 查询安装的所有插件,安装成功如下图所示:

史上最全的延迟任务实现方式汇总!附代码(强烈推荐)

最后重启 rabbitmq 服务,使插件生效。

首先,我们先要配置消息队列,实现代码如下:

import com.example.rabbitmq.mq.directconfig;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import java.util.hashmap;
import java.util.map;

@configuration
public class delayedconfig {
    final static string queue_name = "delayed.goods.order";
    final static string exchange_name = "delayedec";
    @bean
    public queue queue() {
        return new queue(delayedconfig.queue_name);
    }

    // 配置默认的交换机
    @bean
    customexchange customexchange() {
        map<string, object> args = new hashmap<>();
        args.put("x-delayed-type", "direct");
        //参数二为类型:必须是x-delayed-message
        return new customexchange(delayedconfig.exchange_name, "x-delayed-message", true, false, args);
    }
    // 绑定队列到交换器
    @bean
    binding binding(queue queue, customexchange exchange) {
        return bindingbuilder.bind(queue).to(exchange).with(delayedconfig.queue_name).noargs();
    }
}

然后添加增加消息的代码,具体实现如下:

import org.springframework.amqp.amqpexception;
import org.springframework.amqp.core.amqptemplate;
import org.springframework.amqp.core.message;
import org.springframework.amqp.core.messagepostprocessor;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;
import java.text.simpledateformat;
import java.util.date;

@component
public class delayedsender {
    @autowired
    private amqptemplate rabbittemplate;

    public void send(string msg) {
        simpledateformat sf = new simpledateformat("yyyy-mm-dd hh:mm:ss");
        system.out.println("发送时间:" + sf.format(new date()));

        rabbittemplate.convertandsend(delayedconfig.exchange_name, delayedconfig.queue_name, msg, new messagepostprocessor() {
            @override
            public message postprocessmessage(message message) throws amqpexception {
                message.getmessageproperties().setheader("x-delay", 3000);
                return message;
            }
        });
    }
}

再添加消费消息的代码:

import org.springframework.amqp.rabbit.annotation.rabbithandler;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
import java.text.simpledateformat;
import java.util.date;

@component
@rabbitlistener(queues = "delayed.goods.order")
public class delayedreceiver {
    @rabbithandler
    public void process(string msg) {
        simpledateformat sdf = new simpledateformat("yyyy-mm-dd hh:mm:ss");
        system.out.println("接收时间:" + sdf.format(new date()));
        system.out.println("消息内容:" + msg);
    }
}

最后,我们使用代码测试一下:

import com.example.rabbitmq.rabbitmqapplication;
import com.example.rabbitmq.mq.delayed.delayedsender;
import org.junit.test;
import org.junit.runner.runwith;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.test.context.springboottest;
import org.springframework.test.context.junit4.springrunner;

import java.text.simpledateformat;
import java.util.date;

@runwith(springrunner.class)
@springboottest
public class delayedtest {

    @autowired
    private delayedsender sender;

    @test
    public void test() throws interruptedexception {
        simpledateformat sf = new simpledateformat("yyyy-mm-dd");
        sender.send("hi admin.");
        thread.sleep(5 * 1000); //等待接收程序执行之后,再退出测试
    }
}

以上程序的执行结果如下:

发送时间:2020-04-13 20:47:51

接收时间:2020-04-13 20:47:54

消息内容:hi admin.

从结果可以看出,以上程序执行符合延迟任务的实现预期。

6.使用 spring 定时任务

如果你使用的是 spring 或 springboot 的项目的话,可以使用借助 scheduled 来实现,本文将使用 springboot 项目来演示 scheduled 的实现,实现我们需要声明开启 scheduled,实现代码如下:

@springbootapplication
@enablescheduling
public class application {
    public static void main(string[] args) {
        springapplication.run(application.class, args);
    }
}

然后添加延迟任务,实现代码如下:

@component
public class schedulejobs {
    @scheduled(fixeddelay = 2 * 1000)
    public void fixeddelayjob() throws interruptedexception {
        system.out.println("任务执行,时间:" + localdatetime.now());
    }
}

此时当我们启动项目之后就可以看到任务以延迟了 2s 的形式一直循环执行,结果如下:

任务执行,时间:2020-04-13t14:07:53.349

任务执行,时间:2020-04-13t14:07:55.350

任务执行,时间:2020-04-13t14:07:57.351

...

我们也可以使用 corn 表达式来定义任务执行的频率,例如使用 @scheduled(cron = "0/4 * * * * ?") 。

7.quartz 实现延迟任务

quartz 是一款功能强大的任务调度器,可以实现较为复杂的调度功能,它还支持分布式的任务调度。

我们使用 quartz 来实现一个延迟任务,首先定义一个执行任务代码如下:

import org.quartz.jobexecutioncontext;
import org.quartz.jobexecutionexception;
import org.springframework.scheduling.quartz.quartzjobbean;

import java.time.localdatetime;

public class samplejob extends quartzjobbean {
    @override
    protected void executeinternal(jobexecutioncontext jobexecutioncontext)
            throws jobexecutionexception {
        system.out.println("任务执行,时间:" + localdatetime.now());
    }
}

在定义一个 jobdetail 和 trigger 实现代码如下:

import org.quartz.*;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

@configuration
public class samplescheduler {
    @bean
    public jobdetail samplejobdetail() {
        return jobbuilder.newjob(samplejob.class).withidentity("samplejob")
                .storedurably().build();
    }

    @bean
    public trigger samplejobtrigger() {
        // 3s 后执行
        simpleschedulebuilder schedulebuilder =
                simpleschedulebuilder.simpleschedule().withintervalinseconds(3).withrepeatcount(1);
        return triggerbuilder.newtrigger().forjob(samplejobdetail()).withidentity("sampletrigger")
                .withschedule(schedulebuilder).build();
    }
}

最后在 springboot 项目启动之后开启延迟任务,实现代码如下:

import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.commandlinerunner;
import org.springframework.scheduling.quartz.schedulerfactorybean;

/**
 * springboot 项目启动后执行
 */
public class mystartuprunner implements commandlinerunner {

    @autowired
    private schedulerfactorybean schedulerfactorybean;

    @autowired
    private samplescheduler samplescheduler;

    @override
    public void run(string... args) throws exception {
        // 启动定时任务
        schedulerfactorybean.getscheduler().schedulejob(
                samplescheduler.samplejobtrigger());
    }
}

以上程序的执行结果如下:

2020-04-13 19:02:12.331  info 17768 --- [  restartedmain] com.example.demo.demoapplication         : started demoapplication in 1.815 seconds (jvm running for 3.088)

任务执行,时间:2020-04-13t19:02:15.019

从结果可以看出在项目启动 3s 之后执行了延迟任务。

总结

本文讲了延迟任务的使用场景,以及延迟任务的 10 种实现方式:

  1. 手动无线循环;
  2. scheduledexecutorservice;
  3. delayqueue;
  4. redis zset 数据判断的方式;
  5. redis 键空间通知的方式;
  6. netty 提供的 hashedwheeltimer 工具类;
  7. rabbitmq 死信队列;
  8. rabbitmq 延迟消息插件 rabbitmq-delayed-message-exchange;
  9. spring scheduled;
  10. quartz。

最后的话

俗话说:台上一分钟,台下十年功。本文的所有内容皆为作者多年工作积累的结晶,以及熬夜呕心沥血的整理,如果觉得本文有帮助到你,请帮我分享出去,让更多的人看到,谢谢你。

本文由博客一文多发平台 openwrite 发布!