欢迎您访问程序员文章站！本站旨在为大家分享程序员计算机编程知识！
您现在的位置是: 首页

Springboot整合HBase,实现对数据的增删改查

程序员文章站 2022-05-07 12:44:15
...

1.添加pom依赖
在 pom.xml 文件的 &lt;dependencies&gt; 节点中添加以下依赖：

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- NOTE(review): this starter is pinned to a Spring Boot 1.x release (1.3.8.RELEASE);
             confirm it is compatible with the Spring Boot version used by the other starters. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j</artifactId>
            <version>1.3.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.54</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
        </dependency>
        <!-- NOTE(review): mybatis-spring-boot-starter already pulls in mybatis-spring; pinning 1.3.0 here
             overrides that transitive version. Confirm this explicit dependency is really needed. -->
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis-spring</artifactId>
            <version>1.3.0</version>
        </dependency>
        <!-- The hbase-client and hadoop-* versions below should match the HBase/Hadoop versions actually
             installed; a mismatch shows up as java.io.IOException: java.lang.reflect.InvocationTargetException. -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.3.3</version>
            <exclusions>
                <!-- Exclude logging bindings and the old servlet API that clash with the Spring Boot web stack. -->
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.10.1</version>
        </dependency>
    </dependencies>

2.配置HBase的zookeeper集群地址及端口
在 application.properties 文件中添加以下配置（HBase 部分只需配置 ZooKeeper 集群地址，多个节点用逗号分隔）：

# Database (MySQL) connection settings.
# Replace XXXXX with the database name and ******* with the real password.
spring.datasource.url=jdbc:mysql://localhost:3306/XXXXX?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf-8
spring.datasource.username=root
spring.datasource.password=*******
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# HBase settings. Every key below the prefix hbase.conf.confMaps. is bound into HbaseConfig's
# confMaps map and then set verbatim on the HBase client Configuration, so the entry below
# becomes "hbase.zookeeper.quorum". For a multi-node ZooKeeper ensemble, list host:port pairs
# separated by commas. Equivalent YAML form:
# hbase:
#   conf:
#     confMaps:
#       'hbase.zookeeper.quorum' : 'ip1:2181,ip2:2181,ip3:2181'
hbase.conf.confMaps.hbase.zookeeper.quorum = 127.0.0.1:2181

3. 创建 HBase 配置类 HbaseConfig.java：

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

/**
 * Spring configuration that exposes an {@link HBaseUtils} bean.
 *
 * <p>All properties under {@code hbase.conf.confMaps.*} are bound into {@link #confMaps} and copied
 * verbatim onto a fresh HBase client configuration, e.g.
 * {@code hbase.conf.confMaps.hbase.zookeeper.quorum=host:2181} sets {@code hbase.zookeeper.quorum}.
 */
@Configuration
@ConfigurationProperties(prefix = HbaseConfig.CONF_PREFIX)
public class HbaseConfig {
    public static final String CONF_PREFIX = "hbase.conf";

    /** Raw HBase client settings (key = HBase property name); null when none are configured. */
    private Map<String, String> confMaps;

    // The lower-case "c" accessor names are non-standard but kept for backward compatibility;
    // the bound property is still called "confMaps".
    public Map<String, String> getconfMaps() {
        return confMaps;
    }

    public void setconfMaps(Map<String, String> confMaps) {
        this.confMaps = confMaps;
    }

    /**
     * Builds the shared {@link HBaseUtils} from HBase defaults overlaid with {@link #confMaps}.
     *
     * @return a connected {@link HBaseUtils}
     */
    @Bean
    public HBaseUtils instance() {
        org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
        Map<String, String> confMap = getconfMaps();
        // Spring leaves the map null when no hbase.conf.confMaps.* property exists; fall back to
        // HBase defaults instead of failing with a NullPointerException.
        if (confMap != null) {
            confMap.forEach(config::set);
        }
        return new HBaseUtils(config);
    }
}

4. 创建 HBaseUtils 工具类，主要负责建立 HBase 连接，并实现对数据的增删改查操作：

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/**
 * Helper around a single HBase {@link Connection} providing namespace/table creation and simple
 * put/get/scan/delete operations on string data.
 *
 * <p>Row keys, column families, qualifiers and values are all converted with
 * {@link Bytes#toBytes(String)} (UTF-8), so reads, writes and deletes address the same bytes
 * regardless of the JVM's default charset.
 *
 * <p>The {@link Connection} is created once and shared. {@link Table} and {@link Admin} handles are
 * obtained per call and closed with try-with-resources. {@link #close()} releases the connection
 * and its thread pool.
 */
public class HBaseUtils implements Closeable {
    static final Logger logger = LoggerFactory.getLogger(HBaseUtils.class);

    /** Thread pool handed to the connection for its internal work; owned and shut down by this instance. */
    private final ExecutorService pool = Executors.newScheduledThreadPool(20);
    private final Connection connection;

