欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

spring整合quartz+数据库存储

程序员文章站 2022-06-19 15:20:08
目录spring整合quartz二级目录三级目录spring整合quartz前篇博文写到的quartz定时任务无法将service业务层的bean注入到工作类中,也就无法实现具体的业务逻辑操作,本篇博文将实现这个注入问题以及结合数据库实现定时器的动态更改quartz调度框架是有内置表的 ,相关的任务调度都是根据内置表完成的进入quartz的官网,点击Downloads后会得到一个tar文件解压后根据数据库将相应的内置表导入(例如mysql)导入pom依赖

spring整合quartz

前篇博文写到的quartz定时任务无法将service业务层的bean注入到工作类中,
也就无法实现具体的业务逻辑操作,本篇博文将实现这个注入问题以及结合数据库实现定时器的动态更改

quartz调度框架是有内置表的 ,相关的任务调度都是根据内置表完成的

进入quartz的官网,点击Downloads后会得到一个tar文件

spring整合quartz+数据库存储
解压后根据数据库将相应的内置表导入(例如mysql)
spring整合quartz+数据库存储
spring整合quartz+数据库存储

导入pom依赖

<dependency>
	<groupId>org.quartz-scheduler</groupId>
	<artifactId>quartz-jobs</artifactId>
	<version>2.2.1</version>
   </dependency>
   <dependency>
   	<groupId>org.springframework.boot</groupId>
   	<artifactId>spring-boot-starter-quartz</artifactId>
   </dependency>
<dependency>

