欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spring回顾之八 —— Quartz在集群、分布式系统中的应用

程序员文章站 2022-07-12 18:27:51
...
    在Quartz的使用中,简单的任务调度,我们直接在配置文件中进行配置就可以实现,如果需要再复杂点的,我们可以将任务执行信息在数据库中进行管理,然后对任务实现动态的更新,这些上一篇做了基本的介绍,当然这些应用都是基于单节点服务的。然而单节点应用是不能满足典型的企业需求的,假如你需要故障转移的能力并需要运行日益增多的任务调度,必须考虑Quartz集群的问题。使用Quartz集群的应用可以更好的支持更加丰富的业务需求,即使是其中某些机器服务崩溃了也能保证整体系统的正常运行。

    在Quartz集群中,每一个节点是一个独立的应用,它同时又负责管理着其他的节点,每个节点的启动或停止是相互独立的行为,他们之间没有任何通信。那他们是怎么来和其他节点一起协调工作的呢?核心:数据库,Quartz应用节点是通过数据库来和另一节点的进行协作的,接下来先看看Quartz应用如何在数据库中体现。

第一步:Quartz储存方式的尝试
    由于Quartz集群依赖于数据库,所以必须创建Quartz集群所需的数据库表,Quartz提供了几乎所有的数据库支持,并给出了现成的SQL建表脚本,这个我们可以去Quartz官网直接下载,当前可以访问 http://www.quartz-scheduler.org/downloads/ 链接,我们可以看到目前最新版的是 quartz-2.2.3
    Spring回顾之八 —— Quartz在集群、分布式系统中的应用
            
    
    博客分类: JavaSpringQuartzMySQL quartzspringmysqljava集群

    然后直接下载解压,我们可以得到一整套使用Quartz所需的jar包、示例以及各种文档,我们在docs/dbTables的路径下可以看到如下脚本,由于我们使用的是MySQL数据库进行测试使用,使用 tables_mysql.sql 文件即可
    Spring回顾之八 —— Quartz在集群、分布式系统中的应用
            
    
    博客分类: JavaSpringQuartzMySQL quartzspringmysqljava集群

    考虑到测试的独立性,我们新建一个名为quartz的数据库,然后将上边的脚本内容跑一下,可以看到生成了一组数据表
    Spring回顾之八 —— Quartz在集群、分布式系统中的应用
            
    
    博客分类: JavaSpringQuartzMySQL quartzspringmysqljava集群

    接下来,我们需要提供一下Quartz的配置文件,即quartz.properties,代码如下
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 3

org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.dataSource = quartzDS

org.quartz.dataSource.quartzDS.driver = com.mysql.cj.jdbc.Driver
org.quartz.dataSource.quartzDS.URL = jdbc:mysql://localhost:3306/quartz?useUnicode=true&serverTimezone=UTC&characterEncoding=utf-8
org.quartz.dataSource.quartzDS.user = root
org.quartz.dataSource.quartzDS.password = 123456
org.quartz.dataSource.quartzDS.maxConnections = 5


    接着定义一个实现Job接口的任务类
package test.demo.job.store;

import org.apache.log4j.Logger;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

public class SampleStoreJob implements Job{
	
	private Logger logger = Logger.getLogger(SampleStoreJob.class); 

	@Override
	public void execute(JobExecutionContext context) throws JobExecutionException {
		logger.info("SampleStoreJob===========execute()");
	}

}

    然后我们需要写一个主调程序来进行任务调度的实现,代码如下
package test.demo.job.store;

import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerFactory;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

public class SampleStoreQuartz {

	public void run() throws Exception{
		//使用SchedulerFactory创建一个Scheduler
		SchedulerFactory schedulerFactory = new StdSchedulerFactory();
		Scheduler scheduler = schedulerFactory.getScheduler();
		scheduler.clear(); //测试用,避免因为调度存在报错,可以在job未delete的情况下删掉看下效果

		//定义一个具体的Job
		JobDetail jobDetail = JobBuilder.newJob(SampleStoreJob.class).withIdentity("sampleStoreJob", "sampleJobGroup").build();
		//定义一个具体的Trigger
		CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/5 * * * * ?");//具体的执行时间定义
		CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity("sampleStoreTrigger", "sampleTriggerGroup").withSchedule(scheduleBuilder).build();
		//将Job和Trigger绑定至Scheduler
		scheduler.scheduleJob(jobDetail, trigger);
		scheduler.start();//启动运行
		
		Thread.sleep(10*1000);//情节需要,10秒钟
		//定义一个JobKey,用来做删除Job测试
		JobKey jobKey = JobKey.jobKey("sampleStoreJob", "sampleJobGroup");
		scheduler.deleteJob(jobKey);
		
		scheduler.shutdown();//关闭Scheduler
	}
	
