java定时任务
定时任务的Java实现
就是计划任务啦,只是在项目中这样叫也就习惯了,参考项目中大神的实现。
目的:通过MySQL配置,可以从MySQL中读取参数,按时定时启动和关闭。数据库记录字段包括实现类名(默认为jobName、jobGroupName、triggerName、triggerGroupName),创建时间,调度规则(cron表达式),启动标志,启动参数。
实现:quartz,与Strus2管理Action的思路相似(ActionMapping->ActionProxy->Action)。对于多节点的考虑,也是通过对数据库记录的查询、添加、删除实现Lock(数据库记录是唯一的)。
相关的类有5个
类 | 备注 |
---|---|
JobTimerController | 适配原有项目结构,启动调度器执行定时任务 |
JobScheduler | 静态调度器类,管理Job列表,对Job的CQUD操作 |
JobProxy | IJob的代理类,每个任务对应一个线程实例,执行定时任务 |
JobLock | 静态锁,防止多节点异步执行任务,基于数据库实现 |
IJob | 函数接口,定时任务需要基于此实现 |
类之间的关系如下
下面说说大体思路如何实现
1.定时任务调度器启动类JobTimerController
注意resultList、jobList只是保存Job的description(Map形式),但resultList是初始启动配置,jobList是内存中正运行的任务,而JobScheduler才是实际对IJob子类(相关任务实例)的操作(启动中断等)。因此,当数据库中添加了新的定时任务时,需要启动该类通知调度器类JobScheduler执行新的任务(本身该类该类也要当作定时任务执行才合理嘛)。
public void work(){
List resultList = /* 从数据库中获取任务 */
ArrayList jobList = JobScheduler.getJobList();
// 比较resultList与jobList
//调度器已经运行在内存中且已已有任务在执行
if (!ArrayUtils.isEmpty(jobList.toArray())){
for(int i=0;i<jobList.size();i++){
jobMap = (HashMap) jobList.get(i);
String jobName = jobMap.get("jobName").toString();
//resultList不存在,数据库中无记录则移除任务
if(!exist(jobName)){
//从quartz中删除,调用Scheduler.deleteJob
JobScheduler.removeJob(jobMap);
//从调度器JobScheduler中删除
jobList.remove(jobMap);
i--;
}
}
}
//配置好在数据库中,启动新的任务
if (!ArrayUtils.isEmpty(resultList.toArray())){
for(int i=0;i<resultList.size();i++){
jobMap = (HashMap) resultList.get(i);
String jobName = jobMap.get("jobName").toString();
//不在调度器中,则新建任务实例并启动执行
if(!exist(jobName)){
//添加到调度器JobScheduler
jobList.add(jobMap);
//调用Scheduler.scheduleJob启动定时任务
JobScheduler.addJob(jobMap);
}
//已存在则校验启动标志等
else{
if( "Y".equals(jobMap.get("jobStart").toString()) )
JobScheduler.updateJob(jobMap);
}
}
}
}
2.定时任务调度器类JobSchduler
封装了quartz的操作。
private static SchedulerFactory sf = new StdSchedulerFactory();
public static void addJob(HashMap jobMap) throws SchedulerException, ParseException, ClassNotFoundException, InstantiationException, IllegalAccessException {
String jobName = jobMap.get("jobName").toString();
String jobGroupName = jobMap.get("jobGroupName").toString();
String triggerName = jobMap.get("triggerName").toString();
String triggerGroupName = jobMap.get("triggerGroupName").toString();
String jobClass = jobMap.get("jobBussinessClass").toString();
String time = jobMap.get("cronExpression").toString();
Class tstxJobClass = Class.forName(baseTaskClassName);
Job tstxJob = (Job) tstxJobClass.newInstance();
Scheduler sched = sf.getScheduler();
JobDetail jobDetail = new JobDetail(jobName, jobGroupName,tstxJob.getClass());
jobDetail.getJobDataMap().put("jobMap", jobMap);
CronTrigger trigger = new CronTrigger(triggerName,triggerGroupName);
trigger.setCronExpression(time);
sched.scheduleJob(jobDetail,trigger);
//启动
if(!sched.isShutdown()){
sched.start();
}
}
public static void updateJob(HashMap jobMap) throws SchedulerException, ParseException {
String triggerName = jobMap.get("triggerName").toString();
String triggerGroupName = jobMap.get("triggerGroupName").toString();
String time = jobMap.get("cronExpression").toString();
Scheduler sched = sf.getScheduler();
Trigger trigger = sched.getTrigger(triggerName, triggerGroupName);
if(trigger!=null){
CronTrigger ct = (CronTrigger) trigger;
// 修改时间
ct.setCronExpression(time);
// 重启触发器
sched.rescheduleJob(triggerName, triggerGroupName , ct);
}
}
public static void removeJob(HashMap jobMap) throws SchedulerException{
String jobName = jobMap.get("jobName").toString();
String jobGroupName = jobMap.get("jobGroupName").toString();
String triggerName = jobMap.get("triggerName").toString();
String triggerGroupName = jobMap.get("triggerGroupName").toString();
Scheduler sched = sf.getScheduler();
sched.pauseTrigger(triggerName, triggerGroupName);//停止触发器
sched.unscheduleJob(triggerName, triggerGroupName);//移除触发器
sched.deleteJob(jobName, jobGroupName);
}
public static void interruptJob(HashMap jobMap) throws SchedulerException{
String jobName = jobMap.get("jobName").toString();
String jobGroupName = jobMap.get("jobGroupName").toString();
String triggerName = jobMap.get("triggerName").toString();
String triggerGroupName = jobMap.get("triggerGroupName").toString();
Scheduler sched = sf.getScheduler();
sched.interrupt(triggerName, triggerGroupName);//中断触发器
}
3.定时任务代理类JobProxy
implements InterruptableJob,新开线程执行具体的定时任务
@Override
public void interrupt() throws UnableToInterruptJobException {
_interrupted = true;
}
@Override
public void execute(final JobExecutionContext context) throws JobExecutionException {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
JobDetail jobDetail = context.getJobDetail();
HashMap jobMap = (HashMap)jobDetail.getJobDataMap().get("jobMap");
//锁定任务
JobLock.Lock(jobMap);
//任务正在执行中, 中断当前线程任务..
if (_interrupted) {
return;
}
try{
//创建具体的任务实现类并执行任务
String className = (String)jobMap.get("jobClass");
IBaseJob baseJob = Class.forName(className).newInstance();
baseJob.execute(context);
}catch (JobExecutionException e){
/* 显示异常,打印日志 */
}
//解锁
finally {
JobLock.unLock(jobMap);
}
}
});
thread.start();
}
4.并发锁JobLock
基于数据库的实现,需要达到锁的效果时,写入相关信息到数据库记录,当数据库中存在相应记录时,则说明该任务已锁,其中lockJobName为PK。
public synchronized static void lock(HashMap jobMap){
//解析定时任务参数信息
String jobName = jobMap.get("jobName").toString();//任务名称
String jobGroupName = jobMap.get("jobGroupName").toString();//任务组名
String lockJobName = "LOCK_" + jobName + "_" + jobGroupName;//redis锁定job名称
HashMap lockJobMap = /* 通过lockJobName从数据库中获取Lock信息 */
//未锁
if (null == lockJobMap){
//尝试将任务锁定
boolean lock = /* 将lockJobName写入到数据库中 */
if (!lock){
//锁定失败, 不可执行定时任务
JobSchduler.interruptJob(jobMap);
return;
}
}
//已锁,需判断定时任务锁定(过期)时间
else{
String jobStartTimeStr = lockJobMap.get("jobStartTime").toString();//任务执行时间
String lockTime = StringUtils.isEmpty(jobMap.get("lockTime").toString()) ? "600" : jobMap.get("lockTime").toString();//锁定秒数
Date jobStartTimeData = null;
try {
jobStartTimeData = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(jobStartTimeStr);//转换日期类型
} catch (ParseException e) {
e.printStackTrace();
}
Integer lockTimeInt = Integer.parseInt(lockTime);//计算锁定时长
Date jobExceedDate = DateUtils.addSeconds(jobStartTimeData,lockTimeInt);//计算JOB过期时间
Boolean isJobExceed = jobExceedDate.before(new Date());
//锁定时间(开始时间+锁定时长)与当前时间比较
//过期不可执行任务
if(!isJobExceed){
JobSchduler.interruptJob(jobMap);
}
//未过期,重锁
else{
unlock(jobMap);
//尝试将任务锁定
boolean lock = /* 将lockJobName写入到数据库中 */
if (!lock){
//锁定失败, 不可执行定时任务
JobSchduler.interruptJob(jobMap);
return;
}
}
}
}
public synchronized static void unlock(HashMap jobMap){
String jobName = jobMap.get("jobName").toString();//任务名称
String jobGroupName = jobMap.get("jobGroupName").toString();//任务组名
String lockJobName = "LOCK_" + jobName + "_" +jobGroupName;//redis锁定job名称
/* 通过lockJobName更新数据库中相应记录状态 */
}
5.定时任务接口类IJob
实现定时任务的具体逻辑,就是quartz要求的那个。
public interface IJob {
public void execute(JobExecutionContext context) throws JobExecutionException;
}
上一篇: sql_03_多表关系_级联操作
下一篇: 表、栈、队列