Hadoop 综合揭秘——HBase的原理与应用
前言
现今互联网科技发展日新月异,大数据、云计算、人工智能等技术已经成为前瞻性产品,海量数据和超高并发让传统的 web2.0 网站有点力不从心,暴露了很多难以克服的问题。为此,google、amazon 、powerset 等各大平台纷纷推出 nosql 技术以应对市场的急速发展,近10年间nosql技术百花齐放,hbase、redis、mongodb、cassandra 等技术纷纷涌现。
本文主要向各位介绍 hbase 的发展历史,基础结构与原理,应用的场景,对常用的 java api 操作进行梳理,在最后一节还会详细讲述 hbase 与 mr 之间关系。
目录
一、hbase 的概述
1.1 hbase 的发展历史
hbase(hadoop database)是一个高可靠性、高性能、面向列、可伸缩的分布式数据库,典型的 nosql(not only sql)数据库。它起源于 hadoop 的子项目,由 powerset 公司在2007年创建,同年10 月 hbase 的第一版与 hadoop 0.15.0 捆绑发布,初期的目标是弥补 mapreduce 在实时操作上的缺失,方便用户可随时操作大规模的数据集。随着大数据与 nosql 的流行和迅速发展,在 2010年5月,apache hbase 脱离了 hadoop,成为 apache 基金的*项目。次年即 2011年1月 zookeeper 也脱离 hadoop,成为 apache 基金的*项目。
1.2 hbase 的特点
- 面向列设计:面向列表(簇)的存储和权限控制,列(簇)独立检索。
- 支持多版本:每个单元中的数据可以有多个版本,默认情况下,版本号可自动分配,版本号就是单元格插入时的时间戳。
- 稀疏性:为空的列不占用存储空间,表可以设计得非常稀疏。
- 高可靠性:wal机制保证了数据写入时不会因集群异常而导致写入数据丢失,replication机制保证了在集群出现严重的问题时,数据不会发生丢失或损坏。
- 高性能:底层的lsm数据结构和 rowkey 有序排列等架构上的独特设计,使得hbase具有非常高的写入性能。通过科学性地设计rowkey 可让数据进行合理的 region 切分,主键索引和缓存机制使得hbase 在海量数据下具备高速的随机读取性能。(下文将再作介绍)
1.3 hbase (nosql)与 rdbms 的区别
1.3.1 传统的 rdbms 具有以下特征
它是面向表格、视图设计的标准化数据,表中的数据类型也会进行预定义,数据保存后表的结构不易修改。每个表格对列的数据有所限制,最大不会超过几个百个,这将导致不同的数据可能会存放到多个表,表格之间存在一对一,一对多,多对一,多对多等复杂关系。正因如此也限制了 rdbms 的使用场景更适合于高度结构化的的行业,例如医疗,机关,教育等行业。
1.3.2 hbase 是典型的 nosql代表
相对于 rdbms ,它属于一种高效的映射嵌套型弱视图设计,以key-value的方式存储数据,每一行数据都可以有不同的列设计。数据依赖于行键作为唯一标识,当行数据的结构发生变生时,hbase 也能根据需求作出灵活调整。数据以文本方式保存,hbase 把数据的解释任务交给了应用程序,因此它更适合于灵活的数据结构项目。
1.4 hbase 的版本问题
hbase 对 hadoop 和 jdk 的版本支持性有一定要求,详细内容可在官网查询
hadoop version support matrix
"s" =supported ,"x"= not supported ,“nt"=not tested
二、hbase 的原理
2.1 hbase 的总体结构
hbase 的架构是依托于 hadoop 的 hdfs 作为最基本存储基础单元,在 hbase 的集群中由一个 master 主节点管理多个 region server ,而 zookeeper 进行协调操作,其关系如下图所示:
2.1.1 hmaster
hmaster 用于启动任务管理多个hregionserver,侦测各个hregionserver之间的状态,当一个新的hregionserver登录到hmaster时,hmaster 会告诉它等待分配数据,平衡 hregionserver 之间的负载。而当某个 hregionserver 死机时,hmaster会把它负责的所有hregion标记为未分配,然后再把它们分配到其他 hregionserver 中,并恢复hregionserver的故障。事实上 hmaster 的负载很轻, hbase 允许有多个 hmaster 节点共存,但同一时刻只有一个 hmaster 能为系统提供服务,其他的 hmaster 节点处于待命的状态。当正在工作的 hmaster 节点宕机时,其他的 hmaster 则会接管hbase的集群。
2.1.2 hregionserver
hbase中的所有数据从底层来说一般都是保存在hdfs中的,用户通过一系列hregionserver获取这些数据。集群一个节点上一般只运行一个hregionserver,且每一个区段的hregion只会被一个hregionserver维护。hregionserver主要负责响应用户i/o请求,向hdfs文件系统读写数据,是hbase中最核心的模块。
2.1.3 zookeeper
apache zookeeper 起源于 hadoop 的分布式协同服务,是负责协调集群中的分布式组件,在 2011年1月 zookeeper 脱离 hadoop,成为 apache 基金的*项目。经过多年的发展 zookeeper 已经成为了分布式大数据框架中容错性的标准框架,被多个分布式开源框架所应用。hbase 的组件之间是通过心跳机制协调系统之间的状态和健康信息的,这些功能都是通过消息实现,一旦消息因外界原因丢失,系统侧需要根据不同的情况进行处理, zookeeper 的主要作用正是监听并协调各组件的运作。它监听了多个节点的使用状态,保证了 hmaster 处于正常运行当中,一旦 hmaster 发生故障时 zookeeper 就会发出通知,备用的 hmaster 就会进行替代。zookeeper 也会监测 hregionserver 的健康状态, 一旦发生故障就会通知 hmaster ,把任务重新分配给正常的 hregionserver 进行操作,并恢复有故障的 hregionserver。
2.2 hbase 运作原理
在介绍完 hbase 的总体结构后,下面将为大家介绍一下 hregion、hstore、memstore、hfile、wal 等组件是如何进行协调操作的,hbase 的运作原理图如下:
2.2.1 hregion
每个 hregionserver 内部管理了一系列 hregion ,他们可以分别属于不同的逻辑表,每个 hregion 对应了逻辑表中的一个连续数据段。hregionserver 只是管理表格,实现读写操作。client 直接连接到 hregionserver,并通信获取 hbase 中的数据。而 hregion 则是真实存放 hbase 数据的地方,也就说 hregion 是 hbase 可用性和分布式的基本单位。当表的大小超过预设值的时候,hbase会自动将表划分为不同的区域,每个区域就是一个hregion,以主键(rowkey)来区分。一个hregion会保存一个表中某段连续的数据,一张完整的表数据是保存在多个 hregion 中的,这些 hregion 可以在同一个hregionserver 中,也可以来源于不同的 hregionserver。
2.2.2 hstore
每个 hregion 由多个hstore组成,每个 hstore 对应逻辑表在这个 hregion 集合中的一个 column family,建议把具有相近 io 特性的 column 存储在同一个 column family 中,以实现高效读取 。hstore 由一个 memstore 及一系列 hfile 组成,memstore 存储于内存当中,而 hfiles 则是写入到 hdfs 中的持久性文件。用户写入的数据首先会放入 memstore,当 memstore大小到达预设值(可通过 hbase.hregion.memstore.flush.size 进行配置)后就会 flush 成一个storefile(即 hfile)文件。
2.2.3 memstore
memstore 是一个缓存 (in memory sorted buffer),当所有数据完成 wal 日志写后,就会写入memstore 中,由 memstore 根据一定的算法将数据 flush 到地层 hdfs 文件中(hfile),每个 hregion 中的每个 column family 有一个自己的 memstore。当用户从 hbase 中读取数据时,系统将尝试从 memstore 中读取数据,如果没找到相应数据才会尝试从 hfile 中读取。当服务器宕机时,memstore 中的数据有可能会丢失,此时 hbase 就会使用 wal 中的记录对 memstroe 中的数据进行恢复。
2.2.4 hfile
hfile 是最终保存 hbase 数据行的文件,一个 hfile 文件属于一张表中的某个列簇,当中的数据是按 rowkey、column family、column 升序排序,对相同的cell(即这三个值都一样),则按timestamp倒序排列。hfile 的具体格式如下图:
hfile 中每条键值存储的开发都包括2个固定长度的数字,分别表示键和值的长度,目的是让客户端可根据字节的偏移访问值域中的数据。根据上图可以看到 keyvalue 类中getkey和getrow方法的区别, getkey() 方法返回的是整个键(图中绿色部分),而 getrow() 方法返回的只是行键 rowkey(图中第4格)。
hfile 文件是根据表的列簇进行区分的,在执行持久化时,记录是有序的。但当 hfile 的文件内容增长到一定阈值后就会触发合并操作,多个 hfiles 就会合并成一个更大的 hfile,由于这几个 hfile 有可能在不同的时间段产生,为保证合并后数据依然是有序排列,hfile 会通过小量压缩或全量压缩进行合并,对 hfile 文件记录进行重新排序。由于全量压缩是一个耗费资源的操作,因此应该保证在资源充足的情况下进行(由于数据压缩问题已超出本文的界限,在以后的章节将会详细介绍)。
当单个 hfile 大小超过一定阈值后,会触发 split 拆分操作,用户可通过配置 hbase.regionserver.region.split.policy 选择拆分的策略,拆分策略由 regionsplitpolicy 类进行处理,目前系统已支持 increasingtoupperboundregionsplitpolicy、constantsizeregionsplitpolicy、delimitedkeyprefixregionsplitpolicy、keyprefixregionsplitpolicy 等多种拆分策略。默认情况下 hregion 将被拆分成 2 个 hregion,父 hregion 会下线,新分出的 2 个子 hregion 会被 hmaster 分配到相应的 hregionserver。
2.2.5 wal
wal(write-ahead-log,又名 hlog)是 hregionserver 中的日志记录的工具,当系统发生故障时,可以通过 wal 恢复数据。在每次用户操作将数据写入memstore的时候,也会写一份数据到 wal 当中,wal 当中包含了部分还没有写入 hfile 的文件。wal 文件会定期滚动刷新,并删除旧的文件(已持久化到 hfile中的数据)。当 hmaster 通过 zookeeper 感知到某个hregionserver意外终止时,hmaster首先会处理遗留的 wal 文件,将其中不同 hregion 的 wal 数据进行拆分,分别放到相应 wal 的目录下,然后再将失效的 hregion 重新分配,领取到这些 hregion 的 hregionserver 在加载 hregion的过程中,会发现有历史 wal 需要处理,因此会把 wal 中的数据加载到 memstore 中,然后 flush 到hfiles,完成数据恢复。
用户可以通过禁用 wal 方式提高hbase 的性能,然而这将有导致数据丢失的风险,用户应该谨慎处理。一旦禁用了 wal,系统应该在接收到 hregionserver 宕机的消息后重新启动写入程序,然而这有可能导致数据重复输入。
三、简述 rowkey 设计原理
由于hbase属于分布式系统,数据会根据 rowkey 进行分块存储,只要合理设计好 rowkey 让数据均匀分散多台 hregionserver 管理,时常被同时查找的数据被存储到同一个hregion之内,这样就能最大限度地提高系统的性能(在第四节介绍到分页查询方法中,如果能巧妙地设计 rowkey 把每页的数据存储在一个hregion之内,将有效地提升性能)。反之,如若 rowkey 设计不合理,让大量的数据存储在同一个 hregionserver 而其他 hregionserver 长期处理闲置状态,就会减低系统性能,还有可能让 hregionsever 资源耗尽会发生错误。在拜读过 sameer wadkar 等大师所著的《pro apache hadoop》和nick dimiduk 所著的 《hbase in action》等文献后,本人对关于rowkey设计原则归纳成了下面几点:
- 注意 rowkey 的长度,rowkey越长系统i/o开销越大,在上千万条数据的系统当中rowkey设置超过100个字节,光rowkey就会消耗近1g 的流量
- 可将 rowkey 的前位作为散列字段,由程序循环生成,扩展位放时间、表格、属性、时间戳等字段,这样可提高数据均衡分布在每个hregionserver 实现负载均衡的几率。
- 利用组合式行键方式,以主机名,事件,时间戳作为行键,根据组件的访问特性进行排序分组。
以随机的散列作为前缀这点很好理解,例如有下面的order表
orderid | goods | price |
20180802001 | iphone x | 8000 |
20180802002 | lynk&co tools | 5000 |
20180823003 | ausu n75s | 8600 |
20180713004 | nikon d7500 | 7300 |
若只以orderid作为rowkey,以递增方式进行存储,数据很有可能只存储于同一个hregionserver,此时可以用hash函数随机生成散列,加上必要的属性(可以是辨别符、时间戳等唯一属性),生成类似于154432_180802001、879531_180802002、544688_180823003、687851_180713004等数据。此数据的分配会更均衡,但查找时会更耗资源。
我们也可以利用主机名,事件,时间戳组合键的方式定制行键,例如系统配置了4个hregionserver,分别用a、b、c、d代表,此时可以将 rowkey 配置为类似于 a-84548454-c、b-3223265-c、c-656565-c、d-333256-c,这里只是举个简单的例子,当然实际操作上组合方式有多种。此方法好处在于用户更容易地控制数据的分布,但会增加管理的繁琐度,如果服务器有变动时,数据存储可能需要重新整理。
其实rowkey设计本来就是一个复杂的问题,在这里介绍的只是冰山一角,希望对大家的rowkey设计有所启发。
四、hbase 的 java api 开发实例
4.1 hbase 的基础操作类
由于各版 hbase 的 java api 会有不同,下面以比较稳定的 hbase-client 1.2.6 为例子介绍一下 hbase 的具体操作
4.1.1 org.apache.hadoop.hbase.hbaseconfiguration 类
继承了 org.apache.hadoop.conf.configuration 类,主要用于配置系统运行环境,管理资源池
函数 | 描述 |
static configuration create() | 获取当前运行环境下的 configuration 配置对象 |
void addresource(path file) | 通过给定的路径所指的文件来添加资源 |
void clear() | 清空所有已设置的属性 |
string get(string name) | 获取属性名对应的值 |
string getboolean(string name, boolean defaultvalue) | 获取为boolean类型的属性值,如果其属性值类型部位boolean,则返回默认属性值 |
void set(string name, string value) | 通过属性名来设置值 |
void setboolean(string name, boolean value) | 设置boolean类型的属性值 |
下面是hbaseconfiguration常用的方式
1 public class hbaseutils { 2 public static configuration config; 3 public static connection connection; 4 5 static{ 6 config=hbaseconfiguration.create(); 7 8 try { 9 connection=connectionfactory.createconnection(config); 10 11 } catch (ioexception e) { 12 // todo 自动生成的 catch 块 13 e.printstacktrace(); 14 } 15 } 16 ...... 17 }
4.1.2 org.apache.hadoop.hbase.client.connection 接口
与 sql 的 connection 连接相似,用户管理 hbase 客户端与服务端的连接,在操作完成后可通过 connetion.close ()及时释放资源。
通过 connection 类的 admin getadmin()方法可获取 admin 管理类。
通过 connetionfactory 类的 static connection createconnection (config) 静态方法可获取当前连接。
4.1.3 org.apache.hadoop.hbase.client.hbaseadmin 类
用于管理hbase数据库的表信息,它提供的方法包括:创建表,删除表,列出表,使表有效或无效,以及添加或删除表列簇成员等。
下面例子可用于判断表格是否存在
1 public class hbaseutils { 2 public static configuration config; 3 public static connection connection; 4 5 static{ 6 config=hbaseconfiguration.create(); 7 8 try { 9 connection=connectionfactory.createconnection(config); 10 } catch (ioexception e) { 11 // todo 自动生成的 catch 块 12 e.printstacktrace(); 13 } 14 } 15 16 public static void checktable(string tablename) 17 throws exception { 18 hbaseadmin admin = (hbaseadmin) connection.getadmin(); 19 20 if (admin.tableexists(tablename)) { 21 system.out.println(tablename + " exists!"); 22 } 23 } 24 }
4.1.4 org.apache.hadoop.hbase.htabledescriptor 类
用于操作表名及其对应表的列簇
4.1.5 org.apache.hadoop.hbase.hcolumndescriptor 类
维护列簇的信息,例如版本号,压缩设置等,通常在创建表或者为表添加列簇的时候使用。
列簇被创建后不能直接修改,只能通过删除然后重新创建的方式。 列簇被删除的时候,列簇里面的数据也会同时被删除。
一个表通常可以包含1~5个列簇,下面例子可用于新建表,如需要包含多个列簇,可以在columnfamily参数中用 “ , ” 输入
1 //新建表 2 public static boolean createtable(string tablename, string columnfamily) 3 throws exception { 4 hbaseadmin admin = (hbaseadmin) connection.getadmin(); 5 6 if (admin.tableexists(tablename)) { 7 system.out.println(tablename + " exists!"); 8 return false; 9 } else { 10 //建立列簇 11 string[] columnfamilyarray; 12 if(columnfamily.contains(",")) 13 columnfamilyarray = columnfamily.split(","); 14 else{ 15 columnfamilyarray=new string[1]; 16 columnfamilyarray[0]=columnfamily; 17 } 18 hcolumndescriptor[] hcolumndescriptor = new hcolumndescriptor[columnfamilyarray.length]; 19 for (int i = 0; i < hcolumndescriptor.length; i++) { 20 hcolumndescriptor[i] = new hcolumndescriptor(columnfamilyarray[i]); 21 } 22 //建立表对象 23 htabledescriptor familydesc = new htabledescriptor(tablename.valueof(tablename)); 24 for (hcolumndescriptor columndescriptor : hcolumndescriptor) { 25 familydesc.addfamily(columndescriptor); 26 } 27 htabledescriptor tabledesc = new htabledescriptor(tablename.valueof(tablename), familydesc); 28 //新建表 29 admin.createtable(tabledesc); 30 admin.close(); 31 return true; 32 } 33 }
4.1.6 org.apache.hadoop.hbase.client.table 接口
由于 htable 是非线性安全类,已经被系统丢弃,在 hbase-client v1.0 版本后在数据更新删除时应该使用线性安全的 table 接口进行操作
方法 | 说明 |
boolean checkadnput(byte[] row, byte[] family, byte[] qualifier, byte[] value, put put | 自动的检查row/family/qualifier是否与给定的值匹配 |
void close() | 释放所有的资源或挂起内部缓冲区中的更新,请谨记在数据更新删除后使用以释放资源 |
boolean exists(get get) | 检查get实例所指定的值是否存在于table中 |
result get(get get) | 获取指定行的某些单元格所对应的值 |
result[] get(list<get> list) | 获取多行的单元格所对应的值 |
resultscanner getscanner(scan scan) | 获取当前给定列簇的scanner实例 |
htabledescriptor gettabledescriptor() | 获取当前表的htabledescriptor实例 |
void delete(delete delete) | 删除数据 |
void delete(list<delete> list) | 删除多行数据 |
void put(list<put> list) | 向表中添加多个值 |
void put(put put) | 向表中添加值 |
此接口是hbase最常用的对表格操作的接口,具体的使用方法将在下一节再详细介绍
4.1.7 org.apache.hadoop.hbase.client.put 类
主要用于对单个行执行添加操作,在 hbase-client v1.0 版本后 add 方法已经被 addcolumn 方法所代替
方法 | 说明 |
put addcolumn(byte[] family, byte[] qualifier, byte[] value) | 将指定的列和对应的值添加到put实例中 |
put addcolumn(byte[] family, byte[] qualifier, long ts, byte[] value) | 将指定的列和对应的值及时间戳添加到put实例中 |
list<cell> get(byte[] family,byte[] qualifier) | 获取指定列簇和对应值的列 |
4.1.8 org.apache.hadoop.hbase.client.get 类
用于获取单行数据的相关信息
方法 | 说明 |
get addcolumn(byte[] family, byte[] qualifier) | 获取指定列簇和列修饰符对应的get对象 |
get addfamily(byte[] family) | 获取指定列簇对应列的get对象 |
get settimerange(long timestamp) | 获取指定时间戳的get对象 |
get setfilter(filter filter) | 执行get操作时设置服务器端的过滤器 |
4.1.9 org.apache.hadoop.hbase.client.delete类
用于获取单行数据的相关信息
方法 | 说明 |
delete addcolumn(byte[] family, byte[] qualifier) | 获取指定列簇和列修饰符的delete对象(只包含当前version) |
delete addcolumn(byte[] family, byte[] qualifier,long timestamp) | 获取指定列簇、列修饰符和时间戳的delete对象(只包含当前version) |
delete addcolumns(byte[] family, byte[] qualifier) | 获取指定列簇和列修饰符(包含所有version) |
delete addcolumns(byte[] family, byte[] qualifier,long timestamp) | 获取指定列簇、列修饰符和时间戳的delete对象(包含所有version) |
delete addfamily(byte[] family) | 获取指定的列簇的delete对象 |
delete settimerange(long timestamp) | 获取指定时间戳的delete对象 |
4.1.10 org.apache.hadoop.hbase.client.result 类
用于封装数据查找后的单行查询结果,用 map 结构保存符合列的对应属性。旧版本一直使用keyvalue来封装列的属性,但自从v1.0版本后,系统都会使用 cell 对象来封装列的属性,多个旧方法已经被弃用,各位在使用时可以注意一下
方法 | 说明 |
byte[] getrow() | 获取当前行键值 |
byte[] getvalue(byte[] family, byte[] qualifier) | 获取指定列簇和列修饰符的最新数据值 |
cell getcolumnlastestcell(byte[] family, byte[] qualifier) | 获取指定列簇、列修饰符最新版本的列 |
list<cell> getcolumncells(byte[] family, byte[] qualifier) | 获取指定列簇、列修饰符多个版本的列 |
boolean containscolumn(byte[] family, byte[] qualifier) | 判断是否存在指定列簇、列修饰符的列 |
cell[] listcells() | 获取多个版本所有列 |
navigablemap<byte[], byte[]> getfamilymap(byte[] family) | 获取批定列簇的列 |
navigablemap<byte[], navigablemap<byte[], navigablemap<long, byte[]>>> getmap() | 获取所有列簇所有版本的列 |
4.1.11 org.apache.hadoop.hbase.client.scan类
当用户想在查询时一次返回多选结果,可以通过scan进行条件查询,通过scan可以设定多种过滤器和起始行,从而根据用户的需要对整张表进行扫描。然而在默认情况下,scan对象是用迭代形式进行查询一次只返回一行,为了提高内存的使用率可以通过 scan.setcaching (int caching)方法把返回行数进行调整,从而提高系统性能。
方法 | 说明 |
scan addfamily(byte[] family) | 获取指定列簇的scan |
scan addcolumn(byte[] family, byte[] qualifier) | 获取指定列簇和列修饰符的scan |
scan setcaching(int caching) | 设置缓存数量 |
scan setstartrow(byte[] startrow) | 设置开始行rowkey,如果不设置将由第一行开始查找 |
scan setstoprow(byte[] stoprow) | 设置结束行rowkey |
scan settimestamp(long stamp) | 获取指定时间戳的scan对象 |
scan setmaxresultsize(long maxresultsize) | 设置最大返回数量 |
scan setfilter(filter filter) | 设置查询条件 |
4.1.12 org.apache.hadoop.hbase.client.resultscanner 接口
由于scan 查找会返回多行数据,为了实现逐行查询功能,resultscanner类出现,它把扫描到每一行数据封装成一个result实例,并将所有的result实例放入一个迭代器中。
resultscanner 只有几个简单的方法,在有条件的情况下,使用result[] next(int paramint)一次获取多条数据,有利用提升系统的性能。
方法 | 说明 |
result next() | 返回result,转到下一行 |
result[] next (int paramint) | 返回多行数据的result数组 |
void close() | 关闭连接释放资源 |
4.2 通用类 hbaseutils
下面本人参考了一些基础文献对hbase常用的curd操作进行了归纳,整理出一个通用类hbaseutils,希望对大家日常开发有所帮助。
由于hbase是分布式系统,其性能与数据的存储方式有莫大的关联,所以在开发时应该与第三节rowkey的设计原理相结合进行调整,要不然有可能影响系统性能。
1 public class hbaseutils { 2 public static configuration config; 3 public static connection connection; 4 5 static{ 6 config=hbaseconfiguration.create(); 7 8 try { 9 connection=connectionfactory.createconnection(config); 10 } catch (ioexception e) { 11 // todo 自动生成的 catch 块 12 e.printstacktrace(); 13 } 14 } 15 16 //新建表 17 public static boolean createtable(string tablename, string columnfamily) 18 throws exception { 19 hbaseadmin admin = (hbaseadmin) connection.getadmin(); 20 21 if (admin.tableexists(tablename)) { 22 system.out.println(tablename + " exists!"); 23 return false; 24 } else { 25 //建立列簇 26 string[] columnfamilyarray; 27 if(columnfamily.contains(",")) 28 columnfamilyarray = columnfamily.split(","); 29 else{ 30 columnfamilyarray=new string[1]; 31 columnfamilyarray[0]=columnfamily; 32 } 33 hcolumndescriptor[] hcolumndescriptor = new hcolumndescriptor[columnfamilyarray.length]; 34 for (int i = 0; i < hcolumndescriptor.length; i++) { 35 hcolumndescriptor[i] = new hcolumndescriptor(columnfamilyarray[i]); 36 } 37 //建立表对象 38 htabledescriptor familydesc = new htabledescriptor(tablename.valueof(tablename)); 39 for (hcolumndescriptor columndescriptor : hcolumndescriptor) { 40 familydesc.addfamily(columndescriptor); 41 } 42 htabledescriptor tabledesc = new htabledescriptor(tablename.valueof(tablename), familydesc); 43 //新建表 44 admin.createtable(tabledesc); 45 admin.close(); 46 return true; 47 } 48 } 49 50 //插入数据 51 public static boolean put(string tablename, string row, string columnfamily, 52 string qualifier, string data) throws exception { 53 table table = connection.gettable(tablename.valueof(tablename)); 54 put put = new put(bytes.tobytes(row)); 55 put.addcolumn(bytes.tobytes(columnfamily), bytes.tobytes(qualifier), 56 bytes.tobytes(data)); 57 table.put(put); 58 system.out.println("put '" + row + "', '" + columnfamily + ":" + qualifier 59 + "', '" + data + "'"); 60 table.close(); 61 return true; 62 } 63 64 //插入多列数据 65 public static boolean put(string tablename, string row, string columnfamily, 66 string[] qualifierlist, string[] datalist) throws exception { 67 table table = connection.gettable(tablename.valueof(tablename)); 68 put put = new put(bytes.tobytes(row)); 69 for(int n=0;n<qualifierlist.length;n++){ 70 put.addcolumn(bytes.tobytes(columnfamily), bytes.tobytes(qualifierlist[n]), 71 bytes.tobytes(datalist[n])); 72 } 73 table.put(put); 74 table.close(); 75 return true; 76 } 77 78 //查看某行 79 public static map<string, object> getrow(string tablename, string row) throws exception { 80 table table = connection.gettable(tablename.valueof(tablename)); 81 get get = new get(bytes.tobytes(row)); 82 result result = table.get(get); 83 table.close(); 84 return resulttomap(result); 85 } 86 87 //查看全表 88 public static list<map<string, object>> gettable(string tablename) throws exception { 89 table table = connection.gettable(tablename.valueof(tablename)); 90 scan s = new scan(); 91 resultscanner rs = table.getscanner(s); 92 93 list<map<string, object>> reslist = new arraylist<map<string, object>>(); 94 for (result r : rs) { 95 map<string, object> tempmap = resulttomap(r); 96 reslist.add(tempmap); 97 } 98 table.close(); 99 return reslist; 100 } 101 102 //单个qualifier的值等于data 103 public static list<map<string, object>> queryequal(string tablename, string columnfamily, string qualifier, string data) throws exception { 104 //某列等于data的数据 105 filter filter = new singlecolumnvaluefilter(bytes.tobytes(columnfamily), bytes.tobytes(qualifier), 106 compareop.equal, bytes.tobytes(data)); 107 filterlist filterlist = new filterlist(); 108 filterlist.addfilter(filter); 109 return query(tablename, filterlist); 110 } 111 112 //查询qualifier值在mindata和maxdata之间的数据 113 public static list<map<string, object>> querybetween(string tablename, string columnfamily, string qualifier, string mindata, string maxdata) throws exception { 114 singlecolumnvaluefilter filter1 = new singlecolumnvaluefilter(bytes.tobytes(columnfamily), bytes.tobytes(qualifier), 115 compareop.less_or_equal, bytes.tobytes(maxdata)); 116 singlecolumnvaluefilter filter2 = new singlecolumnvaluefilter(bytes.tobytes(columnfamily), bytes.tobytes(qualifier), 117 compareop.greater_or_equal, bytes.tobytes(mindata)); 118 filterlist filterlist = new filterlist(filterlist.operator.must_pass_all); 119 //不存在此值时跳过 120 filter1.setfilterifmissing(true); 121 filter2.setfilterifmissing(true); 122 //加入过虑器 123 filterlist.addfilter(filter1); 124 filterlist.addfilter(filter2); 125 return query(tablename, filterlist); 126 } 127 128 //查询列 129 public static list<map<string, object>> querycolumn(string tablename, string prefix) throws exception { 130 filter filter = new columnprefixfilter(bytes.tobytes(prefix)); 131 filterlist filterlist = new filterlist(); 132 filterlist.addfilter(filter); 133 return query(tablename, filterlist); 134 } 135 136 //分页查询 137 public static list<map<string, object>> queryrowpage(string tablename, string columnfamily, string qualifier,string value,string count,string startrowkey,string stoprowkey) throws exception { 138 filterlist filterlist = new filterlist(filterlist.operator.must_pass_all); 139 //设置每页面数量pagecount 140 filter filter1 = new pagefilter(integer.parseint(count)); 141 filter filter2 = new singlecolumnvaluefilter(bytes.tobytes(columnfamily), bytes.tobytes(qualifier), 142 compareop.equal, bytes.tobytes(value)); 143 //设置查询条件 144 filterlist.addfilter(filter1); 145 filterlist.addfilter(filter2); 146 147 table table = connection.gettable(tablename.valueof(tablename)); 148 scan s=new scan(); 149 //设置key的开始值和结束值 150 if(startrowkey!=null) 151 s.setstartrow(bytes.tobytes(startrowkey)); 152 if(stoprowkey!=null) 153 s.setstoprow(bytes.tobytes(stoprowkey)); 154 //加入查询条件 155 s.setfilter(filterlist); 156 resultscanner rs = table.getscanner(s); 157 158 list<map<string, object>> reslist = new arraylist<map<string, object>>(); 159 for (result r : rs) { 160 map<string, object> tempmap = resulttomap(r); 161 reslist.add(tempmap); 162 } 163 164 table.close(); 165 return reslist; 166 } 167 168 //数据转换为map 169 private static map<string, object> resulttomap(result result) { 170 map<string, object> resmap = new hashmap<string, object>(); 171 list<cell> listcell = result.listcells(); 172 map<string, object> tempmap = new hashmap<string, object>(); 173 string rowname = ""; 174 list<string> familynamelist = new arraylist<string>(); 175 for (cell cell : listcell) { 176 byte[] rowarray = cell.getrowarray(); 177 byte[] familyarray = cell.getfamilyarray(); 178 byte[] qualifierarray = cell.getqualifierarray(); 179 byte[] valuearray = cell.getvaluearray(); 180 int rowoffset = cell.getrowoffset(); 181 int familyoffset = cell.getfamilyoffset(); 182 int qualifieroffset = cell.getqualifieroffset(); 183 int valueoffset = cell.getvalueoffset(); 184 int rowlength = cell.getrowlength(); 185 int familylength = cell.getfamilylength(); 186 int qualifierlength = cell.getqualifierlength(); 187 int valuelength = cell.getvaluelength(); 188 189 byte[] temprowarray = new byte[rowlength]; 190 system.arraycopy(rowarray, rowoffset, temprowarray, 0, rowlength); 191 string temprow= bytes.tostring(temprowarray); 192 193 byte[] tempqulifierarray = new byte[qualifierlength]; 194 system.arraycopy(qualifierarray, qualifieroffset, tempqulifierarray, 0, qualifierlength); 195 string tempqulifier= bytes.tostring(tempqulifierarray); 196 197 byte[] tempfamilyarray = new byte[familylength]; 198 system.arraycopy(familyarray, familyoffset, tempfamilyarray, 0, familylength); 199 string tempfamily= bytes.tostring(tempfamilyarray); 200 201 byte[] tempvaluearray = new byte[valuelength]; 202 system.arraycopy(valuearray, valueoffset, tempvaluearray, 0, valuelength); 203 string tempvalue= bytes.tostring(tempvaluearray); 204 205 tempmap.put(tempfamily + ":" + tempqulifier, tempvalue); 206 207 rowname = temprow; 208 string familyname = tempfamily; 209 if (familynamelist.indexof(familyname) < 0) { 210 familynamelist.add(familyname); 211 } 212 } 213 resmap.put("rowname", rowname); 214 for (string familyname : familynamelist) { 215 hashmap<string,object> tempfiltermap = new hashmap<string,object>(); 216 for (string key : tempmap.keyset()) { 217 string[] keyarray = key.split(":"); 218 if(keyarray[0].equals(familyname)){ 219 tempfiltermap.put(keyarray[1],tempmap.get(key)); 220 } 221 } 222 resmap.put(familyname, tempfiltermap); 223 } 224 225 return resmap; 226 } 227 228 //公共query查找方法 229 private static list<map<string, object>> query(string tablename, filterlist filterlist) throws exception { 230 table table = connection.gettable(tablename.valueof(tablename)); 231 scan s = new scan(); 232 s.setfilter(filterlist); 233 resultscanner rs = table.getscanner(s); 234 235 list<map<string, object>> reslist = new arraylist<map<string, object>>(); 236 for (result r : rs) { 237 map<string, object> tempmap = resulttomap(r); 238 reslist.add(tempmap); 239 } 240 table.close(); 241 return reslist; 242 } 243 244 245 //删除表 246 public static boolean delete(string tablename) throws ioexception { 247 hbaseadmin admin = (hbaseadmin) connection.getadmin(); 248 if (admin.tableexists(tablename)) { 249 try { 250 admin.disabletable(tablename); 251 admin.deletetable(tablename); 252 admin.close(); 253 } catch (exception e) { 254 e.printstacktrace(); 255 return false; 256 }finally{ 257 admin.close(); 258 } 259 } 260 return true; 261 } 262 263 //删除columnfamily 264 public static boolean deletecolumnfamily(string tablename,string columnfamilyname) throws ioexception { 265 hbaseadmin admin = (hbaseadmin) connection.getadmin(); 266 if (admin.tableexists(tablename)) { 267 try { 268 admin.deletecolumn(tablename,columnfamilyname); 269 } catch (exception e) { 270 e.printstacktrace(); 271 return false; 272 }finally{ 273 admin.close(); 274 } 275 } 276 return true; 277 } 278 279 //删除row 280 public static boolean deleterow(string tablename,string rowname) throws ioexception { 281 hbaseadmin admin = (hbaseadmin) connection.getadmin(); 282 table table = connection.gettable(tablename.valueof(tablename)); 283 if (admin.tableexists(tablename)) { 284 try { 285 delete delete = new delete(rowname.getbytes()); 286 table.delete(delete); 287 } catch (exception e) { 288 e.printstacktrace(); 289 return false; 290 }finally{ 291 table.close(); 292 } 293 } 294 return true; 295 } 296 297 //删除qualifier 298 public static boolean deletequalifier(string tablename,string rowname,string columnfamilyname,string qualifiername) throws ioexception { 299 hbaseadmin admin = (hbaseadmin) connection.getadmin(); 300 table table = connection.gettable(tablename.valueof(tablename)); 301 if (admin.tableexists(tablename)) { 302 try { 303 delete delete = new delete(rowname.getbytes()); 304 delete.addcolumns(columnfamilyname.getbytes(),qualifiername.getbytes()); 305 table.delete(delete); 306 } catch (exception e) { 307 e.printstacktrace(); 308 return false; 309 }finally{ 310 table.close(); 311 } 312 } 313 return true; 314 } 315 316 }
五、hbase与mapreduce的相互调用
5.1 mapreduce 与 hbase 的关系
有朋友常说现在流程nosql架构 ,系统常用的是 spark、hbase、redis、cassandra 等技术,mapreducer 已经没落。然而本人觉得 mapreduce 与 hbase 等nosql技术并非处于对立发展的关系,反而是相辅相成,互补不足的技术。由于受到数据块、io、网络资源等限制,mapreduce 的先天架构决定其主要应用于大型文件的存取管理。然而对大量小型文件的管理方面,mapreduce 却是无能为力。因此 hbase 等技术才会诞生并迅速发展,随着 hadoop 2.0 的发展,hbase 以 hdfs 为基础,补尝了 mapreduce 在海量文件管理方面的不足。
在很多大型的*机关,金融管理,图书管理,交通航运等平台上,往往会以 mr 作为大型的持久性资源库,以 hbase 进行数据提取作为某个区域或阶段的研究对象,加以分析挖掘,最后形成汇总。有见及此,hadoop 为 mr与hbase开发出一系列的数据转换工具,方便开发人员利用。
5.2 常用类简介
5.2.1 tablemapreduceutil 绑定表关系
tablemapreduceutil为编辑人员准备的几个常用的方法,可快速地绑定mapper/reducer与hbase中table的关系
方法 | 说明 |
static void setscannercaching(job job, int batchsize) | 设置scan缓存大小,默认值为1,应按运行环境设置不适宜过大 |
static void inittablereducerjob(string table,class<? extends tablereducer> reducer, job job) | 绑定从reducer输出数据所要存入的table |
static void inittablemapperjob(string table, scan scan,class<? extends tablemapper> mapper, class<?> outputkeyclass,class<?> outputvalueclass, job job) | 绑定要从table获取到scan数据所送往的mapper |
static void inittablemapperjob(list<scan> scans,class<? extends tablemapper> mapper,class<?> outputkeyclass,class<?> outputvalueclass, job job) | 绑定要从多个table中获取到的list<scan>所送往的mapper |
5.2.2 设置数据格式
在设置job的运行条件时可使用以下方法按照需要把数据输入或输入设置为table格式
job.setinputformatclass(tableoutputformat.class);
job.setoutputformatclass(tableoutputformat.class);
5.2.3 tablemapper 与 tablereducer
为提供与 hbase 的数据对接,系统提供了tablemapper与tablereducer两个常用类
public abstract class tablemapper<keyout, valueout>
extends mapper<immutablebyteswritable, result, keyout, valueout> {
}
tablemappler 是继承了 mapper<immutablebyteswritable, result, keyout, valueout> 并以 immeutablebyteswritable对象(键值rowkey)为输入inputkey,以 result 为输入 inputvalue 的 mapper,目的是把hbase中某个table中的行数据输入到 tablemapper 进行处
上一篇: js获取select标签选中值的方法教程
下一篇: CentOS7版本基础使用