基于spring+quartz的分布式定时任务框架实现
问题背景
我公司是一个快速发展的创业公司,目前有200人,主要业务是旅游和酒店相关的,应用迭代更新周期比较快,因此,开发人员花费了更多的时间去更=跟上迭代的步伐,而缺乏了对整个系统的把控
没有集群之前,公司定时任务的实现方式
在初期应用的访问量并不是那么大,一台服务器完全满足使用,应用中有很多定时任务需要执行
有了集群之后,公司定时任务实现的方式
随着用户的增加,访问量也就随之增加,一台服务器满足不了高并发的要求,因此公司把应用给部署到集群中,前端通过nginx代理(应用服务器ip可能是用防火墙进行了隔离,避免了直接使用ip+端口+应用名访问的方式)。
在集群环境中,同样的定时任务,在集群中的每台机器都会执行,这样定时任务就会重复执行,不但会增加服务器的负担,还会因为定时任务重复执行造成额外的不可预期的错误,因此公司的解决方案是:根据集群的数量,来把定时任务中的任务平均分到集群中的每台机器上(这里的平均分是指以前一个定时任务本来是在一台机器上运行,先在人为的把这个任务分成几部分,让所有的机器都去执行这个人去)
目前集群中定时任务实现方式的缺陷
目前公司在集群中处理定时任务的方式不是正真的分布式处理方式,而是一种伪分布式(公司内部俗称土方法),这种方式存在一个明显的缺陷就是当集群中机器宕机,那么整个定时任务就会挂掉或者不能一次性跑完,会对业务产生严重的影响
针对缺陷的解决方案(本文的重点之处)
利用spring+quartz构建一套真正的分布式定时任务系统,经过查阅相关资料得知:quartz框架是原生就支持分布式定时任务的
开发ide:intellij idea
jdk版本:1.8
spring版本:4.2.6
quartz版本:2.2.1
spring与quartz集成配置
<?xml version="1.0" encoding="utf-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemalocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <context:component-scan base-package="com.aaron.clusterquartz.job"/> <bean name="datasource" class="org.springframework.jndi.jndiobjectfactorybean"> <!-- tomcat --> <!--<property name="jndiname" value="java:comp/env/jndi/mysql/quartz"/>--> <!-- jboss --> <property name="jndiname" value="jdbc/quartz"/> </bean> <!-- 分布式事务配置 start --> <!-- 配置线程池--> <bean name="executor" class="org.springframework.scheduling.concurrent.threadpooltaskexecutor"> <property name="corepoolsize" value="15"/> <property name="maxpoolsize" value="25"/> <property name="queuecapacity" value="100"/> </bean> <bean name="transactionmanager" class="org.springframework.jdbc.datasource.datasourcetransactionmanager"> <property name="datasource" ref="datasource"/> </bean> <!-- 配置调度任务--> <bean name="quartzscheduler" class="org.springframework.scheduling.quartz.schedulerfactorybean"> <property name="configlocation" value="classpath:quartz.properties"/> <property name="datasource" ref="datasource"/> <property name="transactionmanager" ref="transactionmanager"/> <!-- 任务唯一的名称,将会持久化到数据库--> <property name="schedulername" value="basescheduler"/> <!-- 每台集群机器部署应用的时候会更新触发器--> <property name="overwriteexistingjobs" value="true"/> <property name="applicationcontextschedulercontextkey" value="appli"/> <property name="jobfactory"> <bean class="com.aaron.clusterquartz.autowired.autowiringspringbeanjobfactory"/> </property> <property name="triggers"> <list> <ref bean="printcurrenttimescheduler"/> </list> </property> <property name="jobdetails"> <list> <ref bean="printcurrenttimejobs"/> </list> </property> <property name="taskexecutor" ref="executor"/> </bean> <!-- 配置job详情 --> <bean name="printcurrenttimejobs" class="org.springframework.scheduling.quartz.jobdetailfactorybean"> <property name="jobclass" value="com.aaron.clusterquartz.job.printcurrenttimejobs"/> <!--因为我使用了spring的注解,所以这里可以不用配置scheduler的属性--> <!--<property name="jobdataasmap"> <map> <entry key="clusterquartz" value="com.aaron.framework.clusterquartz.job.clusterquartz"/> </map> </property>--> <property name="durability" value="true"/> <property name="requestsrecovery" value="false"/> </bean> <!-- 配置触发时间 --> <bean name="printcurrenttimescheduler" class="com.aaron.clusterquartz.cron.persistablecrontriggerfactorybean"> <property name="jobdetail" ref="printcurrenttimejobs"/> <property name="cronexpression"> <value>0/10 * * * * ?</value> </property> <property name="timezone"> <value>gmt+8:00</value> </property> </bean> <!-- 分布式事务配置 end --> </beans>
quartz属性文件
#============================================================================ # configure jobstore # using spring datasource in quartzjobsconfig.xml # spring uses localdatasourcejobstore extension of jobstorecmt #============================================================================ org.quartz.jobstore.useproperties=true org.quartz.jobstore.tableprefix = qrtz_ org.quartz.jobstore.isclustered = true org.quartz.jobstore.clustercheckininterval = 5000 org.quartz.jobstore.misfirethreshold = 60000 org.quartz.jobstore.txisolationlevelreadcommitted = true # change this to match your db vendor org.quartz.jobstore.class = org.quartz.impl.jdbcjobstore.jobstoretx org.quartz.jobstore.driverdelegateclass = org.quartz.impl.jdbcjobstore.stdjdbcdelegate #============================================================================ # configure main scheduler properties # needed to manage cluster instances #============================================================================ org.quartz.scheduler.instanceid=auto org.quartz.scheduler.instancename=my_clustered_job_scheduler org.quartz.scheduler.rmi.export = false org.quartz.scheduler.rmi.proxy = false #============================================================================ # configure threadpool #============================================================================ org.quartz.threadpool.class = org.quartz.simpl.simplethreadpool org.quartz.threadpool.threadcount = 10 org.quartz.threadpool.threadpriority = 5 org.quartz.threadpool.threadsinheritcontextclassloaderofinitializingthread = true
相关类说明
autowiringspringbeanjobfactory类是为了可以在scheduler中使用spring注解,如果不使用注解,可以不适用该类,而直接使用
springbeanjobfactory
package com.aaron.clusterquartz.autowired; import org.quartz.spi.triggerfiredbundle; import org.springframework.beans.beansexception; import org.springframework.beans.factory.config.autowirecapablebeanfactory; import org.springframework.context.applicationcontext; import org.springframework.context.applicationcontextaware; import org.springframework.scheduling.quartz.springbeanjobfactory; /** * @author * @description 使job类支持spring的自动注入 * @date 2016-05-27 */ public class autowiringspringbeanjobfactory extends springbeanjobfactory implements applicationcontextaware { private transient autowirecapablebeanfactory beanfactory; public void setapplicationcontext(applicationcontext applicationcontext) throws beansexception { beanfactory = applicationcontext.getautowirecapablebeanfactory(); } @override protected object createjobinstance(triggerfiredbundle bundle) throws exception { object job = super.createjobinstance(bundle); beanfactory.autowirebean(job); return job; } }
package com.aaron.clusterquartz.job; import com.arron.util.dateutils; import org.apache.commons.logging.log; import org.apache.commons.logging.logfactory; import org.quartz.jobexecutioncontext; import org.quartz.jobexecutionexception; import org.springframework.beans.factory.annotation.autowired; import org.springframework.scheduling.quartz.quartzjobbean; import java.util.date; /** * @author * @description 一句话描述该文件的用途 * @date 2016-05-23 */ public class printcurrenttimejobs extends quartzjobbean { private static final log log_record = logfactory.getlog(printcurrenttimejobs.class); //这里就是因为有上文中的autowiringspringbeanjobfactory才可以使用@autowired注解,否则只能在配置文件中设置这属性的值 @autowired private clusterquartz clusterquartz; protected void executeinternal(jobexecutioncontext jobexecutioncontext) throws jobexecutionexception { log_record.info("begin to execute task," + dateutils.datetostring(new date())); clusterquartz.printuserinfo(); log_record.info("end to execute task," + dateutils.datetostring(new date())); } }
测试结果:
由于只有一台电脑,所有我开了8080和8888两个端口来测试的,上面的定时任务我设置了每10秒运行一次。
当只我启动8080端口时,可以看到控制台每隔10秒打印一条语句
两个端口同时启动的对比测试中可以看到,只有一个端口在跑定时任务
这个关了正在跑定时任务的端口后,之前的另一个没有跑的端口开始接管,继续运行定时任务
至此,我们可以清楚地看到,在分布式定时任务中(或者集群),同一时刻只会有一个定时任务运行。
整个demo地址:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。