
Fixing flink-jdbc "table does not exist" errors for schema-qualified tables

程序员文章站 2022-03-28 22:32:09

Background

Recently, while using Flink to read table data from PostgreSQL, my code kept failing with an error saying the table did not exist. Debugging revealed a small problem in flink-jdbc's JDBCDialect class, which I record below.

Environment

Flink version: 1.9.1
Scala version: 2.12
PostgreSQL database structure:
ipark_datacenter (database)
    └─ ods (schema)
        └─ t_test (id, name)

Test code

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.jdbc.JDBCOptions;
import org.apache.flink.api.java.io.jdbc.JDBCTableSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;

/**
 * @Author: maozl
 * @Date: 2019/10/17 17:14
 * @Description:
 */
public class FlinkJdbcDialectBugShow {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //jdbc
        JDBCOptions jdbcOptions = JDBCOptions.builder()
                .setDriverName("org.postgresql.Driver")
                .setDBUrl("jdbc:postgresql://127.0.0.1:65432/ipark_datacenter")
                .setUsername("root")
                .setPassword("admin")
                .setTableName("ods.t_test")
                .build();
        TableSchema tableSchema = TableSchema.builder()
                .fields(
                        new String[]{"id", "name"},
                        new DataType[]{
                                TypeConversions.fromLegacyInfoToDataType(BasicTypeInfo.INT_TYPE_INFO),
                                TypeConversions.fromLegacyInfoToDataType(BasicTypeInfo.STRING_TYPE_INFO)})
                .build();

        JDBCTableSource jdbcTableSource = JDBCTableSource.builder().setOptions(jdbcOptions).setSchema(tableSchema).build();

        tEnv.registerTableSource("userInfo", jdbcTableSource);

        tEnv.scan("userInfo").printSchema();
        Table query = tEnv.sqlQuery("select * from userInfo");
        DataStream<Row> rs = tEnv.toAppendStream(query, Row.class);
        rs.print();

        env.execute();
    }

}

Running the code fails with the following error:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
    at FlinkJdbcDialectBugShow.main(FlinkJdbcDialectBugShow.java:53)
Caused by: java.lang.Exception: java.lang.IllegalArgumentException: open() failed.ERROR: relation "ods.t_test" does not exist
  Position: 26
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:217)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: open() failed.ERROR: relation "ods.t_test" does not exist
  Position: 26
    at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:250)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:85)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:203)
Caused by: org.postgresql.util.PSQLException: ERROR: relation "ods.t_test" does not exist
  Position: 26
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2440)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2183)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:308)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:441)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:365)
    at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:143)
    at org.postgresql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:106)
    at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:247)
    ... 4 more

The error message says relation "ods.t_test" does not exist, even though the t_test table really had been created under the ods schema.

Debugging the root cause

  1. First, in JDBCTableSource I found the line that builds the query. The query resolves to SELECT "id", "name" FROM "ods.t_test": both the field names and the table name have gained an extra pair of quotes. Running this SQL directly against the database produces the same "relation does not exist" error.
  2. Following the SQL-building function leads into the JDBCDialect class, where a quoteIdentifier(tableName) method handles the table name.
  3. The quoteIdentifier method simply wraps its argument in double quotes; its Javadoc explains that this protects column and table names that are reserved keywords or contain special characters. Through this method, the table name ods.t_test becomes "ods.t_test".
  4. That works fine for a table without a schema, but a schema-qualified name breaks: PostgreSQL parses "ods.t_test" as one single identifier rather than schema ods plus table t_test, so it reports that the relation does not exist. Knowing the cause, the fix is straightforward: quote each dot-separated part of the table name separately, then call this new method wherever the table name is quoted:
    default String quoteTablename(String tableName){
        if(tableName.contains(".")){
            String[] strs = tableName.split("\\.");
            return Arrays.stream(strs).map(s -> quoteIdentifier(s)).collect(Collectors.joining("."));
        }else {
            return quoteIdentifier(tableName);
        }
    }
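
To see the difference concretely, here is a small standalone sketch (not Flink code; the two methods below are copied from the snippets in this post) comparing the default quoting with the fixed quoting:

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class QuoteDemo {

    // Copy of JDBCDialect#quoteIdentifier: wrap the identifier in double quotes.
    static String quoteIdentifier(String identifier) {
        return "\"" + identifier + "\"";
    }

    // Copy of the fixed quoteTablename: quote each dot-separated part separately.
    static String quoteTablename(String tableName) {
        if (tableName.contains(".")) {
            return Arrays.stream(tableName.split("\\."))
                    .map(QuoteDemo::quoteIdentifier)
                    .collect(Collectors.joining("."));
        } else {
            return quoteIdentifier(tableName);
        }
    }

    public static void main(String[] args) {
        // Before the fix: PostgreSQL parses this as one table literally named ods.t_test.
        System.out.println(quoteIdentifier("ods.t_test")); // "ods.t_test"
        // After the fix: schema and table are quoted separately.
        System.out.println(quoteTablename("ods.t_test"));  // "ods"."t_test"
        // Table names without a schema are unchanged.
        System.out.println(quoteTablename("t_test"));      // "t_test"
    }
}
```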
    

Full JDBCDialect code (after the fix)

package org.apache.flink.api.java.io.jdbc.dialect;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * @Author: maozl01
 * @Date: 2020/7/17 17:14
 * @Description: JDBCDialect bug fix
 */
public interface JDBCDialect extends Serializable {

    /**
     * Check if this dialect instance can handle a certain jdbc url.
     * @param url the jdbc url.
     * @return True if the dialect can be applied on the given jdbc url.
     */
    boolean canHandle(String url);

    /**
     * @return the default driver class name. If the user does not configure
     * a driver class name, this one is used.
     */
    default Optional<String> defaultDriverName() {
        return Optional.empty();
    }

    /**
     * Quotes the identifier. This is used to put quotes around the identifier in case the column
     * name is a reserved keyword, or in case it contains characters that require quotes (e.g. space).
     * Default using double quotes {@code "} to quote.
     */
    default String quoteIdentifier(String identifier) {
        return "\"" + identifier + "\"";
    }

    /**
     * Get the dialect's upsert statement. Each database has its own upsert syntax,
     * e.g. MySQL uses ON DUPLICATE KEY UPDATE and PostgreSQL uses ON CONFLICT ... DO UPDATE SET ...
     *
     * @return None if the dialect does not support an upsert statement; the writer will then
     * degrade to select + update/insert, which performs poorly.
     */
    default Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        return Optional.empty();
    }

    /**
     * Quote a possibly schema-qualified table name. Each dot-separated part is
     * quoted separately, so "ods.t_test" becomes "ods"."t_test" instead of a
     * single quoted identifier.
     */
    default String quoteTablename(String tableName) {
        if (tableName.contains(".")) {
            String[] strs = tableName.split("\\.");
            return Arrays.stream(strs).map(this::quoteIdentifier).collect(Collectors.joining("."));
        } else {
            return quoteIdentifier(tableName);
        }
    }

    /**
     * Get row exists statement by condition fields. Default use SELECT.
     */
    default String getRowExistsStatement(String tableName, String[] conditionFields) {
        String fieldExpressions = Arrays.stream(conditionFields)
                .map(f -> quoteIdentifier(f) + "=?")
                .collect(Collectors.joining(" AND "));

        return "SELECT 1 FROM " + quoteTablename(tableName) + " WHERE " + fieldExpressions;
    }

    /**
     * Get insert into statement.
     */
    default String getInsertIntoStatement(String tableName, String[] fieldNames) {
        String columns = Arrays.stream(fieldNames)
                .map(this::quoteIdentifier)
                .collect(Collectors.joining(", "));
        String placeholders = Arrays.stream(fieldNames)
                .map(f -> "?")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + quoteTablename(tableName) +
                "(" + columns + ")" + " VALUES (" + placeholders + ")";
    }

    /**
     * Get update one row statement by condition fields, default not use limit 1,
     * because limit 1 is a sql dialect.
     */
    default String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
        String setClause = Arrays.stream(fieldNames)
                .map(f -> quoteIdentifier(f) + "=?")
                .collect(Collectors.joining(", "));
        String conditionClause = Arrays.stream(conditionFields)
                .map(f -> quoteIdentifier(f) + "=?")
                .collect(Collectors.joining(" AND "));
        return "UPDATE " + quoteTablename(tableName) +
                " SET " + setClause +
                " WHERE " + conditionClause;
    }

    /**
     * Get delete one row statement by condition fields, default not use limit 1,
     * because limit 1 is a sql dialect.
     */
    default String getDeleteStatement(String tableName, String[] conditionFields) {
        String conditionClause = Arrays.stream(conditionFields)
                .map(f -> quoteIdentifier(f) + "=?")
                .collect(Collectors.joining(" AND "));
        return "DELETE FROM " + quoteTablename(tableName) + " WHERE " + conditionClause;
    }

    /**
     * Get select fields statement by condition fields. Default use SELECT.
     */
    default String getSelectFromStatement(String tableName, String[] selectFields, String[] conditionFields) {
        String selectExpressions = Arrays.stream(selectFields)
                .map(this::quoteIdentifier)
                .collect(Collectors.joining(", "));
        String fieldExpressions = Arrays.stream(conditionFields)
                .map(f -> quoteIdentifier(f) + "=?")
                .collect(Collectors.joining(" AND "));
        return "SELECT " + selectExpressions + " FROM " +
                quoteTablename(tableName) + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
    }
}
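
As a quick sanity check, the sketch below inlines a minimal, condensed copy of the patched dialect (just quoteIdentifier, quoteTablename, and getSelectFromStatement) and prints the SQL now generated for a schema-qualified table. It is an illustration of the same technique, not Flink code:

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class DialectDemo {

    // Minimal inline copy of the patched JDBCDialect defaults, for illustration only.
    interface MiniDialect {
        default String quoteIdentifier(String identifier) {
            return "\"" + identifier + "\"";
        }

        // Condensed quoteTablename: splitting on "." also covers names without a schema.
        default String quoteTablename(String tableName) {
            return Arrays.stream(tableName.split("\\."))
                    .map(this::quoteIdentifier)
                    .collect(Collectors.joining("."));
        }

        default String getSelectFromStatement(String tableName, String[] selectFields, String[] conditionFields) {
            String selectExpressions = Arrays.stream(selectFields)
                    .map(this::quoteIdentifier)
                    .collect(Collectors.joining(", "));
            String fieldExpressions = Arrays.stream(conditionFields)
                    .map(f -> quoteIdentifier(f) + "=?")
                    .collect(Collectors.joining(" AND "));
            return "SELECT " + selectExpressions + " FROM " +
                    quoteTablename(tableName) +
                    (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
        }
    }

    public static void main(String[] args) {
        MiniDialect dialect = new MiniDialect() {};
        // Schema and table are now quoted separately, so PostgreSQL can
        // resolve the relation "ods"."t_test" correctly.
        System.out.println(dialect.getSelectFromStatement(
                "ods.t_test", new String[]{"id", "name"}, new String[]{}));
        // SELECT "id", "name" FROM "ods"."t_test"
    }
}
```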

Original post: https://blog.csdn.net/u014730001/article/details/107356393

Tags: Java, Big Data