    /**
     * Opens the HBase connection.
     *
     * @param config HBase client configuration (ZooKeeper quorum etc.)
     * @throws IllegalStateException if the connection cannot be created; failing here avoids a
     *     half-built instance whose every later call would hit a null connection
     */
    public HBaseUtils(Configuration config) {
        try {
            connection = ConnectionFactory.createConnection(config, pool);
        } catch (IOException e) {
            logger.error("建立连接HBase数据库失败", e);
            pool.shutdown();
            throw new IllegalStateException("建立连接HBase数据库失败", e);
        }
    }

    /**
     * Creates a namespace. Failures are logged, not thrown.
     *
     * @param namespace namespace name
     */
    public void createNamespace(String namespace) {
        try (Admin admin = connection.getAdmin()) {
            admin.createNamespace(NamespaceDescriptor.create(namespace).build());
        } catch (IOException e) {
            logger.error("HBase数据库创建表空间失败", e);
        }
    }

    /**
     * Creates a table with the given column families unless it already exists. Failures are
     * logged, not thrown.
     *
     * @param tableName    table name
     * @param columnFamily column family names
     */
    public void createTable(String tableName, List<String> columnFamily) {
        try (Admin admin = connection.getAdmin()) {
            TableName name = TableName.valueOf(tableName);
            if (admin.tableExists(name)) {
                return; // nothing to do
            }
            List<ColumnFamilyDescriptor> families = columnFamily.stream()
                    .map(cf -> ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build())
                    .collect(Collectors.toList());
            admin.createTable(TableDescriptorBuilder.newBuilder(name).setColumnFamilies(families).build());
        } catch (IOException e) {
            logger.error("HBase数据库创建表名失败", e);
        }
    }

    /**
     * Writes one row with any number of families/columns in a single Put. Failures are logged,
     * not thrown.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @param data      family -> (qualifier -> value), e.g. {@code data.put("cf1", dataMap)}
     */
    public void save(String tableName, String rowKey, Map<String, Map<String, String>> data) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            data.forEach((family, columns) -> {
                byte[] cf = Bytes.toBytes(family);
                columns.forEach((qualifier, value) ->
                        put.addColumn(cf, Bytes.toBytes(qualifier), Bytes.toBytes(value)));
            });
            table.put(put);
        } catch (Exception e) {
            logger.error("HBase数据库向表put数据失败", e);
        }
    }

    /**
     * Inserts several columns of one family into one row with a single Put.
     *
     * @param tableName     table name
     * @param row           row key
     * @param columnFamilys column family name
     * @param columns       column qualifiers
     * @param values        values, positionally matching {@code columns}
     * @throws IllegalArgumentException if {@code columns} and {@code values} differ in length
     * @throws IOException              if the write fails
     */
    public void insertRecords(String tableName, String row, String columnFamilys, String[] columns, String[] values) throws IOException {
        if (columns.length != values.length) {
            throw new IllegalArgumentException(
                    "columns and values must have the same length: " + columns.length + " != " + values.length);
        }
        if (columns.length == 0) {
            return; // HBase rejects a Put that carries no cells
        }
        byte[] family = Bytes.toBytes(columnFamilys);
        Put put = new Put(Bytes.toBytes(row));
        for (int i = 0; i < columns.length; i++) {
            put.addColumn(family, Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
        }
        // Send once after all cells are added (previously the growing Put was re-sent per column).
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.put(put);
        }
    }

    /**
     * Inserts a single cell.
     *
     * @param tableName    table name
     * @param row          row key
     * @param columnFamily column family name
     * @param column       column qualifier
     * @param value        value
     * @throws IOException if the write fails
     */
    public void insertOneRecord(String tableName, String row, String columnFamily, String column, String value) throws IOException {
        Put put = new Put(Bytes.toBytes(row));
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.put(put);
        }
    }

    /**
     * Reads one row as qualifier -> value. Qualifiers are not prefixed with their family, so equal
     * qualifiers in different families overwrite each other in the result. Failures are logged and
     * yield the (possibly partial) map.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @return qualifier -> value; empty if the row does not exist or the read failed
     */
    public Map<String, String> getByrowKey(String tableName, String rowKey) {
        Map<String, String> map = new HashMap<>();
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Result result = table.get(new Get(Bytes.toBytes(rowKey)));
            for (Cell c : result.rawCells()) {
                map.put(Bytes.toString(CellUtil.cloneQualifier(c)), Bytes.toString(CellUtil.cloneValue(c)));
            }
        } catch (Exception e) {
            logger.error("HBase数据库根据表名和列簇名查询数据失败", e);
        }
        return map;
    }

    /**
     * Reads a single cell.
     *
     * @param tablename    table name
     * @param rowKey       row key
     * @param columnFamily column family name
     * @param column       column qualifier
     * @return the cell value, or {@code null} if the cell does not exist
     * @throws IOException if the read fails
     */
    public String selectValue(String tablename, String rowKey, String columnFamily, String column) throws IOException {
        Get get = new Get(Bytes.toBytes(rowKey));
        get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
        try (Table table = connection.getTable(TableName.valueOf(tablename))) {
            return Bytes.toString(table.get(get).value());
        }
    }

    /**
     * Scans the whole table.
     *
     * @param tablename table name
     * @return one entry per cell, formatted as {@code rowKey \t family:qualifier \t value}
     * @throws IOException if the scan fails
     */
    public List<String> scanAllRecord(String tablename) throws IOException {
        List<String> list = new ArrayList<>();
        try (Table table = connection.getTable(TableName.valueOf(tablename));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                for (Cell cell : result.rawCells()) {
                    String rowKey = Bytes.toString(CellUtil.cloneRow(cell));
                    String family = Bytes.toString(CellUtil.cloneFamily(cell));
                    String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    list.add(rowKey + "\t" + family + ":" + qualifier + "\t" + value);
                }
            }
        }
        return list;
    }

    /**
     * Deletes an entire row.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @throws IOException if the delete fails
     */
    public void deleteRow(String tableName, String rowKey) throws IOException {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.delete(new Delete(Bytes.toBytes(rowKey)));
        }
    }

    /**
     * Deletes all cells of one column family in one row.
     *
     * @param tablename    table name
     * @param rowkey       row key
     * @param columnFamily column family name
     * @throws IOException if the delete fails
     */
    public void deleteColumnFamily(String tablename, String rowkey, String columnFamily) throws IOException {
        Delete d = new Delete(Bytes.toBytes(rowkey)).addFamily(Bytes.toBytes(columnFamily));
        try (Table table = connection.getTable(TableName.valueOf(tablename))) {
            table.delete(d);
        }
    }

    /**
     * Deletes one column in one row. Uses {@code Delete.addColumn}, which removes only the latest
     * version of the cell (unlike {@link #deleteColumns}, which removes all versions).
     *
     * @param tablename    table name
     * @param rowkey       row key
     * @param columnFamily column family name
     * @param column       column qualifier
     * @throws IOException if the delete fails
     */
    public void deleteColumn(String tablename, String rowkey, String columnFamily, String column) throws IOException {
        Delete d = new Delete(Bytes.toBytes(rowkey)).addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
        try (Table table = connection.getTable(TableName.valueOf(tablename))) {
            table.delete(d);
        }
    }

    /**
     * Deletes all versions of several columns of one family in one row, in a single request.
     *
     * @param tablename    table name
     * @param rowkey       row key
     * @param columnFamily column family name
     * @param columns      column qualifiers; an empty array deletes nothing
     * @throws IOException if the delete fails
     */
    public void deleteColumns(String tablename, String rowkey, String columnFamily, String[] columns) throws IOException {
        if (columns.length == 0) {
            return; // a Delete with no columns would remove the entire row
        }
        byte[] family = Bytes.toBytes(columnFamily);
        Delete delete = new Delete(Bytes.toBytes(rowkey));
        for (String column : columns) {
            delete.addColumns(family, Bytes.toBytes(column));
        }
        // Previously the Delete objects were built but never sent, so nothing was deleted.
        try (Table table = connection.getTable(TableName.valueOf(tablename))) {
            table.delete(delete);
        }
    }

    /**
     * Closes the HBase connection and shuts down its thread pool.
     *
     * @throws IOException if closing the connection fails
     */
    @Override
    public void close() throws IOException {
        try {
            connection.close();
        } finally {
            pool.shutdown();
        }
    }
}

