kafka SocketServer类
程序员文章站
2022-07-14 14:04:29
...
SocketServer是kafka基于nio的网络层,包含一个acceptor线程,负责接受socket连接,并把连接(平均)分配到各个processor中;多个processor线程通过nio处理请求和响应
processor处理请求时只是将request放入requestchannel的queue中(request的实际处理由KafkaRequestHandlerPool中的handler完成)
processor处理响应:在requestchannel上为processor注册response listener(按processor的id唤醒对应的processor),被唤醒的processor将response发送给client
/**
 * Start the socket server: create and launch the processor threads, register
 * them for response notifications, then start the acceptor thread and wait
 * until it has started.
 */
def startup() {
  val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

  (0 until numProcessorThreads).foreach { idx =>
    // Each processor handles network reads and writes: a read puts the request
    // on the RequestChannel's queue, a write sends the response back.
    val worker = new Processor(
      idx,
      time,
      maxRequestSize,
      aggregateIdleMeter,
      newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> idx.toString)),
      numProcessorThreads,
      requestChannel,
      quotas,
      connectionsMaxIdleMs)
    processors(idx) = worker
    val threadName = "kafka-network-thread-%d-%d".format(port, idx)
    Utils.newThread(threadName, worker, false).start()
  }

  // Gauge: sum over all processors of their count of keys interested in OP_WRITE.
  newGauge("ResponsesBeingSent", new Gauge[Int] {
    def value = processors.iterator.map(_.countInterestOps(SelectionKey.OP_WRITE)).sum
  })

  // register the processor threads for notification of responses:
  // the listener receives a processor id and calls wakeup() on that processor
  requestChannel.addResponseListener((processorId: Int) => processors(processorId).wakeup())

  // start accepting connections; per the original note, the acceptor hands each
  // accepted connection to a processor, which then serves its requests/responses
  this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
  Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
  acceptor.awaitStartup
  info("Started")
}
获得请求,放入queue中,等待handler处理
/*
 * Process reads from ready sockets.
 *
 * Continues (or starts) the Receive attached to the key. A negative read count
 * closes the key; a completed receive is turned into a RequestChannel.Request
 * and queued on the request channel; otherwise the key stays registered for read.
 */
def read(key: SelectionKey) {
  lruConnections.put(key, currentTimeNanos)
  val channel = channelFor(key)

  // Reuse the in-progress receive attached to the key, or attach a fresh one.
  val pending = key.attachment.asInstanceOf[Receive]
  val receive: Receive =
    if(pending != null) {
      pending
    } else {
      val fresh = new BoundedByteBufferReceive(maxRequestSize)
      key.attach(fresh)
      fresh
    }

  val bytesRead = receive.readFrom(channel)
  val address = channel.socket.getRemoteSocketAddress()
  trace(s"$bytesRead bytes read from $address")

  if(bytesRead < 0) {
    close(key)
  } else if(!receive.complete) {
    // more reading to be done
    trace("Did not finish reading, registering for read again on connection " + channel.socket.getRemoteSocketAddress())
    key.interestOps(SelectionKey.OP_READ)
    wakeup()
  } else {
    val req = RequestChannel.Request(
      processor = id,
      requestKey = key,
      buffer = receive.buffer,
      startTimeMs = time.milliseconds,
      remoteAddress = address)
    // put the request on the queue, where it waits for a handler
    requestChannel.sendRequest(req)
    key.attach(null)
    // explicitly reset interest ops to not READ, no need to wake up the selector just yet
    val opsWithoutRead = key.interestOps & (~SelectionKey.OP_READ)
    key.interestOps(opsWithoutRead)
  }
}
响应请求
/*
 * Process writes to ready sockets.
 *
 * Writes the response attached to the key to the key's channel. When the send
 * is complete, the request metrics are updated, the attachment is cleared and
 * the key is registered for read; otherwise the key is registered for write
 * again and wakeup() is called.
 *
 * Throws IllegalStateException if the key has no response attached, or the
 * attached response has no responseSend.
 */
def write(key: SelectionKey) {
  val socketChannel = channelFor(key)
  val response = key.attachment().asInstanceOf[RequestChannel.Response]
  // Casting a null attachment yields null, so guard the response itself before
  // reading its field; otherwise a missing attachment surfaces as a bare
  // NullPointerException instead of the descriptive error below.
  val responseSend = if(response == null) null else response.responseSend
  if(responseSend == null)
    throw new IllegalStateException("Registered for write interest but no response attached to key.")
  // send the response (or the next part of it) to the client
  val written = responseSend.writeTo(socketChannel)
  trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " using key " + key)
  if(responseSend.complete) {
    response.request.updateRequestMetrics()
    key.attach(null)
    trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress())
    key.interestOps(SelectionKey.OP_READ)
  } else {
    trace("Did not finish writing, registering for write again on connection " + socketChannel.socket.getRemoteSocketAddress())
    key.interestOps(SelectionKey.OP_WRITE)
    wakeup()
  }
}