Quartz任务的集群配置与分布式开发
1. 背景
最近由于项目的需求需要用到quartz做定时任务,要做到分布式的集群配置,于是就入坑了quartz的学习,在此记录一下我的学习过程中遇到的一些问题
2. quartz核心API的介绍
quartz是一个任务调度的框架,负责在某一个有规律的时间点去触发某件事情;quartz API的风格在2.x以后采用的是DSL风格,写起来非常流畅,下面介绍一下框架中核心的API的作用
1) Scheduler: 任务调度器,是实际执行任务调度器的控制器,在Spring被封装成ScheduerFactoryBean类
2) Trigger:触发器,用于定义调度时间的规则,即什么时间做什么事情,一个trigger只能对应一个任务,但是一个job却可以被多个trigger触发,其中trigger的类别有: SimpleTrigger,CronTrigger,DateIntervalTrigger和NthIncludedDayTrigger,其中CronTrigger用的比较多,本次项目中也是使用了这种方式,Spring也提供了自己的封装类CronTriggerFactoryBean;
3) Calendar:它是一些日历特定时间点的集合,如常见的节假日,可以方便的安排任务在非节假日的时候触发,一个trigger可以包含多个calendar;
4) JobDetail: 存储了一些Job实现类的定义的一些信息,如job的名字,组名等;在spring中有JobDetailFactoryBean和 MethodInvokingJobDetailFactoryBean两种实现,如果任务调度只需要执行某个类的某个方法,就可以通过MethodInvokingJobDetailFactoryBean来调用。
5) Job:是一个任务接口,只有一个void execute人(JobExecutionContext context)方法,你需要定义一个自己的实现类,继承该接口,在execute方法中封装具体任务的逻辑,context提供了调度上下文的各种信息;实现Job接口默认状态下是无状态的,无状态就是说Job有可能是并发执行的,如:一个任务要执行10秒,但是触发的时间间隔是1秒钟,那么就有多个任务并发执行;有状态的任务即能做到,直到上次任务执行完才会去执行下一次的任务,当触发时间到了,这次任务还没执行完,其他任务就会被挂起等待直到这次任务执行完,所以有状态的任务会导致任务没有按照原本的时间点执行;若要将任务设置成有状态的;可以在Job的实现类上添加@DisallowConcurrentExecution注解(早期的版本是实现StatefuleJob,现在已被Deprecated),在与Spring的结合中可在配置文件JobDetail中设置concurrent参数
6) JobDataMap:它是Map的扩展类,提供了一些便捷的方法,且可用于给Job传递参数值,如:有一个需求要给两个人发邮件,一个是发给张三,一个发给李四,不用写两个job实现类,可将其存在JobDataMap中,然后通过context在execute方法中获取到;对于同一JobDetail实例,执行多个Job实例,是共享同一个JobDataMap,也即在任务里修改了里面的值,会对其他Job实例造成影响;Trigger同样有一个JobDataMap,共享范围是所有使用这个Trigger的Job实例
3. Quartz与Spring的RAM集成方式
Quartz与Spring的集成,Spring提供了对Quartz的内部集成,集成的存储方式分RAM方式与JDBC方式(企业级开发中若任务要求高可用,可扩展性则必须使用JDBC方式,集群也是基于这一存储方式);接下来介绍一下RAM的存储集成方式.
1) applicationContext-quartz.xml配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.3.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.3.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.3.xsd">
<!-- 定义自己的jobFactory -->
<bean id="myJobFactory" class="com.zach.recipemanage.quartz.job.MyJobFactory" />
<!-- 定义无处方单任务bean -->
<bean name="recipeJobDetail"
class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
<!-- 指定job的名称 -->
<property name="name" value="recipe_job"></property>
<!-- 指定job分组 -->
<property name="group" value="recipe_group"></property>
<!-- 指定具体的job类 -->
<property name="jobClass" value="com.zach.recipemanage.quartz.job.RecipeJob" />
<!-- 必须设置为true,如果为false,当没有活动的触发器与之关联时会在调度器中会删除该任务 -->
<property name="durability" value="true" />
<!-- 任务中断重启后会恢复 -->
<property name="requestsRecovery" value="true" />
<!-- 指定spring容器的key,如果不设定在job中的jobmap中是获取不到spring容器的 -->
<property name="applicationContextJobDataKey" value="applicationContext" />
</bean>
<!-- 2.1:定义无处方单触发器的bean,一个触发器只能和一个任务进行绑定 -->
<!-- <bean name="noRecipeTrigger"
class="org.springframework.scheduling.quartz.SimpleTriggerFactoryBean">
<property name="name" value="recipe_trigger" />
<property name="group" value="recipe_trigger_group" />
<property name="jobDetail" ref="recipeJobDetail" />
<property name="startDelay" value="1000" />
<property name="repeatInterval" value="30000" />
<property name="repeatCount" value="0" />
</bean> -->
<bean id="noRecipeTrigger"
class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="name" value="no_Recipe_trigger" />
<property name="group" value="recipe_trigger_group" />
<property name="jobDetail" ref="recipeJobDetail" />
<property name="cronExpression" value=" 0 0 1 * * ?" />
</bean>
<!-- 处方单不匹配任务-->
<bean name="thirdRecipeJobDetail"
class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
<!-- 指定job的名称 -->
<property name="name" value="thirdRecipe_job"></property>
<!-- 指定job分组 -->
<property name="group" value="recipe_group"></property>
<!-- 指定具体的job类 -->
<property name="jobClass"
value="com.zach.recipemanage.quartz.job.ThirdRecipeJob" />
<!-- 必须设置为true,如果为false,当没有活动的触发器与之关联时会在调度器中会删除该任务 -->
<property name="durability" value="true" />
<!-- 任务中断重启后会恢复 -->
<property name="requestsRecovery" value="true" />
<!-- 指定spring容器的key,如果不设定在job中的jobmap中是获取不到spring容器的 -->
<property name="applicationContextJobDataKey" value="applicationContext" />
</bean>
<!-- 2.2:定义触发器的bean,定义一个Cron的Trigger,一个触发器只能和一个任务进行绑定 -->
<bean id="thirdRecipeTrigger"
class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<!-- 指定Trigger的名称 -->
<property name="name" value="thirdRecipe_trigger" />
<!-- 指定Trigger的名称 -->
<property name="group" value="recipe_trigger_group" />
<!-- 指定Tirgger绑定的Job -->
<property name="jobDetail" ref="thirdRecipeJobDetail" />
<!-- 指定Cron 的表达式 ,当前是每天凌晨2:00运行一次 -->
<property name="cronExpression" value=" 0 0 2 * * ?" />
</bean>
<!-- 药品流向数据更新 定时器 - start -->
<!-- 定义药品流向数据中间表数据任务bean -->
<bean name="drugFlowDataUpdater"
class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
<!-- 指定job的名称 -->
<property name="name" value="drug_flow_job"></property>
<!-- 指定job分组 -->
<property name="group" value="drug_group"></property>
<!-- 指定具体的job类 -->
<property name="jobClass" value="com.zach.recipemanage.quartz.job.DrugFlowJob" />
<!-- 必须设置为true,如果为false,当没有活动的触发器与之关联时会在调度器中会删除该任务 -->
<property name="durability" value="true" />
<!-- 任务中断重启后会恢复 -->
<property name="requestsRecovery" value="true" />
<!-- 指定spring容器的key,如果不设定在job中的jobmap中是获取不到spring容器的 -->
<property name="applicationContextJobDataKey" value="applicationContext" />
</bean>
<!-- 2.1:定义药品流向数据中间表数据更新触发器的bean,一个触发器只能和一个任务进行绑定 -->
<bean id="drugFlowDataUpdateTrigger"
class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<!-- 指定Trigger的名称 -->
<property name="name" value="drug_flow_trigger" />
<!-- 指定Trigger的名称 -->
<property name="group" value="drug_trigger_group" />
<!-- 指定Tirgger绑定的Job -->
<property name="jobDetail" ref="drugFlowDataUpdater" />
<!-- 指定Cron 的表达式 ,当前是每天凌晨1点运行一次 -->
<property name="cronExpression" value=" 0 0 1 * * ?" />
</bean>
<!-- 药品流向数据更新 定时器 - end -->
<!-- 3.定义调度器,并将Trigger注册到调度器中 -->
<bean id="scheduler"
class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="startupDelay" value="3" />
<property name="jobFactory" ref="myJobFactory" />
<property name="triggers">
<list>
<ref bean="noRecipeTrigger" />
<ref bean="thirdRecipeTrigger"/>
<ref bean="drugFlowDataUpdateTrigger"/>
</list>
</property>
<property name="autoStartup" value="true" />
</bean>
</beans>
2) 定义自己的任务类(这里只列出了其中一个任务类,其他的任务类也是相似的)
public class RecipeJob implements Job {
@Resource
private SynchronizedDataService synchronizedDataService;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
//同步数据
synchronizedDataService.synchronizedData();
}
}
@Resource
private SynchronizedDataService synchronizedDataService;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
//同步数据
synchronizedDataService.synchronizedData();
}
}
在任务类中我们是通过注入service的方式,来调用具体的业务逻辑,但是当你一运行就会发现synchronizeDataService无法注入,空指针异常;原因是Spring框架整合了Quartz,在创建JobDetail对象的过程中,使用AdapterJobFactory的createJobInstance这个方法创建JobDetail对象,而没有把对象放入Spring的IOC容器管理,所以无法注入Spring对象
解决办法:手动把JobDetail对象放入SpringIOC容器中:
@Component
public class MyJobFactory extends AdaptableJobFactory {
@Resource
private AutowireCapableBeanFactory factory;
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
//调用父类的原有的方法创建JobDetail对象
Object jobDetail = super.createJobInstance(bundle);
//把JobDetail放入SpringIOC容器中管理
factory.autowireBean(jobDetail);
return jobDetail;
}
}
@Resource
private AutowireCapableBeanFactory factory;
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
//调用父类的原有的方法创建JobDetail对象
Object jobDetail = super.createJobInstance(bundle);
//把JobDetail放入SpringIOC容器中管理
factory.autowireBean(jobDetail);
return jobDetail;
}
}
在配置文件中引入自定义的MyJobFactory,然后再注册到调度器Scheduler中;
4. Quartz集群原理分析
quartz的集群功能通过容错和负载均衡可以达到高可用和可扩展性,示意图如下:
Quartz集群是通过数据库表来感知其他节点的存在的,各个节点之间没有直接的通信,只有使用持久化的JobStore才能完成Quartz集群,而且每个节点都必须分享同一个数据库;
与quartz持久化相关的表现在有11张表(原来是12张,去掉了listener相关的表)
QRTZ_LOCKS就是quartz集群实现同步机制的行锁表,包括以下几个锁:CALENDAR_ACCESS 、JOB_ACCESS、MISFIRE_ACCESS 、STATE_ACCESS 、TRIGGER_ACCESS。
1) quartz核心元素关系图
2) quartz线程视图
Scheduler调度线程主要有两个:执行常规调度线程和执行misfired trigger线程,常规调度线程轮询存储的所有的trigger,若达到了触发的时间则从任务执行线程池获取一个空闲线程,执行与该trigger关联的任务.misfire线程是扫描所有的trigger,查看是否有misfired trigger,若有则根据misfire的策略分别处理
3) Quartz调度线程流程图
4) quartz启动流程
若quartz是配置在spring中,当服务启动时,就会装载相关的bean;SchedulerFactoryBean实现了InitializingBean接口,故在初始化bean的时候,会执行afterPropertiesSet方法,该方法再调用ScheduerFactory(一般是StdSchedulerFactory)创建Scheduler.SchedulerFactory在创建quartzScheduer的过程中,将会读取配置参数,初始化各个组件,关键组件如下:
ThreadPool:一般是使用SimpleThreadPool,SimpleThreadPool创建了一定数量的WorkerThread实例来使得Job能够在线程中进行处理。WorkerThread是定义在SimpleThreadPool类中的内部类,它实质上就是一个线程。在SimpleThreadPool中有三个list:workers-存放池中所有的线程引用,availWorkers-存放所有空闲的线程,busyWorkers-存放所有工作中的线程;
JobStore:分为存储在内存的RAMJobStore和存储在数据库的JobStoreSupport(包括JobStoreTX和JobStoreCMT两种实现,JobStoreCMT是依赖于容器来进行事务的管理,而JobStoreTX是自己管理事务),若要使用集群要使用JobStoreSupport的方式;
QuartzSchedulerThread:用来进行任务调度的线程,在初始化的时候paused=true,halted=false,虽然线程开始运行了,但是paused=true,线程会一直等待,直到start方法将paused置为false;
另外,SchedulerFactoryBean还实现了SmartLifeCycle接口,因此初始化完成后,会执行start()方法,该方法将主要会执行以下的几个动作:
创建ClusterManager线程并启动线程:该线程用来进行集群故障检测和处理;
创建MisfireHandler线程并启动线程:该线程用来进行misfire任务的处理;
置QuartzSchedulerThread的paused=false,调度线程才真正开始调度;
整个启动流程图
5. Spring + Quartz实现持久化任务调度
1) 环境准备:
Spring 4.3.7.RELEASE + mybatis 3.4.1+Quartz2.2.1+ mysql5.7
数据库表在Quartz包docs/dbTables中选择对应的数据库脚本,有一个注意点:mysql5.5之前用的表存储引擎是MyISAM,使用的是表级所,锁发生冲突概率比较高,并发度低,5.6之后默认的存储引擎为InnoDB,采用的是行级锁机制,并发度较高,而quartz集群使用的数据库的锁机制来实现同一个任务同一时刻只能被一个实例执行;
2) 任务用于前端展示视图类ScheduleJob的映射表:
-- Table structure for task_schedule_job -- ---------------------------- DROP TABLE IF EXISTS `task_schedule_job`; CREATE TABLE `task_schedule_job` ( `job_id` int(20) NOT NULL AUTO_INCREMENT, `create_time` timestamp NULL DEFAULT NULL, `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `job_name` varchar(255) DEFAULT NULL, `job_group` varchar(255) DEFAULT NULL, `job_status` varchar(255) DEFAULT NULL, `cron_expression` varchar(255) NOT NULL, `description` varchar(255) DEFAULT NULL, `bean_class` varchar(255) DEFAULT NULL, `is_concurrent` varchar(255) DEFAULT NULL, `spring_id` varchar(255) DEFAULT NULL, `method_name` varchar(255) NOT NULL, PRIMARY KEY (`job_id`), UNIQUE KEY `name_group` (`job_name`,`job_group`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -- ---------------------------- -- Records of task_schedule_job -- ---------------------------- |
3) application-quartz.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.3.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.3.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.3.xsd">
<!-- 注册集群调度任务 -->
<bean id="recipeManageSchedule" lazy-init="false" autowire="no"
class="org.springframework.scheduling.quartz.SchedulerFactoryBean"
destroy-method="destroy">
<!-- 数据源 -->
<property name="dataSource" ref="dataSource" />
<!-- 可选,QuartzScheduler 启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了 -->
<property name="overwriteExistingJobs" value="true" />
<!-- 必须的,QuartzScheduler 延时启动,应用启动完后 QuartzScheduler 再启动 -->
<property name="startupDelay" value="3" />
<!-- 设置自动启动 -->
<property name="autoStartup" value="true" />
<property name="applicationContextSchedulerContextKey" value="applicationContext" />
<property name="configLocation" value="classpath:quartz.properties" />
</bean>
</beans>
4) quartz.proepries配置文件
#==============================================================
#Configure Main Scheduler Properties
#==============================================================
org.quartz.scheduler.instanceName = RecipeManageSchedule
org.quartz.scheduler.instanceId = AUTO
#==============================================================
#Configure JobStore
#==============================================================
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.isClustered = true
org.quartz.jobStore.clusterCheckinInterval = 20000
org.quartz.jobStore.maxMisfiresToHandleAtATime = 1
org.quartz.jobStore.misfireThreshold = 60000
org.quartz.jobStore.txIsolationLevelSerializable = false
org.quartz.jobStore.selectWithLockSQL = SELECT * FROM {0}LOCKS WHERE LOCK_NAME = ? FOR UPDATE
#org.quartz.jobStore.dataSource = myDS 由spring管理dataSource
==============================================================
#Configure DataSource 数据库的连接信息在dataSource文件中已经配置过
==============================================================
#org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver
#org.quartz.dataSource.myDS.URL = jdbc:mysql://localhost:3306/gd_recipe?useUnicode=true&characterEncoding=utf8
#org.quartz.dataSource.myDS.user = root
#org.quartz.dataSource.myDS.password = 123456
#org.quartz.dataSource.myDS.maxConnections = 30
#==============================================================
#Configure ThreadPool
#==============================================================
org.quartz.threadPool.class= org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount= 10
org.quartz.threadPool.threadPriority= 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread= true
#==============================================================
#Skip Check Update
#update:true
#not update:false
#==============================================================
org.quartz.scheduler.skipUpdateCheck = true
#============================================================================
# Configure Plugins
#============================================================================
org.quartz.plugin.triggHistory.class = org.quartz.plugins.history.LoggingJobHistoryPlugin
org.quartz.plugin.shutdownhook.class = org.quartz.plugins.management.ShutdownHookPlugin
org.quartz.plugin.shutdownhook.cleanShutdown = true
5) 关键代码
/**
* 任务信息类
*
* @author Zach
* @date 2018年7月9日
* @title ScheduleJob
*/
@Table(name="task_schedule_job")
public class ScheduleJob implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
@Id
private Integer jobId;
private Date createTime;
private Date updateTime;
private String jobName;
private String jobGroup;
/**
* 任务状态 是否启动任务
*/
private String jobStatus;
/**
* cron表达式
*/
private String cronExpression;
/**
* 描述
*/
private String description;
/**
* 任务执行时调用哪个类的方法 ,包名+类名
*/
private String beanClass;
/**
* 任务是否有状态
*/
private String isConcurrent;
/**
* spring bean
*/
private String springId;
/**
* 任务调用的方法名
*/
private String methodName;
public Integer getJobId() {
return jobId;
}
public void setJobId(Integer jobId) {
this.jobId = jobId;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
public String getJobName() {
return jobName;
}
public void setJobName(String jobName) {
this.jobName = jobName;
}
public String getJobGroup() {
return jobGroup;
}
public void setJobGroup(String jobGroup) {
this.jobGroup = jobGroup;
}
public String getJobStatus() {
return jobStatus;
}
public void setJobStatus(String jobStatus) {
this.jobStatus = jobStatus;
}
public String getCronExpression() {
return cronExpression;
}
public void setCronExpression(String cronExpression) {
this.cronExpression = cronExpression;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getBeanClass() {
return beanClass;
}
public void setBeanClass(String beanClass) {
this.beanClass = beanClass;
}
public String getIsConcurrent() {
return isConcurrent;
}
public void setIsConcurrent(String isConcurrent) {
this.isConcurrent = isConcurrent;
}
public String getSpringId() {
return springId;
}
public void setSpringId(String springId) {
this.springId = springId;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
}
@Service
public class JobTaskServiceImpl implements JobTaskService {
public static final Logger log = LoggerFactory.getLogger("bizDataLogger");
@Autowired
private Scheduler scheduler;
@Autowired
private ScheduleJobMapper scheduleJobMapper;
/**
* 从数据库中取 区别于getAllJob
*
* @return
*/
public AppResponse getAllTask() {
AppResponse appResponse = new AppResponse();
List<ScheduleJob> list = scheduleJobMapper.select(null);
appResponse.setData(list);
return appResponse;
}
/**
* 添加到数据库中 区别于addJob
*/
public void addTask(ScheduleJob job) {
job.setCreateTime(new Date());
job.setUpdateTime(new Date());
scheduleJobMapper.insertSelective(job);
}
/**
* 从数据库中查询job
*/
public AppResponse getTaskById(Integer jobId) {
AppResponse appResponse = new AppResponse();
ScheduleJob scheduleJob = scheduleJobMapper.selectByPrimaryKey(jobId);
appResponse.setData(scheduleJob);
return appResponse;
}
/**
* 更改任务状态
*
* @throws Exception
*/
public void changeStatus(Integer jobId, String cmd) {
try {
ScheduleJob job =scheduleJobMapper.selectByPrimaryKey(jobId);
if (job == null) {
return;
}
if ("stop".equals(cmd)) {
deleteJob(job);
job.setJobStatus(JobUtils.STATUS_NOT_RUNNING);
} else if ("start".equals(cmd)) {
job.setJobStatus(JobUtils.STATUS_RUNNING);
addJob(job);
}
scheduleJobMapper.updateByPrimaryKeySelective(job);
} catch (Exception e) {
log.error(e.getMessage());
}
}
/**
* 更改任务 cron表达式
*
* @throws Exception
*/
public void updateCron(Integer jobId, String cron) {
try {
ScheduleJob job = scheduleJobMapper.selectByPrimaryKey(jobId);
if (job == null) {
return;
}
job.setCronExpression(cron);
if (JobUtils.STATUS_RUNNING.equals(job.getJobStatus())) {
updateJobCron(job);
}
scheduleJobMapper.updateByPrimaryKeySelective(job);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 添加任务
*
* @throws Exception
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public void addJob(ScheduleJob job) {
try {
if (job == null || !JobUtils.STATUS_RUNNING.equals(job.getJobStatus())) {
return;
}
//Scheduler scheduler = schedulerFactoryBean.getScheduler();
log.debug(scheduler
+ ".......................................................................................add");
TriggerKey triggerKey = TriggerKey.triggerKey(job.getJobName(), job.getJobGroup());
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
// 不存在,创建一个
if (null == trigger) {
Class clazz = JobUtils.CONCURRENT_IS.equals(job.getIsConcurrent()) ? QuartzJobFactory.class
: QuartzJobFactoryDisallowConcurrentExecution.class;
JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(job.getJobName(), job.getJobGroup())
.build();
jobDetail.getJobDataMap().put("scheduleJob", job);
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
trigger = TriggerBuilder.newTrigger().withIdentity(job.getJobName(), job.getJobGroup())
.withSchedule(scheduleBuilder).build();
scheduler.scheduleJob(jobDetail, trigger);
} else {
// Trigger已存在,那么更新相应的定时设置
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
// 按新的cronExpression表达式重新构建trigger
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
// 按新的trigger重新设置job执行
scheduler.rescheduleJob(triggerKey, trigger);
}
} catch (SchedulerException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 获取所有计划中的任务列表
*
* @return
* @throws Exception
*/
public AppResponse getAllJob() {
AppResponse appResponse = new AppResponse();
List<ScheduleJob> jobList = new ArrayList<>();
try {
//Scheduler scheduler = schedulerFactoryBean.getScheduler();
GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
jobList = new ArrayList<ScheduleJob>();
for (JobKey jobKey : jobKeys) {
List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
for (Trigger trigger : triggers) {
ScheduleJob job = new ScheduleJob();
job.setJobName(jobKey.getName());
job.setJobGroup(jobKey.getGroup());
job.setDescription("触发器:" + trigger.getKey());
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
job.setJobStatus(triggerState.name());
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
String cronExpression = cronTrigger.getCronExpression();
job.setCronExpression(cronExpression);
}
jobList.add(job);
}
}
appResponse.setData(jobList);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return appResponse;
}
/**
* 所有正在运行的job
*
* @return
* @throws Exception
*/
public AppResponse getRunningJob() {
AppResponse appResponse = new AppResponse();
List<ScheduleJob> jobList = new ArrayList<>();
try {
//Scheduler scheduler = schedulerFactoryBean.getScheduler();
List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
jobList = new ArrayList<ScheduleJob>(executingJobs.size());
for (JobExecutionContext executingJob : executingJobs) {
ScheduleJob job = new ScheduleJob();
JobDetail jobDetail = executingJob.getJobDetail();
JobKey jobKey = jobDetail.getKey();
Trigger trigger = executingJob.getTrigger();
job.setJobName(jobKey.getName());
job.setJobGroup(jobKey.getGroup());
job.setDescription("触发器:" + trigger.getKey());
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
job.setJobStatus(triggerState.name());
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
String cronExpression = cronTrigger.getCronExpression();
job.setCronExpression(cronExpression);
}
jobList.add(job);
}
appResponse.setData(jobList);
} catch (SchedulerException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return appResponse;
}
/**
* 暂停一个job
*
* @param scheduleJob
* @throws Exception
*/
public void pauseJob(ScheduleJob scheduleJob) {
try {
//Scheduler scheduler = schedulerFactoryBean.getScheduler();
JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
scheduler.pauseJob(jobKey);
} catch (SchedulerException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 恢复一个job
*
* @param scheduleJob
* @throws Exception
*/
public void resumeJob(ScheduleJob scheduleJob) {
try {
//Scheduler scheduler = schedulerFactoryBean.getScheduler();
JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
scheduler.resumeJob(jobKey);
} catch (SchedulerException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 删除一个job
*
* @param scheduleJob
* @throws Exception
*/
public void deleteJob(ScheduleJob scheduleJob) {
try {
//Scheduler scheduler = schedulerFactoryBean.getScheduler();
JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
scheduler.deleteJob(jobKey);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 立即执行job
*
* @param scheduleJob
* @throws Exception
*/
public void runAJobNow(ScheduleJob scheduleJob) {
try {
//Scheduler scheduler = schedulerFactoryBean.getScheduler();
JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
scheduler.triggerJob(jobKey);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 更新job时间表达式
*
* @param scheduleJob
* @throws Exception
*/
public void updateJobCron(ScheduleJob scheduleJob) {
try {
//Scheduler scheduler = schedulerFactoryBean.getScheduler();
TriggerKey triggerKey = TriggerKey.triggerKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression());
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
scheduler.rescheduleJob(triggerKey, trigger);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.commons.lang3.StringUtils;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JobUtils {
public static final Logger log = LoggerFactory.getLogger("bizDataLogger");
//public final static Logger log = Logger.getLogger(JobUtils.class);
public static final String STATUS_RUNNING = "1"; //启动状态
public static final String STATUS_NOT_RUNNING = "0"; //未启动状态
public static final String CONCURRENT_IS = "1";
public static final String CONCURRENT_NOT = "0";
//private ApplicationContext ctx;
/**
* 通过反射调用scheduleJob中定义的方法
*
* @param scheduleJob
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public static void invokMethod(ScheduleJob scheduleJob,JobExecutionContext context) {
//SpringFactory instance = SpringFactory.getInstance();
Object object = null;
Class clazz = null;
log.info(Thread.currentThread()+"开始启动任务=======================");
if (StringUtils.isNotBlank(scheduleJob.getSpringId())) {
// object = SpringUtils.getBean(scheduleJob.getSpringId());
//object = instance.getBean(scheduleJob.getSpringId());
object = SpringUtils.getBean(scheduleJob.getSpringId());
} else if (StringUtils.isNotBlank(scheduleJob.getBeanClass())) {
try {
log.info("通过类全名获取对象============================");
clazz = Class.forName(scheduleJob.getBeanClass());
object = clazz.newInstance();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
log.error(e.getMessage());
}
}
if (object == null) {
log.error("任务名称 = [" + scheduleJob.getJobName() + "]---------------未启动成功,请检查是否配置正确!!!");
return;
}
clazz = object.getClass();
Method method = null;
try {
// method = clazz.getMethod(scheduleJob.getMethodName(), new Class[] {JobExecutionContext.class});
method = clazz.getMethod(scheduleJob.getMethodName(), null);
} catch (NoSuchMethodException e) {
log.error("任务名称 = [" + scheduleJob.getJobName() + "]---------------未启动成功,方法名设置错误!!!");
} catch (SecurityException e) {
// TODO Auto-generated catch block
e.printStackTrace();
log.error(e.getMessage());
}
if (method != null) {
try {
// method.invoke(object, new Object[] {context});
method.invoke(object, null);
} catch (IllegalAccessException e) {
// TODO Auto-generated catch block
e.printStackTrace();
log.error(e.getMessage());
} catch (IllegalArgumentException e) {
// TODO Auto-generated catch block
e.printStackTrace();
log.error(e.getMessage());
} catch (InvocationTargetException e) {
// TODO Auto-generated catch block
e.printStackTrace();
log.error(e.getMessage());
}
}
log.info("任务名称 = [" + scheduleJob.getJobName() + "]----------启动成功");
}
}
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
public final class SpringUtils implements BeanFactoryPostProcessor {
private static ConfigurableListableBeanFactory beanFactory; // Spring应用上下文环境
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
SpringUtils.beanFactory = beanFactory;
}
/**
* 获取对象
*
* @param name
* @return Object 一个以所给名字注册的bean的实例
* @throws BeansException
*
*/
@SuppressWarnings("unchecked")
public static <T> T getBean(String name) throws BeansException {
return (T) beanFactory.getBean(name);
}
/**
* 获取类型为requiredType的对象
*
* @param clz
* @return
* @throws BeansException
*
*/
public static <T> T getBean(Class<T> clz) throws BeansException {
@SuppressWarnings("unchecked")
T result = (T) beanFactory.getBean(clz);
return result;
}
/**
* 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true
*
* @param name
* @return boolean
*/
public static boolean containsBean(String name) {
return beanFactory.containsBean(name);
}
/**
* 判断以给定名字注册的bean定义是一个singleton还是一个prototype。
* 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)
*
* @param name
* @return boolean
* @throws NoSuchBeanDefinitionException
*
*/
public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
return beanFactory.isSingleton(name);
}
/**
* @param name
* @return Class 注册对象的类型
* @throws NoSuchBeanDefinitionException
*
*/
public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
return beanFactory.getType(name);
}
/**
* 如果给定的bean名字在bean定义中有别名,则返回这些别名
*
* @param name
* @return
* @throws NoSuchBeanDefinitionException
*
*/
public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
return beanFactory.getAliases(name);
}
}
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* 计划任务执行处 无状态
* Spring调度任务 (重写 quartz 的 QuartzJobBean 类原因是在使用 quartz+spring 把 quartz 的 task 实例化进入数据库时,会产生: serializable 的错误)
* @author Zach
* @date 2018年6月29日
* @title QuartzJobFactory
*/
public class QuartzJobFactory implements Job {
public final Logger log = LoggerFactory.getLogger(this.getClass());
public QuartzJobFactory() {
}
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
ScheduleJob scheduleJob = (ScheduleJob) context.getMergedJobDataMap().get("scheduleJob");
JobUtils.invokMethod(scheduleJob, context);
}
}
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 若一个方法一次执行不完下次轮转时则等待该方法执行完后才执行下一次操作
* Spring调度任务 (重写 quartz 的 QuartzJobBean 类原因是在使用 quartz+spring 把 quartz 的 task 实例化进入数据库时,会产生: serializable 的错误)
*
* @author Zach
* @date 2018年6月29日
* @title QuartzJobFactoryDisallowConcurrentExecution
*/
@DisallowConcurrentExecution
public class QuartzJobFactoryDisallowConcurrentExecution implements Job {
public final Logger log = LoggerFactory.getLogger(this.getClass());
public QuartzJobFactoryDisallowConcurrentExecution() {
}
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
ScheduleJob scheduleJob = (ScheduleJob) context.getMergedJobDataMap().get("scheduleJob");
JobUtils.invokMethod(scheduleJob, context);
}
}
/**
*
* 项目启动时进行定时任务初始化
*
* @author Zach
* @date 2018年7月3日
* @title ScheduleJobInit
*/
public class ScheduleJobInit {
@Autowired
private ScheduleJobMapper scheduleJobMapper;
@Autowired
private JobTaskService jobTaskService;
public static final Logger log = LoggerFactory.getLogger("bizDataLogger");
/**
* 初始化方法
*
* void
*
*/
public void init() {
log.info("开始任务初始化了============================="+new Date());
// 这里获取任务信息数据
List<ScheduleJob> jobList = scheduleJobMapper.select(null);
if (jobList == null || jobList.size() == 0) {
scheduleJobMapper.addScheduleJob();
jobList = scheduleJobMapper.select(null);
}
// 添加任务到quartz中
for (ScheduleJob job : jobList) {
jobTaskService.addJob(job);
}
log.info("任务初始化结束了============================="+new Date());
}
}
初始化类采用配置init-method方式初始化,因为使用@PostConstruct注解初始化或者实现ServletContextListener在本项目的环境下会被初始化两次,具体原因是加载了两次配置文件,因此将初始化的类配置在application-mvc的配置文件中,解决了调用两次初始化的问题,但是在qrtz_scheduler_state中还是会看到两个实例,目前没有找到解决方案,希望知道的大神可以告知一下
6. 注意事项
1) SchedulerFactory无法注入的问题
在JobTaskServiceImpl中不要直接将recipeManageSchedule直接注入,那样会报空;它是一个工厂类,得到的不是它本身,而是它负责创建的org.quartz.impl.StdScheduler对象,只要把Service中的recipeManageSchedule用Scheduler替换即可;
2) 时间同步的问题:
Quartz实际并不关心你是在相同还是不同的机器上运行节点。当集群放置在不同的机器上时,称之为水平集群。节点跑在同一台机器上时,称之为垂直集群。对于垂直集群,存在着单点故障的问题。这对高可用性的应用来说是无法接受的,因为一旦机器崩溃了,所有的节点也就被终止了。对于水平集群,存在着时间同步问题。
节点用时间戳来通知其他实例它自己的最后检入时间。假如节点的时钟被设置为将来的时间,那么运行中的Scheduler将再也意识不到那个结点已经宕掉了。另一方面,如果某个节点的时钟被设置为过去的时间,也许另一节点就会认定那个节点已宕掉并试图接过它的Job重运行。最简单的同步计算机时钟的方式是使用某一个Internet时间服务器(Internet Time Server ITS)。
3) quartz mysql死锁问题
quartz文档建议我们在集群环境下,最好将org.quartz.jobStore.txIsolationLevelSerializable设置为true;这个选项在mysql下非常容易出现死锁
这个选项的作用:
quartz需要提升隔离级别来保障自己的运作,不过,由于各数据库实现的隔离级别定义都不一样,所以quartz提供一个设置序列化这样的隔离级别存在,因为例如oracle中是没有未提交读和可重复读这样的隔离级别存在。但是由于mysql默认的是可重复读,比提交读高了一个级别,所以已经可以满足quartz集群的正常运行。
7. 参考资料
上一篇: 项目:用数组实现反弹球消砖块
下一篇: 小学生名字卡(HTML版可打印)