5.测试

public Map<String, String> getPut() {
    // Demo: write one row ("1") into table "t2", then read it back in several ways.
    // To create the table first, uncomment:
    //     List<String> cf = new ArrayList<>();
    //     cf.add("cf1");
    //     hBaseUtils.createTable("t2", cf);

    // Cells for family "cf1"; save() accepts several families, each with several columns.
    Map<String, String> person = new HashMap<>();
    person.put("chName", "张三1");
    person.put("enName", "micheal");
    person.put("nickName", "小张1");

    Map<String, Map<String, String>> data = new HashMap<>();
    data.put("cf1", person);
    hBaseUtils.save("t2", "1", data);

    Map<String, String> returnMap = hBaseUtils.getByrowKey("t2", "1");
    try {
        // Single cell: row "1", family "cf1", column "chName".
        System.out.println(hBaseUtils.selectValue("t2", "1", "cf1", "chName"));
        // Full table scan, one line per cell.
        for (Object line : hBaseUtils.scanAllRecord("t2")) {
            System.out.println(line.toString());
        }
        // To delete the family again, uncomment:
        //     hBaseUtils.deleteColumnFamily("t2", "1", "cf1");
    } catch (IOException e) {
        e.printStackTrace();
    }
    return returnMap;
}

6.报错及解决
出现 java.io.IOException: java.lang.reflect.InvocationTargetException 错误。经分析，原因是 Maven 引入的 jar 包版本与实际安装的 Hadoop/HBase 版本不兼容。
解决方法：将 pom.xml 中 hbase-client 及 hadoop-* 依赖的版本改为与实际安装的 Hadoop、HBase 版本一致。