spring boot 集成quartz实现定时任务调度
程序员文章站
2022-05-01 19:22:01
...
1.pom文件引入
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.1</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
2.application.yml配置
#定时任务配置
asyn-tasks:
  triggers:
    - job-name: CacheJob
      cron: 0 0/1 * * * ?
      enabled: true
3.配置文件context-epasquartzmanage.properties
org.quartz.group.name=AIMS_JOBS
org.quartz.scheduler.instanceName = DefaultQuartzScheduler
org.quartz.scheduler.instanceId = AUTO
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 10
org.quartz.jobStore.misfireThreshold = 60000
#org.quartz.jobStore.tablePrefix = QRTZ_
#org.quartz.jobStore.isClustered = true
#org.quartz.jobStore.clusterCheckinInterval = 2000
4.配置读取-job配置
package com.paic.aims.cache.config;
import com.paic.aims.cache.base.InvokeJobDetailFactory;
import com.paic.aims.cache.base.Job;
import com.paic.icore.agr.common.exception.ServiceRuntimeException;
import com.paic.icore.agr.common.um.PrincipalUtil;
import com.paic.icore.agr.common.utils.AgrLogUtils;
import org.apache.commons.lang.StringUtils;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.Trigger;
import org.quartz.listeners.JobListenerSupport;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
import org.springframework.scheduling.quartz.JobDetailFactoryBean;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.text.ParseException;
import java.util.*;
/**
 * Quartz job configuration.
 *
 * <p>Collects every {@link Job} bean, looks up the entry under {@code asyn-tasks.triggers} whose
 * job name equals the bean's simple class name, and registers one cron trigger per enabled job on
 * the {@link SchedulerFactoryBean}.
 *
 * @author yjw
 * @date 2018/04/01
 */
@Configuration
public class JobConfig {
    private static final Logger logger = AgrLogUtils.getLogger(JobConfig.class);

    /** Classpath location of the Quartz scheduler properties. */
    private static final String QUARTZ_PROPERTIES_FILE = "context-epasquartzmanage.properties";

    /** Group assigned to every job detail and trigger created here. */
    private static final String JOB_GROUP = "AIMS_JOBS";

    /** All {@link Job} beans; stays {@code null} when the context defines none. */
    @Autowired(required = false)
    private List<Job> jobList;

    @Autowired
    private TaskConfig taskConfig;

    /**
     * Builds the scheduler factory and registers a cron trigger for every enabled job.
     *
     * @return a bare factory when no {@link Job} bean exists, otherwise a fully configured one
     * @throws ServiceRuntimeException when a job has no trigger entry, a blank cron expression,
     *         or a cron expression that cannot be parsed
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBeans() {
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        if (CollectionUtils.isEmpty(jobList)) {
            return schedulerFactoryBean;
        }

        logger.info("jobs is+ {}", jobList);
        List<Trigger> triggers = new ArrayList<>();
        for (Job job : jobList) {
            String jobName = job.getClass().getSimpleName();
            TriggerConfig triggerConfig = findTriggerConfig(jobName);
            // The cron check intentionally runs before the enabled check, as in the original flow.
            if (triggerConfig == null || StringUtils.isBlank(triggerConfig.getCron())) {
                throw new ServiceRuntimeException("No cron expression found for job [" + jobName + "]");
            }
            if (!triggerConfig.isEnabled()) {
                continue;
            }
            JobDetail jobDetail = buildJobDetail(job, jobName);
            // Reports whether concurrent execution is disallowed for this job detail.
            logger.info("jobDetailIsConcurrentExectionDisallowed={}",
                    jobDetail.isConcurrentExectionDisallowed());
            triggers.add(buildCronTrigger(jobDetail, jobName, triggerConfig.getCron()));
        }
        logger.info("triggers is+ {}", triggers);

        schedulerFactoryBean.setConfigLocation(new ClassPathResource(QUARTZ_PROPERTIES_FILE));
        schedulerFactoryBean.setQuartzProperties(loadQuartzProperties());
        schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContext");
        schedulerFactoryBean.setTriggers(triggers.toArray(new Trigger[0]));
        // The listener sets up and tears down the user execution context around every job run.
        schedulerFactoryBean.setGlobalJobListeners(newUserContextListener());
        return schedulerFactoryBean;
    }

    /** Returns the first trigger entry configured for {@code jobName}, or {@code null} if none. */
    private TriggerConfig findTriggerConfig(String jobName) {
        List<TriggerConfig> triggerConfigs = taskConfig.getTriggers();
        if (triggerConfigs == null) {
            return null;
        }
        // jobName is never null here, so calling equals on it tolerates entries without a job name.
        return triggerConfigs.stream()
                .filter(candidate -> candidate != null && jobName.equals(candidate.getJobName()))
                .findFirst()
                .orElse(null);
    }

