欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

[metaq]Consumer metaqpullconsumerkafka

程序员文章站 2022-05-11 19:27:42
...

Metaq是一个类是kafka的消息系统,开源地址https://github.com/killme2008/Metamorphosis。

基于Pull的消息系统,consumer端保持了很多逻辑,比如当前拉取消息的offset,loadbalance等,使用zookeeper作为coordination。

简单类图

[metaq]Consumer
            
    
    
        metaqpullconsumerkafka

核心类ZKLoadRebalanceListener,负责集群感知,当有broker退出或consumer退出时,重新balance。

FetchManager是具体的worker线程,负责从处理FetchRequest,从某一个partition拉去数据,并回掉业务方的接口实现

 

其实现流程

1.注册consumer id

/meta/consumers/meta-example/ids新建临时节点,data为topicString,就是这个consumer订阅了那些topic

2.loadBalanceListener监听/meta/consumers/meta-example/ids children consumer数目变化

3.loadBalanceListener监听/meta/brokers/topics-sub/meta-test children分区变化

4.主动balance,分区和同组consumer之间均分

 

4.1 /meta/consumers/meta-example/ids下读取自己刚才写入的data,topics列表,构造一个myConsumerPerTopicMap,topic-->consumer

4.2 /meta/brokers/topics-sub/ids下读取所有brokers信息,成为Cluster

4.3 /meta/consumers/meta-example/ids读取所有consumer

4.4 /meta/consumers/meta-example/ids读取consumer的data,topics列表,构造一个consumersPerTopicMap,topic-->consumer list

4.5 拿自己订阅的topics的所有分区,partitionsPerTopicMap,topic-->partition list

4.5.1 /meta/brokers/topics-sub/meta-test读取所有children

4.5.2 每个child代表一个broker,其data为‘{"broker":"0-m","numParts":2}’,保存broker的分区信息

4.5.3 构造一个partList代表这个topic的所有分区信息,单个分区类是‘0-0’,‘0-1’

4.6 看看自己订阅的topic里,有哪些topic的broker或者分区有变化,初始化时old为空,都不一样,返回relevantTopicConsumerIdMap,topic-->consumer

4.7 暂停Fetcher

4.8 提交offset

4.9 对于每个变化的topic,根据负载均衡策略获取这个consumer对应的partition列表

4.10 查看当前这个topic的分区列表,查看是否有变更,topicRegistry中

4.11 新分配的分区添加,/meta/consumers/meta-example/owners/meta-test下创建临时节点,名称为分区名‘0-0’,data为consumerId

4.12 拿offset信息,/meta/consumers/meta-example/offsets/meta-test/0-0的data,内容如‘251537821439885312-545’,前为msgId,后为offset

4.13 初始化时,认为offset为0

4.14 分配给自己的分区随机取master或slave的一个读,master和slave的brokerId一样

4.15 添加FetchRequest到FetchManager

4.16 给需要请求的broker创立连接

4.17 关闭之前创建的无用的连接

4.18 启动fetch线程

 

5.FetchRequestRunner启动,开始拉数据

5.1 组装GetCommand,发送,戴上offset信息,每次取1M 

5.2 返回的数据弄一个MessageIterator包装下

6 数据处理

6.1 判断FetchRequest的重试次数是否超过限制,默认5次,超过则存本地,跳过这跳消息,继续消费

6.2 decodeMessage成message对象

6.3 调用业务接口

6.4 消息处理完后,ack request,修改内存中的TopicPartitionRegInfo的offset信息,后续Timer线程会commit

6.5 重新添加请求,继续拉取数据

 

Timer线程扫描topicRegistry中的所有topic的offset内存数据,修改对应zk中的offset节点中数据,比如‘/meta/consumers/meta-example/offsets/meta-test/0-0’

 

小节

1.partition由broker指定,同一个broker可以制定多个partition

2.每个partion只能分配给同一个group下的的一个consumer

3.每个consumer可以分配多个partition,订阅多个topic

4.consumer集群变化时执行load balance,重新分配partition

5.提交offset使用异步批量提交 

  • [metaq]Consumer
            
    
    
        metaqpullconsumerkafka
  • 大小: 222.3 KB