Flink SQL自定义TableSource读取Kudu数据
本文基于Flink1.9,之前文章基于Flink1.6。在Flink的官方文档中提供了很多connector用于连接外部系统数据源。如果提供的connector不能满足需要,还可以通过自定义方式定义读取外部数据源的逻辑。本文的背景就是使用SQL查询批量数据,但是批量数据存储在kudu中,由于没有提供connector所以需要自定义读取数据逻辑。
官方文档中给出了自定义批量数据读取的实现方式:
然而实际API却显示该类已经不推荐使用了:
于是使用InputFormatTableSource抽象类来实现此功能,我们继承该类
package com.xxx.bigdata;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.InputFormatTableSource;
import org.apache.flink.table.types.DataType;
public class KuduTableSource extends InputFormatTableSource {
@Override
public InputFormat getInputFormat() {
return new KuduInputFormat();
}
@Override
public TableSchema getTableSchema() {
String[] names = {"info"};
DataType[] dataTypes = {DataTypes.STRING()};
TableSchema tableSchema = TableSchema.builder().fields(names, dataTypes).build();
return tableSchema;
}
@Override
public DataType getProducedDataType() {
return getTableSchema().getFieldDataTypes()[0];
}
}
上面的代码中,getTableSchema是定义Table的schema描述,getProducedDataType定义数据输出类型,而父类InputFormatTableSource里面的一个核心抽象方法是:
public abstract InputFormat<T, ?> getInputFormat();
该方法的功能就是读数的逻辑,我们需要重写此方法。
package com.xxx.bigdata;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.kudu.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;
public class KuduInputFormat extends GenericInputFormat<String> {
private static Logger LOG = LoggerFactory.getLogger(KuduInputFormat.class);
private KuduClient client;
private KuduTable table;
private KuduScanner scanner;
private static final String IMPALA_TABLE_PREFIX = "impala::";
private static final String KUDU_MASTER_ADDRESS = "192.168.1.1:7051";
private String tableName = "class";
List<String> columnsList = new ArrayList<>();
private RowResultIterator iterator;
@Override
public void open(GenericInputSplit split) throws IOException {
super.open(split);
client = new KuduClient.KuduClientBuilder(KUDU_MASTER_ADDRESS).build();
table = client.openTable(IMPALA_TABLE_PREFIX + tableName);
columnsList.add("id");
columnsList.add("teacher_id");
scanner = client.newScannerBuilder(table).setProjectedColumnNames(columnsList).build();
iterator = scanner.nextRows();
}
@Override
public boolean reachedEnd() {
return iterator.hasNext() == false;
}
@Override
public String nextRecord(String reuse) {
StringJoiner joiner = new StringJoiner(":");
RowResult rowResult = iterator.next();
String id = String.valueOf(rowResult.getLong("id"));
String teacher_id = String.valueOf(rowResult.getLong("teacher_id"));
joiner.add(id).add(teacher_id);
return joiner.toString();
}
@Override
public GenericInputSplit[] createInputSplits(int numSplits) {
GenericInputSplit[] splits = new GenericInputSplit[1];
splits[0] = new GenericInputSplit(0, 1);
return splits;
}
@Override
public void close() throws IOException {
scanner.close();
client.close();
}
}
1.open用于打开一些连接,如数据库,文件系统等,reachedEnd判断数据是否已经读完,nextRecord用于读取下条数据并返回处理结果数据,createInputSplits方法是创建数据输入分片,这里不重写,默认调用父类的同名方法,产生8个分片,即8个并行度。close里进行一些资源的释放。
2.这里继承的是GenericInputFormat类,其实还有其他的类可以使用:
所有這些类都有一个共有的顶层接口InputFormat:
不同的InputFormat都是基于不同的功能实现不同的方法,有兴趣的可以打开看一下。那么这些方法之间的调用链是怎样的呢?如下所示:
每次读取下一条记录之前都会判断记录是否读完,如果读完直接执行close方法,否则读取下一条记录。这里重点说一下createInputSplits创建输入分片,其实在Kudu(类似HBase)的scan读取中,我们要做到并行读取,在open方法构建scanner时,应该每一个scanner对应一段范围内的rowkey,这样才能并行读取,不重复读取。因此,基于此,需要重写createInputSplits和getInputSplitAssigner,创建scanner的时候也需要做相应的改动,这里不做详细说明,程序入口如下:
package com.xxx.bigdata;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
public class BatchEngine {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment batchEnv = BatchTableEnvironment.create(env);
batchEnv.registerTableSource("class", new KuduTableSource());
Table result = batchEnv.sqlQuery("select * from class_room");
batchEnv.toDataSet(result, String.class).print();
batchEnv.execute("kudu job");
}
}
本地测试:
由于程序里没有定义sink,我们只是print了一下,因此会报上述错误,无关紧要。
上一篇: 对XML进行Sax解析
下一篇: Thumbnails