Spring Boot多数据源整合Quartz任务调度
pom.xml
org.springframework.boot spring-boot-starter-quartz org.springframework spring-context-supportapplication.yml
spring:
h2:
console:
enabled: true
path: /h2
datasource:
hikari:
view:
driver-class-name: oracle.jdbc.OracleDriver
jdbc-url: jdbc:oracle:thin:@192.168.0.118:1521/orcl
username: grademed
password: grademed2017ylz
h2:
driver-class-name: org.h2.Driver
jdbc-url: jdbc:h2:file:./src/main/resources/static/method_db;ACCESS_MODE_DATA=rws
username: grademed
password: grademed2017ylz
data: classpath:db/data.sql
platform: h2
initialization-mode: always
quartz:
job-store-type: jdbc
jdbc:
initialize-schema: embedded
properties:
org:
scheduler: clusteredScheduler
instanceId: AUTO
jobStore:
dataSource: myDB
class: org.quartz.impl.jdbcjobstore.JobStoreTX
driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
tablePrefix: QRTZ_
isClustered: true
clusterCheckinInterval: 10000
useProperties: false
threadPool:
class: org.quartz.simpl.SimpleThreadPool
threadCount: 10
threadPriority: 5
threadsInheritContextClassLoaderOfInitializingThread: true
dataSource:
myDB:
user: grademed
password: grademed2017ylz
maxConnections: 10
driver: org.h2.Driver
URL: jdbc:h2:file:./src/main/resources/static/method_db
配置数据源
1.H2DataSourceConfig(h2数据源)
@Configuration
@MapperScan(basePackages = "com.ylz.dataSync.mapper.h2", sqlSessionTemplateRef = "h2SqlSessionTemplate")
public class H2DataSourceConfig {
/**
* 根据application.yml系统配置文件中,对应属性的前缀,指明使用其对应的数据
*/
@Bean
@ConfigurationProperties(prefix = "spring.datasource.hikari.h2")
@Primary // 本人Quartz数据库表放在h2数据库里面所以要将此数据源设置为主数据源
public DataSource h2DataSource() {
return DataSourceBuilder.create().build();
}
@Bean
@DependsOn("h2DataSource")
@Primary
public SqlSessionFactory h2SqlSessionFactory() throws Exception {
MybatisSqlSessionFactoryBean factoryBean = new MybatisSqlSessionFactoryBean();
factoryBean.setDataSource(h2DataSource());
factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("com/ylz/dataSync/mapper/xml/*.xml"));
return factoryBean.getObject();
}
/**
* DefaultSqlSession和SqlSessionTemplate都实现了SqlSession,但我们
* 注入线程安全的SqlSessionTemplate,而不使用默认的线程不安全的DefaultSqlSession
*/
@Bean
@DependsOn("h2SqlSessionFactory")
@Primary
public SqlSessionTemplate h2SqlSessionTemplate() throws Exception {
return new SqlSessionTemplate(h2SqlSessionFactory());
}
2.oracle数据源
@Configuration
@MapperScan(basePackages = "com.ylz.dataSync.mapper.view", sqlSessionTemplateRef = "viewSqlSessionTemplate")
public class ViewDataSourceConfig {
/**
* 根据application.yml系统配置文件中,对应属性的前缀,指明使用其对应的数据
*/
@Bean
@ConfigurationProperties(prefix = "spring.datasource.hikari.view")
public DataSource viewDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
@DependsOn("viewDataSource")
public SqlSessionFactory viewSqlSessionFactory() throws Exception {
MybatisSqlSessionFactoryBean factoryBean = new MybatisSqlSessionFactoryBean();
factoryBean.setDataSource(viewDataSource());
factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("com/ylz/dataSync/mapper/xml/*.xml"));
return factoryBean.getObject();
}
/**
* DefaultSqlSession和SqlSessionTemplate都实现了SqlSession,但我们
* 注入线程安全的SqlSessionTemplate,而不使用默认的线程不安全的DefaultSqlSession
*/
@Bean
@DependsOn("viewSqlSessionFactory")
public SqlSessionTemplate viewSqlSessionTemplate() throws Exception {
return new SqlSessionTemplate(viewSqlSessionFactory());
}
}
Quartz核心代码
@Service("quartzJobFactory")
public class QuartzJobFactory extends QuartzJobBean {
private final QuartzService quartzService;
public QuartzJobFactory(QuartzService quartzService) {
this.quartzService = quartzService;
}
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext){
ScheduleJob object = (ScheduleJob) jobExecutionContext.getMergedJobDataMap().get("scheduleJob");
if(object.getMethodName()==null || object.getMethodName().equals("")){
quartzService.executeTask(object.getBeanName());
}else {
quartzService.executeTask(object.getBeanName(),object.getMethodName());
}
}
}
public interface SchedulerJobService {
List<ScheduleJob> getAllScheduleJob();
void checkNotNull(ScheduleJob scheduleJob);
List<ScheduleJob> getAllRunningJob() throws SchedulerException;
void saveOrUpdate(ScheduleJob scheduleJob) throws Exception;
public void pauseJob(String jobName, String jobGroup) throws SchedulerException;
public void deleteJob(String jobName, String jobGroup) throws SchedulerException;
public void runOneJob(String jobName, String jobGroup) throws SchedulerException;
public void resumeJob(String jobName, String jobGroup) throws SchedulerException;
}
@Service("schedulerJobService")
public class SchedulerJobServiceImpl implements SchedulerJobService {
private static final Logger logger = LoggerFactory.getLogger(SchedulerJobServiceImpl.class);
private final Scheduler scheduler;
private final ScheduleJobInService scheduleJobInService;
public SchedulerJobServiceImpl(Scheduler scheduler, ScheduleJobInService scheduleJobInService) {
this.scheduler = scheduler;
this.scheduleJobInService = scheduleJobInService;
}
/**
* 获取所有的任务
* @return
*/
@Override
public List<ScheduleJob> getAllScheduleJob() {
List<ScheduleJob> jobList = new ArrayList<>();
GroupMatcher<JobKey> matcher = GroupMatcher.anyGroup();
try {
Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
for (JobKey key : jobKeys){
List<? extends Trigger> triggers = scheduler.getTriggersOfJob(key);
for (Trigger trigger: triggers){
ScheduleJob scheduleJob = getScheduleJob(scheduler,key,trigger);
jobList.add(scheduleJob);
}
}
} catch (SchedulerException e) {
logger.error("[SchedulerJobServiceImpl] get the jobKeys is error:{}",e);
//e.printStackTrace();
}
return jobList;
}
/**
* 获取所有运行中的任务
* @return
* @throws SchedulerException
*/
@Override
public List<ScheduleJob> getAllRunningJob() throws SchedulerException {
List<JobExecutionContext> executionJobList = scheduler.getCurrentlyExecutingJobs();
List<ScheduleJob> jobList = new ArrayList<>();
for (JobExecutionContext jobExecutionContext: executionJobList){
JobDetail jobDetail = jobExecutionContext.getJobDetail();
JobKey jobKey = jobDetail.getKey();
Trigger trigger = jobExecutionContext.getTrigger();
ScheduleJob scheduleJob = getScheduleJob(scheduler,jobKey,trigger);
jobList.add(scheduleJob);
}
return jobList;
}
/**
* 更新新的任务或者添加一个新的任务
* @param scheduleJob
* @throws Exception
*/
@Override
public void saveOrUpdate(ScheduleJob scheduleJob) throws Exception{
TriggerKey triggerKey = TriggerKey.triggerKey(scheduleJob.getJobName(),scheduleJob.getJobGroup());
CronTrigger cronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
if (cronTrigger==null){
addJob(scheduleJob);
}else {
updateJobCronSchedule(scheduleJob);
}
}
/**
* 停止运行任务
* @param jobName
* @param jobGroup
* @throws SchedulerException
*/
@Override
public void pauseJob(String jobName, String jobGroup) throws SchedulerException {
JobKey jobKey = JobKey.jobKey(jobName,jobGroup);
// ScheduleJob scheduleJob = scheduleJobInService.selectByJobNameAngJobGroup(jobName,jobGroup);
// scheduleJob.setJobStatus("PAUSED");
// scheduleJobInService.updateByPrimaryKey(scheduleJob);
scheduler.pauseJob(jobKey);
}
/**
* 删除一个任务
* @param jobName
* @param jobGroup
* @throws SchedulerException
*/
@Override
public void deleteJob(String jobName, String jobGroup) throws SchedulerException {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
scheduleJobInService.deleteByJobNameAndJobGroup(jobName,jobGroup);
scheduler.deleteJob(jobKey);
}
/**
* 运行一个任务
* @param jobName
* @param jobGroup
* @throws SchedulerException
*/
@Override
public void runOneJob(String jobName, String jobGroup) throws SchedulerException {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
ScheduleJob scheduleJob = scheduleJobInService.selectByJobNameAngJobGroup(jobName, jobGroup);
scheduleJob.setJobStatus("NORMAL");
scheduleJobInService.updateByPrimaryKey(scheduleJob);
scheduler.triggerJob(jobKey);
}
/**
* 重启一个任务
* @param jobName
* @param jobGroup
* @throws SchedulerException
*/
@Override
public void resumeJob(String jobName, String jobGroup) throws SchedulerException {
JobKey jobKey = JobKey.jobKey(jobName,jobGroup);
ScheduleJob scheduleJob = scheduleJobInService.selectByJobNameAngJobGroup(jobName,jobGroup);
scheduleJob.setJobStatus("PAUSED");
scheduler.resumeJob(jobKey);
}
/**
* 添加任务
* @param scheduleJob
* @throws Exception
*/
private void addJob(ScheduleJob scheduleJob) throws Exception{
checkNotNull(scheduleJob);
if (StringUtils.isBlank(scheduleJob.getCronExpression())){
throw new Exception("[SchedulerJobServiceImpl] CronExpression不能为空");
}
scheduleJob.setJobStatus("NORMAL");
// int id = scheduleJobInService.insertSelective(scheduleJob);
logger.info("[SchedulerJobServiceImpl] the Primary key is:{}",scheduleJob.getId());
scheduleJob.setJobId(scheduleJob.getId()+"");
logger.info("[SchedulerJobServiceImpl] the scheduleJob is:{}",scheduleJob);
// scheduleJobInService.updateByPrimaryKey(scheduleJob);
JobDetail jobDetail = JobBuilder.newJob(QuartzJobFactory.class).withIdentity(scheduleJob.getJobName(),scheduleJob.getJobGroup())
.build();
jobDetail.getJobDataMap().put("scheduleJob",scheduleJob);
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression());
CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(scheduleJob.getJobName(),scheduleJob.getJobGroup())
.withSchedule(cronScheduleBuilder).build();
scheduler.scheduleJob(jobDetail,cronTrigger);
}
/**
* 更新一个任务
* @param scheduleJob
* @throws Exception
*/
private void updateJobCronSchedule(ScheduleJob scheduleJob) throws Exception{
checkNotNull(scheduleJob);
if (StringUtils.isBlank(scheduleJob.getCronExpression())){
throw new Exception("[SchedulerJobServiceImpl] CronExpression不能为空");
}
TriggerKey triggerKey = TriggerKey.triggerKey(scheduleJob.getJobName(),scheduleJob.getJobGroup());
CronTrigger cronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression());
cronTrigger = cronTrigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build();
JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(),scheduleJob.getJobGroup());
JobDetail jobDetail=scheduler.getJobDetail(jobKey);
jobDetail.getJobDataMap().put("scheduleJob",scheduleJob);
scheduler.rescheduleJob(triggerKey,cronTrigger);
// scheduleJobInService.updateByPrimaryKey(scheduleJob);
}
/**
* 判断一个任务是否为空
* @param scheduleJob
*/
@Override
public void checkNotNull(ScheduleJob scheduleJob) {
if (scheduleJob==null){
throw new IllegalStateException("scheduleJob为空,请检查");
}
if (scheduleJob.getJobName()==null || "".equals(scheduleJob.getJobName())){
throw new IllegalStateException("scheduleJob的jobName为null,请检查");
}
if (scheduleJob.getJobGroup()==null || "".equals(scheduleJob.getJobGroup())){
throw new IllegalStateException("scheduleJob的jobGroup为null,请检查");
}
if (scheduleJob.getBeanName()==null || "".equals(scheduleJob.getBeanName())){
throw new IllegalStateException("scheduleJob的BeanName为null,请检查");
}
}
private ScheduleJob getScheduleJob(Scheduler schedule, JobKey jobKey, Trigger trigger){
ScheduleJob scheduleJob = new ScheduleJob();
try {
JobDetail jobDetail = scheduler.getJobDetail(jobKey);
scheduleJob = (ScheduleJob)jobDetail.getJobDataMap().get("scheduleJob");
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
scheduleJob.setJobStatus(triggerState.name());
scheduleJob.setJobName(jobKey.getName());
scheduleJob.setJobGroup(jobKey.getGroup());
if (trigger instanceof CronTrigger){
CronTrigger cronTrigger = (CronTrigger) trigger;
scheduleJob.setCronExpression(cronTrigger.getCronExpression());
}
} catch (Exception e) {
logger.error("[SchedulerJobServiceImpl] method getScheduleJob get JobDetail error:{}",e);
}
return scheduleJob;
}
}
h2下Quartz 建表语句:
添加链接描述
上一篇: Spring Boot应用中进行任务调度
下一篇: Linux后台运行java的jar包