kafka学习:produce消息
kafka学习:produce消息
由于历史原因,我们公司用的还是比较老的kafka版本0.10.1.0,最近由于项目需要,学习了kafka的使用,学习过程中了解到的一些知识记录下,以便日后查阅;
目录
1. broker & topic & partition
-
broker: kafka服务端;
-
topic: 消息队列,可以按照业务类型分成不同的消息队列,最大的看到有些大厂topic数有8K多;
-
partition: 分区,一个消息topic队列可以有多个分区,将分区分散到不同的broker时,可以提高消费队列总体的读写效率;
三者之间关系图如下:
下图以常见部署方式为例,3台服务器,每个服务器部署一个broker, 3个broker组成一个kafka集群,集群下创建2个top消息队列,每个topic创建3个partition, 每个partition分布到不同的服务器上;
2. 发送消息分区选择
分区选择策略,可以根据需要选择多种:
-
指定分区
-
轮询
-
hash
3. 数据包格式
当前我们用的版本比较低,所以数据格式相对比较简单
-
CRC: 对所有字段校验后的值
-
属性:消息压缩编码格式
当前的数据头固定部分已经22B, 有些字段在后续的版本里已经用了可变长度字段,一般的key的长度都用不到4B,1B已经可以解决大部分场景需要,可变长度大致意思是这样的:
最高为位特殊标志位:
-
0:下一个自己是新的编码
-
1:编码尚未结束,还需要读取后面的字节完成编码;
低7位,借鉴了Google ProtoBuffer里的Zig-zag编码方式,即围绕0进行无符号编码,如下:
0 编码成 0
-1 -> 1
1 -> 2
-2 -> 3
2 -> 4
...
因此,1B去掉最高1位特殊标志位,只有7位有效(0~127),而所有的正数都会被编码成2倍的数字,则key或value的值一旦超过64(128/2),长度信息就需要2B保存。
4. golang数据封装
goalng客户端在数据封装前进行了一个有意思的封装,先定义了一个接口,接口里声明了各种数据接收函数,然后定义了一个消息预处理结构体,消息预处理结构体的主要作用是获取各种数据所占总空间大小,消息真实处理结构体也实现了接口,并会根据预处理结构体获取到的消息长度先分配实际消息最大长度,再逐个存储消息;
-
定义接口
type packetEncoder interface { // Primitives putInt8(in int8) putInt16(in int16) putInt32(in int32) ... }
-
消息预处理
type prepEncoder struct { length int } // primitives func (pe *prepEncoder) putInt8(in int8) { pe.length++ } func (pe *prepEncoder) putInt16(in int16) { pe.length += 2 } func (pe *prepEncoder) putInt32(in int32) { pe.length += 4 } ...
-
消息真实处理
type realEncoder struct { raw []byte off int stack []pushEncoder registry metrics.Registry } // primitives func (re *realEncoder) putInt8(in int8) { re.raw[re.off] = byte(in) re.off++ } func (re *realEncoder) putInt16(in int16) { binary.BigEndian.PutUint16(re.raw[re.off:], uint16(in)) re.off += 2 } func (re *realEncoder) putInt32(in int32) { binary.BigEndian.PutUint32(re.raw[re.off:], uint32(in)) re.off += 4 } ...
5. Produce底层发送数据流程
golang版本的客户端,数据发送与缓存大小、消息数量、刷新频率三个因素有关;
-
缓存大小
每创建一个produce客户端,golang客户端sarma就会分配100MB(默认)的缓存空间,达到最大缓存空间时需要将数据发送到服务器端
-
消息数量
每个produce客户端,可以配置缓存消息数量,达到该值时,需要将数据发送到服务器端
-
刷新频率
刷新频率默认为0,可将值根据项目需要设置一个合理值,如20ms,基本能满足大部分场景需要
6. 客户端推荐
golang的客户端主要有2个:
-
sarma(推荐)
-
confluent
sarma是纯go语言开发的,confluent底层是调用的c库,接口用go语言封装了下,个人比较推荐sarma;使用过c++/c语言的开发者都知道,内存管理是个让人吐血的问题,c++/c开发的代码很容易出现内存越界,非法内存访问等等问题,一旦使用cgo调用c库,很可能由于c库的内存问题导致golang代码报莫名其妙的错误,本人就多次碰到过golang调用第三方库c库导致程序很不稳定,放弃cgo调用后程序就稳定了,至少golang程序遇到panic异常时能比较准确的告知错误位置;
7. 参考资料
-
《Apache Kafka实战》 胡夕 著
深入学习kafka,强烈推荐这本书,整个书的架构调理清晰,原理分析的通俗易懂;
推荐阅读
-
Java使用kafka发送和生产消息的示例
-
利用Python学习RabbitMQ消息队列
-
activemq消息持久化方式(activemq和kafka区别)
-
Android贝塞尔曲线初步学习第二课 仿QQ未读消息气泡拖拽黏连效果
-
activemq消息持久化方式(activemq和kafka区别)
-
Kafka的消息会丢失和重复吗?——如何实现Kafka精确传递一次语义
-
学习强国怎么设置红包提醒? 学习强国红包消息开启方法
-
浅谈Java消息队列总结篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka)
-
利用Python学习RabbitMQ消息队列
-
js自定义消息机制研究学习(一) ——看百度搜索输入提示