Complete Steps for Setting Up a Spring Boot + Quartz Cluster
程序员文章站
2023-12-15 18:57:52
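Quartz clustering capabilities:
- A Quartz cluster can be deployed horizontally or vertically. In a horizontal cluster the scheduler nodes run on different servers, and the biggest problem is clock synchronization. Quartz clustering strongly requires synchronized clocks; if the clocks drift apart, the state of the nodes becomes inconsistent, with unpredictable consequences (search for "server clock synchronization" for setup details). With synchronized clocks, a horizontal cluster keeps the service reliable: if one node or one server goes down, the remaining nodes keep working. In a vertical cluster all nodes run on the same server, so clock synchronization is not an issue, but the server is a single point of failure and an outage seriously affects availability. Choose the layout that fits your actual situation.
- Because the cluster depends on synchronized clocks, a local development instance running in cluster mode must never connect to the production environment, whether the cluster is vertical or horizontal; doing so will break the cluster. A local instance in non-cluster mode can connect to production when needed, provided it does not start a scheduler against the same Quartz tables, which the Quartz documentation warns against.
- A Quartz cluster works differently from a Redis cluster. Redis cluster nodes communicate with each other, and each node needs to know the state of the others. A Quartz cluster is built on 11 database tables: the nodes do not talk to each other but coordinate by persisting the scheduled jobs and taking database locks.
- Breaking the cluster usually results in deadlocks or inconsistent state: either every node becomes unusable, or only some nodes can still run some or all of the scheduled jobs.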
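To make the clock-synchronization point concrete, here is a minimal sketch of check-in based failure detection. It is a simplified model and not Quartz's actual code: the class name, the `looksFailed` method, and the grace period are assumptions made for illustration. The 20-second interval mirrors the `clusterCheckinInterval` value used in the configuration in this article.

```java
import java.time.Duration;

/** Simplified model of check-in based failure detection; not Quartz's actual code. */
final class CheckinSkewSketch {
    // Hypothetical grace period added on top of the check-in interval.
    static final long GRACE_MILLIS = 7_500L;

    /**
     * A peer is presumed failed when its last check-in is older than
     * interval + grace, measured by the observer's own clock.
     */
    static boolean looksFailed(long lastCheckinMillis, long checkinIntervalMillis, long observerNowMillis) {
        return lastCheckinMillis + checkinIntervalMillis + GRACE_MILLIS < observerNowMillis;
    }

    public static void main(String[] args) {
        long interval = 20_000L;             // check-in interval in milliseconds
        long trueNow = 1_000_000L;           // arbitrary reference time
        long lastCheckin = trueNow - 5_000L; // the peer checked in 5 seconds ago

        // Observer with a correct clock: the peer is healthy.
        System.out.println(looksFailed(lastCheckin, interval, trueNow));   // prints false

        // Observer whose clock runs one minute ahead: the same peer looks dead.
        long skewedNow = trueNow + Duration.ofMinutes(1).toMillis();
        System.out.println(looksFailed(lastCheckin, interval, skewedNow)); // prints true
    }
}
```

In this model a node with a skewed clock would start recovering jobs from a peer that is still running them, which is one way the inconsistent state described above can arise.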
Those are some personal observations. The rest of this article walks through setting up a Spring Boot + Quartz cluster, shared here for reference.
Steps:
Spring Boot bean configuration:
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.quartz.JobDetail;
import org.quartz.Trigger;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
import org.springframework.scheduling.quartz.JobDetailFactoryBean;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

@Configuration
public class QuartzConfig {

    @Value("${quartz.scheduler.instanceName}")
    private String quartzInstanceName;

    @Value("${org.quartz.dataSource.myDS.driver}")
    private String myDSDriver;

    @Value("${org.quartz.dataSource.myDS.URL}")
    private String myDSURL;

    @Value("${org.quartz.dataSource.myDS.user}")
    private String myDSUser;

    @Value("${org.quartz.dataSource.myDS.password}")
    private String myDSPassword;

    @Value("${org.quartz.dataSource.myDS.maxConnections}")
    private String myDSMaxConnections;

    /**
     * Builds the Quartz properties.
     */
    private Properties quartzProperties() throws IOException {
        Properties prop = new Properties();
        // Every node in the cluster must use the same instanceName and a unique instanceId.
        prop.put("org.quartz.scheduler.instanceName", quartzInstanceName);
        prop.put("org.quartz.scheduler.instanceId", "AUTO");
        prop.put("org.quartz.scheduler.skipUpdateCheck", "true");
        prop.put("org.quartz.scheduler.jmx.export", "true");

        // JDBC job store in clustered mode
        prop.put("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
        prop.put("org.quartz.jobStore.driverDelegateClass", "org.quartz.impl.jdbcjobstore.StdJDBCDelegate");
        prop.put("org.quartz.jobStore.dataSource", "myDS");
        prop.put("org.quartz.jobStore.tablePrefix", "QRTZ_");
        prop.put("org.quartz.jobStore.isClustered", "true");
        prop.put("org.quartz.jobStore.clusterCheckinInterval", "20000");
        prop.put("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1");
        prop.put("org.quartz.jobStore.misfireThreshold", "120000");
        prop.put("org.quartz.jobStore.txIsolationLevelSerializable", "true");
        // Kept from the original article. Quartz 2.x's default statement also filters on SCHED_NAME = {1}.
        prop.put("org.quartz.jobStore.selectWithLockSQL", "SELECT * FROM {0}LOCKS WHERE LOCK_NAME = ? FOR UPDATE");

        // Thread pool
        prop.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
        prop.put("org.quartz.threadPool.threadCount", "10");
        prop.put("org.quartz.threadPool.threadPriority", "5");
        prop.put("org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread", "true");

        // Data source
        prop.put("org.quartz.dataSource.myDS.driver", myDSDriver);
        prop.put("org.quartz.dataSource.myDS.URL", myDSURL);
        prop.put("org.quartz.dataSource.myDS.user", myDSUser);
        prop.put("org.quartz.dataSource.myDS.password", myDSPassword);
        System.out.println("myDSMaxConnections:" + myDSMaxConnections);
        prop.put("org.quartz.dataSource.myDS.maxConnections", myDSMaxConnections);

        // Plugins
        prop.put("org.quartz.plugin.triggHistory.class", "org.quartz.plugins.history.LoggingJobHistoryPlugin");
        prop.put("org.quartz.plugin.shutdownhook.class", "org.quartz.plugins.management.ShutdownHookPlugin");
        prop.put("org.quartz.plugin.shutdownhook.cleanShutdown", "true");
        return prop;
    }

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(@Qualifier("dialogJobTrigger") Trigger cronJobTrigger) throws IOException {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        // For clustering: overwrite existing jobs when the scheduler starts, so the matching
        // QRTZ_JOB_DETAILS rows do not have to be deleted by hand after changing targetObject.
        factory.setOverwriteExistingJobs(true);
        // For clustering: load the Quartz data source
        // factory.setDataSource(dataSource);
        // Start the scheduler 10 seconds after the application has started
        factory.setStartupDelay(10);
        // For clustering: load the Quartz properties
        factory.setQuartzProperties(quartzProperties());
        factory.setAutoStartup(true);
        factory.setApplicationContextSchedulerContextKey("applicationContext");
        // Register the trigger
        factory.setTriggers(cronJobTrigger);
        // Alternatively, use a properties file directly
        // factory.setConfigLocation(new FileSystemResource(this.getClass().getResource("/quartz.properties").getPath()));
        return factory;
    }

    /**
     * Job definition.
     */
    @Bean
    public JobDetailFactoryBean updateDialogStatusJobDetail() {
        return createJobDetail(InvokingJobDetailDetailFactory.class, "updateDialogStatusGroup", "dialogJob");
    }

    /**
     * Trigger definition: fires at the top of every hour.
     */
    @Bean(name = "dialogJobTrigger")
    public CronTriggerFactoryBean dialogStatusJobTrigger(@Qualifier("updateDialogStatusJobDetail") JobDetail jobDetail) {
        return dialogStatusTrigger(jobDetail, "0 0 0/1 * * ?");
    }

    /**
     * Creates a job detail factory.
     *
     * @param jobClass     the Quartz job class
     * @param groupName    the job group
     * @param targetObject name of the Spring bean that holds the business logic
     */
    private static JobDetailFactoryBean createJobDetail(Class<?> jobClass, String groupName, String targetObject) {
        JobDetailFactoryBean factoryBean = new JobDetailFactoryBean();
        factoryBean.setJobClass(jobClass);
        factoryBean.setDurability(true);
        factoryBean.setRequestsRecovery(true);
        factoryBean.setGroup(groupName);
        Map<String, String> map = new HashMap<>();
        map.put("targetObject", targetObject);
        map.put("targetMethod", "execute");
        factoryBean.setJobDataAsMap(map);
        return factoryBean;
    }

    /**
     * Creates a cron trigger factory.
     *
     * @param jobDetail      the job to fire
     * @param cronExpression the cron schedule
     */
    private static CronTriggerFactoryBean dialogStatusTrigger(JobDetail jobDetail, String cronExpression) {
        CronTriggerFactoryBean factoryBean = new CronTriggerFactoryBean();
        factoryBean.setJobDetail(jobDetail);
        factoryBean.setCronExpression(cronExpression);
        return factoryBean;
    }
}
The InvokingJobDetailDetailFactory class:
import java.lang.reflect.Method;

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.quartz.QuartzJobBean;

public class InvokingJobDetailDetailFactory extends QuartzJobBean {

    // Name of the Spring bean that holds the scheduled task
    private String targetObject;

    // Name of the method on that bean to execute
    private String targetMethod;

    private ApplicationContext ctx;

    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        try {
            Object bean = ctx.getBean(targetObject);
            Method method = bean.getClass().getMethod(targetMethod);
            method.invoke(bean);
        } catch (Exception e) {
            // Report lookup and invocation failures to Quartz instead of swallowing them
            throw new JobExecutionException(e);
        }
    }

    public void setApplicationContext(ApplicationContext applicationContext) {
        this.ctx = applicationContext;
    }

    public void setTargetObject(String targetObject) {
        this.targetObject = targetObject;
    }

    public void setTargetMethod(String targetMethod) {
        this.targetMethod = targetMethod;
    }
}
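Note: the setters are required. The property name behind setApplicationContext ("applicationContext") must match the key passed to factory.setApplicationContextSchedulerContextKey("applicationContext"). The injection itself is done by the BeanWrapper in the parent class of InvokingJobDetailDetailFactory, which populates the job's properties through their setters.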
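The reason the setters cannot be omitted can be illustrated with a small sketch of setter-based property population. This is a simplified stand-in written with plain reflection, not Spring's BeanWrapper implementation; `SetterInjectionSketch`, `DemoJob`, and `populate` are hypothetical names used only for this example.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for setter-based property population; not Spring's BeanWrapper. */
final class SetterInjectionSketch {

    /** Hypothetical job-like bean: targetObject has a setter, targetMethod does not. */
    public static class DemoJob {
        private String targetObject;
        private String targetMethod; // deliberately has no setter

        public void setTargetObject(String targetObject) { this.targetObject = targetObject; }
        public String getTargetObject() { return targetObject; }
        public String getTargetMethod() { return targetMethod; }
    }

    /** Applies each map entry to a matching public setter and returns how many were applied. */
    static int populate(Object bean, Map<String, Object> values) {
        int applied = 0;
        for (Map.Entry<String, Object> entry : values.entrySet()) {
            String name = entry.getKey();
            String setterName = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
            for (Method m : bean.getClass().getMethods()) {
                if (m.getName().equals(setterName)
                        && m.getParameterCount() == 1
                        && m.getParameterTypes()[0].isInstance(entry.getValue())) {
                    try {
                        m.invoke(bean, entry.getValue());
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException("Could not set property " + name, e);
                    }
                    applied++;
                    break;
                }
            }
        }
        return applied;
    }

    public static void main(String[] args) {
        Map<String, Object> jobData = new HashMap<>();
        jobData.put("targetObject", "dialogJob");
        jobData.put("targetMethod", "execute");

        DemoJob job = new DemoJob();
        int applied = populate(job, jobData);

        System.out.println(applied);               // prints 1
        System.out.println(job.getTargetObject()); // prints dialogJob
        System.out.println(job.getTargetMethod()); // prints null
    }
}
```

In this sketch the value without a setter is silently skipped and the field stays null, which in the real job would surface later as a failure when the target method is looked up.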
SQL script:
--
-- Table structure for `QRTZ_BLOB_TRIGGERS`
--
CREATE TABLE IF NOT EXISTS `QRTZ_BLOB_TRIGGERS` (
  `SCHED_NAME` varchar(120) NOT NULL,
  `TRIGGER_NAME` varchar(120) NOT NULL,
  `TRIGGER_GROUP` varchar(120) NOT NULL,
  `BLOB_DATA` blob
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- --------------------------------------------------------
--
-- Table structure for `QRTZ_CALENDARS`
--
CREATE TABLE IF NOT EXISTS `QRTZ_CALENDARS` (
  `SCHED_NAME` varchar(120) NOT NULL,
  `CALENDAR_NAME` varchar(120) NOT NULL,
  `CALENDAR` blob NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- --------------------------------------------------------
--
-- Table structure for `QRTZ_CRON_TRIGGERS`
--
CREATE TABLE IF NOT EXISTS `QRTZ_CRON_TRIGGERS` (
  `SCHED_NAME` varchar(120) NOT NULL,
  `TRIGGER_NAME` varchar(120) NOT NULL,
  `TRIGGER_GROUP` varchar(120) NOT NULL,
  `CRON_EXPRESSION` varchar(120) NOT NULL,
  `TIME_ZONE_ID` varchar(80) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- --------------------------------------------------------
--
-- Table structure for `QRTZ_FIRED_TRIGGERS`
--
CREATE TABLE IF NOT EXISTS `QRTZ_FIRED_TRIGGERS` (
  `SCHED_NAME` varchar(120) NOT NULL,
  `ENTRY_ID` varchar(95) NOT NULL,
  `TRIGGER_NAME` varchar(120) NOT NULL,
  `TRIGGER_GROUP` varchar(120) NOT NULL,
  `INSTANCE_NAME` varchar(120) NOT NULL,
  `FIRED_TIME` bigint(13) NOT NULL,
  `SCHED_TIME` bigint(13) NOT NULL,
  `PRIORITY` int(11) NOT NULL,
  `STATE` varchar(16) NOT NULL,
  `JOB_NAME` varchar(120) DEFAULT NULL,
  `JOB_GROUP` varchar(120) DEFAULT NULL,
  `IS_NONCONCURRENT` varchar(1) DEFAULT NULL,
  `REQUESTS_RECOVERY` varchar(1) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- --------------------------------------------------------
--
-- Table structure for `QRTZ_JOB_DETAILS`
--
CREATE TABLE IF NOT EXISTS `QRTZ_JOB_DETAILS` (
  `SCHED_NAME` varchar(120) NOT NULL,
  `JOB_NAME` varchar(120) NOT NULL,
  `JOB_GROUP` varchar(120) NOT NULL,
  `DESCRIPTION` varchar(250) DEFAULT NULL,
  `JOB_CLASS_NAME` varchar(250) NOT NULL,
  `IS_DURABLE` varchar(1) NOT NULL,
  `IS_NONCONCURRENT` varchar(1) NOT NULL,
  `IS_UPDATE_DATA` varchar(1) NOT NULL,
  `REQUESTS_RECOVERY` varchar(1) NOT NULL,
  `JOB_DATA` blob
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- --------------------------------------------------------
--
-- Table structure for `QRTZ_LOCKS`
--
CREATE TABLE IF NOT EXISTS `QRTZ_LOCKS` (
  `SCHED_NAME` varchar(120) NOT NULL,
  `LOCK_NAME` varchar(40) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- --------------------------------------------------------
--
-- Table structure for `QRTZ_PAUSED_TRIGGER_GRPS`
--
CREATE TABLE IF NOT EXISTS `QRTZ_PAUSED_TRIGGER_GRPS` (
  `SCHED_NAME` varchar(120) NOT NULL,
  `TRIGGER_GROUP` varchar(120) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- --------------------------------------------------------
--
-- Table structure for `QRTZ_SCHEDULER_STATE`
--
CREATE TABLE IF NOT EXISTS `QRTZ_SCHEDULER_STATE` (
  `SCHED_NAME` varchar(120) NOT NULL,
  `INSTANCE_NAME` varchar(120) NOT NULL,
  `LAST_CHECKIN_TIME` bigint(13) NOT NULL,
  `CHECKIN_INTERVAL` bigint(13) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- --------------------------------------------------------
--
-- Table structure for `QRTZ_SIMPLE_TRIGGERS`
--
CREATE TABLE IF NOT EXISTS `QRTZ_SIMPLE_TRIGGERS` (
  `SCHED_NAME` varchar(120) NOT NULL,
  `TRIGGER_NAME` varchar(120) NOT NULL,
  `TRIGGER_GROUP` varchar(120) NOT NULL,
  `REPEAT_COUNT` bigint(7) NOT NULL,
  `REPEAT_INTERVAL` bigint(12) NOT NULL,
  `TIMES_TRIGGERED` bigint(10) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- --------------------------------------------------------
--
-- Table structure for `QRTZ_SIMPROP_TRIGGERS`
--
CREATE TABLE IF NOT EXISTS `QRTZ_SIMPROP_TRIGGERS` (
  `SCHED_NAME` varchar(120) NOT NULL,
  `TRIGGER_NAME` varchar(120) NOT NULL,
  `TRIGGER_GROUP` varchar(120) NOT NULL,
  `STR_PROP_1` varchar(512) DEFAULT NULL,
  `STR_PROP_2` varchar(512) DEFAULT NULL,
  `STR_PROP_3` varchar(512) DEFAULT NULL,
  `INT_PROP_1` int(11) DEFAULT NULL,
  `INT_PROP_2` int(11) DEFAULT NULL,
  `LONG_PROP_1` bigint(20) DEFAULT NULL,
  `LONG_PROP_2` bigint(20) DEFAULT NULL,
  `DEC_PROP_1` decimal(13,4) DEFAULT NULL,
  `DEC_PROP_2` decimal(13,4) DEFAULT NULL,
  `BOOL_PROP_1` varchar(1) DEFAULT NULL,
  `BOOL_PROP_2` varchar(1) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- --------------------------------------------------------
--
-- Table structure for `QRTZ_TRIGGERS`
--
CREATE TABLE IF NOT EXISTS `QRTZ_TRIGGERS` (
  `SCHED_NAME` varchar(120) NOT NULL,
  `TRIGGER_NAME` varchar(120) NOT NULL,
  `TRIGGER_GROUP` varchar(120) NOT NULL,
  `JOB_NAME` varchar(120) NOT NULL,
  `JOB_GROUP` varchar(120) NOT NULL,
  `DESCRIPTION` varchar(250) DEFAULT NULL,
  `NEXT_FIRE_TIME` bigint(13) DEFAULT NULL,
  `PREV_FIRE_TIME` bigint(13) DEFAULT NULL,
  `PRIORITY` int(11) DEFAULT NULL,
  `TRIGGER_STATE` varchar(16) NOT NULL,
  `TRIGGER_TYPE` varchar(8) NOT NULL,
  `START_TIME` bigint(13) NOT NULL,
  `END_TIME` bigint(13) DEFAULT NULL,
  `CALENDAR_NAME` varchar(200) DEFAULT NULL,
  `MISFIRE_INSTR` smallint(2) DEFAULT NULL,
  `JOB_DATA` blob
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

--
-- Indexes for dumped tables
--

--
-- Indexes for table `QRTZ_BLOB_TRIGGERS`
--
ALTER TABLE `QRTZ_BLOB_TRIGGERS`
  ADD PRIMARY KEY (`SCHED_NAME`,`TRIGGER_NAME`,`TRIGGER_GROUP`),
  ADD KEY `SCHED_NAME` (`SCHED_NAME`,`TRIGGER_NAME`,`TRIGGER_GROUP`);

--
-- Indexes for table `QRTZ_CALENDARS`
--
ALTER TABLE `QRTZ_CALENDARS`
  ADD PRIMARY KEY (`SCHED_NAME`,`CALENDAR_NAME`);

--
-- Indexes for table `QRTZ_CRON_TRIGGERS`
--
ALTER TABLE `QRTZ_CRON_TRIGGERS`
  ADD PRIMARY KEY (`SCHED_NAME`,`TRIGGER_NAME`,`TRIGGER_GROUP`);

--
-- Indexes for table `QRTZ_FIRED_TRIGGERS`
--
ALTER TABLE `QRTZ_FIRED_TRIGGERS`
  ADD PRIMARY KEY (`SCHED_NAME`,`ENTRY_ID`),
  ADD KEY `IDX_QRTZ_FT_TRIG_INST_NAME` (`SCHED_NAME`,`INSTANCE_NAME`),
  ADD KEY `IDX_QRTZ_FT_INST_JOB_REQ_RCVRY` (`SCHED_NAME`,`INSTANCE_NAME`,`REQUESTS_RECOVERY`),
  ADD KEY `IDX_QRTZ_FT_J_G` (`SCHED_NAME`,`JOB_NAME`,`JOB_GROUP`),
  ADD KEY `IDX_QRTZ_FT_JG` (`SCHED_NAME`,`JOB_GROUP`),
  ADD KEY `IDX_QRTZ_FT_T_G` (`SCHED_NAME`,`TRIGGER_NAME`,`TRIGGER_GROUP`),
  ADD KEY `IDX_QRTZ_FT_TG` (`SCHED_NAME`,`TRIGGER_GROUP`);

--
-- Indexes for table `QRTZ_JOB_DETAILS`
--
ALTER TABLE `QRTZ_JOB_DETAILS`
  ADD PRIMARY KEY (`SCHED_NAME`,`JOB_NAME`,`JOB_GROUP`),
  ADD KEY `IDX_QRTZ_J_REQ_RECOVERY` (`SCHED_NAME`,`REQUESTS_RECOVERY`),
  ADD KEY `IDX_QRTZ_J_GRP` (`SCHED_NAME`,`JOB_GROUP`);

--
-- Indexes for table `QRTZ_LOCKS`
--
ALTER TABLE `QRTZ_LOCKS`
  ADD PRIMARY KEY (`SCHED_NAME`,`LOCK_NAME`);

--
-- Indexes for table `QRTZ_PAUSED_TRIGGER_GRPS`
--
ALTER TABLE `QRTZ_PAUSED_TRIGGER_GRPS`
  ADD PRIMARY KEY (`SCHED_NAME`,`TRIGGER_GROUP`);

--
-- Indexes for table `QRTZ_SCHEDULER_STATE`
--
ALTER TABLE `QRTZ_SCHEDULER_STATE`
  ADD PRIMARY KEY (`SCHED_NAME`,`INSTANCE_NAME`);

--
-- Indexes for table `QRTZ_SIMPLE_TRIGGERS`
--
ALTER TABLE `QRTZ_SIMPLE_TRIGGERS`
  ADD PRIMARY KEY (`SCHED_NAME`,`TRIGGER_NAME`,`TRIGGER_GROUP`);

--
-- Indexes for table `QRTZ_SIMPROP_TRIGGERS`
--
ALTER TABLE `QRTZ_SIMPROP_TRIGGERS`
  ADD PRIMARY KEY (`SCHED_NAME`,`TRIGGER_NAME`,`TRIGGER_GROUP`);

--
-- Indexes for table `QRTZ_TRIGGERS`
--
ALTER TABLE `QRTZ_TRIGGERS`
  ADD PRIMARY KEY (`SCHED_NAME`,`TRIGGER_NAME`,`TRIGGER_GROUP`),
  ADD KEY `IDX_QRTZ_T_J` (`SCHED_NAME`,`JOB_NAME`,`JOB_GROUP`),
  ADD KEY `IDX_QRTZ_T_JG` (`SCHED_NAME`,`JOB_GROUP`),
  ADD KEY `IDX_QRTZ_T_C` (`SCHED_NAME`,`CALENDAR_NAME`(191)),
  ADD KEY `IDX_QRTZ_T_G` (`SCHED_NAME`,`TRIGGER_GROUP`),
  ADD KEY `IDX_QRTZ_T_STATE` (`SCHED_NAME`,`TRIGGER_STATE`),
  ADD KEY `IDX_QRTZ_T_N_STATE` (`SCHED_NAME`,`TRIGGER_NAME`,`TRIGGER_GROUP`,`TRIGGER_STATE`),
  ADD KEY `IDX_QRTZ_T_N_G_STATE` (`SCHED_NAME`,`TRIGGER_GROUP`,`TRIGGER_STATE`),
  ADD KEY `IDX_QRTZ_T_NEXT_FIRE_TIME` (`SCHED_NAME`,`NEXT_FIRE_TIME`),
  ADD KEY `IDX_QRTZ_T_NFT_ST` (`SCHED_NAME`,`TRIGGER_STATE`,`NEXT_FIRE_TIME`),
  ADD KEY `IDX_QRTZ_T_NFT_MISFIRE` (`SCHED_NAME`,`MISFIRE_INSTR`,`NEXT_FIRE_TIME`),
  ADD KEY `IDX_QRTZ_T_NFT_ST_MISFIRE` (`SCHED_NAME`,`MISFIRE_INSTR`,`NEXT_FIRE_TIME`,`TRIGGER_STATE`),
  ADD KEY `IDX_QRTZ_T_NFT_ST_MISFIRE_GRP` (`SCHED_NAME`,`MISFIRE_INSTR`,`NEXT_FIRE_TIME`,`TRIGGER_GROUP`,`TRIGGER_STATE`);

--
-- Constraints for dumped tables
--

--
-- Constraints for table `QRTZ_BLOB_TRIGGERS`
--
ALTER TABLE `QRTZ_BLOB_TRIGGERS`
  ADD CONSTRAINT `QRTZ_BLOB_TRIGGERS_ibfk_1` FOREIGN KEY (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`) REFERENCES `QRTZ_TRIGGERS` (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`);

--
-- Constraints for table `QRTZ_CRON_TRIGGERS`
--
ALTER TABLE `QRTZ_CRON_TRIGGERS`
  ADD CONSTRAINT `QRTZ_CRON_TRIGGERS_ibfk_1` FOREIGN KEY (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`) REFERENCES `QRTZ_TRIGGERS` (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`);

--
-- Constraints for table `QRTZ_SIMPLE_TRIGGERS`
--
ALTER TABLE `QRTZ_SIMPLE_TRIGGERS`
  ADD CONSTRAINT `QRTZ_SIMPLE_TRIGGERS_ibfk_1` FOREIGN KEY (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`) REFERENCES `QRTZ_TRIGGERS` (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`);

--
-- Constraints for table `QRTZ_SIMPROP_TRIGGERS`
--
ALTER TABLE `QRTZ_SIMPROP_TRIGGERS`
  ADD CONSTRAINT `QRTZ_SIMPROP_TRIGGERS_ibfk_1` FOREIGN KEY (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`) REFERENCES `QRTZ_TRIGGERS` (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`);

--
-- Constraints for table `QRTZ_TRIGGERS`
--
ALTER TABLE `QRTZ_TRIGGERS`
  ADD CONSTRAINT `QRTZ_TRIGGERS_ibfk_1` FOREIGN KEY (`SCHED_NAME`, `JOB_NAME`, `JOB_GROUP`) REFERENCES `QRTZ_JOB_DETAILS` (`SCHED_NAME`, `JOB_NAME`, `JOB_GROUP`);
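How Quartz clustering works: job activity is recorded in the database, and a locking mechanism ensures that each firing of a job is executed only once across the cluster.
Job bean example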
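The principle can be sketched with an in-memory simulation. This is only an analogy under stated assumptions: a `ReentrantLock` stands in for the database row lock, a string field stands in for the trigger's state column, and every name here (`ClusterFiringSketch`, `tryFire`, `simulate`) is hypothetical rather than part of Quartz.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/** In-memory analogy of lock-based trigger acquisition; not Quartz's implementation. */
final class ClusterFiringSketch {
    // Stand-in for the row lock a node takes with SELECT ... FOR UPDATE.
    private final ReentrantLock triggerAccessLock = new ReentrantLock();
    // Stand-in for the trigger row's state column; only touched while holding the lock.
    private String triggerState = "WAITING";
    private final AtomicInteger executions = new AtomicInteger();

    /** One node's attempt to claim the firing; returns true if this node ran the job. */
    boolean tryFire() {
        triggerAccessLock.lock();
        try {
            if (!"WAITING".equals(triggerState)) {
                return false; // another node already claimed this firing
            }
            triggerState = "ACQUIRED";
        } finally {
            triggerAccessLock.unlock();
        }
        executions.incrementAndGet(); // the job body runs after the lock is released
        return true;
    }

    int executions() {
        return executions.get();
    }

    /** Lets nodeCount simulated nodes race for one firing and returns how many ran the job. */
    static int simulate(int nodeCount) {
        ClusterFiringSketch store = new ClusterFiringSketch();
        ExecutorService pool = Executors.newFixedThreadPool(nodeCount);
        CountDownLatch start = new CountDownLatch(1);
        for (int i = 0; i < nodeCount; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // release all nodes at the same moment
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                store.tryFire();
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("simulated nodes did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return store.executions();
    }

    public static void main(String[] args) {
        System.out.println(simulate(4)); // prints 1
    }
}
```

The nodes never exchange messages with each other; they only contend for the shared lock and read the shared state, which mirrors the table-based coordination described above.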
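import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

// Must be managed by Spring; the bean name matches the targetObject value in the job data
@Service("dialogJob")
public class DialogJob {

    // QuestionService is the application's own business service
    @Autowired
    private QuestionService questionService;

    // The method name is the targetMethod value defined in the Quartz configuration
    public void execute() throws Exception {
        // Run the actual business logic
        questionService.xxxxx();
    }
}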
Summary
That is all for this article. I hope it is a useful reference for your study or work. If you have questions, feel free to leave a comment.