
Flink: reading data from Kafka and writing it to HBase


Overview

Environment

scala 2.12.8 (see: installing and deploying Scala on Linux)
flink 1.8.1 (see: Flink 1.8.1 cluster deployment)
kafka_2.12-2.2.0 (see: kafka_2.12-2.2.0 cluster deployment)
hbase 2.1 (see: HBase 2.1 setup, fully distributed mode / Advanced - Fully Distributed)
hadoop 2.8.5 (see: Hadoop 2.8.5 fully distributed HA installation, part 2: environment setup)

Dependencies

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.1.5</version>
</dependency> 	
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>5.0.0-HBase-2.0</version>
</dependency>


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.8.1</version>
</dependency>

Reading Kafka messages with Flink

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(1000);

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "node1:9092");

    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("my-test-topic", new SimpleStringSchema(), properties);
    // start consuming from the earliest offset
    consumer.setStartFromEarliest();
    DataStream<String> stream = env.addSource(consumer);
    stream.print();
    //stream.map();
    env.execute();
}
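
Note that the properties above only set bootstrap.servers. Because checkpointing is enabled, the connector will try to commit the consumed offsets back to Kafka on each completed checkpoint, and that usually also needs a consumer group id. A minimal addition (the group name flink-hbase-demo is just a placeholder, pick any stable name for your job):

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "node1:9092");
// placeholder group id so offsets committed on checkpoints belong to a named consumer group
properties.setProperty("group.id", "flink-hbase-demo");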

Start the services:

  1. Start the Hadoop cluster
  2. Start the HBase cluster
  3. Start the Kafka cluster
  4. Start Flink

Run the main method above; it will keep consuming messages from the Kafka cluster.
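
If the my-test-topic topic does not yet exist on the brokers (and auto topic creation is disabled), create it first. Below is a minimal sketch using the Kafka AdminClient that comes in through the connector's kafka-clients dependency; the partition count and replication factor are placeholders for a test setup, adjust them for your cluster:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1 -- placeholders for a single-broker test
            NewTopic topic = new NewTopic("my-test-topic", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}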

Start a Kafka console producer and send a couple of messages:

./kafka-console-producer.sh --broker-list node1:9092 --topic my-test-topic
>111111
>2222

The Java program's console then prints the messages (the leading 4> is the index of the parallel subtask that emitted the record):

4> 111111
4> 2222

Writing to HBase

Write a ProcessFunction that performs the HBase writes:

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

@Slf4j
public class HbaseProcess extends ProcessFunction<String, String> {
    private static final long serialVersionUID = 1L;

    private Connection connection = null;
    private Table table = null;

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        try {
            // load the HBase configuration
            Configuration configuration = HBaseConfiguration.create();

            // read hbase-site.xml and core-site.xml from the classpath
            configuration.addResource(new Path(ClassLoader.getSystemResource("hbase-site.xml").toURI()));
            configuration.addResource(new Path(ClassLoader.getSystemResource("core-site.xml").toURI()));
            connection = ConnectionFactory.createConnection(configuration);

            TableName tableName = TableName.valueOf("test");

            // get the table handle
            table = connection.getTable(tableName);

            log.info("[HbaseProcess] : open HbaseProcess finished");
        } catch (Exception e) {
            log.error("[HbaseProcess] : open HbaseProcess failed", e);
        }
    }

    @Override
    public void close() throws Exception {
        log.info("close...");
        if (null != table) table.close();
        if (null != connection) connection.close();
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        try {
            log.info("[HbaseProcess] value={}", value);

            // expected message format: rowKey:columnFamily:qualifier:value, e.g. row1:cf:a:aaa
            String[] split = value.split(":");

            // create a Put that inserts or updates the row
            Put put = new Put(Bytes.toBytes(split[0]));
            put.addColumn(Bytes.toBytes(split[1]), Bytes.toBytes(split[2]), Bytes.toBytes(split[3]));
            table.put(put);
            log.info("[HbaseProcess] : put value:{} to hbase", value);
        } catch (Exception e) {
            log.error("", e);
        }
    }
}
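
HbaseProcess assumes the test table with column family cf already exists. If it does not, create it before running the job, either from the HBase shell or, as a sketch, with the HBase Admin API (the table and family names below simply match the example messages):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class CreateTestTable {
    public static void main(String[] args) throws Exception {
        // picks up hbase-site.xml from the classpath, same as the process function above
        Configuration configuration = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf("test");
            if (!admin.tableExists(tableName)) {
                // one column family "cf", matching the rowKey:cf:qualifier:value messages
                admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
                        .build());
            }
        }
    }
}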

Then replace stream.print(); in the main method above with:

stream.process(new HbaseProcess());

Run the main method again and send the message row1:cf:a:aaa from the Kafka console producer.
Then check the test table from the HBase shell:

hbase(main):012:0> scan 'test'
ROW                                              COLUMN+CELL                                                                                                                                   
 row1                                            column=cf:a, timestamp=1563880584014, value=aaa                                                                                               
 row1                                            column=cf:age, timestamp=1563779499842, value=12                                                                                              
 row2                                            column=cf:a, timestamp=1563451278532, value=value2a                                                                                           
 row2                                            column=cf:age, timestamp=1563779513308, value=13                                                                                              
 row2                                            column=cf:b, timestamp=1563441738877, value=value2                                                                                            
 row3                                            column=cf:c, timestamp=1563441741609, value=value3

The first row above, with value=aaa, is the record we just inserted.

Besides a ProcessFunction, you can also write a sink. Here is an HbaseSink class:

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

@Slf4j
public class HbaseSink implements SinkFunction<String> {
    @Override
    public void invoke(String value, Context context) throws Exception {
        Connection connection = null;
        Table table = null;
        try {
            // load the HBase configuration
            Configuration configuration = HBaseConfiguration.create();

            // read hbase-site.xml and core-site.xml from the classpath
            configuration.addResource(new Path(ClassLoader.getSystemResource("hbase-site.xml").toURI()));
            configuration.addResource(new Path(ClassLoader.getSystemResource("core-site.xml").toURI()));
            connection = ConnectionFactory.createConnection(configuration);

            TableName tableName = TableName.valueOf("test");

            // get the table handle
            table = connection.getTable(tableName);

            // expected message format: rowKey:columnFamily:qualifier:value, e.g. row1:cf:a:aaa
            String[] split = value.split(":");

            // create a Put that inserts or updates the row
            Put put = new Put(Bytes.toBytes(split[0]));
            put.addColumn(Bytes.toBytes(split[1]), Bytes.toBytes(split[2]), Bytes.toBytes(split[3]));
            table.put(put);
            log.info("[HbaseSink] : put value:{} to hbase", value);
        } catch (Exception e) {
            log.error("", e);
        } finally {
            if (null != table) table.close();
            if (null != connection) connection.close();
        }
    }
}

Then change the main method as shown below; the behavior is the same. The detailed differences between the two approaches will be analyzed in a later post, but one obvious difference is noted right after the snippet.


//        stream.print();
//        stream.process(new HbaseProcess());
        stream.addSink(new HbaseSink());
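
One difference is already visible in the code: HbaseSink opens and closes an HBase connection for every single record, while HbaseProcess opens the connection once in open() and reuses it. If you prefer the sink style but want the same connection reuse, a RichSinkFunction is the usual fit. A sketch under that assumption (the class name HbaseRichSink is made up; the method bodies mirror HbaseProcess above):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseRichSink extends RichSinkFunction<String> {
    private transient Connection connection;
    private transient Table table;

    @Override
    public void open(Configuration parameters) throws Exception {
        // create the HBase connection once per parallel sink instance
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.addResource(new Path(ClassLoader.getSystemResource("hbase-site.xml").toURI()));
        conf.addResource(new Path(ClassLoader.getSystemResource("core-site.xml").toURI()));
        connection = ConnectionFactory.createConnection(conf);
        table = connection.getTable(TableName.valueOf("test"));
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // expected message format: rowKey:columnFamily:qualifier:value
        String[] split = value.split(":");
        Put put = new Put(Bytes.toBytes(split[0]));
        put.addColumn(Bytes.toBytes(split[1]), Bytes.toBytes(split[2]), Bytes.toBytes(split[3]));
        table.put(put);
    }

    @Override
    public void close() throws Exception {
        if (table != null) table.close();
        if (connection != null) connection.close();
    }
}

It is registered the same way: stream.addSink(new HbaseRichSink());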