欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

KafkaProducer源码分析

程序员文章站 2022-10-29 22:38:11
Kafka常用术语 Broker :Kafka的服务端即Kafka实例,Kafka集群由一个或多个Broker组成,主要负责接收和处理客户端的请求 Topic :主题,Kafka承载消息的逻辑容器,每条发布到Kafka的消息都有对应的逻辑容器,工作中多用于区分业务 Partition :分区,是物理 ......

kafka常用术语

broker:kafka的服务端即kafka实例,kafka集群由一个或多个broker组成,主要负责接收和处理客户端的请求

topic:主题,kafka承载消息的逻辑容器,每条发布到kafka的消息都有对应的逻辑容器,工作中多用于区分业务

partition:分区,是物理概念,代表有序不变的消息序列,每个topic由一个或多个partion组成

replica:副本,kafka中同一条消息拷贝到多个地方做数据冗余,这些地方就是副本,副本分为leader和follower,角色不同作用不同,副本是对partition而言的,每个分区可配置多个副本来实现高可用

record:消息,kafka处理的对象

offset:消息位移,分区中每条消息的位置信息,是单调递增且不变的值

producer:生产者,向主题发送新消息的应用程序

consumer:消费者,从主题订阅新消息的应用程序

consumer offset:消费者位移,记录消费者的消费进度,每个消费者都有自己的消费者位移

