Flink 1.9 Table API problem: reading from Kafka

The Flink 1.9 Table API issue:
Flink: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath

 

Kafka data format:

{"user_id": "346146", "item_id":"851377", "category_id": "4789432", "behavior": "pv", "ts": "2017-11-26T01:24:19Z"}
 

The cause of the error is the environment and the dependencies (the DeserializationSchemaFactory named in the message is provided by the flink-json JAR, so the factory lookup fails when that JAR is not on the classpath). So far I have only done a simple experiment; the code below runs locally.

1. First, the Flink 1.6.2 version of the code, which runs:
 

package com.coder.flink.core.table;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class TestDemo {
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
//        env.enableCheckpointing(5000);
        // Flink 1.6 style: the table environment is obtained from TableEnvironment directly.
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // Kafka connector descriptor: connector version, topic, and cluster addresses.
        Kafka kafka = new Kafka()
                .version("0.10")
                .topic("user_behavior")
                .property("bootstrap.servers", "node2.hadoop:9092")
                .property("zookeeper.connect", "node2.hadoop:2181");

        // Register a table source backed by Kafka, parsing each record as JSON.
        tableEnv.connect(kafka)
                .withFormat(
                        new Json().failOnMissingField(true).deriveSchema()
                )
                .withSchema(
                        new Schema()
                                .field("user_id", Types.INT)
                                .field("item_id", Types.INT)
                                .field("category_id", Types.INT)
                                .field("behavior", Types.STRING)
                                .field("ts", Types.STRING)
                )
                .inAppendMode()
                .registerTableSource("tmp_table");

        String sql = "select * from tmp_table";
        Table table = tableEnv.sqlQuery(sql);
        // Convert the result table back to a DataStream so it can be printed.
        DataStream<Row> rowDataStream = tableEnv.toAppendStream(table, Row.class);
        rowDataStream.print();
        table.printSchema();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2. The Flink 1.9 version

1) The first thing to note is how the tableEnv is created; in 1.9 it is built from EnvironmentSettings, as shown below.

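A minimal sketch of the 1.9-style setup (it matches the full example further down; the Blink planner shipped in 1.9 is selected explicitly):

        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()       // new in 1.9: pick the Blink planner
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

Contrast this with the 1.6 code above, where the environment comes from TableEnvironment.getTableEnvironment(env).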

2) The official docs say connecting to Kafka needs two dependencies (the Kafka connector and the JSON format), and that is exactly where the trap is: adding those two dependencies to the POM did not work for me. The JARs had to be imported manually, with the dependencies commented out:

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#json-format

3) Manually import the JARs into the project, i.e. the same Kafka connector and flink-json artifacts that are commented out in the POM below.


4) The code is as follows:

package com.learning.sql;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class TestDemo {

    public static void main(String[] args) {

        // Flink 1.9 style: build EnvironmentSettings and select the Blink planner explicitly.
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        // Kafka connector descriptor, reading from the latest offset.
        Kafka kafka = new Kafka()
                .version("0.10")
                .topic("user_behavior")
                .property("bootstrap.servers", "node2.hadoop:9092")
                .property("zookeeper.connect", "node2.hadoop:2181")
                .startFromLatest();

        // Register a table source backed by Kafka, parsing each record as JSON.
        tableEnv.connect(kafka)
                .withFormat(
                        new Json().failOnMissingField(true).deriveSchema()
                )
                .withSchema(
                        new Schema()
                                .field("user_id", Types.INT)
                                .field("item_id", Types.INT)
                                .field("category_id", Types.INT)
                                .field("behavior", Types.STRING)
                                .field("ts", Types.STRING)
                )
                .inAppendMode()
                .registerTableSource("test");

        String sql = "select * from test";
        Table table = tableEnv.sqlQuery(sql);
        DataStream<Row> dataStream = tableEnv.toAppendStream(table, Row.class);
        table.printSchema();
        dataStream.print();
//        tableEnv.registerTable("a", table);
//        tableEnv.explain(table);

        // Optional CSV sink, kept commented out. Enabling it additionally requires the
        // org.apache.flink.table.sinks.CsvTableSink, org.apache.flink.core.fs.FileSystem
        // and org.apache.flink.api.common.typeinfo.TypeInformation imports.
      /*  CsvTableSink sink = new CsvTableSink(
                "C:\\Users\\Administrator\\Desktop\\",                  // output path
                "|",                   // optional: delimit files by '|'
                1,                     // optional: write to a single file
                FileSystem.WriteMode.OVERWRITE);  // optional: overwrite existing files
        tableEnv.registerTableSink(
                "csvOutputTable",
                // specify table schema
                new String[]{"f0", "f1", "f2", "f3", "f4"},
                new TypeInformation[]{Types.INT, Types.INT, Types.INT, Types.STRING, Types.STRING},
                sink);*/
        try {
//            table.insertInto("csvOutputTable");
            // Unlike the 1.6 example, the job is triggered from the table environment here.
            tableEnv.execute("aa");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

5) If it still fails, the problem is probably your environment or your JARs; check them carefully. The dependencies are attached:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.github.wuchong</groupId>
    <artifactId>flink-sql-submit</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.9.0</flink.version>
    </properties>

    <dependencies>
        <!-- Flink modules -->
        <!-- CLI dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

       <!-- <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
            &lt;!&ndash;<scope>provided</scope>&ndash;&gt;
        </dependency>-->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.9.0</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.9.0</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.9.0</version>
            <!--<scope>provided</scope>-->
        </dependency>
<!--
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
            &lt;!&ndash;<scope>provided</scope>&ndash;&gt;
        </dependency>-->

     <!--   <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
-->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>



    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <!-- The semantics of this option are reversed, see MCOMPILER-209. -->
                    <useIncrementalCompilation>false</useIncrementalCompilation>
                    <compilerArgs>
                        <!-- Prevents recompilation due to missing package-info.class, see MCOMPILER-205 -->
                        <arg>-Xpkginfo:always</arg>
                    </compilerArgs>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>flink-sql-submit</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <finalName>flink-sql-submit</finalName>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.github.wuchong.sqlsubmit.SqlSubmit</mainClass>
                                    <!--<mainClass>com.learning.streaming.WordCount</mainClass>-->
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>

</project>
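One more diagnostic that can help: Flink 1.9 discovers table factories through the Java ServiceLoader mechanism (each connector and format JAR ships a META-INF/services/org.apache.flink.table.factories.TableFactory file). A small sketch like the one below, run on the same classpath as the job, lists the factories that are actually visible; the class name FactoryCheck is made up for illustration. If no JSON format factory shows up in the output, the "Could not find a suitable table factory" error above is exactly what you will get.

import org.apache.flink.table.factories.TableFactory;

import java.util.ServiceLoader;

public class FactoryCheck {
    public static void main(String[] args) {
        // Print every TableFactory registered on the classpath via Java SPI;
        // the Kafka connector and the JSON format should each contribute one.
        for (TableFactory factory : ServiceLoader.load(TableFactory.class)) {
            System.out.println(factory.getClass().getName());
        }
    }
}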