欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

kafka producer服务端

程序员文章站 2022-07-14 14:05:18
...

 

producer服务端:

1.nio接受请求 http://blackproof.iteye.com/blog/2239949

 

2.handler从请求队列中获取,调用KafkaApis http://blackproof.iteye.com/blog/2239953

 

3.KafkaApis类,调用handleProducerOrOffsetCommitRequest方法:

  /**
   * Top-level dispatcher for a request taken from the request channel.
   *
   * Selects the handler by `request.requestId`. Any `Throwable` raised while
   * handling is first handed to the request object's `handleError` and then
   * logged. The local completion timestamp is recorded in every case.
   *
   * @param request the request to dispatch
   */
  def handle(request: RequestChannel.Request): Unit = {
    try {
      trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
      val apiKey = request.requestId
      apiKey match {
        case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)
        case RequestKeys.FetchKey => handleFetchRequest(request)
        case RequestKeys.OffsetsKey => handleOffsetRequest(request)
        case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
        // NOTE(review): per the original author's note, this makes the broker a leader or a
        // follower, and a follower then starts its replica-fetch thread -- not visible here.
        case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
        case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
        case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
        case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
        case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
        case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
        case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
        case unknownKey => throw new KafkaException("Unknown api code " + unknownKey)
      }
    } catch {
      case failure: Throwable =>
        // let the request object react to the failure first, then log it
        request.requestObj.handleError(failure, requestChannel, request)
        error("error when handling request %s".format(request.requestObj), failure)
    } finally {
      // stamp the local completion time whether or not handling succeeded
      request.apiLocalCompleteTimeMs = SystemTime.milliseconds
    }
  }

 

 3.1 将数据插入到本地log,默认本地为leader,客户端发送到tplog的leader的broker里

         appendToLocalLog是主要的处理方法

 3.2 按照客户端producer设置的ack级别,决定如何向客户端返回响应

           0,不做任何返回,直接wake处理之后的请求

           1,获取leader的result,并返回

           -1,判断leader外的isr队列中的replica的lastoffset是否大于等于当前的offset,并获取错误信息

                   如果未满足且没有错误信息,则设置watcher

                   如果超时则放入到delay操作的队列中

 

  /**
   * Handles a produce request, or an offset commit request carried as a produce request
   * (in which case `offsetCommitRequestOpt` is non-empty).
   *
   * The messages are appended to the local log first; the way the client is answered then
   * depends on `produceRequest.requiredAcks`:
   *  - 0: no produce response is sent; the connection is closed if any partition failed
   *  - 1 (or no partitions, or every partition failed locally): respond immediately
   *  - otherwise: wrap the request in a DelayedProduce and hand it to the producer purgatory
   *
   * NOTE(review): the beginning of the method is elided in this excerpt (the `......` line);
   * `produceRequest` and `offsetCommitRequestOpt` are presumably derived from `request` there.
   *
   * @param request the request taken from the request channel
   */
  def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
    ......
    val sTime = SystemTime.milliseconds
    // append the data to the local log (the local broker is assumed to be the leader)
    val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty)// true for an offset commit request, false for a plain produce request
    debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))

    // first non-NoError code among the local results, or NoError if every append succeeded
    val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError)

    // count how many partition results carry an error
    val numPartitionsInError = localProduceResults.count(_.error.isDefined)
    if(produceRequest.requiredAcks == 0) {
      // when acks is 0 no response is required; the request is simply treated as done
      // no operation needed if producer request.required.acks = 0; however, if there is any exception in handling the request, since
      // no response is expected by the producer the handler will send a close connection response to the socket server
      // to close the socket so that the producer client will know that some exception has happened and will refresh its metadata
      if (numPartitionsInError != 0) {
        info(("Send the close connection response due to error handling produce request " +
          "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")
          .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))
        requestChannel.closeConnection(request.processor, request)
      } else {

        // only store the committed offsets when the local append had no error
        if (firstErrorCode == ErrorMapping.NoError)
          offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo))

        // two kinds of request: an offset commit still gets a response, a plain produce does not
        if (offsetCommitRequestOpt.isDefined) {
          val response = offsetCommitRequestOpt.get.responseFor(firstErrorCode, config.offsetMetadataMaxSize)
          requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
        } else
          requestChannel.noOperation(request.processor, request)
      }
    } else if (produceRequest.requiredAcks == 1 ||
        produceRequest.numPartitions <= 0 ||
        numPartitionsInError == produceRequest.numPartitions) {
      // success is returned once the leader has handled the request; this branch is also taken
      // when the request has no partitions or every partition failed locally

      if (firstErrorCode == ErrorMapping.NoError) {
        // update the offset manager's in-memory offsets (offsetsCache)
        offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
      }

      val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap
      val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize))
                                           .getOrElse(ProducerResponse(produceRequest.correlationId, statuses))
      // send the resulting response; for a produce request it carries the ProducerResponseStatus entries
      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
    } else {
      // success is returned only after the replicas have received the request
      // create a list of (topic, partition) pairs to use as keys for this delayed request
      val producerRequestKeys = produceRequest.data.keys.toSeq
      // each status records r.end + 1 alongside the immediate per-partition response status
      val statuses = localProduceResults.map(r =>
        r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
      val delayedRequest =  new DelayedProduce(
        producerRequestKeys,
        request,
        produceRequest.ackTimeoutMs.toLong,
        produceRequest,
        statuses,
        offsetCommitRequestOpt)

      // check whether the other replicas have caught up; if not, register a watcher
      // NOTE(review): per the original note a timed-out request is put on a queue -- purgatory internals are not shown here
      // add the produce request for watch if it's not satisfied, otherwise send the response back
      val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest)
      if (satisfiedByMe)
        producerRequestPurgatory.respond(delayedRequest)
    }
  }
 appendToLocalLog获得本地tp的partition类,调用partition的appendMessagesToLeader方法

 

 // Excerpt from appendToLocalLog: look up the local partition and append through it.
 val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
        val info = partitionOpt match {
          case Some(partition) =>
            partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet],producerRequest.requiredAcks) // hand the data to the partition for appending as leader
          // the partition is not known to this broker's replica manager
          case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
            .format(topicAndPartition, brokerId))
        }

 3.1