    /** Creates the durable job detail that carries the concrete task class in its data map. */
    private JobDetail buildJobDetail(Job job, String jobName) {
        JobDetailFactoryBean jobFactoryBean = new JobDetailFactoryBean();
        jobFactoryBean.setJobClass(InvokeJobDetailFactory.class);
        jobFactoryBean.setDurability(true);
        jobFactoryBean.setRequestsRecovery(true);
        jobFactoryBean.setGroup(JOB_GROUP);
        jobFactoryBean.setBeanName(jobName);
        Map<String, Object> jobData = new HashMap<>();
        jobData.put("taskClass", job.getClass());
        jobFactoryBean.setJobDataAsMap(jobData);
        jobFactoryBean.afterPropertiesSet();
        return jobFactoryBean.getObject();
    }

    /** Creates the cron trigger named {@code trigger_<jobName>} for the given job detail. */
    private Trigger buildCronTrigger(JobDetail jobDetail, String jobName, String cron) {
        CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
        cronTriggerFactoryBean.setJobDetail(jobDetail);
        cronTriggerFactoryBean.setName("trigger_" + jobName);
        cronTriggerFactoryBean.setGroup(JOB_GROUP);
        cronTriggerFactoryBean.setCronExpression(cron);
        try {
            cronTriggerFactoryBean.afterPropertiesSet();
        } catch (ParseException e) {
            // The exception thrown below carries only a message, so log the parser's own
            // diagnostics here to avoid losing them.
            logger.error("Invalid cron expression [{}] for task [{}]", cron, jobName, e);
            throw new ServiceRuntimeException("Invalid cron expression [" + cron + "] for " +
                    "task [" + jobName + "]");
        }
        return cronTriggerFactoryBean.getObject();
    }

    /**
     * Reads the Quartz properties file from the classpath on a best-effort basis.
     *
     * @return the loaded properties, or an empty set when the file is missing or unreadable
     */
    private Properties loadQuartzProperties() {
        Properties p = new Properties();
        // try-with-resources closes the stream; a null resource is simply skipped on close.
        try (java.io.InputStream in =
                     JobConfig.class.getClassLoader().getResourceAsStream(QUARTZ_PROPERTIES_FILE)) {
            if (in == null) {
                logger.warn("Quartz properties file [{}] not found on classpath", QUARTZ_PROPERTIES_FILE);
                return p;
            }
            p.load(in);
            logger.info("org.quartz.group.name={}", p.getProperty("org.quartz.group.name"));
            logger.info("org.quartz.jobStore.tablePrefix={}", p.getProperty("org.quartz.jobStore.tablePrefix"));
        } catch (java.io.IOException | IllegalArgumentException e) {
            logger.warn("err{}", e.getMessage(), e);
        }
        return p;
    }

    /** Creates the global listener that resets the user principal around each job execution. */
    private JobListenerSupport newUserContextListener() {
        return new JobListenerSupport() {
            @Override
            public String getName() {
                return "UserExecutionContextInit";
            }

            @Override
            public void jobToBeExecuted(JobExecutionContext context) {
                // Set the default execution identity.
                PrincipalUtil.setUser(null);
            }

            @Override
            public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
                PrincipalUtil.clear();
            }

            @Override
            public void jobExecutionVetoed(JobExecutionContext context) {
                PrincipalUtil.clear();
            }
        };
    }

    /** Binds the {@code asyn-tasks} configuration prefix. */
    @Component
    @ConfigurationProperties(prefix = "asyn-tasks")
    static class TaskConfig {
        private List<TriggerConfig> triggers = new ArrayList<>();

        public void setTriggers(List<TriggerConfig> triggers) {
            this.triggers = triggers;
        }

        public List<TriggerConfig> getTriggers() {
            return triggers;
        }
    }
}
5.trigger配置
package com.paic.aims.cache.config;
/**
 * Trigger settings for a single job: the job name, its cron expression and an enabled flag.
 *
 * @author yjw
 * @date 2018/04/01
 */
public class TriggerConfig {

    /** Name of the job this entry applies to. */
    private String jobName;

