
@羲凡 (just to live better)

Flink: Consuming Kafka Data and Writing It to HBase

I. Prerequisites

1. Create the HBase table

create 'test_20191122','info'
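
To confirm the table and its info column family exist before running the job (an optional sanity check), you can describe it in the HBase shell:

describe 'test_20191122'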

2. Add the dependencies to pom.xml

<dependency>
	<groupId>org.apache.hbase</groupId>
	<artifactId>hbase-client</artifactId>
	<version>${hbase.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.hbase</groupId>
	<artifactId>hbase-server</artifactId>
	<version>${hbase.version}</version>
</dependency>
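
The ${hbase.version} placeholder above assumes a version property is already defined in the pom. The FlinkKafkaConsumer011 used in the code below also needs the Flink Kafka 0.11 connector. A sketch of both additions; the version numbers here are only examples, so match them to your cluster and to your Flink/Scala versions:

<properties>
	<hbase.version>1.2.0</hbase.version>
	<flink.version>1.7.2</flink.version>
</properties>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
	<version>${flink.version}</version>
</dependency>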

II. Straight to the code

package pulsar;
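
// Note: HBaseSink, HBaseSinkFunction, PutRequest, PutRequestFailureHandler and
// RequestPutter used below come from a custom connector package, not from Apache Flink.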

import com.hoperun.flink.sink.hbase.connector.*;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class FlinkConsumeKafkaWriteHbase {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkConsumeKafkaWriteHbase.class);
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
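        // checkpoint every 5 s; with checkpointing enabled, the Kafka consumer
        // commits its offsets as part of each completed checkpoint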
        env.enableCheckpointing(5000);
        // Kafka parameters
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "cdh01.com:9092,cdh02.com:9092,cdh03.com:9092");
        properties.setProperty("group.id", "test227");
        properties.setProperty("max.poll.records", "1000");
        properties.setProperty("max.partition.fetch.bytes", "5242880");
        // Kafka consumer
        FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
                "testtopic",
                new SimpleStringSchema(),
                properties);
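        // start from the latest offsets, ignoring any offsets committed for this group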
        consumer.setStartFromLatest();
        // Build the DataStream and run a simple word count
        DataStreamSource<String> text = env.addSource(consumer,"Kafka").setParallelism(1);
        DataStream<Tuple2<String, Integer>> sum = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String str, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] arr = str.split(" ");
                for (String s : arr) {
                    collector.collect(new Tuple2<>(s, 1));
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(3)).sum(1);

        sum.print();

        // Write to HBase
        Map<String, String> userConfig = new HashMap<>();
        userConfig.put("hbase.zookeeper.quorum","cdh01.com,cdh02.com,cdh03.com");
        userConfig.put("hbase.zookeeper.property.clientPort","2181");
        userConfig.put("bulk.flush.interval.ms","1000");
        sum.addSink(new HBaseSink<>(userConfig,
                new HBaseSinkFunction<Tuple2<String, Integer>>() {
                    @Override
                    public void process(Tuple2<String, Integer> element, RuntimeContext ctx, RequestPutter putter) {
                        byte[] rowKey = Bytes.toBytes(element.f0);
                        byte[] cf = Bytes.toBytes("info");
                        byte[] qualifier = Bytes.toBytes("num");

                        Put put = new Put(rowKey);
                        put.addColumn(cf, qualifier, Bytes.toBytes(element.f1.toString()));
                        put.addColumn(cf, Bytes.toBytes("test"), Bytes.toBytes("227"));

                        putter.add(new PutRequest(TableName.valueOf("test_20191122"), put));
                    }
                },
                new PutRequestFailureHandler() {
                    @Override
                    public void onFailure(PutRequest entity, Throwable failure, int restStatusCode, RequestPutter putter) throws Throwable {
                        try {
                            String tableName = entity.getTableName().getNameAsString();
                            for (Cell cell : entity.getPut().getFamilyCellMap().get(Bytes.toBytes("info"))) {
                                String rowKey = Bytes.toString(CellUtil.cloneRow(cell));
                                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                                String value = Bytes.toString(CellUtil.cloneValue(cell));
                                // Bytes.toString of a cloned cell value is never null, so log every failed cell
                                LOG.error("HBase write failed. Table:{}, RowKey:{}, Qualifier:{}, Value:{}",
                                        tableName, rowKey, qualifier, value);
                            }
                            LOG.error("Write failure cause:", failure);
                        } catch (Exception e) {
                            LOG.error("Error while handling the HBase write failure:", e);
                            // a "{}" placeholder is needed, otherwise slf4j drops the entity argument
                            LOG.error("Failed entity: {}", entity);
                        }
                    }
                }));
        env.execute("FlinkConsumeKafkaWriteHbase");
    }
}
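
The HBaseSink used above is not part of Apache Flink; it comes from the custom com.hoperun.flink.sink.hbase.connector package. Judging from how it is called (a userConfig map, a per-element sink function, a failure handler, and a bulk.flush.interval.ms option), its API is modeled on Flink's Elasticsearch sink. Below is a rough reconstruction of the connector's interfaces, inferred only from the calls above; treat every name and signature here as a sketch, not the actual source:

package com.hoperun.flink.sink.hbase.connector; // hypothetical package layout

import org.apache.flink.api.common.functions.RuntimeContext;
import java.io.Serializable;

interface RequestPutter {
    void add(PutRequest request);  // buffer a PutRequest for batched writing
}

interface HBaseSinkFunction<T> extends Serializable {
    // invoked once per stream element to emit zero or more PutRequests
    void process(T element, RuntimeContext ctx, RequestPutter putter);
}

interface PutRequestFailureHandler extends Serializable {
    // invoked for each PutRequest that could not be written to HBase
    void onFailure(PutRequest entity, Throwable failure, int restStatusCode,
                   RequestPutter putter) throws Throwable;
}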

III. Results

hbase(main):005:0> scan 'test_20191122'
ROW                      COLUMN+CELL
 aaa                     column=info:num, timestamp=1574390105876, value=4
 aaa                     column=info:test, timestamp=1574390105876, value=227
 bbb                     column=info:num, timestamp=1574390105876, value=2
 bbb                     column=info:test, timestamp=1574390105876, value=227
 ccc                     column=info:num, timestamp=1574390105876, value=2
 ccc                     column=info:test, timestamp=1574390105876, value=227
 ddd                     column=info:num, timestamp=1574390105876, value=2
 ddd                     column=info:test, timestamp=1574390105876, value=227
4 row(s) in 0.0380 seconds

====================================================================

@羲凡 (just to live better)

If you have any questions about this post, feel free to leave a comment.