欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

HBase海量数据存储

程序员文章站 2022-06-28 11:38:26
1.简介 HBase是一个基于HDFS的、分布式的、面向列的非关系型数据库。 HBase的特点 1.海量数据存储,HBase表中的数据能够容纳上百亿行*上百万列。 2.面向列的存储,数据在表中是按照列进行存储的,能够动态的增加列并对列进行各种操作。 3.准实时查询,HBase在海量的数据量下能够接近 ......

1.简介

 

 HBase海量数据存储

 

hbase是一个基于hdfs的、分布式的、面向列的非关系型数据库。

 

hbase的特点

1.海量数据存储,hbase表中的数据能够容纳上百亿行*上百万列。

2.面向列的存储,数据在表中是按照列进行存储的,能够动态的增加列并对列进行各种操作。

3.准实时查询,hbase在海量的数据量下能够接近准实时的查询(百毫秒以内)

4.多版本,hbase中每一列的数据都可以有多个版本。

5.可靠性,hbase中的数据存储于hdfs中且依赖于zookeeper进行master和regionserver的协调管理。 

 

hbase与关系型数据库的区别

1.hbase中的数据类型只有string,而关系型数据库中有char、varchar、int等。

2.hbase中只有普通的增删改查操作,没有表与表之间的连接、子查询等,若想要在hbase中进行复杂的操作则应该使用phoenix。

3.hbase是基于列进行存储的,因此在查询指定列的数据时效率会很高,而关系型数据库是基于行存储,每次查询都要查询整行。

4.hbase适合海量数据存储,而关系型数据库一般一张表不超过500m,否则就要考虑分表操作。

5.hbase中为空的列不占用存储空间,表的设计可以非常稀疏,而关系型数据库中表的设计较谨密。

 

 

2.hbase的表结构

 

 

HBase海量数据存储

 

*hbase中的表由rowkey、columnfamily、column、timestamp组成。

 

rowkey

记录的唯一标识,相当于关系型数据库中的主键。

*rowkey最大长度为64kb且按字典顺序进行排序存储。

 

columnfamily

列簇相当于特定的一个类别,每个列簇下可以有任意数量个列,并且列是动态进行添加的,只在插入数据后存在,hbase在创建表时只需要指定表名和列簇即可。

*一个列簇下的成员有着相同的前缀,使用冒号来对列簇和列名进行分隔。

*一张表中的列簇最好不超过5个。

 

column

列只有在插入数据后才存在,且列在列簇中是有序的。

*每个列簇下的列数没有限制。

 

timestamp

hbase中的每个键值对都有一个时间戳,在进行插入时由hbase进行自动赋值。

 

 

3.hbase的物理模型

 

 

 

 

HBase海量数据存储

 

 

master

1.处理对表的添加、删除、查询等操作。

2.进行regionserver的负载均衡(region与regionserver的分配)

3.在regionserver宕机后负责regionserver上的region转移(通过wal日志)

*master失效仅会导致meta数据和表无法被修改,表中的数据仍然可以进行读取和写入。

 

regionserver

1.处理对表中数据的添加、删除、修改、查询等操作。

2.维护region并将region中storefile写入到hdfs中。

3.当region中的数据达到一定大小时进行region的切分。

 

region

1.表中的数据存储在region中,每个region都由regionserver进行管理。

2.每个region都包含memorystore和storefile,memorystore中的数据位于内存,每当memorystore中的数据达到128m时将会生成一个storefile并写入到hdfs中。

3.region中每个列簇对应一个memorystore,可以有多个storefile,当多个storefile的文件大小超过一定时,会进行storefile的合并,将多个storefile文件合并成一个storefile,当storefile中的大小超过一定阀值时,会进行region的切分,由master将新region分配到相应的regionserver中,实现负载均衡。

  

zookeeper在hbase中的作用

1.保证master的高可用性,当状态为active的master无法提供服务时,会立刻将状态为standby的master切换为active状态。

2.实时监控regionserver集群,当某个regionserver节点无法提供服务时将会通知master,由master进行regionserver上的region转移以及重新进行负载均衡。

3.当hbase集群启动后,master和regionserver会分别向zookeeper进行注册,会在zookeeper中存放hbase的meta表数据,region与regionserver的关系、以及regionserver的访问地址等信息。

