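Kafka producer: the server side
Producer server side:
1. NIO accepts the request: http://blackproof.iteye.com/blog/2239949
2. A handler takes the request from the request queue and calls KafkaApis: http://blackproof.iteye.com/blog/2239953
3. The KafkaApis class dispatches on the request key; produce requests go to handleProducerOrOffsetCommitRequest: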
def handle(request: RequestChannel.Request) {
  try {
    trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
    request.requestId match {
      case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)
      case RequestKeys.FetchKey => handleFetchRequest(request)
      case RequestKeys.OffsetsKey => handleOffsetRequest(request)
      case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
      // set this broker as leader or follower; a follower starts the corresponding replica fetcher thread
      case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
      case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
      case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
      case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
      case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
      case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
      case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
      case requestId => throw new KafkaException("Unknown api code " + requestId)
    }
  } catch {
    case e: Throwable =>
      request.requestObj.handleError(e, requestChannel, request)
      error("error when handling request %s".format(request.requestObj), e)
  } finally
    request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
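3.1 Append the data to the local log. The local broker is assumed to be the leader, because the client sends to the broker that leads the topic-partition's log.
appendToLocalLog is the main method that does this work.
3.2 Respond to the client according to the ack level the producer configured:
0: send no response; just wake up processing of the following requests
1: take the leader's append result and return it
-1: check whether the last offset of every ISR replica other than the leader is greater than or equal to the required offset, and collect any error information
  if the condition is not yet satisfied and there is no error, register a watcher
  if the request times out, it is handled through the delayed-operation queue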
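The ack-level branching in 3.2 can be condensed into a small standalone sketch. This is a simplified model under stated assumptions, not Kafka's own code: AckAction, AckDecision and decide are hypothetical names, and the offset-commit path is left out.

```scala
// Hypothetical model of the ack-level decision; none of these types exist in Kafka.
sealed trait AckAction
case object NoResponse extends AckAction          // acks = 0, append succeeded
case object CloseConnection extends AckAction     // acks = 0, but some partition failed
case object RespondImmediately extends AckAction  // acks = 1, or nothing left to wait for
case object WaitForReplicas extends AckAction     // acks = -1, wait for the ISR

object AckDecision {
  // Mirrors the if / else-if / else chain of the request handler.
  def decide(requiredAcks: Int, numPartitions: Int, numPartitionsInError: Int): AckAction =
    if (requiredAcks == 0) {
      // The producer expects no response, so the only way to signal an error
      // is to close the connection.
      if (numPartitionsInError != 0) CloseConnection else NoResponse
    } else if (requiredAcks == 1 || numPartitions <= 0 || numPartitionsInError == numPartitions) {
      // The leader's result is enough, or every partition already failed.
      RespondImmediately
    } else {
      WaitForReplicas
    }
}
```

For example, `AckDecision.decide(-1, 2, 2)` yields `RespondImmediately`: when every partition has already failed on the leader there is nothing to wait for, even with acks = -1.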
def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
  ......
  val sTime = SystemTime.milliseconds
  // append the data to the local log (the local broker is assumed to be the leader)
  // second argument: true for an offset commit request, false for a produce request
  val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty)
  debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))

  // inspect the results: the first error code, or NoError if nothing failed
  val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError)

  val numPartitionsInError = localProduceResults.count(_.error.isDefined)
  if(produceRequest.requiredAcks == 0) {
    // acks = 0: no response is needed; the request is treated as successful
    // no operation needed if producer request.required.acks = 0; however, if there is any exception in handling the request, since
    // no response is expected by the producer the handler will send a close connection response to the socket server
    // to close the socket so that the producer client will know that some exception has happened and will refresh its metadata
    if (numPartitionsInError != 0) {
      info(("Send the close connection response due to error handling produce request " +
        "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")
        .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))
      requestChannel.closeConnection(request.processor, request)
    } else {
      if (firstErrorCode == ErrorMapping.NoError)
        offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo))
      // two kinds of request arrive here: offset commit and produce
      if (offsetCommitRequestOpt.isDefined) {
        val response = offsetCommitRequestOpt.get.responseFor(firstErrorCode, config.offsetMetadataMaxSize)
        requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
      } else
        requestChannel.noOperation(request.processor, request)
    }
  } else if (produceRequest.requiredAcks == 1 ||
      produceRequest.numPartitions <= 0 ||
      numPartitionsInError == produceRequest.numPartitions) {
    // acks = 1: success is returned once the leader has confirmed the request
    if (firstErrorCode == ErrorMapping.NoError) {
      // offsetsCache: update the offset manager's in-memory offsets
      offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
    }

    val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap
    val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize))
                                         .getOrElse(ProducerResponse(produceRequest.correlationId, statuses))
    // send back the response, which carries the produce result as ProducerResponseStatus
    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
  } else {
    // acks = -1: success is returned only after all replicas have received the request
    // create a list of (topic, partition) pairs to use as keys for this delayed request
    val producerRequestKeys = produceRequest.data.keys.toSeq
    val statuses = localProduceResults.map(r =>
      r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
    val delayedRequest = new DelayedProduce(
      producerRequestKeys,
      request,
      produceRequest.ackTimeoutMs.toLong,
      produceRequest,
      statuses,
      offsetCommitRequestOpt)

    // check whether the other replicas have all caught up; if not, register a watcher,
    // and on timeout the request goes through the delay queue (the watcher mechanism)
    // add the produce request for watch if it's not satisfied, otherwise send the response back
    val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest)
    if (satisfiedByMe)
      producerRequestPurgatory.respond(delayedRequest)
  }
}
appendToLocalLog obtains the local Partition object for the topic-partition and calls its appendMessagesToLeader method:
val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
val info = partitionOpt match {
  case Some(partition) =>
    // append the data to the leader replica
    partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet],producerRequest.requiredAcks)
  case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
    .format(topicAndPartition, brokerId))
}
3.1 appendMessagesToLeader:
......
// after the ISR check, this is the call that actually writes to the log
val info = log.append(messages, assignOffsets = true)
// probably unblock some follower fetch requests since log end offset has been updated
// check whether this topic-partition has watchers; any watcher that is now satisfied
// has its response sent back through the request channel
replicaManager.unblockDelayedFetchRequests(new TopicAndPartition(this.topic, this.partitionId))
// we may need to increment high watermark since ISR could be down to 1
// more data has arrived, so try to advance the high watermark, which is derived by
// comparing the leader's offset with the offsets of the other replicas
maybeIncrementLeaderHW(leaderReplica)
......
3.1.1 The log.append method:
Check whether the segment needs to roll to a new file, append the data to the segment, and update the log end offset.
// maybe roll the log if this segment is full
// (get the current segment, rolling to a new one if required)
val segment = maybeRoll(validMessages.sizeInBytes)
// now append to the log
// the data is added to the segment's file; if the index interval has been exceeded, an index entry is written too
// the index uses a mapped channel, the log uses a GatheringByteChannel with Java's own buffering
segment.append(appendInfo.firstOffset, validMessages)
// increment the log end offset
updateLogEndOffset(appendInfo.lastOffset + 1)
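The roll, append, update sequence can be illustrated with a toy log that tracks only sizes and offsets. This is a sketch under assumptions: Seg and SketchLog are hypothetical names, only a size-based roll condition is modelled, and no bytes are actually stored.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model: a segment is reduced to its base offset and current size.
final case class Seg(baseOffset: Long, var sizeInBytes: Int)

class SketchLog(maxSegmentBytes: Int) {
  val segments = ArrayBuffer(Seg(0L, 0))
  private var logEndOffset = 0L

  def nextOffset: Long = logEndOffset

  // Size-based roll only: start a new segment if the batch would not fit.
  private def maybeRoll(messagesSize: Int): Seg = {
    val active = segments.last
    if (active.sizeInBytes > maxSegmentBytes - messagesSize) {
      val rolled = Seg(logEndOffset, 0) // the new segment starts at the next offset
      segments += rolled
      rolled
    } else active
  }

  // Appends `count` messages totalling `sizeInBytes`; returns (firstOffset, lastOffset).
  def append(count: Int, sizeInBytes: Int): (Long, Long) = {
    require(count > 0, "a batch must contain at least one message")
    val firstOffset = logEndOffset
    val lastOffset = firstOffset + count - 1
    val segment = maybeRoll(sizeInBytes)
    segment.sizeInBytes += sizeInBytes
    logEndOffset = lastOffset + 1 // same step as updateLogEndOffset(lastOffset + 1)
    (firstOffset, lastOffset)
  }
}
```

With a 100-byte limit, two 60-byte batches end up in two segments, and the second segment's base offset equals the log end offset at the moment of the roll.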
The segment.append method:
Write the data to the file channel and decide whether an entry should also be added to the index.
def append(offset: Long, messages: ByteBufferMessageSet) {
  if (messages.sizeInBytes > 0) {
    trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
    // append an entry to the index (if needed)
    // if the bytes written since the last index entry (bytesSinceLastIndexEntry) exceed
    // the indexing interval, the condition for a new index entry is met
    if(bytesSinceLastIndexEntry > indexIntervalBytes) {
      index.append(offset, log.sizeInBytes())
      this.bytesSinceLastIndexEntry = 0
    }
    // append the messages
    log.append(messages)
    this.bytesSinceLastIndexEntry += messages.sizeInBytes
  }
}
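The effect of this check is a sparse index: only some batches get an index entry. The following sketch reproduces just that bookkeeping in memory. SketchSegment is a hypothetical class, and the batch is represented only by its size.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical in-memory stand-in for a log segment with a sparse offset index.
class SketchSegment(indexIntervalBytes: Int) {
  // Each entry is (first offset of the batch, byte position of the batch in the log).
  val index = ArrayBuffer.empty[(Long, Int)]
  private var logSizeInBytes = 0
  private var bytesSinceLastIndexEntry = 0

  def sizeInBytes: Int = logSizeInBytes

  def append(offset: Long, batchSizeInBytes: Int): Unit = {
    if (batchSizeInBytes > 0) {
      // Index the batch only if enough bytes were written since the last entry.
      if (bytesSinceLastIndexEntry > indexIntervalBytes) {
        index += ((offset, logSizeInBytes))
        bytesSinceLastIndexEntry = 0
      }
      // "Write" the batch: here only the sizes are tracked.
      logSizeInBytes += batchSizeInBytes
      bytesSinceLastIndexEntry += batchSizeInBytes
    }
  }
}
```

With an interval of 100 bytes and 60-byte batches, every other batch from the third one onward receives an index entry, so a lookup has to scan at most a bounded stretch of the log after the nearest indexed position.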
3.1.2 unblockDelayedFetchRequests
Check whether this topic-partition has watchers (delayed fetch requests). Each watcher that is now satisfied has its response sent back through the request channel.
def unblockDelayedFetchRequests(key: TopicAndPartition) {
  val satisfied = fetchRequestPurgatory.update(key)
  debug("Request key %s unblocked %d fetch requests.".format(key, satisfied.size))
  // send any newly unblocked responses
  satisfied.foreach(fetchRequestPurgatory.respond(_))
}
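The watcher mechanism behind update can be modelled in a few lines. This is a single-threaded sketch with hypothetical types (Delayed, SketchPurgatory). The method names echo the calls seen above, but the signatures are simplified, and timeouts are not modelled.

```scala
import scala.collection.mutable

// Hypothetical delayed request: an id plus a condition that may become true later.
final case class Delayed(id: Int, isSatisfied: () => Boolean)

// Hypothetical, single-threaded model of a purgatory keyed by topic-partition name.
class SketchPurgatory {
  private val watchers = mutable.Map.empty[String, mutable.ListBuffer[Delayed]]

  // Returns true if the request is already satisfied; otherwise registers it as a watcher.
  def checkAndMaybeWatch(key: String, request: Delayed): Boolean =
    if (request.isSatisfied()) true
    else {
      watchers.getOrElseUpdate(key, mutable.ListBuffer.empty[Delayed]) += request
      false
    }

  // Re-checks the watchers of one key; removes and returns those that are now satisfied.
  def update(key: String): List[Delayed] = {
    val watching = watchers.getOrElse(key, mutable.ListBuffer.empty[Delayed]).toList
    val (satisfied, pending) = watching.partition(_.isSatisfied())
    if (pending.isEmpty) watchers.remove(key)
    else watchers(key) = (mutable.ListBuffer.empty[Delayed] ++= pending)
    satisfied
  }
}
```

A caller would invoke `update(key)` whenever the state behind the conditions changes (for instance after an append moves the log end offset) and then respond to every request it returns.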
3.1.3 maybeIncrementLeaderHW
private def maybeIncrementLeaderHW(leaderReplica: Replica) {
  val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
  // compare by message offset and take the smallest log end offset (the replica that is furthest behind)
  val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
  val oldHighWatermark = leaderReplica.highWatermark
  // if even the slowest replica's offset is ahead of the leader's current high watermark, advance the high watermark
  if(oldHighWatermark.precedes(newHighWatermark)) {
    leaderReplica.highWatermark = newHighWatermark
    debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
    // some delayed requests may be unblocked after HW changed
    val requestKey = new TopicAndPartition(this.topic, this.partitionId)
    replicaManager.unblockDelayedFetchRequests(requestKey)
    replicaManager.unblockDelayedProduceRequests(requestKey)
  } else {
    debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s"
      .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))
  }
}
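Stripped of logging and unblocking, the rule is: the high watermark becomes the smallest log end offset in the ISR, and it never moves backwards. A minimal sketch, assuming each replica is reduced to a plain Long offset (HighWatermarkSketch and maybeIncrement are hypothetical names):

```scala
// Hypothetical model: each in-sync replica is represented only by its log end offset.
object HighWatermarkSketch {
  // Returns the smallest log end offset in the ISR if it is ahead of the old
  // high watermark; otherwise returns the old high watermark unchanged.
  def maybeIncrement(oldHighWatermark: Long, isrLogEndOffsets: Seq[Long]): Long = {
    require(isrLogEndOffsets.nonEmpty, "the ISR contains at least the leader")
    val candidate = isrLogEndOffsets.min
    if (oldHighWatermark < candidate) candidate else oldHighWatermark
  }
}
```

For instance, with an old high watermark of 5 and ISR log end offsets 10, 7 and 8, the result is 7. If the ISR has shrunk to the leader alone, the high watermark jumps straight to the leader's own log end offset.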