欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

spring boot 集成Quartz实现动态调度任务

程序员文章站 2022-05-01 19:22:31
...

1、首先引入jar包在maven中:

        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz-jobs</artifactId>
            <version>2.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>5.1.0.RELEASE</version>
        </dependency>

2、配置Quartz的bean类:


import org.quartz.Scheduler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;


@Configuration
public class QuartzConfig {

    @Bean
    public SchedulerFactoryBean schedulerBean() {
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        return schedulerFactoryBean;
    }

    @Bean
    public Scheduler scheduler() {
        return schedulerBean().getScheduler();
    }
}

3、创建Job的接口类等会要在封装的定时器中调用:

import cn.com.boco.dss.ioms.common.SpringContext;
import cn.com.boco.dss.ioms.tagquartzjob.domain.OimScheduleTaskPlanEntity;
import cn.com.boco.dss.ioms.tagquartzjob.service.EditQuarzService;
import org.apache.log4j.Logger;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

import java.util.concurrent.atomic.AtomicInteger;

@DisallowConcurrentExecution
public class QuartzJobFactory implements Job {

    protected Logger logger = Logger.getLogger(this.getClass());


    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        String targetBeanId = context.getMergedJobDataMap().getString("targetObjectId");
        String taskDesc = context.getMergedJobDataMap().getString("taskDesc");
        EditQuarzService editQuarzService = (EditQuarzService) SpringContext.getBean("editQuarzService");
        try {
            OimScheduleTaskPlanEntity en = new OimScheduleTaskPlanEntity();
            en.setTaskDesc(taskDesc);
            en.setTaskId(targetBeanId);
            editQuarzService.handleMethod(en);
        } catch (Exception e) {
            logger.info("QuartzJobFactory-execute thorw:", e);
        }


    }


}

4、创建SpringContext用来根据bean名称获取bean在第3步有用到、因为第3步没有注解到容器管理所以根据注解拿不到容器里面的bean,所以需要创建SpringContext来根据名称获取bean:

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

@Component
public class SpringContext implements ApplicationContextAware {

	private static ApplicationContext context;

	public void setApplicationContext(ApplicationContext applicationcontext)
			throws BeansException {
		SpringContext.context = applicationcontext;
	}

	public static Object getBean(String beanName) {
		return context.getBean(beanName);
	}

}

5、创建service类:



public interface EditQuarzService {

    public void initScheduler();

    public JsonData addScheduler(String id);

    public JsonData delScheduler(String id);

    public JsonData resumeScheduler(String id);

    public JsonData resumeSchedulerAll();

    public void handleMethod(Entity en);
}

6、创建serviceImpl类我用的持久层(repository)用的springboot data JPA,此处省略不在写了,可以根据自己框架持久层框架进行修改:

import org.quartz.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.List;

@Service("editQuarzService")
public class EditQuarzServiceImpl implements EditQuarzService {

    protected Logger log = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private QuarzJobRepository quarzJobRepository;


    /**
     * 开启服务器默认启动所有的定时器任务
     */
    public void initScheduler() {
        List<Entity> taskList = this.quarzJobRepository.findByStatus(Const.UNEXECUTED);
        for (Entity task : taskList) {
            try {
                this.scheduler(task, scheduler);
            } catch (Exception e) {
                log.error("定时:" + task.getTaskId() + "启动失败");
            }
        }
    }


    /**
     * 增加任务到定时器
     *
     * @param id
     * @return
     */
    @Override
    public JsonData addScheduler(String id) {
        JsonData jd = new JsonData();
        Entity en = this.quarzJobRepository.findByTaskIdAndStatus(id, Const.UNEXECUTED);
        if (en != null) {
            try {
                scheduler.deleteJob(new JobKey(id));
                this.scheduler(en, scheduler);
                en.setLastModifyTime(new Date());
                //en.setStatus(Const.COMPLETED);
                this.quarzJobRepository.save(en);
                jd.setData("成功新增定时任务");
            } catch (Exception e) {
                jd.setStatus("-1");
                jd.setData("新增定时任务出错");
                log.error("异常:" + e);
            }
        } else {
            jd.setStatus("-1");
            jd.setData("任务不存在");
        }
        return jd;
    }


