欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

网络编程18

程序员文章站 2022-07-12 08:06:58
...

AbstractUnsafe

  • register方法:把channel和selector挂钩,如果pipeline的通道已经是active状态的,会调用beginRead方法,beginRead方法又会调用doBeginRead方法,把真正关心的事件进行注册

服务器启动流程

  • 1.首先创造了一个eventloopgroup,支持指定线程数,如果不传,默认会从配置中读取,如果配置没有,就是CPU逻辑核心数乘2,io操作的线程数一般都是这么设置的,会进入到MultithreadEventExecutorGroup方法中

    • MultithreadEventExecutorGroup

      1.executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());

      是这样一个执行器,初始化时创建了一个线程工厂,并且在execute方法里面就会创建一个新的线程,其中对应于下面bind—doBind—initAndRegister—init中加入接受连接的handler的异步任务,每一个eventloopgroup都会有一个executor。

      2.children就是每一个eventloop。

      3.按照传进来的线程数,依次进行相关的数组循环,每一个数组元素进行newChild操作,就是一个eventloop。

      eventloop的构造方法里面,把selector相关的参数初始化出来,selector就是通过openSelector获得的。

    #所以第一步,new出来一个eventloopgroup的时候,就是按照cpu核心数创建相对应数量的eventloop,在每一个eventloop里面初始化出一个selector,方便后面channel将关注的事件注册到上面,而NioEventLoop里面有一个专门的run方法,在一个for循环里面,不断调用selector相关方法,处理各种事件集,netty这里有一个小小的技巧,因为NioEventLoop除了处理io事件之外,有一些系统的任务也是由它处理,需要进行区分,引入了一个ioRatio参数,即处理io事件的比例,根据比例来分配资源处理io事件,所以eventloop会判断执行当前任务的线程是不是eventloop本身的线程,缺省情况ioRatio是50%,即一半一半,eventloop里面包装了一个线程,但是这个线程不是在NioEventLoop中声明的,而是在它的父类SingleThreadEventExecutors,里面声明了thread变量和taskQueue队列,而eventloopgroup里面execute会创建一个线程并执行,但是还没有和eventloop里面的线程挂钩,在哪里挂钩?

  • 2.拿到ServerBootstrap 这个服务器启动必备的实例

  • 3.把channel、通讯地址、handler这些参数set到ServerBootstrap 中,唯一有点的区别的是channel,它设置的class对象,是反射的方法拿到每一个channel的相关实例

    b.group(group)
     .channel(NioServerSocketChannel.class)
     .localAddress(new InetSocketAddress(port)
     .childHandler(new ChannelInitializer<SocketChannel>() {
         @Override
         protected void initChannel(SocketChannel ch) throws Exception {
             // 把新创建的handler加到SocketChannel里面
             ch.pipeline().addLast(serverHandler);
         }
     });
    
  • 4.ChannelFuture f = b.bind().sync();

    其中bind方法是非常关键的,是关注的重点,它是有它的父类AbstractBootstrap实现的,而bind方法又跳进doBind方法,

    • doBind方法

      1.执行initAndRegister()方法

      1.1.通过反射工厂new出channel来

      $NioServerSocketChannel在初始化的时候做了什么?

      拿到原生jdk中的网络编程的socketChannel,设定当前NioServerSocketChannel关注的事件是接受连接事件,但是还没有往selector上面注册

      $在AbstractChannel的构造方法里面做了什么?

      会new出一个pipeline,用的是DefaultChannelPipeline,会在构造方法中创建两个handler,head和tail

      #newChannel会创建一个defaultPipeline,而pipeline已经有两个handler了

      ***总结1.1:把serverSocketChannel new出来,配置接受连接事件,并创建好pipeline

      1.2.init(channel)------初始化channel,首先对serverSocketChannel各种相关参数进行配置,把子channel的相关参数也设置好,往pipeline里面加入handler,同时除了我们set进来的handler,给我们的主channel加一个netty自带的ServerBootstrapAcceptor的handler,主channel里面就有3个handler了,这个handler是用来接收连接的,所以从理论上来说是入站handler,所以要来处理channelRead事件,而channelRead中接收的Object-msg本质上是一个一个的相关channel,对这些channel做一些处理,就是用之前自己set进去的handler进行过滤,我们定义的serverBootstrap.childHandler,包括编码解码、解决粘包半包等各种handler,所以给msg里面创造出来的每一个子channel都加入我们自己定义的子handler,并进行相关的注册,

      @Override
      public void execute(Runnable task) {
          if (task == null) {
              throw new NullPointerException("task");
          }
      
          boolean inEventLoop = inEventLoop();
          addTask(task);
          if (!inEventLoop) {
              startThread();
              if (isShutdown() && removeTask(task)) {
                  reject();
              }
          }
      
          if (!addTaskWakesUp && wakesUpForTask(task)) {
              wakeup(inEventLoop);
          }
      }
      // 注意这里的加入是异步加入的,所以会执行startThread方法
      
      private void startThread() {
          if (state == ST_NOT_STARTED) {
              if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                  try {
                      doStartThread();
                  } catch (Throwable cause) {
                      STATE_UPDATER.set(this, ST_NOT_STARTED);
                      PlatformDependent.throwException(cause);
                  }
              }
          }
      }
      // 然后继续执行doStartThread方法
      
      private void doStartThread() {
          assert thread == null;
          executor.execute(new Runnable() {
              @Override
              public void run() {
                  // Thread.currentThread()不是主线程了
                  // 是打包成Runnable,放到executor.execute中执行
                  // 而这个runnable就被打包到eventloopgroup中new出来的executor中执行execute方法时的Runnable command
                  // 而指向runnable方法的线程就是NioEventLoop里面它应该持有的线程来执行的
                  // thread是eventloop自己的线程
                  // 把eventloop和执行的单线程挂钩
                  thread = Thread.currentThread();
                  if (interrupted) {
                      thread.interrupt();
                  }
      
                  boolean success = false;
                  updateLastExecutionTime();
                  try {
                      // 进行事件轮询,做各种io事件的处理
                      SingleThreadEventExecutor.this.run();
                      success = true;
                  } catch (Throwable t) {
                      logger.warn("Unexpected exception from an event executor: ", t);
                  } finally {
                      for (;;) {
                          int oldState = state;
                          if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                              SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                              break;
                          }
                      }
      
                      // Check if confirmShutdown() was called at the end of the loop.
                      if (success && gracefulShutdownStartTime == 0) {
                          if (logger.isErrorEnabled()) {
                              logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                           SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                           "be called before run() implementation terminates.");
                          }
                      }
      
                      try {
                          // Run all remaining tasks and shutdown hooks.
                          for (;;) {
                              if (confirmShutdown()) {
                                  break;
                              }
                          }
                      } finally {
                          try {
                              cleanup();
                          } finally {
                              STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                              threadLock.release();
                              if (!taskQueue.isEmpty()) {
                                  if (logger.isWarnEnabled()) {
                                      logger.warn("An event executor terminated with " +
                                                  "non-empty task queue (" + taskQueue.size() + ')');
                                  }
                              }
      
                              terminationFuture.setSuccess(null);
                          }
                      }
                  }
              }
          });
      }
      

      $为什么channelRead中接受的msg是channel?

      (1).等到看到处理服务器接受连接事件的时候就知道了

      (2).AbstractNioMessageChannel里面doReadMessage接受了一个list,netty是按照channel处理的,对于服务端的serverSocketChannel,它处理的channelRead,自然就是channel,传入类型是object,而不是一个byte数组,至于怎么传进来的,在后面服务器接受连接事件时就明白了

      $我们写的各种子channel里面有pipeline,那父channel里面也有pipeline吗?

      一样是有的,pipeline是在AbstractChannel里面创建的,所以它的子类里面肯定继承了它所有的成员变量,不管是NioServerSocketChannel还是NioSocketChannel里面,都有pipeline,只是说对于NioServerSocketChannel而言,我们只是不加各种handler的,一样是可以加的,只是在网络通讯中,我们关注的网络数据的处理,所以去给每一个socketChannel去加handler,

      ***总结1.2:把eventloop当前的线程创造出来,并且和eventloop进行挂钩,并且加入一个ServerBootstrapAcceptor,入站专门处理客户端连接的handler,以及启动selector相关的方法,不断轮询,准备处理各种事件

      1.3config().group().register(channel),并进行注册相关工作

      &register是一个抽象方法,通过MultithreadEventLoopGroup实现的register方法,MultithreadEventLoopGroup继续调用SingleThreadEventLoop的register方法进行注册,这个方法又调用了promise.channel().unsafe().register(this, promise)来进行注册,也就是unsafe的register进行注册,这个register主要是把当前channel和selector进行挂钩,但是还没有真正写入关注的事件,注册的是一个空事件

      2.根据regFuture,通过promise判断初始化和注册做完没有,如果做完了,直接调用doBind0方法,如果没有做完,加入一个侦听器,等到初始化和注册做完了之后,调用侦听器里面的方法,最终起始还是调用了doBind0方法

      2.1doBind0方法

      通过一个异步的方式来执行channel.bind方法,channel.bind其实会调用channelPipeline.bind的绑定方法,意味着这个事件开始在serverSocketChannel上面所属的pipeline上开始传播,pipeline上有3个handler-----head、serverAccept、tail,只有head是出站和入站兼有,其余两个都是入站,因为bind是出站操作(因为是服务器端接收数据后,做出的响应),所以最终由head来执行绑定方法

      $HeadContext.bind方法

      该方法调用unsafe.bind(),又跳到AbstractUnsafe方法里面的bind方法,这里要调用的是AbstractChannel的doBind(localAddress)方法,是一个抽象方法,又跳到NioServerSocketChannel里的doBind方法,里面jdk原生的bind操作,当绑定操作执行完后,又回到AbstractChannel,触发主channel的fireChannelActive()事件,channelActive是一个入站事件(所有以fire开头的都是入站事件),同样要在3个handler上面进行流转,

      $$channelActive流转

      (1)head,ctx.fireChannelActive()继续往后面传播,readIfIsAutoRead()中调用channel.read方法,又调用了pipeline.read事件,read是一个出站操作,所以只有head处理,调用read方法,又调用unsafe.beginRead()方法,又调用AbstractUnsafe.doBeginRead方法,又调用AbstractNioChannel.doBeginRead方法,才真正把当前channel关注的事件和selector进行挂钩

      到此时才算完成了bind操作,我们自己实现服务器时很简单的两行代码,但是netty底层帮我们走完了这些复杂的逻辑

selector什么时候open的

  • eventloop的构造方法中
相关标签: 网络编程 网络