欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

kafka 获取metadata

程序员文章站 2022-07-14 14:04:47
...

问题:

<Failed to update metadata after 3000 ms.>

 

sender类的发送数据时候,会

List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);//nio 发送数据

 

NetworkClient类,方法poll,检查metadata是否需要更新

方法:

    /**
     * Add a metadata request to the list of sends if we can make one
     */
    private void maybeUpdateMetadata(List<NetworkSend> sends, long now) {
        // Beware that the behavior of this method and the computation of timeouts for poll() are
        // highly dependent on the behavior of leastLoadedNode.
        // 最新的可用node
        Node node = this.leastLoadedNode(now);
        if (node == null) {
            log.debug("Give up sending metadata request since no node is available");
            // mark the timestamp for no node available to connect
            this.lastNoNodeAvailableMs = now;
            return;
        }

        log.debug("Trying to send metadata request to node {}", node.id());
        if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) {
            Set<String> topics = metadata.topics();
            this.metadataFetchInProgress = true;
            //生成metadata请求,加入到sends队列中
            ClientRequest metadataRequest = metadataRequest(now, node.id(), topics);
            log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
            sends.add(metadataRequest.request());
            this.inFlightRequests.add(metadataRequest);
        } else if (connectionStates.canConnect(node.id(), now)) {
            // we don't have a connection to this node right now, make one
            log.debug("Init connection to node {} for sending metadata request in the next iteration", node.id());
            initiateConnect(node, now);
            // If initiateConnect failed immediately, this node will be put into blackout and we
            // should allow immediately retrying in case there is another candidate node. If it
            // is still connecting, the worst case is that we end up setting a longer timeout
            // on the next round and then wait for the response.
        } else { // connected, but can't send more OR connecting
            // In either case, we just need to wait for a network tevent to let us know the seleced
            // connection might be usable again.
            this.lastNoNodeAvailableMs = now;
        }
    }

 

    

选择一个请求最少,并且链接状态可用的host,作为获取metadata的host

这里

 

/**
     * Choose the node with the fewest outstanding requests which is at least eligible for connection. This method will
     * prefer a node with an existing connection, but will potentially choose a node for which we don't yet have a
     * connection if all existing connections are in use. This method will never choose a node for which there is no
     * existing connection and from which we have disconnected within the reconnect backoff period.
     * @return The node with the fewest in-flight requests.
     */
    public Node leastLoadedNode(long now) {
        List<Node> nodes = this.metadata.fetch().nodes();
        int inflight = Integer.MAX_VALUE;
        Node found = null;
        for (int i = 0; i < nodes.size(); i++) {
            int idx = Utils.abs((this.nodeIndexOffset + i) % nodes.size());
            Node node = nodes.get(idx);
            int currInflight = this.inFlightRequests.inFlightRequestCount(node.id());
            if (currInflight == 0 && this.connectionStates.isConnected(node.id())) {
                // if we find an established connection with no in-flight requests we can stop right away
                return node;
            } else if (!this.connectionStates.isBlackedOut(node.id(), now) && currInflight < inflight) {
                // otherwise if this is the best we have found so far, record that
                inflight = currInflight;
                found = node;
            }
        }

        return found;
    }

 

 

 

 

相关标签: kafka metadata