    /** Cron expression for the job. */
    private String cron;

    /** Enabled flag; {@code false} until set. */
    private boolean enabled;

    /** @return the configured job name, or {@code null} when unset */
    public String getJobName() {
        return this.jobName;
    }

    /** @param jobName the job name to store */
    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    /** @return the configured cron expression, or {@code null} when unset */
    public String getCron() {
        return this.cron;
    }

    /** @param cron the cron expression to store */
    public void setCron(String cron) {
        this.cron = cron;
    }

    /** @return the enabled flag */
    public boolean isEnabled() {
        return this.enabled;
    }

    /** @param enabled the new value of the enabled flag */
    public void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }
}
6.基础类
/**
*定义一个job接口
**/
package com.paic.aims.cache.base;
/**
 * Contract for a schedulable task.
 *
 * <p>In this project {@code JobConfig} collects every bean of this type and
 * {@code InvokeJobDetailFactory} calls {@link #execute()} on the bean when its trigger fires.
 */
public interface Job {
/**
* Runs the task once. Takes no arguments and returns nothing; any exception it throws is
* left to the caller.
*/
void execute();
}
package com.paic.aims.cache.base;
import com.paic.icore.agr.common.utils.RedisUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Scheduled job that refreshes every registered {@link CacheDataPojoService}.
 *
 * <p>A Redis lock guards the refresh so that only the caller that acquires it runs the refresh;
 * callers that fail to acquire the lock return immediately.
 *
 * @author yjw
 * @date 2018/04/01
 */
@Component
public class CacheJob implements Job {
    /** Expiry of the distributed refresh lock, in seconds. */
    public final static long NX_TIME_OUT = 60L;

    /** Redis key used as the refresh lock. */
    private static final String BLOCK_REDIS_REFRESH = "BLOCKREDISREFRESHAIMS";

    /** Every cache data service available in the application context. */
    @Autowired
    private List<CacheDataPojoService> list;

    /**
     * Initialises the data of every cache service, in list order. An exception from one service
     * aborts the remaining ones.
     */
    public void init() {
        for (CacheDataPojoService cacheDataPojoService : list) {
            cacheDataPojoService.initData();
        }
    }

    /**
     * Refreshes every cache service while holding the Redis lock.
     *
     * <p>The lock value is a token unique to this run, and the lock is released only while it
     * still holds that token. If the refresh outlives {@link #NX_TIME_OUT} the lock may already
     * belong to another holder; deleting it unconditionally would remove that holder's lock.
     * The check-then-delete pair is not atomic, so a small window remains.
     */
    @Override
    public void execute() {
        String lockToken = java.util.UUID.randomUUID().toString();
        if (!RedisUtil.setnx(BLOCK_REDIS_REFRESH, lockToken, NX_TIME_OUT)) {
            // Lock not acquired: skip this run.
            return;
        }
        try {
            for (CacheDataPojoService cacheDataPojoService : list) {
                cacheDataPojoService.refreshData();
            }
        } finally {
            if (lockToken.equals(RedisUtil.getString(BLOCK_REDIS_REFRESH))) {
                RedisUtil.deleteString(BLOCK_REDIS_REFRESH);
            }
        }
    }
}
package com.paic.aims.cache.base;
import com.paic.icore.agr.common.utils.AgrLogUtils;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Fans initialisation and refresh calls out to every {@link CacheDataPojoService} bean.
 *
 * @author yjw
 * @date 2018/04/01
 */
@Component
public class CacheServiceImpl {
    public final static Logger log = AgrLogUtils.getLogger(CacheServiceImpl.class);

    /** Every cache data service injected by Spring. */
    @Autowired
    private List<CacheDataPojoService> list;

    /** Refreshes each service in list order; the first exception stops the remaining ones. */
    public void refresh() {
        list.forEach(CacheDataPojoService::refreshData);
    }

    /**
     * Initialises each service in list order. A failure in one service is logged and does not
     * prevent the remaining services from being initialised.
     */
    public void init() {
        log.info("init cache listSize = {}", list.size());
        list.forEach(this::initOne);
    }

