欢迎您访问程序员文章站,本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页

spring boot 集成quartz实现定时任务调度

程序员文章站 2022-05-01 19:22:01
...

1.pom文件引入

        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.2.1</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

2.application.yml配置(以下内容为YAML格式)

# Scheduled-task configuration, bound to the "asyn-tasks" prefix.
asyn-tasks:
  triggers:
    # job-name is matched against the simple class name of a Job bean.
    - job-name: CacheJob
      # Quartz cron expression; this one fires at second 0 of every minute.
      cron: 0 0/1 * * * ?
      # A trigger with enabled: false is skipped when the scheduler is built.
      enabled: true

3.配置文件context-epasquartzmanage.properties

# Custom key (not a standard Quartz setting, as far as this article shows);
# the JobConfig class in this article only reads it to log its value.
org.quartz.group.name=AIMS_JOBS
# Scheduler identity.
org.quartz.scheduler.instanceName = DefaultQuartzScheduler
org.quartz.scheduler.instanceId = AUTO
# Worker thread pool used to run jobs.
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 10
# Misfire tolerance; per the Quartz configuration reference the unit is milliseconds.
org.quartz.jobStore.misfireThreshold = 60000
# JDBC job-store / clustering settings, currently disabled.
#org.quartz.jobStore.tablePrefix = QRTZ_
#org.quartz.jobStore.isClustered = true
#org.quartz.jobStore.clusterCheckinInterval = 2000

4.配置读取-job配置

package com.paic.aims.cache.config;

import com.paic.aims.cache.base.InvokeJobDetailFactory;
import com.paic.aims.cache.base.Job;
import com.paic.icore.agr.common.exception.ServiceRuntimeException;
import com.paic.icore.agr.common.um.PrincipalUtil;
import com.paic.icore.agr.common.utils.AgrLogUtils;
import org.apache.commons.lang.StringUtils;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.Trigger;
import org.quartz.listeners.JobListenerSupport;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
import org.springframework.scheduling.quartz.JobDetailFactoryBean;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.text.ParseException;
import java.util.*;

/**
 * Quartz job configuration.
 *
 * <p>Collects every {@link Job} bean found in the application context and registers one cron
 * trigger per job. The cron expression is taken from the {@code asyn-tasks.triggers} entry whose
 * {@code job-name} equals the simple class name of the job bean.
 *
 * @author yjw
 * @date 2018/04/01
 */
@Configuration
public class JobConfig {
    private static final Logger logger = AgrLogUtils.getLogger(JobConfig.class);

    /** Classpath location of the Quartz properties file. */
    private static final String QUARTZ_PROPERTIES = "context-epasquartzmanage.properties";

    /** Group assigned to every job detail and trigger created by this configuration. */
    private static final String JOB_GROUP = "AIMS_JOBS";

    @Autowired(required = false)
    private List<Job> jobList;
    @Autowired
    private TaskConfig taskConfig;

    /**
     * Builds the scheduler factory with one cron trigger per enabled {@link Job} bean.
     *
     * @return the configured factory; a bare, trigger-less factory when no {@link Job} bean exists
     * @throws ServiceRuntimeException when a job has no trigger entry, a blank cron expression,
     *                                 or a cron expression that cannot be parsed
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBeans() {
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        if (CollectionUtils.isEmpty(jobList)) {
            return schedulerFactoryBean;
        }

        List<Trigger> triggers = new ArrayList<>();
        logger.info("jobs is+ {}", jobList);
        for (Job job : jobList) {
            String jobName = job.getClass().getSimpleName();
            TriggerConfig triggerConfig = findTriggerConfig(jobName);
            if (triggerConfig == null || StringUtils.isBlank(triggerConfig.getCron())) {
                throw new ServiceRuntimeException("No cron expression found for job [" + jobName + "]");
            }
            if (!triggerConfig.isEnabled()) {
                continue;
            }
            triggers.add(buildCronTrigger(job, jobName, triggerConfig.getCron()));
        }
        logger.info("triggers is+ {}", triggers);

        schedulerFactoryBean.setConfigLocation(new ClassPathResource(QUARTZ_PROPERTIES));
        schedulerFactoryBean.setQuartzProperties(loadQuartzProperties());
        // Exposes the ApplicationContext to jobs under this scheduler-context key.
        schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContext");
        schedulerFactoryBean.setTriggers(triggers.toArray(new Trigger[0]));
        // Global listener that sets up and tears down the user context around every job run.
        schedulerFactoryBean.setGlobalJobListeners(new JobListenerSupport() {
            @Override
            public String getName() {
                return "UserExecutionContextInit";
            }

            @Override
            public void jobToBeExecuted(JobExecutionContext context) {
                // Install the default execution identity.
                PrincipalUtil.setUser(null);
            }

            @Override
            public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
                PrincipalUtil.clear();
            }

            @Override
            public void jobExecutionVetoed(JobExecutionContext context) {
                PrincipalUtil.clear();
            }
        });
        return schedulerFactoryBean;
    }

    /**
     * Looks up the trigger entry configured for the given job name.
     *
     * @return the first matching entry, or {@code null} when none matches
     */
    private TriggerConfig findTriggerConfig(String jobName) {
        List<TriggerConfig> configured = taskConfig.getTriggers();
        if (configured == null) {
            return null;
        }
        // Compare from the known non-null side so an entry without job-name cannot cause an NPE.
        return configured.stream()
                .filter(candidate -> candidate != null && jobName.equals(candidate.getJobName()))
                .findFirst()
                .orElse(null);
    }

