欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Kafka与Spark Streaming集成,如何保证exactly once语义

程序员文章站 2022-07-14 21:48:16
...


      spark streaming集成Kafka时,数据处理的语义很重要,如何保证数据只能被处理一次而不重复?接下来将详细介绍。

一、流处理系统中的三种消息传递语义

  • at least once:每条消息会被收到1次或多次
  • at most once:每条消息会被收到0次或1次
  • exactly once:保证每条消息,只会被处理一次。是最强最精确的语义,也最难实现
          一个Spark Streaming程序由三步组成:输入、处理、输出。要达到exactly once的理想状态,需要三步协同进行,而不是只与处理逻辑有关。

二、Kafka输入端

      Kafka集成spark streaming有两种方法,旧的基于receiver的方法,新的基于direct stream。由于最新版本的spark(2.4.4)只支持direct,所以就以direct模式为例进行分析。
      spark streaming的driver进程每次只需要从Kafka获得批次消息的offset range,然后Executor进程根据offset range去读取该批次的消息即可。由于offset在Kafka中能唯一确定一条消息,且在外部只能被Streaming程序本身感知到,因此消除了不一致性,达到了exactly once。
      不过,这种模式有一个突出的问题,就是offset的管理。一旦程序崩溃,整个流程只能从earliest或latest点恢复,中间的数据会消费不到,肯定不妥。所以需要自己管理offset,在zookeeper、外部的MySQL、check point中均可。

三、Spark Streaming处理端

      spark streaming处理Kafka过来的数据要达到exactly once,即要实现数据只处理一次,不会漏处理也不会多次处理数据。这一点spark就天生具备了,spark RDD是弹性分布式数据集,具有不可变、可分区、并行计算、容错的特征。一个RDD只能由稳定的数据集生成或从其他RDD转换而来。如果在处理RDD的过程中失败,只要源数据不发生变化,执行多少次lineage都是一样的、确定的结果。

四、输出端

      最后,spark streaming逻辑处理的结果也要保证exactly once语义。spark streaming的输出一般是用foreachRDD()算子来实现的,它默认是at least once的。即如果foreachRDD输出过程中出错,那么就会重复写入,知道写入成功。为了让它符合exactly once,有两种方法:a.幂等性写入;b.事务性写入。

  • 幂等性写入:幂等原来是数学里的概念,即f(f(x)) = f(x)。幂等写入就是多次写入与一次写入的结果完全相同。这样就保证了如果失败多次写入时结果一样了。
// 实现,要求处理逻辑是map-only的,也就是只能包含转换、过滤等操作,不能包含shuffle、聚合等
//如果条件更严格,只能使用第二种,事务性写入了
 stream.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        // make sure connection pool is set up on the executor before writing
        SetupJdbc(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
        iter.foreach { case (key, msg) =>
          DB.autoCommit { implicit session =>
            // the unique key for idempotency is just the text of the message itself, for example purposes
            sql"insert into idem_data(msg) values (${msg})".update.apply
          }
        }
      }
    }
  • 事务性写入:这里的事务与RDBMS中的事务含义基本相同,就是对数据进行一系列的访问与更新操作所组成的逻辑块。事务操作是在foreachRDD()时进行的,如果数据写入失败,或者offset写入时和当前offset range不匹配,那么这一批次的数据全部都将失败且回滚。
//为了符合事务的ACID特性,必须引入一个唯一ID标识当前的处理逻辑,并且将计算结果与该ID一起落盘。ID可以由主题、分区、时间、offset等共同组成。
// localTx is transactional, if metric update or offset update fails, neither will be committed
    DB.localTx { implicit session =>
      // store metric data
      val metricRows = sql"""
    update txn_data set metric = metric + ${metric}
      where topic = ${osr.topic}
    """.update.apply()
      if (metricRows != 1) {
        throw new Exception("...")
      }

参考:https://www.jianshu.com/p/10de8f3b1be8

相关标签: BigData