欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Reactor模式的.net版本简单实现--DEMO

程序员文章站 2022-06-30 15:58:08
近期在学习DotNetty,遇到不少的问题。由于dotnetty是次netty的.net版本的实现。导致在网上叙述dotnetty的原理,以及实现技巧方面的东西较少,这还是十分恼人的。在此建议学习和使用Dotnetty的和位小伙伴,真心阅读下netty的相关书籍,如《netty权威指南》。 闲话少说 ......

     近期在学习DotNetty,遇到不少的问题。由于dotnetty是次netty的.net版本的实现。导致在网上叙述dotnetty的原理,以及实现技巧方面的东西较少,这还是十分恼人的。在此建议学习和使用Dotnetty的和位小伙伴,真心阅读下netty的相关书籍,如《netty权威指南》。

     闲话少说,进入正题。netty的性能之所以能够达到如此的高度。主要由于他使用Reactor模式处理socket的请求,让服务器的使用率最大化,且尽量减少线程的开销。本文章主要简单介绍下Reactor模式。

一、reactor概论

reactor模式主要解决处理多个客户端请求的设计模式。

Reactor模式的.net版本简单实现--DEMO

首先从类图我们可以得知:

Dispatcher:Handler管理器,以及调用度。他依赖于Demultiplexer类

Demultiplexer:事件管理器,接受外部的事件,并提供给Dispatch使用。

Handle:事件源,表示触发了那些事件

EventHandler:各种类型的处理器,用于处理具体的业务,以及I/O的读写

当然,也可以通过序列图看出首先需要初始化Dispatcher, Demultiplexer等相关类,以及注册具体的事件处理器。

二、代码的具体实现

类图如下[源码下载]:

Reactor模式的.net版本简单实现--DEMO

2.1 多路复用事件处理器的代码

public class Demultiplexer
    {
        private ConcurrentQueue<Event> eventQuene = new ConcurrentQueue<Event>();
        private Object lockObj = new Object();

        public List<Event> Select()
        {
            return this.Select(0);
        }
        public List<Event> Select(int time)
        {
            if(time > 0)
            {
                if (this.eventQuene.IsEmpty)
                {
                    lock (lockObj)
                    {
                        if (this.eventQuene.IsEmpty)
                        {
                            System.Threading.Thread.Sleep(time);
                        }
                    }
                }
            }
            List<Event> events = new List<Event>();
            while(this.eventQuene.Count > 0)
            {
                Event tmp;
                if(this.eventQuene.TryDequeue(out tmp))
                {
                    events.Add(tmp);
                }
            }
            return events;
        }
        public void AddEvent(Event argEvent)
        {
            this.eventQuene.Enqueue(argEvent);
        }
    }

此类主要防止多线程的共同竞争,因为多路径复用选择器会被多个线程同时使用。所以使用的线程安全的Queue。

2.2 Handler触发器和管理器

/// <summary>
    /// Reactor的事件Handler触发器,提供事件Handler的注册,移除
    /// </summary>
    public class EventDispatch
    {
        private Demultiplexer demultiplexer;
        Dictionary<EventType, EventHandler> eventHandlerMap = new Dictionary<EventType, EventHandler>();

        public EventDispatch(Demultiplexer demultiplexer)
        {
            this.demultiplexer = demultiplexer;
        }

        public void RegisterHandler(EventType eventType, EventHandler eventHandler)
        {
            this.eventHandlerMap.Add(eventType, eventHandler);
        }
        public void RemoveHandler(EventType eventType)
        {
            this.eventHandlerMap.Remove(eventType);
        }

        public void HandleEvents()
        {
            this.Dispatch();
        }
        public void Dispatch()
        {
            string log = string.Format("thread id: {0} Dispatch", System.Threading.Thread.CurrentThread.ManagedThreadId);
            Console.WriteLine(log);
            while (true)
            {
                List<Event> events = this.demultiplexer.Select();
                foreach(var itemEvent in events)
                {
                    EventHandler eventHandler = this.eventHandlerMap[itemEvent.EventType];
                    eventHandler.Handle(itemEvent);
                }
                System.Threading.Thread.Sleep(1000);
            }
        }
    }

主要职责,对Handler的注册、移除的管理,以及通过 多路复用选择器 选择相应的Handler进行处理。

2.3 服务端的实现

/// <summary>
    /// 开启接受请求的服务端
    /// </summary>
    public class AcceptRuner
    {
        private System.Collections.Concurrent.ConcurrentQueue<object> sourceQueue = new System.Collections.Concurrent.ConcurrentQueue<object>();

        private Demultiplexer demultiplexer;

        public AcceptRuner(Demultiplexer demultiplexer)
        {
            this.demultiplexer = demultiplexer;
        }

        public void adConnection(object source)
        {
            this.sourceQueue.Enqueue(source);
        }

        public void Run()
        {
            string log = string.Format("thread id: {0} AcceptRunner", System.Threading.Thread.CurrentThread.ManagedThreadId);
            Console.WriteLine(log);
            while (true)
            {
                object source;
                if(this.sourceQueue.TryDequeue(out source))
                {
                    Event acceptEvent = new Event()
                    {
                        EventType = EventType.Accept,
                        Source = source
                    };
                    this.demultiplexer.AddEvent(acceptEvent);
                }
            }
        }
    }

此类效仿netty的serverBoostrap的实现,将外部新的连接以事件对象的形式添加到 多路复用选择器上。

2.4 其他类

Event:事件基类

EventHandler:事件处理器抽象基类。他派生了:AcceptEventHandler,ReadEventHandler。

EventType:事件类型

三、备注说明

1. 代码没有贴完整。但下载包就是完整的。

2. 这只我对Reactor模式的理解,如有偏颇之处,还望各拉指点一二。