网络编程18
AbstractUnsafe
- register方法:把channel和selector挂钩,如果pipeline的通道已经是active状态的,会调用beginRead方法,beginRead方法又会调用doBeginRead方法,把真正关心的事件进行注册
服务器启动流程
-
1.首先创造了一个eventloopgroup,支持指定线程数,如果不传,默认会从配置中读取,如果配置没有,就是CPU逻辑核心数乘2,io操作的线程数一般都是这么设置的,会进入到MultithreadEventExecutorGroup方法中
-
MultithreadEventExecutorGroup
1.executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
是这样一个执行器,初始化时创建了一个线程工厂,并且在execute方法里面就会创建一个新的线程,其中对应于下面bind—doBind—initAndRegister—init中加入接受连接的handler的异步任务,每一个eventloopgroup都会有一个executor。
2.children就是每一个eventloop。
3.按照传进来的线程数,依次进行相关的数组循环,每一个数组元素进行newChild操作,就是一个eventloop。
eventloop的构造方法里面,把selector相关的参数初始化出来,selector就是通过openSelector获得的。
#所以第一步,new出来一个eventloopgroup的时候,就是按照cpu核心数创建相对应数量的eventloop,在每一个eventloop里面初始化出一个selector,方便后面channel将关注的事件注册到上面,而NioEventLoop里面有一个专门的run方法,在一个for循环里面,不断调用selector相关方法,处理各种事件集,netty这里有一个小小的技巧,因为NioEventLoop除了处理io事件之外,有一些系统的任务也是由它处理,需要进行区分,引入了一个ioRatio参数,即处理io事件的比例,根据比例来分配资源处理io事件,所以eventloop会判断执行当前任务的线程是不是eventloop本身的线程,缺省情况ioRatio是50%,即一半一半,eventloop里面包装了一个线程,但是这个线程不是在NioEventLoop中声明的,而是在它的父类SingleThreadEventExecutors,里面声明了thread变量和taskQueue队列,而eventloopgroup里面execute会创建一个线程并执行,但是还没有和eventloop里面的线程挂钩,在哪里挂钩?
-
-
2.拿到ServerBootstrap 这个服务器启动必备的实例
-
3.把channel、通讯地址、handler这些参数set到ServerBootstrap 中,唯一有点的区别的是channel,它设置的class对象,是反射的方法拿到每一个channel的相关实例
b.group(group) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 把新创建的handler加到SocketChannel里面 ch.pipeline().addLast(serverHandler); } });
-
4.ChannelFuture f = b.bind().sync();
其中bind方法是非常关键的,是关注的重点,它是有它的父类AbstractBootstrap实现的,而bind方法又跳进doBind方法,
-
doBind方法
1.执行initAndRegister()方法
1.1.通过反射工厂new出channel来
$NioServerSocketChannel在初始化的时候做了什么?
拿到原生jdk中的网络编程的socketChannel,设定当前NioServerSocketChannel关注的事件是接受连接事件,但是还没有往selector上面注册
$在AbstractChannel的构造方法里面做了什么?
会new出一个pipeline,用的是DefaultChannelPipeline,会在构造方法中创建两个handler,head和tail
#newChannel会创建一个defaultPipeline,而pipeline已经有两个handler了
***总结1.1:把serverSocketChannel new出来,配置接受连接事件,并创建好pipeline
1.2.init(channel)------初始化channel,首先对serverSocketChannel各种相关参数进行配置,把子channel的相关参数也设置好,往pipeline里面加入handler,同时除了我们set进来的handler,给我们的主channel加一个netty自带的ServerBootstrapAcceptor的handler,主channel里面就有3个handler了,这个handler是用来接收连接的,所以从理论上来说是入站handler,所以要来处理channelRead事件,而channelRead中接收的Object-msg本质上是一个一个的相关channel,对这些channel做一些处理,就是用之前自己set进去的handler进行过滤,我们定义的serverBootstrap.childHandler,包括编码解码、解决粘包半包等各种handler,所以给msg里面创造出来的每一个子channel都加入我们自己定义的子handler,并进行相关的注册,
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { startThread(); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } } // 注意这里的加入是异步加入的,所以会执行startThread方法 private void startThread() { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { try { doStartThread(); } catch (Throwable cause) { STATE_UPDATER.set(this, ST_NOT_STARTED); PlatformDependent.throwException(cause); } } } } // 然后继续执行doStartThread方法 private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { // Thread.currentThread()不是主线程了 // 是打包成Runnable,放到executor.execute中执行 // 而这个runnable就被打包到eventloopgroup中new出来的executor中执行execute方法时的Runnable command // 而指向runnable方法的线程就是NioEventLoop里面它应该持有的线程来执行的 // thread是eventloop自己的线程 // 把eventloop和执行的单线程挂钩 thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { // 进行事件轮询,做各种io事件的处理 SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { for (;;) { int oldState = state; if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { break; } } // Check if confirmShutdown() was called at the end of the loop. if (success && gracefulShutdownStartTime == 0) { if (logger.isErrorEnabled()) { logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " + "be called before run() implementation terminates."); } } try { // Run all remaining tasks and shutdown hooks. for (;;) { if (confirmShutdown()) { break; } } } finally { try { cleanup(); } finally { STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.release(); if (!taskQueue.isEmpty()) { if (logger.isWarnEnabled()) { logger.warn("An event executor terminated with " + "non-empty task queue (" + taskQueue.size() + ')'); } } terminationFuture.setSuccess(null); } } } } }); }
$为什么channelRead中接受的msg是channel?
(1).等到看到处理服务器接受连接事件的时候就知道了
(2).AbstractNioMessageChannel里面doReadMessage接受了一个list,netty是按照channel处理的,对于服务端的serverSocketChannel,它处理的channelRead,自然就是channel,传入类型是object,而不是一个byte数组,至于怎么传进来的,在后面服务器接受连接事件时就明白了
$我们写的各种子channel里面有pipeline,那父channel里面也有pipeline吗?
一样是有的,pipeline是在AbstractChannel里面创建的,所以它的子类里面肯定继承了它所有的成员变量,不管是NioServerSocketChannel还是NioSocketChannel里面,都有pipeline,只是说对于NioServerSocketChannel而言,我们只是不加各种handler的,一样是可以加的,只是在网络通讯中,我们关注的网络数据的处理,所以去给每一个socketChannel去加handler,
***总结1.2:把eventloop当前的线程创造出来,并且和eventloop进行挂钩,并且加入一个ServerBootstrapAcceptor,入站专门处理客户端连接的handler,以及启动selector相关的方法,不断轮询,准备处理各种事件
1.3config().group().register(channel),并进行注册相关工作
®ister是一个抽象方法,通过MultithreadEventLoopGroup实现的register方法,MultithreadEventLoopGroup继续调用SingleThreadEventLoop的register方法进行注册,这个方法又调用了promise.channel().unsafe().register(this, promise)来进行注册,也就是unsafe的register进行注册,这个register主要是把当前channel和selector进行挂钩,但是还没有真正写入关注的事件,注册的是一个空事件
2.根据regFuture,通过promise判断初始化和注册做完没有,如果做完了,直接调用doBind0方法,如果没有做完,加入一个侦听器,等到初始化和注册做完了之后,调用侦听器里面的方法,最终起始还是调用了doBind0方法
2.1doBind0方法
通过一个异步的方式来执行channel.bind方法,channel.bind其实会调用channelPipeline.bind的绑定方法,意味着这个事件开始在serverSocketChannel上面所属的pipeline上开始传播,pipeline上有3个handler-----head、serverAccept、tail,只有head是出站和入站兼有,其余两个都是入站,因为bind是出站操作(因为是服务器端接收数据后,做出的响应),所以最终由head来执行绑定方法
$HeadContext.bind方法
该方法调用unsafe.bind(),又跳到AbstractUnsafe方法里面的bind方法,这里要调用的是AbstractChannel的doBind(localAddress)方法,是一个抽象方法,又跳到NioServerSocketChannel里的doBind方法,里面jdk原生的bind操作,当绑定操作执行完后,又回到AbstractChannel,触发主channel的fireChannelActive()事件,channelActive是一个入站事件(所有以fire开头的都是入站事件),同样要在3个handler上面进行流转,
$$channelActive流转
(1)head,ctx.fireChannelActive()继续往后面传播,readIfIsAutoRead()中调用channel.read方法,又调用了pipeline.read事件,read是一个出站操作,所以只有head处理,调用read方法,又调用unsafe.beginRead()方法,又调用AbstractUnsafe.doBeginRead方法,又调用AbstractNioChannel.doBeginRead方法,才真正把当前channel关注的事件和selector进行挂钩
到此时才算完成了bind操作,我们自己实现服务器时很简单的两行代码,但是netty底层帮我们走完了这些复杂的逻辑
-
selector什么时候open的
- eventloop的构造方法中