欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

kafka学习:produce消息

程序员文章站 2022-05-29 20:41:34
...

kafka学习:produce消息

由于历史原因,我们公司用的还是比较老的kafka版本0.10.1.0,最近由于项目需要,学习了kafka的使用,学习过程中了解到的一些知识记录下,以便日后查阅;

目录

kafka学习:produce消息

1. broker & topic & partition

2. 发送消息分区选择

3. 数据包格式

4. golang数据封装

5. Produce底层发送数据流程

6. 客户端推荐

7. 参考资料


1. broker & topic & partition

  • broker: kafka服务端;

  • topic: 消息队列,可以按照业务类型分成不同的消息队列,最大的看到有些大厂topic数有8K多;

  • partition: 分区,一个消息topic队列可以有多个分区,将分区分散到不同的broker时,可以提高消费队列总体的读写效率;

三者之间关系图如下:

下图以常见部署方式为例,3台服务器,每个服务器部署一个broker, 3个broker组成一个kafka集群,集群下创建2个top消息队列,每个topic创建3个partition, 每个partition分布到不同的服务器上;

 

kafka学习:produce消息

 

2. 发送消息分区选择

分区选择策略,可以根据需要选择多种:

  • 指定分区

  • 轮询

  • hash

 

3. 数据包格式

 

kafka学习:produce消息

当前我们用的版本比较低,所以数据格式相对比较简单

  • CRC: 对所有字段校验后的值

  • 属性:消息压缩编码格式

当前的数据头固定部分已经22B, 有些字段在后续的版本里已经用了可变长度字段,一般的key的长度都用不到4B,1B已经可以解决大部分场景需要,可变长度大致意思是这样的:

 

kafka学习:produce消息

最高为位特殊标志位:

  • 0:下一个自己是新的编码

  • 1:编码尚未结束,还需要读取后面的字节完成编码;

低7位,借鉴了Google ProtoBuffer里的Zig-zag编码方式,即围绕0进行无符号编码,如下:

0 编码成 0

-1 -> 1

1 -> 2

-2 -> 3

2 -> 4

...

因此,1B去掉最高1位特殊标志位,只有7位有效(0~127),而所有的正数都会被编码成2倍的数字,则key或value的值一旦超过64(128/2),长度信息就需要2B保存。

4. golang数据封装

goalng客户端在数据封装前进行了一个有意思的封装,先定义了一个接口,接口里声明了各种数据接收函数,然后定义了一个消息预处理结构体,消息预处理结构体的主要作用是获取各种数据所占总空间大小,消息真实处理结构体也实现了接口,并会根据预处理结构体获取到的消息长度先分配实际消息最大长度,再逐个存储消息;

  • 定义接口

     
    type packetEncoder interface {
    	// Primitives
    	putInt8(in int8)
    	putInt16(in int16)
    	putInt32(in int32)
    	...
    }

     

  • 消息预处理

    type prepEncoder struct {
    	length int
    }
    
    // primitives
    
    func (pe *prepEncoder) putInt8(in int8) {
    	pe.length++
    }
    
    func (pe *prepEncoder) putInt16(in int16) {
    	pe.length += 2
    }
    
    func (pe *prepEncoder) putInt32(in int32) {
    	pe.length += 4
    }
    ...

     

  • 消息真实处理

    type realEncoder struct {
    	raw      []byte
    	off      int
    	stack    []pushEncoder
    	registry metrics.Registry
    }
    
    // primitives
    
    func (re *realEncoder) putInt8(in int8) {
    	re.raw[re.off] = byte(in)
    	re.off++
    }
    
    func (re *realEncoder) putInt16(in int16) {
    	binary.BigEndian.PutUint16(re.raw[re.off:], uint16(in))
    	re.off += 2
    }
    
    func (re *realEncoder) putInt32(in int32) {
    	binary.BigEndian.PutUint32(re.raw[re.off:], uint32(in))
    	re.off += 4
    }
    ...

     

 

5. Produce底层发送数据流程

golang版本的客户端,数据发送与缓存大小、消息数量、刷新频率三个因素有关;

  • 缓存大小

    每创建一个produce客户端,golang客户端sarma就会分配100MB(默认)的缓存空间,达到最大缓存空间时需要将数据发送到服务器端

  • 消息数量

    每个produce客户端,可以配置缓存消息数量,达到该值时,需要将数据发送到服务器端

  • 刷新频率

    刷新频率默认为0,可将值根据项目需要设置一个合理值,如20ms,基本能满足大部分场景需要

kafka学习:produce消息

6. 客户端推荐

golang的客户端主要有2个:

  • sarma(推荐)

  • confluent

sarma是纯go语言开发的,confluent底层是调用的c库,接口用go语言封装了下,个人比较推荐sarma;使用过c++/c语言的开发者都知道,内存管理是个让人吐血的问题,c++/c开发的代码很容易出现内存越界,非法内存访问等等问题,一旦使用cgo调用c库,很可能由于c库的内存问题导致golang代码报莫名其妙的错误,本人就多次碰到过golang调用第三方库c库导致程序很不稳定,放弃cgo调用后程序就稳定了,至少golang程序遇到panic异常时能比较准确的告知错误位置;

 

7. 参考资料

  • 《Apache Kafka实战》 胡夕 著

    深入学习kafka,强烈推荐这本书,整个书的架构调理清晰,原理分析的通俗易懂;