更换数据库连接池

   quartz需要使用C3P0连接池将数据持久化到数据库
   Quartz各版本数据库连接池技术更新情况
   Quartz 2.0 以前 DBCP
   Quartz 2.0 以后 C3P0(包含2.0

由于为了开发便捷,一般我们用的都是druid连接池,那么我们就要进行换源操作
先导入druid的依赖

<dependency>
	<groupId>com.alibaba</groupId>
	<artifactId>druid-spring-boot-starter</artifactId>
	<version>1.1.10</version>
</dependency>

项目中添加quartz.properties文件,默认会读取自建的properties文件配置

#
#============================================================================
# Configure Main Scheduler Properties 调度器属性
#============================================================================
org.quartz.scheduler.instanceName: DefaultQuartzScheduler
org.quartz.scheduler.instanceId = AUTO
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount= 10
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
org.quartz.jobStore.misfireThreshold: 60000
#============================================================================
# Configure JobStore
#============================================================================
#存储方式使用JobStoreTX,也就是数据库
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#使用自己的配置文件
org.quartz.jobStore.useProperties:true
#数据库中quartz表的表名前缀
org.quartz.jobStore.tablePrefix:qrtz_
org.quartz.jobStore.dataSource:qzDS
#是否使用集群(如果项目只部署到 一台服务器,就不用了)
org.quartz.jobStore.isClustered = true
#============================================================================
# Configure Datasources
#============================================================================
#配置数据库源(org.quartz.dataSource.qzDS.maxConnections: c3p0配置的是有s的,druid数据源没有s)
org.quartz.dataSource.qzDS.connectionProvider.class:com.xiaoyang.quartzs.utils.DruidConnectionProvider
org.quartz.dataSource.qzDS.driver: com.mysql.jdbc.Driver
org.quartz.dataSource.qzDS.URL: jdbc:mysql://localhost:3306/xxx?useUnicode=true&characterEncoding=utf8
org.quartz.dataSource.qzDS.user: xxx
org.quartz.dataSource.qzDS.password: xxx
org.quartz.dataSource.qzDS.maxConnection: 10


org.quartz.dataSource.qzDS.connectionProvider.class:com.xiaoyang.quartzs.utils.DruidConnectionProvider是druid数据库连接池的一些基础配置

DruidConnectionProvider:

package com.xiaoyang.quartzs.utils;


import com.alibaba.druid.pool.DruidDataSource;
import org.quartz.SchedulerException;
import org.quartz.utils.ConnectionProvider;

import java.sql.Connection;
import java.sql.SQLException;

/**
 * Druid连接池的Quartz扩展类
 *
 * @author xiaoyang
 * @create  2020-12-07 18:50
 */

public class DruidConnectionProvider implements ConnectionProvider {

    /*
     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     *
     * 常量配置,与quartz.properties文件的key保持一致(去掉前缀),同时提供set方法,Quartz框架自动注入值。
     *
     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     */

    //JDBC驱动
    public String driver;
    //JDBC连接串
    public String URL;
    //数据库用户名
    public String user;
    //数据库用户密码
    public String password;
    //数据库最大连接数
    public int maxConnection;
    //数据库SQL查询每次连接返回执行到连接池,以确保它仍然是有效的。
    public String validationQuery;

    private boolean validateOnCheckout;

    private int idleConnectionValidationSeconds;

    public String maxCachedStatementsPerConnection;

    private String discardIdleConnectionsSeconds;

    public static final int DEFAULT_DB_MAX_CONNECTIONS = 10;

    public static final int DEFAULT_DB_MAX_CACHED_STATEMENTS_PER_CONNECTION = 120;

    //Druid连接池
    private DruidDataSource datasource;

    /*
     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     *
     * 接口实现
     *
     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     */
    public Connection getConnection() throws SQLException {
        return datasource.getConnection();
    }

    public void shutdown() throws SQLException {
        datasource.close();
    }

    public void initialize() throws SQLException {
        if (this.URL == null) {
            throw new SQLException("DBPool could not be created: DB URL cannot be null");
        }

        if (this.driver == null) {
            throw new SQLException("DBPool driver could not be created: DB driver class name cannot be null!");
        }

        if (this.maxConnection < 0) {
            throw new SQLException("DBPool maxConnectins could not be created: Max connections must be greater than zero!");
        }

        datasource = new DruidDataSource();
        try {
            datasource.setDriverClassName(this.driver);
        } catch (Exception e) {
            try {
                throw new SchedulerException("Problem setting driver class name on datasource: " + e.getMessage(), e);
            } catch (SchedulerException e1) {
            }
        }

        datasource.setUrl(this.URL);
        datasource.setUsername(this.user);
        datasource.setPassword(this.password);
        datasource.setMaxActive(this.maxConnection);
        datasource.setMinIdle(1);
        datasource.setMaxWait(0);
        datasource.setMaxPoolPreparedStatementPerConnectionSize(this.DEFAULT_DB_MAX_CACHED_STATEMENTS_PER_CONNECTION);

        if (this.validationQuery != null) {
            datasource.setValidationQuery(this.validationQuery);
            if (!this.validateOnCheckout)
                datasource.setTestOnReturn(true);
            else
                datasource.setTestOnBorrow(true);
            datasource.setValidationQueryTimeout(this.idleConnectionValidationSeconds);
        }
    }

    /*
     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     *
     * 提供get set方法
     *
     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     */
    public String getDriver() {
        return driver;
    }

    public void setDriver(String driver) {
        this.driver = driver;
    }

    public String getURL() {
        return URL;
    }

    public void setURL(String URL) {
        this.URL = URL;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getMaxConnection() {
        return maxConnection;
    }

    public void setMaxConnection(int maxConnection) {
        this.maxConnection = maxConnection;
    }

    public String getValidationQuery() {
        return validationQuery;
    }

    public void setValidationQuery(String validationQuery) {
        this.validationQuery = validationQuery;
    }

    public boolean isValidateOnCheckout() {
        return validateOnCheckout;
    }

    public void setValidateOnCheckout(boolean validateOnCheckout) {
        this.validateOnCheckout = validateOnCheckout;
    }

    public int getIdleConnectionValidationSeconds() {
        return idleConnectionValidationSeconds;
    }

    public void setIdleConnectionValidationSeconds(int idleConnectionValidationSeconds) {
        this.idleConnectionValidationSeconds = idleConnectionValidationSeconds;
    }

    public DruidDataSource getDatasource() {
        return datasource;
    }

    public void setDatasource(DruidDataSource datasource) {
        this.datasource = datasource;
    }
}


解决quartz中注入spring的bean对象

实现这个也非常简单,一个配置类即可

package com.xiaoyang.quartzs.utils;

import lombok.extern.slf4j.Slf4j;
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.scheduling.quartz.AdaptableJobFactory;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyJobFactory extends AdaptableJobFactory {

    //这个对象Spring会帮我们自动注入进来
    @Autowired
    private AutowireCapableBeanFactory autowireCapableBeanFactory;

    //重写创建Job任务的实例方法
    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        Object jobInstance = super.createJobInstance(bundle);
        //通过以下方式,解决Job任务无法使用Spring中的Bean问题
        autowireCapableBeanFactory.autowireBean(jobInstance);
        return super.createJobInstance(bundle);
    }
}

配置类:

package com.xiaoyang.quartzs.config;

import com.xiaoyang.quartzs.utils.MyJobFactory;
import org.quartz.Scheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

import java.io.IOException;
import java.util.Properties;

@Configuration
public class QuartzConfiguration {

    @Autowired
    private MyJobFactory myJobFactory;

    //创建调度器工厂
    @Bean
        public SchedulerFactoryBean schedulerFactoryBean(){
            //1.创建SchedulerFactoryBean
            //2.加载自定义的quartz.properties配置文件
            //3.设置MyJobFactory

            SchedulerFactoryBean factoryBean=new SchedulerFactoryBean();
            try {
                factoryBean.setQuartzProperties(quartzProperties());
                factoryBean.setJobFactory(myJobFactory);
                return factoryBean;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
    }

    @Bean
    public Properties quartzProperties() throws IOException {
        PropertiesFactoryBean propertiesFactoryBean=new PropertiesFactoryBean();
        propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
        propertiesFactoryBean.afterPropertiesSet();
        return propertiesFactoryBean.getObject();
    }

    @Bean(name="scheduler")
    public Scheduler scheduler(){
        return schedulerFactoryBean().getScheduler();
    }
}

配置完成后,我们在工作类调用execute方法会通过调度器这个类进行bean的注入

结合数据库进行操作

在实际项目中,定时器任务肯定不会是一成不变的,那么就要支持更改定时任务进行完成定时器的动态更改,quartz是有内置表的,我们新建两张表进行动态操作定时任务
spring整合quartz+数据库存储spring整合quartz+数据库存储

核心代码(具体实现),springtask结合quartz

package com.xiaoyang.quartzs.service;

import com.xiaoyang.quartzs.mapper.ScheduleTriggerMapper;
import com.xiaoyang.quartzs.mapper.ScheduleTriggerParamMapper;
import com.xiaoyang.quartzs.model.ScheduleTrigger;
import com.xiaoyang.quartzs.model.ScheduleTriggerParam;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class ScheduleTriggerServiceImpl {

    @Autowired
    private ScheduleTriggerMapper scheduleTriggerMapper;

    @Autowired
    private ScheduleTriggerParamMapper scheduleTriggerParamMapper;

    @Autowired
    private Scheduler scheduler;

    @Scheduled(cron = "0/10 * * * * ?")
    public void refreshScheduler() {
        try {
            List<ScheduleTrigger> scheduleTriggers =
                    scheduleTriggerMapper.queryScheduleTriggerLst();
            if (null != scheduleTriggers) {
                for (ScheduleTrigger scheduleTrigger : scheduleTriggers) {
                    String cron = scheduleTrigger.getCron();  //表达式
                    String jobName = scheduleTrigger.getJob_name(); //任务名称
                    String jobGroup = scheduleTrigger.getJob_group(); //任务分组
                    String status = scheduleTrigger.getStatus();  //任务状态

                    //JobName+JobGroup=Primary Key
                    //根据jobName和jobGroup生成TriggerKey
                    TriggerKey triggerKey =
                            TriggerKey.triggerKey(jobName, jobGroup);
                    //根据TriggerKey到Scheduler调度器中获取触发器
                    CronTrigger cronTrigger = (CronTrigger)
                            scheduler.getTrigger(triggerKey);

                    if (null == cronTrigger) {
                        if (status.equals("0"))
                            continue;
                        System.out.println("创建调度器");
                        //创建任务详情
                        JobDetail jobDetail =
                                JobBuilder.newJob((Class<? extends Job>) Class.forName(jobName))
                                        .withIdentity(jobName, jobGroup)
                                        .build();

                        //往Job任务中传递参数
                        JobDataMap jobDataMap = jobDetail.getJobDataMap();
                        List<ScheduleTriggerParam> params =
                                scheduleTriggerParamMapper.queryScheduleParamLst(scheduleTrigger.getId());
                        for (ScheduleTriggerParam param : params) {
                            jobDataMap.put(param.getName(), param.getValue());
                        }

                        //创建表达式调度器
                        CronScheduleBuilder cronSchedule =
                                CronScheduleBuilder.cronSchedule(cron);

                        //创建Trigger
                        cronTrigger = TriggerBuilder.newTrigger()
                                .withIdentity(jobName, jobGroup)
                                .withSchedule(cronSchedule)
                                .build();

                        //将jobDetail和Trigger注入到scheduler调度器中
                        scheduler.scheduleJob(jobDetail, cronTrigger);
                    } else {
                        //System.out.println("Quartz 调度任务中已存在该任务");
                        if (status.equals("0")) {
                            JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
                            scheduler.deleteJob(jobKey);
                            continue;
                        }
                        //调度器中的表达式
                        String cronExpression =
                                cronTrigger.getCronExpression();

                        if (!cron.equals(cronExpression)) {
                            //创建表达式调度器
                            CronScheduleBuilder cronSchedule =
                                    CronScheduleBuilder.cronSchedule(cron);

                            //重构
                            cronTrigger = cronTrigger.getTriggerBuilder()
                                    .withIdentity(triggerKey)
                                    .withSchedule(cronSchedule)
                                    .build();

                            //刷新调度器
                            scheduler.rescheduleJob(triggerKey, cronTrigger);
                        }
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

通过修改数据库自建表的status即可动态实现定时任务的状态(是否启用),这里是支持多线程的,因为void方法使用的springtask定时任务完成刷新quartz定时器任务
案例:
MyJob

package com.xiaoyang.quartzs.quartz;

import lombok.extern.slf4j.Slf4j;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
@Slf4j
public class MyJob implements Job {

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        System.out.println("MyJob是一个空的任务计划,时间:"+new Date().toLocaleString());
    }
}

MyJob1

package com.xiaoyang.quartzs.quartz;

import com.xiaoyang.quartzs.service.ScheduleTriggerParamService;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Date;

/**
 * @author xiaoyang
 * @create  2020-12-08 8:40
 */
public class MyJob1 implements Job {

    @Autowired
    private ScheduleTriggerParamService scheduleTriggerParamService;

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        JobDetail jobDetail =
                jobExecutionContext.getJobDetail();
        JobDataMap jobDataMap = jobDetail.getJobDataMap();
        System.err.println(new Date().toLocaleString() + "-->携带参数个数:" + jobDataMap.size());
    }
}

MyJob2

package com.xiaoyang.quartzs.quartz;

import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
@Slf4j
public class MyJob2 implements Job {
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        JobDetail jobDetail =
                jobExecutionContext.getJobDetail();
        JobDataMap jobDataMap = jobDetail.getJobDataMap();
        System.out.println(new Date().toLocaleString() + "-->MyJob2参数传递name=" + jobDataMap.get("name") + ",score=" +
                jobDataMap.get("score"));
    }
}

最后在启动类加上注解

@EnableScheduling

同时启用三个定时任务的情况:
spring整合quartz+数据库存储执行定时任务时,只要是在用的,都会在quartz内置表中添加这个数据
spring整合quartz+数据库存储

over…

本文地址:https://blog.csdn.net/qq_45510899/article/details/110852492

相关标签: quartz