Kafka与Spark Streaming集成,如何保证exactly once语义
spark streaming集成Kafka时,数据处理的语义很重要,如何保证数据只能被处理一次而不重复?接下来将详细介绍。
一、流处理系统中的三种消息传递语义
- at least once:每条消息会被收到1次或多次
- at most once:每条消息会被收到0次或1次
- exactly once:保证每条消息,只会被处理一次。是最强最精确的语义,也最难实现
一个Spark Streaming程序由三步组成:输入、处理、输出。要达到exactly once的理想状态,需要三步协同进行,而不是只与处理逻辑有关。
二、Kafka输入端
Kafka集成spark streaming有两种方法,旧的基于receiver的方法,新的基于direct stream。由于最新版本的spark(2.4.4)只支持direct,所以就以direct模式为例进行分析。
spark streaming的driver进程每次只需要从Kafka获得批次消息的offset range,然后Executor进程根据offset range去读取该批次的消息即可。由于offset在Kafka中能唯一确定一条消息,且在外部只能被Streaming程序本身感知到,因此消除了不一致性,达到了exactly once。
不过,这种模式有一个突出的问题,就是offset的管理。一旦程序崩溃,整个流程只能从earliest或latest点恢复,中间的数据会消费不到,肯定不妥。所以需要自己管理offset,在zookeeper、外部的MySQL、check point中均可。
三、Spark Streaming处理端
spark streaming处理Kafka过来的数据要达到exactly once,即要实现数据只处理一次,不会漏处理也不会多次处理数据。这一点spark就天生具备了,spark RDD是弹性分布式数据集,具有不可变、可分区、并行计算、容错的特征。一个RDD只能由稳定的数据集生成或从其他RDD转换而来。如果在处理RDD的过程中失败,只要源数据不发生变化,执行多少次lineage都是一样的、确定的结果。
四、输出端
最后,spark streaming逻辑处理的结果也要保证exactly once语义。spark streaming的输出一般是用foreachRDD()算子来实现的,它默认是at least once的。即如果foreachRDD输出过程中出错,那么就会重复写入,知道写入成功。为了让它符合exactly once,有两种方法:a.幂等性写入;b.事务性写入。
- 幂等性写入:幂等原来是数学里的概念,即f(f(x)) = f(x)。幂等写入就是多次写入与一次写入的结果完全相同。这样就保证了如果失败多次写入时结果一样了。
// 实现,要求处理逻辑是map-only的,也就是只能包含转换、过滤等操作,不能包含shuffle、聚合等
//如果条件更严格,只能使用第二种,事务性写入了
stream.foreachRDD { rdd =>
rdd.foreachPartition { iter =>
// make sure connection pool is set up on the executor before writing
SetupJdbc(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
iter.foreach { case (key, msg) =>
DB.autoCommit { implicit session =>
// the unique key for idempotency is just the text of the message itself, for example purposes
sql"insert into idem_data(msg) values (${msg})".update.apply
}
}
}
}
- 事务性写入:这里的事务与RDBMS中的事务含义基本相同,就是对数据进行一系列的访问与更新操作所组成的逻辑块。事务操作是在foreachRDD()时进行的,如果数据写入失败,或者offset写入时和当前offset range不匹配,那么这一批次的数据全部都将失败且回滚。
//为了符合事务的ACID特性,必须引入一个唯一ID标识当前的处理逻辑,并且将计算结果与该ID一起落盘。ID可以由主题、分区、时间、offset等共同组成。
// localTx is transactional, if metric update or offset update fails, neither will be committed
DB.localTx { implicit session =>
// store metric data
val metricRows = sql"""
update txn_data set metric = metric + ${metric}
where topic = ${osr.topic}
""".update.apply()
if (metricRows != 1) {
throw new Exception("...")
}
上一篇: Spark Streaming核心概念
下一篇: hudi-hive-sync