[metaq]Consumer metaqpullconsumerkafka
Metaq是一个类是kafka的消息系统,开源地址https://github.com/killme2008/Metamorphosis。
基于Pull的消息系统,consumer端保持了很多逻辑,比如当前拉取消息的offset,loadbalance等,使用zookeeper作为coordination。
简单类图
核心类ZKLoadRebalanceListener,负责集群感知,当有broker退出或consumer退出时,重新balance。
FetchManager是具体的worker线程,负责从处理FetchRequest,从某一个partition拉去数据,并回掉业务方的接口实现
其实现流程
1.注册consumer id
/meta/consumers/meta-example/ids新建临时节点,data为topicString,就是这个consumer订阅了那些topic
2.loadBalanceListener监听/meta/consumers/meta-example/ids children consumer数目变化
3.loadBalanceListener监听/meta/brokers/topics-sub/meta-test children分区变化
4.主动balance,分区和同组consumer之间均分
4.1 /meta/consumers/meta-example/ids下读取自己刚才写入的data,topics列表,构造一个myConsumerPerTopicMap,topic-->consumer
4.2 /meta/brokers/topics-sub/ids下读取所有brokers信息,成为Cluster
4.3 /meta/consumers/meta-example/ids读取所有consumer
4.4 /meta/consumers/meta-example/ids读取consumer的data,topics列表,构造一个consumersPerTopicMap,topic-->consumer list
4.5 拿自己订阅的topics的所有分区,partitionsPerTopicMap,topic-->partition list
4.5.1 /meta/brokers/topics-sub/meta-test读取所有children
4.5.2 每个child代表一个broker,其data为‘{"broker":"0-m","numParts":2}’,保存broker的分区信息
4.5.3 构造一个partList代表这个topic的所有分区信息,单个分区类是‘0-0’,‘0-1’
4.6 看看自己订阅的topic里,有哪些topic的broker或者分区有变化,初始化时old为空,都不一样,返回relevantTopicConsumerIdMap,topic-->consumer
4.7 暂停Fetcher
4.8 提交offset
4.9 对于每个变化的topic,根据负载均衡策略获取这个consumer对应的partition列表
4.10 查看当前这个topic的分区列表,查看是否有变更,topicRegistry中
4.11 新分配的分区添加,/meta/consumers/meta-example/owners/meta-test下创建临时节点,名称为分区名‘0-0’,data为consumerId
4.12 拿offset信息,/meta/consumers/meta-example/offsets/meta-test/0-0的data,内容如‘251537821439885312-545’,前为msgId,后为offset
4.13 初始化时,认为offset为0
4.14 分配给自己的分区随机取master或slave的一个读,master和slave的brokerId一样
4.15 添加FetchRequest到FetchManager
4.16 给需要请求的broker创立连接
4.17 关闭之前创建的无用的连接
4.18 启动fetch线程
5.FetchRequestRunner启动,开始拉数据
5.1 组装GetCommand,发送,戴上offset信息,每次取1M
5.2 返回的数据弄一个MessageIterator包装下
6 数据处理
6.1 判断FetchRequest的重试次数是否超过限制,默认5次,超过则存本地,跳过这跳消息,继续消费
6.2 decodeMessage成message对象
6.3 调用业务接口
6.4 消息处理完后,ack request,修改内存中的TopicPartitionRegInfo的offset信息,后续Timer线程会commit
6.5 重新添加请求,继续拉取数据
Timer线程扫描topicRegistry中的所有topic的offset内存数据,修改对应zk中的offset节点中数据,比如‘/meta/consumers/meta-example/offsets/meta-test/0-0’
小节
1.partition由broker指定,同一个broker可以制定多个partition
2.每个partion只能分配给同一个group下的的一个consumer
3.每个consumer可以分配多个partition,订阅多个topic
4.consumer集群变化时执行load balance,重新分配partition
5.提交offset使用异步批量提交