	public static void main(String[] args) throws Exception{
		SampleStoreQuartz sampleStoreQuartz = new SampleStoreQuartz();
		sampleStoreQuartz.run();
	}
}


    这里面我们要注意几个点,Quartz在Scheduler的创建过程中会自己去读取加载quartz.properties中的相关信息,我们得确保配置文件信息是准确可用的。还要注意下程序中的注释信息,基本上涵盖了测试运行时的相关问题。
    一切安排妥当,运行程序,可以看到输出栏打印出如下信息
    Spring回顾之八 —— Quartz在集群、分布式系统中的应用
            
    
    博客分类: JavaSpringQuartzMySQL quartzspringmysqljava集群

    我们可以看到任务运行的效果符合预期。在程序运行过程中,我们可以看到数据库的qrtz_triggers表中添加了一条信息
    Spring回顾之八 —— Quartz在集群、分布式系统中的应用
            
    
    博客分类: JavaSpringQuartzMySQL quartzspringmysqljava集群

    qrtz_job_details表中添加了如下信息
    Spring回顾之八 —— Quartz在集群、分布式系统中的应用
            
    
    博客分类: JavaSpringQuartzMySQL quartzspringmysqljava集群

    qrtz_cron_triggers表中添加了如下信息
    Spring回顾之八 —— Quartz在集群、分布式系统中的应用
            
    
    博客分类: JavaSpringQuartzMySQL quartzspringmysqljava集群

    这些信息将会在程序运行完被删掉,不方便看可以将程序中的 Thread.sleep(10*1000)调大或者将 scheduler.deleteJob(jobKey)注释掉即可。
    通过测试,我们可以看到Quartz当前运行的调度信息都体现在数据库里,如果做好相关配置,多个Quartz节点都围绕这个库进行运行,就可以实现集群了。接下来我们试一下Quartz同Spring结合,实现集群功能。

第二步:Quartz与Spring一起整合实现集群
    上一步做Quartz储存方式实践的时候,我们已经创建好了相关数据库和表,这里我们直接修改下原来的jdbc.properties文件,如下
#MySQL驱动
jdbc.driver=com.mysql.cj.jdbc.Driver
#在使用连接mysql的jdbc驱动最新版时,会遇到数据库和系统时区差异引起的问题,这时候有两种解决方案,一种是降版本,这个我们知道就行了,适可而行,还有一种是在jdbc连接的url后面加上serverTimezone=UTC或GMT即可,如果需要使用gmt+8时区,需要写成GMT%2B8,否则可能会被解析为空。
jdbc.url=jdbc:mysql://localhost:3306/quartz?useUnicode=true&serverTimezone=UTC&characterEncoding=utf-8
jdbc.username=root
jdbc.password=123456
#初始连接数
jdbc.initialSize=0
#定义最大连接数
jdbc.maxActive=20
#最大空闲
jdbc.maxIdle=20
#最小空闲
jdbc.minIdle=1
#最长等待时间
jdbc.maxWait=60000

    接下来我们在原有的Spring配置文件applicationContext.xml上进行修改,代码如下
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:p="http://www.springframework.org/schema/p" 
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd 
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.3.xsd 
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.3.xsd ">

    <!-- 加载配置文件 -->
    <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="location" value="classpath:jdbc.properties" />
    </bean>
    <!-- ========================= ORM BEGIN  ========================= -->
    <!-- 数据源配置 -->
    <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource">
        <property name="driverClassName" value="${jdbc.driver}" />
        <property name="url" value="${jdbc.url}" />
        <property name="username" value="${jdbc.username}" />
        <property name="password" value="${jdbc.password}" />
        <!-- 初始化连接大小 -->
        <property name="initialSize" value="${jdbc.initialSize}"></property>
        <!-- 连接池最大数量 -->
        <property name="maxActive" value="${jdbc.maxActive}"></property>
        <!-- 连接池最大空闲 -->
        <property name="maxIdle" value="${jdbc.maxIdle}"></property>
        <!-- 连接池最小空闲 -->
        <property name="minIdle" value="${jdbc.minIdle}"></property>
        <!-- 获取连接最大等待时间 -->
        <property name="maxWait" value="${jdbc.maxWait}"></property>
    </bean> 
    <!-- spring和MyBatis完美整合,不需要mybatis的配置映射文件,mapperLocations的设置将会自动扫描MyBatis的xml文件-->  
    <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">  
        <property name="dataSource" ref="dataSource"/>
        <property name="mapperLocations" value="classpath*:test/demo/mapper/*Mapper.xml"/>
    </bean>
    <!-- DAO接口所在包名,Spring会自动寻找其路径下的接口 -->  
    <bean id="demoDaoFactory" class="org.mybatis.spring.mapper.MapperScannerConfigurer">  
        <property name="basePackage" value="test.demo.dao" />  
        <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory"></property>  
    </bean>
    <!-- 数据事务管理 ( Spring允许允许 MyBatis参与到事务管理中,所以MyBatis没有特定的事务管理器,直接利用了Spring中的 DataSourceTransactionManager) -->  
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">  
        <property name="dataSource" ref="dataSource"/>
    </bean> 
    <!-- ========================= ORM END  ========================= -->
    <!-- ========================= Quartz BEGIN  ========================= -->
    <bean id="storeJobDetail" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
    	<property name="jobClass" value="test.demo.job.SimpleExtendsJob"/>
        <property name="durability" value="true" />
        <property name="requestsRecovery" value="true" />
    </bean>
    <bean id="storeTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
        <property name="jobDetail" ref="storeJobDetail" />
        <property name="cronExpression" value="0/5 * * * * ?" />
    </bean>
    <bean name="schedulerFactoryBean" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
        <property name="dataSource" ref="dataSource"/>
        <property name="applicationContextSchedulerContextKey" value="applicationContextKey" />
        <property name="configLocation" value="classpath:quartz.properties" />
        <property name="triggers">
            <list>
                <ref bean="storeTrigger" />
            </list>
        </property>
    </bean>
	<!-- ========================= Quartz END  ========================= -->
	 