*meta表中维护着tablename、rowkey和region的关联关系。

 

hbase处理读取和写入请求的流程

hbase处理读取请求的过程

1.客户端连接zookeeper,根据tablename和rowkey从meta表中计算出该row对应的region。

2.获取该region所关联的regionserver,并获取regionserver的访问地址。

3.访问regionserver,找到对应的region。

4.如果region的memorystore中有该row则直接进行获取,否则从storefile中进行查询。

 

hbase处理写入请求的过程

1.客户端连接zookeeper,根据tablename找到其region列表。

2.通过一定算法计算出要写入的region。

3.获取该region所关联的regionserver并进行连接。

4.把数据分别写到hlog和memorystore中。

5.每当memorystore中的大小达到128m时,会生成一个storefile。

6.当多个storefile的文件大小达到一定时,会进行storefile的合并,将多个storefile文件合并成一个storefile,当storefile的文件大小超过一定阈值时,会进行region的切分,由master将新region分配到相应的regionserver中,实现负载均衡。

 

*在第一次读取或写入时才需要连接zookeeper,会将zookeeper中的相关数据缓存到本地,往后直接从本地进行读取,当zookeeper中的信息发生变化时,再通过通知机制通知客户端进行更新。

 

 

hbase在hdfs中的目录

HBase海量数据存储

1.tmp目录:当对hbase的表进行创建和删除时,会将表移动到该目录中进行操作。

2.masterprocwals目录:预写日志目录,主要用于存储master的操作日志。

3.wals目录:预写日志目录,主要用于存储regionserver的操作日志。

4.data目录:存储region中的storefile。

5.hbase.id文件:hbase集群的唯一标识。

6.hbase.version文件:hbase集群的版本号。

7.oldwals目录:当wals目录下的日志文件超过一定时间后,会将其移动到oldwals目录中,master会定期进行清理。

 

 

4.hbase集群的搭建

 

1.安装jdk和hadoop

由于hbase是通过java语言编写的,且hbase是基于hdfs的,因此需要安装jdk和hadoop,并配置好java_home环境变量。

HBase海量数据存储

HBase海量数据存储

 

由于hdfs一般都以集群的方式运行,因此需要搭建hdfs集群。

HBase海量数据存储

*在搭建hdfs集群时,需要相互配置ssh使之互相信任并且开放防火墙相应的端口,或者直接关闭防火墙。

 

2.安装zookeeper并进行集群的搭建

由于hdfs ha依赖于zookeeper,且hbase也依赖于zookeeper,因此需要安装zookeeper并进行集群的搭建。

HBase海量数据存储

HBase海量数据存储

HBase海量数据存储

 

3.安装hbase

1.从cdh中下载hbase并进行解压: 

HBase海量数据存储

 

2.修改hbase-env.sh配置文件

#设置jdk的安装目录
export java_home=/usr/jdk8/jdk1.8.0_161

#true则使用hbase自带的zk服务,false则使用外部的zk服务.
export hbase_manages_zk=flase

 

3.修改hbase-site.xml配置文件

  <!-- 指定hbase日志的存放目录 -->  
  <property> 
    <name>hbase.tmp.dir</name>  
    <value>/usr/hbase/hbase-1.2.8/logs</value> 
  </property>  
  <!-- 指定hbase中的数据存储在hdfs中的目录 -->  
  <property> 
    <name>hbase.rootdir</name>  
    <value>hdfs://nameservice:8020/hbase</value> 
  </property>  
  <!-- 设置是否是分布式 -->  
  <property> 
    <name>hbase.cluster.distributed</name>  
    <value>true</value> 
  </property>  
  <!-- 指定hbase使用的zk地址 -->  
  <property> 
    <name>hbase.zookeeper.quorum</name>  
    <value>192.168.1.80:2181,192.168.1.81:2181,192.168.1.82:2181</value> 
  </property> 

 

4.修改regionservers文件,配置充当regionserver的节点

HBase海量数据存储

*值可以是主机名或者ip地址

*如果hadoop配置了hdfs ha高可用集群,那么就会有两个namenode和一个nameservice,此时就需要将hdfs的core-site.xml和hdfs-site.xml配置文件复制到hbase的conf目录下,且hbase-site.xml配置文件中的hbase.rootdir配置项的hdfs地址指向nameservice的名称。

 

5.ntp时间同步

