欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

java 使用Queue在队列中异步执行任务

程序员文章站 2022-04-08 19:14:30
先创建一个总的Handler(队列统一处理接口),名字就叫做 QueueTaskHandler 然后写一个队列服务类,就不多做说明了,我的注释已经写的很清楚了 接下来就可以开始写你的业务Handler了 那么我们来在service中添加一个任务 整个过程就结束了,然后在你的业务Handler中如果需 ......

先创建一个总的handler(队列统一处理接口),名字就叫做 queuetaskhandler

public interface queuetaskhandler {

    void processdata();
}

然后写一个队列服务类,就不多做说明了,我的注释已经写的很清楚了

import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.stereotype.component;

import javax.annotation.postconstruct;
import javax.annotation.predestroy;
import java.util.concurrent.executorservice;
import java.util.concurrent.executors;
import java.util.concurrent.future;
import java.util.concurrent.linkedblockingqueue;

@component
public class queuegenerationservice{

    // 日志监控
    private static final logger log = loggerfactory.getlogger(queuegenerationservice.class);
    // 根据业务与服务器性能自行配置 这里我配置的是最多50000个任务
    // linkedblockingqueue构造的时候若没有指定大小,则默认大小为integer.max_value
    private final linkedblockingqueue<queuetaskhandler> tasks = new linkedblockingqueue<queuetaskhandler>(50000);
    // 类似于一个线程总管 保证所有的任务都在队列之中
    private executorservice service = executors.newsinglethreadexecutor();
    // 检查服务是否运行
    private volatile boolean running = true;
    //线程状态
    private future<?> servicethreadstatus = null;

    @postconstruct
    public void init() {
    servicethreadstatus = service.submit(new thread(new runnable() {
        @override
        public void run() {
        while (running) {
            try {
            //开始一个任务
            queuetaskhandler task = tasks.take();
            try {
                task.processdata();
            } catch (exception e) {
                log.error("任务处理发生错误", e);
            }
            } catch (interruptedexception e) {
            log.error("服务停止,退出", e);
            running = false;
            }
        }
        }
    }, "save data thread"));
    }

    public boolean adddata(queuetaskhandler datahandler) {
    if (!running) {
        log.warn("service is stop");
        return false;
    }
    //offer 队列已经满了,无法再加入的情况下
    boolean success = tasks.offer(datahandler);
    if (!success) {
        log.warn("添加任务到队列失败");
    }
    return success;
    }

    public boolean checkservicerun() {
    return running && !service.isshutdown() && !servicethreadstatus.isdone();
    }

    public void activeservice() {
    running = true;
    if (service.isshutdown()) {
        service = executors.newsinglethreadexecutor();
        init();
        log.info("线程池关闭,重新初始化线程池及任务");
    }
    if (servicethreadstatus.isdone()) {
        init();
        log.info("线程池任务结束,重新初始化任务");
    }
    }

    @predestroy
    public void destory() {
    running = false;
    service.shutdownnow();
    }
}

接下来就可以开始写你的业务handler了

public class testservicehandler implements queuetaskhandler {

    // ******* start 这一段并不是必要的,这是示范一个传值的方式
    private string name;

    private integer age;

    public testservicehandler(string name) {
    this.name = name;
    }

    public testservicehandler(integer age) {
    this.age = age;
    }

    public testservicehandler(string name, integer age) {
    this.name = name;
    this.age = age;
    }

    // ****** end

    // 这里也就是我们实现queuetaskhandler的处理接口
    @override
    public void processdata() {
    // 可以去做你想做的业务了
    // 这里需要引用spring的service的话,我写了一个工具类,下面会贴出来
    // itestservice testservice = springutils.getbean(itestservice.class);
    system.out.println("name > " + name + "," + "age > " + age);
    }

}

那么我们来在service中添加一个任务

    // 这里注入队列服务
@autowired private queuegenerationservice queuegenerationservice;

  // 在方法中调用与传参的方式
  queuegenerationservice.adddata(new testservicehandler("小明",5));
 

整个过程就结束了,然后在你的业务handler中如果需要使用其他的bean比如service,那么请试试我写的这个工具类

import org.springframework.beans.beansexception;
import org.springframework.context.applicationcontext;
import org.springframework.context.applicationcontextaware;
import org.springframework.stereotype.component;

@component
public class springutils implements applicationcontextaware {

    private static applicationcontext applicationcontext;

    /**
     * @return
     * @description 获取applicationcontext
     */
    public static applicationcontext getapplicationcontext() {
        return applicationcontext;
    }

    @override
    public void setapplicationcontext(applicationcontext applicationcontext) throws beansexception {
        if (springutils.applicationcontext == null) {
            springutils.applicationcontext = applicationcontext;
        }
    }

    /**
     * @param name
     * @return
     * @description 通过name获取 bean.
     */
    public static object getbean(string name) {
        return getapplicationcontext().getbean(name);
    }

    /**
     * @param clazz
     * @return
     * @description 通过class获取bean.
     */
    public static <t> t getbean(class<t> clazz) {
        return getapplicationcontext().getbean(clazz);
    }

    /**
     * @param name
     * @param clazz
     * @return
     * @description 通过name, 以及clazz返回指定的bean
     */
    public static <t> t getbean(string name, class<t> clazz) {
        return getapplicationcontext().getbean(name, clazz);
    }

}

 如果大家有什么不解,或意见,欢迎在下方留言,楼主看到就会回复的,谢谢。