欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink:注册Table Kafka Source报错处理(Flink Table Kafka Source创建方法)

程序员文章站 2022-03-14 19:03:08
...

flink 代码:

注册kafka source 版本:0.10,格式:Json

  Flink:注册Table Kafka Source报错处理(Flink Table Kafka Source创建方法)

相关依赖,版本为flink1.8.0:

这里要注意flink-json,如果没有此依赖,以上代码块将无法使用Json格式

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
         <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

在本地运行没有出现任何问题,打jar到服务器之运行报:

找不到  org.apache.flink.table.factories.TableSourceFactory

(此处不贴报错截图,有可能报错提示信息会有些出处,但万卷不离其宗,还有的可能是  org.apache.flink.table.factories.TableSinkFactory )

原因:

进入org.apache.flink.table.factories.TableFactoryService源码中你可以找到这个类似匹配器的代码块,异常由这里发出:

Flink:注册Table Kafka Source报错处理(Flink Table Kafka Source创建方法)

查看TableFactoryService的实现,是有KafkaTable的:

Flink:注册Table Kafka Source报错处理(Flink Table Kafka Source创建方法)

而查看此目录下TableFactory配置文件中并没有Kafka Source及Sink相关Factory:

Flink:注册Table Kafka Source报错处理(Flink Table Kafka Source创建方法) Flink:注册Table Kafka Source报错处理(Flink Table Kafka Source创建方法)

解决方法一: 

找到源码目录 flink-table\flink-table-planner\src\main\resources\META-INF\services\org.apache.flink.table.factories.TableFactory进行更改,添加以下内容并重新编译:

org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory
org.apache.flink.formats.json.JsonRowFormatFactory

开始并没有添加 JsonRowFormatFactory,也报出了找不到的异常信息,所以这里直接加上

解决方法二(推荐): 

Flink:注册Table Kafka Source报错处理(Flink Table Kafka Source创建方法)  copy后添加以上内容

添加后打包到服务器正常运行。

 

 

参考: https://www.jianshu.com/p/663899eb905d

相关标签: flink