Kafka Producer: A Source-Code Look at Key System-Design Concerns
At the moment I have roughly three concerns about the Kafka producer: (1) how the producer supports partitioning (distribution); (2) how it guarantees reliable delivery of messages (ACKs); (3) how it stays available, that is, once sending is asynchronous, how memory is managed (the in-memory structure that buffers messages), what state the sending thread is in once memory is exhausted (OOM), and how the I/O thread is implemented. So today I'll dig into the source with these three points in mind; other topics, such as how the producer guarantees ordering and how the protocol and network layers are implemented, will have to wait for a later write-up.
Basic Concepts
Let's start with a picture of what a partition is.
What is a partition, and how is it structured?
Each partition is an ordered, immutable sequence of messages that is continually appended to. Every message in a partition carries a unique sequential ID, the offset, which uniquely identifies that message's position within the partition.
Why have partitions?
1. Distribution: a topic can hold more data than any single machine could store.
2. Parallelism: producing and consuming can proceed in parallel across partitions, which speeds up both.
Note: partitions are a concept implemented by the broker, but the broker itself does not decide which partition a committed message belongs to. In other words, the broker provides the partitioning mechanism, yet it does not care how messages are routed to partitions; the producer has to decide which partition to send each message to.
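As a quick usage sketch before we look at the code (broker address, topic name, and class name below are placeholders of my own, not from the source), a caller has two options: pin the record to a partition explicitly, or leave the partition unset and let the producer's partitioner decide.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionChoiceDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // Explicit partition: this record is pinned to partition 0 of the (hypothetical) topic "orders".
        producer.send(new ProducerRecord<String, String>("orders", 0, "order-42", "payload"));

        // No partition given: the producer asks its configured Partitioner to choose one.
        producer.send(new ProducerRecord<String, String>("orders", "order-42", "payload"));

        producer.close();
    }
}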
The Producer Side
1. How the Kafka producer chooses a partition
/**
 * Implementation of asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
 * See {@link #send(ProducerRecord, Callback)} for details.
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    …
    int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
    …
}

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    if (partition != null) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
        int lastPartition = partitions.size() - 1;
        // they have given us a partition, use it
        if (partition < 0 || partition > lastPartition) {
            throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));
        }
        return partition;
    }
    return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
Every call to doSend selects a partition for the record being sent. If a partition was already specified on the record and it falls within the partition list that the Cluster metadata holds for that topic, the specified partition wins. Otherwise, the configured partitioner class's partition method is used to pick one.
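The this.partitioner above is whatever Partitioner implementation the producer was configured with via partitioner.class. As a rough sketch of what such an implementation can look like (the hashing strategy here is my own simplification, not necessarily what Kafka's bundled DefaultPartitioner does), a key-hashing partitioner might be:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// A simplified key-hashing partitioner; the real DefaultPartitioner is more careful
// (it round-robins keyless records and prefers available partitions), so treat this as a sketch.
public class KeyHashPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null)
            return 0; // naive fallback for records without a key
        // murmur2 keeps the key-to-partition mapping stable; toPositive guards against negative hashes
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}

Such a class would be registered through the partitioner.class producer config.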
2. How much memory each record occupies
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);

/**
 * A binary format which consists of a 4 byte size, an 8 byte offset, and the record bytes. See {@link MemoryRecords}
 * for the in-memory representation.
 */
public interface Records extends Iterable<LogEntry> {

    int SIZE_LENGTH = 4;
    int OFFSET_LENGTH = 8;
    int LOG_OVERHEAD = SIZE_LENGTH + OFFSET_LENGTH;

    /**
     * The size of these records in bytes
     */
    int sizeInBytes();
}
So each record carries a 4-byte size field and an 8-byte offset field on top of the record content itself.
3. The producer's in-memory queues
Every record produced on the producer side is stored in an accumulator called RecordAccumulator, whose core field is:
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
As you can see, the underlying data structure is a thread-safe map whose key identifies a partition. TopicPartition looks like this:
public TopicPartition(String topic, int partition) {
    this.partition = partition;
    this.topic = topic;
}
In other words, the RecordAccumulator maintains one double-ended queue (Deque) for every partition of every topic.
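To make the shape of this structure concrete, here is a minimal sketch of how the per-partition deques can be created lazily. The field and method names follow the real code, but the body is my simplification; among other things I use a plain ConcurrentHashMap, while the real accumulator uses a copy-on-write map tuned for mostly-read access.

// Fragment of an accumulator-like class; imports of the internal RecordBatch type omitted.
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches =
        new ConcurrentHashMap<TopicPartition, Deque<RecordBatch>>();

// Return the deque for this partition, creating it on the first record for that partition.
// putIfAbsent makes the lazy creation safe when many application threads send concurrently.
private Deque<RecordBatch> getOrCreateDeque(TopicPartition tp) {
    Deque<RecordBatch> d = this.batches.get(tp);
    if (d != null)
        return d;
    d = new ArrayDeque<RecordBatch>();
    Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);
    return previous == null ? d : previous;
}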
Let's see how a record actually gets placed into one of these queues:
// check if we have an in-progress batch
Deque<RecordBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
    if (closed)
        throw new IllegalStateException("Cannot send after the producer is closed.");
    RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
    if (appendResult != null)
        return appendResult;
}

// we don't have an in-progress record batch try to allocate a new batch
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
    // Need to check if producer is closed again after grabbing the dequeue lock.
    if (closed)
        throw new IllegalStateException("Cannot send after the producer is closed.");
    RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
    if (appendResult != null) {
        // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
        free.deallocate(buffer);
        return appendResult;
    }
    MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
    RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
    FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

    dq.addLast(batch);
    incomplete.add(batch);
    return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
}
It fetches (or creates) the deque for the partition, grabs its lock, and calls tryAppend. Here is tryAppend:
/**
 * If `RecordBatch.tryAppend` fails (i.e. the record batch is full), close its memory records to release temporary
 * resources (like compression streams buffers).
 */
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
    RecordBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
        if (future == null)
            last.records.close();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
    }
    return null;
}
In the simplest design we would just push the new record onto the queue and return, but the producer does something smarter here. Let's keep reading to see the clever part of the design.
It first peeks at the tail of the deque, which holds a RecordBatch. Its constructor looks like this:
public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
    this.createdMs = now;
    this.lastAttemptMs = now;
    this.records = records;
    this.topicPartition = tp;
    this.produceFuture = new ProduceRequestResult();
    this.thunks = new ArrayList<Thunk>();
    this.lastAppendTime = createdMs;
    this.retry = false;
}
The records field is a wrapper around the actual messages, so the deque does not store raw messages but batch objects that wrap them. Moving to the next line: if the tail of the deque is non-null, the code calls FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
so the new record is apparently appended into the batch returned by peekLast rather than into a freshly created RecordBatch. Here is that tryAppend:
/**
 * Append the record to the current record set and return the relative offset within that record set
 *
 * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
 */
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
    if (!this.records.hasRoomFor(key, value)) {
        return null;
    } else {
        long checksum = this.records.append(offsetCounter++, timestamp, key, value);
        this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
        this.lastAppendTime = now;
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                               timestamp, checksum,
                                                               key == null ? -1 : key.length,
                                                               value == null ? -1 : value.length);
        if (callback != null)
            thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}
The Javadoc confirms it: the record is appended to the current record set. We'll come back to the motivation behind this design; at the very least it avoids creating extra objects, but the more important reason is presumably network I/O, since accumulating records into batches lets the sender ship fewer, larger requests. Continuing,
this.records.hasRoomFor(key, value)
checks whether the current MemoryRecords still has room for the record; if not, it returns null.
If it does, MemoryRecords.append is called:
public long append(long offset, long timestamp, byte[] key, byte[] value) {
    if (!writable)
        throw new IllegalStateException("Memory records is not writable");

    int size = Record.recordSize(key, value);
    compressor.putLong(offset);
    compressor.putInt(size);
    long crc = compressor.putRecord(timestamp, key, value);
    compressor.recordWritten(size + Records.LOG_OVERHEAD);
    return crc;
}
The append method does its writing mainly through the Compressor API. As the name suggests, it is a compressor; Kafka messages are famously compact on the wire, and compression is a big part of that. Let's step into Compressor, starting with its constructor:
public Compressor(ByteBuffer buffer, CompressionType type) {
    this.type = type;
    this.initPos = buffer.position();

    this.numRecords = 0;
    this.writtenUncompressed = 0;
    this.compressionRate = 1;
    this.maxTimestamp = Record.NO_TIMESTAMP;

    if (type != CompressionType.NONE) {
        // for compressed records, leave space for the header and the shallow message metadata
        // and move the starting position to the value payload offset
        buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD);
    }

    // create the stream
    bufferStream = new ByteBufferOutputStream(buffer);
    appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
}
Note bufferStream, a custom output stream backed by a ByteBuffer:
public ByteBufferOutputStream(ByteBuffer buffer) {
    this.buffer = buffer;
}
It wraps the supplied buffer,
public void write(int b) {
    if (buffer.remaining() < 1)
        expandBuffer(buffer.capacity() + 1);
    buffer.put((byte) b);
}
and its write method automatically expands the buffer when it runs out of space.
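expandBuffer is not shown above; the idea is simply to allocate a bigger buffer and copy the old contents over. A sketch of that growth logic follows (the growth factor is an assumption of mine, not necessarily what Kafka uses):

// Fragment of a ByteBufferOutputStream-like class; needs java.nio.ByteBuffer.
private static final float REALLOCATION_FACTOR = 1.1f; // assumed growth factor

private void expandBuffer(int size) {
    // Grow to at least `size`, with some headroom so we don't reallocate on every write.
    int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
    ByteBuffer temp = ByteBuffer.allocate(expandSize);
    buffer.flip();          // switch the old buffer to read mode
    temp.put(buffer);       // copy everything written so far
    buffer = temp;          // subsequent writes go into the larger buffer
}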
Now look at appendStream, which is a DataOutputStream:
public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
    try {
        switch (type) {
            case NONE:
                return new DataOutputStream(buffer);
            case GZIP:
                return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
            case SNAPPY:
                try {
                    OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
                    return new DataOutputStream(stream);
                } catch (Exception e) {
                    throw new KafkaException(e);
                }
            case LZ4:
                try {
                    OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer);
                    return new DataOutputStream(stream);
                } catch (Exception e) {
                    throw new KafkaException(e);
                }
            default:
                throw new IllegalArgumentException("Unknown compression type: " + type);
        }
    } catch (IOException e) {
        throw new KafkaException(e);
    }
}
So depending on the configured CompressionType, the stream is wrapped in a GZIP, Snappy, or LZ4 compressing stream, or left uncompressed.
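The CompressionType passed in here ultimately comes from the producer's configuration; enabling it is a single config entry on top of the Properties shown earlier:

// One of "none", "gzip", "snappy" or "lz4". Compression is applied per batch on the producer,
// so larger batches generally compress better.
props.put("compression.type", "gzip");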
Back in MemoryRecords.append: all it really does is write the record's header, metadata, payload, and checksum into the Compressor's DataOutputStream in a fixed binary format.
One level up from append is RecordBatch.tryAppend:
long checksum = this.records.append(offsetCounter++, timestamp, key, value);
this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                       timestamp, checksum,
                                                       key == null ? -1 : key.length,
                                                       value == null ? -1 : value.length);
if (callback != null)
    thunks.add(new Thunk(callback, future));
this.recordCount++;
return future;
After append completes, a FutureRecordMetadata object is constructed. FutureRecordMetadata implements the Future interface, so the result of the asynchronous send can be retrieved from it later.
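This is the same Future that KafkaProducer.send eventually hands back to the application, so the caller can either block on it or supply a callback. A short usage sketch (topic name and values are placeholders; the blocking call needs the enclosing method to handle InterruptedException and ExecutionException):

// Asynchronous: the callback fires once the broker's response (or an error) arrives.
producer.send(new ProducerRecord<String, String>("orders", "k", "v"), new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null)
            exception.printStackTrace();
        else
            System.out.printf("stored at partition %d, offset %d%n", metadata.partition(), metadata.offset());
    }
});

// Synchronous: block until the record is acknowledged.
RecordMetadata metadata = producer.send(new ProducerRecord<String, String>("orders", "k", "v")).get();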
Back in RecordAccumulator.append: if
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
returns null, that is, there is no in-progress batch with enough room, then:
// we don't have an in-progress record batch try to allocate a new batch
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
    // Need to check if producer is closed again after grabbing the dequeue lock.
    if (closed)
        throw new IllegalStateException("Cannot send after the producer is closed.");
    RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
    if (appendResult != null) {
        // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
        free.deallocate(buffer);
        return appendResult;
    }
    MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
    RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
    FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

    dq.addLast(batch);
    incomplete.add(batch);
    return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
}
The remaining work is to allocate a buffer from the pool (free.allocate) and build the wrapper structures (MemoryRecords and a new RecordBatch) around the record.
At this point, the record has been successfully buffered in memory.
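The knobs that govern this in-memory stage are producer configs: batch.size caps how large each RecordBatch's buffer is, buffer.memory bounds the total memory the pool behind free.allocate will hand out, and max.block.ms controls how long an allocation may block when that memory is exhausted, which is exactly the OOM concern from the introduction. A hedged config sketch (values are illustrative, not recommendations):

// Batching and memory-management knobs.
props.put("batch.size", 16384);        // per-partition batch buffer size in bytes
props.put("linger.ms", 5);             // wait up to 5 ms for more records before a batch becomes sendable
props.put("buffer.memory", 33554432);  // total bytes the accumulator's buffer pool may allocate
props.put("max.block.ms", 60000);      // how long send() may block once the pool is exhausted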
4. Sending the records
Next, let's look at how the background I/O thread actually sends the buffered records.
The producer's main send logic is implemented by a thread called Sender. Here is the heart of its run loop:
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

// if there are any partitions whose leaders are not known yet, force metadata update
if (result.unknownLeadersExist)
    this.metadata.requestUpdate();

// remove any nodes we aren't ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
    Node node = iter.next();
    if (!this.client.ready(node, now)) {
        iter.remove();
        notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
    }
}

// create produce requests
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
if (guaranteeMessageOrder) {
    // Mute all the partitions drained
    for (List<RecordBatch> batchList : batches.values()) {
        for (RecordBatch batch : batchList)
            this.accumulator.mutePartition(batch.topicPartition);
    }
}

List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
// update sensors
for (RecordBatch expiredBatch : expiredBatches)
    this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

sensors.updateProduceRequestMetrics(batches);
List<ClientRequest> requests = createProduceRequests(batches, now);
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
// that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
// with sendable data that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (result.readyNodes.size() > 0) {
    log.trace("Nodes with data ready to send: {}", result.readyNodes);
    log.trace("Created {} produce requests: {}", requests.size(), requests);
    pollTimeout = 0;
}
for (ClientRequest request : requests)
    client.send(request, now);

// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
this.client.poll(pollTimeout, now);
The inline comments explain the flow fairly well.
First the Sender looks up, from the Cluster metadata, the leader node of every partition of every topic this producer has data for; if any leader is unknown, it requests a metadata update. It then walks the leader nodes and removes from the ready list any node whose physical connection is not yet established, leaving those for a later send.
Next it drains the accumulator, regrouping the pending batches by nodeId (in memory they were grouped by TopicPartition; here they are grouped by the physical node they will be sent to, as the last step before sending). If guaranteeMessageOrder is enabled, the drained partitions are muted so send order is preserved. Batches that have expired are aborted. Finally a list of ClientRequests is built and handed to the KafkaClient to send, and the loop repeats for the next round.
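For reference, guaranteeMessageOrder is derived from producer configuration: if I read the KafkaProducer setup correctly it is enabled when max.in.flight.requests.per.connection is 1, so the usual recipe for strict per-partition ordering looks something like this (treat the exact interplay as an assumption to verify against your client version):

// Allow only one in-flight request per broker connection so retries cannot reorder batches;
// combined with retries > 0 this preserves per-partition ordering at some cost in throughput.
props.put("max.in.flight.requests.per.connection", 1);
props.put("retries", 3);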
5. Handling ACKs
/**
 * Transfer the record batches into a list of produce requests on a per-node basis
 */
private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
    List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size());
    for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
        requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
    return requests;
}
Let's look at what happens when the Sender builds requests through createProduceRequests. We'll step into produceRequest to see how each ClientRequest is constructed, but first, the ClientRequest structure itself:
/**
 * @param createdTimeMs The unix timestamp in milliseconds for the time at which this request was created.
 * @param expectResponse Should we expect a response message or is this request complete once it is sent?
 * @param request The request
 * @param callback A callback to execute when the response has been received (or null if no callback is necessary)
 * @param isInitiatedByNetworkClient Is request initiated by network client, if yes, its
 *                                   response will be consumed by network client
 */
public ClientRequest(long createdTimeMs, boolean expectResponse, RequestSend request,
                     RequestCompletionHandler callback, boolean isInitiatedByNetworkClient) {
    this.createdTimeMs = createdTimeMs;
    this.callback = callback;
    this.request = request;
    this.expectResponse = expectResponse;
    this.isInitiatedByNetworkClient = isInitiatedByNetworkClient;
}
The constructor takes a concrete RequestSend and a RequestCompletionHandler. RequestCompletionHandler is an interface with a single method:
public void onComplete(ClientResponse response);
This interface acts as a callback: per the ClientRequest Javadoc above, onComplete is invoked once the response for the request has been received.
Back in createProduceRequests, let's look at produceRequest:
/**
 * Create a produce request from the given record batches
 */
private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
    Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
    final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
    for (RecordBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        produceRecordsByPartition.put(tp, batch.records.buffer());
        recordsByPartition.put(tp, batch);
    }
    ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
    RequestSend send = new RequestSend(Integer.toString(destination),
                                       this.client.nextRequestHeader(ApiKeys.PRODUCE),
                                       request.toStruct());
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };

    return new ClientRequest(now, acks != 0, send, callback);
}
The part to focus on is the callback's implementation, handleProduceResponse:
/**
 * Handle a produce response
 */
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
    int correlationId = response.request().request().header().correlationId();
    if (response.wasDisconnected()) {
        log.trace("Cancelled request {} due to node {} being disconnected", response,
                  response.request().request().destination());
        for (RecordBatch batch : batches.values())
            completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, Record.NO_TIMESTAMP, correlationId, now);
    } else {
        log.trace("Received produce response from node {} with correlation id {}",
                  response.request().request().destination(), correlationId);
        // if we have a response, parse it
        if (response.hasResponse()) {
            ProduceResponse produceResponse = new ProduceResponse(response.responseBody());
            for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
                TopicPartition tp = entry.getKey();
                ProduceResponse.PartitionResponse partResp = entry.getValue();
                Errors error = Errors.forCode(partResp.errorCode);
                RecordBatch batch = batches.get(tp);
                completeBatch(batch, error, partResp.baseOffset, partResp.timestamp, correlationId, now);
            }
            this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
            this.sensors.recordThrottleTime(response.request().request().destination(), produceResponse.getThrottleTime());
        } else {
            // this is the acks = 0 case, just complete all requests
            for (RecordBatch batch : batches.values())
                completeBatch(batch, Errors.NONE, -1L, Record.NO_TIMESTAMP, correlationId, now);
        }
    }
}
It does some straightforward handling based on the response, and the real completion logic lives in completeBatch. If acks = 0, there is no response body, so every batch is completed directly:
for (RecordBatch batch : batches.values())
completeBatch(batch, Errors.NONE, -1L, Record.NO_TIMESTAMP, correlationId, now);
If acks is not 0, the per-partition responses are parsed and completeBatch(batch, error, partResp.baseOffset, partResp.timestamp, correlationId, now); is called with the broker-reported error and base offset.
Stepping into completeBatch:
/**
 * Complete or retry the given batch of records.
 *
 * @param batch The record batch
 * @param error The error (or null if none)
 * @param baseOffset The base offset assigned to the records if successful
 * @param timestamp The timestamp returned by the broker for this batch
 * @param correlationId The correlation id for the request
 * @param now The current POSIX time stamp in milliseconds
 */
private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long timestamp, long correlationId, long now) {
    if (error != Errors.NONE && canRetry(batch, error)) {
        // retry
        log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
                 correlationId, batch.topicPartition, this.retries - batch.attempts - 1, error);
        this.accumulator.reenqueue(batch, now);
        this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
    } else {
        RuntimeException exception;
        if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
            exception = new TopicAuthorizationException(batch.topicPartition.topic());
        else
            exception = error.exception();
        // tell the user the result of their request
        batch.done(baseOffset, timestamp, exception);
        this.accumulator.deallocate(batch);
        if (error != Errors.NONE)
            this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
    }
    if (error.exception() instanceof InvalidMetadataException)
        metadata.requestUpdate();
    // Unmute the completed partition.
    if (guaranteeMessageOrder)
        this.accumulator.unmutePartition(batch.topicPartition);
}
If the response carries an error, the error is retriable, and retries are configured, the batch is simply re-enqueued for another attempt: this.accumulator.reenqueue(batch, now);
Otherwise the error is wrapped into an exception, the request is finished up (batch.done completes the futures and fires the user callbacks), and the memory held by the batch is released back to the accumulator.
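Tying this back to the reliability concern from the introduction: how strong the acknowledgement is and how failures are retried are both producer configs. A hedged sketch of a "reliable" setup (values are illustrative):

// acks=0: fire-and-forget, the batch is completed as soon as the request is written out.
// acks=1: the partition leader must append the record before responding.
// acks=all: all in-sync replicas must have the record; strongest delivery guarantee.
props.put("acks", "all");
props.put("retries", 3);                // retriable errors lead to reenqueue, as seen in completeBatch
props.put("request.timeout.ms", 30000); // batches older than this are aborted by abortExpiredBatches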