Kafka's Network Layer, Finally Explained Clearly
We know Kafka runs on top of TCP connections. Unlike many middleware projects, it does not use Netty as its TCP server; instead, it implements its own networking layer directly on Java NIO.
Key Classes
Let's start with the network-layer architecture of the Kafka client. This article focuses on the network layer, which has two important classes: Selector and KafkaChannel. They play roles similar to java.nio.channels.Selector and Channel in Java NIO.
The key fields of Selector are:

```java
// The underlying JDK NIO selector
java.nio.channels.Selector nioSelector;
// All connections managed by this Selector, keyed by connection id
Map<String, KafkaChannel> channels;
// Sends that have been fully written to the network
List<Send> completedSends;
// Receives that have been fully read and surfaced to the upper layer
List<NetworkReceive> completedReceives;
// Fully read receives staged per channel, not yet visible to the upper layer
Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
// As a client, connections for which connect() returned true immediately
Set<SelectionKey> immediatelyConnectedKeys;
// Ids of connections that finished connecting
List<String> connected;
// Maximum size of a single receive
int maxReceiveSize;
```
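To make these collections concrete, here is a hedged sketch (not Kafka source) of how an upper layer such as NetworkClient might drive a Selector. The accessor names mirror the fields above and exist on org.apache.kafka.common.network.Selector, but the loop itself is illustrative:

```java
void runOnce(Selector selector) throws IOException {
    // Wait up to 300 ms for network events; this call also performs the actual I/O
    selector.poll(300);
    // Sends that were fully written during this poll
    for (Send send : selector.completedSends())
        System.out.println("sent to " + send.destination());
    // At most one completed receive per channel is surfaced per poll
    for (NetworkReceive receive : selector.completedReceives())
        System.out.println("received " + receive.payload().limit() + " bytes from " + receive.source());
    // Connections whose handshake finished during this poll
    for (String id : selector.connected())
        System.out.println("connected to node " + id);
}
```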
From the network layer's point of view, Kafka splits into a client side (producers and consumers, and a broker when it acts as a follower) and a server side (brokers). This article analyzes how the client establishes connections and sends and receives data. The server relies on the same Selector and KafkaChannel for network transport; at the network layer the two sides differ very little.
Establishing a Connection
When the Kafka client starts, it calls Selector#connect (hereafter, unless noted otherwise, Selector refers to org.apache.kafka.common.network.Selector) to establish connections.
```java
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    if (this.channels.containsKey(id))
        throw new IllegalStateException("There is already a connection for id " + id);
    // Create a SocketChannel
    SocketChannel socketChannel = SocketChannel.open();
    // Switch it to non-blocking mode
    socketChannel.configureBlocking(false);
    // Obtain the underlying socket and set its options
    Socket socket = socketChannel.socket();
    socket.setKeepAlive(true);
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setSendBufferSize(sendBufferSize);
    if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setReceiveBufferSize(receiveBufferSize);
    socket.setTcpNoDelay(true);
    boolean connected;
    try {
        // SocketChannel#connect initiates the TCP handshake with the remote end.
        // Because the channel is non-blocking, the connection may not yet be
        // established (i.e. the 3-way handshake finished) when this returns:
        // it returns true if the connection is already established, false
        // otherwise. When server and client run on the same machine, it may
        // well return true.
        connected = socketChannel.connect(address);
    } catch (UnresolvedAddressException e) {
        socketChannel.close();
        throw new IOException("Can't resolve address: " + address, e);
    } catch (IOException e) {
        socketChannel.close();
        throw e;
    }
    // Register interest in the CONNECT event
    SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
    KafkaChannel channel;
    try {
        // Build a KafkaChannel
        channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
    } catch (Exception e) {
        ...
    }
    // Attach the KafkaChannel to the SelectionKey
    key.attach(channel);
    // Store it in the map; the id names the remote node
    this.channels.put(id, channel);
    // connected == true means this connection will never fire a CONNECT event,
    // so it needs special handling
    if (connected) {
        // OP_CONNECT won't trigger for immediately connected channels
        log.debug("Immediately connected to node {}", channel.id());
        // Track it in a dedicated set
        immediatelyConnectedKeys.add(key);
        // Stop listening for CONNECT on this key
        key.interestOps(0);
    }
}
```
This flow is close to standard NIO. The case worth singling out is when SocketChannel#connect returns true, which the method's Javadoc describes:
> If this channel is in non-blocking mode then an invocation of this method initiates a non-blocking connection operation. If the connection is established immediately, as can happen with a local connection, then this method returns true. Otherwise this method returns false and the connection operation must later be completed by invoking the finishConnect method.
In other words, in non-blocking mode a local connection may be established immediately; in that case connect returns true and no CONNECT event will ever fire for that channel. Kafka therefore records such connections in a dedicated set, immediatelyConnectedKeys, and gives them special treatment in the steps that follow.
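This behavior is easy to reproduce with plain JDK NIO. The following self-contained sketch (illustrative, not Kafka code) shows why an immediately connected channel needs a separate code path:

```java
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class ImmediateConnectDemo {
    public static void main(String[] args) throws Exception {
        // A local listener so the connect has a chance to complete immediately
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0));

        Selector selector = Selector.open();
        SocketChannel client = SocketChannel.open();
        client.configureBlocking(false);
        // For a loopback address this may return true right away
        boolean connected = client.connect(server.getLocalAddress());
        SelectionKey key = client.register(selector, SelectionKey.OP_CONNECT);
        if (connected) {
            // OP_CONNECT will never fire for this key, so it must be
            // remembered and finished outside the selector loop
            key.interestOps(0);
            System.out.println("immediately connected: " + client.finishConnect());
        } else {
            // The normal path: wait for OP_CONNECT, then finishConnect()
            selector.select();
            System.out.println("connected via OP_CONNECT: " + client.finishConnect());
        }
    }
}
```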
The client then calls poll to wait for and handle network events:
```java
public void poll(long timeout) throws IOException {
    ...
    // select() is a thin wrapper around java.nio.channels.Selector#select
    int readyKeys = select(timeout);
    ...
    // If any events are ready, or immediatelyConnectedKeys is non-empty
    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        // Handle the ready events; the second argument is false
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        // Handle immediatelyConnectedKeys; the second argument is true
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }
    addToCompletedReceives();
    ...
}

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    // Iterate over the keys
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        // Remove the current key, or the next poll would process it again
        iterator.remove();
        // Get the KafkaChannel created during connect
        KafkaChannel channel = channel(key);
        ...
        try {
            // Either an element of immediatelyConnectedKeys or a CONNECT event
            if (isImmediatelyConnected || key.isConnectable()) {
                // finishConnect also registers interest in READ events
                if (channel.finishConnect()) {
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                    ...
                } else
                    continue;
            }
            // SSL connections require some extra steps
            if (channel.isConnected() && !channel.ready())
                channel.prepare();
            // READ event
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null)
                    addToStagedReceives(channel, networkReceive);
            }
            // WRITE event
            if (channel.ready() && key.isWritable()) {
                Send send = channel.write();
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }
            // An invalid key means the connection is gone
            if (!key.isValid())
                close(channel, true);
        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel, true);
        } finally {
            maybeRecordTimePerConnection(channel, channelStartTimeNanos);
        }
    }
}
```
Because connections in immediatelyConnectedKeys never fire a CONNECT event, poll handles them separately by calling finishConnect on each of their channels. In plaintext mode this lands in PlaintextTransportLayer#finishConnect, implemented as follows:
```java
public boolean finishConnect() throws IOException {
    // Returns true once the connection is fully established
    boolean connected = socketChannel.finishConnect();
    if (connected)
        // Stop listening for CONNECT, start listening for READ
        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
    return connected;
}
```
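The interestOps line is plain bit arithmetic on the ops mask; this tiny illustrative snippet shows the before/after values:

```java
import java.nio.channels.SelectionKey;

public class InterestOpsDemo {
    public static void main(String[] args) {
        int ops = SelectionKey.OP_CONNECT;                        // 8: waiting for the handshake
        // & binds tighter than |, so this clears CONNECT then sets READ
        ops = ops & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ;
        System.out.println(ops == SelectionKey.OP_READ);          // true: only READ (1) remains
    }
}
```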
More detail on immediatelyConnectedKeys can be found here.
Sending Data
Kafka sends data in two steps:
1. Call Selector#send to stash the outgoing data on the corresponding KafkaChannel. This method performs no actual network I/O.
```java
// Selector#send
public void send(Send send) {
    String connectionId = send.destination();
    // If the connection is already closing, record the send as failed
    if (closingChannels.containsKey(connectionId))
        this.failedSends.add(connectionId);
    else {
        KafkaChannel channel = channelOrFail(connectionId, false);
        try {
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(connectionId);
            close(channel, false);
        }
    }
}

// KafkaChannel#setSend
public void setSend(Send send) {
    // Fail if the previous send has not been fully written yet
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    // Stash the send
    this.send = send;
    // Register interest in WRITE events
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
```

2. Call Selector#poll. Step 1 registered interest in WRITE events on the channel, so once the channel becomes writable, pollSelectionKeys writes the data out for real:

```java
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    // Iterate over the keys
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        // Remove the current key, or the next poll would process it again
        iterator.remove();
        // Get the KafkaChannel created during connect
        KafkaChannel channel = channel(key);
        ...
        try {
            ...
            // WRITE event
            if (channel.ready() && key.isWritable()) {
                // The actual network write
                Send send = channel.write();
                // A Send may take several writes to go out; a non-null
                // return value means this Send is fully written
                if (send != null) {
                    // completedSends holds fully written sends
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }
            ...
        } catch (Exception e) {
            ...
        } finally {
            maybeRecordTimePerConnection(channel, channelStartTimeNanos);
        }
    }
}
```
When the channel becomes writable, KafkaChannel#write is called, and this is where the real network I/O happens:
```java
public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

private boolean send(Send send) throws IOException {
    // Ultimately calls SocketChannel#write to do the real write
    send.writeTo(transportLayer);
    if (send.completed())
        // Once fully written, stop listening for WRITE events
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();
}
```
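Putting the two steps together, a client-side send looks roughly like this. A hedged sketch: it assumes a Selector already connected to a node with id "1", and uses NetworkSend, which frames the payload with a size prefix; the id and payload are illustrative:

```java
ByteBuffer payload = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
// NetworkSend adds the size prefix in front of the payload
Send send = new NetworkSend("1", payload);
// Step 1: only stashes the Send on the KafkaChannel and registers OP_WRITE
selector.send(send);
// Step 2: poll until the write actually happens
while (selector.completedSends().isEmpty())
    selector.poll(100);
```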
Receiving Data
If the remote end sends data, the next call to poll processes whatever has arrived.
```java
public void poll(long timeout) throws IOException {
    ...
    // select() is a thin wrapper around java.nio.channels.Selector#select
    int readyKeys = select(timeout);
    ...
    // If any events are ready, or immediatelyConnectedKeys is non-empty
    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        // Handle the ready events; the second argument is false
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        // Handle immediatelyConnectedKeys; the second argument is true
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }
    addToCompletedReceives();
    ...
}

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    // Iterate over the keys
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        // Remove the current key, or the next poll would process it again
        iterator.remove();
        // Get the KafkaChannel created during connect
        KafkaChannel channel = channel(key);
        ...
        try {
            ...
            // READ event
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                // read() pulls data off the network but may get only part of a
                // request at a time; it returns non-null only once a complete
                // request has been read
                while ((networkReceive = channel.read()) != null)
                    // Stage each complete request in stagedReceives
                    addToStagedReceives(channel, networkReceive);
            }
            ...
        } catch (Exception e) {
            ...
        } finally {
            maybeRecordTimePerConnection(channel, channelStartTimeNanos);
        }
    }
}

private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
    if (!stagedReceives.containsKey(channel))
        stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
    Deque<NetworkReceive> deque = stagedReceives.get(channel);
    deque.add(receive);
}
```
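The reason channel.read() can return null is Kafka's wire framing: every request and response is prefixed with a 4-byte size field, and a receive is complete only once that many payload bytes have arrived. A simplified sketch of the idea (the real logic lives in NetworkReceive; this is not the actual implementation, and EOF handling is omitted):

```java
// Returns one complete frame, or null if the frame is still partial.
ByteBuffer size = ByteBuffer.allocate(4);
ByteBuffer payload = null;

ByteBuffer read(SocketChannel ch) throws IOException {
    if (size.hasRemaining()) {
        ch.read(size);                 // may read only part of the 4-byte prefix
        if (size.hasRemaining())
            return null;               // prefix not complete yet
        size.flip();
        payload = ByteBuffer.allocate(size.getInt());
    }
    ch.read(payload);                  // may read only part of the body
    if (payload.hasRemaining())
        return null;                   // body not complete yet
    payload.flip();
    ByteBuffer complete = payload;     // one full request/response
    size.clear();                      // reset for the next frame
    payload = null;
    return complete;
}
```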
The staged receives are then processed by the addToCompletedReceives method.
```java
private void addToCompletedReceives() {
    if (!this.stagedReceives.isEmpty()) {
        Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
            KafkaChannel channel = entry.getKey();
            // On the client side isMute() returns false; the server side relies
            // on it to preserve request ordering
            if (!channel.isMute()) {
                Deque<NetworkReceive> deque = entry.getValue();
                addToCompletedReceives(channel, deque);
                if (deque.isEmpty())
                    iter.remove();
            }
        }
    }
}

private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
    // Move only the first NetworkReceive of each channel into completedReceives
    NetworkReceive networkReceive = stagedDeque.poll();
    this.completedReceives.add(networkReceive);
    this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
}
```
After data is read off the network it is first staged in stagedReceives; addToCompletedReceives then moves, for each channel, at most one NetworkReceive (if there is one) from stagedReceives into completedReceives. There are two reasons for doing it this way:
- For SSL connections the data on the wire is encrypted, so the exact number of bytes in one request cannot be determined up front; the channel reads as much as it can, which may be more than one request's worth. If no further data arrived on that channel, the selector would never select it again and the extra bytes already sitting in the buffer would go unprocessed. Draining every complete request into stagedReceives avoids that.
- Kafka must guarantee that requests on a channel are processed in the order they were sent. So for each channel, the upper layer sees at most one request per poll; only after that request has been handled are the following ones surfaced. On the server side, each poll mutes the channel, i.e. stops reading from it, and unmutes it once processing completes, after which reading from that socket resumes. On the client side the same property is enforced through InFlightRequests#canSendMore. A sketch of the server-side idea follows this list.
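A hedged sketch of that server-side idea: Selector#mute and Selector#unmute are real methods, but the surrounding loop and the handle method are assumptions for illustration, not actual SocketServer code:

```java
void pollAndProcess(Selector selector) throws IOException {
    selector.poll(300);
    // At most one completed receive per channel shows up here per poll
    for (NetworkReceive receive : selector.completedReceives()) {
        String connectionId = receive.source();
        // Stop reading from this connection; later requests stay staged
        selector.mute(connectionId);
        handle(receive);               // hypothetical: process request, send response
        // Only now may the next request from this connection surface
        selector.unmute(connectionId);
    }
}
```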
The comment in the source that explains this logic:
```java
/* In the "Plaintext" setting, we are using socketChannel to read & write to the network. But for the "SSL" setting,
 * we encrypt the data before we use socketChannel to write data to the network, and decrypt before we return the responses.
 * This requires additional buffers to be maintained as we are reading from network, since the data on the wire is encrypted
 * we won't be able to read exact no.of bytes as kafka protocol requires. We read as many bytes as we can, up to SSLEngine's
 * application buffer size. This means we might be reading additional bytes than the requested size.
 * If there is no further data to read from socketChannel selector won't invoke that channel and we've have additional bytes
 * in the buffer. To overcome this issue we added "stagedReceives" map which contains per-channel deque. When we are
 * reading a channel we read as many responses as we can and store them into "stagedReceives" and pop one response during
 * the poll to add the completedReceives. If there are any active channels in the "stagedReceives" we set "timeout" to 0
 * and pop response and add to the completedReceives.
 * Atmost one entry is added to "completedReceives" for a channel in each poll. This is necessary to guarantee that
 * requests from a channel are processed on the broker in the order they are sent. Since outstanding requests added
 * by SocketServer to the request queue may be processed by different request handler threads, requests on each
 * channel must be processed one-at-a-time to guarantee ordering.
 */
```
Conclusion
This article walked through the implementation of Kafka's network layer. When reading the Kafka source, it is easy to get lost without a clear picture of this layer, for example around the request/response ordering guarantee, or the fact that the send method performs no actual network I/O.