appendMessagesToLeader

          ...... 
         // after the ISR check (in the elided code above, per the original note), this is the call that actually writes to the log
          val info = log.append(messages, assignOffsets = true)
          // probably unblock some follower fetch requests since log end offset has been updated
          // check whether this topic-partition has watchers; a watcher that is now satisfied has its response sent back through the channel
          replicaManager.unblockDelayedFetchRequests(new TopicAndPartition(this.topic, this.partitionId))
          // we may need to increment high watermark since ISR could be down to 1
          // more data has arrived, so try to raise the high watermark, which is compared against the replicas' offsets
          maybeIncrementLeaderHW(leaderReplica)
         ...... 

3.1.1.log.append方法:

检查是否需要segment生成新文件,数据入segment,更新lastoffset

        // maybe roll the log if this segment is full; this yields the segment to append to
        val segment = maybeRoll(validMessages.sizeInBytes)

        // now append to the log: the messages go into the segment's file, and an index entry is
        // written as well once the index interval has been exceeded
        // NOTE(review): per the original note, the index uses a mapped channel and the log relies on
        // GatheringByteChannel's own (JDK) buffering -- not verifiable from this excerpt
        segment.append(appendInfo.firstOffset, validMessages)

        // increment the log end offset
        updateLogEndOffset(appendInfo.lastOffset + 1)

 segment.append方法:

数据写入file channel流里,并判断是否需要写入index中

  /**
   * Appends a message set to this segment; an empty message set is ignored.
   *
   * Before the messages are written, an index entry mapping `offset` to the current
   * size of the log is added if more than `indexIntervalBytes` bytes have been
   * appended since the previous index entry.
   *
   * @param offset   the offset recorded in the index entry for this message set
   * @param messages the messages to append
   */
  def append(offset: Long, messages: ByteBufferMessageSet): Unit = {
    val hasPayload = messages.sizeInBytes > 0
    if (hasPayload) {
      trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))

      // enough bytes accumulated since the last index entry => add a new one and restart the count
      val indexEntryDue = bytesSinceLastIndexEntry > indexIntervalBytes
      if (indexEntryDue) {
        index.append(offset, log.sizeInBytes())
        this.bytesSinceLastIndexEntry = 0
      }

      // write the messages, then account for their size towards the next index entry
      log.append(messages)
      this.bytesSinceLastIndexEntry += messages.sizeInBytes
    }
  }

 3.1.2 unblockDelayedFetchRequests

检查当前tp是否有watcher,如果有满足replica都同步的watcher,则返回watcher的response,返回给channel response,返回响应

  /**
   * Updates the fetch request purgatory for the given key and responds to every
   * delayed fetch request that the update reports as satisfied.
   *
   * @param key the topic-partition whose delayed fetch requests should be re-checked
   */
  def unblockDelayedFetchRequests(key: TopicAndPartition): Unit = {
    val released = fetchRequestPurgatory.update(key)
    debug("Request key %s unblocked %d fetch requests.".format(key, released.size))

    // answer each request that has just become satisfied
    for (delayedFetch <- released)
      fetchRequestPurgatory.respond(delayedFetch)
  }

 3.1.3 maybeIncrementLeaderHW

  /**
   * Advances the leader's high watermark when the in-sync replicas allow it.
   *
   * The candidate is the minimum log end offset across `inSyncReplicas`. It replaces
   * the leader's high watermark only if the current one precedes it; in that case
   * delayed fetch and produce requests for this partition are re-checked.
   *
   * @param leaderReplica the leader replica whose high watermark may be advanced
   */
  private def maybeIncrementLeaderHW(leaderReplica: Replica): Unit = {
    val isrLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
    // the smallest log end offset in the ISR, i.e. the replica that is furthest behind
    val candidateHW = isrLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
    val currentHW = leaderReplica.highWatermark
    if (!currentHW.precedes(candidateHW)) {
      debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s"
        .format(currentHW, candidateHW, topic, partitionId, isrLogEndOffsets.mkString(",")))
    } else {
      leaderReplica.highWatermark = candidateHW
      debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, candidateHW))
      // some delayed requests may be unblocked after HW changed
      val requestKey = new TopicAndPartition(this.topic, this.partitionId)
      replicaManager.unblockDelayedFetchRequests(requestKey)
      replicaManager.unblockDelayedProduceRequests(requestKey)
    }
  }