KafkaProducer源码分析
kafka常用术语
broker:kafka的服务端即kafka实例,kafka集群由一个或多个broker组成,主要负责接收和处理客户端的请求
topic:主题,kafka承载消息的逻辑容器,每条发布到kafka的消息都有对应的逻辑容器,工作中多用于区分业务
partition:分区,是物理概念,代表有序不变的消息序列,每个topic由一个或多个partion组成
replica:副本,kafka中同一条消息拷贝到多个地方做数据冗余,这些地方就是副本,副本分为leader和follower,角色不同作用不同,副本是对partition而言的,每个分区可配置多个副本来实现高可用
record:消息,kafka处理的对象
offset:消息位移,分区中每条消息的位置信息,是单调递增且不变的值
producer:生产者,向主题发送新消息的应用程序
consumer:消费者,从主题订阅新消息的应用程序
consumer offset:消费者位移,记录消费者的消费进度,每个消费者都有自己的消费者位移
consumer group:消费者组,多个消费者组成一个消费者组,同时消费多个分区来实现高可用(组内消费者的个数不能多于分区个数以免浪费资源)
reblance:重平衡,消费组内消费者实例数量变更后,其他消费者实例自动重新分配订阅主题分区的过程
下面用一张图展示上面提到的部分概念(用ppt画的图,太费劲了,画了老半天,有好用的画图工具欢迎推荐)
消息生产流程
先来个kafkaproducer的小demo
public static void main(string[] args) throws executionexception, interruptedexception { if (args.length != 2) { throw new illegalargumentexception("usage: com.ding.kafkaproducerdemo bootstrap-servers topic-name"); } properties props = new properties(); // kafka服务器ip和端口,多个用逗号分割 props.put("bootstrap.servers", args[0]); // 确认信号配置 // ack=0 代表producer端不需要等待确认信号,可用性最低 // ack=1 等待至少一个leader成功把消息写到log中,不保证follower写入成功,如果leader宕机同时follower没有把数据写入成功 // 消息丢失 // ack=all leader需要等待所有follower成功备份,可用性最高 props.put("ack", "all"); // 重试次数 props.put("retries", 0); // 批处理消息的大小,批处理可以增加吞吐量 props.put("batch.size", 16384); // 延迟发送消息的时间 props.put("linger.ms", 1); // 用来换出数据的内存大小 props.put("buffer.memory", 33554432); // key 序列化方式 props.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer"); // value 序列化方式 props.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer"); // 创建kafkaproducer对象,创建时会启动sender线程 producer<string, string> producer = new kafkaproducer<>(props); for (int i = 0; i < 100; i++) { // 往recordaccumulator中写消息 future<recordmetadata> result = producer.send(new producerrecord<>(args[1], integer.tostring(i), integer.tostring(i))); recordmetadata rm = result.get(); system.out.println("topic: " + rm.topic() + ", partition: " + rm.partition() + ", offset: " + rm.offset()); } producer.close(); }
实例化
kafkaproducer构造方法主要是根据配置文件进行一些实例化操作
1.解析clientid,若没有配置则由是producer-递增的数字
2.解析并实例化分区器partitioner,可以实现自己的partitioner,比如根据key分区,可以保证相同key分到同一个分区,对保证顺序很有用。若没有指定分区规则,采用默认的规则(消息有key,对key做hash,然后对可用分区取模;若没有key,用随机数对可用分区取模【没有key的时候说随机数对可用分区取模不准确,counter值初始值是随机的,但后面都是递增的,所以可以算到roundrobin】)
3.解析key、value的序列化方式并实例化
4.解析并实例化拦截器
5.解析并实例化recordaccumulator,主要用于存放消息(kafkaproducer主线程往recordaccumulator中写消息,sender线程从recordaccumulator中读消息并发送到kafka中)
6.解析broker地址
7.创建一个sender线程并启动
... this.sender = newsender(logcontext, kafkaclient, this.metadata); this.iothread = new kafkathread(iothreadname, this.sender, true); this.iothread.start(); ...
消息发送流程
消息的发送入口是kafkaproducer.send方法,主要过程如下
kafkaproducer.send kafkaproducer.dosend // 获取集群信息 kafkaproducer.waitonmetadata // key/value序列化 key\value serialize // 分区 kafkaproducer.partion // 创建topcipartion对象,记录消息的topic和partion信息 topicpartition // 写入消息 recordaccumulator.applend // 唤醒sender线程 sender.wakeup
recordaccumulator
recordaccumulator是消息队列用于缓存消息,根据topicpartition对消息分组
重点看下recordaccumulator.applend追加消息的流程
// 记录进行applend的线程数 appendsinprogress.incrementandget();
// 根据topicpartition获取或新建deque双端队列 deque<producerbatch> dq = getorcreatedeque(tp); ... private deque<producerbatch> getorcreatedeque(topicpartition tp) { deque<producerbatch> d = this.batches.get(tp); if (d != null) return d; d = new arraydeque<>(); deque<producerbatch> previous = this.batches.putifabsent(tp, d); if (previous == null) return d; else return previous; }
// 尝试将消息加入到缓冲区中 // 加锁保证同一个topicpartition写入有序 synchronized (dq) { if (closed) throw new kafkaexception("producer closed while send in progress"); // 尝试写入 recordappendresult appendresult = tryappend(timestamp, key, value, headers, callback, dq); if (appendresult != null) return appendresult; }
private recordappendresult tryappend(long timestamp, byte[] key, byte[] value, header[] headers, callback callback, deque<producerbatch> deque) { // 从双端队列的尾部取出producerbatch producerbatch last = deque.peeklast(); if (last != null) { // 取到了,尝试添加消息 futurerecordmetadata future = last.tryappend(timestamp, key, value, headers, callback, time.milliseconds()); // 空间不够,返回null if (future == null) last.closeforrecordappends(); else return new recordappendresult(future, deque.size() > 1 || last.isfull(), false); } // 取不到返回null return null; }
public futurerecordmetadata tryappend(long timestamp, byte[] key, byte[] value, header[] headers, callback callback, long now) { // 空间不够,返回null if (!recordsbuilder.hasroomfor(timestamp, key, value, headers)) { return null; } else { // 真正添加消息 long checksum = this.recordsbuilder.append(timestamp, key, value, headers); ... futurerecordmetadata future = ... // future和回调callback进行关联 thunks.add(new thunk(callback, future)); ... return future; } }
// 尝试applend失败(返回null),会走到这里。如果tryapplend成功直接返回了 // 从bufferpool中申请内存空间,用于创建新的producerbatch buffer = free.allocate(size, maxtimetoblock);
synchronized (dq) { // 注意这里,前面已经尝试添加失败了,且已经分配了内存,为何还要尝试添加? // 因为可能已经有其他线程创建了producerbatch或者之前的producerbatch已经被sender线程释放了一些空间,所以在尝试添加一次。这里如果添加成功,后面会在finally中释放申请的空间 recordappendresult appendresult = tryappend(timestamp, key, value, headers, callback, dq); if (appendresult != null) { return appendresult; } // 尝试添加失败了,新建producerbatch memoryrecordsbuilder recordsbuilder = recordsbuilder(buffer, maxusablemagic); producerbatch batch = new producerbatch(tp, recordsbuilder, time.milliseconds()); futurerecordmetadata future = utils.notnull(batch.tryappend(timestamp, key, value, headers, callback, time.milliseconds())); dq.addlast(batch); incomplete.add(batch); // 将buffer置为null,避免在finally汇总释放空间 buffer = null; return new recordappendresult(future, dq.size() > 1 || batch.isfull(), true); }
finally { // 最后如果再次尝试添加成功,会释放之前申请的内存(为了新建producerbatch) if (buffer != null) free.deallocate(buffer); appendsinprogress.decrementandget(); }
// 将消息写入缓冲区 recordaccumulator.recordappendresult result = accumulator.append(tp, timestamp, serializedkey,serializedvalue, headers, interceptcallback, remainingwaitms); if (result.batchisfull || result.newbatchcreated) { // 缓冲区满了或者新创建的producerbatch,唤起sender线程 this.sender.wakeup(); } return result.future;
sender发送消息线程
主要流程如下
sender.run sender.runonce sender.sendproducerdata // 获取集群信息 metadata.fetch // 获取可以发送消息的分区且已经获取到了leader分区的节点 recordaccumulator.ready // 根据准备好的节点信息从缓冲区中获取topicpartion对应的deque队列中取出producerbatch信息 recordaccumulator.drain // 将消息转移到每个节点的生产请求队列中 sender.sendproducerequests // 为消息创建生产请求队列 sender.sendproducerrequest kafkaclient.newclientrequest // 下面是发送消息 kafkaclient.sent networkclient.dosent selector.send // 其实上面并不是真正执行i/o,只是写入到kafkachannel中 // poll 真正执行i/o kafkaclient.poll
通过源码分析下sender线程的主要流程
kafkaproducer的构造方法在实例化时启动一个kafkathread线程来执行sender
// kafkaproducer构造方法启动sender string iothreadname = network_thread_prefix + " | " + clientid; this.iothread = new kafkathread(iothreadname, this.sender, true); this.iothread.start();
// sender->run()->runonce() long currenttimems = time.milliseconds(); // 发送生产的消息 long polltimeout = sendproducerdata(currenttimems); // 真正执行i/o操作 client.poll(polltimeout, currenttimems);
// 获取集群信息 cluster cluster = metadata.fetch();
// 获取准备好可以发送消息的分区且已经获取到leader分区的节点 recordaccumulator.readycheckresult result = this.accumulator.ready(cluster, now); // readycheckresult 包含可以发送消息且获取到leader分区的节点集合、未获取到leader分区节点的topic集合 public final set<node> 的节点; public final long nextreadycheckdelayms; public final set<string> unknownleadertopics;
ready方法主要是遍历在上面介绍recordaccumulator添加消息的容器,map<topicpartition, deque
然后对返回的unknownleadertopics进行遍历,将topic加入到metadata信息中,调用metadata.requestupdate方法请求更新metadata信息 对已经准备好的节点进行最后的检查,移除那些节点连接没有就绪的节点,主要根据kafkaclient.ready方法进行判断 下面开始创建生产消息的请求 把消息封装成clientrequest 调用kafkaclient发送消息(并非真正执行i/o),涉及到kafkachannel。kafka的通信采用的是nio方式 到这里,发送消息的工作准备的差不多了,调用kafkaclient.poll方法,真正执行i/o操作 用一张图总结sender线程的流程 通过上面的介绍,我们梳理出了kafka生产消息的主要流程,涉及到主线程往recordaccumulator中写入消息,同时后台的sender线程从recordaccumulator中获取消息,使用nio的方式把消息发送给kafka,用一张图总结 这是本公众号第一次尝试写源码相关的文章,说实话真不知道该如何下笔,代码截图、贴整体代码等感觉都被我否定了,最后采用了这种方式,介绍主要流程,把无关代码省略,配合流程图。 上周参加了华为云kafka实战课程,简单看了下kafka的生产和消费代码,想简单梳理下,然后在周日中午即8.17开始阅读源码,梳理流程,一直写到了晚上12点多,还剩一点没有完成,周一早晨早起完成了这篇文章。当然这篇文章忽略了很多更细节的东西,后面会继续深入,勇于尝试,不断精进,加油! 华为云实战 极客时间kafka专栏// 遍历batches
for (map.entry<topicpartition, deque<producerbatch>> entry : this.batches.entryset()) {
topicpartition part = entry.getkey();
deque<producerbatch> deque = entry.getvalue();
// 根据topicpartition从集群信息获取leader分区所在节点
node leader = cluster.leaderfor(part);
synchronized (deque) {
if (leader == null && !deque.isempty()) {
// 添加未找到对应leader分区所在节点但有要发送的消息的topic
unknownleadertopics.add(part.topic());
} else if (!readynodes.contains(leader) && !ismuted(part, nowms)) {
....
if (sendable && !backingoff) {
// 添加准备好的节点
readynodes.add(leader);
} else {
...
}
for (string topic : result.unknownleadertopics)
this.metadata.add(topic);
result.unknownleadertopics);
this.metadata.requestupdate();
iterator<node> iter = result.readynodes.iterator();
long notreadytimeout = long.max_value;
while (iter.hasnext()) {
node node = iter.next();
// 调用kafkaclient.ready方法验证节点连接是否就绪
if (!this.client.ready(node, now)) {
// 移除没有就绪的节点
iter.remove();
notreadytimeout = math.min(notreadytimeout, this.client.polldelayms(node, now));
}
}
// 从recordaccumulator中取出topicpartition对应的deque双端队列,然后从双端队列头部取出producerbatch,作为要发送的信息
map<integer, list<producerbatch>> batches = this.accumulator.drain(cluster, result.readynodes, this.maxrequestsize, now);
clientrequest clientrequest = client.newclientrequest(nodeid, requestbuilder, now, acks != 0,requesttimeoutms, callback);
// networkclient.dosent方法
string destination = clientrequest.destination();
requestheader header = clientrequest.makeheader(request.version());
...
send send = request.tosend(destination, header);
inflightrequest inflightrequest = new inflightrequest(clientrequest,header,isinternalrequest,request,send,now);
this.inflightrequests.add(inflightrequest);
selector.send(send);
...
// selector.send方法
string connectionid = send.destination();
kafkachannel channel = openorclosingchannelorfail(connectionid);
if (closingchannels.containskey(connectionid)) {
this.failedsends.add(connectionid);
} else {
try {
channel.setsend(send);
...
client.poll(polltimeout, currenttimems);
后记
参考资料