    /** Logs the concrete service class, then initialises it, logging any exception it throws. */
    private void initOne(CacheDataPojoService service) {
        log.info("CacheDataPojoService pojo class,{}", service.getClass().toString());
        try {
            service.initData();
        } catch (Exception e) {
            log.error("init data happen error", e);
        }
    }
}
package com.paic.aims.cache.base;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.quartz.QuartzJobBean;
/**
 * Quartz job that delegates to a Spring-managed {@link Job} bean.
 *
 * <p>The bean type arrives through {@link #setTaskClass(Class)} and the application context
 * through {@link #setApplicationContext(ApplicationContext)}. Presumably both are populated by
 * {@link QuartzJobBean} from the job data map and scheduler context that {@code JobConfig}
 * fills in; confirm against the Spring version in use.
 *
 * @author yjw
 * @date 2018/04/01
 */
public class InvokeJobDetailFactory extends QuartzJobBean {

    /** Context used to resolve the task bean. */
    private ApplicationContext applicationContext;

    /** Concrete {@link Job} type to look up and run. */
    private Class<? extends Job> taskClass;

    /**
     * Resolves the bean of type {@code taskClass} and runs it.
     *
     * @param context the Quartz execution context (not read here)
     * @throws JobExecutionException wrapping any exception raised by the lookup or the task
     */
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        try {
            this.applicationContext.getBean(this.taskClass).execute();
        } catch (Exception e) {
            throw new JobExecutionException(e);
        }
    }

    /** @param applicationContext the context from which the task bean is resolved */
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /** @param taskClass the {@link Job} implementation to execute */
    public void setTaskClass(Class<? extends Job> taskClass) {
        this.taskClass = taskClass;
    }
}
package com.paic.aims.cache.base;
import com.paic.icore.agr.common.utils.AgrLogUtils;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Fans initialisation and refresh calls out to every {@link CacheDataPojoService} bean.
 *
 * @author yjw
 * @date 2018/04/01
 */
@Component
public class CacheServiceImpl {
    public final static Logger log = AgrLogUtils.getLogger(CacheServiceImpl.class);

    /** Every cache data service injected by Spring. */
    @Autowired
    private List<CacheDataPojoService> list;

    /** Refreshes each service in list order; the first exception stops the remaining ones. */
    public void refresh() {
        list.forEach(CacheDataPojoService::refreshData);
    }

    /**
     * Initialises each service in list order. A failure in one service is logged and does not
     * prevent the remaining services from being initialised.
     */
    public void init() {
        log.info("init cache listSize = {}", list.size());
        list.forEach(this::initOne);
    }

    /** Logs the concrete service class, then initialises it, logging any exception it throws. */
    private void initOne(CacheDataPojoService service) {
        log.info("CacheDataPojoService pojo class,{}", service.getClass().toString());
        try {
            service.initData();
        } catch (Exception e) {
            log.error("init data happen error", e);
        }
    }
}
7.启动类
自定义启动类
package com.paic.aims;
import com.paic.aims.cache.base.CacheDataPojoService;
import com.paic.aims.cache.base.CacheServiceImpl;
import com.paic.icore.agr.common.utils.AgrLogUtils;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/**
 * Startup hook that initialises the caches through {@link CacheServiceImpl#init()}.
 *
 * @author yjw
 * @date 2018/04/01
 */
@Component
public class CacheServiceStartedRunner implements ApplicationRunner {
    public final static Logger log = AgrLogUtils.getLogger(CacheServiceStartedRunner.class);

    /** Cache service to initialise; supplied once through the constructor. */
    private final CacheServiceImpl cacheService;

    /**
     * Creates the runner.
     *
     * <p>The dependency is injected through the constructor only. The annotation is kept on the
     * constructor so injection does not depend on implicit single-constructor detection.
     *
     * @param acheServiceImpl the cache service whose {@code init()} is called at startup
     */
    @Autowired
    public CacheServiceStartedRunner(CacheServiceImpl acheServiceImpl) {
        this.cacheService = acheServiceImpl;
    }

    /**
     * Initialises the base cache data.
     *
     * @param args application arguments (not read here)
     * @throws Exception if the cache service throws
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("初始化缓存数据");
        cacheService.init();
    }
}
系统启动类
package com.paic.aims;
import com.paic.icore.agr.common.config.AgrBootstrap;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.boot.autoconfigure.web.MultipartAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.netflix.feign.EnableFeignClients;
import org.springframework.context.annotation.Import;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;
/**
 * Application entry point for the jobs service.
 *
 * <p>NOTE(review): {@code @EnableAutoConfiguration} is declared alongside
 * {@code @SpringBootApplication}; the exclusion could probably be expressed as
 * {@code @SpringBootApplication(exclude = ...)} instead. Confirm before changing.
 *
 * @author yjw
 * @date 2018/04/01
 */
@SpringBootApplication
@Import(AgrBootstrap.class)
@EnableTransactionManagement
@EnableFeignClients(basePackages = {"com.paic.aims.*"})
@EnableAutoConfiguration(exclude = {MultipartAutoConfiguration.class})
@EnableAsync
@EnableScheduling
public class ApplicationJobs {