ntp是一个时间服务器,作用是使集群中的各个节点的时间都保持一致。

由于在hbase集群中,zookeeper与hbase对时间的要求较高,如果两个节点之间的时间相差过大时,那么整个集群就会崩溃,因此需要使各个节点的时间都保持一致。

#查看是否安装了ntp服务
rpm -qa|grep ntp

#安装ntp服务
yum install ntp -y

#从ntp服务器中获取时间并同步本地
ntpdate 192.168.1.80

*在实际的应用场景中,可以自己搭建ntp服务器,也可以使用第三方开源的ntp服务器,如阿里等。

使用 “ntpdate ntp服务器地址” 命令从ntp服务器中获取时间并同步本地,一般配合linux的crontab使用,每隔5分钟进行一次时间的同步。

 

4.启动集群

使用bin目录下的start-hbase.sh命令启动集群,那么会在当前节点中启动一个master和regionsever进程,并通过ssh访问其它节点,启动regionserver进程。

HBase海量数据存储

HBase海量数据存储

HBase海量数据存储

 

由于hbase的master ha集群是通过zookeeper进行协调的,需要手动在其他节点中启动master,zookeeper能保证当前hbase集群中有且只有一个master处于active状态,当状态为active的master无法正常提供服务时,会将处于standby的master的状态修改为active。 

HBase海量数据存储

 

*当hbase集群启动后,可以访问,进入hbase的web监控页面。

 

 

5.使用shell操作hbase

 

使用bin/hbase shell命令进行hbase的shell操作

#创建表
create 'tablename' , 'columnfamily' , 'columnfamily...'

#添加记录
put 'tablename' , 'rowkey' , 'columnfamily:column' , 'value'

#查询记录
get 'tablename' , 'rowkey'

#统计表的记录数
count 'tablename'

#删除记录
deleteall 'tablename' , 'rowkey'

#删除记录的某一列
delete 'tablename' , 'rowkey' ,'columnfamily:column'

#禁用表
disable 'tablename'

#启动表
enable 'tablename'

#查看表是否被禁用
is_disabled 'tablename'

#删除表
drop 'tablename'

#查看表中的所有记录
scan 'tablename'

#查看表中指定列的所有记录
scan 'tablename' , {columns=>'columnfamily:column'}

#检查表是否存在
exists 'tablename'

#查看当前hbase中的表
list

 

*在删除表时需要禁用表,否则无法删除。

*使用put相同rowkey的一条数据来进行记录的更新,仅会更新列相同的值。

 

 

6.使用java操作hbase

 

1.导入相关依赖

<dependency>
  <groupid>org.apache.hbase</groupid>
  <artifactid>hbase-client</artifactid>
  <version>1.2.8</version>
</dependency>

 

2.初始化配置

使用hbaseconfiguration的create()静态方法创建一个configuration实例,用于封装环境配置信息。

configuration config = hbaseconfiguration.create();
config.set("hbase.zookeeper.quorum","192.168.1.80,192.168.1.81,192.168.1.82");
config.set("hbase.zookeeper.property.clientport","2181");

*此方法会默认加载classpath下的hbase-site.xml配置文件,如果没有此配置文件则需要手动进行环境的配置。

 

3.创建hbase连接对象

connection conn = connectionfactory.createconnection(config);

 

4.进行表的管理

*使用admin类进行hbase表的管理,通过connection实例的getadmin()静态方法返回一个admin实例。

//判断表是否存在
boolean tableexists(tablename);

//遍历hbase中的表定义
htabledescriptor [] listtables();

//遍历hbase中的表名称
tablename [] listtablenames();

//根据表名获取表定义
htabledescriptor gettabledescriptor(tablename);

//创建表
void createtable(htabledescriptor);

//删除表
void deletetable(tablename);

//启用表
void enabletable(tablename);

//禁用表
void disabletable(tablename);

//判断表是否是启用状态
boolean istableenabled(tablename);

//判断表是否是禁用状态
boolean istabledisabled(tablename);

//为表添加列簇
void addcolumn(tablename,hcolumndescriptor);

//删除表中的列簇
void deletecolumn(tablename,byte);

//修改表中的列簇
void modifycolumn(tablename,hcolumndescriptor);

 

tablename实例用于封装表名称。

htabledescriptor实例用于封装表定义,包括表的名称、表的列簇等。

