Flink in Practice: MySQL CDC to Iceberg
程序员文章站
2022-07-14 12:21:55
I searched around online and could not find a single MySQL-to-Iceberg walkthrough, so here is a write-up of the pitfalls I hit over the past few days; I hope it helps.
Current status
As of Iceberg 0.11, only streaming (DataStream API) writes of CDC data are supported; writing CDC data via SQL and reading CDC streams are both unsupported.
There is a ready-made connector for reading the MySQL binlog; see flink-cdc-connectors. However, its demo only ships a String deserialization schema, while Iceberg's sink expects records as RowData.
Pitfall log
java.lang.ClassCastException: org.apache.flink.table.data.GenericRowData cannot be cast to org.apache.flink.types.Row
The TypeInformation was constructed incorrectly: the CDC source must declare that it produces RowData, not Row, otherwise downstream operators try to cast GenericRowData to Row.
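A minimal sketch of a RowData deserializer, assuming the flink-cdc-connectors 1.x package layout (com.alibaba.ververica) and Flink 1.12's InternalTypeInfo (Flink 1.11 used RowDataTypeInfo instead). Class name, field conversion (everything treated as a string) and the skipping of delete/update-before records are simplifications for illustration, not production code:

```java
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

// Hypothetical deserializer: emits RowData so the stream can feed Iceberg directly.
public class MyRowDataDeserializer implements DebeziumDeserializationSchema<RowData> {

    private final RowType rowType;  // logical schema of the target table

    public MyRowDataDeserializer(RowType rowType) {
        this.rowType = rowType;
    }

    @Override
    public void deserialize(SourceRecord record, Collector<RowData> out) {
        Struct value = (Struct) record.value();
        Struct after = value.getStruct("after");  // null for delete events
        if (after == null) {
            return;  // deletes/update-before omitted for brevity; real code sets RowKind
        }
        GenericRowData row = new GenericRowData(rowType.getFieldCount());
        for (int i = 0; i < rowType.getFieldCount(); i++) {
            Object field = after.get(rowType.getFieldNames().get(i));
            // Real code must convert per logical type; strings shown here only.
            row.setField(i, field == null ? null : StringData.fromString(field.toString()));
        }
        out.collect(row);
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        // Declaring RowData type info here is what avoids the
        // "GenericRowData cannot be cast to Row" ClassCastException.
        return InternalTypeInfo.of(rowType);
    }
}
```

The key line is getProducedType(): if it returns Row-based type information while the pipeline actually carries GenericRowData, Flink's serializers trigger the cast error above.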
Client does not support authentication protocol requested by server; consider upgrading MySQL client
Fix: https://*.com/questions/50093144/mysql-8-0-client-does-not-support-authentication-protocol-requested-by-server
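In short (per the linked answer): MySQL 8 defaults new accounts to the caching_sha2_password plugin, which older JDBC drivers cannot negotiate. Switching the account back to mysql_native_password resolves it; account name, host and password below are placeholders:

```sql
-- Switch the account to the legacy authentication plugin
ALTER USER 'flink_cdc'@'%' IDENTIFIED WITH mysql_native_password BY 'your_password';
FLUSH PRIVILEGES;
```

Upgrading the JDBC driver to a MySQL 8-aware version is the alternative fix if you cannot change the account.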
2021-03-12 14:07:57
org.apache.kafka.connect.errors.ConnectException: Access denied; you need (at least one of) the RELOAD privilege(s) for this operation Error code: 1227; SQLSTATE: 42000.
at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLSyntaxErrorException: Access denied; you need (at least one of) the RELOAD privilege(s) for this operation
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at com.mysql.cj.jdbc.StatementImpl.executeInternal(StatementImpl.java:782)
at com.mysql.cj.jdbc.StatementImpl.execute(StatementImpl.java:666)
at io.debezium.jdbc.JdbcConnection.executeWithoutCommitting(JdbcConnection.java:1201)
at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:465)
... 3 more
This is a privilege problem: Debezium's snapshot phase needs the RELOAD privilege (among others). If you hit this in production, ask your DBA to grant it.
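For reference, the privilege set that Debezium's MySQL connector documentation asks for looks like the following; the account name is a placeholder:

```sql
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
ON *.* TO 'flink_cdc'@'%';
```

SELECT and RELOAD cover the initial snapshot; the REPLICATION privileges cover binlog streaming afterwards.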
2021-03-15 11:35:01
java.lang.IllegalArgumentException: Cannot write delete files in a v1 table
at org.apache.iceberg.ManifestFiles.writeDeleteManifest(ManifestFiles.java:154)
at org.apache.iceberg.SnapshotProducer.newDeleteManifestWriter(SnapshotProducer.java:365)
at org.apache.iceberg.MergingSnapshotProducer.newDeleteFilesAsManifest(MergingSnapshotProducer.java:480)
at org.apache.iceberg.MergingSnapshotProducer.prepareDeleteManifests(MergingSnapshotProducer.java:469)
at org.apache.iceberg.MergingSnapshotProducer.apply(MergingSnapshotProducer.java:358)
at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:163)
at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:276)
at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:404)
at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:213)
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:197)
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:189)
at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:275)
at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitOperation(IcebergFilesCommitter.java:298)
at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitDeltaTxn(IcebergFilesCommitter.java:285)
at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:210)
at org.apache.iceberg.flink.sink.IcebergFilesCommitter.initializeState(IcebergFilesCommitter.java:147)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Fix:
The community release does not enable the v2 format by default because some v2 functionality is still incomplete. If you want to test with v2, you need to upgrade the table from v1 to v2 through the Java API, invoked as follows:
// Requires org.apache.iceberg.{BaseTable, Table, TableMetadata, TableOperations}
Table table = …  // load the table from your catalog first
TableOperations ops = ((BaseTable) table).operations();
TableMetadata meta = ops.current();
// Commit new metadata declaring format version 2, which allows delete files
ops.commit(meta, meta.upgradeToFormatVersion(2));
See the code for the complete demo.
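For orientation, the overall job can be sketched roughly as below. This is a hedged sketch, not the author's actual demo: it assumes Flink 1.12, Iceberg 0.11 and flink-cdc-connectors 1.x; host, credentials, database/table names and the HDFS path are placeholders, and MyRowDataDeserializer stands for any custom DebeziumDeserializationSchema<RowData>. Check the builder method names against your Iceberg version:

```java
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import java.util.Collections;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class MysqlCdcToIcebergJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);  // Iceberg commits data files on checkpoint

        SourceFunction<RowData> source = MySQLSource.<RowData>builder()
                .hostname("localhost")               // placeholder
                .port(3306)
                .databaseList("demo_db")             // placeholder database
                .tableList("demo_db.demo_table")     // placeholder table
                .username("flink_cdc")
                .password("******")
                .deserializer(new MyRowDataDeserializer(/* rowType */ null))
                .build();

        DataStream<RowData> stream = env.addSource(source);

        // The target table must already exist and be upgraded to format v2,
        // otherwise the "Cannot write delete files in a v1 table" error appears.
        TableLoader tableLoader =
                TableLoader.fromHadoopTable("hdfs://namenode/warehouse/demo_db/demo_table");

        FlinkSink.forRowData(stream)
                .tableLoader(tableLoader)
                .equalityFieldColumns(Collections.singletonList("id"))  // PK for upserts/deletes
                .build();

        env.execute("mysql-cdc-to-iceberg");
    }
}
```

The equality field columns tell Iceberg which columns identify a row, so that CDC updates and deletes can be written as equality delete files.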