Spring回顾之八 —— Quartz在集群、分布式系统中的应用
程序员文章站
2022-07-12 18:26:15
...
在Quartz的使用中,简单的任务调度,我们直接在配置文件中进行配置就可以实现,如果需要再复杂点的,我们可以将任务执行信息在数据库中进行管理,然后对任务实现动态的更新,这些上一篇做了基本的介绍,当然这些应用都是基于单节点服务的。然而单节点应用是不能满足典型的企业需求的,假如你需要故障转移的能力并需要运行日益增多的任务调度,必须考虑Quartz集群的问题。使用Quartz集群的应用可以更好的支持更加丰富的业务需求,即使是其中某些机器服务崩溃了也能保证整体系统的正常运行。
在Quartz集群中,每一个节点是一个独立的应用,它同时又负责管理着其他的节点,每个节点的启动或停止是相互独立的行为,他们之间没有任何通信。那他们是怎么来和其他节点一起协调工作的呢?核心:数据库,Quartz应用节点是通过数据库来和另一节点的进行协作的,接下来先看看Quartz应用如何在数据库中体现。
第一步:Quartz储存方式的尝试
由于Quartz集群依赖于数据库,所以必须创建Quartz集群所需的数据库表,Quartz提供了几乎所有的数据库支持,并给出了现成的SQL建表脚本,这个我们可以去Quartz官网直接下载,当前可以访问 http://www.quartz-scheduler.org/downloads/ 链接,我们可以看到目前最新版的是 quartz-2.2.3
然后直接下载解压,我们可以得到一整套使用Quartz所需的jar包、示例以及各种文档,我们在docs/dbTables的路径下可以看到如下脚本,由于我们使用的是MySQL数据库进行测试使用,使用 tables_mysql.sql 文件即可
考虑到测试的独立性,我们新建一个名为quartz的数据库,然后将上边的脚本内容跑一下,可以看到生成了一组数据表
接下来,我们需要提供一下Quartz的配置文件,即quartz.properties,代码如下
接着定义一个实现Job接口的任务类
然后我们需要写一个主调程序来进行任务调度的实现,代码如下
这里面我们要注意几个点,Quartz在Scheduler的创建过程中会自己去读取加载quartz.properties中的相关信息,我们得确保配置文件信息是准确可用的。还要注意下程序中的注释信息,基本上涵盖了测试运行时的相关问题。
一切安排妥当,运行程序,可以看到输出栏打印出如下信息
我们可以看到任务运行的效果符合预期。在程序运行过程中,我们可以看到数据库的qrtz_triggers表中添加了一条信息
qrtz_job_details表中添加了如下信息
qrtz_cron_triggers表中添加了如下信息
这些信息将会在程序运行完被删掉,不方便看可以将程序中的 Thread.sleep(10*1000)调大或者将 scheduler.deleteJob(jobKey)注释掉即可。
通过测试,我们可以看到Quartz当前运行的调度信息都体现在数据库里,如果做好相关配置,多个Quartz节点都围绕这个库进行运行,就可以实现集群了。接下来我们试一下Quartz同Spring结合,实现集群功能。
第二步:Quartz与Spring一起整合实现集群
上一步做Quartz储存方式实践的时候,我们已经创建好了相关数据库和表,这里我们直接修改下原来的jdbc.properties文件,如下
接下来我们在原有的Spring配置文件applicationContext.xml上进行修改,代码如下
其中需要注意的是storeJobDetail的requestsRecovery属性值必须为true,当Quartz服务被中止后,再次启动或其他节点将会恢复执行之前未完成的所有任务。这里我们用的是连接池的方式来做的数据源配置,具体任务用的是原来的SimpleExtendsJob类,然后还需要重新配置下quartz.properties文件
这里比较重要的是org.quartz.jobStore.isClustered属性要配置为true,表明Scheduler实例要它参与到一个集群当中。然后打包部署,启动服务器,我们可以在输出栏看到如下内容
这说明在当前节点,Quartz已经是正常运行了,我们可以去看下数据库表中发生的变化,然后设置不同的tomcat服务端口启动,停掉其中的一个看下效果。
第三步:总结
集群通常有两种方式:节点在同一台机器上的称为垂直集群,垂直集群依赖于机器本身,机器崩溃了集群本身也就没意义了;节点放在不同的机器上的称为水平集群,水平集群可以避免单点故障的问题,但要注意个节点之间的机器时钟要保持同步,Quartz会在时钟不同步时出现运行异常,这个使用过程中一定要避免。关于集群时钟问题,比较简单的方式是使用Internet 时间服务器(Internet Time Server ITS)来解决。
最后我们在附件中添加了quartz-2.2.3的压缩包,需要可以直接下载。
在Quartz集群中,每一个节点是一个独立的应用,它同时又负责管理着其他的节点,每个节点的启动或停止是相互独立的行为,他们之间没有任何通信。那他们是怎么来和其他节点一起协调工作的呢?核心:数据库,Quartz应用节点是通过数据库来和另一节点的进行协作的,接下来先看看Quartz应用如何在数据库中体现。
第一步:Quartz储存方式的尝试
由于Quartz集群依赖于数据库,所以必须创建Quartz集群所需的数据库表,Quartz提供了几乎所有的数据库支持,并给出了现成的SQL建表脚本,这个我们可以去Quartz官网直接下载,当前可以访问 http://www.quartz-scheduler.org/downloads/ 链接,我们可以看到目前最新版的是 quartz-2.2.3
然后直接下载解压,我们可以得到一整套使用Quartz所需的jar包、示例以及各种文档,我们在docs/dbTables的路径下可以看到如下脚本,由于我们使用的是MySQL数据库进行测试使用,使用 tables_mysql.sql 文件即可
考虑到测试的独立性,我们新建一个名为quartz的数据库,然后将上边的脚本内容跑一下,可以看到生成了一组数据表
接下来,我们需要提供一下Quartz的配置文件,即quartz.properties,代码如下
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount = 3 org.quartz.jobStore.tablePrefix = QRTZ_ org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate org.quartz.jobStore.dataSource = quartzDS org.quartz.dataSource.quartzDS.driver = com.mysql.cj.jdbc.Driver org.quartz.dataSource.quartzDS.URL = jdbc:mysql://localhost:3306/quartz?useUnicode=true&serverTimezone=UTC&characterEncoding=utf-8 org.quartz.dataSource.quartzDS.user = root org.quartz.dataSource.quartzDS.password = 123456 org.quartz.dataSource.quartzDS.maxConnections = 5
接着定义一个实现Job接口的任务类
package test.demo.job.store; import org.apache.log4j.Logger; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; public class SampleStoreJob implements Job{ private Logger logger = Logger.getLogger(SampleStoreJob.class); @Override public void execute(JobExecutionContext context) throws JobExecutionException { logger.info("SampleStoreJob===========execute()"); } }
然后我们需要写一个主调程序来进行任务调度的实现,代码如下
package test.demo.job.store; import org.quartz.CronScheduleBuilder; import org.quartz.CronTrigger; import org.quartz.JobBuilder; import org.quartz.JobDetail; import org.quartz.JobKey; import org.quartz.Scheduler; import org.quartz.SchedulerFactory; import org.quartz.TriggerBuilder; import org.quartz.impl.StdSchedulerFactory; public class SampleStoreQuartz { public void run() throws Exception{ //使用SchedulerFactory创建一个Scheduler SchedulerFactory schedulerFactory = new StdSchedulerFactory(); Scheduler scheduler = schedulerFactory.getScheduler(); scheduler.clear(); //测试用,避免因为调度存在报错,可以在job未delete的情况下删掉看下效果 //定义一个具体的Job JobDetail jobDetail = JobBuilder.newJob(SampleStoreJob.class).withIdentity("sampleStoreJob", "sampleJobGroup").build(); //定义一个具体的Trigger CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/5 * * * * ?");//具体的执行时间定义 CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity("sampleStoreTrigger", "sampleTriggerGroup").withSchedule(scheduleBuilder).build(); //将Job和Trigger绑定至Scheduler scheduler.scheduleJob(jobDetail, trigger); scheduler.start();//启动运行 Thread.sleep(10*1000);//情节需要,10秒钟 //定义一个JobKey,用来做删除Job测试 JobKey jobKey = JobKey.jobKey("sampleStoreJob", "sampleJobGroup"); scheduler.deleteJob(jobKey); scheduler.shutdown();//关闭Scheduler } public static void main(String[] args) throws Exception{ SampleStoreQuartz sampleStoreQuartz = new SampleStoreQuartz(); sampleStoreQuartz.run(); } }
这里面我们要注意几个点,Quartz在Scheduler的创建过程中会自己去读取加载quartz.properties中的相关信息,我们得确保配置文件信息是准确可用的。还要注意下程序中的注释信息,基本上涵盖了测试运行时的相关问题。
一切安排妥当,运行程序,可以看到输出栏打印出如下信息
我们可以看到任务运行的效果符合预期。在程序运行过程中,我们可以看到数据库的qrtz_triggers表中添加了一条信息
qrtz_job_details表中添加了如下信息
qrtz_cron_triggers表中添加了如下信息
这些信息将会在程序运行完被删掉,不方便看可以将程序中的 Thread.sleep(10*1000)调大或者将 scheduler.deleteJob(jobKey)注释掉即可。
通过测试,我们可以看到Quartz当前运行的调度信息都体现在数据库里,如果做好相关配置,多个Quartz节点都围绕这个库进行运行,就可以实现集群了。接下来我们试一下Quartz同Spring结合,实现集群功能。
第二步:Quartz与Spring一起整合实现集群
上一步做Quartz储存方式实践的时候,我们已经创建好了相关数据库和表,这里我们直接修改下原来的jdbc.properties文件,如下
#MySQL驱动 jdbc.driver=com.mysql.cj.jdbc.Driver #在使用连接mysql的jdbc驱动最新版时,会遇到数据库和系统时区差异引起的问题,这时候有两种解决方案,一种是降版本,这个我们知道就行了,适可而行,还有一种是在jdbc连接的url后面加上serverTimezone=UTC或GMT即可,如果需要使用gmt+8时区,需要写成GMT%2B8,否则可能会被解析为空。 jdbc.url=jdbc:mysql://localhost:3306/quartz?useUnicode=true&serverTimezone=UTC&characterEncoding=utf-8 jdbc.username=root jdbc.password=123456 #初始连接数 jdbc.initialSize=0 #定义最大连接数 jdbc.maxActive=20 #最大空闲 jdbc.maxIdle=20 #最小空闲 jdbc.minIdle=1 #最长等待时间 jdbc.maxWait=60000
接下来我们在原有的Spring配置文件applicationContext.xml上进行修改,代码如下
<?xml version="1.0" encoding="UTF-8" ?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.3.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.3.xsd "> <!-- 加载配置文件 --> <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="location" value="classpath:jdbc.properties" /> </bean> <!-- ========================= ORM BEGIN ========================= --> <!-- 数据源配置 --> <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource"> <property name="driverClassName" value="${jdbc.driver}" /> <property name="url" value="${jdbc.url}" /> <property name="username" value="${jdbc.username}" /> <property name="password" value="${jdbc.password}" /> <!-- 初始化连接大小 --> <property name="initialSize" value="${jdbc.initialSize}"></property> <!-- 连接池最大数量 --> <property name="maxActive" value="${jdbc.maxActive}"></property> <!-- 连接池最大空闲 --> <property name="maxIdle" value="${jdbc.maxIdle}"></property> <!-- 连接池最小空闲 --> <property name="minIdle" value="${jdbc.minIdle}"></property> <!-- 获取连接最大等待时间 --> <property name="maxWait" value="${jdbc.maxWait}"></property> </bean> <!-- spring和MyBatis完美整合,不需要mybatis的配置映射文件,mapperLocations的设置将会自动扫描MyBatis的xml文件--> <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean"> <property name="dataSource" ref="dataSource"/> <property name="mapperLocations" value="classpath*:test/demo/mapper/*Mapper.xml"/> </bean> <!-- DAO接口所在包名,Spring会自动寻找其路径下的接口 --> <bean id="demoDaoFactory" class="org.mybatis.spring.mapper.MapperScannerConfigurer"> <property name="basePackage" value="test.demo.dao" /> <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory"></property> </bean> <!-- 数据事务管理 ( Spring允许允许 MyBatis参与到事务管理中,所以MyBatis没有特定的事务管理器,直接利用了Spring中的 DataSourceTransactionManager) --> <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="dataSource" ref="dataSource"/> </bean> <!-- ========================= ORM END ========================= --> <!-- ========================= Quartz BEGIN ========================= --> <bean id="storeJobDetail" class="org.springframework.scheduling.quartz.JobDetailFactoryBean"> <property name="jobClass" value="test.demo.job.SimpleExtendsJob"/> <property name="durability" value="true" /> <property name="requestsRecovery" value="true" /> </bean> <bean id="storeTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean"> <property name="jobDetail" ref="storeJobDetail" /> <property name="cronExpression" value="0/5 * * * * ?" /> </bean> <bean name="schedulerFactoryBean" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"> <property name="dataSource" ref="dataSource"/> <property name="applicationContextSchedulerContextKey" value="applicationContextKey" /> <property name="configLocation" value="classpath:quartz.properties" /> <property name="triggers"> <list> <ref bean="storeTrigger" /> </list> </property> </bean> <!-- ========================= Quartz END ========================= --> </beans>
其中需要注意的是storeJobDetail的requestsRecovery属性值必须为true,当Quartz服务被中止后,再次启动或其他节点将会恢复执行之前未完成的所有任务。这里我们用的是连接池的方式来做的数据源配置,具体任务用的是原来的SimpleExtendsJob类,然后还需要重新配置下quartz.properties文件
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount = 3 org.quartz.jobStore.tablePrefix = QRTZ_ org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate org.quartz.jobStore.dataSource = quartzDS org.quartz.dataSource.quartzDS.driver = com.mysql.cj.jdbc.Driver org.quartz.dataSource.quartzDS.URL = jdbc:mysql://localhost:3306/quartz?useUnicode=true&serverTimezone=UTC&characterEncoding=utf-8 org.quartz.dataSource.quartzDS.user = root org.quartz.dataSource.quartzDS.password = 123456 org.quartz.dataSource.quartzDS.maxConnections = 5
这里比较重要的是org.quartz.jobStore.isClustered属性要配置为true,表明Scheduler实例要它参与到一个集群当中。然后打包部署,启动服务器,我们可以在输出栏看到如下内容
这说明在当前节点,Quartz已经是正常运行了,我们可以去看下数据库表中发生的变化,然后设置不同的tomcat服务端口启动,停掉其中的一个看下效果。
第三步:总结
集群通常有两种方式:节点在同一台机器上的称为垂直集群,垂直集群依赖于机器本身,机器崩溃了集群本身也就没意义了;节点放在不同的机器上的称为水平集群,水平集群可以避免单点故障的问题,但要注意个节点之间的机器时钟要保持同步,Quartz会在时钟不同步时出现运行异常,这个使用过程中一定要避免。关于集群时钟问题,比较简单的方式是使用Internet 时间服务器(Internet Time Server ITS)来解决。
最后我们在附件中添加了quartz-2.2.3的压缩包,需要可以直接下载。