Kafka Source Code Analysis: The Producer Client
This section starts with an example that uses the new Producer API:
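import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class Producer extends Thread {
    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Boolean isAsync;

    public Producer(String topic, Boolean isAsync) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // broker address
        props.put("client.id", "DemoProducer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<Integer, String>(props);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    public void run() {
        int messageNo = 1;
        while (true) {
            String messageStr = "Message_" + messageNo;
            if (isAsync) { // Send asynchronously: send() returns at once, the callback runs later
                producer.send(new ProducerRecord<Integer, String>(topic, messageNo, messageStr), new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if (e != null) {           // metadata is null when the send failed
                            e.printStackTrace();
                            return;
                        }
                        System.out.println("The offset of the record we just sent is: " + metadata.offset());
                    }
                });
            } else { // Send synchronously: get() blocks until the broker has answered
                try {
                    producer.send(new ProducerRecord<Integer, String>(topic, messageNo, messageStr)).get();
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            ++messageNo;
        }
    }
}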
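- KafkaProducer is parameterized with the key and value types of the messages (<Integer, String> here), and every ProducerRecord must also name its topic.
- The KafkaProducer is created from a Properties configuration that specifies the broker address and the key and value serializers.
- The result of a send is a RecordMetadata, which records the partition the message was written to and its offset within that partition.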
blocking vs non-blocking
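- blocking: call get() immediately on the Future returned by send(); Future.get() blocks until the result is available.
- non-blocking: pass a Callback to send(). After calling send() you can keep sending messages without waiting; when the result comes back, the callback is invoked automatically.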
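The two styles can be tried without a broker. The sketch below is only an analogy under stated assumptions: the hypothetical `fakeSend` method stands in for `KafkaProducer.send` and returns a `CompletableFuture` that another thread completes with a made-up offset. Calling `get()` corresponds to the blocking style, and `whenComplete` plays the role of `Callback.onCompletion`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;

class SendStyles {
    private static final AtomicLong nextOffset = new AtomicLong(0);

    // Hypothetical stand-in for KafkaProducer.send: another thread completes the future with an "offset".
    static CompletableFuture<Long> fakeSend(String value) {
        return CompletableFuture.supplyAsync(() -> nextOffset.getAndIncrement());
    }

    // Blocking style: call get() right away, like the synchronous branch of the Producer example.
    static long sendSync(String value) {
        try {
            return fakeSend(value).get(); // blocks until the result is available
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }

    // Non-blocking style: register a callback and return immediately.
    static CompletableFuture<Long> sendAsync(String value, StringBuilder log) {
        return fakeSend(value).whenComplete((offset, e) -> {
            if (e != null) log.append("failed: ").append(e); // on failure there is no offset
            else log.append("offset=").append(offset);
        });
    }
}
```

`sendSync` returns only once the offset exists, while `sendAsync` returns at once; joining the returned future is only needed when, as in a test, you want to inspect what the callback did.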
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // first make sure the metadata for the topic is available
    long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
    long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);
    // serialize the key and the value
    byte[] serializedKey = keySerializer.serialize(record.topic(), record.key());
    byte[] serializedValue = valueSerializer.serialize(record.topic(), record.value());
    // choose the partition for this record
    int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
    TopicPartition tp = new TopicPartition(record.topic(), partition);
    RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs);
    // after every append to the accumulator, check whether the batch is full (or a new batch was created);
    // if so, wake up the Sender so that it sends this batch to Kafka
    if (result.batchIsFull || result.newBatchCreated)
        this.sender.wakeup();
    return result.future;
}
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 10 --topic test
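public class PartitionInfo {
    private final String topic;
    private final int partition;          // partition number
    private final Node leader;            // broker currently leading this partition (null if there is none)
    private final Node[] replicas;        // all brokers that hold a replica of this partition
    private final Node[] inSyncReplicas;  // replicas that are currently in sync with the leader
}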
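Normally every partition has a leader. A partition without a leader has a problem (it is currently unavailable), which is why only partitions whose leader is non-null are recorded as available: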
List<PartitionInfo> availablePartitions = new ArrayList<>();
for (PartitionInfo part : partitionList) {
    if (part.leader() != null)
        availablePartitions.add(part);
}
this.availablePartitionsByTopic.put(topic, Collections.unmodifiableList(availablePartitions));
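If a message has a key, the key is hashed and the hash is taken modulo the number of partitions (all partitions of the topic, not only the available ones). The same key therefore always lands in the same partition, while different keys spread across partitions for load balancing. If a message has no key, the partition is chosen from an ever-increasing counter modulo the number of available partitions, i.e. round-robin, so that the messages are not all sent to the same partition.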
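A minimal sketch of this rule, assuming partitions are plain integer ids rather than `PartitionInfo` objects and that the caller passes in the ids of the available partitions. `Arrays.hashCode` is a stand-in for Kafka's `Utils.murmur2`, so the keyed branch will not pick the same partitions as the real producer; only the structure follows `DefaultPartitioner.partition`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class PartitionerSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Clear the sign bit, like DefaultPartitioner.toPositive; Math.abs(Integer.MIN_VALUE) would stay negative.
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    // numPartitions: all partitions of the topic; available: ids of partitions whose leader is not null.
    int partition(byte[] keyBytes, int numPartitions, List<Integer> available) {
        if (keyBytes == null) {
            int next = counter.getAndIncrement();
            if (!available.isEmpty())
                return available.get(toPositive(next) % available.size()); // round-robin over available partitions
            return toPositive(next) % numPartitions;                        // none available: any partition
        }
        // Keyed record: a deterministic hash over ALL partitions, so a key always maps to the same one.
        // Arrays.hashCode is a stand-in for Utils.murmur2.
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }
}
```

With available partitions `[0, 2, 3]`, four key-less records go to 0, 2, 3 and then 0 again; partition 1 (no leader) is skipped.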
Question: if messages are written to the leader partition, where does the leader show up in the code below?
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    // all partitions of this topic, used for load balancing (so that leader partitions are not all on the same machine)
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = counter.getAndIncrement();
        // the partitions of this topic that can currently be used: availablePartitionsByTopic
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return DefaultPartitioner.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
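Note: replicas are not PartitionInfo objects of their own; they are just the replicas field of the PartitionInfo for a given partition number. In other words, the lists returned by partitionsForTopic and availablePartitionsForTopic contain no follower replicas. If every replica counted as a PartitionInfo, partition numbers could no longer be represented cleanly (with 4 partitions of 3 replicas each, there would be 12 entries instead of 4).
So when choosing a partition, ignore replicas entirely: only partition numbers matter. Partitions are spread across different nodes (think of each one as its leader partition), and the leader itself is only looked up later, when the accumulator calls cluster.leaderFor(tp) to decide which node to send to. Key-less messages are spread over the partitions round-robin: if the first message is written to topic1-part1, the next one goes to topic1-part2, and so on.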
public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) {
    Deque<RecordBatch> dq = dequeFor(tp);
    synchronized (dq) {
        RecordBatch last = dq.peekLast();
        if (last != null) {
            FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());
            // there is an existing batch and it still has room for this record
            if (future != null)
                return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
        }
    }
    // the deque is empty (no RecordBatch, last == null) or the last batch has no room left (future == null): allocate a new batch
    int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
    ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
    synchronized (dq) {
        // in-memory records backed by the ByteBuffer; appended records end up in this buffer
        MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
        RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
        FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds()));
        dq.addLast(batch);
        incomplete.add(batch);
        return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
    }
}
public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback, long now) {
    boolean roomEnough = this.records.hasRoomFor(key, value);
    if (!roomEnough)
        return null;   // no room: the caller allocates a new batch
    this.records.append(0L, key, value);
    this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
    // the record's relative offset inside this batch is its index, recordCount
    FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
    if (callback != null)
        thunks.add(new Thunk(callback, future));   // invoked later from RecordBatch.done
    this.recordCount++;
    return future;
}
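The append path can be modelled in a few lines. This is a simplified sketch, not the real `RecordAccumulator`: batches are hypothetical `Batch` objects with a byte capacity instead of `MemoryRecords`, there is no `BufferPool`, and one `synchronized` method replaces the per-deque locks. It keeps the two flags the producer reacts to: `newBatchCreated`, and `batchIsFull`, which is also true whenever the deque holds more than one batch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AccumulatorSketch {
    // Hypothetical batch with a byte capacity, standing in for RecordBatch + MemoryRecords.
    static final class Batch {
        final int capacity;
        int used = 0;
        final List<byte[]> records = new ArrayList<>();

        Batch(int capacity) { this.capacity = capacity; }

        // Like tryAppend: -1 (instead of null) when there is no room, else the relative offset.
        int tryAppend(byte[] value) {
            if (used + value.length > capacity) return -1;
            records.add(value);
            used += value.length;
            return records.size() - 1;
        }

        boolean isFull() { return used >= capacity; }
    }

    static final class AppendResult {
        final int relativeOffset;
        final boolean batchIsFull;
        final boolean newBatchCreated;

        AppendResult(int relativeOffset, boolean batchIsFull, boolean newBatchCreated) {
            this.relativeOffset = relativeOffset;
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

    private final int batchSize;
    private final Map<String, Deque<Batch>> batches = new HashMap<>();

    AccumulatorSketch(int batchSize) { this.batchSize = batchSize; }

    synchronized AppendResult append(String partition, byte[] value) {
        Deque<Batch> dq = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        Batch last = dq.peekLast();
        if (last != null) {
            int offset = last.tryAppend(value);
            if (offset >= 0) // the existing batch still had room
                return new AppendResult(offset, dq.size() > 1 || last.isFull(), false);
        }
        // Empty deque or no room in the last batch: open a batch big enough for this record.
        Batch batch = new Batch(Math.max(batchSize, value.length));
        int offset = batch.tryAppend(value);
        dq.addLast(batch);
        return new AppendResult(offset, dq.size() > 1 || batch.isFull(), true);
    }

    synchronized int batchCount(String partition) {
        Deque<Batch> dq = batches.get(partition);
        return dq == null ? 0 : dq.size();
    }
}
```

With a `batchSize` of 10, the third 4-byte record no longer fits and opens a second batch; that is the situation in which `KafkaProducer.send` calls `sender.wakeup()`.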
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<Node>();
    // (declarations of nextReadyCheckDelayMs, unknownLeadersExist and exhausted are omitted)
    // batches: each TopicPartition maps to its own deque of RecordBatches
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<RecordBatch> deque = entry.getValue();
        // find the leader node of this TopicPartition; before sending, the Sender first connects to these nodes
        Node leader = cluster.leaderFor(part);
        if (leader == null) {
            unknownLeadersExist = true;
        } else if (!readyNodes.contains(leader)) {
            synchronized (deque) {
                RecordBatch batch = deque.peekFirst();
                if (batch != null) {
                    // (computation of backingOff, full, expired and timeLeftMs for this batch is omitted)
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        // add the leader to the ready nodes (the Sender connects to them next)
                        readyNodes.add(leader);
                    } else {
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
}
public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    Map<Integer, List<RecordBatch>> batches = new HashMap<Integer, List<RecordBatch>>();
    for (Node node : nodes) {
        int size = 0;   // bytes drained for this node so far (used by the omitted maxSize check)
        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());   // all partitions led by this node
        List<RecordBatch> ready = new ArrayList<RecordBatch>();            // the batches drained for this node
        // to avoid starvation, start is not always 0: it resumes from drainIndex
        int start = drainIndex = drainIndex % parts.size();
        do {
            PartitionInfo part = parts.get(drainIndex);
            Deque<RecordBatch> deque = dequeFor(new TopicPartition(part.topic(), part.partition()));
            if (deque != null) {          // not every partition has a deque
                synchronized (deque) {    // ArrayDeque is not thread-safe, so synchronize on it
                    // batches are appended at the tail and drained from the head, hence a double-ended queue
                    RecordBatch first = deque.peekFirst();
                    if (first != null) {
                        // peekFirst did not remove the batch; it is removed only if it can really be sent.
                        // (The backoff and maxSize checks on `first` are omitted here; the maxSize check can
                        // break out of the loop early, and the next drain then resumes at this partition.)
                        RecordBatch batch = deque.pollFirst();
                        batch.records.close();   // close the batch for further appends (memory is freed later in deallocate)
                        ready.add(batch);        // add to this node's list of batches to send
                    }
                }
            }
            this.drainIndex = (this.drainIndex + 1) % parts.size();
        } while (start != drainIndex);   // stop once every partition of this node has been visited
        // results are grouped per node: each node gets one list of RecordBatches, which may belong to different partitions
        batches.put(node.id(), ready);
    }
    return batches;
}
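Why `drainIndex` is kept between calls only becomes visible once the size limit is taken into account. In this sketch the hypothetical `maxBatches` parameter replaces the omitted `maxSize` check, and batches are just strings. When the limit is hit, the loop breaks without advancing `drainIndex`, so the next drain starts at the partition that was skipped instead of at partition 0.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class DrainSketch {
    private final List<Deque<String>> partitions; // one deque of batches per partition led by this node
    private int drainIndex = 0;                   // survives between drain() calls

    DrainSketch(List<Deque<String>> partitions) { this.partitions = partitions; }

    // Takes at most one batch per partition; maxBatches is a hypothetical stand-in for the maxSize check.
    List<String> drain(int maxBatches) {
        List<String> ready = new ArrayList<>();
        int start = drainIndex = drainIndex % partitions.size();
        do {
            Deque<String> deque = partitions.get(drainIndex);
            if (!deque.isEmpty()) {
                if (ready.size() >= maxBatches)
                    break;                 // drainIndex stays here, so the next drain starts at this partition
                ready.add(deque.pollFirst());
            }
            drainIndex = (drainIndex + 1) % partitions.size();
        } while (start != drainIndex);
        return ready;
    }
}
```

Restarting from index 0 on every call would have returned `p0-b1` on the second call and left `p1` waiting.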
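public void run() {
    // main loop: keep sending until the Sender is asked to stop
    while (running) {
        run(time.milliseconds());
    }
    // on shutdown, keep going until the accumulator is empty and nothing is in flight, unless this is a forced close
    while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
        run(time.milliseconds());
    }
    this.client.close();
}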
public void run(long now) {
    Cluster cluster = metadata.fetch();
    // ① get the list of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // ② remove any nodes we aren't ready to send to (client.ready() also starts the socket connection to each leader)
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }
    // ③ create produce requests: take out the oldest batches that were appended to the accumulator earlier
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    // ④ Transfer the record batches into a list of produce requests on a per-node basis (one ClientRequest per node)
    List<ClientRequest> requests = createProduceRequests(batches, now);
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    // ⑤ Queue up the given request for sending. Requests can only be sent out to ready nodes. (As the comment says, this only enqueues.)
    for (ClientRequest request : requests)
        client.send(request, now);
    // ⑥ Do actual reads and writes to sockets. This is where the real I/O happens.
    this.client.poll(pollTimeout, now);
}
private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
    Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
    final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
    for (RecordBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;                    // each RecordBatch belongs to exactly one TopicPartition
        produceRecordsByPartition.put(tp, batch.records.buffer());   // a RecordBatch's records are MemoryRecords backed by a ByteBuffer
        recordsByPartition.put(tp, batch);
    }
    // build the ProduceRequest (one record set per partition), then wrap it with the destination node
    // and the request header into a RequestSend
    ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
    RequestSend send = new RequestSend(Integer.toString(destination), this.client.nextRequestHeader(ApiKeys.PRODUCE), request.toStruct());
    // the callback is stored in the ClientRequest and is invoked once the request has completed
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };
    return new ClientRequest(now, acks != 0, send, callback);   // expectResponse is false when acks == 0
}
ClientRequest & ClientResponse & Callback
// A request being sent to the server. This holds both the network send as well as the client-level metadata.
public final class ClientRequest {
    private final long createdTimeMs;
    private final boolean expectResponse;
    private final RequestSend request;
    private final RequestCompletionHandler callback;
    private final boolean isInitiatedByNetworkClient;
    private long sendTimeMs;
}

// A response from the server. Contains both the body of the response as well as the correlated request that was originally sent.
public class ClientResponse {
    private final long receivedTimeMs;
    private final boolean disconnected;
    private final ClientRequest request;
    private final Struct responseBody;
}
public List<ClientResponse> poll(long timeout, long now) {
    // ... the actual reads and writes happen here and produce `responses`
    // invoke callbacks
    for (ClientResponse response : responses) {
        if (response.request().hasCallback()) {
            response.request().callback().onComplete(response);
        }
    }
    return responses;
}
for (ClientRequest request : requests)
    client.send(request, now);
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
    int correlationId = response.request().request().header().correlationId();
    if (response.hasResponse()) {   // if we have a response, parse it
        ProduceResponse produceResponse = new ProduceResponse(response.responseBody());
        for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
            TopicPartition tp = entry.getKey();   // each TopicPartition has one PartitionResponse
            ProduceResponse.PartitionResponse partResp = entry.getValue();
            Errors error = Errors.forCode(partResp.errorCode);
            RecordBatch batch = batches.get(tp);   // batches holds exactly one RecordBatch per partition
            completeBatch(batch, error, partResp.baseOffset, correlationId, now);   // complete this batch, which calls RecordBatch.done
        }
    } else {
        // this is the acks = 0 case, just complete all requests
        for (RecordBatch batch : batches.values())
            completeBatch(batch, Errors.NONE, -1L, correlationId, now);
    }
}

private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long correlationId, long now) {
    batch.done(baseOffset, error.exception());   // tell the user the result of their request
    this.accumulator.deallocate(batch);          // release resources: remove from incomplete and free the batch's records memory
}
public void done(long baseOffset, RuntimeException exception) {
    // execute callbacks
    for (int i = 0; i < this.thunks.size(); i++) {
        Thunk thunk = this.thunks.get(i);
        if (exception == null) {
            RecordMetadata metadata = new RecordMetadata(this.topicPartition, baseOffset, thunk.future.relativeOffset());
            thunk.callback.onCompletion(metadata, null);
        } else {
            thunk.callback.onCompletion(null, exception);
        }
    }
    this.produceFuture.done(topicPartition, baseOffset, exception);
}
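The offset arithmetic behind `done` can be checked in isolation. In this sketch the hypothetical `Thunk` stores a `BiConsumer<Long, Exception>` instead of a Kafka `Callback`, and each callback receives `baseOffset + relativeOffset`, the offset that `RecordMetadata` derives from the two values passed in `done`. Passing -1 through unchanged (the base offset used for the `acks = 0` case in `handleProduceResponse`) is a choice made by this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

class BatchCompletionSketch {
    // Hypothetical Thunk: the user callback plus the record's relative offset in the batch.
    static final class Thunk {
        final BiConsumer<Long, Exception> callback;
        final long relativeOffset;

        Thunk(BiConsumer<Long, Exception> callback, long relativeOffset) {
            this.callback = callback;
            this.relativeOffset = relativeOffset;
        }
    }

    private final List<Thunk> thunks = new ArrayList<>();
    private int recordCount = 0;

    // Like tryAppend: the relative offset is the record's index inside the batch.
    void append(BiConsumer<Long, Exception> callback) {
        if (callback != null)
            thunks.add(new Thunk(callback, recordCount));
        recordCount++;
    }

    // Like RecordBatch.done: each callback gets baseOffset + its relative offset, or the exception.
    void done(long baseOffset, RuntimeException exception) {
        for (Thunk thunk : thunks) {
            if (exception == null) {
                long offset = baseOffset == -1 ? -1 : baseOffset + thunk.relativeOffset; // sketch keeps -1 (acks = 0) as is
                thunk.callback.accept(offset, null);
            } else {
                thunk.callback.accept(null, exception);
            }
        }
    }
}
```

A batch of three records acknowledged with base offset 100 reports offsets 100, 101 and 102 to its callbacks, so the broker only has to return one offset per partition.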
new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) {   // metadata is null when the send failed
            e.printStackTrace();
            return;
        }
        System.out.println("The offset of the record we just sent is: " + metadata.offset());
    }
}
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) {
    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);
    Socket socket = socketChannel.socket();   // client side, so this is a Socket (not a ServerSocket)
    // (socket options such as keep-alive and the send/receive buffer sizes are set on `socket` here; omitted)
    socketChannel.connect(address);           // non-blocking: this only initiates the connection, which usually completes later
    SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);   // listen for the connect event
    KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);        // builds the channel, including its underlying transportLayer
    key.attach(channel);                      // attach the KafkaChannel to the SelectionKey
    this.channels.put(id, channel);           // the Selector maps each node connection id to its KafkaChannel
}
KafkaChannel finishConnect
public void finishConnect() throws IOException {
    socketChannel.finishConnect();
    // stop listening for OP_CONNECT and start listening for OP_READ
    key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
}
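The same OP_CONNECT to OP_READ handover can be reproduced with plain `java.nio`, without Kafka. This sketch opens a local `ServerSocketChannel` on an ephemeral port (an assumption made only so the example is self-contained), connects a non-blocking `SocketChannel` to it, waits until `finishConnect()` returns true, and then applies the same interest-ops expression as `finishConnect` above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class NioConnectSketch {
    // Returns the key's interest ops after switching from OP_CONNECT to OP_READ.
    static int connectThenListenForReads() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // listening; the kernel completes handshakes via the backlog
            channel.configureBlocking(false);
            channel.connect(server.getLocalAddress());           // in non-blocking mode this may only initiate the connection
            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
            while (!channel.finishConnect()) {                    // false while the connection is still pending
                selector.select(100);                             // wait for OP_CONNECT to become ready
                selector.selectedKeys().clear();
            }
            // Same expression as the finishConnect excerpt: drop OP_CONNECT, add OP_READ
            key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
            return key.interestOps();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

On loopback a non-blocking `connect` may also succeed immediately; `finishConnect()` then simply returns true on the first call.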
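public void send(Send send) {   // NetworkClient.send passes in the ClientRequest's RequestSend
    KafkaChannel channel = channelOrFail(send.destination());
    channel.setSend(send);      // make this Send the channel's current in-progress send
}

private KafkaChannel channelOrFail(String id) {
    KafkaChannel channel = this.channels.get(id);
    if (channel == null)
        throw new IllegalStateException("Attempt to retrieve channel for which there is no open connection. Connection id " + id);
    return channel;
}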
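A KafkaChannel handles only one Send at a time. Each time a Send starts, OP_WRITE is added to the channel's interest set, and once that Send has been written completely, OP_WRITE is removed again; the next Send adds OP_WRITE again and removes it once it has been written. The channel is driven by pending requests: with no request waiting there is nothing to write, so there is no reason to listen for OP_WRITE (a connected socket is almost always writable, so leaving OP_WRITE registered would make select() return immediately on every poll). The basic flow is: start a Send -> register OP_WRITE -> write... -> Send completes -> remove OP_WRITE.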
public void setSend(Send send) {
    // a non-null send means the previous Send is still in progress, so a new one cannot start;
    // send is reset to null once it has been completely written
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

// a method of the KafkaChannel's transportLayer; its key is the SelectionKey passed to buildChannel
public void addInterestOps(int ops) {
    key.interestOps(key.interestOps() | ops);
}
public void poll(long timeout) throws IOException {
    clear();
    if (hasStagedReceives())
        timeout = 0;
    int readyKeys = select(timeout);   // returns immediately when timeout is 0, otherwise waits up to timeout ms
    if (readyKeys > 0) {
        Set<SelectionKey> keys = this.nioSelector.selectedKeys();
        Iterator<SelectionKey> iter = keys.iterator();
        while (iter.hasNext()) {
            SelectionKey key = iter.next();
            iter.remove();
            KafkaChannel channel = channel(key);   // the KafkaChannel attached to this SelectionKey
            /* complete any connections that have finished their handshake */
            if (key.isConnectable())
                channel.finishConnect();
            /* if channel is not ready finish prepare */
            if (channel.isConnected() && !channel.ready())
                channel.prepare();
            /* if channel is ready read from any connections that have readable data */
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null)
                    addToStagedReceives(channel, networkReceive);
            }
            /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
            if (channel.ready() && key.isWritable()) {
                Send send = channel.write();
                if (send != null)
                    this.completedSends.add(send);
            }
            /* cancel any defunct sockets */
            if (!key.isValid())
                close(channel);
        }
    }
    // all ready keys of this round have been handled: move staged receives to completedReceives.
    // Polling repeats because Sender.run -> NetworkClient.poll -> Selector.poll runs in a loop.
    addToCompletedReceives();
}
KafkaChannel write
public Send write() throws IOException {
    Send result = null;
    // send(send) returns false while the Send has not been completely written yet
    if (send != null && send(send)) {
        result = send;
        send = null;   // fully written: reset so that setSend can accept the next Send
    }
    return result;
}

private boolean send(Send send) throws IOException {
    send.writeTo(transportLayer);   // transportLayer wraps the SocketChannel, so this is where bytes are actually written
    if (send.completed())           // remove OP_WRITE only once the whole Send has been written
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();        // after a partial write, OP_WRITE stays registered and the key becomes writable again later
}
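The OP_WRITE bookkeeping of `setSend` and `write` can be simulated without a socket. In this sketch the hypothetical `Send` only counts remaining bytes, and `bytesPerWrite` models a socket that accepts a limited number of bytes per write, so one Send may need several writable events. The interest set is a plain `int` built from the real `SelectionKey` constants.

```java
import java.nio.channels.SelectionKey;

class WriteInterestSketch {
    // Hypothetical Send that only tracks how many bytes are left to write.
    static final class Send {
        int remaining;
        Send(int size) { this.remaining = size; }
        boolean completed() { return remaining <= 0; }
    }

    int interestOps = SelectionKey.OP_READ; // what the channel's SelectionKey would be watching
    private Send send;                       // the single in-progress Send, or null
    private final int bytesPerWrite;         // how many bytes the "socket" accepts per write

    WriteInterestSketch(int bytesPerWrite) { this.bytesPerWrite = bytesPerWrite; }

    // Like KafkaChannel.setSend: reject a new Send while one is in progress, then watch OP_WRITE.
    void setSend(Send s) {
        if (send != null)
            throw new IllegalStateException("prior send still in progress");
        send = s;
        interestOps |= SelectionKey.OP_WRITE;
    }

    // Like KafkaChannel.write: called when the key is writable; returns the Send only once fully written.
    Send write() {
        if (send == null) return null;
        send.remaining -= bytesPerWrite;      // one partial write
        if (!send.completed()) return null;   // OP_WRITE stays registered
        interestOps &= ~SelectionKey.OP_WRITE;
        Send done = send;
        send = null;                          // ready for the next setSend
        return done;
    }
}
```

A 10-byte Send on a socket that takes 6 bytes per write needs two writable events; only after the second one is OP_WRITE dropped and a new Send accepted.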
KafkaChannel read
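On the first call, receive is null, so a new NetworkReceive is created and filled with whatever bytes the transportLayer currently has. If the message is not complete yet, read() returns null and the while loop in Selector.poll ends. On a later call, receive is non-null, so reading continues into the same NetworkReceive object. Once it is complete, its payload is rewound, it is returned as result, and receive is reset to null; Selector.poll passes the non-null result to addToStagedReceives and calls read() again, so the while loop only ends when read() returns null.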
public NetworkReceive read() throws IOException {
    NetworkReceive result = null;
    if (receive == null)   // no partially read message: start a new one
        receive = new NetworkReceive(maxReceiveSize, id);
    receive(receive);      // read whatever bytes are available into it
    if (receive.complete()) {
        receive.payload().rewind();
        result = receive;
        receive = null;    // the next read() starts a new NetworkReceive
    }
    return result;         // null means the message is not complete yet
}

private long receive(NetworkReceive receive) throws IOException {
    return receive.readFrom(transportLayer);
}
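The partial-read behavior can be illustrated with Kafka's framing, a 4-byte size followed by the payload. This is a sketch with hypothetical classes: `SizedReceive` plays the role of `NetworkReceive`, and each `ByteBuffer` passed to `read` stands for the bytes that one read from the transport layer happens to return.

```java
import java.nio.ByteBuffer;

class ReceiveSketch {
    // Hypothetical stand-in for NetworkReceive: 4-byte size, then that many payload bytes.
    static final class SizedReceive {
        private final ByteBuffer size = ByteBuffer.allocate(4);
        private ByteBuffer payload;

        void readFrom(ByteBuffer src) {
            while (size.hasRemaining() && src.hasRemaining())
                size.put(src.get());
            if (!size.hasRemaining() && payload == null) {
                size.flip();
                payload = ByteBuffer.allocate(size.getInt()); // the size buffer is now fully consumed
            }
            while (payload != null && payload.hasRemaining() && src.hasRemaining())
                payload.put(src.get());
        }

        boolean complete() { return payload != null && !payload.hasRemaining(); }
    }

    private SizedReceive receive; // kept across read() calls, like KafkaChannel.receive

    // Mirrors KafkaChannel.read(): returns the payload once complete, otherwise null.
    ByteBuffer read(ByteBuffer availableBytes) {
        if (receive == null)
            receive = new SizedReceive();
        receive.readFrom(availableBytes);
        if (!receive.complete())
            return null;
        ByteBuffer result = receive.payload;
        result.rewind();  // like receive.payload().rewind()
        receive = null;   // the next read() starts a new receive
        return result;
    }
}
```

Here the first call only sees 3 of the 4 size bytes and returns null; the second call completes the size, reads the 2-byte payload and returns it rewound.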
Completed sends and receives
// each completed read produces one NetworkReceive, appended to that channel's staged deque
private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
    if (!stagedReceives.containsKey(channel))
        stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
    Deque<NetworkReceive> deque = stagedReceives.get(channel);
    deque.add(receive);
}

// called once at the end of each poll, after all ready keys have been handled:
// moves at most one staged NetworkReceive per unmuted channel to completedReceives
private void addToCompletedReceives() {
    if (this.stagedReceives.size() > 0) {
        Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
            KafkaChannel channel = entry.getKey();
            if (!channel.isMute()) {   // the channel is not muted, i.e. OP_READ is still in its interest set
                Deque<NetworkReceive> deque = entry.getValue();
                NetworkReceive networkReceive = deque.poll();
                this.completedReceives.add(networkReceive);
                if (deque.size() == 0)
                    iter.remove();
            }
        }
    }
}
public boolean isMute() {
    // muted = the key is still valid but OP_READ is not in its interest set
    return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) == 0;
}
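A small model of the staging step, assuming channels are identified by strings and a hypothetical `muted` set replaces the OP_READ check in `isMute()`. It shows that each call moves at most one receive per unmuted channel, while a muted channel keeps its receives staged until it is unmuted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class StagedReceivesSketch {
    private final Map<String, Deque<String>> stagedReceives = new LinkedHashMap<>();
    final List<String> completedReceives = new ArrayList<>();
    final Set<String> muted = new HashSet<>(); // channels whose key has OP_READ removed

    void addToStagedReceives(String channel, String receive) {
        stagedReceives.computeIfAbsent(channel, k -> new ArrayDeque<>()).add(receive);
    }

    // One call per poll: at most one staged receive per unmuted channel moves to completedReceives.
    void addToCompletedReceives() {
        completedReceives.clear(); // Selector.poll clears this list at the start of each poll; done here for brevity
        Iterator<Map.Entry<String, Deque<String>>> iter = stagedReceives.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<String, Deque<String>> entry = iter.next();
            if (!muted.contains(entry.getKey())) {
                Deque<String> deque = entry.getValue();
                completedReceives.add(deque.poll());
                if (deque.isEmpty())
                    iter.remove();
            }
        }
    }
}
```

Channel A with two staged receives needs two polls to hand both over, and muted channel B contributes nothing until it is unmuted.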
public void send(ClientRequest request, long now) {
    String nodeId = request.request().destination();
    if (!canSendRequest(nodeId))
        throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    doSend(request, now);
}

private void doSend(ClientRequest request, long now) {
    this.inFlightRequests.add(request);   // nothing is written yet; first record the request as in flight
    selector.send(request.request());     // hand the Send to the Selector, which only sets it on the channel
}

public List<ClientResponse> poll(long timeout, long now) {
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    // ① Selector poll: the actual reads and writes happen here. Fully written sends and fully read
    //    receives are added to completedSends / completedReceives
    this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    // ② process completed actions; anything not yet complete is not in the completedXXX lists
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);      // completed sends
    handleCompletedReceives(responses, updatedNow);   // completed receives
    handleDisconnections(responses, updatedNow);      // disconnections
    handleConnections();                              // new connections
    handleTimedOutRequests(responses, updatedNow);    // timed-out requests
    // ③ invoke callbacks: each response whose request has a callback triggers it (loop omitted)
    return responses;
}
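In Sender.run, before the batches are drained into produce requests, every node in readyNodes is checked with client.ready(), and nodes that are not ready are removed, because requests are never sent to a node that is not ready.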
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
    Node node = iter.next();
    if (!this.client.ready(node, now)) {
        iter.remove();
        notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
    }
}
// Begin connecting to the given node, return true if we are already connected and ready to send to that node.
public boolean ready(Node node, long now) {
    if (isReady(node, now))
        return true;
    // if we are interested in sending to a node and we don't have a connection to it, initiate one
    if (connectionStates.canConnect(node.idString(), now))
        initiateConnect(node, now);
    return false;
}

// Initiate a connection to the given node
private void initiateConnect(Node node, long now) {
    String nodeConnectionId = node.idString();
    this.connectionStates.connecting(nodeConnectionId, now);
    selector.connect(nodeConnectionId, new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
}

// Check if the node with the given id is ready to send more requests.
public boolean isReady(Node node, long now) {
    return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString());
}

// Are we connected and ready and able to send more requests to the given connection?
private boolean canSendRequest(String node) {
    return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
}
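The connection part of `ready()` can be sketched as a small state machine. This is a simplification under stated assumptions: the hypothetical `ConnectionStatesSketch` only tracks CONNECTING, CONNECTED and DISCONNECTED per node id with a reconnect backoff, and it leaves out the metadata and in-flight checks that `isReady()` also performs.

```java
import java.util.HashMap;
import java.util.Map;

class ConnectionStatesSketch {
    enum State { CONNECTING, CONNECTED, DISCONNECTED }

    private static final class NodeState {
        State state;
        final long lastConnectAttemptMs;

        NodeState(State state, long lastConnectAttemptMs) {
            this.state = state;
            this.lastConnectAttemptMs = lastConnectAttemptMs;
        }
    }

    private final Map<String, NodeState> nodes = new HashMap<>();
    private final long reconnectBackoffMs;

    ConnectionStatesSketch(long reconnectBackoffMs) { this.reconnectBackoffMs = reconnectBackoffMs; }

    // Never tried before, or disconnected and the reconnect backoff has elapsed.
    boolean canConnect(String node, long now) {
        NodeState s = nodes.get(node);
        return s == null || (s.state == State.DISCONNECTED && now - s.lastConnectAttemptMs >= reconnectBackoffMs);
    }

    void connecting(String node, long now) { nodes.put(node, new NodeState(State.CONNECTING, now)); }
    void connected(String node) { nodes.get(node).state = State.CONNECTED; }
    void disconnected(String node) { nodes.get(node).state = State.DISCONNECTED; }

    boolean isConnected(String node) {
        NodeState s = nodes.get(node);
        return s != null && s.state == State.CONNECTED;
    }

    // Same shape as NetworkClient.ready(): true only if already connected; otherwise maybe start connecting.
    boolean ready(String node, long now) {
        if (isConnected(node))
            return true;
        if (canConnect(node, now))
            connecting(node, now); // the real initiateConnect also calls selector.connect(...)
        return false;
    }
}
```

The first `ready()` call only starts a connection and returns false, which is why the Sender drops that node from `readyNodes` and tries again on a later pass.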
public boolean canSendMore(String node) {
    Deque<ClientRequest> queue = requests.get(node);
    return queue == null || queue.isEmpty() ||
           (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
public boolean completed() {
    return remaining <= 0 && !pending;
}
public void add(ClientRequest request) {
    Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
    if (reqs == null) {
        reqs = new ArrayDeque<>();
        this.requests.put(request.request().destination(), reqs);
    }
    reqs.addFirst(request);   // new requests are always added at the head of the deque
}
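The key of the requests map is request.request().destination(), i.e. the broker node the request is sent to. This also shows that everything here is client-side scope, since only the client has a destination node. Kafka acts as the server (the destination node): the client connects to it and communicates with it over a SocketChannel.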
client-server mode
complete handler
// Handle any completed request send. In particular if no response is expected, consider the request complete.
private void handleCompletedSends(List<ClientResponse> responses, long now) {
    // if no response is expected then when the send is completed, return it
    for (Send send : this.selector.completedSends()) {
        // peek the first (most recently added) request in the destination's deque without removing it,
        // then check whether it expects a response
        ClientRequest request = this.inFlightRequests.lastSent(send.destination());
        // no response expected: the request is done as soon as its Send completes. A ClientResponse is
        // still created, but its last argument (the response body) is null
        if (!request.expectResponse()) {
            this.inFlightRequests.completeLastSent(send.destination());
            responses.add(new ClientResponse(request, now, false, null));
        }
        // requests that do expect a response are completed in handleCompletedReceives
    }
}
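Flow for a request that expects no response: start sending the request -> add it to inFlightRequests -> write... -> the Send completes -> remove the request from inFlightRequests.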
// Handle any completed receives and update the response list with the responses received.
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        // a complete response has arrived, so the oldest in-flight ClientRequest to this node can now be removed
        ClientRequest req = inFlightRequests.completeNext(source);
        ResponseHeader header = ResponseHeader.parse(receive.payload());
        // Always expect the response version id to be the same as the request version id
        short apiKey = req.request().header().apiKey();
        short apiVer = req.request().header().apiVersion();
        Struct body = ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
        correlate(req.request().header(), header);
        if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
            responses.add(new ClientResponse(req, now, false, body));
    }
}
InFlightRequests complete
// Get the oldest request (the one that will be completed next) for the given node
public ClientRequest completeNext(String node) {
    return requestQueue(node).pollLast();
}

// Get the last request we sent to the given node (but don't remove it from the queue)
public ClientRequest lastSent(String node) {
    return requestQueue(node).peekFirst();
}

// Complete the last request that was sent to a particular node.
public ClientRequest completeLastSent(String node) {
    return requestQueue(node).pollFirst();
}
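The head/tail convention is easy to lose track of, so here is a sketch of it with hypothetical `Req` objects that only record whether their Send has been fully written. New requests go to the head; `lastSent` and `completeLastSent` work on the head (the newest request), `completeNext` takes the tail (the oldest request), and `canSendMore` looks only at the head, following the excerpts above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class InFlightSketch {
    // Hypothetical request: a name plus whether its Send has been fully written.
    static final class Req {
        final String name;
        boolean sendCompleted = false;
        Req(String name) { this.name = name; }
    }

    private final Map<String, Deque<Req>> requests = new HashMap<>();
    private final int maxInFlightPerConnection;

    InFlightSketch(int maxInFlightPerConnection) { this.maxInFlightPerConnection = maxInFlightPerConnection; }

    private Deque<Req> queue(String node) { return requests.computeIfAbsent(node, k -> new ArrayDeque<>()); }

    void add(String node, Req r) { queue(node).addFirst(r); }             // newest at the head
    Req lastSent(String node) { return queue(node).peekFirst(); }         // newest, not removed
    Req completeLastSent(String node) { return queue(node).pollFirst(); } // remove the newest
    Req completeNext(String node) { return queue(node).pollLast(); }      // remove the oldest

    // Another request may be added only once the newest one has been completely written.
    boolean canSendMore(String node) {
        Deque<Req> q = requests.get(node);
        return q == null || q.isEmpty() || (q.peekFirst().sendCompleted && q.size() < maxInFlightPerConnection);
    }
}
```

While `r1` is still being written, `canSendMore` is false, so no second request can land at the head until the first Send has completed.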
I'm reading the new client design in version 0.9, and I have a question about how requests go into and out of inFlightRequests.
Here is the basic flow:
When the Sender sends a ClientRequest to NetworkClient, the request is added to inFlightRequests to mark it as in flight:
private void doSend(ClientRequest request, long now) {
    this.inFlightRequests.add(request);
    selector.send(request.request());
}
inFlightRequests maps each node to a deque, and a new request is added as the first element of the deque:
public void add(ClientRequest request) {
    Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
    if (reqs == null) {
        reqs = new ArrayDeque<>();
        this.requests.put(request.request().destination(), reqs);
    }
    reqs.addFirst(request);
}
Then poll runs on the client and then on the selector. After this ClientRequest has been sent successfully, its Send is added to the selector's completedSends, which handleCompletedSends then processes:
private void handleCompletedSends(List<ClientResponse> responses, long now) {
    // if no response is expected then when the send is completed, return it
    for (Send send : this.selector.completedSends()) {
        ClientRequest request = this.inFlightRequests.lastSent(send.destination());
        if (!request.expectResponse()) {
            this.inFlightRequests.completeLastSent(send.destination());
            responses.add(new ClientResponse(request, now, false, null));
        }
    }
}
If this request doesn't need a response, the ClientRequest is removed from inFlightRequests:
public ClientRequest completeLastSent(String node) {
    return requestQueue(node).pollFirst();
}
I'm curious why it is pollFirst. Consider this scenario: the first ClientRequest has been sent successfully, but handleCompletedSends has not run yet. Another ClientRequest arrives and is added to the deque with addFirst. When pollFirst then executes, the first element of the deque is the new request, so pollFirst removes the new one, not the old one.
Step                      deque (first ... last)
CR1 -> inFlightRequests   [CR1]
CR1 send success          [CR1]
CR2 -> inFlightRequests   [CR2, CR1]
completeSends             [CR2, CR1]
pollFirst                 [CR1]   (pollFirst removed CR2, but CR2 has only just come in!)
I have also checked NetworkClient.send -> canSendRequest -> inFlightRequests.canSendMore(node) -> queue.peekFirst().request().completed(): a new request can be sent to the same node only after the first element of the deque has finished. But the completed condition in ByteBufferSend is remaining <= 0 && !pending, which means that as soon as a Send has been written to the server successfully, it counts as completed.
Am I missing something (is there any other limitation)? Can someone point it out? Thanks.