hcolumndescriptor实例用于封装表的列簇。

 

5.对表中的数据进行增删改查

使用table类进行表数据的增删改查,通过connection的gettable(tablename)静态方法返回一个table实例。

//判断指定rowkey的数据是否存在
boolean exists(get get);

//根据rowkey获取数据
result get(get get);

//根据多个rowkey获取数据
result [] get(list<get>);

//获取表的扫描器
resultscanner getscanner(scan);

//添加数据
void put(put);

//批量添加数据
void put(list<put>);

//删除数据
void delete(delete);

//批量删除数据
void delete(list<delete>)

 

使用get实例封装查询参数,使用其构建方法设置rowkey。

使用put实例封装新增和更新参数,使用其构建方法设置rowkey,使用其addcolumn(byte[] family , byte[] qualifier , byte[] value)方法分别指定列簇、列名、列值。

使用delete实例封装删除参数,使用其构建方法设置rowkey。

使用scan实例封装扫描器的查询条件,使用其addfamily(byte[] family)方法设置扫描的列簇,使用其addcolumn(byte[] family , byte[] qualifier)方法分别指定要扫描的列簇和列名。

 

*在进行表的增删改查时,方法参数大多都是字节数组类型,可以使用hbase java提供的bytes工具类进行字符串和字节数组之间的转换。

*在进行查询操作时,会返回result实例,result实例包含了一个rowkey的所有键值对(cell,不区分列簇),可以通过result实例的listcells()方法获取其包含的所有cell,借助cellutil工具类获取cell实例中对应的rowkey、family、qualifier、value等属性信息。

*在使用getscanner扫描时,返回的resultscanner接口继承iterable接口,其泛型是result,因此可以理解成resultscanner是result的一个集合。

 

6.完整的hbaseutil

/**
 * @auther: zhuanghaotang
 * @date: 2018/11/26 11:40
 * @description:
 */
public class hbaseutils {

    private static final logger logger = loggerfactory.getlogger(hbaseutils.class);

    /**
     * zk集群地址
     */
    private static final string zk_cluster_hosts = "192.168.1.80,192.168.1.81,192.168.1.82";

    /**
     * zk端口
     */
    private static final string zk_cluster_port = "2181";

    /**
     * hbase全局连接
     */
    private static connection connection;

    static {
        //默认加载classpath下hbase-site.xml文件
        configuration configuration = hbaseconfiguration.create();
        configuration.set("hbase.zookeeper.quorum", zk_cluster_hosts);
        configuration.set("hbase.zookeeper.property.clientport", zk_cluster_port);
        try {
            connection = connectionfactory.createconnection(configuration);
        } catch (exception e) {
            logger.info("初始化hbase连接失败:", e);
        }
    }

    /**
     * 返回连接
     */
    public static connection getconnection() {
        return connection;
    }

    /**
     * 创建表
     */
    public static void createtable(string tablename, string... families) throws exception {
        admin admin = connection.getadmin();
        if (admin.tableexists(tablename.valueof(tablename))) {
            throw new unsupportedoperationexception("tablename " + tablename + " is already exists");
        }
        htabledescriptor descriptor = new htabledescriptor(tablename.valueof(tablename));
        for (string family : families)
            descriptor.addfamily(new hcolumndescriptor(family));
        admin.createtable(descriptor);
    }

    /**
     * 删除表
     */
    public static void deletetable(string tablename) throws exception {
        admin admin = connection.getadmin();
        if (admin.tableexists(tablename.valueof(tablename))) {
            admin.disabletable(tablename.valueof(tablename));
            admin.deletetable(tablename.valueof(tablename));
        }
    }

    /**
     * 获取所有表名称
     */
    public static tablename[] gettablenamelist() throws exception {
        admin admin = connection.getadmin();
        return admin.listtablenames();
    }

    /**
     * 获取所有表定义
     */
    public static htabledescriptor[] gettabledescriptorlist() throws exception {
        admin admin = connection.getadmin();
        return admin.listtables();
    }

    /**
     * 为表添加列簇
     */
    public static void addfamily(string tablename, string family) throws exception {
        admin admin = connection.getadmin();
        if (!admin.tableexists(tablename.valueof(tablename))) {
            throw new unsupportedoperationexception("tablename " + tablename + " is not exists");
        }
        admin.addcolumn(tablename.valueof(tablename), new hcolumndescriptor(family));
    }

