欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

kafka SocketServer类

程序员文章站 2022-07-14 14:04:29
...

 

 

SocketServer是kafka基于NIO实现的网络层:包含一个acceptor线程,负责接受socket连接,并把连接(平均)分配到各个processor中;多个processor线程通过NIO处理请求和响应

processor处理请求时,只是将request放入RequestChannel的queue中(真正的请求处理由KafkaRequestHandlerPool中的handler完成)

processor处理响应时,先在RequestChannel上为对应的processor注册监听器(调用该processor的wakeup),随后由processor将response发送给client

 

/**
   * Start the socket server.
   *
   * Creates and starts one Processor thread per configured network thread,
   * registers the "ResponsesBeingSent" gauge, hooks each processor's wakeup
   * into the request channel as a response listener, and finally starts the
   * Acceptor thread and waits for it to report startup.
   */
  def startup(): Unit = {
    val connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

    (0 until numProcessorThreads).foreach { index =>
      // per-processor idle meter, tagged with the processor index
      val idleMeter = newMeter("IdlePercent",
                               "percent",
                               TimeUnit.NANOSECONDS,
                               Map("networkProcessor" -> index.toString))
      val processor = new Processor(index,
                                    time,
                                    maxRequestSize,
                                    aggregateIdleMeter,
                                    idleMeter,
                                    numProcessorThreads,
                                    requestChannel,
                                    connectionQuotas,
                                    connectionsMaxIdleMs)
      processors(index) = processor
      // The processor handles network reads and writes: a fully read request is put on the
      // RequestChannel queue, and responses are written back to the socket.
      Utils.newThread("kafka-network-thread-%d-%d".format(port, index), processor, false).start()
    }

    // gauge value: sum over all processors of countInterestOps(OP_WRITE)
    newGauge("ResponsesBeingSent", new Gauge[Int] {
      def value = processors.iterator.map(_.countInterestOps(SelectionKey.OP_WRITE)).sum
    })

    // register the processor threads for notification of responses:
    // the listener receives a processor id and calls wakeup() on that processor
    requestChannel.addResponseListener((processorId: Int) => processors(processorId).wakeup())

    // start accepting connections; the acceptor is given the processors array so that
    // accepted connections can be handed over to a processor for subsequent reads/writes
    val socketAcceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, connectionQuotas)
    this.acceptor = socketAcceptor
    Utils.newThread("kafka-socket-acceptor", socketAcceptor, false).start()
    socketAcceptor.awaitStartup
    info("Started")
  }

 

获得请求,放入queue中,等待handler处理

  /*
   * Process reads from ready sockets.
   *
   * Reads from the channel behind `key` into the Receive attached to the key
   * (creating and attaching a new one when none is attached). A negative read
   * count closes the key; a completed receive is turned into a Request and
   * sent to the request channel; otherwise the key stays registered for READ.
   */
  def read(key: SelectionKey): Unit = {
    // record the current nano time against this key in lruConnections
    lruConnections.put(key, currentTimeNanos)
    val channel = channelFor(key)

    // reuse the in-progress receive attached to the key, or start (and attach) a new one
    val receive: Receive = key.attachment match {
      case null =>
        val fresh = new BoundedByteBufferReceive(maxRequestSize)
        key.attach(fresh)
        fresh
      case pending =>
        pending.asInstanceOf[Receive]
    }

    val bytesRead = receive.readFrom(channel)
    val address = channel.socket.getRemoteSocketAddress()
    trace(bytesRead + " bytes read from " + address)

    if (bytesRead < 0) {
      close(key)
    } else if (!receive.complete) {
      // more reading to be done
      trace("Did not finish reading, registering for read again on connection " + channel.socket.getRemoteSocketAddress())
      key.interestOps(SelectionKey.OP_READ)
      wakeup()
    } else {
      val request = RequestChannel.Request(processor = id,
                                           requestKey = key,
                                           buffer = receive.buffer,
                                           startTimeMs = time.milliseconds,
                                           remoteAddress = address)
      // put the request on the request channel, where it waits for a handler
      requestChannel.sendRequest(request)
      key.attach(null)
      // explicitly reset interest ops to not READ, no need to wake up the selector just yet
      val opsWithoutRead = key.interestOps & (~SelectionKey.OP_READ)
      key.interestOps(opsWithoutRead)
    }
  }

 

 响应请求

  /*
   * Process writes to ready sockets.
   *
   * Writes the response attached to `key` to the key's channel. When the send
   * is complete, request metrics are updated, the attachment is cleared and the
   * key is registered for READ; otherwise the key stays registered for WRITE
   * and the selector is woken up.
   *
   * Throws IllegalStateException if the key has no response attached, or the
   * attached response carries no responseSend.
   */
  def write(key: SelectionKey): Unit = {
    val socketChannel = channelFor(key)
    val response = key.attachment().asInstanceOf[RequestChannel.Response]
    // Check the attachment itself as well as its payload: with a null attachment,
    // dereferencing response.responseSend would fail with a NullPointerException
    // before the intended IllegalStateException could be raised.
    if(response == null || response.responseSend == null)
      throw new IllegalStateException("Registered for write interest but no response attached to key.")
    val responseSend = response.responseSend
    // send (part of) the response to the client
    val written = responseSend.writeTo(socketChannel)
    trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " using key " + key)
    if(responseSend.complete) {
      response.request.updateRequestMetrics()
      key.attach(null)
      trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress())
      key.interestOps(SelectionKey.OP_READ)
    } else {
      trace("Did not finish writing, registering for write again on connection " + socketChannel.socket.getRemoteSocketAddress())
      key.interestOps(SelectionKey.OP_WRITE)
      wakeup()
    }
  }