
(8) Distributed Communication: the Host

程序员文章站 (Programmer Article Station), 2022-10-15 16:06:33

 

 

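The previous post covered the communication host (TransportHost). This post covers the service host (ServiceHost), which manages the lifecycle of services.

The project splits the two hosts apart so that each has a single job:

Communication host: starts the listener on the communication port;

Lifecycle host: handles dependency injection for the module features and manages their lifecycle.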

 

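First, a screenshot of the server host and the client host communicating after startup:

[Figure: server host and client host communicating after startup]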

 

The file structure is as follows:

[Figure: project file structure]

 

The ServiceHost is built by ServiceHostBuilder.

The process is as follows.

First, the call graph:

[Figure: call graph of the host build process]

1. Main() in Program calls the ServiceHostBuilder methods MapServices, RegisterServices, ConfigureServices and Configure.

  Each method appends the delegate it receives to a list of type List<Action<IContainer>>, List<Action<ContainerBuilder>>, List<Action<IServiceCollection>> and List<Action<IConfigurationBuilder>> respectively. Nothing is executed at this point.

  IContainer and ContainerBuilder are Autofac types; IServiceCollection and IConfigurationBuilder come from Microsoft's Microsoft.Extensions libraries.

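2. Main() in Program calls the ServiceHostBuilder method UseStartup<Startup>(). Startup must implement IStartup, and it is registered as a singleton. (In ASP.NET Core, Startup does not have to implement IStartup, but it is then discovered by convention through its ConfigureServices and Configure methods.)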

3. Main() in Program calls the ServiceHostBuilder method Build().

  (1) Build() invokes the delegates stored in List<Action<IServiceCollection>>, List<Action<IConfigurationBuilder>> and List<Action<ContainerBuilder>>. The List<Action<IContainer>> delegates are not invoked yet; they are only handed to the host.

      Container construction flow:

            ConfigureServices   -->  List<Action<IServiceCollection>>      -->  IServiceCollection

            Configure           -->  List<Action<IConfigurationBuilder>>   -->  IConfigurationBuilder

            IServiceCollection + IConfigurationBuilder  -->  IServiceCollection  -->  ServiceProvider

            RegisterServices    -->  List<Action<ContainerBuilder>>        -->  ContainerBuilder

            ContainerBuilder + IServiceCollection       -->  ContainerBuilder

            MapServices         -->  List<Action<IContainer>>

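  (2) The three resulting objects (highlighted in red in the original post: ContainerBuilder, ServiceProvider and List<Action<IContainer>>) are passed to the constructor of a new ServiceHost.

  (3) ServiceHost.Initialize() is called. It performs the following steps:

    a. Resolve the Startup object from the ServiceProvider.

    b. Call Startup's IContainer ConfigureServices(ContainerBuilder builder), which returns the fully built IContainer.

    c. Call Startup's void Configure(IContainer app), where further features can be wired up against the IContainer.

  (4) The ServiceHost, which now holds the IContainer, is returned.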

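4. ServiceHost.Run() invokes the List<Action<IContainer>> delegates that the host has been holding without executing.

To sum up, the whole process merges the contributions of the four delegate lists into a single IContainer.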
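Steps 1 and 3 rest on one idea: the builder only records delegates, and Build() replays them later in registration order. The following is a minimal, dependency-free sketch of that idea. Registry and DeferredBuilder are hypothetical stand-ins invented for illustration; they are not Autofac types and not part of the article's project.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a container builder; it only records names.
public sealed class Registry
{
    public List<string> Entries { get; } = new List<string>();
}

// Hypothetical builder that defers every configuration delegate until Build().
public sealed class DeferredBuilder
{
    private readonly List<Action<Registry>> _registerDelegates = new List<Action<Registry>>();

    public DeferredBuilder Register(Action<Registry> configure)
    {
        if (configure == null) throw new ArgumentNullException(nameof(configure));
        _registerDelegates.Add(configure);   // stored, not executed
        return this;                         // enables fluent chaining
    }

    public Registry Build()
    {
        var registry = new Registry();
        // Replay the stored delegates in the order they were added.
        foreach (var configure in _registerDelegates)
            configure(registry);
        return registry;
    }
}

public static class DeferredBuilderDemo
{
    public static Registry Run()
    {
        return new DeferredBuilder()
            .Register(r => r.Entries.Add("codec"))
            .Register(r => r.Entries.Add("listener"))
            .Build();
    }
}
```

Because nothing runs before Build(), the order in which Main() chains the builder calls does not matter across the different lists; only the order within one list is preserved.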

To resolve a service from the container:

IContainer _container;
IDemoService service = _container.Resolve<IDemoService>();
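The merge step "ContainerBuilder + IServiceCollection" can be sketched with the Populate extension from the Autofac.Extensions.DependencyInjection package. This is an illustration under assumptions, not the article's code: IClock, SystemClock, IGreeter and Greeter are hypothetical types, and the sketch assumes the Autofac, Autofac.Extensions.DependencyInjection and Microsoft.Extensions.DependencyInjection packages are referenced.

```csharp
using System;
using Autofac;
using Autofac.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical services, invented for this sketch.
public interface IClock { DateTime Now { get; } }
public sealed class SystemClock : IClock { public DateTime Now => DateTime.Now; }

public interface IGreeter { string Greet(string name); }
public sealed class Greeter : IGreeter
{
    private readonly IClock _clock;
    public Greeter(IClock clock) => _clock = clock;
    public string Greet(string name) => $"Hello {name}, it is {_clock.Now:HH:mm}";
}

public static class MergeDemo
{
    public static IContainer BuildContainer()
    {
        // Registered through the Microsoft service collection.
        var services = new ServiceCollection();
        services.AddSingleton<IClock, SystemClock>();

        // Registered directly with Autofac; depends on IClock from the other side.
        var builder = new ContainerBuilder();
        builder.RegisterType<Greeter>().As<IGreeter>().SingleInstance();

        // Copy the Microsoft registrations into the Autofac builder.
        builder.Populate(services);
        return builder.Build();
    }
}
```

After BuildContainer(), `container.Resolve<IGreeter>()` can be satisfied even though Greeter's IClock dependency was registered on the Microsoft side, which is the point of merging everything into one IContainer.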

 


Starting the server and the client:

[Figure: server and client startup]

 

 

Code:

Let's look at the client and server code first.

Server:

namespace Leo.ServiceLaunch.Server
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Server, Hello World!");

            var host = new ServiceHostBuilder()
                .RegisterServices(builder =>
                {
                    // Message codec, shared by the listener
                    builder.RegisterType<MessagePackTransportMessageCodecFactory>().As<ITransportMessageCodecFactory>().SingleInstance();
                    // Executor that handles received messages, registered under the name "tcp"
                    builder.RegisterType(typeof(HttpServiceExecutor)).As(typeof(IServiceExecutor)).Named<IServiceExecutor>("tcp").SingleInstance();
                    builder.Register(provider =>
                    {
                        return new DotNettyServerMessageListener(provider.Resolve<ILogger<DotNettyServerMessageListener>>(),
                              provider.Resolve<ITransportMessageCodecFactory>());
                    }).SingleInstance();
                    builder.Register(provider =>
                    {
                        var serviceExecutor = provider.ResolveKeyed<IServiceExecutor>("tcp");
                        var messageListener = provider.Resolve<DotNettyServerMessageListener>();
                        return new DotNettyTransportHost(async endPoint =>
                        {
                            await messageListener.StartAsync(endPoint);
                            return messageListener;
                        }, serviceExecutor);
                    }).As<ITransportHost>();
                })
                .UseServer()  // sets the address and port to listen on
                .UseStartup<Startup>()
                .Build();

            using (host.Run())
            {
                Console.WriteLine($"Server started successfully, {DateTime.Now}.");

                // Keep the host alive until Enter is pressed, then dispose it.
                Console.ReadLine();
            }
        }

    }
}
namespace Leo.ServiceLaunch.Server
{
    class Startup : IStartup
    {
        public IContainer ConfigureServices(ContainerBuilder builder)
        {
            return builder.Build();
        }

        public void Configure(IContainer app)
        {

        }
    }
}

Client:

namespace Leo.ServiceLaunch.Client
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Client, Hello World!");

            var host = new ServiceHostBuilder()
                .RegisterServices(builder =>
                {
                    builder.RegisterType<MessagePackTransportMessageCodecFactory>().As<ITransportMessageCodecFactory>().SingleInstance();
                    builder.Register(provider =>
                    {
                        IServiceExecutor serviceExecutor = null;
                        // The client registers no executor for received messages, so this stays null
                        if (provider.IsRegistered(typeof(IServiceExecutor)))
                            serviceExecutor = provider.Resolve<IServiceExecutor>();
                        return new DotNettyTransportClientFactory(provider.Resolve<ITransportMessageCodecFactory>(),
                            provider.Resolve<ILogger<DotNettyTransportClientFactory>>(),
                            serviceExecutor);
                    }).As(typeof(ITransportClientFactory)).SingleInstance();
                })
                .UseStartup<Startup>()
                .Build();

            using (host.Run())
            {
                Startup.Test();
            }
            Console.ReadLine();
        }
    }
}
namespace Leo.ServiceLaunch.Client
{
    class Startup : IStartup
    {
        private static IContainer _container;
        public void Configure(IContainer app)
        {
        }

        public IContainer ConfigureServices(ContainerBuilder builder)
        {
            _container = builder.Build();
            return _container;
        }

        internal static void Test()
        {
            Task.Run(async () =>
            {
                do
                {
                    Console.WriteLine("Sending 10,000 messages in a loop.....");

                    // 10,000 calls (the upper bound is inclusive)
                    var watch = Stopwatch.StartNew();
                    for (var i = 1; i <= 10000; i++)
                    {
                        var invokeMessage = new TransportMessage
                        {
                            Id = i.ToString(),
                            ContentType = "string",
                            Content = "Hello, this is a message sent from the client to the server"
                        };
                        try
                        {
                            var endPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 981);
                            ITransportClientFactory transportClientFactory = _container.Resolve<ITransportClientFactory>();
                            var client = await transportClientFactory.CreateClientAsync(endPoint);
                            await client.SendAsync(invokeMessage);
                        }
                        catch (Exception exception)
                        {
                            // Print the context message and the exception in one string;
                            // passing the exception text as a format string would drop the message.
                            Console.WriteLine($"An error occurred while sending the request, service id: {invokeMessage.Id}. {exception}");
                            throw;
                        }
                    }
                    watch.Stop();
                    Console.WriteLine($"Finished sending 10,000 messages, elapsed: {watch.ElapsedMilliseconds}ms");
                    Console.WriteLine("Press any key to continue, q to exit the loop...");
                    var key = Console.ReadLine();
                    // ReadLine returns null at end of input, so compare null-safely
                    if (string.Equals(key, "q", StringComparison.OrdinalIgnoreCase))
                        break;
                } while (true);
            }).Wait();
        }
    }
}
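The client registration treats IServiceExecutor as an optional dependency by checking IsRegistered before resolving. Autofac also offers ResolveOptional, which returns null when the service is not registered. The sketch below shows that alternative under assumptions: ClientFactory is a hypothetical, simplified stand-in for the article's client factory, IServiceExecutor is redeclared here only so the example compiles on its own, and the Autofac package is assumed to be referenced.

```csharp
using Autofac;

// Redeclared here only to keep the sketch self-contained.
public interface IServiceExecutor { }

// Hypothetical, simplified stand-in for the article's client factory.
public sealed class ClientFactory
{
    public IServiceExecutor Executor { get; }
    public ClientFactory(IServiceExecutor executor) => Executor = executor;
}

public static class OptionalDependencyDemo
{
    public static IContainer Build()
    {
        var builder = new ContainerBuilder();

        // No IServiceExecutor is registered, mirroring the client.
        // ResolveOptional yields null instead of throwing in that case.
        builder.Register(ctx => new ClientFactory(ctx.ResolveOptional<IServiceExecutor>()))
               .SingleInstance();

        return builder.Build();
    }
}
```

Resolving ClientFactory from the container returned by Build() gives an instance whose Executor is null, which matches the behaviour the comment in the client code describes.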

Host:

IServiceHost:

    public interface IServiceHost : IDisposable
    {
        IDisposable Run();

        IContainer Initialize();
    }

IServiceHostBuilder:

    public interface IServiceHostBuilder
    {
        IServiceHost Build();

        IServiceHostBuilder RegisterServices(Action<ContainerBuilder> builder);

        IServiceHostBuilder ConfigureServices(Action<IServiceCollection> configureServices);

        IServiceHostBuilder Configure(Action<IConfigurationBuilder> builder);

        IServiceHostBuilder MapServices(Action<IContainer> mapper);
    }

ServiceHost:

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Autofac;
    using Microsoft.Extensions.DependencyInjection;

    public class ServiceHost : IServiceHost
    {
        private readonly ContainerBuilder _builder;
        private IStartup _startup;
        private IContainer _applicationServices;
        private readonly IServiceProvider _hostingServiceProvider;
        private readonly List<Action<IContainer>> _mapServicesDelegates;

        public ServiceHost(ContainerBuilder builder,
            IServiceProvider hostingServiceProvider,
            List<Action<IContainer>> mapServicesDelegate)
        {
            _builder = builder;
            _hostingServiceProvider = hostingServiceProvider;
            _mapServicesDelegates = mapServicesDelegate;
        }

        public IContainer Initialize()
        {
            // Only build the container once; later calls return the cached instance.
            if (_applicationServices == null)
            {
                try
                {
                    if (_startup == null)
                    {
                        // Resolve Startup from the Microsoft service provider
                        _startup = _hostingServiceProvider.GetRequiredService<IStartup>();
                    }
                    // Call back into Startup.ConfigureServices;
                    // if it returns null, build the container here instead
                    _applicationServices = _startup.ConfigureServices(_builder) ?? _builder.Build();
                    // Call back into Startup.Configure with the built container
                    _startup.Configure(_applicationServices);
                }
                catch (Exception ex)
                {
                    Console.Out.WriteLine("Application startup failed: " + ex.ToString());
                    throw;
                }
            }
            return _applicationServices;
        }

        public IDisposable Run()
        {
            RunAsync().GetAwaiter().GetResult();
            return this;
        }

        public Task RunAsync()
        {
            // Invoke the MapServices delegates that were held back during Build()
            if (_applicationServices != null)
                MapperServices(_applicationServices);
            return Task.CompletedTask;
        }

        private void MapperServices(IContainer mapper)
        {
            foreach (var mapServices in _mapServicesDelegates)
            {
                mapServices(mapper);
            }
        }

        public void Dispose()
        {
            (_hostingServiceProvider as IDisposable)?.Dispose();
        }
    }

ServiceHostBuilder:

    using System;
    using System.Collections.Generic;
    using Autofac;
    using Autofac.Extensions.DependencyInjection;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.DependencyInjection;

    public class ServiceHostBuilder : IServiceHostBuilder
    {
        private readonly List<Action<IServiceCollection>> _configureServicesDelegates;
        private readonly List<Action<ContainerBuilder>> _registerServicesDelegates;
        private readonly List<Action<IConfigurationBuilder>> _configureDelegates;
        private readonly List<Action<IContainer>> _mapServicesDelegates;

        public ServiceHostBuilder()
        {
            _configureServicesDelegates = new List<Action<IServiceCollection>>();
            _registerServicesDelegates = new List<Action<ContainerBuilder>>();
            _configureDelegates = new List<Action<IConfigurationBuilder>>();
            _mapServicesDelegates = new List<Action<IContainer>>();
        }

        public IServiceHost Build()
        {
            #region Microsoft's built-in container
            // Run the IServiceCollection delegates
            var services = BuildCommonServices();
            // Run the IConfigurationBuilder delegates
            var config = Configure();
            // Add logging to the IServiceCollection
            services.AddLogging();
            // Register the IConfigurationBuilder in the IServiceCollection
            services.AddSingleton(typeof(IConfigurationBuilder), config);
            // Build the ServiceProvider from the IServiceCollection
            var hostingServiceProvider = services.BuildServiceProvider();
            #endregion

            #region Autofac container
            // Run the ContainerBuilder delegates
            var hostingServices = RegisterServices();
            #endregion

            // Copy the IServiceCollection registrations into Autofac's ContainerBuilder
            hostingServices.Populate(services);

            // Hand the Autofac ContainerBuilder, the Microsoft ServiceProvider and the
            // pending IContainer delegates to the host
            var host = new ServiceHost(hostingServices, hostingServiceProvider, _mapServicesDelegates);
            // Initializing the host builds the IContainer, which the host keeps internally
            host.Initialize();
            return host;
        }

        public IServiceHostBuilder MapServices(Action<IContainer> mapper)
        {
            if (mapper == null)
            {
                throw new ArgumentNullException(nameof(mapper));
            }
            _mapServicesDelegates.Add(mapper);
            return this;
        }

        public IServiceHostBuilder RegisterServices(Action<ContainerBuilder> builder)
        {
            if (builder == null)
            {
                throw new ArgumentNullException(nameof(builder));
            }
            _registerServicesDelegates.Add(builder);
            return this;
        }

        public IServiceHostBuilder ConfigureServices(Action<IServiceCollection> configureServices)
        {
            if (configureServices == null)
            {
                throw new ArgumentNullException(nameof(configureServices));
            }
            _configureServicesDelegates.Add(configureServices);
            return this;
        }

        public IServiceHostBuilder Configure(Action<IConfigurationBuilder> builder)
        {
            if (builder == null)
            {
                throw new ArgumentNullException(nameof(builder));
            }
            _configureDelegates.Add(builder);
            return this;
        }

        private IServiceCollection BuildCommonServices()
        {
            var services = new ServiceCollection();
            foreach (var configureServices in _configureServicesDelegates)
            {
                configureServices(services);
            }
            return services;
        }

        private IConfigurationBuilder Configure()
        {
            //var config = new ConfigurationBuilder().SetBasePath(AppContext.BaseDirectory);
            var config = new ConfigurationBuilder();
            foreach (var configure in _configureDelegates)
            {
                configure(config);
            }
            return config;
        }

        private ContainerBuilder RegisterServices()
        {
            var hostingServices = new ContainerBuilder();
            foreach (var registerServices in _registerServicesDelegates)
            {
                registerServices(hostingServices);
            }
            return hostingServices;
        }
    }

IStartup:

    public interface IStartup
    {
        IContainer ConfigureServices(ContainerBuilder builder);

        void Configure(IContainer app);
    }

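ServerExtensions:

    public static class ServerExtensions
    {
        public static IServiceHostBuilder UseServer(this IServiceHostBuilder hostBuilder)
        {
            // The delegate is synchronous: MapServices takes an Action<IContainer>,
            // so an async lambda here would become an unobserved async void.
            return hostBuilder.MapServices(mapper =>
            {
                int port = 981;
                string ip = "127.0.0.1";

                Console.WriteLine($"Preparing to start the service host, listening on {ip}:{port}.");
                var transportHosts = mapper.Resolve<IList<ITransportHost>>();
                // Task.Run unwraps the async lambda, so Wait() blocks until every host has started.
                // Task.Factory.StartNew would return Task<Task> and Wait() would return too early.
                Task.Run(async () =>
                {
                    foreach (var transportHost in transportHosts)
                        await transportHost.StartAsync(ip, port);
                }).Wait();
            });
        }

        public static IServiceHostBuilder UseStartup<TStartup>(this IServiceHostBuilder hostBuilder) where TStartup : IStartup
        {
            // Register the Startup type as a singleton in the Microsoft service collection
            return hostBuilder
                .ConfigureServices(services =>
                {
                    services.AddSingleton(typeof(IStartup), typeof(TStartup));
                });
        }
    }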
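When a synchronous delegate such as the one passed to MapServices has to wait for async startup work, it matters which API launches that work. Task.Factory.StartNew with an async lambda returns a Task<Task>, and waiting on it only covers the part of the lambda before its first await; Task.Run unwraps the inner task. The following dependency-free sketch contrasts the two. StartNewDemo and its method names are made up for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class StartNewDemo
{
    // Reports whether the async body had completed when Wait() returned.
    public static bool WaitWithStartNew()
    {
        var done = false;
        Task.Factory.StartNew(async () =>
        {
            await Task.Delay(200);
            done = true;
        }).Wait();   // waits only for the outer Task<Task>, not the inner task
        return done;
    }

    public static bool WaitWithTaskRun()
    {
        var done = false;
        Task.Run(async () =>
        {
            await Task.Delay(200);
            done = true;
        }).Wait();   // Task.Run unwraps, so this waits for the whole body
        return done;
    }
}
```

In this sketch WaitWithTaskRun() returns true, while WaitWithStartNew() returns before the 200 ms delay has elapsed. Calling Unwrap() on the result of StartNew before waiting is another way to get the Task.Run behaviour.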