    /**
     * Builds the application with the web environment enabled and runs it.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplicationBuilder builder = new SpringApplicationBuilder(ApplicationJobs.class);
        builder.web(true).run(args);
    }
}
8.自定义service类,实现自己的业务需求
package com.paic.aims.service;
import com.paic.aims.cache.base.CacheDataPojoService;
import com.paic.aims.dto.InformationModuleModel;
import com.paic.aims.mapper.InformationCacheMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
 * Sample {@link CacheDataPojoService} implementation backed by {@link InformationCacheMapper}.
 *
 * @author yjw
 * @date 2018/04/01
 */
@Service
public class InformationAppServiceImpl implements CacheDataPojoService {
    private static final Log logger = LogFactory.getLog(InformationAppServiceImpl.class);

    @Autowired
    private InformationCacheMapper informationMapper;

    /**
     * Returns the information modules supplied by the mapper.
     *
     * <p>NOTE(review): no paging is applied and {@code belong} is not used to filter the result;
     * confirm whether either was intended.
     *
     * @param belong currently ignored
     * @return whatever the mapper returns
     * @throws Exception declared for callers; nothing in this method throws a checked exception
     */
    public List<InformationModuleModel> selectInformationModuleList(int belong) throws Exception {
        return informationMapper.selectInformationModuleList();
    }

    /** Loads the information modules once and logs how many were returned. */
    @Override
    public void initData() {
        List<InformationModuleModel> modules = informationMapper.selectInformationModuleList();
        int count = modules == null ? 0 : modules.size();
        // Goes through the class logger instead of standard output.
        logger.info("初始化数据, size=" + count);
    }

    /** Placeholder refresh: only logs that a refresh was requested. */
    @Override
    public void refreshData() {
        logger.info("更新数据");
    }
}
9.工具类
package com.paic.icore.agr.common.utils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
 * Static facade over the Spring-managed Redis templates.
 *
 * <p>The templates are stored in static fields by the {@code @Autowired} setters below, so the
 * static methods only work after Spring has created this component.
 *
 * @Author
 * @Date 2017/11/1 15:33
 */
@Component
public class RedisUtil {
    private static final Logger logger = LoggerFactory.getLogger(RedisUtil.class);
    private static StringRedisTemplate stringRedisTemplate;
    private static RedisTemplate<String, Object> redisTemplate;
    private static String applicationName;

    /** Hook for key decoration; currently returns the key unchanged. */
    private static String wrapKey(String key) {
        return key;
    }

    /** Encodes text as UTF-8 for the raw connection calls. */
    private static byte[] utf8(String text) {
        return text.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }

    /** Stores an object under {@code key} without an expiry. */
    public static void setObject(String key, Object o) {
        redisTemplate.opsForValue().set(wrapKey(key), o);
    }

    /**
     * Stores an object under {@code key} with an expiry.
     *
     * @param timeout expiry in DAYS. NOTE(review): {@link #setString(String, String, int)} uses
     *                seconds; confirm the difference is intentional.
     */
    public static void setObject(String key, Object o, int timeout) {
        redisTemplate.opsForValue().set(wrapKey(key), o, timeout, TimeUnit.DAYS);
    }

    /** Reads the object stored under {@code key}; the cast to {@code T} is unchecked. */
    @SuppressWarnings("unchecked")
    public static <T> T getObject(String key) {
        return (T) redisTemplate.opsForValue().get(wrapKey(key));
    }

    /** Deletes {@code key} through the object template. */
    public static void deleteObject(String key) {
        redisTemplate.delete(wrapKey(key));
    }

    /** Deletes {@code key} through the string template. */
    public static void deleteString(String key) {
        stringRedisTemplate.delete(wrapKey(key));
    }

    /**
     * Reads the string stored under {@code key}.
     *
     * @return the stored value, or {@code null} when {@code key} is blank
     */
    public static String getString(String key) {
        if (StringUtils.isBlank(key)) {
            return null;
        }
        return stringRedisTemplate.opsForValue().get(wrapKey(key));
    }

    /** Stores a string under {@code key} without an expiry. */
    public static void setString(String key, String val) {
        stringRedisTemplate.opsForValue().set(wrapKey(key), val);
    }

