欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

spark streaming中维护kafka偏移量到外部介质

程序员文章站 2024-01-05 17:18:46
spark streaming中维护kafka偏移量到外部介质 以kafka偏移量维护到redis为例。 redis存储格式 使用的数据结构为 ,其中key为 ,value为 。 例如 这个 下有3个分区,则key value结构如下: 的偏移量为x 的偏移量为y 的偏移量为z 消费时指定offse ......

spark streaming中维护kafka偏移量到外部介质

以kafka偏移量维护到redis为例。

redis存储格式

使用的数据结构为string,其中key为topic:partition,value为offset

例如bobo这个topic下有3个分区,则key-value结构如下:

  • bobo:0的偏移量为x
  • bobo:1的偏移量为y
  • bobo:2的偏移量为z

消费时指定offset

主要是如下两个方法:

  • createkafkastream()创建kakfa流
  • getoffsets()从redis中获取offsets
/**
  * kakfa参数
  */
private val kafkaparams = map[string, object](
  "bootstrap.servers" -> "crpprdap25:6667,crpprdap26:6667,crpprdap27:6667",
  "key.deserializer" -> classof[stringdeserializer],
  "value.deserializer" -> classof[stringdeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  // 注意这里是none。
  "auto.offset.reset" -> "none",
  "enable.auto.commit" -> (false: java.lang.boolean)
)

// `bobo`topic下有3个分区
private val topicpartitions = map[string, int]("bobo" -> 3)

// 从redis中获取offsets
def getoffsets: map[topicpartition, long] = {
  val jedis = internalredisclient.getresource

  // 设置每个分区起始的offset
  val offsets = mutable.map[topicpartition, long]()

  topicpartitions.foreach { it =>
    val topic = it._1
    val partitions = it._2
    // 遍历分区,设置每个topic下对应partition的offset
    for (partition <- 0 until partitions) {
      val topicpartitionkey = topic + ":" + partition
      var lastoffset = 0l
      val lastsavedoffset = jedis.get(topicpartitionkey)

      if (null != lastsavedoffset) {
        try {
          lastoffset = lastsavedoffset.tolong
        } catch {
          case e: exception =>
            log.error("get lastsavedoffset error", e)
            system.exit(1)
        }
      }
      log.info("from redis topic: {}, partition: {}, lastoffset: {}", topic, partition, lastoffset)

      // 添加
      offsets += (new topicpartition(topic, partition) -> lastoffset)
    }
  }

  internalredisclient.returnresource(jedis)

  offsets.tomap
}

/**
  * 创建kakfa流
  *
  * @param ssc streamingcontext
  * @return inputdstream
  */
def createkafkastream(ssc: streamingcontext): inputdstream[consumerrecord[string, string]] = {
  val offsets = getoffsets

  // 创建kafka stream
  val stream = kafkautils.createdirectstream[string, string](
    ssc,
    locationstrategies.preferconsistent,
    consumerstrategies.assign[string, string](offsets.keys.tolist, kafkaparams, offsets)
  )
  stream
}

其中:核心是通过consumerstrategies.assign方法来指定topic下对应partitionoffset信息。

更新offset到redis

最后将offset信息维护到redis即可。

/**
  * 消费
  *
  * @param stream inputdstream
  */
def consume(stream: inputdstream[consumerrecord[string, string]]): unit = {
  stream.foreachrdd { rdd =>
    // 获取offset信息
    val offsetranges = rdd.asinstanceof[hasoffsetranges].offsetranges

    // 计算相关指标,这里就统计下条数了
    val total = rdd.count()

    val jedis = internalredisclient.getresource
    val pipeline = jedis.pipelined()
    // 会阻塞redis
    pipeline.multi()

    // 更新相关指标
    pipeline.incrby("totalrecords", total)

    // 更新offset
    offsetranges.foreach { offsetrange =>
      log.info("save offsets, topic: {}, partition: {}, offset: {}", offsetrange.topic, offsetrange.partition, offsetrange.untiloffset)
      val topicpartitionkey = offsetrange.topic + ":" + offsetrange.partition
      pipeline.set(topicpartitionkey, offsetrange.untiloffset + "")
    }

    // 执行,释放
    pipeline.exec()
    pipeline.sync()
    pipeline.close()
    internalredisclient.returnresource(jedis)
  }
}

参考

spark代码

顺便贴一下自己整理的spark相关的代码。

github地址:

主要包括:

  • rdd的基本使用
  • sql
    • jdbc(读、写)
    • hive(读、写、动态分区)
  • streaming
    • 消费kafka(手动提交、手动维护offset)
    • 写入hbase
    • 写入hive

上一篇:

下一篇: