Asp.net core利用MediatR进程内发布/订阅详解
1、背景
最近,一个工作了一个月的同事离职了,所做的东西怼了过来。一看代码,惨不忍睹,一个方法六七百行,啥也不说了吧,实在没法儿说。介绍下业务场景吧,一个公共操作a,业务中各个地方都会做a操作,正常人正常思维应该是把a操作提取出来封装,其他地方调用,可这哥们儿偏偏不这么干,代码到处复制。仔细分析了整个业务之后,发现是一个典型的事件/消息驱动型,或者叫发布/订阅型的业务逻辑。鉴于系统是单体的,所以想到利用进程内发布/订阅的解决方案。记得很久之前,做wpf时候,用过prism的eventaggregator(是不是暴露年龄了。。。),那玩意儿不知道现在还在不在,支不支持core,目前流行的是mediatr,跟core的集成也好,于是决定采用mediatr。
2.demo代码
startup服务注册:
public void configureservices(iservicecollection services) { services.addmvc().setcompatibilityversion(compatibilityversion.version_2_2); services.addscoped<iservice1, service1>(); services.addscoped<iservice2, service2>(); services.addscoped<icontext, context>(); services.addmediatr(typeof(someeventhandler).assembly); }
服务1:
public class service1 : iservice1 { private readonly ilogger _logger; private readonly imediator _mediator; private readonly icontext _context; private readonly iservice2 _service2; public service1(ilogger<service1> logger, imediator mediator, icontext context) { _logger = logger; _mediator = mediator; _context = context; //_service2 = service2; } public async task method() { _context.currentuser = "test"; //await _service2.method(); //_service2.method(); await _mediator.publish(new someevent()); //_mediator.publish(new someevent()); await task.completedtask; } }
可以看到,在服务1的method方法中,发布了someevent事件消息。
服务2代码:
public class service2 : iservice2 { private readonly ilogger _logger; private readonly icontext _context; public service2(ilogger<service2> logger, icontext context) { _logger = logger; _context = context; } public async task method() { _logger.logdebug("当前用户:{0}", _context.currentuser); await task.delay(5000); //_logger.logdebug("当前用户:{0}", _context.currentuser); _logger.logdebug("service2 method at :{0}", datetime.now); } }
解释下,为啥服务2 method方法中,要等待5秒,因为实际项目中,有这么一个操作,把一个压缩程序包传递到远端,然后在远端代码操作iis创建站点,这玩意儿非常耗时,大概要1分多钟,这里我用5s模拟,意思意思。这个5s至关重要,待会儿会详述。
再看事件订阅handler:
public class someeventhandler : inotificationhandler<someevent>, idisposable { private readonly ilogger _logger; private readonly iserviceprovider _serviceprovider; private readonly iservice2 _service2; public someeventhandler(ilogger<someeventhandler> logger, iserviceprovider serviceprovider, iservice2 service2) { _logger = logger; _serviceprovider = serviceprovider; _service2 = service2; } public void dispose() { _logger.logdebug("handler disposed at :{0}", datetime.now); } public async task handle(someevent notification, cancellationtoken cancellationtoken) { await _service2.method(); //using (var scope = _serviceprovider.createscope()) //{ // var service2 = scope.serviceprovider.getservice<iservice2>(); // await service2.method(); //} } }
然后,我们的入口action:
[httpget("test")] public async task<actionresult<string>> test() { stringbuilder sb = new stringbuilder(); sb.appendformat("开始时间:{0}", datetime.now); sb.appendline(); await _service1.method(); sb.appendformat("结束时间:{0}", datetime.now); sb.appendline(); return sb.tostring(); }
至此,demo要干的事情,脉络应该很清晰了:控制器接收http请求,然后调用service1的method,service1的method又发布消息,消息处理器接收到消息,调用service2的method完成后续操作。我们运行起来看下:
http请求开始到结束,耗时5s,看似没问题。我们看系统输出日志:
service2的method方法也确实被订阅执行了。
3.问题
上述一切的一切,看似没问题。运行成功没?成功了。对不对?好像也对。有没问题?大大的问题!http从开始到结束,要耗时5s,实际项目中,那是一分钟,这整整一分钟,你要前端挂起等待么一直?理论上,这种耗时的后端操作,合理做法是http迅速响应前端,并返给前端业务id,前端根据此业务id长轮询后端查询操作结果状态,直至此操作完成,决不能一直卡死的,否则交互效果不说,超过一定时间,http请求会直接超时的!这就必须动刀子了,将service2操作后台任务化且不等待。service1的method代码调整如下:
public async task method() { _context.currentuser = "test"; //await _service2.method(); //_service2.method(); //await _mediator.publish(new someevent()); _mediator.publish(new someevent()); await task.completedtask; }
见注释前后,改进地方只有一处,发布事件代码去掉了await,这样系统发布事件之后,便不会等待service2而是继续运行并立刻响应http请求。好,我们再来运行看下效果:
我们看到,系统立即响应了http请求(22:40:15),5s之后,service2才执行完成(22:40:20)。看似又没问题了。那是不是真的没问题呢?我们注意,service1和service2中,都注入了一个context上下文对象,这个对象是我用来模拟一些scope类型对象,例如dbcontext的,代码如下:
public class context : icontext, idisposable { private bool _isdisposed = false; private string _currentuser; public string currentuser { get { if (_isdisposed) { throw new exception("context disposed"); } return _currentuser; } set { if (_isdisposed) { throw new exception("context disposed"); } _currentuser = value; } } public void dispose() { _isdisposed = true; } }
里边就一个属性,当前上下文用户,并实现了dispose模式,并且当前上下文被释放时,对该上下文对象任何操作将引发异常。从上文的service1及service2截图中,我们看到了,两个服务均注入了这个context对象,service1设置,service2中获取。现在我们将service2的method方法稍作调整,如下:
public async task method() { //_logger.logdebug("当前用户:{0}", _context.currentuser); await task.delay(5000); _logger.logdebug("当前用户:{0}", _context.currentuser); _logger.logdebug("service2 method at :{0}", datetime.now); }
调整只有一处,就是获取当前上下文用户的操作,从5s延时之前,放到了5s延时之后。我们再来看看效果:
http请求上看,貌似没问题,立即响应了,是吧。我们再看看程序日志输出:
wft!service2 method没成功执行,给了我一个异常。我们看看这个异常:
context dispose异常,就是说上下文这时候已经被释放掉,对它任何操作都无效并引发异常。很容易想到,这里就是为了模拟dbcontext这种通常为scope类型的对象生命周期,这种吊毛它就这样。为啥会释放?因为http请求结束那会儿,core运行时就会dispose相应scope类型对象(注意,释放,不一定是销毁,具体销毁时间不确定)。那么,怎么解决?如果对基于di生命周期比较熟悉,就会知道,这儿应该基于http 的scope之外,单独起一个scope了,两个scope互补影响,http对应的scope结束,另外的照常运行。我们将handler处调整如下:
public async task handle(someevent notification, cancellationtoken cancellationtoken) { //await _service2.method(); using (var scope = _serviceprovider.createscope()) { var service2 = scope.serviceprovider.getservice<iservice2>(); await service2.method(); } }
无非就是handle中单独起了一个scope。我们再看运行效果:
ok,http请求23:02:58响应,service2 method 23:03:03执行完成。至此,问题才算得到解决。
顺便提一下,大家注意看截图,当前用户null,因为scope之后,原来的设置过currentuser的context已经释放掉了,新开的scope中注入的context是另外的,所以没任何信息。这里你可能会问了,那我确实需要传递上下文怎么办?答案是,订阅事件,本文中someevent未定义任何信息,如果你需要传递,做对应调整即可,比较简单,也不是重点,不做赘述。
4、总结
感觉,没什么好总结的。扎实,细心,实践,没什么解决不了的。
好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对的支持。