    /**
     * Stores a string under {@code key} with an expiry.
     *
     * @param timeout expiry in seconds
     */
    public static void setString(String key, String val, int timeout) {
        stringRedisTemplate.opsForValue().set(wrapKey(key), val, timeout, TimeUnit.SECONDS);
    }

    /**
     * Distributed lock: sets {@code key} only if it does not exist yet, then applies an expiry.
     *
     * <p>Key and value are encoded as UTF-8 rather than the platform charset, and the expiry is
     * applied on the same connection and raw key that took the lock. SETNX and EXPIRE are still
     * two separate commands, so a failure between them leaves the key without an expiry.
     *
     * @param key  lock key
     * @param val  lock value
     * @param time expiry in seconds
     * @return {@code true} when the lock was acquired, {@code false} otherwise
     */
    public static boolean setnx(String key, String val, long time) {
        byte[] rawKey = utf8(key);
        RedisConnection connection = stringRedisTemplate.getConnectionFactory().getConnection();
        try {
            // Boolean.TRUE.equals avoids unboxing a null reply.
            boolean acquired = Boolean.TRUE.equals(connection.setNX(rawKey, utf8(val)));
            if (acquired) {
                // Only the lock holder sets the expiry.
                connection.expire(rawKey, time);
            }
            return acquired;
        } finally {
            connection.close();
        }
    }

    /**
     * Returns the TTL reported by Redis for {@code key}.
     *
     * <p>The connection is closed after the call; previously it was never released.
     */
    public static long ttl(String key) {
        RedisConnection connection = stringRedisTemplate.getConnectionFactory().getConnection();
        try {
            return connection.ttl(utf8(key));
        } finally {
            connection.close();
        }
    }

    /**
     * Hash equivalent of {@code map.put(key, value)}.
     *
     * @param mapName name of the Redis hash
     * @param key     hash field
     * @param object  value to store
     * @param <T>     value type
     */
    public static <T> void mapPut(String mapName, String key, T object) {
        redisTemplate.opsForHash().put(mapName, key, object);
    }

    /**
     * Hash equivalent of {@code map.get(key)}; the cast to {@code T} is unchecked.
     *
     * @param mapName name of the Redis hash
     * @param key     hash field
     * @param <T>     expected value type
     * @return the stored value
     */
    @SuppressWarnings("unchecked")
    public static <T> T mapGetByKey(String mapName, String key) {
        return (T) redisTemplate.opsForHash().get(mapName, key);
    }

    /**
     * Returns all entries of a Redis hash.
     *
     * @param mapName name of the Redis hash
     * @return the hash entries
     */
    @SuppressWarnings("rawtypes")
    public static Map getMap(String mapName) {
        return redisTemplate.opsForHash().entries(mapName);
    }

    /**
     * Returns the keys matching {@code pattern}.
     *
     * @param pattern key pattern passed to the string template
     * @return the matching keys
     */
    public static Set<String> keys(String pattern) {
        return stringRedisTemplate.keys(pattern);
    }

    /** Stores the injected string template after setting string serializers on it. */
    @Autowired
    public void setStringRedisTemplate(StringRedisTemplate stringRedisTemplate) {
        StringRedisSerializer stringSerializer = new StringRedisSerializer();
        stringRedisTemplate.setKeySerializer(stringSerializer);
        stringRedisTemplate.setValueSerializer(stringSerializer);
        stringRedisTemplate.setHashKeySerializer(stringSerializer);
        stringRedisTemplate.setHashValueSerializer(stringSerializer);
        RedisUtil.stringRedisTemplate = stringRedisTemplate;
    }

    /** Stores the injected object template; the raw parameter type is kept for compatibility. */
    @Autowired
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void setRedisTemplate(RedisTemplate redisTemplate) {
        RedisUtil.redisTemplate = redisTemplate;
    }

    /** Stores the application name, defaulting to {@code unknown}. */
    @Value("${spring.application.name:unknown}")
    public void setApplicationName(String applicationName) {
        RedisUtil.applicationName = applicationName;
    }

    /**
     * Manual smoke test. The template field is only assigned by
     * {@link #setStringRedisTemplate(StringRedisTemplate)}, so this fails unless that setter has
     * already run.
     */
    public static void main(String[] args) {
        RedisUtil.setString("c", "中文");
    }
}
上一篇: Linux后台运行java的jar包
下一篇: cell高度自适应