(八)分布式通信----主机Host
上节中有谈到的是通信主机(transporthost),本节中主机(servicehost)负责管理服务的生命周期。
项目中将两个主机拆分开,实现不同的功能:
通信主机:用于启动通信监听端口;
生命周期管理的主机:负责模块功能的依赖注入,管理生命周期。
先看一下启动服务端主机和客户端主机后完成通信的效果图:
文件结构如下:
servicehost 主机由servicehostbuilder来构建。
过程如下:
先看调用图:
1.program中main() 调用 servicehostbuilder 的方法:mapservices、registerservices、configureservices、configure
分别将委托填充到 list<action<icontainer>>、list<action<containerbuilder>>、list<action<iservicecollection>>、list<action<iconfigurationbuilder>> 类型的容器中。
其中 icontainer、containerbuilder 是 autofac中的容器,iservicecollection、iconfigurationbuilder 是 microsoft中的容器。
2. program中main() 调用 servicehostbuilder 的方法 usestartup<startup>() ,startup 必须实现 istartup,完成startup 的单例注入(微软中的 startup 可以不实现 istartup ,但是必须使用方法configureservices、configure)
3. program中main() 调用 servicehostbuilder 的方法 build()
(1)回调容器 list<action<icontainer>>、list<action<containerbuilder>>、list<action<iservicecollection>>、list<action<iconfigurationbuilder>> 中的委托。
容器生成过程: configureservices ---》 list<action<iservicecollection>> ---》 iservicecollection
configure ---》 list<action<iconfigurationbuilder>> ---》 iconfigurationbuilder
iservicecollection + iconfigurationbuilder ---》 iservicecollection ---》 serviceprovider
registerservices ---》 list<action<containerbuilder>> ---》 containerbuilder
containerbuilder + iservicecollection ---》 containerbuilder
mapservices ---》 list<action<icontainer>>
(2)将上面红色字体的对象通过构造函数传给new 的 servicehost 对象。
(3)调用servicehost .initialize(), 该方法中执行如下过程
a. 用serviceprovider 解析出 startup 对象
b. 回调startup的 icontainer configureservices(containerbuilder builder) , 返回构建好的容器 icontainer
c. 回调startup的 void configure(icontainer app) , icontainer中注入其它功能
(4)将包含icontainer容器的servicehost 对象返回
4. servicehost.run(), 回调主机中一直未执行的容器委托 list<action<icontainer>>
总结一下,整个过程就是将原来的四个委托的容器最后合并成一个 icontainer 容器。
解析容器中的服务,可以用 :
icontainer _container; idemoservice service = _container.resolve<idemoservice>();
服务端和客户端启动:
代码:
我们先看客户端和服务端代码:
服务端:
namespace leo.servicelaunch.server { class program { static void main(string[] args) { console.writeline("server, hello world!"); var host = new servicehostbuilder() .registerservices(builder => { builder.registertype<messagepacktransportmessagecodecfactory>().as<itransportmessagecodecfactory>().singleinstance(); builder.registertype(typeof(httpserviceexecutor)).as(typeof(iserviceexecutor)).named<iserviceexecutor>("tcp").singleinstance(); builder.register(provider => { return new dotnettyservermessagelistener(provider.resolve<ilogger<dotnettyservermessagelistener>>(), provider.resolve<itransportmessagecodecfactory>()); }).singleinstance(); builder.register(provider => { var serviceexecutor = provider.resolvekeyed<iserviceexecutor>("tcp"); var messagelistener = provider.resolve<dotnettyservermessagelistener>(); return new dotnettytransporthost(async endpoint => { await messagelistener.startasync(endpoint); return messagelistener; }, serviceexecutor); }).as<itransporthost>(); }) .useserver() // 指定监听的端口 .usestartup<startup>() .build(); using (host.run()) { console.writeline($"服务端启动成功,{datetime.now}。"); } console.readline(); } } }
namespace leo.servicelaunch.server { class startup : istartup { public icontainer configureservices(containerbuilder builder) { return builder.build(); } public void configure(icontainer app) { } } }
客户端:
namespace leo.servicelaunch.client { class program { static void main(string[] args) { console.writeline("client, hello world!"); var host = new servicehostbuilder() .registerservices(builder => { builder.registertype<messagepacktransportmessagecodecfactory>().as<itransportmessagecodecfactory>().singleinstance(); builder.register(provider => { iserviceexecutor serviceexecutor = null; if (provider.isregistered(typeof(iserviceexecutor))) // 没有注册客户端接收消息执行器,因此一直为空 serviceexecutor = provider.resolve<iserviceexecutor>(); return new dotnettytransportclientfactory(provider.resolve<itransportmessagecodecfactory>(), provider.resolve<ilogger<dotnettytransportclientfactory>>(), serviceexecutor); }).as(typeof(itransportclientfactory)).singleinstance(); }) .usestartup<startup>() .build(); using (host.run()) { startup.test(); } console.readline(); } } }
namespace leo.servicelaunch.client { class startup : istartup { private static icontainer _container; public void configure(icontainer app) { } public icontainer configureservices(containerbuilder builder) { _container = builder.build(); return _container; } internal static void test() { task.run(async () => { do { console.writeline("正在循环 1万次发送消息....."); //1w次调用 var watch = stopwatch.startnew(); for (var i = 1; i < 10000; i++) { var invokemessage = new transportmessage { id = i.tostring(), contenttype = "string", content = "你好啊,这是客户端发给服务端的消息" }; try { var endpoint = new ipendpoint(ipaddress.parse("127.0.0.1"), 981); itransportclientfactory transportclientfactory = _container.resolve<itransportclientfactory>(); var client = await transportclientfactory.createclientasync(endpoint); await client.sendasync(invokemessage); } catch (exception exception) { console.writeline(exception.tostring(), $"发起请求中发生了错误,服务id:{invokemessage.id}。"); throw; } } watch.stop(); console.writeline($"1万次发送结束,执行时间:{watch.elapsedmilliseconds}ms"); console.writeline("press any key to continue, q to exit the loop..."); var key = console.readline(); if (key.tolower() == "q") break; } while (true); }).wait(); } } }
主机:
iservicehost:
public interface iservicehost : idisposable { idisposable run(); icontainer initialize(); }
iservicehostbuilder:
public interface iservicehostbuilder { iservicehost build(); iservicehostbuilder registerservices(action<containerbuilder> builder); iservicehostbuilder configureservices(action<iservicecollection> configureservices); iservicehostbuilder configure(action<iconfigurationbuilder> builder); iservicehostbuilder mapservices(action<icontainer> mapper); }
servicehost:
public class servicehost : iservicehost { private readonly containerbuilder _builder; private istartup _startup; private icontainer _applicationservices; private readonly iserviceprovider _hostingserviceprovider; private readonly list<action<icontainer>> _mapservicesdelegates; public servicehost(containerbuilder builder, iserviceprovider hostingserviceprovider, list<action<icontainer>> mapservicesdelegate) { _builder = builder; _hostingserviceprovider = hostingserviceprovider; _mapservicesdelegates = mapservicesdelegate; } public icontainer initialize() { if (_applicationservices == null) { try { if (_applicationservices == null) { if (_startup == null) { // 解析出 startup _startup = _hostingserviceprovider.getrequiredservice<istartup>(); } //回调startup中的 configureservices, _applicationservices = _startup.configureservices(_builder); } if (_applicationservices == null) _applicationservices = _builder.build(); action<icontainer> configure = _startup.configure; configure(_applicationservices); } catch (exception ex) { console.out.writeline("应用程序启动异常: " + ex.tostring()); throw; } } return _applicationservices; } public idisposable run() { runasync().getawaiter().getresult(); return this; } public async task runasync() { if (_applicationservices != null) mapperservices(_applicationservices); } private void mapperservices(icontainer mapper) { foreach (var mapservices in _mapservicesdelegates) { mapservices(mapper); } } public void dispose() { (_hostingserviceprovider as idisposable)?.dispose(); } }
servicehostbuilder:
public class servicehostbuilder : iservicehostbuilder { private readonly list<action<iservicecollection>> _configureservicesdelegates; private readonly list<action<containerbuilder>> _registerservicesdelegates; private readonly list<action<iconfigurationbuilder>> _configuredelegates; private readonly list<action<icontainer>> _mapservicesdelegates; public servicehostbuilder() { _configureservicesdelegates = new list<action<iservicecollection>>(); _registerservicesdelegates = new list<action<containerbuilder>>(); _configuredelegates = new list<action<iconfigurationbuilder>>(); _mapservicesdelegates = new list<action<icontainer>>(); } public iservicehost build() { #region microsoft原生的容器 //执行 iservicecollection 类型的委托 var services = buildcommonservices(); //执行 iconfigurationbuilder 类型的委托 var config = configure(); //日志注入到 iservicecollection services.addlogging(); //iconfigurationbuilder 注入到 iservicecollection services.addsingleton(typeof(iconfigurationbuilder), config); //用 iservicecollection 生成 serviceprovider 服务提供器 var hostingserviceprovider = services.buildserviceprovider(); #endregion #region autofac的容器 //执行 containerbuilder 类型的委托 var hostingservices = registerservices(); #endregion //将 iservicecollection 填充到 autofac 的 containerbuilder 构建器中 hostingservices.populate(services); //把autofac的containerbuild的容器构建器、microsoft的serviceprovider服务提供器、已有的icontainer容器的委托 都放入主机中 var host = new servicehost(hostingservices, hostingserviceprovider, _mapservicesdelegates); //主机初始化以后返回的是icontainer容器 var container = host.initialize(); return host; } public iservicehostbuilder mapservices(action<icontainer> mapper) { if (mapper == null) { throw new argumentnullexception(nameof(mapper)); } _mapservicesdelegates.add(mapper); return this; } public iservicehostbuilder registerservices(action<containerbuilder> builder) { if (builder == null) { throw new argumentnullexception(nameof(builder)); } _registerservicesdelegates.add(builder); return this; } public iservicehostbuilder configureservices(action<iservicecollection> configureservices) { if (configureservices == null) { throw new argumentnullexception(nameof(configureservices)); } _configureservicesdelegates.add(configureservices); return this; } public iservicehostbuilder configure(action<iconfigurationbuilder> builder) { if (builder == null) { throw new argumentnullexception(nameof(builder)); } _configuredelegates.add(builder); return this; } private iservicecollection buildcommonservices() { var services = new servicecollection(); foreach (var configureservices in _configureservicesdelegates) { configureservices(services); } return services; } private iconfigurationbuilder configure() { //var config = new configurationbuilder().setbasepath(appcontext.basedirectory); var config = new configurationbuilder(); foreach (var configure in _configuredelegates) { configure(config); } return config; } private containerbuilder registerservices() { var hostingservices = new containerbuilder(); foreach (var registerservices in _registerservicesdelegates) { registerservices(hostingservices); } return hostingservices; } }
istartup:
public interface istartup { icontainer configureservices(containerbuilder builder); void configure(icontainer app); }
serverextensions:
public static class serverextensions { public static iservicehostbuilder useserver(this iservicehostbuilder hostbuilder) { return hostbuilder.mapservices(async mapper => { int _port = 981; string _ip = "127.0.0.1"; console.writeline($"准备启动服务主机,监听地址:{_ip}:{_port}。"); var transporthosts = mapper.resolve<ilist<itransporthost>>(); task.factory.startnew(async () => { foreach (var transporthost in transporthosts) await transporthost.startasync(_ip, _port); }).wait(); }); } public static iservicehostbuilder usestartup<tstartup>(this iservicehostbuilder hostbuilder) where tstartup : istartup { return hostbuilder .configureservices(services => { services.addsingleton(typeof(istartup), typeof(tstartup)); }); } }
上一篇: C#手工双缓冲技术用法实例分析
下一篇: Git的安装和使用教程详解
推荐阅读