Flink Read/Write Series: Reading from HBase and Writing to HBase
Reading from HBase is shown here in two ways: one extends RichSourceFunction and overrides its methods; the other extends Flink's TableInputFormat (an InputFormat). The code follows.
Approach 1: extend RichSourceFunction
package com.my.flink.utils.streaming.hbase;
import com.my.flink.utils.config.ConfigKeys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
/**
* @Description hbase reader
* @Author jiangxiaozhi
* @Date 2018/10/17 10:05
**/
public class HBaseReader extends RichSourceFunction<Tuple2<String, String>> {
    private static final Logger logger = LoggerFactory.getLogger(HBaseReader.class);
    private Connection conn = null;
    private Table table = null;
    private Scan scan = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Obtain a shared connection and prepare a scan over rowkeys [1001, 1004)
        // restricted to the configured column family.
        conn = HBaseConnection.getHBaseConn();
        table = conn.getTable(TableName.valueOf(ConfigKeys.HBASE_SOURCE_TABLE()));
        scan = new Scan();
        scan.setStartRow(Bytes.toBytes("1001"));
        scan.setStopRow(Bytes.toBytes("1004"));
        scan.addFamily(Bytes.toBytes(ConfigKeys.HBASE_SOURCE_CF()));
    }

    @Override
    public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
        ResultScanner rs = table.getScanner(scan);
        Iterator<Result> iterator = rs.iterator();
        while (iterator.hasNext()) {
            Result result = iterator.next();
            String rowkey = Bytes.toString(result.getRow());
            StringBuffer sb = new StringBuffer();
            for (Cell cell : result.listCells()) {
                String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                sb.append(value).append(",");
            }
            // Emit (rowkey, comma-joined cell values), dropping the trailing comma.
            String valueString = sb.replace(sb.length() - 1, sb.length(), "").toString();
            Tuple2<String, String> tuple2 = new Tuple2<>();
            tuple2.setFields(rowkey, valueString);
            ctx.collect(tuple2);
        }
        rs.close();
    }

    @Override
    public void cancel() {
        try {
            if (table != null) {
                table.close();
            }
            if (conn != null) {
                conn.close();
            }
        } catch (IOException e) {
            logger.error("Close HBase Exception:", e);
        }
    }
}
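The HBaseConnection.getHBaseConn() call above refers to the author's own connection helper, which is not shown in the post. A minimal sketch of such a helper, assuming the ZooKeeper quorum and client port are also exposed through the same ConfigKeys class (those accessors are hypothetical), could look like this:
package com.my.flink.utils.streaming.hbase;

import com.my.flink.utils.config.ConfigKeys;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;

/**
 * Hypothetical reconstruction of the HBaseConnection helper used by the
 * reader and writer in this post; adapt the ConfigKeys accessors to your setup.
 */
public class HBaseConnection {
    private static Connection conn = null;

    public static synchronized Connection getHBaseConn() throws IOException {
        if (conn == null || conn.isClosed()) {
            Configuration conf = HBaseConfiguration.create();
            conf.set(HConstants.ZOOKEEPER_QUORUM, ConfigKeys.ZOOKEEPER_QUORUM());           // assumed accessor
            conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, ConfigKeys.ZOOKEEPER_CLIENT_PORT()); // assumed accessor
            conn = ConnectionFactory.createConnection(conf);
        }
        return conn;
    }
}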
The code is similar to that in "Flink Read/Write Series: Reading from MySQL and Writing to MySQL"; see that post for a detailed walkthrough.
Approach 2: extend TableInputFormat and override its methods
env.createInput(new TableInputFormat[org.apache.flink.api.java.tuple.Tuple2[String, String]] {

  override def mapResultToTuple(r: Result): org.apache.flink.api.java.tuple.Tuple2[String, String] = {
    val rowkey = Bytes.toString(r.getRow)
    val sb = new StringBuffer()
    for (cell: Cell <- r.rawCells()) {
      val value = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
      sb.append(value).append(",")
    }
    val valueString = sb.replace(sb.length() - 1, sb.length(), "").toString
    val tuple2 = new org.apache.flink.api.java.tuple.Tuple2[String, String]
    tuple2.setField(rowkey, 0)
    tuple2.setField(valueString, 1)
    tuple2
  }

  override def getTableName: String = HBASE_SOURCE_TABLE

  override def getScanner: Scan = {
    scan
  }

  override def configure(parameters: Configuration): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, ZOOKEEPER_QUORUM)
    conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, ZOOKEEPER_CLIENT_PORT)
    conn = ConnectionFactory.createConnection(conf)
    table = classOf[HTable].cast(conn.getTable(TableName.valueOf(HBASE_SOURCE_TABLE)))
    scan = new Scan() {
      setStartRow(Bytes.toBytes("1001"))
      setStopRow(Bytes.toBytes("1004"))
      addFamily(Bytes.toBytes(HBASE_SOURCE_CF))
    }
  }

  override def close(): Unit = {
    if (table != null) {
      table.close()
    }
    if (conn != null) {
      conn.close()
    }
  }
})
The env above is a StreamExecutionEnvironment.
Writing to HBase can likewise be done in two ways. One is a sink function analogous to the MySQL writer (a sketch of that variant follows the code below); this post focuses on writing by implementing the OutputFormat interface:
package com.my.flink.utils.streaming.hbase;
import com.my.flink.utils.config.ConfigKeys;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* @Description HBaseOutputFormat
* @Author jiangxiaozhi
* @Date 2018/10/16 14:06
**/
public class HBaseOutputFormat implements OutputFormat<Tuple2<String, String>> {
    private static final Logger logger = LoggerFactory.getLogger(HBaseOutputFormat.class);
    private org.apache.hadoop.conf.Configuration conf = null;
    private Connection conn = null;
    private Table table = null;

    @Override
    public void configure(Configuration parameters) {
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        conn = HBaseConnection.getHBaseConn();
        table = conn.getTable(TableName.valueOf(ConfigKeys.HBASE_SINK_TABLE()));
    }

    @Override
    public void writeRecord(Tuple2<String, String> record) throws IOException {
        Put put = new Put(Bytes.toBytes(record.f0));
        put.addColumn(Bytes.toBytes(ConfigKeys.HBASE_SINK_CF()), Bytes.toBytes("test1"), Bytes.toBytes(record.f1));
        table.put(put);
    }

    @Override
    public void close() throws IOException {
        if (table != null) {
            table.close();
        }
        if (conn != null) {
            conn.close();
        }
    }
}
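The sink-function write path mentioned above is left unimplemented in the original post (the usage notes below point to JdbcWriter instead). A minimal sketch of what such an HBaseWriter could look like, assuming the same HBaseConnection helper, ConfigKeys entries, and hard-coded "test1" qualifier as the OutputFormat above (this class is illustrative, not the author's implementation):
package com.my.flink.utils.streaming.hbase;

import com.my.flink.utils.config.ConfigKeys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Hypothetical sketch of a RichSinkFunction-based HBase writer,
 * shown only to illustrate the sink-function variant mentioned above.
 */
public class HBaseWriter extends RichSinkFunction<Tuple2<String, String>> {
    private Connection conn = null;
    private Table table = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = HBaseConnection.getHBaseConn();
        table = conn.getTable(TableName.valueOf(ConfigKeys.HBASE_SINK_TABLE()));
    }

    @Override
    public void invoke(Tuple2<String, String> value) throws Exception {
        // Write one Put per incoming (rowkey, value) tuple.
        Put put = new Put(Bytes.toBytes(value.f0));
        put.addColumn(Bytes.toBytes(ConfigKeys.HBASE_SINK_CF()), Bytes.toBytes("test1"), Bytes.toBytes(value.f1));
        table.put(put);
    }

    @Override
    public void close() throws Exception {
        if (table != null) {
            table.close();
        }
        if (conn != null) {
            conn.close();
        }
    }
}
Such a writer would be attached with dataStream.addSink(new HBaseWriter()).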
Usage:
Reading from HBase:
1. With HBaseReader:
env.addSource(new HBaseReader()) // produces a DataStream
2. With TableInputFormat, the DataStream is produced as shown in Approach 2: env.createInput(...)
Writing to HBase:
1. With HBaseWriter (a sink function):
dataStream.addSink(new HBaseWriter()) // HBaseWriter is not implemented in this post; see JdbcWriter and the sketch above
2. dataStream.writeUsingOutputFormat(new HBaseOutputFormat()) (see the end-to-end sketch below)
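Putting the pieces together, a minimal end-to-end sketch of a job that reads with HBaseReader and writes with HBaseOutputFormat (the job class and job name are illustrative):
import com.my.flink.utils.streaming.hbase.HBaseOutputFormat;
import com.my.flink.utils.streaming.hbase.HBaseReader;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HBaseToHBaseJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read rows 1001-1004 from the source table as (rowkey, comma-joined values).
        DataStream<Tuple2<String, String>> source = env.addSource(new HBaseReader());

        // Write each tuple into the sink table via the OutputFormat shown above.
        source.writeUsingOutputFormat(new HBaseOutputFormat());

        env.execute("hbase-to-hbase");
    }
}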