(七)分布式通信----Netty实现NIO通信
程序员文章站
2023-11-23 11:04:52
项目文件结构图 1. 消息监听器(黄色框) 这部分由 Netty 实现,Netty是一个异步且非阻塞的通信框架。TCP通信实现服务端和客户端的交互。 Netty 的简单描述如下: 客户端(调用方):负责发送要执行的指令。 服务端(接收方):分为主从线程。主线程负责接收指令,将指令存入缓存区中,等待执 ......
项目文件结构图
1. 消息监听器(黄色框)
这部分由 netty 实现,netty是一个异步且非阻塞的通信框架。tcp通信实现服务端和客户端的交互。
netty 的简单描述如下:
客户端(调用方):负责发送要执行的指令。
服务端(接收方):分为主从线程。主线程负责接收指令,将指令存入缓存区中,等待执行完成后再通知客户端(非阻塞);
从线程,有不止一个线程(异步),负责从缓存池中取出线程依次执行(按队列先后顺序执行),我们通过程序来决定先执行哪个,比如先解码,后执行,再编码。
2. 指令执行器(红色框)
上图中只有服务端的实现,服务端接收到指令后会回调执行器中的过程。
客户端的时候需要在业务场景中来实现,根据业务不同,接收到服务端消息后执行的过程也不同。然后通过控制反转,由程序自动找到该过程。
3. 消息发送器(蓝色框)
将要发送的消息写入到通信管道中,服务端和客户端都有实现。
4. 客户端工厂(绿色框)
为啥没有服务端工厂呢,因为服务端是由服务端主机(host)直接创建的,主机直接调用监听器监听端口。既然我们重写了通信过程,就不能用微软原有的webhost,后续会讲到如何搭建自己的主机。
客户端工厂用来创建客户端,然后与服务端主机通信。
5. 序列化工具(白色框)
上节中我们用messagepack实现了序列化与反序列化,本节为通信,自然离不开消息序列化。
1. 消息监听器(黄色框)
上层接口:
/// <summary> /// 接受到消息的委托。 /// </summary> /// <param name="sender">消息发送者。</param> /// <param name="message">接收到的消息。</param> public delegate task receiveddelegate(imessagesender sender, transportmessage message); /// <summary> /// 一个抽象的消息监听者。 /// </summary> public interface imessagelistener { /// <summary> /// 接收到消息的事件。 /// </summary> event receiveddelegate received; /// <summary> /// 触发接收到消息事件。 /// </summary> /// <param name="sender">消息发送者。</param> /// <param name="message">接收到的消息。</param> /// <returns>一个任务。</returns> task onreceived(imessagesender sender, transportmessage message); }
客户端:
/// <summary> /// 消息监听者。 /// </summary> public class dotnettyclientmessagelistener : imessagelistener { #region implementation of imessagelistener /// <summary> /// 接收到消息的事件。 /// </summary> public event receiveddelegate received; /// <summary> /// 触发接收到消息事件。 /// </summary> /// <param name="sender">消息发送者。</param> /// <param name="message">接收到的消息。</param> /// <returns>一个任务。</returns> public async task onreceived(imessagesender sender, transportmessage message) { if (received == null) return; await received(sender, message); } #endregion implementation of imessagelistener }
服务端:
public class dotnettyservermessagelistener : imessagelistener, idisposable { #region field private readonly ilogger<dotnettyservermessagelistener> _logger; private readonly itransportmessagedecoder _transportmessagedecoder; private readonly itransportmessageencoder _transportmessageencoder; private ichannel _channel; #endregion field #region constructor public dotnettyservermessagelistener(ilogger<dotnettyservermessagelistener> logger, itransportmessagecodecfactory codecfactory) { _logger = logger; _transportmessageencoder = codecfactory.getencoder(); _transportmessagedecoder = codecfactory.getdecoder(); } #endregion constructor #region implementation of imessagelistener public event receiveddelegate received; /// <summary> /// 触发接收到消息事件。 /// </summary> /// <param name="sender">消息发送者。</param> /// <param name="message">接收到的消息。</param> /// <returns>一个任务。</returns> public async task onreceived(imessagesender sender, transportmessage message) { if (received == null) return; await received(sender, message); } #endregion implementation of imessagelistener public async task startasync(endpoint endpoint) { if (_logger.isenabled(loglevel.debug)) _logger.logdebug($"准备启动服务主机,监听地址:{endpoint}。"); ieventloopgroup bossgroup = new multithreadeventloopgroup(1); ieventloopgroup workergroup = new multithreadeventloopgroup();//default eventloopcount is environment.processorcount * 2 var bootstrap = new serverbootstrap(); bootstrap .channel<tcpserversocketchannel>() .option(channeloption.sobacklog, 128) .childoption(channeloption.allocator, pooledbytebufferallocator.default) .group(bossgroup, workergroup) .childhandler(new actionchannelinitializer<ichannel>(channel => { var pipeline = channel.pipeline; pipeline.addlast(new lengthfieldprepender(4)); pipeline.addlast(new lengthfieldbasedframedecoder(int.maxvalue, 0, 4, 0, 4)); pipeline.addlast(new transportmessagechannelhandleradapter(_transportmessagedecoder)); pipeline.addlast(new serverhandler(async (contenxt, message) => { var sender = new dotnettyservermessagesender(_transportmessageencoder, contenxt); await onreceived(sender, message); }, _logger)); })); try { _channel = await bootstrap.bindasync(endpoint); if (_logger.isenabled(loglevel.debug)) _logger.logdebug($"服务主机启动成功,监听地址:{endpoint}。"); } catch { _logger.logerror($"服务主机启动失败,监听地址:{endpoint}。 "); } } public void closeasync() { task.run(async () => { await _channel.eventloop.shutdowngracefullyasync(); await _channel.closeasync(); }).wait(); } #region implementation of idisposable /// <summary>performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary> public void dispose() { task.run(async () => { await _channel.disconnectasync(); }).wait(); } #endregion implementation of idisposable #region help class private class serverhandler : channelhandleradapter { private readonly action<ichannelhandlercontext, transportmessage> _readaction; private readonly ilogger _logger; public serverhandler(action<ichannelhandlercontext, transportmessage> readaction, ilogger logger) { _readaction = readaction; _logger = logger; } #region overrides of channelhandleradapter public override void channelread(ichannelhandlercontext context, object message) { task.run(() => { var transportmessage = (transportmessage)message; _readaction(context, transportmessage); }); } public override void channelreadcomplete(ichannelhandlercontext context) { context.flush(); } public override void exceptioncaught(ichannelhandlercontext context, exception exception) { context.closeasync();//客户端主动断开需要应答,否则socket变成close_wait状态导致socket资源耗尽 if (_logger.isenabled(loglevel.error)) _logger.logerror(null,exception, $"与服务器:{context.channel.remoteaddress}通信时发送了错误。"); } #endregion overrides of channelhandleradapter } #endregion help class }
2. 指令执行器(红色框)
上层接口:
/// <summary> /// 一个抽象的服务执行器。 /// </summary> public interface iserviceexecutor { /// <summary> /// 执行。 /// </summary> /// <param name="sender">消息发送者。</param> /// <param name="message">调用消息。</param> task executeasync(imessagesender sender, transportmessage message); }
实现类,代码中没有处理逻辑(服务发现、执行,后续有专题来谈),只有输出打印 "服务提供者接收到消息。" :
public class httpserviceexecutor : iserviceexecutor { #region field private readonly ilogger<httpserviceexecutor> _logger; #endregion field #region constructor public httpserviceexecutor(ilogger<httpserviceexecutor> logger) { _logger = logger; } #endregion constructor #region implementation of iserviceexecutor /// <summary> /// 执行。 /// </summary> /// <param name="sender">消息发送者。</param> /// <param name="message">调用消息。</param> public async task executeasync(imessagesender sender, transportmessage message) { if (_logger.isenabled(loglevel.trace)) _logger.logtrace("服务提供者接收到消息。"); return; } #endregion implementation of iserviceexecutor }
3. 消息发送器(蓝色框)
上层接口:
/// <summary> /// 一个抽象的发送者。 /// </summary> public interface imessagesender { /// <summary> /// 发送消息。 /// </summary> /// <param name="message">消息内容。</param> /// <returns>一个任务。</returns> task sendasync(transportmessage message); /// <summary> /// 发送消息并清空缓冲区。 /// </summary> /// <param name="message">消息内容。</param> /// <returns>一个任务。</returns> task sendandflushasync(transportmessage message); }
实现类基类:
/// <summary> /// 基于dotnetty的消息发送者基类。 /// </summary> public abstract class dotnettymessagesender { private readonly itransportmessageencoder _transportmessageencoder; protected dotnettymessagesender(itransportmessageencoder transportmessageencoder) { _transportmessageencoder = transportmessageencoder; } protected ibytebuffer getbytebuffer(transportmessage message) { var data = _transportmessageencoder.encode(message); //var buffer = pooledbytebufferallocator.default.buffer(); return unpooled.wrappedbuffer(data); } }
服务端:
/// <summary> /// 基于dotnetty服务端的消息发送者。 /// </summary> public class dotnettyservermessagesender : dotnettymessagesender, imessagesender { private readonly ichannelhandlercontext _context; public dotnettyservermessagesender(itransportmessageencoder transportmessageencoder, ichannelhandlercontext context) : base(transportmessageencoder) { _context = context; } #region implementation of imessagesender /// <summary> /// 发送消息。 /// </summary> /// <param name="message">消息内容。</param> /// <returns>一个任务。</returns> public async task sendasync(transportmessage message) { var buffer = getbytebuffer(message); await _context.writeasync(buffer); } /// <summary> /// 发送消息并清空缓冲区。 /// </summary> /// <param name="message">消息内容。</param> /// <returns>一个任务。</returns> public async task sendandflushasync(transportmessage message) { var buffer = getbytebuffer(message); await _context.writeandflushasync(buffer); } #endregion implementation of imessagesender }
客户端:
/// <summary> /// 基于dotnetty客户端的消息发送者。 /// </summary> public class dotnettymessageclientsender : dotnettymessagesender, imessagesender, idisposable { private readonly ichannel _channel; public dotnettymessageclientsender(itransportmessageencoder transportmessageencoder, ichannel channel) : base(transportmessageencoder) { _channel = channel; } #region implementation of idisposable /// <summary>performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary> public void dispose() { task.run(async () => { await _channel.disconnectasync(); }).wait(); } #endregion implementation of idisposable #region implementation of imessagesender /// <summary> /// 发送消息。 /// </summary> /// <param name="message">消息内容。</param> /// <returns>一个任务。</returns> public async task sendasync(transportmessage message) { var buffer = getbytebuffer(message); await _channel.writeandflushasync(buffer); } /// <summary> /// 发送消息并清空缓冲区。 /// </summary> /// <param name="message">消息内容。</param> /// <returns>一个任务。</returns> public async task sendandflushasync(transportmessage message) { var buffer = getbytebuffer(message); await _channel.writeandflushasync(buffer); } #endregion implementation of imessagesender }
4. 客户端工厂(绿色框)
上层接口:
/// <summary> /// 一个抽象的传输客户端工厂。 /// </summary> public interface itransportclientfactory { /// <summary> /// 创建客户端。 /// </summary> /// <param name="endpoint">终结点。</param> /// <returns>传输客户端实例。</returns> task<itransportclient> createclientasync(endpoint endpoint); }
/// <summary> /// 一个抽象的传输客户端。 /// </summary> public interface itransportclient { /// <summary> /// 发送消息。 /// </summary> /// <param name="message">远程调用消息模型。</param> /// <returns>远程调用消息的传输消息。</returns> task sendasync(transportmessage transportmessage); }
实现类:
/// <summary> /// 基于dotnetty的传输客户端工厂。 /// </summary> public class dotnettytransportclientfactory : itransportclientfactory, idisposable { #region field private readonly itransportmessageencoder _transportmessageencoder; private readonly itransportmessagedecoder _transportmessagedecoder; private readonly ilogger<dotnettytransportclientfactory> _logger; private readonly iserviceexecutor _serviceexecutor; private readonly concurrentdictionary<endpoint, lazy<task<itransportclient>>> _clients = new concurrentdictionary<endpoint, lazy<task<itransportclient>>>(); private readonly bootstrap _bootstrap; private static readonly attributekey<imessagesender> messagesenderkey = attributekey<imessagesender>.valueof(typeof(dotnettytransportclientfactory), nameof(imessagesender)); private static readonly attributekey<imessagelistener> messagelistenerkey = attributekey<imessagelistener>.valueof(typeof(dotnettytransportclientfactory), nameof(imessagelistener)); private static readonly attributekey<endpoint> origendpointkey = attributekey<endpoint>.valueof(typeof(dotnettytransportclientfactory), nameof(endpoint)); #endregion field #region constructor public dotnettytransportclientfactory(itransportmessagecodecfactory codecfactory, ilogger<dotnettytransportclientfactory> logger) : this(codecfactory, logger, null) { } public dotnettytransportclientfactory(itransportmessagecodecfactory codecfactory, ilogger<dotnettytransportclientfactory> logger, iserviceexecutor serviceexecutor) { _transportmessageencoder = codecfactory.getencoder(); _transportmessagedecoder = codecfactory.getdecoder(); _logger = logger; _serviceexecutor = serviceexecutor; _bootstrap = getbootstrap(); _bootstrap.handler(new actionchannelinitializer<isocketchannel>(c => { var pipeline = c.pipeline; pipeline.addlast(new lengthfieldprepender(4)); pipeline.addlast(new lengthfieldbasedframedecoder(int.maxvalue, 0, 4, 0, 4)); pipeline.addlast(new transportmessagechannelhandleradapter(_transportmessagedecoder)); pipeline.addlast(new defaultchannelhandler(this)); })); } #endregion constructor #region implementation of itransportclientfactory /// <summary> /// 创建客户端。 /// </summary> /// <param name="endpoint">终结点。</param> /// <returns>传输客户端实例。</returns> public async task<itransportclient> createclientasync(endpoint endpoint) { var key = endpoint; if (_logger.isenabled(loglevel.debug)) _logger.logdebug($"准备为服务端地址:{key}创建客户端。"); try { return await _clients.getoradd(key , k => new lazy<task<itransportclient>>(async () => { //客户端对象 var bootstrap = _bootstrap; //异步连接返回channel var channel = await bootstrap.connectasync(k); var messagelistener = new dotnettyclientmessagelistener(); //设置监听 channel.getattribute(messagelistenerkey).set(messagelistener); //实例化发送者 var messagesender = new dotnettymessageclientsender(_transportmessageencoder, channel); //设置channel属性 channel.getattribute(messagesenderkey).set(messagesender); channel.getattribute(origendpointkey).set(k); //创建客户端 var client = new dotnettytransportclient(messagesender, messagelistener, _logger, _serviceexecutor); return client; } )).value;//返回实例 } catch { throw; } } #endregion implementation of itransportclientfactory #region implementation of idisposable /// <summary>performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary> public void dispose() { foreach (var client in _clients.values.where(i => i.isvaluecreated)) { (client.value as idisposable)?.dispose(); } } #endregion implementation of idisposable private static bootstrap getbootstrap() { ieventloopgroup group; var bootstrap = new bootstrap(); group = new multithreadeventloopgroup(); bootstrap.channel<tcpserversocketchannel>(); bootstrap .channel<tcpsocketchannel>() .option(channeloption.tcpnodelay, true) .option(channeloption.allocator, pooledbytebufferallocator.default) .group(group); return bootstrap; } protected class defaultchannelhandler : channelhandleradapter { private readonly dotnettytransportclientfactory _factory; public defaultchannelhandler(dotnettytransportclientfactory factory) { this._factory = factory; } #region overrides of channelhandleradapter public override void channelinactive(ichannelhandlercontext context) { _factory._clients.tryremove(context.channel.getattribute(origendpointkey).get(), out var value); } public override void channelread(ichannelhandlercontext context, object message) { var transportmessage = message as transportmessage; var messagelistener = context.channel.getattribute(messagelistenerkey).get(); var messagesender = context.channel.getattribute(messagesenderkey).get(); messagelistener.onreceived(messagesender, transportmessage); } #endregion overrides of channelhandleradapter } }
/// <summary> /// 一个默认的传输客户端实现。 /// </summary> public class dotnettytransportclient : itransportclient, idisposable { #region field private readonly imessagesender _messagesender; private readonly imessagelistener _messagelistener; private readonly ilogger _logger; private readonly iserviceexecutor _serviceexecutor; #endregion field #region constructor public dotnettytransportclient(imessagesender messagesender, imessagelistener messagelistener, ilogger logger, iserviceexecutor serviceexecutor) { _messagesender = messagesender; _messagelistener = messagelistener; _logger = logger; _serviceexecutor = serviceexecutor; messagelistener.received += messagelistener_received; } #endregion constructor #region implementation of itransportclient /// <summary> /// 发送消息。 /// </summary> /// <param name="message">远程调用消息模型。</param> /// <returns>远程调用消息的传输消息。</returns> public async task sendasync(transportmessage transportmessage) { try { if (_logger.isenabled(loglevel.debug)) _logger.logdebug("准备发送消息。"); try { //发送 await _messagesender.sendandflushasync(transportmessage); } catch (exception exception) { throw new exception("与服务端通讯时发生了异常。", exception); } if (_logger.isenabled(loglevel.debug)) _logger.logdebug("消息发送成功。"); } catch (exception exception) { if (_logger.isenabled(loglevel.error)) _logger.logerror(null,exception, "消息发送失败。"); throw; } } #endregion implementation of itransportclient #region implementation of idisposable /// <summary>performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary> public void dispose() { (_messagesender as idisposable)?.dispose(); (_messagelistener as idisposable)?.dispose(); } #endregion implementation of idisposable #region private method private async task messagelistener_received(imessagesender sender, transportmessage message) { if (_logger.isenabled(loglevel.trace)) _logger.logtrace("服务消费者接收到消息。"); if (_serviceexecutor != null) await _serviceexecutor.executeasync(sender, message); } #endregion private method }
5. 序列化工具(白色框)
需要继承自 dotnetty.transport.channels.channelhandleradapter,才能被 netty 调用:
public class transportmessagechannelhandleradapter : channelhandleradapter { private readonly itransportmessagedecoder _transportmessagedecoder; public transportmessagechannelhandleradapter(itransportmessagedecoder transportmessagedecoder) { _transportmessagedecoder = transportmessagedecoder; } #region overrides of channelhandleradapter public override void channelread(ichannelhandlercontext context, object message) { var buffer = (ibytebuffer)message; var data = new byte[buffer.readablebytes]; buffer.readbytes(data); var transportmessage = _transportmessagedecoder.decode(data); context.firechannelread(transportmessage); referencecountutil.release(buffer); } #endregion overrides of channelhandleradapter }