    /** Creates the job detail for one job bean and the cron trigger that fires it. */
    private Trigger buildCronTrigger(Job job, String jobName, String cron) {
        JobDetailFactoryBean jobFactoryBean = new JobDetailFactoryBean();
        jobFactoryBean.setJobClass(InvokeJobDetailFactory.class);
        jobFactoryBean.setDurability(true);
        jobFactoryBean.setRequestsRecovery(true);
        jobFactoryBean.setGroup(JOB_GROUP);
        jobFactoryBean.setBeanName(jobName);

        // The concrete job class travels in the job data map under "taskClass".
        Map<String, Object> jobData = new HashMap<>();
        jobData.put("taskClass", job.getClass());
        jobFactoryBean.setJobDataAsMap(jobData);
        jobFactoryBean.afterPropertiesSet();

        JobDetail jobDetail = jobFactoryBean.getObject();
        // Logs whether concurrent execution is disallowed for this job.
        logger.info("jobDetailIsConcurrentExectionDisallowed=" + jobDetail.isConcurrentExectionDisallowed());

        CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
        cronTriggerFactoryBean.setJobDetail(jobDetail);
        cronTriggerFactoryBean.setName("trigger_" + jobName);
        cronTriggerFactoryBean.setGroup(JOB_GROUP);
        cronTriggerFactoryBean.setCronExpression(cron);
        try {
            cronTriggerFactoryBean.afterPropertiesSet();
        } catch (ParseException e) {
            // Log the parse failure with its stack trace before rethrowing, so the cause is not lost.
            logger.error("Invalid cron expression [{}] for task [{}]", cron, jobName, e);
            throw new ServiceRuntimeException("Invalid cron expression [" + cron + "] for " +
                    "task [" + jobName + "]");
        }
        return cronTriggerFactoryBean.getObject();
    }

    /**
     * Reads the Quartz properties file from the classpath on a best-effort basis.
     *
     * @return the loaded properties; empty when the file is missing or unreadable
     */
    private Properties loadQuartzProperties() {
        Properties props = new Properties();
        try (java.io.InputStream in =
                     JobConfig.class.getClassLoader().getResourceAsStream(QUARTZ_PROPERTIES)) {
            if (in == null) {
                logger.warn("Quartz properties file [{}] not found on the classpath", QUARTZ_PROPERTIES);
                return props;
            }
            props.load(in);
            logger.info("org.quartz.group.name=" + props.getProperty("org.quartz.group.name"));
            logger.info("org.quartz.jobStore.tablePrefix=" + props.getProperty("org.quartz.jobStore.tablePrefix"));
        } catch (java.io.IOException | IllegalArgumentException e) {
            // Best effort: keep starting up, but record the full failure.
            logger.warn("Failed to read Quartz properties file [{}]", QUARTZ_PROPERTIES, e);
        }
        return props;
    }

    /** Binds the {@code asyn-tasks} configuration section. */
    @Component
    @ConfigurationProperties(prefix = "asyn-tasks")
    static class TaskConfig {
        private List<TriggerConfig> triggers = new ArrayList<>();

