【Kafka】Exactly Once语义与事务
kafka在0.11.0.0之前的版本中只支持at least once
和at most once
语义,尚不支持exactly once
语义。
但是在很多要求严格的场景下,如使用kafka处理交易数据,exactly once
语义是必须的。我们可以通过让下游系统具有幂等性来配合kafka的at least once
语义来间接实现exactly once
。但是:
- 该方案要求下游系统支持幂等操作,限制了kafka的适用场景
- 实现门槛相对较高,需要用户对kafka的工作机制非常了解
- 对于kafka stream而言,kafka本身即是自己的下游系统,但kafka在0.11.0.0版本之前不具有幂等发送能力
因此,kafka本身对exactly once
语义的支持就非常必要。
操作原子性
操作的原子性是指,多个操作要么全部成功要么全部失败,不存在部分成功部分失败的可能。
实现原子性操作的意义在于:
- 操作结果更可控,有助于提升数据一致性
- 便于故障恢复。因为操作是原子的,从故障中恢复时只需要重试该操作(如果原操作失败)或者直接跳过该操作(如果原操作成功),而不需要记录中间状态,更不需要针对中间状态作特殊处理
实现事务机制的几个阶段
幂等性发送
上文提到,实现exactly once
的一种方法是让下游系统具有幂等处理特性,而在kafka stream中,kafka producer本身就是“下游”系统,因此如果能让producer具有幂等处理特性,那就可以让kafka stream在一定程度上支持exactly once
语义。
为了实现producer的幂等语义,kafka引入了producer id
(即pid
)和sequence number
。每个新的producer在初始化的时候会被分配一个唯一的pid,该pid对用户完全透明而不会暴露给用户。
对于每个pid,该producer发送数据的每个<topic, partition>
都对应一个从0开始单调递增的sequence number
。
类似地,broker端也会为每个<pid, topic, partition>
维护一个序号,并且每次commit一条消息时将其对应序号递增。对于接收的每条消息,如果其序号比broker维护的序号(即最后一次commit的消息的序号)大一,则broker会接受它,否则将其丢弃:
- 如果消息序号比broker维护的序号大一以上,说明中间有数据尚未写入,也即乱序,此时broker拒绝该消息,producer抛出
invalidsequencenumber
- 如果消息序号小于等于broker维护的序号,说明该消息已被保存,即为重复消息,broker直接丢弃该消息,producer抛出
duplicatesequencenumber
上述设计解决了0.11.0.0之前版本中的两个问题:
- broker保存消息后,发送ack前宕机,producer认为消息未发送成功并重试,造成数据重复
- 前一条消息发送失败,后一条消息发送成功,前一条消息重试后成功,造成数据乱序
事务性保证
上述幂等设计只能保证单个producer对于同一个<topic, partition>
的exactly once
语义。
另外,它并不能保证写操作的原子性——即多个写操作,要么全部被commit要么全部不被commit。
更不能保证多个读写操作的的原子性。尤其对于kafka stream应用而言,典型的操作即是从某个topic消费数据,经过一系列转换后写回另一个topic,保证从源topic的读取与向目标topic的写入的原子性有助于从故障中恢复。
事务保证可使得应用程序将生产数据和消费数据当作一个原子单元来处理,要么全部成功,要么全部失败,即使该生产或消费跨多个<topic, partition>
。
另外,有状态的应用也可以保证重启后从断点处继续处理,也即事务恢复。
为了实现这种效果,应用程序必须提供一个稳定的(重启后不变)唯一的id,也即transaction id
。transactin id
与pid
可能一一对应。区别在于transaction id
由用户提供,而pid
是内部的实现对用户透明。
另外,为了保证新的producer启动后,旧的具有相同transaction id
的producer即失效,每次producer通过transaction id
拿到pid的同时,还会获取一个单调递增的epoch。由于旧的producer的epoch比新producer的epoch小,kafka可以很容易识别出该producer是老的producer并拒绝其请求。
有了transaction id
后,kafka可保证:
- 跨session的数据幂等发送。当具有相同
transaction id
的新的producer实例被创建且工作时,旧的且拥有相同transaction id
的producer将不再工作。 - 跨session的事务恢复。如果某个应用实例宕机,新的实例可以保证任何未完成的旧的事务要么commit要么abort,使得新实例从一个正常状态开始工作。
事务机制原理
事务性消息传递
这一节所说的事务主要指原子性,也即producer将多条消息作为一个事务批量发送,要么全部成功要么全部失败。
为了实现这一点,kafka 0.11.0.0引入了一个服务器端的模块,名为transaction coordinator
,用于管理producer发送的消息的事务性。
该transaction coordinator
维护transaction log
,该log存于一个内部的topic内。由于topic数据具有持久性,因此事务的状态也具有持久性。
producer并不直接读写transaction log
,它与transaction coordinator
通信,然后由transaction coordinator
将该事务的状态插入相应的transaction log
。
transaction log
的设计与offset log
用于保存consumer的offset类似。
事务中offset的提交
许多基于kafka的应用,尤其是kafka stream应用中同时包含consumer和producer,前者负责从kafka中获取消息,后者负责将处理完的数据写回kafka的其它topic中。
为了实现该场景下的事务的原子性,kafka需要保证对consumer offset的commit与producer对发送消息的commit包含在同一个事务中。否则,如果在二者commit中间发生异常,根据二者commit的顺序可能会造成数据丢失和数据重复:
- 如果先commit producer发送数据的事务再commit consumer的offset,即
at least once
语义,可能造成数据重复。 - 如果先commit consumer的offset,再commit producer数据发送事务,即
at most once
语义,可能造成数据丢失。
总结
-
pid
与sequence number
的引入实现了写操作的幂等性 - 写操作的幂等性结合
at least once
语义实现了单一session内的exactly once
语义 -
transaction marker
与pid
提供了识别消息是否应该被读取的能力,从而实现了事务的隔离性 - offset的更新标记了消息是否被读取,从而将对读操作的事务处理转换成了对写(offset)操作的事务处理
- kafka事务的本质是,将一组写操作(如果有)对应的消息与一组读操作(如果有)对应的offset的更新进行同样的标记(即
transaction marker
)来实现事务中涉及的所有读写操作同时对外可见或同时对外不可见 - kafka只提供对kafka本身的读写操作的事务性,不提供包含外部系统的事务性
出处:
推荐阅读
-
【Kafka】Exactly Once语义与事务
-
Kafka与Spark Streaming集成,如何保证exactly once语义
-
Flink Kafka Connector 与 Exactly Once 剖析 数据结构c工作算法
-
Kafka 幂等性(Exactly-Once处理数据丢失和数据重复)
-
Kafka Exactly-Once 之事务性实现
-
Flink source之与kafka以及exactly-once
-
流处理引擎(SPE)中的的分布式一致性语义之Exactly-Once和Effectively-Onece区别
-
Flink Kafka Connector 与 Exactly Once 剖析 数据结构c工作算法
-
【Kafka】Exactly Once语义与事务
-
Kafka 幂等性(Exactly-Once处理数据丢失和数据重复)