Fixing flink-jdbc's "table does not exist" error for schema-qualified tables
Background
Recently, while using Flink to read table data from PostgreSQL, my code kept failing with an error saying the table did not exist. Debugging revealed a small problem in flink-jdbc's JDBCDialect class, which I record here.
Environment
Flink version: 1.9.1
Scala version: 2.12
PostgreSQL database structure:
ipark_datacenter (database)
- ods (schema)
  - t_test(id, name)
Test code
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.jdbc.JDBCOptions;
import org.apache.flink.api.java.io.jdbc.JDBCTableSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
/**
 * @Author: maozl
 * @Date: 2019/10/17 17:14
 * @Description:
 */
public class FlinkJdbcDialectBugShow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // JDBC connection options; note the schema-qualified table name
        JDBCOptions jdbcOptions = JDBCOptions.builder()
                .setDriverName("org.postgresql.Driver")
                .setDBUrl("jdbc:postgresql://127.0.0.1:65432/ipark_datacenter")
                .setUsername("root")
                .setPassword("admin")
                .setTableName("ods.t_test")
                .build();
        // Schema matching the two columns of ods.t_test
        TableSchema tableSchema = TableSchema.builder()
                .fields(
                        new String[]{"id", "name"},
                        new DataType[]{
                                TypeConversions.fromLegacyInfoToDataType(BasicTypeInfo.INT_TYPE_INFO),
                                TypeConversions.fromLegacyInfoToDataType(BasicTypeInfo.STRING_TYPE_INFO)})
                .build();
        JDBCTableSource jdbcTableSource = JDBCTableSource.builder()
                .setOptions(jdbcOptions)
                .setSchema(tableSchema)
                .build();
        tEnv.registerTableSource("userInfo", jdbcTableSource);
        tEnv.scan("userInfo").printSchema();
        // Query the registered source and print the rows
        Table query = tEnv.sqlQuery("select * from userInfo");
        DataStream<Row> rs = tEnv.toAppendStream(query, Row.class);
        rs.print();
        env.execute();
    }
}
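To run this locally you also need flink-jdbc and the PostgreSQL JDBC driver on the classpath, alongside the Flink streaming and table dependencies; the connection details above (port 65432, the credentials) are specific to my test environment.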
Running the code fails with the following error:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
at FlinkJdbcDialectBugShow.main(FlinkJdbcDialectBugShow.java:53)
Caused by: java.lang.Exception: java.lang.IllegalArgumentException: open() failed.ERROR: relation "ods.t_test" does not exist
Position: 26
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:217)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: open() failed.ERROR: relation "ods.t_test" does not exist
Position: 26
at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:250)
at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:85)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:203)
Caused by: org.postgresql.util.PSQLException: ERROR: relation "ods.t_test" does not exist
Position: 26
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2440)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2183)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:308)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:441)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:365)
at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:143)
at org.postgresql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:106)
at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:247)
... 4 more
The error complains that relation "ods.t_test" does not exist, even though I had in fact already created the t_test table under the ods schema.
Debugging the root cause
- First, in JDBCTableSource I traced the generated query: it had been resolved to SELECT "id", "name" FROM "ods.t_test". Both the field names and the table name have gained an extra layer of quotes, and running this SQL directly against the database reports the same "table does not exist" error.
- Stepping through the SQL-building code leads into the JDBCDialect class, which contains a quoteIdentifier(tableName) method used to process the table name.
- The quoteIdentifier implementation appears in the full class below. Its Javadoc tells us the method exists to protect fields, table names, etc. that are reserved keywords or contain characters requiring quotes; it simply wraps the whole string in double quotes. Through this method, our table name ods.t_test becomes "ods.t_test", which PostgreSQL parses as a single identifier rather than as schema ods plus table t_test.
- The code above works fine for tables without a schema qualifier, but with one it reports the table as missing. Once the cause is known, the fix is simple: split the table name on the dot, quote each part separately, and replace the table-name quoting calls with the new method (a quick verification sketch follows the fix):
default String quoteTablename(String tableName) {
    if (tableName.contains(".")) {
        String[] strs = tableName.split("\\.");
        return Arrays.stream(strs).map(s -> quoteIdentifier(s)).collect(Collectors.joining("."));
    } else {
        return quoteIdentifier(tableName);
    }
}
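Below is a minimal, self-contained sketch of the before/after behavior; the two methods are copied from the dialect, while the demo class and its main are my own:

import java.util.Arrays;
import java.util.stream.Collectors;

public class QuoteDemo {
    // Original behavior: wraps the whole string, dot included, in one pair of quotes.
    static String quoteIdentifier(String identifier) {
        return "\"" + identifier + "\"";
    }

    // Fixed behavior: quote each dot-separated part on its own.
    static String quoteTablename(String tableName) {
        if (tableName.contains(".")) {
            return Arrays.stream(tableName.split("\\."))
                    .map(QuoteDemo::quoteIdentifier)
                    .collect(Collectors.joining("."));
        }
        return quoteIdentifier(tableName);
    }

    public static void main(String[] args) {
        System.out.println(quoteIdentifier("ods.t_test")); // "ods.t_test"   -> one identifier, relation not found
        System.out.println(quoteTablename("ods.t_test"));  // "ods"."t_test" -> schema ods, table t_test
        System.out.println(quoteTablename("t_test"));      // "t_test"       -> unqualified names still work
    }
}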
Complete JDBCDialect code after the fix
package org.apache.flink.api.java.io.jdbc.dialect;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * @Author: maozl01
 * @Date: 2020/7/17 17:14
 * @Description: JDBCDialect bug fix
 */
public interface JDBCDialect extends Serializable {

    /**
     * Check if this dialect instance can handle a certain jdbc url.
     * @param url the jdbc url.
     * @return True if the dialect can be applied on the given jdbc url.
     */
    boolean canHandle(String url);

    /**
     * @return the default driver class name; if the user does not configure a driver
     * class name, this one will be used.
     */
    default Optional<String> defaultDriverName() {
        return Optional.empty();
    }

    /**
     * Quotes the identifier. This is used to put quotes around the identifier in case the column
     * name is a reserved keyword, or in case it contains characters that require quotes (e.g. space).
     * Uses double quotes {@code "} by default.
     */
    default String quoteIdentifier(String identifier) {
        return "\"" + identifier + "\"";
    }

    /**
     * Get dialect upsert statement; each database has its own upsert syntax, such as MySQL
     * using DUPLICATE KEY UPDATE, and PostgreSQL using ON CONFLICT ... DO UPDATE SET ..
     *
     * @return None if dialect does not support upsert statement; the writer will degrade to
     * the use of select + update/insert, whose performance is poor.
     */
    default Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        return Optional.empty();
    }

    /**
     * Quotes a possibly schema-qualified table name: each dot-separated part is quoted
     * separately, so ods.t_test becomes "ods"."t_test" instead of "ods.t_test".
     */
    default String quoteTablename(String tableName) {
        if (tableName.contains(".")) {
            String[] strs = tableName.split("\\.");
            return Arrays.stream(strs).map(s -> quoteIdentifier(s)).collect(Collectors.joining("."));
        } else {
            return quoteIdentifier(tableName);
        }
    }

    /**
     * Get row exists statement by condition fields. Default use SELECT.
     */
    default String getRowExistsStatement(String tableName, String[] conditionFields) {
        String fieldExpressions = Arrays.stream(conditionFields)
                .map(f -> quoteIdentifier(f) + "=?")
                .collect(Collectors.joining(" AND "));
        return "SELECT 1 FROM " + quoteTablename(tableName) + " WHERE " + fieldExpressions;
    }

    /**
     * Get insert into statement.
     */
    default String getInsertIntoStatement(String tableName, String[] fieldNames) {
        String columns = Arrays.stream(fieldNames)
                .map(this::quoteIdentifier)
                .collect(Collectors.joining(", "));
        String placeholders = Arrays.stream(fieldNames)
                .map(f -> "?")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + quoteTablename(tableName) +
                "(" + columns + ")" + " VALUES (" + placeholders + ")";
    }

    /**
     * Get update one row statement by condition fields; LIMIT 1 is deliberately not used,
     * because LIMIT 1 is a sql dialect.
     */
    default String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
        String setClause = Arrays.stream(fieldNames)
                .map(f -> quoteIdentifier(f) + "=?")
                .collect(Collectors.joining(", "));
        String conditionClause = Arrays.stream(conditionFields)
                .map(f -> quoteIdentifier(f) + "=?")
                .collect(Collectors.joining(" AND "));
        return "UPDATE " + quoteTablename(tableName) +
                " SET " + setClause +
                " WHERE " + conditionClause;
    }

    /**
     * Get delete one row statement by condition fields; LIMIT 1 is deliberately not used,
     * because LIMIT 1 is a sql dialect.
     */
    default String getDeleteStatement(String tableName, String[] conditionFields) {
        String conditionClause = Arrays.stream(conditionFields)
                .map(f -> quoteIdentifier(f) + "=?")
                .collect(Collectors.joining(" AND "));
        return "DELETE FROM " + quoteTablename(tableName) + " WHERE " + conditionClause;
    }

    /**
     * Get select fields statement by condition fields. Default use SELECT.
     */
    default String getSelectFromStatement(String tableName, String[] selectFields, String[] conditionFields) {
        String selectExpressions = Arrays.stream(selectFields)
                .map(this::quoteIdentifier)
                .collect(Collectors.joining(", "));
        String fieldExpressions = Arrays.stream(conditionFields)
                .map(f -> quoteIdentifier(f) + "=?")
                .collect(Collectors.joining(" AND "));
        return "SELECT " + selectExpressions + " FROM " +
                quoteTablename(tableName) + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
    }
}
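Because the file keeps the original package org.apache.flink.api.java.io.jdbc.dialect, one way to apply the fix without rebuilding flink-jdbc is to drop this source file into your own project under the same package; assuming your build puts project classes ahead of dependency jars on the classpath, it then shadows the class bundled in flink-jdbc 1.9.1. With the fix in place, the generated query becomes SELECT "id", "name" FROM "ods"."t_test", and the job reads the schema-qualified table as expected.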
Original post: https://blog.csdn.net/u014730001/article/details/107356393