欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Kafka-producer端-network层消息发送的源码探究 KafkaProducer 

程序员文章站 2022-05-11 21:59:46
...
前面我们对应用层的一些原理进行了特别的源码跟踪分析,今天我们就深入到网络层,看下producer端的sender线程是如何把一个消息发送到server端的。

    Sender实现了Runnable接口,最后被一个IOThread启动,核心的逻辑是在一个void run(long now)方法中去实现的,具体的代码解释之前的博客中有,这里就不一一介绍了,这里种点介绍发送的最后两个流程,即send和poll,具体的代码行为
for (ClientRequest request : requests)
     client.send(request, now);
this.client.poll(pollTimeout, now);

此处的client对象为NetworkClient,实现的是KafkaClient,我们先来看下KafkaClient定义的几个核心的接口
public void send(ClientRequest request, long now);
public List<ClientResponse> poll(long timeout, long now);

send主要是进行ClientRequest的发送的,poll是做具体的IO的
接下来我们看NetworkClient的实现
@Override
public void send(ClientRequest request, long now) {
    String nodeId = request.request().destination();
    if (!canSendRequest(nodeId))
        throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    doSend(request, now);
}
private void doSend(ClientRequest request, long now) {
    request.setSendTimeMs(now);
    this.inFlightRequests.add(request);
     selector.send(request.request());
}

send方法主要做了三件事情:1。检查node是否ready;2。把当前请求添加到队列中;3。调用Selector接口的send方法进行发送;此处可以发现,KafkaClient中的发送并不是具体的send实现,具体的send还是依赖的是network层的Selector实现的。
    那我们再来看下Selector的结构,Selector实现了Selectable,Selectable定义的核心方法摘要如下
/**
     * Queue the given request for sending in the subsequent {@link #poll(long) poll()} calls
     * @param send The request to send
     */
    public void send(Send send);

    /**
     * Do I/O. Reads, writes, connection establishment, etc.
     * @param timeout The amount of time to block if there is nothing to do
     * @throws IOException
     */
    public void poll(long timeout) throws IOException;

可以发现定义的方法和NetworkClient的方法是一一对应的,也就是说他们是不同分层中对同一个方法的实现,而且很明显的是KafkaClient是依赖于Selectable的实现的;
那我们再来看下Selectable的实现类Selector,看下对send方法的实现:
public void send(Send send) {
    KafkaChannel channel = channelOrFail(send.destination());
    try {
        channel.setSend(send);
     } catch (CancelledKeyException e) {
        this.failedSends.add(send.destination());
        close(channel);
     }
}

其实就是获取了当前请求的Node的通道-Channel,然后把Channel中的Send设置一个值,并没有实际的IO操作,如建联,读写,释放等,这让我们想到了接口中定义的另外一个方法,没错,就是poll,poll方法才是做实际的IO操作的,和接口中申明的方法一致。
再回到NetworkClient的poll方法中
@Override
    public List<ClientResponse> poll(long timeout, long now) {
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // process completed actions
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleTimedOutRequests(responses, updatedNow);

        // invoke callbacks
        for (ClientResponse response : responses) {
            if (response.request().hasCallback()) {
                try {
                    response.request().callback().onComplete(response);
                } catch (Exception e) {
                    log.error("Uncaught error in request completion:", e);
                }
            }
        }

        return responses;
    }

主要也是做了三件事。1.更新metadata信息;2.调用Selector的send方法进行IO;3.对已经完成的请求进行处理;4. 调用回调函数
看到这里终于看到了metadata相关的实际更新操作,之前在看的时候一只没看到和metadata相关的具体的读写是在哪儿进行的,终于在这里发现了Kafka-producer端-network层消息发送的源码探究
            
    
    
        KafkaProducer  Kafka-producer端-network层消息发送的源码探究
            
    
    
        KafkaProducer  Kafka-producer端-network层消息发送的源码探究
            
    
    
        KafkaProducer 
