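Extending Hangfire, the open-source task scheduling framework for .NET Core, with API jobs (second-level jobs supported)
Extending and using Hangfire
I have read plenty of blogs, but as a beginner this is the first time I am writing one.
Our previous task scheduling framework kept running into problems, so I started looking for a replacement. We had been using Quartz.NET. Its strengths are cron expression support, which suits complex date scenarios, and second-level jobs. However, it is complicated to configure and awkward to manage. I built a web management page for it myself, but that required a separate dedicated thread to manage job state centrally, which was very error-prone.
I considered FluentScheduler, which is simple to use but just as troublesome to manage and configure. What I wanted was simple configuration, convenient management, and high performance. Eventually I remembered Hangfire, which I had heard of before; its advantage is the built-in dashboard. After reading a lot of related material on cnblogs, I happened to find that someone had extended Hangfire to run jobs by calling API endpoints. That approach avoids depending on local code and makes deployment easier. Building on it, I extended it further in my spare time, and it now largely meets my needs.
All of the extensions are external add-ons, so the Hangfire version can keep being upgraded. The project is now on the latest release, which supports second-level jobs.
After the upgrade to Hangfire 1.7, which supports second-level jobs, some of the expressions produced by the online cron expression generator I use turned out to be wrong, so I have commented out second-level expression generation for now. It needs thorough testing and fixing when I have time; in the meantime, refer to the cron expressions documented by Hangfire.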
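To make the difference between minute-level and second-level expressions concrete, here is a minimal sketch. It assumes that second-level expressions use six fields with the seconds field first, as Hangfire 1.7 accepts, and that minute-level ones use five. `CronShape` and its members are hypothetical helper names, not part of Hangfire or of this project.

```csharp
using System;

// Hypothetical helper (not part of Hangfire): tells five-field and six-field cron expressions apart.
public static class CronShape
{
    // Example expressions; the six-field form has a leading seconds field.
    public const string EveryFiveSeconds = "*/5 * * * * *";
    public const string EveryFiveMinutes = "*/5 * * * *";

    // Counts the whitespace-separated fields of a cron expression.
    public static int FieldCount(string cron)
    {
        if (string.IsNullOrWhiteSpace(cron)) return 0;
        return cron.Split(new[] { ' ', '\t' }, StringSplitOptions.RemoveEmptyEntries).Length;
    }

    // True when the expression has six fields, i.e. it carries a seconds field.
    public static bool HasSecondsField(string cron)
    {
        return FieldCount(cron) == 6;
    }
}
```

A check like this could run before an expression from an online generator is saved, so that a five-field expression is not mistaken for a second-level one.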
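Features implemented so far:
1. Deployment and debugging: just configure the database connection, then build and run; no tables need to be created by hand. Redis, MySQL, and SQL Server are supported; other databases are not needed for now and have not been tested. A Redis cluster is recommended. The Redis storage package is included directly in the project, and StackExchange.Redis has been updated to the latest version, so it is easy to extend and can be stepped into while debugging. To deploy, publish the project and run the bat script that creates the Windows service (the script is included in the project), or publish to Linux.
2. Recurring jobs: from the dashboard you can add, edit, delete, manually trigger, pause, and resume recurring jobs. Pausing works by adding a flag to a set in storage; a filter checks the flag before the job runs and skips execution. This is necessary because once a job has been created in Hangfire you lose control over it, so it can only be intercepted by a filter. After a job is paused, the dashboard queries its state and renders its row in red so paused jobs are easy to find.
3. Scheduled jobs: on the Jobs tab, under Scheduled, you can add a scheduled job, which runs once after the specified number of minutes.
4. Read-only dashboard: users who sign in with the configured user name and password can only view the dashboard, which prevents accidental operations.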
    // Read-only dashboard: users can view it but cannot perform operations
    app.UseHangfireDashboard("/job-read", new DashboardOptions
    {
        AppPath = "#", // address the "back" link navigates to
        DisplayStorageConnectionString = false, // whether to show the storage connection info
        IsReadOnlyFunc = context => true,
        Authorization = new[]
        {
            new BasicAuthAuthorizationFilter(new BasicAuthAuthorizationFilterOptions
            {
                RequireSsl = false, // whether to require SSL (HTTPS)
                SslRedirect = false,
                LoginCaseSensitive = true,
                Users = new[]
                {
                    new BasicAuthAuthorizationUser { Login = "read", PasswordClear = "only" },
                    new BasicAuthAuthorizationUser { Login = "test", PasswordClear = "123456" },
                    new BasicAuthAuthorizationUser { Login = "guest", PasswordClear = "123@123" }
                }
            })
        }
    });
5. Email notifications: currently, once a failing job has been retried the configured number of times, a notification email is sent using MailKit.
    catch (Exception ex)
    {
        // Read the current retry count
        var count = context.GetJobParameter<string>("RetryCount");
        context.SetTextColor(ConsoleTextColor.Red);
        // SignalR push
        // SendRequest(ConfigSettings.Instance.Url + "/api/publish/everyone", "test");
        if (count == "3") // send the notification email once the third retry is reached
        {
            SendEmail(item.JobName, item.Url, ex.ToString());
        }
        logger.Error(ex, "HttpJob.Excute");
        context.WriteLine($"Execution failed: {ex.Message}");
        throw; // without rethrowing, Hangfire would not retry the job
    }
    /// <summary>
    /// Email template
    /// </summary>
    /// <param name="jobName">Name of the failed job</param>
    /// <param name="url">URL the job requested</param>
    /// <param name="exception">Exception text to include in the mail</param>
    /// <returns>The HTML body of the mail</returns>
    private static string SetHtmlBody(string jobName, string url, string exception)
    {
        var htmlBody = $@"<h3 align='center'>{HangfireHttpJobOptions.SmtpSubject}</h3>
            <h3>Execution time:</h3>
            <p>
                {DateTime.Now}
            </p>
            <h3>
                Job name:<span> {jobName} </span><br/>
            </h3>
            <h3>
                Request URL: {url}
            </h3>
            <h3><span></span>
                Result:<br/>
            </h3>
            <p>
                {exception}
            </p> ";
        return htmlBody;
    }
    // Use Redis as the job storage
    config.UseRedisStorage(redis, new Hangfire.Redis.RedisStorageOptions()
    {
        FetchTimeout = TimeSpan.FromMinutes(5),
        Prefix = "{hangfire}:",
        // timeout for active servers
        InvisibilityTimeout = TimeSpan.FromHours(1),
        // how often expired jobs are checked for
        ExpiryCheckInterval = TimeSpan.FromHours(1),
        DeletedListSize = 10000,
        SucceededListSize = 10000
    })
    .UseHangfireHttpJob(new HangfireHttpJobOptions()
    {
        SendToMailList = HangfireSettings.Instance.SendMailList,
        SendMailAddress = HangfireSettings.Instance.SendMailAddress,
        SmtpServerAddress = HangfireSettings.Instance.SmtpServerAddress,
        SmtpPort = HangfireSettings.Instance.SmtpPort,
        SmtpPwd = HangfireSettings.Instance.SmtpPwd,
        SmtpSubject = HangfireSettings.Instance.SmtpSubject
    })
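6. SignalR push: the host application is a WebAPI, so messages can be pushed through WebAPI endpoints. The benefit is that the service can double as a push service, and third-party APIs can use it to push messages as well.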
    /// <summary>
    /// Handles a user joining a group
    /// </summary>
    /// <param name="userId">Unique user identifier</param>
    /// <param name="groupName">Group name</param>
    public Task InitUsers(string userId, string groupName)
    {
        Console.WriteLine($"{userId} joined the user group");
        Groups.AddToGroupAsync(Context.ConnectionId, groupName);
        SignalrGroups.UserGroups.Add(new SignalrGroups()
        {
            ConnectionId = Context.ConnectionId,
            GroupName = groupName,
            UserId = userId
        });
        return Clients.All.SendAsync("userjoin", "User group data updated, new id: " + Context.ConnectionId + " pid:" + userId);
    }

    /// <summary>
    /// Handles a disconnect
    /// </summary>
    /// <param name="exception"></param>
    public override Task OnDisconnectedAsync(Exception exception)
    {
        // Remove the disconnected user so nothing more is pushed to it
        var user = SignalrGroups.UserGroups.FirstOrDefault(c => c.ConnectionId == Context.ConnectionId);

        if (user != null)
        {
            Console.WriteLine($"User {user.UserId} went offline");
            SignalrGroups.UserGroups.Remove(user);
            Groups.RemoveFromGroupAsync(Context.ConnectionId, user.GroupName);
        }
        return base.OnDisconnectedAsync(exception);
    }
    /// <summary>
    /// Push to individual connection ids
    /// </summary>
    /// <param name="groups">Users to push to</param>
    [HttpPost, Route("anyone")]
    public IActionResult AnyOne([FromBody] IEnumerable<SignalrGroups> groups)
    {
        if (groups != null && groups.Any())
        {
            var ids = groups.Select(c => c.UserId);
            var list = SignalrGroups.UserGroups.Where(c => ids.Contains(c.UserId));
            foreach (var item in list)
                hubContext.Clients.Client(item.ConnectionId).SendAsync("anyone", $"{item.ConnectionId}: {item.Content}");
        }
        return Ok();
    }

    /// <summary>
    /// Push to everyone
    /// </summary>
    /// <param name="body">Message to broadcast</param>
    [HttpPost, Route("everyone")]
    public IActionResult EveryOne([FromBody] Msg body)
    {
        hubContext.Clients.All.SendAsync("everyone", $"{body.Message}");
        return Ok();
    }

    /// <summary>
    /// Push to a single group
    /// </summary>
    /// <param name="group">Target group and content</param>
    [HttpPost, Route("anygroups")]
    public IActionResult AnyGroups([FromBody] SignalrGroups group)
    {
        if (group != null)
        {
            hubContext.Clients.Group(group.GroupName).SendAsync("anygroups", $"{group.Content}");
        }
        return Ok();
    }
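7. API health checks: since the service is mainly used to call API endpoints, integrating health checks for those endpoints is well worth it. Currently the addresses to check are added in the configuration file.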
    /* Health check settings */
    "healthchecks-ui": {
      /* Endpoints to monitor; can be this application or external ones */
      "healthchecks": [
        {
          "name": "hangfire api health check",
          "uri": "http://localhost:9006/healthz"
        }
      ],
      /* API addresses that need to be checked */
      "checkurls": [
        {
          "uri": "http://localhost:17600/cityservice.svc/healthycheck",
          "httpmethod": "get"
        },
        {
          "uri": "http://localhost:9098/checkhelath",
          "httpmethod": "post"
        },
        {
          "uri": "http://localhost:9067/grthelathcheck",
          "httpmethod": "get"
        },
        {
          "uri": "http://localhost:9043/grthelathcheck",
          "httpmethod": "get"
        }
      ],
      "webhooks": [], // webhook settings
      "evaluationtimeonseconds": 10, // check frequency
      "minimumsecondsbetweenfailurenotifications": 60, // interval between notifications
      "healthcheckdatabaseconnectionstring": "data source=\\healthchecksdb" // SQLite database that stores the check configuration and logs
    }
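In the background, the service checks at the configured interval whether the endpoints can be reached. (This middleware can perform many kinds of checks, including network, database, and MQ checks, and supports rich features such as webhook notifications; this system does not need them, so they were not added.)
Health check configuration: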
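The polling idea can be illustrated with a small, library-free sketch of a single check pass. This is not how the health check middleware is implemented; `UrlCheck`, `HealthReportSketch`, and `HealthPoller` are hypothetical names, and the probe is injected as a delegate so the example runs without a network. In practice the probe would presumably wrap an `HttpClient` call and the pass would be repeated at the configured interval.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical model of one "checkurls" entry from the configuration.
public sealed class UrlCheck
{
    public string Uri { get; set; }
    public string HttpMethod { get; set; }
}

// Hypothetical result of one check pass.
public sealed class HealthReportSketch
{
    public string Status { get; set; }
    public List<string> Unhealthy { get; } = new List<string>();
}

public static class HealthPoller
{
    // Runs every check once. The probe returns true when the endpoint answered successfully.
    public static async Task<HealthReportSketch> CheckOnceAsync(
        IEnumerable<UrlCheck> checks, Func<UrlCheck, Task<bool>> probe)
    {
        var report = new HealthReportSketch();
        foreach (var check in checks)
        {
            bool ok;
            try
            {
                ok = await probe(check);
            }
            catch (Exception)
            {
                ok = false; // an endpoint that throws (e.g. is unreachable) counts as unhealthy
            }
            if (!ok) report.Unhealthy.Add(check.Uri);
        }
        report.Status = report.Unhealthy.Count == 0 ? "Healthy" : "Unhealthy";
        return report;
    }
}
```

Injecting the probe keeps the pass itself independent of how the HTTP request is made, which also makes it straightforward to exercise with fake endpoints.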
    // Register the addresses to health-check
    HangfireSettings.Instance.HostServers.ForEach(s =>
    {
        services.AddHealthChecks().AddUrlGroup(
            new Uri(s.Uri),
            s.HttpMethod.ToLower() == "post" ? HttpMethod.Post : HttpMethod.Get,
            $"{s.Uri}");
    });
    app.UseHealthChecks("/healthz", new HealthCheckOptions()
    {
        Predicate = _ => true,
        ResponseWriter = UIResponseWriter.WriteHealthCheckUIResponse
    });
    app.UseHealthChecks("/health", options); // returns JSON in a custom format
    app.UseHealthChecksUI(setup =>
    {
        setup.UIPath = "/hc"; // address of the health check UI panel
        setup.ApiPath = "/hc-api"; // API that returns the check data as JSON
    });
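Here, the UI path is the address used to display the check results in the panel.
The API path lets other systems fetch the check results through an endpoint, which is convenient for third-party integration; the data format can be customized.
Results retrieved through the API: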
    [{
      "id": 1,
      "status": "unhealthy",
      "onstatefrom": "2019-04-07t18:00:09.6996751+08:00",
      "lastexecuted": "2019-04-07t18:05:03.4761739+08:00",
      "uri": "http://localhost:53583/healthz",
      "name": "hangfire api health check",
      "discoveryservice": null,
      "entries": [{
        "id": 1,
        "name": "http://localhost:17600/cityservice.svc/healthycheck",
        "status": "unhealthy",
        "description": "an error occurred while sending the request.",
        "duration": "00:00:04.3907375"
      }, {
        "id": 2,
        "name": "http://localhost:9098/checkhelath",
        "status": "unhealthy",
        "description": "an error occurred while sending the request.",
        "duration": "00:00:04.4140310"
      }, {
        "id": 3,
        "name": "http://localhost:9067/grthelathcheck",
        "status": "unhealthy",
        "description": "an error occurred while sending the request.",
        "duration": "00:00:04.3847367"
      }, {
        "id": 4,
        "name": "http://localhost:9043/grthelathcheck",
        "status": "unhealthy",
        "description": "an error occurred while sending the request.",
        "duration": "00:00:04.4499007"
      }],
      "history": []
    }]
    {
      "status": "unhealthy",
      "errors": [{
        "key": "http://localhost:17600/cityservice.svc/healthycheck",
        "value": "unhealthy"
      }, {
        "key": "http://localhost:9098/checkhelath",
        "value": "unhealthy"
      }, {
        "key": "http://localhost:9067/grthelathcheck",
        "value": "unhealthy"
      }, {
        "key": "http://localhost:9043/grthelathcheck",
        "value": "unhealthy"
      }]
    }
    // Custom JSON report writer; remote callers can use it to fetch the health check results
    var options = new HealthCheckOptions
    {
        ResponseWriter = async (c, r) =>
        {
            c.Response.ContentType = "application/json";

            var result = JsonConvert.SerializeObject(new
            {
                status = r.Status.ToString(),
                errors = r.Entries.Select(e => new { key = e.Key, value = e.Value.Status.ToString() })
            });
            await c.Response.WriteAsync(result);
        }
    };
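8. Managing jobs through the API: add or edit recurring jobs, add scheduled jobs, trigger recurring jobs, delete recurring jobs, and add a chain of jobs that run one after another, once.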
    /// <summary>
    /// Enqueue a job that runs immediately
    /// </summary>
    [HttpPost, Route("addbackgroundjob")]
    public JsonResult AddBackgroundJob([FromBody] Hangfire.HttpJob.Server.HttpJobItem httpJob)
    {
        try
        {
            BackgroundJob.Enqueue(() => Hangfire.HttpJob.Server.HttpJob.Excute(httpJob, httpJob.JobName, null));
        }
        catch (Exception ec)
        {
            return Json(new Message() { Code = false, ErrorMessage = ec.ToString() });
        }
        return Json(new Message() { Code = true, ErrorMessage = "" });
    }

    /// <summary>
    /// Add or update a recurring job
    /// </summary>
    [HttpPost, Route("addorupdaterecurringjob")]
    public JsonResult AddOrUpdateRecurringJob([FromBody] Hangfire.HttpJob.Server.HttpJobItem httpJob)
    {
        try
        {
            RecurringJob.AddOrUpdate(httpJob.JobName, () => Hangfire.HttpJob.Server.HttpJob.Excute(httpJob, httpJob.JobName, null), httpJob.Corn, TimeZoneInfo.Local);
        }
        catch (Exception ec)
        {
            return Json(new Message() { Code = false, ErrorMessage = ec.ToString() });
        }
        return Json(new Message() { Code = true, ErrorMessage = "" });
    }

    /// <summary>
    /// Delete a recurring job
    /// </summary>
    [HttpGet, Route("deletejob")]
    public JsonResult DeleteJob(string jobName)
    {
        try
        {
            RecurringJob.RemoveIfExists(jobName);
        }
        catch (Exception ec)
        {
            return Json(new Message() { Code = false, ErrorMessage = ec.ToString() });
        }
        return Json(new Message() { Code = true, ErrorMessage = "" });
    }

    /// <summary>
    /// Manually trigger a recurring job
    /// </summary>
    [HttpGet, Route("triggerrecurringjob")]
    public JsonResult TriggerRecurringJob(string jobName)
    {
        try
        {
            RecurringJob.Trigger(jobName);
        }
        catch (Exception ec)
        {
            return Json(new Message() { Code = false, ErrorMessage = ec.ToString() });
        }
        return Json(new Message() { Code = true, ErrorMessage = "" });
    }

    /// <summary>
    /// Add a delayed job
    /// </summary>
    /// <param name="httpJob">httpJob.DelayFromMinutes is the delay, in minutes, before the job runs</param>
    [HttpPost, Route("addschedulejob")]
    public JsonResult AddScheduleJob([FromBody] Hangfire.HttpJob.Server.HttpJobItem httpJob)
    {
        try
        {
            BackgroundJob.Schedule(() => Hangfire.HttpJob.Server.HttpJob.Excute(httpJob, httpJob.JobName, null), TimeSpan.FromMinutes(httpJob.DelayFromMinutes));
        }
        catch (Exception ec)
        {
            return Json(new Message() { Code = false, ErrorMessage = ec.ToString() });
        }
        return Json(new Message() { Code = true, ErrorMessage = "" });
    }

    /// <summary>
    /// Add continuation jobs: several jobs run one after another, once
    /// </summary>
    /// <param name="httpJobItems">Jobs in the order they should run</param>
    [HttpPost, Route("addcontinuejob")]
    public JsonResult AddContinueJob([FromBody] List<Hangfire.HttpJob.Server.HttpJobItem> httpJobItems)
    {
        var jobId = string.Empty;
        try
        {
            httpJobItems.ForEach(k =>
            {
                if (!string.IsNullOrEmpty(jobId))
                {
                    // later jobs continue from the previous job's id
                    jobId = BackgroundJob.ContinueJobWith(jobId, () => RunContinueJob(k));
                }
                else
                {
                    // the first job is enqueued directly
                    jobId = BackgroundJob.Enqueue(() => Hangfire.HttpJob.Server.HttpJob.Excute(k, k.JobName, null));
                }
            });
        }
        catch (Exception ec)
        {
            return Json(new Message() { Code = false, ErrorMessage = ec.ToString() });
        }
        return Json(new Message() { Code = true, ErrorMessage = "" });
    }
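The benefit is that this makes good use of the host's WebAPI, and jobs can be managed without signing in to the dashboard, which makes it easy to integrate job management into other systems.
To prevent the same job from running in parallel across multiple instances, that is, a second run of a job starting before the previous run has finished, a distributed lock can be used.
Attributes set the retry intervals (new in Hangfire 1.7, in seconds), the number of retries, the queue name, the job display name, and the distributed lock timeout.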
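As an illustration of calling these endpoints from another system, here is a minimal client sketch. It assumes `System.Text.Json` is available on the caller's side, that `baseUrl` (hypothetical) points at the controller's route prefix, which the post does not show, and that the request body only needs the `HttpJobItem` fields that appear in the code above (`JobName`, `Url`, `Corn`, `DelayFromMinutes`); the real class may have more. `HttpJobRequest` and `JobApiClient` are hypothetical names.

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical client-side mirror of the HttpJobItem fields seen in the post.
public sealed class HttpJobRequest
{
    public string JobName { get; set; }
    public string Url { get; set; }
    public string Corn { get; set; } // cron expression; spelled as in the controller code
    public int DelayFromMinutes { get; set; }
}

public static class JobApiClient
{
    // Serializes the job description to the JSON body the endpoint would receive.
    public static string ToJson(HttpJobRequest job)
    {
        return JsonSerializer.Serialize(job);
    }

    // Posts the job to the "addorupdaterecurringjob" route under the assumed base URL
    // and returns the raw response body.
    public static async Task<string> AddOrUpdateRecurringJobAsync(
        HttpClient http, string baseUrl, HttpJobRequest job)
    {
        using (var content = new StringContent(ToJson(job), Encoding.UTF8, "application/json"))
        using (var response = await http.PostAsync(baseUrl.TrimEnd('/') + "/addorupdaterecurringjob", content))
        {
            response.EnsureSuccessStatusCode();
            return await response.Content.ReadAsStringAsync();
        }
    }
}
```

The response body would be the serialized `Message` object returned by the controller, so a caller would still need to inspect its `Code` and `ErrorMessage` fields rather than rely on the HTTP status alone.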
    /// <summary>
    /// Executes the job. DelaysInSeconds sets the retry intervals, in seconds.
    /// </summary>
    /// <param name="item"></param>
    /// <param name="jobName"></param>
    /// <param name="context"></param>
    [AutomaticRetry(Attempts = 3, DelaysInSeconds = new[] { 30, 60, 90 }, LogEvents = true, OnAttemptsExceeded = AttemptsExceededAction.Fail)]
    [DisplayName("API job: {1}")]
    [Queue("apis")]
    [JobFilter(timeoutInSeconds: 3600)]
    // Set up the distributed lock. It prevents two identical jobs from running concurrently;
    // the job name plus the method name is used as the lock resource.
    var jobResource = $"{filterContext.BackgroundJob.Job.Args[1]}.{filterContext.BackgroundJob.Job.Method.Name}";
    var lockTimeout = TimeSpan.FromSeconds(_timeoutInSeconds);
    try
    {
        // Check whether the job has been paused
        using (var connection = JobStorage.Current.GetConnection())
        {
            var conts = connection.GetAllItemsFromSet($"jobpauseof:{filterContext.BackgroundJob.Job.Args[1]}");
            if (conts.Contains("true"))
            {
                filterContext.Canceled = true; // the job is paused: skip it without executing
                return;
            }
        }
        // Acquire the distributed lock
        var distributedLock = filterContext.Connection.AcquireDistributedLock(jobResource, lockTimeout);
        filterContext.Items["distributedlock"] = distributedLock;
    }
    catch (Exception)
    {
        // Acquiring the lock timed out: cancel the job, which by default is then marked as succeeded
        filterContext.Canceled = true;
        logger.Info($"Job {filterContext.BackgroundJob.Job.Args[1]} timed out, job id {filterContext.BackgroundJob.Id}");
    }
    if (!filterContext.Items.ContainsKey("distributedlock"))
    {
        throw new InvalidOperationException("Distributed lock not found: no distributed lock was acquired for this job.");
    }
    // Release the distributed lock
    var distributedLock = (IDisposable)filterContext.Items["distributedlock"];
    distributedLock.Dispose();
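The order of the two gates in the filter (pause check first, then lock with a timeout, then release after the run) can be modelled in a few lines without Hangfire. The following is only an in-process sketch of that control flow: it uses a `SemaphoreSlim` per job name in place of the storage-backed distributed lock, so unlike the real filter it does not coordinate across servers. `JobGate` and its members are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical in-process model of the filter's "pause check, then lock" sequence.
public static class JobGate
{
    private static readonly ConcurrentDictionary<string, bool> Paused =
        new ConcurrentDictionary<string, bool>();
    private static readonly ConcurrentDictionary<string, SemaphoreSlim> Locks =
        new ConcurrentDictionary<string, SemaphoreSlim>();

    public static void Pause(string jobName) { Paused[jobName] = true; }

    public static void Resume(string jobName) { Paused.TryRemove(jobName, out _); }

    // Returns a handle to dispose after the run, or null when the run should be skipped
    // (job paused, or the lock could not be acquired within lockTimeout).
    public static IDisposable TryEnter(string jobName, TimeSpan lockTimeout)
    {
        if (Paused.ContainsKey(jobName)) return null;

        var gate = Locks.GetOrAdd(jobName, _ => new SemaphoreSlim(1, 1));
        return gate.Wait(lockTimeout) ? new Releaser(gate) : null;
    }

    private sealed class Releaser : IDisposable
    {
        private SemaphoreSlim _gate;

        public Releaser(SemaphoreSlim gate) { _gate = gate; }

        // Releases the lock exactly once, even if Dispose is called repeatedly.
        public void Dispose()
        {
            var gate = Interlocked.Exchange(ref _gate, null);
            if (gate != null) gate.Release();
        }
    }
}
```

Returning null for both "paused" and "lock timed out" mirrors the filter setting `Canceled = true` in both cases; a caller that needs to tell them apart would have to return a richer result.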
A filter sets the job expiration time; once a job has expired, its history records are deleted from the database automatically.
    public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
    {
        // Set the expiration time: the job expires after three days,
        // and expired jobs are scanned for and deleted automatically
        context.JobExpirationTimeout = TimeSpan.FromDays(3);
    }
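Testing second-level jobs on a Redis cluster
The cluster runs on Windows with one master node and four replica nodes. (When using it, list every cluster node, master and replicas, in the Redis connection configuration.) A Linux environment is not needed for now, so it has not been tested.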
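Listing every node in the connection can be sketched as follows, assuming the comma-separated `host:port` configuration string format that StackExchange.Redis accepts, with an optional `password=` entry. `RedisEndpoints` is a hypothetical helper, and the addresses in the usage note are placeholders rather than the cluster used in the test.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper that joins all cluster nodes into one configuration string.
public static class RedisEndpoints
{
    public static string BuildConfiguration(IEnumerable<string> endpoints, string password = null)
    {
        // Trim entries, drop blanks, and remove duplicates while keeping the original order.
        var parts = endpoints
            .Where(e => !string.IsNullOrWhiteSpace(e))
            .Select(e => e.Trim())
            .Distinct()
            .ToList();

        if (parts.Count == 0)
            throw new ArgumentException("At least one endpoint is required.", nameof(endpoints));

        if (!string.IsNullOrEmpty(password))
            parts.Add("password=" + password);

        return string.Join(",", parts);
    }
}
```

For example, passing placeholder nodes such as `127.0.0.1:7000` through `127.0.0.1:7004` yields a single string that could then be handed to the Redis storage setup shown earlier.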