consumer group:消费者组,多个消费者组成一个消费者组,同时消费多个分区来实现高可用(组内消费者的个数不能多于分区个数以免浪费资源

reblance:重平衡,消费组内消费者实例数量变更后,其他消费者实例自动重新分配订阅主题分区的过程

下面用一张图展示上面提到的部分概念(用ppt画的图,太费劲了,画了老半天,有好用的画图工具欢迎推荐)

KafkaProducer源码分析

消息生产流程

先来个kafkaproducer的小demo

public static void main(string[] args) throws executionexception, interruptedexception {
        if (args.length != 2) {
            throw new illegalargumentexception("usage: com.ding.kafkaproducerdemo bootstrap-servers topic-name");
        }

        properties props = new properties();
        // kafka服务器ip和端口,多个用逗号分割
        props.put("bootstrap.servers", args[0]);
        // 确认信号配置
        // ack=0 代表producer端不需要等待确认信号,可用性最低
        // ack=1 等待至少一个leader成功把消息写到log中,不保证follower写入成功,如果leader宕机同时follower没有把数据写入成功
        // 消息丢失
        // ack=all leader需要等待所有follower成功备份,可用性最高
        props.put("ack", "all");
        // 重试次数
        props.put("retries", 0);
        // 批处理消息的大小,批处理可以增加吞吐量
        props.put("batch.size", 16384);
        // 延迟发送消息的时间
        props.put("linger.ms", 1);
        // 用来换出数据的内存大小
        props.put("buffer.memory", 33554432);
        // key 序列化方式
        props.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");
        // value 序列化方式
        props.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer");

        // 创建kafkaproducer对象,创建时会启动sender线程
        producer<string, string> producer = new kafkaproducer<>(props);
        for (int i = 0; i < 100; i++) {
            // 往recordaccumulator中写消息
            future<recordmetadata> result = producer.send(new producerrecord<>(args[1], integer.tostring(i), integer.tostring(i)));
            recordmetadata rm = result.get();
            system.out.println("topic: " + rm.topic() + ", partition: " +  rm.partition() + ", offset: " + rm.offset());
        }
        producer.close();
    }

实例化

kafkaproducer构造方法主要是根据配置文件进行一些实例化操作

1.解析clientid,若没有配置则由是producer-递增的数字

2.解析并实例化分区器partitioner,可以实现自己的partitioner,比如根据key分区,可以保证相同key分到同一个分区,对保证顺序很有用。若没有指定分区规则,采用默认的规则(消息有key,对key做hash,然后对可用分区取模;若没有key,用随机数对可用分区取模【没有key的时候说随机数对可用分区取模不准确,counter值初始值是随机的,但后面都是递增的,所以可以算到roundrobin】)

3.解析key、value的序列化方式并实例化

4.解析并实例化拦截器

5.解析并实例化recordaccumulator,主要用于存放消息(kafkaproducer主线程往recordaccumulator中写消息,sender线程从recordaccumulator中读消息并发送到kafka中)

6.解析broker地址

7.创建一个sender线程并启动

...
this.sender = newsender(logcontext, kafkaclient, this.metadata);
this.iothread = new kafkathread(iothreadname, this.sender, true);
this.iothread.start();
...

消息发送流程

消息的发送入口是kafkaproducer.send方法,主要过程如下

kafkaproducer.send
kafkaproducer.dosend
// 获取集群信息
kafkaproducer.waitonmetadata 
// key/value序列化
key\value serialize
// 分区
kafkaproducer.partion
// 创建topcipartion对象,记录消息的topic和partion信息
topicpartition
// 写入消息
recordaccumulator.applend
// 唤醒sender线程
sender.wakeup

recordaccumulator

recordaccumulator是消息队列用于缓存消息,根据topicpartition对消息分组

重点看下recordaccumulator.applend追加消息的流程

// 记录进行applend的线程数
appendsinprogress.incrementandget();
// 根据topicpartition获取或新建deque双端队列
deque<producerbatch> dq = getorcreatedeque(tp);
...
private deque<producerbatch> getorcreatedeque(topicpartition tp) {
    deque<producerbatch> d = this.batches.get(tp);
    if (d != null)
        return d;
    d = new arraydeque<>();
    deque<producerbatch> previous = this.batches.putifabsent(tp, d);
    if (previous == null)
        return d;
    else
        return previous;
}
// 尝试将消息加入到缓冲区中
// 加锁保证同一个topicpartition写入有序
synchronized (dq) {
    if (closed)
        throw new kafkaexception("producer closed while send in progress");
    // 尝试写入
    recordappendresult appendresult = tryappend(timestamp, key, value, headers, callback, dq);
    if (appendresult != null)
        return appendresult;
}
private recordappendresult tryappend(long timestamp, byte[] key, byte[] value, header[] headers, callback callback, deque<producerbatch> deque) {
    // 从双端队列的尾部取出producerbatch
    producerbatch last = deque.peeklast();
    if (last != null) {
        // 取到了,尝试添加消息
        futurerecordmetadata future = last.tryappend(timestamp, key, value, headers, callback, time.milliseconds());
        // 空间不够,返回null
        if (future == null)
            last.closeforrecordappends();
        else
            return new recordappendresult(future, deque.size() > 1 || last.isfull(), false);
    }
    // 取不到返回null
    return null;
}
public futurerecordmetadata tryappend(long timestamp, byte[] key, byte[] value, header[] headers, callback callback, long now) {
    // 空间不够,返回null
    if (!recordsbuilder.hasroomfor(timestamp, key, value, headers)) {
        return null;
    } else {
        // 真正添加消息
        long checksum = this.recordsbuilder.append(timestamp, key, value, headers);
        ...
        futurerecordmetadata future = ...
        // future和回调callback进行关联    
        thunks.add(new thunk(callback, future));
        ...
        return future;
    }
}
// 尝试applend失败(返回null),会走到这里。如果tryapplend成功直接返回了
// 从bufferpool中申请内存空间,用于创建新的producerbatch
buffer = free.allocate(size, maxtimetoblock);
synchronized (dq) {
    // 注意这里,前面已经尝试添加失败了,且已经分配了内存,为何还要尝试添加?
    // 因为可能已经有其他线程创建了producerbatch或者之前的producerbatch已经被sender线程释放了一些空间,所以在尝试添加一次。这里如果添加成功,后面会在finally中释放申请的空间
    recordappendresult appendresult = tryappend(timestamp, key, value, headers, callback, dq);
    if (appendresult != null) {
        return appendresult;
    }

    // 尝试添加失败了,新建producerbatch
    memoryrecordsbuilder recordsbuilder = recordsbuilder(buffer, maxusablemagic);
    producerbatch batch = new producerbatch(tp, recordsbuilder, time.milliseconds());
    futurerecordmetadata future = utils.notnull(batch.tryappend(timestamp, key, value, headers, callback, time.milliseconds()));

    dq.addlast(batch);
    incomplete.add(batch);
    // 将buffer置为null,避免在finally汇总释放空间
    buffer = null;
    return new recordappendresult(future, dq.size() > 1 || batch.isfull(), true);
}
finally {
    // 最后如果再次尝试添加成功,会释放之前申请的内存(为了新建producerbatch)
    if (buffer != null)
        free.deallocate(buffer);
    appendsinprogress.decrementandget();
}
// 将消息写入缓冲区
recordaccumulator.recordappendresult result = accumulator.append(tp, timestamp, serializedkey,serializedvalue, headers, interceptcallback, remainingwaitms);
if (result.batchisfull || result.newbatchcreated) {
    // 缓冲区满了或者新创建的producerbatch,唤起sender线程
    this.sender.wakeup();
}
return result.future;

sender发送消息线程

主要流程如下

sender.run
sender.runonce
sender.sendproducerdata
// 获取集群信息
metadata.fetch
// 获取可以发送消息的分区且已经获取到了leader分区的节点
recordaccumulator.ready
// 根据准备好的节点信息从缓冲区中获取topicpartion对应的deque队列中取出producerbatch信息
recordaccumulator.drain
// 将消息转移到每个节点的生产请求队列中
sender.sendproducerequests
// 为消息创建生产请求队列
sender.sendproducerrequest
kafkaclient.newclientrequest
// 下面是发送消息
kafkaclient.sent
networkclient.dosent
selector.send
// 其实上面并不是真正执行i/o,只是写入到kafkachannel中
// poll 真正执行i/o
kafkaclient.poll

通过源码分析下sender线程的主要流程

kafkaproducer的构造方法在实例化时启动一个kafkathread线程来执行sender

// kafkaproducer构造方法启动sender
string iothreadname = network_thread_prefix + " | " + clientid;
this.iothread = new kafkathread(iothreadname, this.sender, true);
this.iothread.start();
// sender->run()->runonce()
long currenttimems = time.milliseconds();
// 发送生产的消息
long polltimeout = sendproducerdata(currenttimems);
// 真正执行i/o操作
client.poll(polltimeout, currenttimems);
// 获取集群信息
cluster cluster = metadata.fetch();
// 获取准备好可以发送消息的分区且已经获取到leader分区的节点
recordaccumulator.readycheckresult result = this.accumulator.ready(cluster, now);
// readycheckresult 包含可以发送消息且获取到leader分区的节点集合、未获取到leader分区节点的topic集合
public final set<node> 的节点;
public final long nextreadycheckdelayms;
public final set<string> unknownleadertopics;

ready方法主要是遍历在上面介绍recordaccumulator添加消息的容器,map<topicpartition, deque>,从集群信息中根据topicpartition获取leader分区所在节点,找不到对应leader节点但有要发送的消息的topic添加到unknownleadertopics中。同时把那些根据topicpartition可以获取leader分区且消息满足发送的条件的节点添加到的节点中

// 遍历batches
for (map.entry<topicpartition, deque<producerbatch>> entry : this.batches.entryset()) {
    topicpartition part = entry.getkey();
    deque<producerbatch> deque = entry.getvalue();
    // 根据topicpartition从集群信息获取leader分区所在节点
    node leader = cluster.leaderfor(part);
    synchronized (deque) {
        if (leader == null && !deque.isempty()) {
            // 添加未找到对应leader分区所在节点但有要发送的消息的topic
            unknownleadertopics.add(part.topic());
        } else if (!readynodes.contains(leader) && !ismuted(part, nowms)) {
                ....
                if (sendable && !backingoff) {
                    // 添加准备好的节点
                    readynodes.add(leader);
                } else {
                   ...
}

然后对返回的unknownleadertopics进行遍历,将topic加入到metadata信息中,调用metadata.requestupdate方法请求更新metadata信息

for (string topic : result.unknownleadertopics)
    this.metadata.add(topic);
    result.unknownleadertopics);
    this.metadata.requestupdate();

对已经准备好的节点进行最后的检查,移除那些节点连接没有就绪的节点,主要根据kafkaclient.ready方法进行判断

iterator<node> iter = result.readynodes.iterator();
long notreadytimeout = long.max_value;
while (iter.hasnext()) {
    node node = iter.next();
    // 调用kafkaclient.ready方法验证节点连接是否就绪
    if (!this.client.ready(node, now)) {
        // 移除没有就绪的节点
        iter.remove();
        notreadytimeout = math.min(notreadytimeout, this.client.polldelayms(node, now));
    }
}

下面开始创建生产消息的请求

// 从recordaccumulator中取出topicpartition对应的deque双端队列,然后从双端队列头部取出producerbatch,作为要发送的信息
map<integer, list<producerbatch>> batches = this.accumulator.drain(cluster, result.readynodes, this.maxrequestsize, now);

把消息封装成clientrequest

clientrequest clientrequest = client.newclientrequest(nodeid, requestbuilder, now, acks != 0,requesttimeoutms, callback);

调用kafkaclient发送消息(并非真正执行i/o),涉及到kafkachannel。kafka的通信采用的是nio方式

// networkclient.dosent方法
string destination = clientrequest.destination();
requestheader header = clientrequest.makeheader(request.version());
...
send send = request.tosend(destination, header);
inflightrequest inflightrequest = new inflightrequest(clientrequest,header,isinternalrequest,request,send,now);
this.inflightrequests.add(inflightrequest);
selector.send(send);

...

// selector.send方法    
string connectionid = send.destination();
kafkachannel channel = openorclosingchannelorfail(connectionid);
if (closingchannels.containskey(connectionid)) {
    this.failedsends.add(connectionid);
} else {
    try {
        channel.setsend(send);
    ...

到这里,发送消息的工作准备的差不多了,调用kafkaclient.poll方法,真正执行i/o操作

client.poll(polltimeout, currenttimems);

用一张图总结sender线程的流程

KafkaProducer源码分析

通过上面的介绍,我们梳理出了kafka生产消息的主要流程,涉及到主线程往recordaccumulator中写入消息,同时后台的sender线程从recordaccumulator中获取消息,使用nio的方式把消息发送给kafka,用一张图总结

KafkaProducer源码分析

后记

这是本公众号第一次尝试写源码相关的文章,说实话真不知道该如何下笔,代码截图、贴整体代码等感觉都被我否定了,最后采用了这种方式,介绍主要流程,把无关代码省略,配合流程图。

上周参加了华为云kafka实战课程,简单看了下kafka的生产和消费代码,想简单梳理下,然后在周日中午即8.17开始阅读源码,梳理流程,一直写到了晚上12点多,还剩一点没有完成,周一早晨早起完成了这篇文章。当然这篇文章忽略了很多更细节的东西,后面会继续深入,勇于尝试,不断精进,加油!

参考资料

华为云实战

极客时间kafka专栏