    /**
     * 删除表中指定的列簇
     */
    public static void deletefamily(string tablename, string family) throws exception {
        admin admin = connection.getadmin();
        admin.deletecolumn(tablename.valueof(tablename), bytes.tobytes(family));
    }

    /**
     * 为表添加一条数据
     */
    public static void put(string tablename, string rowkey, string family, map<string, string> values) throws exception {
        table table = connection.gettable(tablename.valueof(tablename));
        put put = new put(bytes.tobytes(rowkey));
        for (map.entry<string, string> entry : values.entryset())
            put.addcolumn(bytes.tobytes(family), bytes.tobytes(entry.getkey()), bytes.tobytes(entry.getvalue()));
        table.put(put);
    }

    /**
     * 批量为表添加数据
     */
    public static void batchput(string tablename, string family, map<string, map<string, string>> values) throws exception {
        table table = connection.gettable(tablename.valueof(tablename));
        list<put> puts = new arraylist<>();
        for (map.entry<string, map<string, string>> entry : values.entryset()) {
            put put = new put(bytes.tobytes(entry.getkey()));
            for (map.entry<string, string> subentry : entry.getvalue().entryset())
                put.addcolumn(bytes.tobytes(family), bytes.tobytes(subentry.getkey()), bytes.tobytes(subentry.getvalue()));
            puts.add(put);
        }
        table.put(puts);
    }

    /**
     * 删除rowkey中的某列
     */
    public static void deletecolumn(string tablename, string rowkey, string family, string qualifier) throws exception {
        table table = connection.gettable(tablename.valueof(tablename));
        delete delete = new delete(bytes.tobytes(rowkey));
        delete.addcolumn(bytes.tobytes(family), bytes.tobytes(qualifier));
        table.delete(delete);
    }

    /**
     * 删除rowkey
     */
    public static void delete(string tablename, string rowkey) throws exception {
        table table = connection.gettable(tablename.valueof(tablename));
        table.delete(new delete(bytes.tobytes(rowkey)));
    }

    /**
     * 批量删除rowkey
     */
    public static void batchdelete(string tablename, string... rowkeys) throws exception {
        table table = connection.gettable(tablename.valueof(tablename));
        list<delete> deletes = new arraylist<>();
        for (string rowkey : rowkeys)
            deletes.add(new delete(bytes.tobytes(rowkey)));
        table.delete(deletes);
    }

    /**
     * 根据rowkey获取数据
     */
    public static map<string, string> get(string tablename, string rowkey) throws exception {
        table table = connection.gettable(tablename.valueof(tablename));
        result result = table.get(new get(bytes.tobytes(rowkey)));
        list<cell> cells = result.listcells();
        map<string, string> cellsmap = new hashmap<>();
        for (cell cell : cells) {
            cellsmap.put(bytes.tostring(cellutil.clonequalifier(cell)), bytes.tostring(cellutil.clonevalue(cell)));
        }
        return cellsmap;
    }

    /**
     * 获取全表数据
     */
    public static map<string, map<string, string>> scan(string tablename) throws exception {
        table table = connection.gettable(tablename.valueof(tablename));
        resultscanner resultscanner = table.getscanner(new scan());
        return getresult(resultscanner);
    }

    /**
     * 获取某列数据
     */
    public static map<string, map<string, string>> scan(string tablename, string family, string qualifier) throws exception {
        table table = connection.gettable(tablename.valueof(tablename));
        scan scan = new scan();
        scan.addcolumn(bytes.tobytes(family), bytes.tobytes(qualifier));
        resultscanner resultscanner = table.getscanner(scan);
        return getresult(resultscanner);
    }

    private static map<string, map<string, string>> getresult(resultscanner resultscanner) {
        map<string, map<string, string>> resultmap = new hashmap<>();
        for (result result : resultscanner) {
            list<cell> cells = result.listcells();
            map<string, string> cellsmap = new hashmap<>();
            for (cell cell : cells)
                cellsmap.put(bytes.tostring(cellutil.clonequalifier(cell)), bytes.tostring(cellutil.clonevalue(cell)));
            resultmap.put(bytes.tostring(result.getrow()), cellsmap);
        }
        return resultmap;
    }

}