kafka数据到hudi丢失数据问题
程序员文章站
2022-07-14 20:30:25
...
kafka数据到hudi丢失数据问题
1.报错问题
Caused by: java.lang.IllegalStateException: Cannot fetch offset 196 (GroupId: spark-kafka-source-6f1df211-fdcb-4bcc-813d-55c4f9661c9d-1732697149-executor, TopicPartition: news-0).
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".
at org.apache.spark.sql.kafka010.InternalKafkaConsumer$.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$reportDataLoss0(KafkaDataConsumer.scala:642)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$reportDataLoss(KafkaDataConsumer.scala:448)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:269)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:234)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:209)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:234)
-
翻译结果
最终应用程序状态:失败,exitCode:15,(原因:用户类引发异常:org.apache.spark.sql.streaming.StreamingQueryException:由于阶段失败而中止作业:阶段2.0中的任务0失败4次,最近的失败:阶段2.0中的任务0.3丢失(TID 5,hadoop,executor 1):java.lang.IllegalStateException:无法获取偏移量196(GroupId:spark-kafka-source-e2868915-6d7a-4aef-99a8-3d1c5ef45147-1732697149-executor,主题分区:news-0)。
一些数据可能已经丢失,因为它们在卡夫卡不再可用;要么是数据被卡夫卡过时了,要么是主题在处理完主题中的所有数据之前被删除了。如果您不希望流式查询在这种情况下失败,请将源选项“failOnDataLoss”设置为“false”。
2.根据提示添加配置文件 -> option(“failOnDataLoss”,“false”)
//5.读取Kafka源数据
val df: DataFrame = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", params.brokerList)
.option("subscribe", params.topic)
.option("startingOffsets", "latest")
.option("kafka.consumer.commit.groupid", "action-log-group01")
.option("failOnDataLoss","false")
.load()
tips:认为添加这个配置不太妥当,但尚未找到适宜的方法
哪位博主知道的,希望可以指点指点
上一篇: require() - NodeJS
下一篇: 清除require引入的js缓存
推荐阅读
-
MsSQL数据导入到Mongo的默认编码问题(正确导入Mongo的方法)
-
关于VS2005中C#代码用F12转到定义时,总是显示从元数据的问题
-
MFC连接数据库时,无法启动程序,计算机丢失libmysql.dll的问题解决办法
-
Oracle数据库从入门到精通 单行函数问题
-
详解vue2父组件传递props异步数据到子组件的问题
-
$.ajax传JSON数据到后台出现报错问题解决
-
解决表单提交的数据丢失问题
-
Python中LOADDATAINFILE语句导入数据到MySQL遇到问题的解决方案分享
-
挽回声誉 谷歌发布Chrome 79回炉重造版:修复安卓用户数据丢失问题
-
解决vue刷新页面以后丢失store的数据问题