欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

java定时任务

程序员文章站 2022-05-23 11:08:09
...

定时任务的Java实现

就是计划任务啦,只是在项目中这样叫也就习惯了,参考项目中大神的实现。
目的:通过MySQL配置,可以从MySQL中读取参数,按时定时启动和关闭。数据库记录字段包括实现类名(默认为jobName、jobGroupName、triggerName、triggerGroupName),创建时间,调度规则(cron表达式),启动标志,启动参数。
实现:quartz,与Strus2管理Action的思路相似(ActionMapping->ActionProxy->Action)。对于多节点的考虑,也是通过对数据库记录的查询、添加、删除实现Lock(数据库记录是唯一的)。

相关的类有5个

备注
JobTimerController 适配原有项目结构,启动调度器执行定时任务
JobScheduler 静态调度器类,管理Job列表,对Job的CQUD操作
JobProxy IJob的代理类,每个任务对应一个线程实例,执行定时任务
JobLock 静态锁,防止多节点异步执行任务,基于数据库实现
IJob 函数接口,定时任务需要基于此实现

类之间的关系如下

java定时任务

下面说说大体思路如何实现

1.定时任务调度器启动类JobTimerController
注意resultList、jobList只是保存Job的description(Map形式),但resultList是初始启动配置,jobList是内存中正运行的任务,而JobScheduler才是实际对IJob子类(相关任务实例)的操作(启动中断等)。因此,当数据库中添加了新的定时任务时,需要启动该类通知调度器类JobScheduler执行新的任务(本身该类该类也要当作定时任务执行才合理嘛)。

public void work(){
    List resultList = /* 从数据库中获取任务 */
    ArrayList jobList = JobScheduler.getJobList();

    // 比较resultList与jobList
    //调度器已经运行在内存中且已已有任务在执行
    if (!ArrayUtils.isEmpty(jobList.toArray())){
            for(int i=0;i<jobList.size();i++){
                jobMap = (HashMap) jobList.get(i);
                String jobName = jobMap.get("jobName").toString();
                //resultList不存在,数据库中无记录则移除任务
                if(!exist(jobName)){
                    //从quartz中删除,调用Scheduler.deleteJob
                    JobScheduler.removeJob(jobMap);
                    //从调度器JobScheduler中删除
                    jobList.remove(jobMap);
                    i--;
                }
            }
    }

    //配置好在数据库中,启动新的任务
    if (!ArrayUtils.isEmpty(resultList.toArray())){
        for(int i=0;i<resultList.size();i++){
            jobMap = (HashMap) resultList.get(i);
            String jobName = jobMap.get("jobName").toString();

            //不在调度器中,则新建任务实例并启动执行
            if(!exist(jobName)){
                //添加到调度器JobScheduler
                jobList.add(jobMap);
                //调用Scheduler.scheduleJob启动定时任务
                JobScheduler.addJob(jobMap);
            }
            //已存在则校验启动标志等
            else{
                if( "Y".equals(jobMap.get("jobStart").toString()) )
                    JobScheduler.updateJob(jobMap);
            }
        }
    }
}

2.定时任务调度器类JobSchduler
封装了quartz的操作。

    private static SchedulerFactory sf = new StdSchedulerFactory();

    public static void addJob(HashMap jobMap) throws SchedulerException, ParseException, ClassNotFoundException, InstantiationException, IllegalAccessException {
        String jobName = jobMap.get("jobName").toString();
        String jobGroupName = jobMap.get("jobGroupName").toString();
        String triggerName = jobMap.get("triggerName").toString();
        String triggerGroupName = jobMap.get("triggerGroupName").toString();
        String jobClass = jobMap.get("jobBussinessClass").toString();
        String time = jobMap.get("cronExpression").toString();
        Class tstxJobClass =  Class.forName(baseTaskClassName);
        Job tstxJob = (Job) tstxJobClass.newInstance();

        Scheduler sched = sf.getScheduler();
        JobDetail jobDetail = new JobDetail(jobName, jobGroupName,tstxJob.getClass());
        jobDetail.getJobDataMap().put("jobMap", jobMap);
        CronTrigger trigger = new CronTrigger(triggerName,triggerGroupName);
        trigger.setCronExpression(time);
        sched.scheduleJob(jobDetail,trigger);
        //启动
        if(!sched.isShutdown()){
            sched.start();
        }
    }

    public static void updateJob(HashMap jobMap) throws SchedulerException, ParseException {
        String triggerName = jobMap.get("triggerName").toString();
        String triggerGroupName = jobMap.get("triggerGroupName").toString();
        String time = jobMap.get("cronExpression").toString();

        Scheduler sched = sf.getScheduler();
        Trigger trigger = sched.getTrigger(triggerName, triggerGroupName);
        if(trigger!=null){
            CronTrigger ct = (CronTrigger) trigger;
            // 修改时间
            ct.setCronExpression(time);
            // 重启触发器
            sched.rescheduleJob(triggerName, triggerGroupName , ct);
        }
    }

    public static void removeJob(HashMap jobMap) throws SchedulerException{
        String jobName = jobMap.get("jobName").toString();
        String jobGroupName = jobMap.get("jobGroupName").toString();
        String triggerName = jobMap.get("triggerName").toString();
        String triggerGroupName = jobMap.get("triggerGroupName").toString();

        Scheduler sched = sf.getScheduler();
        sched.pauseTrigger(triggerName, triggerGroupName);//停止触发器
        sched.unscheduleJob(triggerName, triggerGroupName);//移除触发器
        sched.deleteJob(jobName, jobGroupName);
    }

    public static void interruptJob(HashMap jobMap) throws SchedulerException{
        String jobName = jobMap.get("jobName").toString();
        String jobGroupName = jobMap.get("jobGroupName").toString();
        String triggerName = jobMap.get("triggerName").toString();
        String triggerGroupName = jobMap.get("triggerGroupName").toString();

        Scheduler sched = sf.getScheduler();
        sched.interrupt(triggerName, triggerGroupName);//中断触发器
    }

