Flink 1.9 Table API problem: reading from Kafka
The Flink 1.9 API problem:
Flink: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath
Kafka data format:
{"user_id": "346146", "item_id":"851377", "category_id": "4789432", "behavior": "pv", "ts": "2017-11-26T01:24:19Z"}
The cause of the error is the environment and the dependencies: the message means that no DeserializationSchemaFactory for the JSON format can be found on the classpath, i.e. the flink-json format jar is not visible to the job. So far I have only done a simple experiment, and the code below runs locally.
1. First, a version for Flink 1.6.2 that runs:
package com.coder.flink.core.table;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class TestDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // env.enableCheckpointing(5000);

        // In 1.6 the table environment is obtained via TableEnvironment.getTableEnvironment(env)
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // Kafka connector descriptor: broker version, topic and connection properties
        Kafka kafka = new Kafka()
                .version("0.10")
                .topic("user_behavior")
                .property("bootstrap.servers", "node2.hadoop:9092")
                .property("zookeeper.connect", "node2.hadoop:2181");

        tableEnv.connect(kafka)
                // JSON format; the JSON schema is derived from the table schema below
                .withFormat(new Json().failOnMissingField(true).deriveSchema())
                .withSchema(new Schema()
                        .field("user_id", Types.INT)
                        .field("item_id", Types.INT)
                        .field("category_id", Types.INT)
                        .field("behavior", Types.STRING)
                        .field("ts", Types.STRING))
                .inAppendMode()
                .registerTableSource("tmp_table");

        String sql = "select * from tmp_table";
        Table table = tableEnv.sqlQuery(sql);
        DataStream<Row> rowDataStream = tableEnv.toAppendStream(table, Row.class);
        rowDataStream.print();
        table.printSchema();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
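If the topic has data, table.printSchema() prints the five declared fields, and the print() sink emits each consumed JSON record as a comma-separated Row; for the sample record above the output should look roughly like 346146,851377,4789432,pv,2017-11-26T01:24:19Z.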
2. The Flink 1.9 version
1) Note how tableEnv is created: in 1.9 it comes from StreamTableEnvironment.create() together with EnvironmentSettings (see the code below).
2) The official documentation says that connecting to Kafka requires two dependencies, and that is exactly the pitfall: I added both of them via Maven and it still did not work. The jars have to be imported manually and the dependencies commented out.
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#json-format
3) Import the two jars into the project manually; their Maven coordinates are sketched below.
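Presumably these are the two dependencies in question, the Kafka connector and the JSON format. A sketch of the coordinates (the same artifacts appear commented out in the pom at the end; for Kafka 0.10 a version-specific connector artifact may be needed instead, so check the linked page):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>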
4) The code is as follows:
package com.learning.sql;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class TestDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        // In 1.9 the table environment is created from EnvironmentSettings;
        // here the Blink planner is used in streaming mode.
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        Kafka kafka = new Kafka()
                .version("0.10")
                .topic("user_behavior")
                .property("bootstrap.servers", "node2.hadoop:9092")
                .property("zookeeper.connect", "node2.hadoop:2181")
                .startFromLatest();

        tableEnv.connect(kafka)
                .withFormat(new Json().failOnMissingField(true).deriveSchema())
                .withSchema(new Schema()
                        .field("user_id", Types.INT)
                        .field("item_id", Types.INT)
                        .field("category_id", Types.INT)
                        .field("behavior", Types.STRING)
                        .field("ts", Types.STRING))
                .inAppendMode()
                .registerTableSource("test");

        String sql = "select * from test";
        Table table = tableEnv.sqlQuery(sql);
        DataStream<Row> dataStream = tableEnv.toAppendStream(table, Row.class);
        table.printSchema();
        dataStream.print();

        // tableEnv.registerTable("a", table);
        // tableEnv.explain(table);

        /* CsvTableSink sink = new CsvTableSink(
                "C:\\Users\\Administrator\\Desktop\\",   // output path
                "|",                                     // optional: delimit files by '|'
                1,                                       // optional: write to a single file
                FileSystem.WriteMode.OVERWRITE);         // optional: override existing files
        tableEnv.registerTableSink(
                "csvOutputTable",
                // specify table schema
                new String[]{"f0", "f1", "f2", "f3", "f4"},
                new TypeInformation[]{Types.INT, Types.INT, Types.INT, Types.STRING, Types.STRING},
                sink); */

        try {
            // table.insertInto("csvOutputTable");
            tableEnv.execute("aa");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
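Note the difference from the 1.6.2 version: here the job is submitted with tableEnv.execute("aa") on the table environment rather than with env.execute(). Apart from that, the expected output (the printed schema plus one Row per Kafka record) is the same as before.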
5) If it still fails, it is probably a problem with your environment or with the jars; check those carefully. The dependencies are attached:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.github.wuchong</groupId>
    <artifactId>flink-sql-submit</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.9.0</flink.version>
    </properties>

    <dependencies>
        <!-- Flink modules -->
        <!-- CLI dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <!-- Commented out: imported manually instead (see step 3 above).
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <!-- Commented out: imported manually instead (see step 3 above).
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->
        <!-- Commented out: imported manually instead (see step 3 above).
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <!-- The semantics of this option are reversed, see MCOMPILER-209. -->
                    <useIncrementalCompilation>false</useIncrementalCompilation>
                    <compilerArgs>
                        <!-- Prevents recompilation due to missing package-info.class, see MCOMPILER-205 -->
                        <arg>-Xpkginfo:always</arg>
                    </compilerArgs>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>flink-sql-submit</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <finalName>flink-sql-submit</finalName>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.github.wuchong.sqlsubmit.SqlSubmit</mainClass>
                                    <!--<mainClass>com.learning.streaming.WordCount</mainClass>-->
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
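With this pom, mvn clean package runs the shade plugin during the package phase and should produce a fat jar named flink-sql-submit.jar (the finalName above) with com.github.wuchong.sqlsubmit.SqlSubmit as the manifest main class. For local runs from the IDE, the manually imported connector and JSON jars must be on the project classpath as described in step 3.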