Kafka-producer端-network层消息发送的源码探究 KafkaProducer
程序员文章站
2022-05-11 21:59:46
...
前面我们对应用层的一些原理进行了特别的源码跟踪分析,今天我们就深入到网络层,看下producer端的sender线程是如何把一个消息发送到server端的。
Sender实现了Runnable接口,最后被一个IOThread启动,核心的逻辑是在一个void run(long now)方法中去实现的,具体的代码解释之前的博客中有,这里就不一一介绍了,这里种点介绍发送的最后两个流程,即send和poll,具体的代码行为
此处的client对象为NetworkClient,实现的是KafkaClient,我们先来看下KafkaClient定义的几个核心的接口
send主要是进行ClientRequest的发送的,poll是做具体的IO的
接下来我们看NetworkClient的实现
send方法主要做了三件事情:1。检查node是否ready;2。把当前请求添加到队列中;3。调用Selector接口的send方法进行发送;此处可以发现,KafkaClient中的发送并不是具体的send实现,具体的send还是依赖的是network层的Selector实现的。
那我们再来看下Selector的结构,Selector实现了Selectable,Selectable定义的核心方法摘要如下
可以发现定义的方法和NetworkClient的方法是一一对应的,也就是说他们是不同分层中对同一个方法的实现,而且很明显的是KafkaClient是依赖于Selectable的实现的;
那我们再来看下Selectable的实现类Selector,看下对send方法的实现:
其实就是获取了当前请求的Node的通道-Channel,然后把Channel中的Send设置一个值,并没有实际的IO操作,如建联,读写,释放等,这让我们想到了接口中定义的另外一个方法,没错,就是poll,poll方法才是做实际的IO操作的,和接口中申明的方法一致。
再回到NetworkClient的poll方法中
主要也是做了三件事。1.更新metadata信息;2.调用Selector的send方法进行IO;3.对已经完成的请求进行处理;4. 调用回调函数
看到这里终于看到了metadata相关的实际更新操作,之前在看的时候一只没看到和metadata相关的具体的读写是在哪儿进行的,终于在这里发现了 。
接下来我们还是关注Selector的send方法,先上代码:
首先获取准备好的事件
这里主要调用的NIO的Selector的select方法
如果有准备好的事件
在看下具体的pollSelectionKeys方法
这里边做的操作就是传统的NIO对SocketChannel的操作模型,获取key类型,然后获取通道,针对不同的key类型做不同的逻辑处理。
但是请注意:此时,我们还没有将数据推到网络中,看代码
此时我们只是将数据写入到KafkaChannel中,在看KafkaChannel中的处理
关键看这句
调用send方法把数据写入到TransportLayer层(传输层)
底层还是调用了Channel的write方法,Kafka实现了自己的GatheringByteChannel,最后调用的还是PlaintextTransportLayer的write方法,看下PlaintextTransportLayer的数据结构,先看下构造方法
可以看到实际调用的就是NIO的SocketChannel的write方法,实现数据从内存到网卡的写入。
至此一个写的分析就基本完成的。
建连和读取基本类似,这里就不扒代码了。
Sender实现了Runnable接口,最后被一个IOThread启动,核心的逻辑是在一个void run(long now)方法中去实现的,具体的代码解释之前的博客中有,这里就不一一介绍了,这里种点介绍发送的最后两个流程,即send和poll,具体的代码行为
for (ClientRequest request : requests) client.send(request, now); this.client.poll(pollTimeout, now);
此处的client对象为NetworkClient,实现的是KafkaClient,我们先来看下KafkaClient定义的几个核心的接口
public void send(ClientRequest request, long now); public List<ClientResponse> poll(long timeout, long now);
send主要是进行ClientRequest的发送的,poll是做具体的IO的
接下来我们看NetworkClient的实现
@Override public void send(ClientRequest request, long now) { String nodeId = request.request().destination(); if (!canSendRequest(nodeId)) throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready."); doSend(request, now); } private void doSend(ClientRequest request, long now) { request.setSendTimeMs(now); this.inFlightRequests.add(request); selector.send(request.request()); }
send方法主要做了三件事情:1。检查node是否ready;2。把当前请求添加到队列中;3。调用Selector接口的send方法进行发送;此处可以发现,KafkaClient中的发送并不是具体的send实现,具体的send还是依赖的是network层的Selector实现的。
那我们再来看下Selector的结构,Selector实现了Selectable,Selectable定义的核心方法摘要如下
/** * Queue the given request for sending in the subsequent {@link #poll(long) poll()} calls * @param send The request to send */ public void send(Send send); /** * Do I/O. Reads, writes, connection establishment, etc. * @param timeout The amount of time to block if there is nothing to do * @throws IOException */ public void poll(long timeout) throws IOException;
可以发现定义的方法和NetworkClient的方法是一一对应的,也就是说他们是不同分层中对同一个方法的实现,而且很明显的是KafkaClient是依赖于Selectable的实现的;
那我们再来看下Selectable的实现类Selector,看下对send方法的实现:
public void send(Send send) { KafkaChannel channel = channelOrFail(send.destination()); try { channel.setSend(send); } catch (CancelledKeyException e) { this.failedSends.add(send.destination()); close(channel); } }
其实就是获取了当前请求的Node的通道-Channel,然后把Channel中的Send设置一个值,并没有实际的IO操作,如建联,读写,释放等,这让我们想到了接口中定义的另外一个方法,没错,就是poll,poll方法才是做实际的IO操作的,和接口中申明的方法一致。
再回到NetworkClient的poll方法中
@Override public List<ClientResponse> poll(long timeout, long now) { long metadataTimeout = metadataUpdater.maybeUpdate(now); try { this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs)); } catch (IOException e) { log.error("Unexpected error during I/O", e); } // process completed actions long updatedNow = this.time.milliseconds(); List<ClientResponse> responses = new ArrayList<>(); handleCompletedSends(responses, updatedNow); handleCompletedReceives(responses, updatedNow); handleDisconnections(responses, updatedNow); handleConnections(); handleTimedOutRequests(responses, updatedNow); // invoke callbacks for (ClientResponse response : responses) { if (response.request().hasCallback()) { try { response.request().callback().onComplete(response); } catch (Exception e) { log.error("Uncaught error in request completion:", e); } } } return responses; }
主要也是做了三件事。1.更新metadata信息;2.调用Selector的send方法进行IO;3.对已经完成的请求进行处理;4. 调用回调函数
看到这里终于看到了metadata相关的实际更新操作,之前在看的时候一只没看到和metadata相关的具体的读写是在哪儿进行的,终于在这里发现了 。
接下来我们还是关注Selector的send方法,先上代码:
@Override public void poll(long timeout) throws IOException { if (timeout < 0) throw new IllegalArgumentException("timeout should be >= 0"); clear(); if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty()) timeout = 0; /* check ready keys */ long startSelect = time.nanoseconds(); int readyKeys = select(timeout); long endSelect = time.nanoseconds(); currentTimeNanos = endSelect; this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds()); if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) { pollSelectionKeys(this.nioSelector.selectedKeys(), false); pollSelectionKeys(immediatelyConnectedKeys, true); } addToCompletedReceives(); long endIo = time.nanoseconds(); this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); maybeCloseOldestConnection(); }
首先获取准备好的事件
int readyKeys = select(timeout);
这里主要调用的NIO的Selector的select方法
如果有准备好的事件
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) { pollSelectionKeys(this.nioSelector.selectedKeys(), false); pollSelectionKeys(immediatelyConnectedKeys, true); }
在看下具体的pollSelectionKeys方法
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) { Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); KafkaChannel channel = channel(key); // register all per-connection metrics at once sensors.maybeRegisterConnectionMetrics(channel.id()); lruConnections.put(channel.id(), currentTimeNanos); try { /* complete any connections that have finished their handshake (either normally or immediately) */ if (isImmediatelyConnected || key.isConnectable()) { if (channel.finishConnect()) { this.connected.add(channel.id()); this.sensors.connectionCreated.record(); } else continue; } /* if channel is not ready finish prepare */ if (channel.isConnected() && !channel.ready()) channel.prepare(); /* if channel is ready read from any connections that have readable data */ if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { NetworkReceive networkReceive; while ((networkReceive = channel.read()) != null) addToStagedReceives(channel, networkReceive); } /* if channel is ready write to any sockets that have space in their buffer and for which we have data */ if (channel.ready() && key.isWritable()) { Send send = channel.write(); if (send != null) { this.completedSends.add(send); this.sensors.recordBytesSent(channel.id(), send.size()); } } /* cancel any defunct sockets */ if (!key.isValid()) { close(channel); this.disconnected.add(channel.id()); } } catch (Exception e) { String desc = channel.socketDescription(); if (e instanceof IOException) log.debug("Connection with {} disconnected", desc, e); else log.warn("Unexpected error from {}; closing connection", desc, e); close(channel); this.disconnected.add(channel.id()); } } }
这里边做的操作就是传统的NIO对SocketChannel的操作模型,获取key类型,然后获取通道,针对不同的key类型做不同的逻辑处理。
但是请注意:此时,我们还没有将数据推到网络中,看代码
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */ if (channel.ready() && key.isWritable()) { Send send = channel.write(); if (send != null) { this.completedSends.add(send); this.sensors.recordBytesSent(channel.id(), send.size()); } }
此时我们只是将数据写入到KafkaChannel中,在看KafkaChannel中的处理
public Send write() throws IOException { Send result = null; if (send != null && send(send)) { result = send; send = null; } return result; } private boolean send(Send send) throws IOException { send.writeTo(transportLayer); if (send.completed()) transportLayer.removeInterestOps(SelectionKey.OP_WRITE); return send.completed(); }
关键看这句
send.writeTo(transportLayer);
调用send方法把数据写入到TransportLayer层(传输层)
@Override public long writeTo(GatheringByteChannel channel) throws IOException { long written = channel.write(buffers); if (written < 0) throw new EOFException("Wrote negative bytes to channel. This shouldn't happen."); remaining -= written; // This is temporary workaround. As Send , Receive interfaces are being used by BlockingChannel. // Once BlockingChannel is removed we can make Send, Receive to work with transportLayer rather than // GatheringByteChannel or ScatteringByteChannel. if (channel instanceof TransportLayer) pending = ((TransportLayer) channel).hasPendingWrites(); return written; }
底层还是调用了Channel的write方法,Kafka实现了自己的GatheringByteChannel,最后调用的还是PlaintextTransportLayer的write方法,看下PlaintextTransportLayer的数据结构,先看下构造方法
public PlaintextTransportLayer(SelectionKey key) throws IOException { this.key = key; this.socketChannel = (SocketChannel) key.channel(); } ... @Override public int write(ByteBuffer src) throws IOException { return socketChannel.write(src); }
可以看到实际调用的就是NIO的SocketChannel的write方法,实现数据从内存到网卡的写入。
至此一个写的分析就基本完成的。
建连和读取基本类似,这里就不扒代码了。
上一篇: weichat 微信开发之接受普通消息
下一篇: kafka producer服务端