</beans> 

    其中需要注意的是storeJobDetail的requestsRecovery属性值必须为true,当Quartz服务被中止后,再次启动或其他节点将会恢复执行之前未完成的所有任务。这里我们用的是连接池的方式来做的数据源配置,具体任务用的是原来的SimpleExtendsJob类,然后还需要重新配置下quartz.properties文件
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 3

org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.dataSource = quartzDS

org.quartz.dataSource.quartzDS.driver = com.mysql.cj.jdbc.Driver
org.quartz.dataSource.quartzDS.URL = jdbc:mysql://localhost:3306/quartz?useUnicode=true&serverTimezone=UTC&characterEncoding=utf-8
org.quartz.dataSource.quartzDS.user = root
org.quartz.dataSource.quartzDS.password = 123456
org.quartz.dataSource.quartzDS.maxConnections = 5

    这里比较重要的是org.quartz.jobStore.isClustered属性要配置为true,表明Scheduler实例要它参与到一个集群当中。然后打包部署,启动服务器,我们可以在输出栏看到如下内容
    Spring回顾之八 —— Quartz在集群、分布式系统中的应用
            
    
    博客分类: JavaSpringQuartzMySQL quartzspringmysqljava集群

    这说明在当前节点,Quartz已经是正常运行了,我们可以去看下数据库表中发生的变化,然后设置不同的tomcat服务端口启动,停掉其中的一个看下效果。

第三步:总结
    集群通常有两种方式:节点在同一台机器上的称为垂直集群,垂直集群依赖于机器本身,机器崩溃了集群本身也就没意义了;节点放在不同的机器上的称为水平集群,水平集群可以避免单点故障的问题,但要注意个节点之间的机器时钟要保持同步,Quartz会在时钟不同步时出现运行异常,这个使用过程中一定要避免。关于集群时钟问题,比较简单的方式是使用Internet 时间服务器(Internet Time Server ITS)来解决。
    最后我们在附件中添加了quartz-2.2.3的压缩包,需要可以直接下载。





  • Spring回顾之八 —— Quartz在集群、分布式系统中的应用
            
    
    博客分类: JavaSpringQuartzMySQL quartzspringmysqljava集群
  • 大小: 7.9 KB
  • Spring回顾之八 —— Quartz在集群、分布式系统中的应用
            
    
    博客分类: JavaSpringQuartzMySQL quartzspringmysqljava集群
  • 大小: 3.7 KB
  • Spring回顾之八 —— Quartz在集群、分布式系统中的应用
            
    
    博客分类: JavaSpringQuartzMySQL quartzspringmysqljava集群
  • 大小: 59.1 KB
  • Spring回顾之八 —— Quartz在集群、分布式系统中的应用
            
    
    博客分类: JavaSpringQuartzMySQL quartzspringmysqljava集群
  • 大小: 8.7 KB
  • Spring回顾之八 —— Quartz在集群、分布式系统中的应用
            
    
    博客分类: JavaSpringQuartzMySQL quartzspringmysqljava集群
  • 大小: 13.9 KB
  • Spring回顾之八 —— Quartz在集群、分布式系统中的应用
            
    
    博客分类: JavaSpringQuartzMySQL quartzspringmysqljava集群
  • 大小: 13.4 KB
  • Spring回顾之八 —— Quartz在集群、分布式系统中的应用
            
    
    博客分类: JavaSpringQuartzMySQL quartzspringmysqljava集群
  • 大小: 11.8 KB
  • Spring回顾之八 —— Quartz在集群、分布式系统中的应用
            
    
    博客分类: JavaSpringQuartzMySQL quartzspringmysqljava集群
  • 大小: 3.4 KB