3.定时任务代理类JobProxy
implements InterruptableJob,新开线程执行具体的定时任务

    @Override
    public void interrupt() throws UnableToInterruptJobException {
        _interrupted = true;
    }


    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException {

        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {

                JobDetail jobDetail = context.getJobDetail();
                HashMap jobMap = (HashMap)jobDetail.getJobDataMap().get("jobMap");

                //锁定任务
                JobLock.Lock(jobMap);

                //任务正在执行中, 中断当前线程任务..
                if (_interrupted) {
                    return;
                }

                try{
                    //创建具体的任务实现类并执行任务
                    String className = (String)jobMap.get("jobClass");
                    IBaseJob baseJob = Class.forName(className).newInstance();
                    baseJob.execute(context);
                }catch (JobExecutionException e){
                    /* 显示异常,打印日志 */
                }

                //解锁
                finally {
                    JobLock.unLock(jobMap);
                }
            }
        });
        thread.start();
    }

4.并发锁JobLock
基于数据库的实现,需要达到锁的效果时,写入相关信息到数据库记录,当数据库中存在相应记录时,则说明该任务已锁,其中lockJobName为PK。

    public synchronized static void lock(HashMap jobMap){
        //解析定时任务参数信息
        String jobName = jobMap.get("jobName").toString();//任务名称
        String jobGroupName = jobMap.get("jobGroupName").toString();//任务组名
        String lockJobName = "LOCK_" + jobName + "_" + jobGroupName;//redis锁定job名称

        HashMap lockJobMap = /* 通过lockJobName从数据库中获取Lock信息 */

        //未锁
        if (null == lockJobMap){
            //尝试将任务锁定
            boolean lock = /* 将lockJobName写入到数据库中 */
            if (!lock){
                //锁定失败, 不可执行定时任务
                JobSchduler.interruptJob(jobMap);
                return;
            }
        }

        //已锁,需判断定时任务锁定(过期)时间
        else{
            String jobStartTimeStr = lockJobMap.get("jobStartTime").toString();//任务执行时间
            String lockTime = StringUtils.isEmpty(jobMap.get("lockTime").toString()) ? "600" : jobMap.get("lockTime").toString();//锁定秒数
            Date jobStartTimeData = null;
            try {
                jobStartTimeData = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(jobStartTimeStr);//转换日期类型
            } catch (ParseException e) {
                e.printStackTrace();
            }
            Integer lockTimeInt = Integer.parseInt(lockTime);//计算锁定时长
            Date jobExceedDate = DateUtils.addSeconds(jobStartTimeData,lockTimeInt);//计算JOB过期时间
            Boolean isJobExceed = jobExceedDate.before(new Date());

            //锁定时间(开始时间+锁定时长)与当前时间比较
            //过期不可执行任务
            if(!isJobExceed){
                JobSchduler.interruptJob(jobMap);
            }
            //未过期,重锁
            else{
                unlock(jobMap);
                //尝试将任务锁定
                boolean lock = /* 将lockJobName写入到数据库中 */
                if (!lock){
                    //锁定失败, 不可执行定时任务
                    JobSchduler.interruptJob(jobMap);
                    return;
                }
            }
        }
    }

    public synchronized static void unlock(HashMap jobMap){
        String jobName = jobMap.get("jobName").toString();//任务名称
        String jobGroupName = jobMap.get("jobGroupName").toString();//任务组名
        String lockJobName = "LOCK_" + jobName + "_" +jobGroupName;//redis锁定job名称
        /* 通过lockJobName更新数据库中相应记录状态 */
    }

5.定时任务接口类IJob
实现定时任务的具体逻辑,就是quartz要求的那个。

public interface IJob {
    public void execute(JobExecutionContext context) throws JobExecutionException;
}
相关标签: java quartz