Hangfire recurring jobs: duplicate data when a new run starts before a long-running previous job has finished
Solution: add the Mutex attribute to the job method. If a new run is started while the previous one has not yet finished, the new run is placed in the Scheduled job queue until the earlier run completes.
Hangfire.Pro.Redis or Hangfire.SqlServer must be used as the job storage.
Reference: https://github.com/hangfireio/hangfire/issues/1053
[mutex("downloadvideo")] public async task downloadvideo() {
}
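For context, here is a minimal sketch of how the job and the recurring schedule might be wired together. The VideoJobs class, the "download-video" recurring job id and the per-minute cron schedule are illustrative assumptions, not from the original post; MutexAttribute is the filter listed below.

using System.Threading.Tasks;
using Hangfire;
using Hangfire.Pro; // namespace of the MutexAttribute listed below

public class VideoJobs
{
    // Only one invocation of this method may run at a time; an overlapping
    // run is pushed back to the Scheduled queue by the Mutex filter instead
    // of executing concurrently.
    [Mutex("downloadvideo")]
    public async Task DownloadVideo()
    {
        // ... long-running download work ...
        await Task.Delay(1000);
    }
}

public static class JobRegistration
{
    // Call once at application startup, after the job storage has been configured.
    public static void Register()
    {
        RecurringJob.AddOrUpdate<VideoJobs>(
            "download-video",        // recurring job id (illustrative)
            x => x.DownloadVideo(),
            Cron.Minutely());        // schedule (illustrative)
    }
}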
The code for the Mutex attribute is as follows:
using System;
using System.Collections.Generic;
using System.Linq;
using Hangfire.Common;
using Hangfire.States;
using Hangfire.Storage;

namespace Hangfire.Pro
{
    /// <summary>
    /// Represents a background job filter that helps to disable concurrent execution
    /// without causing a worker to wait as in <see cref="Hangfire.DisableConcurrentExecutionAttribute"/>.
    /// </summary>
    public class MutexAttribute : JobFilterAttribute, IElectStateFilter, IApplyStateFilter
    {
        private static readonly TimeSpan DistributedLockTimeout = TimeSpan.FromMinutes(1);

        private readonly string _resource;

        public MutexAttribute(string resource)
        {
            _resource = resource;
            RetryInSeconds = 15;
        }

        public int RetryInSeconds { get; set; }
        public int MaxAttempts { get; set; }

        public void OnStateElection(ElectStateContext context)
        {
            // We are intercepting transitions to the Processing state, which is performed by
            // a worker just before processing a job. During the state election phase we can
            // change the target state to another one, causing a worker not to process the
            // background job.
            if (context.CandidateState.Name != ProcessingState.StateName ||
                context.BackgroundJob.Job == null)
            {
                return;
            }

            // This filter requires an extended set of storage operations. It's supported
            // by all the official storages, and many of the community-based ones.
            var storageConnection = context.Connection as JobStorageConnection;
            if (storageConnection == null)
            {
                throw new NotSupportedException("This version of storage doesn't support extended methods. Please try to update to the latest version.");
            }

            string blockedBy;

            try
            {
                // The distributed lock is needed here only to prevent a race condition, when another
                // worker picks up a background job with the same resource between the GET and SET
                // operations.
                // There will be no race condition when two or more workers pick up a background job
                // with the same id, because state transitions are protected with distributed locks
                // themselves.
                using (AcquireDistributedSetLock(context.Connection, context.BackgroundJob.Job.Args))
                {
                    // The resource set contains the background job id that acquired a mutex for the resource.
                    // We are getting only one element to see which background job blocked the invocation.
                    var range = storageConnection.GetRangeFromSet(
                        GetResourceKey(context.BackgroundJob.Job.Args),
                        0,
                        0);

                    blockedBy = range.Count > 0 ? range[0] : null;

                    // We should permit an invocation only when the set is empty, or if the current
                    // background job already owns the resource. This may happen when the local
                    // transaction succeeded, but the outer transaction failed.
                    if (blockedBy == null || blockedBy == context.BackgroundJob.Id)
                    {
                        // We need to commit the changes inside the distributed lock, otherwise it's
                        // useless. So we create a local transaction instead of using the
                        // context.Transaction property.
                        var localTransaction = context.Connection.CreateWriteTransaction();

                        // Add the current background job identifier to the resource set. This means
                        // that the resource is owned by the current background job. The identifier will
                        // be removed only on the Failed state, or in one of the final states (Succeeded
                        // or Deleted).
                        localTransaction.AddToSet(GetResourceKey(context.BackgroundJob.Job.Args), context.BackgroundJob.Id);
                        localTransaction.Commit();

                        // Invocation is permitted, and we did all the required things.
                        return;
                    }
                }
            }
            catch (DistributedLockTimeoutException)
            {
                // We weren't able to acquire a distributed lock within the specified window. This may
                // be caused by network delays, storage outages or abandoned locks in some storages.
                // Since it is required to expire abandoned locks after some time, we can simply
                // postpone the invocation.
                context.CandidateState = new ScheduledState(TimeSpan.FromSeconds(RetryInSeconds))
                {
                    Reason = "Couldn't acquire a distributed lock for mutex: timeout exceeded"
                };

                return;
            }

            // Background job execution is blocked. We should change the target state either to
            // the Scheduled or to the Deleted one, depending on the current retry attempt number.
            var currentAttempt = context.GetJobParameter<int>("MutexAttempt") + 1;
            context.SetJobParameter("MutexAttempt", currentAttempt);

            context.CandidateState = MaxAttempts == 0 || currentAttempt <= MaxAttempts
                ? CreateScheduledState(blockedBy, currentAttempt)
                : CreateDeletedState(blockedBy);
        }

        public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
            if (context.BackgroundJob.Job == null) return;

            if (context.OldStateName == ProcessingState.StateName)
            {
                using (AcquireDistributedSetLock(context.Connection, context.BackgroundJob.Job.Args))
                {
                    var localTransaction = context.Connection.CreateWriteTransaction();
                    localTransaction.RemoveFromSet(GetResourceKey(context.BackgroundJob.Job.Args), context.BackgroundJob.Id);
                    localTransaction.Commit();
                }
            }
        }

        public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
        }

        private static DeletedState CreateDeletedState(string blockedBy)
        {
            return new DeletedState
            {
                Reason = $"Execution was blocked by background job {blockedBy}, all attempts exhausted"
            };
        }

        private IState CreateScheduledState(string blockedBy, int currentAttempt)
        {
            var reason = $"Execution is blocked by background job {blockedBy}, retry attempt: {currentAttempt}";

            if (MaxAttempts > 0)
            {
                reason += $"/{MaxAttempts}";
            }

            return new ScheduledState(TimeSpan.FromSeconds(RetryInSeconds))
            {
                Reason = reason
            };
        }

        private IDisposable AcquireDistributedSetLock(IStorageConnection connection, IEnumerable<object> args)
        {
            return connection.AcquireDistributedLock(GetDistributedLockKey(args), DistributedLockTimeout);
        }

        private string GetDistributedLockKey(IEnumerable<object> args)
        {
            return $"extension:job-mutex:lock:{GetKeyFormat(args, _resource)}";
        }

        private string GetResourceKey(IEnumerable<object> args)
        {
            return $"extension:job-mutex:set:{GetKeyFormat(args, _resource)}";
        }

        private static string GetKeyFormat(IEnumerable<object> args, string keyFormat)
        {
            return string.Format(keyFormat, args.ToArray());
        }
    }
}
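The filter exposes two settable properties: RetryInSeconds (default 15) controls how far a blocked run is pushed back into the Scheduled queue, and MaxAttempts (0 means retry indefinitely) moves the run to the Deleted state once the number of blocked attempts is exceeded. A usage sketch based on those properties (the concrete numbers are arbitrary):

// Reschedule a blocked run every 60 seconds; after 10 blocked attempts
// the run is deleted instead of being rescheduled again.
[Mutex("downloadvideo", RetryInSeconds = 60, MaxAttempts = 10)]
public async Task DownloadVideo()
{
    // ... long-running download work ...
    await Task.Delay(1000);
}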