接下来我们还是关注Selector的send方法,先上代码:
@Override
    public void poll(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout should be >= 0");

        clear();

        if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
            timeout = 0;

        /* check ready keys */
        long startSelect = time.nanoseconds();
        int readyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        currentTimeNanos = endSelect;
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false);
            pollSelectionKeys(immediatelyConnectedKeys, true);
        }

        addToCompletedReceives();

        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
        maybeCloseOldestConnection();
    }


首先获取准备好的事件
int readyKeys = select(timeout);

这里主要调用的NIO的Selector的select方法
如果有准备好的事件
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false);
            pollSelectionKeys(immediatelyConnectedKeys, true);
}

在看下具体的pollSelectionKeys方法
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            KafkaChannel channel = channel(key);

            // register all per-connection metrics at once
            sensors.maybeRegisterConnectionMetrics(channel.id());
            lruConnections.put(channel.id(), currentTimeNanos);

            try {

                /* complete any connections that have finished their handshake (either normally or immediately) */
                if (isImmediatelyConnected || key.isConnectable()) {
                    if (channel.finishConnect()) {
                        this.connected.add(channel.id());
                        this.sensors.connectionCreated.record();
                    } else
                        continue;
                }

                /* if channel is not ready finish prepare */
                if (channel.isConnected() && !channel.ready())
                    channel.prepare();

                /* if channel is ready read from any connections that have readable data */
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                    NetworkReceive networkReceive;
                    while ((networkReceive = channel.read()) != null)
                        addToStagedReceives(channel, networkReceive);
                }

                /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
                if (channel.ready() && key.isWritable()) {
                    Send send = channel.write();
                    if (send != null) {
                        this.completedSends.add(send);
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }

                /* cancel any defunct sockets */
                if (!key.isValid()) {
                    close(channel);
                    this.disconnected.add(channel.id());
                }

            } catch (Exception e) {
                String desc = channel.socketDescription();
                if (e instanceof IOException)
                    log.debug("Connection with {} disconnected", desc, e);
                else
                    log.warn("Unexpected error from {}; closing connection", desc, e);
                close(channel);
                this.disconnected.add(channel.id());
            }
        }
    }


这里边做的操作就是传统的NIO对SocketChannel的操作模型,获取key类型,然后获取通道,针对不同的key类型做不同的逻辑处理。
但是请注意:此时,我们还没有将数据推到网络中,看代码
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
if (channel.ready() && key.isWritable()) {
   Send send = channel.write();
   if (send != null) {
       this.completedSends.add(send);
       this.sensors.recordBytesSent(channel.id(), send.size());
   }
}

此时我们只是将数据写入到KafkaChannel中,在看KafkaChannel中的处理
public Send write() throws IOException {
        Send result = null;
        if (send != null && send(send)) {
            result = send;
            send = null;
        }
        return result;
    }

    private boolean send(Send send) throws IOException {
        send.writeTo(transportLayer);
        if (send.completed())
            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

        return send.completed();
    }


关键看这句
        send.writeTo(transportLayer);

调用send方法把数据写入到TransportLayer层(传输层)
@Override
    public long writeTo(GatheringByteChannel channel) throws IOException {
        long written = channel.write(buffers);
        if (written < 0)
            throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
        remaining -= written;
        // This is temporary workaround. As Send , Receive interfaces are being used by BlockingChannel.
        // Once BlockingChannel is removed we can make Send, Receive to work with transportLayer rather than
        // GatheringByteChannel or ScatteringByteChannel.
        if (channel instanceof TransportLayer)
            pending = ((TransportLayer) channel).hasPendingWrites();

        return written;
    }

底层还是调用了Channel的write方法,Kafka实现了自己的GatheringByteChannel,最后调用的还是PlaintextTransportLayer的write方法,看下PlaintextTransportLayer的数据结构,先看下构造方法
public PlaintextTransportLayer(SelectionKey key) throws IOException {
        this.key = key;
        this.socketChannel = (SocketChannel) key.channel();
    }
...
@Override
    public int write(ByteBuffer src) throws IOException {
        return socketChannel.write(src);
    }

可以看到实际调用的就是NIO的SocketChannel的write方法,实现数据从内存到网卡的写入。
至此一个写的分析就基本完成的。
    建连和读取基本类似,这里就不扒代码了。
相关标签: Kafka Producer