        public void setTriggers(List<TriggerConfig> triggers) {
            this.triggers = triggers;
        }

        public List<TriggerConfig> getTriggers() {
            return triggers;
        }
    }
}

5.trigger配置

package com.paic.aims.cache.config;

/**
 * One trigger entry of the scheduled-task configuration: which job it targets, the cron
 * expression to fire it with, and whether it is switched on.
 *
 * @author yjw
 * @date 2018/04/01
 */
public class TriggerConfig {

    /** Name of the job this trigger belongs to. */
    private String jobName;
    /** Cron expression for the trigger. */
    private String cron;
    /** Whether the trigger is active; {@code false} until set. */
    private boolean enabled;

    /** @return the name of the targeted job */
    public String getJobName() {
        return this.jobName;
    }

    /** @param jobName the name of the targeted job */
    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    /** @return the cron expression */
    public String getCron() {
        return this.cron;
    }

    /** @param cron the cron expression */
    public void setCron(String cron) {
        this.cron = cron;
    }

    /** @return {@code true} when the trigger is active */
    public boolean isEnabled() {
        return this.enabled;
    }

    /** @param enabled {@code true} to activate the trigger */
    public void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }
}

6.基础类

/**
*定义一个job接口
**/
package com.paic.aims.cache.base;
/**
 * Contract for a schedulable task: a single no-argument entry point.
 */
public interface Job {
    /**
     * Runs the task.
     */
    void execute();
}
package com.paic.aims.cache.base;

import com.paic.icore.agr.common.utils.RedisUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;
/**
 * Scheduled job that refreshes every registered {@link CacheDataPojoService}. A Redis lock
 * guards the refresh so that only the caller that obtains the lock performs it.
 *
 * @author yjw
 * @date 2018/04/01
 */
@Component
public class CacheJob implements Job {
    /**
     * Expiry of the distributed lock, in seconds.
     */
    public final static long NX_TIME_OUT =  60L;
    /** Redis key used as the refresh lock. */
    private static final String BLOCK_REDIS_REFRESH = "BLOCKREDISREFRESHAIMS";
    /**
     * All cache data services whose data this job initialises and refreshes.
     */
    @Autowired
    private List<CacheDataPojoService> list;

    /**
     * Initialises the data of every cache data service.
     */
    public void init() {
        for (CacheDataPojoService cacheDataPojoService : list) {
            cacheDataPojoService.initData();
        }
    }

    /**
     * Refreshes every cache data service while holding the Redis lock; returns without doing
     * anything when the lock is already taken.
     */
    @Override
    public void execute() {
        // A unique value per run lets this run recognise its own lock when releasing it.
        String ownerToken = java.util.UUID.randomUUID().toString();
        if (!RedisUtil.setnx(BLOCK_REDIS_REFRESH, ownerToken, NX_TIME_OUT)) {
            return;
        }
        try {
            for (CacheDataPojoService cacheDataPojoService : list) {
                cacheDataPojoService.refreshData();
            }
        } finally {
            // Release only a lock this run still owns: if the refresh outlived the expiry, the
            // key may now belong to another holder and must not be deleted.
            // NOTE(review): the check and the delete are two separate Redis calls, not atomic.
            if (ownerToken.equals(RedisUtil.getString(BLOCK_REDIS_REFRESH))) {
                RedisUtil.deleteString(BLOCK_REDIS_REFRESH);
            }
        }
    }
}
package com.paic.aims.cache.base;

import com.paic.icore.agr.common.utils.AgrLogUtils;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;
/**
 * Service that initialises and refreshes all registered {@link CacheDataPojoService} beans.
 *
 * @author yjw
 * @date 2018/04/01
 */
@Component
public class CacheServiceImpl {

    public final static Logger log = AgrLogUtils.getLogger(CacheServiceImpl.class);

    /**
     * Every cache data service known to the application context.
     */
    @Autowired
    private List<CacheDataPojoService> list;

    /**
     * Refreshes the data of each service in turn; an exception from one service propagates.
     */
    public void refresh() {
        list.forEach(CacheDataPojoService::refreshData);
    }

