欢迎您访问程序员文章站,本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页

Spring Boot多数据源整合Quartz任务调度

程序员文章站 2022-05-01 19:22:13
...

pom.xml

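<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
</dependency>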

application.yml

# NOTE(review): indentation was reconstructed from the Spring Boot / Quartz property
# names; verify against the real application.yml before use.
# NOTE(review): credentials and hosts are hard-coded here; prefer externalized secrets.
spring:
  # H2 web console, served under /h2
  h2:
    console:
      enabled: true
      path: /h2
  datasource:
    hikari:
      # Oracle data source, bound via the prefix spring.datasource.hikari.view
      view:
        driver-class-name: oracle.jdbc.OracleDriver
        jdbc-url: jdbc:oracle:thin:@192.168.0.118:1521/orcl
        username: grademed
        password: grademed2017ylz
      # H2 data source, bound via the prefix spring.datasource.hikari.h2
      h2:
        driver-class-name: org.h2.Driver
        jdbc-url: jdbc:h2:file:./src/main/resources/static/method_db;ACCESS_MODE_DATA=rws
        username: grademed
        password: grademed2017ylz
    # presumably spring.datasource.* initialization settings - TODO confirm placement
    data: classpath:db/data.sql
    platform: h2
    initialization-mode: always

  quartz:
    job-store-type: jdbc
    jdbc:
      initialize-schema: embedded
    properties:
      org:
        quartz:
          scheduler:
            instanceName: clusteredScheduler
            instanceId: AUTO
          jobStore:
            dataSource: myDB
            class: org.quartz.impl.jdbcjobstore.JobStoreTX
            driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
            tablePrefix: QRTZ_
            isClustered: true
            clusterCheckinInterval: 10000
            useProperties: false
          threadPool:
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 10
            threadPriority: 5
            threadsInheritContextClassLoaderOfInitializingThread: true
          # Quartz-managed data source referenced by jobStore.dataSource above
          dataSource:
            myDB:
              user: grademed
              password: grademed2017ylz
              maxConnections: 10
              driver: org.h2.Driver
              URL: jdbc:h2:file:./src/main/resources/static/method_db

配置数据源

1.H2DataSourceConfig(h2数据源)

@Configuration
@MapperScan(basePackages = "com.ylz.dataSync.mapper.h2", sqlSessionTemplateRef = "h2SqlSessionTemplate")
public class H2DataSourceConfig {

    /**
     * 根据application.yml系统配置文件中,对应属性的前缀,指明使用其对应的数据
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.hikari.h2")
    @Primary // 本人Quartz数据库表放在h2数据库里面所以要将此数据源设置为主数据源
    public DataSource h2DataSource() {
        return DataSourceBuilder.create().build();
    }

    @Bean
    @DependsOn("h2DataSource")
    @Primary
    public SqlSessionFactory h2SqlSessionFactory() throws Exception {
        MybatisSqlSessionFactoryBean factoryBean = new MybatisSqlSessionFactoryBean();
        factoryBean.setDataSource(h2DataSource());
        factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("com/ylz/dataSync/mapper/xml/*.xml"));
        return factoryBean.getObject();
    }

    /**
     * DefaultSqlSession和SqlSessionTemplate都实现了SqlSession,但我们
     * 注入线程安全的SqlSessionTemplate,而不使用默认的线程不安全的DefaultSqlSession
     */
    @Bean
    @DependsOn("h2SqlSessionFactory")
    @Primary
    public SqlSessionTemplate h2SqlSessionTemplate() throws Exception {
        return new SqlSessionTemplate(h2SqlSessionFactory());
    }

2.oracle数据源

/**
 * Spring configuration for the "view" (Oracle) data source and its MyBatis session beans.
 *
 * <p>Mappers under {@code com.ylz.dataSync.mapper.view} are bound to
 * {@code viewSqlSessionTemplate}.
 */
@Configuration
@MapperScan(basePackages = "com.ylz.dataSync.mapper.view", sqlSessionTemplateRef = "viewSqlSessionTemplate")
public class ViewDataSourceConfig {

    /**
     * Builds the data source whose settings are read from the
     * {@code spring.datasource.hikari.view} prefix of the application configuration.
     *
     * @return the "view" data source
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.hikari.view")
    public DataSource viewDataSource() {
        return DataSourceBuilder.create().build();
    }

    /**
     * Builds the MyBatis session factory on top of {@link #viewDataSource()} and registers
     * the mapper XML files matching {@code com/ylz/dataSync/mapper/xml/*.xml}.
     *
     * @return the session factory for the "view" data source
     * @throws Exception if the mapper resources cannot be resolved or the factory cannot be built
     */
    @Bean
    @DependsOn("viewDataSource")
    public SqlSessionFactory viewSqlSessionFactory() throws Exception {
        PathMatchingResourcePatternResolver mapperResolver = new PathMatchingResourcePatternResolver();
        MybatisSqlSessionFactoryBean sessionFactoryBean = new MybatisSqlSessionFactoryBean();

        sessionFactoryBean.setDataSource(viewDataSource());
        sessionFactoryBean.setMapperLocations(
                mapperResolver.getResources("com/ylz/dataSync/mapper/xml/*.xml"));

        return sessionFactoryBean.getObject();
    }

    /**
     * Wraps the "view" session factory in a {@link SqlSessionTemplate}.
     *
     * <p>The thread-safe SqlSessionTemplate is exposed rather than the non-thread-safe
     * DefaultSqlSession; both implement SqlSession.
     *
     * @return the session template for the "view" data source
     * @throws Exception if the underlying session factory cannot be created
     */
    @Bean
    @DependsOn("viewSqlSessionFactory")
    public SqlSessionTemplate viewSqlSessionTemplate() throws Exception {
        SqlSessionFactory sessionFactory = viewSqlSessionFactory();
        return new SqlSessionTemplate(sessionFactory);
    }

}

Quartz核心代码

/**
 * Quartz job that reads the {@code "scheduleJob"} entry from the merged job data map and
 * delegates the actual work to {@link QuartzService}.
 */
@Service("quartzJobFactory")
public class QuartzJobFactory extends QuartzJobBean {

    private final QuartzService quartzService;

    /**
     * @param quartzService service that runs the task named by the schedule job
     */
    public QuartzJobFactory(QuartzService quartzService) {
        this.quartzService = quartzService;
    }

    /**
     * Runs the task described by the stored {@code ScheduleJob}.
     *
     * <p>When the job carries no method name (null or empty), only the bean name is passed
     * to the service; otherwise both bean name and method name are passed.
     *
     * @param jobExecutionContext Quartz execution context holding the job data
     */
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext){
        Object stored = jobExecutionContext.getMergedJobDataMap().get("scheduleJob");
        ScheduleJob job = (ScheduleJob) stored;

        boolean methodNameMissing = job.getMethodName() == null || job.getMethodName().equals("");
        if (methodNameMissing) {
            quartzService.executeTask(job.getBeanName());
            return;
        }
        quartzService.executeTask(job.getBeanName(), job.getMethodName());
    }
}
/**
 * Operations for inspecting and managing scheduled jobs identified by job name and job group.
 */
public interface SchedulerJobService {

    /**
     * Lists every scheduled job.
     *
     * @return the scheduled jobs
     */
    List<ScheduleJob> getAllScheduleJob();

    /**
     * Validates that the given job and its required fields are present.
     *
     * @param scheduleJob the job to validate
     */
    void checkNotNull(ScheduleJob scheduleJob);

    /**
     * Lists the jobs that are currently executing.
     *
     * @return the running jobs
     * @throws SchedulerException if the scheduler cannot be queried
     */
    List<ScheduleJob> getAllRunningJob() throws SchedulerException;

    /**
     * Adds the job, or updates it when it already exists.
     *
     * @param scheduleJob the job to add or update
     * @throws Exception if the job is invalid or cannot be scheduled
     */
    void saveOrUpdate(ScheduleJob scheduleJob) throws Exception;

    /**
     * Pauses a job.
     *
     * @param jobName  name of the job
     * @param jobGroup group of the job
     * @throws SchedulerException if the scheduler rejects the operation
     */
    void pauseJob(String jobName, String jobGroup) throws SchedulerException;

    /**
     * Deletes a job.
     *
     * @param jobName  name of the job
     * @param jobGroup group of the job
     * @throws SchedulerException if the scheduler rejects the operation
     */
    void deleteJob(String jobName, String jobGroup) throws SchedulerException;

    /**
     * Triggers a job to run.
     *
     * @param jobName  name of the job
     * @param jobGroup group of the job
     * @throws SchedulerException if the scheduler rejects the operation
     */
    void runOneJob(String jobName, String jobGroup) throws SchedulerException;

    /**
     * Resumes a job.
     *
     * @param jobName  name of the job
     * @param jobGroup group of the job
     * @throws SchedulerException if the scheduler rejects the operation
     */
    void resumeJob(String jobName, String jobGroup) throws SchedulerException;
}
/**
 * {@link SchedulerJobService} implementation backed by a Quartz {@link Scheduler}.
 *
 * <p>Each Quartz job stores its {@code ScheduleJob} description in the job data map under
 * the key {@code "scheduleJob"}; job and trigger share the same name/group identity.
 */
@Service("schedulerJobService")
public class SchedulerJobServiceImpl implements SchedulerJobService {

    private static final Logger logger = LoggerFactory.getLogger(SchedulerJobServiceImpl.class);

    /** Job data map key under which the {@code ScheduleJob} is stored. */
    private static final String JOB_DATA_KEY = "scheduleJob";

    private final Scheduler scheduler;

    private final ScheduleJobInService scheduleJobInService;

    /**
     * @param scheduler            Quartz scheduler that owns the jobs and triggers
     * @param scheduleJobInService persistence service for {@code ScheduleJob} records
     */
    public SchedulerJobServiceImpl(Scheduler scheduler, ScheduleJobInService scheduleJobInService) {
        this.scheduler = scheduler;
        this.scheduleJobInService = scheduleJobInService;
    }

    /**
     * Returns one entry per (job, trigger) pair known to the scheduler.
     *
     * <p>A scheduler failure is logged and whatever was collected so far is returned.
     *
     * @return the scheduled jobs, possibly empty, never {@code null}
     */
    @Override
    public List<ScheduleJob> getAllScheduleJob() {
        List<ScheduleJob> jobList = new ArrayList<>();
        GroupMatcher<JobKey> matcher = GroupMatcher.anyGroup();
        try {
            Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
            for (JobKey key : jobKeys) {
                List<? extends Trigger> triggers = scheduler.getTriggersOfJob(key);
                for (Trigger trigger : triggers) {
                    jobList.add(getScheduleJob(key, trigger));
                }
            }
        } catch (SchedulerException e) {
            // Pass the exception as the trailing argument so SLF4J logs the stack trace;
            // a "{}" placeholder here would be printed literally.
            logger.error("[SchedulerJobServiceImpl] get the jobKeys is error", e);
        }
        return jobList;
    }

    /**
     * Returns the jobs that the scheduler reports as currently executing.
     *
     * @return the running jobs, possibly empty
     * @throws SchedulerException if the scheduler cannot be queried
     */
    @Override
    public List<ScheduleJob> getAllRunningJob() throws SchedulerException {
        List<JobExecutionContext> executionJobList = scheduler.getCurrentlyExecutingJobs();
        List<ScheduleJob> jobList = new ArrayList<>();
        for (JobExecutionContext jobExecutionContext : executionJobList) {
            JobKey jobKey = jobExecutionContext.getJobDetail().getKey();
            Trigger trigger = jobExecutionContext.getTrigger();
            jobList.add(getScheduleJob(jobKey, trigger));
        }
        return jobList;
    }

    /**
     * Adds the job when no trigger with its name/group exists yet, otherwise updates the
     * existing job's cron schedule and stored job data.
     *
     * @param scheduleJob the job to add or update
     * @throws Exception if validation fails or the scheduler rejects the operation
     */
    @Override
    public void saveOrUpdate(ScheduleJob scheduleJob) throws Exception {
        TriggerKey triggerKey = TriggerKey.triggerKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
        CronTrigger cronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
        if (cronTrigger == null) {
            addJob(scheduleJob);
        } else {
            updateJobCronSchedule(scheduleJob);
        }
    }

    /**
     * Pauses a job in the scheduler. The persisted record is not touched.
     *
     * @param jobName  name of the job
     * @param jobGroup group of the job
     * @throws SchedulerException if the scheduler rejects the operation
     */
    @Override
    public void pauseJob(String jobName, String jobGroup) throws SchedulerException {
        scheduler.pauseJob(JobKey.jobKey(jobName, jobGroup));
    }

    /**
     * Deletes the persisted record of a job and then removes the job from the scheduler.
     *
     * @param jobName  name of the job
     * @param jobGroup group of the job
     * @throws SchedulerException if the scheduler rejects the operation
     */
    @Override
    public void deleteJob(String jobName, String jobGroup) throws SchedulerException {
        JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
        scheduleJobInService.deleteByJobNameAndJobGroup(jobName, jobGroup);
        scheduler.deleteJob(jobKey);
    }

    /**
     * Marks the persisted record (if any) as {@code "NORMAL"} and triggers the job.
     *
     * @param jobName  name of the job
     * @param jobGroup group of the job
     * @throws SchedulerException if the scheduler rejects the operation
     */
    @Override
    public void runOneJob(String jobName, String jobGroup) throws SchedulerException {
        JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
        markNormalIfPersisted(jobName, jobGroup);
        scheduler.triggerJob(jobKey);
    }

    /**
     * Marks the persisted record (if any) as {@code "NORMAL"} and resumes the job.
     *
     * <p>Previously this set the status to {@code "PAUSED"} on an object that was never
     * saved, and failed with a NullPointerException before resuming when no record existed.
     *
     * @param jobName  name of the job
     * @param jobGroup group of the job
     * @throws SchedulerException if the scheduler rejects the operation
     */
    @Override
    public void resumeJob(String jobName, String jobGroup) throws SchedulerException {
        JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
        markNormalIfPersisted(jobName, jobGroup);
        scheduler.resumeJob(jobKey);
    }

    /**
     * Validates that the job and its name, group and bean name are present and non-empty.
     *
     * @param scheduleJob the job to validate
     * @throws IllegalStateException if the job or one of the required fields is missing
     */
    @Override
    public void checkNotNull(ScheduleJob scheduleJob) {
        if (scheduleJob == null) {
            throw new IllegalStateException("scheduleJob为空,请检查");
        }
        if (scheduleJob.getJobName() == null || "".equals(scheduleJob.getJobName())) {
            throw new IllegalStateException("scheduleJob的jobName为null,请检查");
        }
        if (scheduleJob.getJobGroup() == null || "".equals(scheduleJob.getJobGroup())) {
            throw new IllegalStateException("scheduleJob的jobGroup为null,请检查");
        }
        if (scheduleJob.getBeanName() == null || "".equals(scheduleJob.getBeanName())) {
            throw new IllegalStateException("scheduleJob的BeanName为null,请检查");
        }
    }

    /**
     * Schedules a new cron-triggered job whose data map carries the given description.
     *
     * @param scheduleJob the job to add
     * @throws Exception if validation fails or the scheduler rejects the job
     */
    private void addJob(ScheduleJob scheduleJob) throws Exception {
        checkNotNull(scheduleJob);
        requireCronExpression(scheduleJob);

        scheduleJob.setJobStatus("NORMAL");
        // NOTE(review): the job is not inserted into the database here, so getId() is
        // whatever the caller supplied - confirm whether persistence is still intended.
        logger.info("[SchedulerJobServiceImpl] the Primary key is:{}", scheduleJob.getId());

        scheduleJob.setJobId(scheduleJob.getId() + "");
        logger.info("[SchedulerJobServiceImpl] the scheduleJob is:{}", scheduleJob);

        JobDetail jobDetail = JobBuilder.newJob(QuartzJobFactory.class)
                .withIdentity(scheduleJob.getJobName(), scheduleJob.getJobGroup())
                .build();
        jobDetail.getJobDataMap().put(JOB_DATA_KEY, scheduleJob);

        CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression());
        CronTrigger cronTrigger = TriggerBuilder.newTrigger()
                .withIdentity(scheduleJob.getJobName(), scheduleJob.getJobGroup())
                .withSchedule(cronScheduleBuilder)
                .build();
        scheduler.scheduleJob(jobDetail, cronTrigger);
    }

    /**
     * Replaces the stored job data and the cron schedule of an existing job.
     *
     * @param scheduleJob the job carrying the new cron expression and data
     * @throws Exception if validation fails or the scheduler rejects the update
     */
    private void updateJobCronSchedule(ScheduleJob scheduleJob) throws Exception {
        checkNotNull(scheduleJob);
        requireCronExpression(scheduleJob);

        TriggerKey triggerKey = TriggerKey.triggerKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
        CronTrigger cronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
        CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression());
        cronTrigger = cronTrigger.getTriggerBuilder()
                .withIdentity(triggerKey)
                .withSchedule(cronScheduleBuilder)
                .build();

        JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
        JobDetail jobDetail = scheduler.getJobDetail(jobKey);
        if (jobDetail != null) {
            jobDetail.getJobDataMap().put(JOB_DATA_KEY, scheduleJob);
            // The JobDetail handed out by the scheduler is a detached copy, so the modified
            // data map has to be stored back explicitly. The third argument lets a
            // non-durable job be replaced without raising an error.
            scheduler.addJob(jobDetail, true, true);
        }
        scheduler.rescheduleJob(triggerKey, cronTrigger);
    }

    /** Rejects a job whose cron expression is blank. */
    private static void requireCronExpression(ScheduleJob scheduleJob) throws Exception {
        if (StringUtils.isBlank(scheduleJob.getCronExpression())) {
            throw new Exception("[SchedulerJobServiceImpl] CronExpression不能为空");
        }
    }

    /** Sets the persisted record's status to "NORMAL" and saves it; does nothing when no record exists. */
    private void markNormalIfPersisted(String jobName, String jobGroup) {
        ScheduleJob scheduleJob = scheduleJobInService.selectByJobNameAngJobGroup(jobName, jobGroup);
        if (scheduleJob == null) {
            return;
        }
        scheduleJob.setJobStatus("NORMAL");
        scheduleJobInService.updateByPrimaryKey(scheduleJob);
    }

    /**
     * Builds the {@code ScheduleJob} view of a job/trigger pair.
     *
     * <p>Starts from the {@code ScheduleJob} stored in the job data map (or a fresh instance
     * when none is stored) and overlays the trigger state, the job identity and, for cron
     * triggers, the cron expression. Failures are logged and the partially filled object is
     * returned, so the result is never {@code null}.
     *
     * @param jobKey  identity of the job
     * @param trigger trigger whose state and schedule are reported
     * @return the populated description
     */
    private ScheduleJob getScheduleJob(JobKey jobKey, Trigger trigger) {
        ScheduleJob scheduleJob = new ScheduleJob();
        try {
            JobDetail jobDetail = scheduler.getJobDetail(jobKey);
            if (jobDetail != null) {
                Object stored = jobDetail.getJobDataMap().get(JOB_DATA_KEY);
                if (stored instanceof ScheduleJob) {
                    scheduleJob = (ScheduleJob) stored;
                }
            }
            Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
            scheduleJob.setJobStatus(triggerState.name());
            scheduleJob.setJobName(jobKey.getName());
            scheduleJob.setJobGroup(jobKey.getGroup());
            if (trigger instanceof CronTrigger) {
                scheduleJob.setCronExpression(((CronTrigger) trigger).getCronExpression());
            }
        } catch (Exception e) {
            // Best effort: keep listing the remaining jobs even if this one cannot be read.
            logger.error("[SchedulerJobServiceImpl] method getScheduleJob get JobDetail error", e);
        }
        return scheduleJob;
    }

}

h2下Quartz 建表语句:
添加链接描述