spark streaming中维护kafka偏移量到外部介质
程序员文章站
2022-04-14 18:21:45
spark streaming中维护kafka偏移量到外部介质 以kafka偏移量维护到redis为例。 redis存储格式 使用的数据结构为 ,其中key为 ,value为 。 例如 这个 下有3个分区,则key value结构如下: 的偏移量为x 的偏移量为y 的偏移量为z 消费时指定offse ......
spark streaming中维护kafka偏移量到外部介质
以kafka偏移量维护到redis为例。
redis存储格式
使用的数据结构为string
,其中key为topic:partition
,value为offset
。
例如bobo
这个topic
下有3个分区,则key-value结构如下:
-
bobo:0
的偏移量为x -
bobo:1
的偏移量为y -
bobo:2
的偏移量为z
消费时指定offset
主要是如下两个方法:
-
createkafkastream()
创建kakfa流 -
getoffsets()
从redis中获取offsets
/** * kakfa参数 */ private val kafkaparams = map[string, object]( "bootstrap.servers" -> "crpprdap25:6667,crpprdap26:6667,crpprdap27:6667", "key.deserializer" -> classof[stringdeserializer], "value.deserializer" -> classof[stringdeserializer], "group.id" -> "use_a_separate_group_id_for_each_stream", // 注意这里是none。 "auto.offset.reset" -> "none", "enable.auto.commit" -> (false: java.lang.boolean) ) // `bobo`topic下有3个分区 private val topicpartitions = map[string, int]("bobo" -> 3) // 从redis中获取offsets def getoffsets: map[topicpartition, long] = { val jedis = internalredisclient.getresource // 设置每个分区起始的offset val offsets = mutable.map[topicpartition, long]() topicpartitions.foreach { it => val topic = it._1 val partitions = it._2 // 遍历分区,设置每个topic下对应partition的offset for (partition <- 0 until partitions) { val topicpartitionkey = topic + ":" + partition var lastoffset = 0l val lastsavedoffset = jedis.get(topicpartitionkey) if (null != lastsavedoffset) { try { lastoffset = lastsavedoffset.tolong } catch { case e: exception => log.error("get lastsavedoffset error", e) system.exit(1) } } log.info("from redis topic: {}, partition: {}, lastoffset: {}", topic, partition, lastoffset) // 添加 offsets += (new topicpartition(topic, partition) -> lastoffset) } } internalredisclient.returnresource(jedis) offsets.tomap } /** * 创建kakfa流 * * @param ssc streamingcontext * @return inputdstream */ def createkafkastream(ssc: streamingcontext): inputdstream[consumerrecord[string, string]] = { val offsets = getoffsets // 创建kafka stream val stream = kafkautils.createdirectstream[string, string]( ssc, locationstrategies.preferconsistent, consumerstrategies.assign[string, string](offsets.keys.tolist, kafkaparams, offsets) ) stream }
其中:核心是通过consumerstrategies.assign
方法来指定topic
下对应partition
的offset
信息。
更新offset到redis
最后将offset信息维护到redis即可。
/** * 消费 * * @param stream inputdstream */ def consume(stream: inputdstream[consumerrecord[string, string]]): unit = { stream.foreachrdd { rdd => // 获取offset信息 val offsetranges = rdd.asinstanceof[hasoffsetranges].offsetranges // 计算相关指标,这里就统计下条数了 val total = rdd.count() val jedis = internalredisclient.getresource val pipeline = jedis.pipelined() // 会阻塞redis pipeline.multi() // 更新相关指标 pipeline.incrby("totalrecords", total) // 更新offset offsetranges.foreach { offsetrange => log.info("save offsets, topic: {}, partition: {}, offset: {}", offsetrange.topic, offsetrange.partition, offsetrange.untiloffset) val topicpartitionkey = offsetrange.topic + ":" + offsetrange.partition pipeline.set(topicpartitionkey, offsetrange.untiloffset + "") } // 执行,释放 pipeline.exec() pipeline.sync() pipeline.close() internalredisclient.returnresource(jedis) } }
参考
spark代码
顺便贴一下自己整理的spark相关的代码。
github地址:
主要包括:
- rdd的基本使用
- sql
- jdbc(读、写)
- hive(读、写、动态分区)
- streaming
- 消费kafka(手动提交、手动维护offset)
- 写入hbase
- 写入hive
下一篇: java基础-接口