    /**
     * Initialises the data of each service; a failure in one service is logged and does not
     * stop the remaining ones.
     */
    public void init() {
        log.info("init cache listSize = {}", list.size());
        list.forEach(this::initOne);
    }

    /** Logs the service class, then initialises it, logging (not rethrowing) any exception. */
    private void initOne(CacheDataPojoService service) {
        log.info("CacheDataPojoService pojo class,{}", service.getClass().toString());
        try {
            service.initData();
        } catch (Exception e) {
            log.error("init data happen error", e);
        }
    }

}

package com.paic.aims.cache.base;

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.quartz.QuartzJobBean;
/**
 * Quartz job adapter: resolves the configured {@link Job} bean from the Spring application
 * context and runs it.
 *
 * @author yjw
 * @date 2018/04/01
 */
public class InvokeJobDetailFactory extends QuartzJobBean {
    /** Application context used to look the job bean up. */
    private ApplicationContext ctx;
    /** Type of the job bean to run. */
    private Class<? extends Job> taskClass;

    /** @param taskClass the type of the job bean to run */
    public void setTaskClass(Class<? extends Job> taskClass) {
        this.taskClass = taskClass;
    }

    /** @param applicationContext the context the job bean is fetched from */
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        ctx = applicationContext;
    }

    /**
     * Fetches the bean of type {@code taskClass} and executes it.
     *
     * @throws JobExecutionException wrapping any exception raised by the lookup or the job
     */
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        try {
            ctx.getBean(taskClass).execute();
        } catch (Exception failure) {
            throw new JobExecutionException(failure);
        }
    }
}
package com.paic.aims.cache.base;

import com.paic.icore.agr.common.utils.AgrLogUtils;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;
/**
 * Service that initialises and refreshes all registered {@link CacheDataPojoService} beans.
 *
 * @author yjw
 * @date 2018/04/01
 */
@Component
public class CacheServiceImpl {

    public final static Logger log = AgrLogUtils.getLogger(CacheServiceImpl.class);

    /**
     * Every cache data service known to the application context.
     */
    @Autowired
    private List<CacheDataPojoService> list;

    /**
     * Refreshes the data of each service in turn; an exception from one service propagates.
     */
    public void refresh() {
        list.forEach(CacheDataPojoService::refreshData);
    }

    /**
     * Initialises the data of each service; a failure in one service is logged and does not
     * stop the remaining ones.
     */
    public void init() {
        log.info("init cache listSize = {}", list.size());
        list.forEach(this::initOne);
    }

    /** Logs the service class, then initialises it, logging (not rethrowing) any exception. */
    private void initOne(CacheDataPojoService service) {
        log.info("CacheDataPojoService pojo class,{}", service.getClass().toString());
        try {
            service.initData();
        } catch (Exception e) {
            log.error("init data happen error", e);
        }
    }

}

7.启动类
自定义启动类

package com.paic.aims;

import com.paic.aims.cache.base.CacheDataPojoService;
import com.paic.aims.cache.base.CacheServiceImpl;
import com.paic.icore.agr.common.utils.AgrLogUtils;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

/**
 * Startup hook that initialises the cache data once the application is running.
 *
 * @author yjw
 * @date 2018/04/01
 */
@Component
public class CacheServiceStartedRunner implements ApplicationRunner {
    public final static Logger log = AgrLogUtils.getLogger(CacheServiceStartedRunner.class);

    /** Service performing the actual cache initialisation; injected once, via the constructor. */
    private final CacheServiceImpl cacheServiceImpl;

    /**
     * @param acheServiceImpl the cache service to initialise on startup
     */
    @Autowired
    public CacheServiceStartedRunner(CacheServiceImpl acheServiceImpl) {
        this.cacheServiceImpl = acheServiceImpl;
    }

    /**
     * Initialises the base cache data.
     *
     * @param args the application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("初始化缓存数据");
        cacheServiceImpl.init();
    }
}

系统启动类

package com.paic.aims;

import com.paic.icore.agr.common.config.AgrBootstrap;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.boot.autoconfigure.web.MultipartAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.netflix.feign.EnableFeignClients;
import org.springframework.context.annotation.Import;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;

/**
 * Application entry point for the jobs service.
 *
 * @author yjw
 * @date 2018/04/01
 */
@SpringBootApplication
@Import(AgrBootstrap.class)
@EnableTransactionManagement
@EnableFeignClients(basePackages = {"com.paic.aims.*"})
@EnableAutoConfiguration(exclude = {MultipartAutoConfiguration.class})
@EnableAsync
@EnableScheduling
public class ApplicationJobs {
    /**
     * Starts the application as a web application.
     *
     * @param args command-line arguments handed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplicationBuilder builder = new SpringApplicationBuilder(ApplicationJobs.class);
        builder.web(true);
        builder.run(args);
    }
}

8.自定义service类,实现自己的业务需求

package com.paic.aims.service;


import com.paic.aims.cache.base.CacheDataPojoService;
import com.paic.aims.dto.InformationModuleModel;
import com.paic.aims.mapper.InformationCacheMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * Sample {@link CacheDataPojoService} implementation backed by {@link InformationCacheMapper}.
 *
 * @author yjw
 * @date 2018/04/01
 */
@Service
public class InformationAppServiceImpl implements CacheDataPojoService {

    private static final Log logger = LogFactory.getLog(InformationAppServiceImpl.class);

    @Autowired
    private InformationCacheMapper informationMapper;

    /**
     * Queries the information modules.
     *
     * @param belong currently not used by the query
     * @return the modules returned by the mapper
     */
    public List<InformationModuleModel> selectInformationModuleList(int belong) throws Exception {
        return informationMapper.selectInformationModuleList();
    }

    /**
     * Loads the information modules and logs that initialisation ran.
     */
    @Override
    public void initData() {
        // The query is still issued as before; its result is not used yet.
        informationMapper.selectInformationModuleList();
        logger.info("初始化数据");
    }

    /**
     * Logs that a refresh ran; no data is reloaded yet.
     */
    @Override
    public void refreshData() {
        logger.info("更新数据");
    }


}

9.工具类

package com.paic.icore.agr.common.utils;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * Static facade over the Spring Redis templates. The templates are injected through the
 * instance setters of this component and stored in static fields, so the static methods only
 * work once the Spring context has created this bean.
 *
 * @Author
 * @Date 2017/11/1 15:33
 */
@Component
public class RedisUtil {

    private static final Logger logger = LoggerFactory.getLogger(RedisUtil.class);

    private static StringRedisTemplate stringRedisTemplate;
    private static RedisTemplate<String, Object> redisTemplate;
    private static String applicationName;

    /** Hook for decorating keys; currently returns the key unchanged. */
    private static String wrapKey(String key) {
        return key;
    }

    /** Stores an object under the key without expiry. */
    public static void setObject(String key, Object o) {
        redisTemplate.opsForValue().set(wrapKey(key), o);
    }

    /**
     * Stores an object under the key with an expiry.
     *
     * @param timeout expiry in DAYS (note: {@link #setString(String, String, int)} uses seconds)
     */
    public static void setObject(String key, Object o, int timeout) {
        redisTemplate.opsForValue().set(wrapKey(key), o, timeout, TimeUnit.DAYS);
    }

    /**
     * Reads the object stored under the key.
     *
     * @return the stored value cast to the caller's expected type, or {@code null} when absent
     */
    @SuppressWarnings("unchecked") // the caller chooses T; the cast cannot be checked here
    public static <T> T getObject(String key) {
        return (T) redisTemplate.opsForValue().get(wrapKey(key));
    }

    /** Deletes a key through the object template. */
    public static void deleteObject(String key) {
        redisTemplate.delete(wrapKey(key));
    }

    /** Deletes a key through the string template. */
    public static void deleteString(String key) {
        stringRedisTemplate.delete(wrapKey(key));
    }

    /**
     * Reads the string stored under the key.
     *
     * @return the stored string; {@code null} when the key is blank or absent
     */
    public static String getString(String key) {
        if (StringUtils.isBlank(key)) {
            return null;
        }
        return stringRedisTemplate.opsForValue().get(wrapKey(key));
    }

    /** Stores a string under the key without expiry. */
    public static void setString(String key, String val) {
        stringRedisTemplate.opsForValue().set(wrapKey(key), val);
    }

    /**
     * Stores a string under the key with an expiry.
     *
     * @param timeout expiry in seconds
     */
    public static void setString(String key, String val, int timeout) {
        stringRedisTemplate.opsForValue().set(wrapKey(key), val, timeout, TimeUnit.SECONDS);
    }