    /**
     * 暂停任务从定时器中
     *
     * @param id
     * @return
     */
    @Override
    public JsonData delScheduler(String id) {
        JsonData jd = new JsonData();
        Entity en = this.quarzJobRepository.findByTaskIdAndStatus(id, Const.UNEXECUTED);
        if (en != null) {
            try {
                scheduler.deleteJob(new JobKey(id));
                en.setLastModifyTime(new Date());
                this.quarzJobRepository.save(en);
            } catch (Exception e) {
                jd.setStatus("-1");
                jd.setData("删除定时任务出错");
                log.error("异常:" + e);
            }
        } else {
            jd.setStatus("-1");
            jd.setData("任务不存在");
        }
        return jd;
    }


    /**
     * 重启指定的任务从定时器中
     *
     * @param id
     * @return
     */
    @Override
    public JsonData resumeScheduler(String id) {
        JsonData jd = new JsonData();
        Entity en = this.quarzJobRepository.findByTaskId(id);
        if (en != null) {
            try {
                scheduler.deleteJob(new JobKey(id));
                this.scheduler(en, scheduler);
            } catch (SchedulerException e) {
                jd.setStatus("-1");
                jd.setData("重启定时任务出错");
                log.error("异常:" + e);
            }
        } else {
            jd.setStatus("-1");
            jd.setData("任务不存在");
        }
        return jd;
    }


    /**
     * 重启所有的未执行过的任务
     *
     * @return
     */
    @Override
    public JsonData resumeSchedulerAll() {
        JsonData jd = new JsonData();
        List<Entity> taskList = this.quarzJobRepository.findByStatus(Const.UNEXECUTED);
        try {
            scheduler.clear();
        } catch (SchedulerException e1) {
            log.error("清空定时任务异常:", e1);
        }
        for (OimScheduleTaskPlanEntity entity : taskList) {
            try {
                this.scheduler(entity, scheduler);
            } catch (Exception e) {
                jd.setStatus("-1");
                jd.setData("重启定时任务出错");
                log.error("异常:", e);
            }
        }
        return jd;
    }

    //封装定时器方法
    public void scheduler(Entity task, Scheduler scheduler) {
        TriggerKey triggerKey = TriggerKey.triggerKey(task.getTaskId(), Scheduler.DEFAULT_GROUP);
        JobDetail jobDetail = JobBuilder.newJob(QuartzJobFactory.class).withDescription(task.getTaskDesc()).withIdentity(task.getTaskId(), Scheduler.DEFAULT_GROUP).build();
        jobDetail.getJobDataMap().put("targetObjectId", task.getTaskId());
        jobDetail.getJobDataMap().put("taskDesc", task.getTaskDesc());
        //jobDetail.getJobDataMap().put("executeParamter", task.getExecuteParamter());
        CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(task.getStartTime());
        CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
        try {
            scheduler.scheduleJob(jobDetail, trigger);
            log.info("task " + task.getTaskId() + " schedulerRule :" + task.getTaskDesc() + " reload succeed");
        } catch (Exception e) {
            log.error("scheduler--异常:", e);
            throw new RuntimeException();
        }
    }



    /**
     *定时处理数据逻辑方法
     * @return
     */
    public void handleMethod(Entity en) {
        System.out.println(en.getTaskDesc() + "执行一次>>>"+en.getTaskId());
    }


}

7、我们每次需要重启项目的时候也要把我们之前已经再跑的定时任务给加载进来才行,所以在启动项目的时候初始化所以定时任务:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

@Component
public class DynamicJobAssembler {

    private static Logger logger = LoggerFactory.getLogger(DynamicJobAssembler.class);

    @Resource
    private EditQuarzService editQuarzService;

    @PostConstruct
    public void init() {
        logger.info("加载定时任务开始");
        try {
            this.editQuarzService.initScheduler();
        } catch (Exception e) {
            logger.info("定时任务加载失败,请手动加载");
        }
        logger.info("定时任务加载完成");
    }




}

8、工具类

public class Const {
	
	public static String ACTIVE_MQ = "mq";
	public static String URL_REQUEST = "http";
	public static String SUCCESS = "success";
	public static String FAILED = "failed";
	public static String RELOAD = "reload";
	public static String UNEXECUTED = "未执行";
	public static String INSERVICE = "运行中";
	public static String COMPLETED = "已完成";

}