kafka KafkaRequestHandlerPool类
程序员文章站
2022-07-14 14:03:41
...
KafkaRequestHandlerPool是KafkaRequestHandler的handler池,处理所有请求队列
具体的处理,会交由KafkaApis类
for(i <- 0 until numThreads) { runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis) threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i)) threads(i).start() }
run方法:
def run() { while(true) { try { var req : RequestChannel.Request = null while (req == null) { //获得请求 // We use a single meter for aggregate idle percentage for the thread pool. // Since meter is calculated as total_recorded_value / time_window and // time_window is independent of the number of threads, each recorded idle // time should be discounted by # threads. val startSelectTime = SystemTime.nanoseconds req = requestChannel.receiveRequest(300)//blockqueue poll 获得请求request val idleTime = SystemTime.nanoseconds - startSelectTime aggregateIdleMeter.mark(idleTime / totalHandlerThreads) } //处理 if(req eq RequestChannel.AllDone) { debug("Kafka request handler %d on broker %d received shut down command".format( id, brokerId)) return } req.requestDequeueTimeMs = SystemTime.milliseconds trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req)) //交由apis类负责处理request apis.handle(req) } catch { case e: Throwable => error("Exception when handling request", e) } } }
上一篇: 模版方法,策略模式和状态模式之间的区别
下一篇: Ruby 常用代码