    /**
     * Distributed lock: sets the key only when it does not exist yet and, on success, gives it
     * an expiry.
     *
     * <p>NOTE(review): SETNX and EXPIRE remain two commands; a failure between them leaves a
     * lock without expiry.
     *
     * @param key  lock key
     * @param val  value stored under the key
     * @param time expiry in seconds
     * @return {@code true} when the lock was obtained, {@code false} otherwise
     */
    public static boolean setnx(String key, String val, long time) {
        // Encode as UTF-8 so the bytes match what the string template's serializer produces for
        // the same key in getString/deleteString.
        byte[] rawKey = key.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        byte[] rawVal = val.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        RedisConnection connection = stringRedisTemplate.getConnectionFactory().getConnection();
        try {
            // Null-safe: a null reply counts as "not acquired" instead of failing on unboxing.
            boolean acquired = Boolean.TRUE.equals(connection.setNX(rawKey, rawVal));
            if (acquired) {
                // Set the expiry on the same connection, straight after the lock is taken.
                connection.expire(rawKey, time);
            }
            return acquired;
        } finally {
            connection.close();
        }
    }

    /**
     * Returns the value of the Redis TTL command for the key.
     */
    public static long ttl(String key) {
        RedisConnection connection = stringRedisTemplate.getConnectionFactory().getConnection();
        try {
            return connection.ttl(key.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        } finally {
            // Always hand the connection back; it was previously never closed.
            connection.close();
        }
    }

    /**
     * Hash equivalent of {@code map.put(key, value)}.
     *
     * @param mapName name of the Redis hash
     * @param key     field inside the hash
     * @param object  value to store
     * @param <T>     type of the value
     */
    public static <T> void mapPut(String mapName, String key, T object) {
        redisTemplate.opsForHash().put(mapName, key, object);
    }

    /**
     * Hash equivalent of {@code map.get(key)}.
     *
     * @param mapName name of the Redis hash
     * @param key     field inside the hash
     * @param <T>     expected type of the value
     * @return the stored value cast to the caller's expected type, or {@code null} when absent
     */
    @SuppressWarnings("unchecked") // the caller chooses T; the cast cannot be checked here
    public static <T> T mapGetByKey(String mapName, String key) {
        return (T) redisTemplate.opsForHash().get(mapName, key);
    }

    /**
     * Returns all entries of a Redis hash.
     *
     * @param mapName name of the Redis hash
     * @return the entries of the hash
     */
    public static Map getMap(String mapName) {
        return redisTemplate.opsForHash().entries(mapName);
    }


    /**
     * Returns the keys that match the given pattern.
     *
     * @param pattern key pattern passed to the string template
     * @return the matching keys
     */
    public static Set<String> keys(String pattern) {
        return stringRedisTemplate.keys(pattern);
    }

    /**
     * Receives the string template from Spring, forces string serializers for keys, values,
     * hash keys and hash values, and publishes it to the static field.
     */
    @Autowired
    public void setStringRedisTemplate(StringRedisTemplate stringRedisTemplate) {
        RedisSerializer<String> stringSerializer = new StringRedisSerializer();
        stringRedisTemplate.setKeySerializer(stringSerializer);
        stringRedisTemplate.setValueSerializer(stringSerializer);
        stringRedisTemplate.setHashKeySerializer(stringSerializer);
        stringRedisTemplate.setHashValueSerializer(stringSerializer);
        RedisUtil.stringRedisTemplate = stringRedisTemplate;
    }

    /** Receives the object template from Spring and publishes it to the static field. */
    @Autowired
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw parameter type kept so injection is unchanged
    public void setRedisTemplate(RedisTemplate redisTemplate) {
        RedisUtil.redisTemplate = redisTemplate;
    }

    /** Receives the application name, defaulting to {@code unknown}. */
    @Value("${spring.application.name:unknown}")
    public void setApplicationName(String applicationName) {
        RedisUtil.applicationName = applicationName;
    }

    /**
     * Manual smoke test.
     *
     * <p>NOTE(review): nothing here initialises the static templates, so without a Spring
     * context this call cannot reach Redis; kept only so the class's public surface is unchanged.
     */
    public static void main(String[] args)
    {
        RedisUtil.setString("c","中文");
    }

}