HBase基础知识笔记(含安装配置与开发环境避坑流程)
笔记涉及代码:https://github.com/hackeryang/Hadoop-Exercises
1.HBase是一个在HDFS上开发的面向列的分布式数据库,用于实时地随机访问超大规模数据集。原本的关系型数据库并非为大规模可伸缩的分布式处理而设计,虽然也有复制(replication)和分区(partitioning)的改善方案,让数据库能够从单个节点扩展出去,但难以安装与维护,而且会牺牲一些重要的RDBMS特性,例如连接、复杂查询、触发器、视图以及外键约束等,这些功能要么运行开销大,要么根本无法使用。HBase从另一个方向解决可伸缩线问题,能够简单地通过增加节点来达到线性扩展,它不是关系型数据库所以不支持SQL,但它可以在廉价硬件构成的集群上管理超大规模的稀疏表。简单来说,HBase表和RDBMS中的表类似,只不过它的单元格有版本,行是排序的,只要列族预先存在,客户端随时可以把列添加到列族中去。
2.应用把数据存放在带标签的表中,单元格(cell)由行和列的交叉坐标决定,是有版本的。默认情况版本号自动分配,为HBase插入单元格时的时间戳,单元格的内容是未解释的字节数组,如下所示为存储照片的HBase表:
表中行的键也是字节数组,表中的行根据行的键(即表的主键)进行排序,排序根据字节序进行,所有对表的访问都要通过表的主键。行中的列被分成“列族”(column family)。同一个列族的所有成员具有相同的前缀,因此像列info:format和info:geo都是列族info的成员,而contents:image则属于contents族。列族和修饰符之间始终以冒号分隔,冒号前为列族前缀,冒号后为列族修饰符。
一个表的列族(前缀)必须作为表模式定义的一部分预先给出,但是新的列族成员可以按需要以后加入。例如只要表中已经有了列族info,客户端可以以后提供新的列info:camera并存储值。由于调优和存储都是在列族这个层次进行的,所以最好使所有列族成员都有相同的访问模式(access pattern)和大小特征。例如上述存储照片的表,图像数据比较大(兆字节),因而和较小的元数据(千字节)分别存储在不同列族中。
3.HBase自动把表的水平方向划分为区域(region)。每个区域由表中部分行组成。一开始一个表只有一个区域,随着区域开始变大,等到超出设定的大小阈值后,会在某行的边界上把表分成两个大小基本相同的新分区。在第一次划分前,所有加载的数据都放在原始区域所在服务器上,随着表变大,区域的个数会增加,区域是HBase集群上分布数据的最小单位。这样一个太大无法放在单台服务器上的表会被放到服务器集群上,其中每个节点都负责管理表所有区域的一个子集,表的加载也是用这种方式把数据分布到各个节点。
4.与HDFS和YARN由客户端、从属机(slave)和协调主控机(master)组成类似,HBase由一个master节点协调管理一至多个regionserver从属机,如下所示:
HBase主控机负责启动(bootstrap)一个全新安装,把表的区域分配给注册的regionserver,恢复regionserver故障等。master的负载很轻,regionserver负责零至多个表区域的管理以及响应客户端的读写请求。regionserver还负责region的划分并通知HBase master有了新的子区域(daughter region),这样master可以把父区域设为离线,用子区域代替父区域。
HBase依赖于ZooKeeper,默认情况ZooKeeper管理一个集合体(ensemble)作为集群的“权威机构”(authority),负责管理诸如hbase:meta目录表的位置以及当前集群主控机地址等重要信息。如果表区域的分配过程中有服务器崩溃,就可以通过ZooKeeper进行分配的协调。在启动一个客户端到HBase集群的连接时,客户端必须至少拿到集群所传递的ZooKeeper集合体的位置,这样客户端才能访问ZooKeeper的层次结构,从而了解集群属性。
与通过$HADOOP/etc/hadoop/slaves文件查看datanode和节点管理器列表一样,regionserver从属机节点列表在HBase的/conf/regionservers文件中。HBase集群的站点配置(site-specific configuration)在HBase的/conf/hbase-site.xml和/conf/hbase-env.sh文件中。默认情况下,HBase会将存储写入本地文件系统,所以通常都会把HBase的存储配置为指向要使用的HDFS集群。
5.HBase内部保留名为hbase:meta的特殊目录表(catalog table)。它们维护着当前集群上所有区域的列表、状态和位置。hbase:meta表中的项使用区域名作为键,区域名由所属表名、区域起始行、区域创建时间以及对其整体进行的MD5哈希值组成。例如表TestTable中起始行为xyz的区域的名称如下所示:
在表名、起始行、时间戳中间用逗号分隔,MD5哈希值用前后两个句号包围。行的键是排序的,因此要查找一个特定行所在区域只需在目录表中找到第一个键大于等于给定行键的项即可。区域变化时,目录表会进行相应更新。
新连接到ZooKeeper集群上的客户端首先查找hbase:meta的位置,然后客户端通过查找合适的hbase:meta区域来获取用户空间区域所在节点及其位置。接着,客户端就可以直接和管理那个区域的regionserver进行交互。每个行操作可能要访问三次远程节点,为了节省该代价,客户端会缓存它们遍历hbase:meta时获取的信息。缓存的不仅有位置信息,还有用户空间区域的开始行和结束行,这样以后不需要访问hbase:meta表也能得知区域存放的位置。客户端会一直使用缓存的项,直到查询缓存出错,即区域被移动了,才会再次查看hbase:meta获取区域的新位置。如果hbase:meta区域也被移动了,客户端会重新查找。
到达Regionserver的写操作首先追加到“提交日志”(commit log)中,然后加入内存中的memstore。如果memstore满,其中的内容会被写入(flush)文件系统。提交日志存放在HDFS中,因此即使一个regionserver崩溃,提交日志依然可用。如果发现一个regionserver无法访问,主控机会根据区域对死掉的regionserver的提交日志进行分割。重新分配后,在打开并使用死掉的regionserver的区域之前,这些区域会找到属于它们的从被分割提交日志中得到的文件。这些更新会被重做(replay)使区域恢复到服务器失败前的状态。
在读数据时首先查看区域的memstore,如果在内存里的memsotre中找到了需要的版本,查询结束,否则需要按照次序从新到旧检查“刷新文件”(flush file),知道找到满足查询的版本或所有刷新文件被遍历为止。有一个后台进程负责在刷新文件个数到达一个阈值时压缩它们。它把多个文件重新写入一个文件。在regionserver上,另外有个独立进程监控着刷新文件的大小,一旦文件大小超出预先设定的最大值,便对区域进行分割。
6.HBase的伪分布式安装配置流程如下所示:
(1)如果使用的是CDH版Hadoop,可以安装CDH版的HBase(网址为http://archive-primary.cloudera.com/cdh5/cdh/5/)。然后在浏览器按Ctrl+F查找hbase,找到最新版的tar.gz包下载(目前为5.15.0)。
(2)下载后用tar -zvxf /<路径>/hbase-1.2.0-cdh5.15.0.tar.gz -C /<指定目录>命令将该tar.gz包解压到想要的位置下,然后进入HBase所在目录的/conf/hbase-env.sh文件,修改JAVA_HOME变量的值,指向JDK所在位置(可以用echo $JAVA_HOME命令查看JDK位置),如下所示:
同时,在hbase-env.sh文件中将一行export HBASE_MANAGES_ZK=true取消注释,使之生效。
接着,将同目录下hbase-site.xml设置如下属性,设置HBase数据存储目录以及是否设为分布式:
(3)为HBase添加系统环境变量,用sudo vim /etc/profile命令编辑系统环境变量文件,按i键进入编辑模式后,加入HBASE_HOME变量并将PATH变量添加HBase的内容,并按ESC退出编辑模式后按冒号并输入wq保存退出,如下所示:
然后在命令行输入source /etc/profile使环境变量文件立刻生效,就不必进入HBase的根目录直接在任何路径下启用HBase。
(4)测试是否可以直接使用HBase,在命令行输入hbase命令,会显示命令提示,如下所示:
7.创建HBase实例的命令如下所示(启动HBase前要先启动Hadoop),默认使用/tmp目录作为存储目录:
在单机模式下,HBase主控机、regionserver和ZooKeeper实例都在同一个JVM中运行,只会启动HMaster进程,而伪分布模式和全分布模式还会启动HQuorumPeer和HRegionServer进程,如下所示:
默认HBase的临时数据会被写入到/${java.io.tmpdir}/hbase-${user.name}中,即/tmp/<hbase-用户名>文件夹,一般需要设置$HBASE_HOME/conf/hbase-site.xml文件中的hbase.tmp.dir属性改变存储目录。。要管理HBase实例,输入以下命令启动HBase的shell环境:
要新建一个表首先要为表起名,并为其定义模式。一个表的模式包含表的属性和列族的列表。列族的属性包括列族是否应该在文件系统中被压缩存储以及一个单元格要保存多少个版本等。模式可以修改但在修改时需要将表设为“离线”状态(offline)。在hbase shell中使用disable命令可以把表设为离线,使用alter命令可以进行必要的修改,而enable命令可以把表重新设置为“在线”状态(online)。
例如要新建一个名为test的表,使其只包含一个名为data的列,表和列族属性都为默认值,使用以下命令:
如果前面有命令没有成功完成,shell环境会提示错误并显示堆栈跟踪信息(stack trace)。可以检查HBase日志目录中的主控机日志,默认在$HBASE_HOME/logs目录下。为了验证新表是否创建成功,运行list命令会输出用户空间中的所有表:
在列族data中三个不同的行和列上插入数据,读取第一行,以及列出表的内容的例子如下所示:
若要移除这个表,首先要将它设为“离线”,然后删除:
运行以下命令来关闭HBase:
8.要编写HBase的Java代码,需要将如下Maven依赖添加到pom.xml文件(各依赖项的写法可以参考如下链接:https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_vd_hadoop_api_dependencies.html):
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.0-cdh5.15.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>1.2.0-cdh5.15.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.0-cdh5.15.0</version>
<scope>provided</scope>
</dependency>
与上面用hbase shell创建与管理表等功能相同的HBase客户端代码如下所示:
package HBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
public class ExampleClient { //基本的HBase表管理与访问,功能与在hbase shell中创建与管理表相同
public static void main(String[] args) throws IOException {
Configuration config= HBaseConfiguration.create(); //获取HBase的配置文件实例
//创建表
Connection connection=ConnectionFactory.createConnection(config); //根据配置文件创建Connection实例,可以从Connection实例中获取旧API中的HBaseAdmin和HTable对象的功能
try{
Admin admin=connection.getAdmin(); //Admin对象用于管理HBase集群,添加和丢弃表,代替旧API中的HBaseAdmin对象
try{
TableName tableName= TableName.valueOf("test"); //设置表名
HTableDescriptor htd=new HTableDescriptor(tableName);
HColumnDescriptor hcd=new HColumnDescriptor("data"); //设置列族
htd.addFamily(hcd); //添加列族
admin.createTable(htd); //根据描述器的设置创建test表
HTableDescriptor[] tables=admin.listTables(); //在HBase实例中列出所有的表并放入一个表描述器数组中
if(tables.length!=1 && Bytes.equals(tableName.getName(),tables[0].getTableName().getName())){ //如果未成功创建表则报错
throw new IOException("Failed create of table");
}
//插入三条输入、获取一行的值、遍历整个表的数据
Table table=connection.getTable(tableName); //Table对象用于访问指定的表,通过Connection实例获得以代替旧API中的HTable
try{
for(int i=1;i<=3;i++){
byte[] row=Bytes.toBytes("row"+i); //rowi作为表中每行数据的行键
Put put=new Put(row); //每行作为一个输入
byte[] columnFamily=Bytes.toBytes("data");
byte[] qualifier=Bytes.toBytes(String.valueOf(i));
byte[] value=Bytes.toBytes("value"+i);
put.add(columnFamily,qualifier,value); //插入三行数据,包括列族和列族内的三列,以及值
table.put(put); //将一条输入插入到表中,相当于hbase shell中的put命令
}
Get get=new Get(Bytes.toBytes("row1")); //获取第一行的数据
Result result=table.get(get); //相当于hbase shell中的get命令
System.out.println("Get: "+result);
Scan scan=new Scan();
ResultScanner scanner=table.getScanner(scan); //遍历获取整个test表的数据,相当于hbase shell中的scan命令
try{
for(Result scannerResult:scanner){
System.out.println("Scan: "+scannerResult);
}
}finally{
scanner.close(); //HBase扫描器使用后需要关闭
}
//使表“离线”并删除表
admin.disableTable(tableName);
admin.deleteTable(tableName);
}finally{
table.close();
}
}finally{
admin.close();
}
}finally{
connection.close();
}
}
}
其中ResultScanner会在幕后每次获取100行数据并显示在客户端,每次获取和缓存行数由hbase.client.scanner.caching属性决定,也可以通过setCaching()方法设置预读取和缓存行数。较大缓存值可以加速扫描器运行,但也会在客户端使用较多内存,如果缓存值太高会导致客户端还没处理完一批数据的时候扫描器已超时。在扫描器超时之前客户端若没能再次访问服务器,扫描器在服务器的资源会被垃圾回收,默认扫描器超时时间为60秒,由hbase.client.scanner.timeout.period设置,若超时会收到UnknownScannerException异常。
将上述代码打包成jar后,使用以下命令运行程序:
export HBASE_CLASSPATH=/mnt/sda6/hbaseExample.jar
hbase HBase.ExampleClient
输出结果如下所示:
输出字段以斜杠分隔且含义如下:行名称、列名称、单元格时间戳、单元格类型、值的字节数组长度和一个内部HBase字段。
9.可以将HBase作为MapReduce作业的输入和输出,TableInputFormat类可以在区域边界进行分割,使map能够获得单个区域进行处理,也可以把reduce的结果写入HBase。下列代码例子使用TableInputFormat运行一个map任务以计算行数,如下所示:
package HBase;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
public class SimpleRowCounter extends Configured implements Tool { //计算HBase表中行数的MapReduce程序
static class RowCounterMapper extends TableMapper<ImmutableBytesWritable, Result> { //TableMapper设定map的输入类型由TableInputFormat来传递,输入的键为行键,值为扫描的行结果
public static enum Counters {ROWS}
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
context.getCounter(Counters.ROWS).increment(1); //每获得表的一行输入,枚举的计数器加一
}
}
public int run(String[] args) throws Exception {
if(args.length!=1){
System.err.println("Usage: SimpleRowCounter <tablename>");
return -1;
}
String tableName=args[0];
Scan scan=new Scan();
scan.setFilter(new FirstKeyOnlyFilter()); //运行服务器端任务时,只用每行的第一个单元格填充mapper中的Result对象
Job job=new Job(getConf(),getClass().getSimpleName());
job.setJarByClass(getClass());
TableMapReduceUtil.initTableMapperJob(tableName,scan,RowCounterMapper.class,ImmutableBytesWritable.class,Result.class,job); //用表名、扫描器、Mapper类、输出键值对的类和作业对象来设置TableMap作业
job.setNumReduceTasks(0); //设置reducer数量为0
job.setOutputFormatClass(NullOutputFormat.class);
return job.waitForCompletion(true)?0:1;
}
public static void main(String[] args) throws Exception{
int exitCode= ToolRunner.run(HBaseConfiguration.create(),new SimpleRowCounter(),args); //exitCode被赋予上面run()方法中最后job.waitForCompletion()的返回值
System.exit(exitCode);
}
}
10.HDFS和MapReduce对于读写单独的记录效率很低,因此HBase可以填补这方面的缺点。举个例子:(1)创建stations表,行的键是stationid,包含一个列族info,其中有info:name、info:location以及info:description列。(2)创建observations表,行的键是stationid和逆序时间戳构成的组合键,包含一个列族data,有data:airtemp列。在hbase shell环境中,使用如下命令定义表:
在数据量极大的情况下,可以将原始输入数据复制到HDFS,接着运行MapReduce作业,就能读到输入数据并写入已创建的HBase表。将HDFS中的文件数据插入到HBase表的代码如下所示:
package HBase;
import MapReduceProperties.NcdcStationMetadata;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.File;
import java.util.Map;
public class HBaseStationImporter extends Configured implements Tool { //用于将HDFS中的stations-fixed-width.txt数据导入到stations表中
@Override
public int run(String[] args) throws Exception {
if(args.length!=1){
System.err.println("Usage: HBaseStationImporter <input>");
return -1;
}
Configuration config=HBaseConfiguration.create(); //获取HBase的配置文件实例
Connection connection= ConnectionFactory.createConnection(config); //根据配置文件创建Connection实例,可以从Connection实例中获取旧API中的HBaseAdmin和HTable对象的功能
try{
TableName tableName= TableName.valueOf("stations");
Table table=connection.getTable(tableName); //获取stations表的实例,通过Table对象和Connection.getTable()方法代替旧API中的HTable对象
try{
NcdcStationMetadata metadata=new NcdcStationMetadata();
metadata.initialize(new File(args[0]));
Map<String,String> stationIdToNameMap=metadata.getStationIdToNameMap();
for(Map.Entry<String,String> entry:stationIdToNameMap.entrySet()){
Put put=new Put(Bytes.toBytes(entry.getKey()));
put.add(HBaseStationQuery.INFO_COLUMNFAMILY,HBaseStationQuery.NAME_QUALIFIER,Bytes.toBytes(entry.getValue())); //向stations表中的info:name列插入气象站名称
put.add(HBaseStationQuery.INFO_COLUMNFAMILY,HBaseStationQuery.DESCRIPTION_QUALIFIER,Bytes.toBytes("(unknown)"));
put.add(HBaseStationQuery.INFO_COLUMNFAMILY,HBaseStationQuery.LOCATION_QUALIFIER,Bytes.toBytes("(unknown)"));
table.put(put);
}
}finally{
table.close();
}
}finally{
connection.close();
}
return 0;
}
public static void main(String[] args) throws Exception{
int exitCode= ToolRunner.run(HBaseConfiguration.create(),new HBaseStationImporter(),args);
System.exit(exitCode);
}
}
与上述代码相配合,根据行键查询HBase表中一行数据的代码如下所示:
package HBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
public class HBaseStationQuery extends Configured implements Tool { //从stations表中根据行键stationId来查询一行数据
static final byte[] INFO_COLUMNFAMILY= Bytes.toBytes("info"); //设置列族info
static final byte[] NAME_QUALIFIER=Bytes.toBytes("name"); //设置列族冒号后的修饰符,即具体列info:name
static final byte[] LOCATION_QUALIFIER=Bytes.toBytes("location"); //设置列族冒号后的修饰符,即具体列info:location
static final byte[] DESCRIPTION_QUALIFIER=Bytes.toBytes("description"); //设置列族冒号后的修饰符,即具体列info:description
public Map<String,String> getStationInfo(Table table, String stationId) throws IOException {
Get get=new Get(Bytes.toBytes(stationId)); //获取行键为stationId的一行数据
get.addFamily(INFO_COLUMNFAMILY); //只获得指定列族的数据,即info列族
Result res=table.get(get); //根据HBase表对象以及Get对象的设置获得数据
if(res==null){
return null;
}
Map<String,String> resultMap=new LinkedHashMap<String,String>(); //将获取的查询结果Result对象转换为更便于使用的由String类型键值对构成的Map
resultMap.put("name",getValue(res,INFO_COLUMNFAMILY,NAME_QUALIFIER));
resultMap.put("location",getValue(res,INFO_COLUMNFAMILY,LOCATION_QUALIFIER));
resultMap.put("description",getValue(res,INFO_COLUMNFAMILY,DESCRIPTION_QUALIFIER));
return resultMap;
}
private static String getValue(Result res,byte[] cf,byte[] qualifier){ //从Result对象中获取具体列内单元格的值
byte[] value=res.getValue(cf,qualifier);
return value==null?"":Bytes.toString(value);
}
@Override
public int run(String[] args) throws Exception {
if(args.length!=1){
System.err.println("Usage: HBaseStationQuery <Station_id>");
return -1;
}
Configuration config=HBaseConfiguration.create(); //获取HBase的配置文件实例
Connection connection= ConnectionFactory.createConnection(config); //根据配置文件创建Connection实例,可以从Connection实例中获取旧API中的HBaseAdmin和HTable对象的功能
try{
TableName tableName=TableName.valueOf("stations"); //设置表名
Table table=connection.getTable(tableName); //获取stations表的实例,通过Table对象和Connection.getTable()方法代替旧API中的HTable对象
try{
Map<String,String> stationInfo=getStationInfo(table,args[0]); //根据命令行中输入的观测站ID来从HBase表中生成Map类型的数据
if(stationInfo==null){
System.err.printf("Station ID %s not found.\n",args[0]);
return -1;
}
for(Map.Entry<String,String> station:stationInfo.entrySet()){
System.out.printf("%s\t%s\n",station.getKey(),station.getValue()); //遍历打印出对应观测站ID的三条resultMap中的键值对
}
return 0;
}finally{
table.close();
}
}finally{
connection.close();
}
}
public static void main(String[] args) throws Exception{
int exitCode= ToolRunner.run(HBaseConfiguration.create(),new HBaseStationQuery(),args);
System.exit(exitCode);
}
}
将上述代码打包为jar,并将所需输入文件放入HDFS后,使用如下命令运行导入数据和查询数据:
export HBASE_CLASSPATH=/mnt/sda6/hbase-examples.jar
hbase HBase.HBaseStationImporter /input/stations-fixed-width.txt
hbase HBase.HBaseStationQuery 011990-99999
同样,将NCDC温度数据导入到HBase的observations表的代码如下所示:
package HBase;
import MapReduceApplication.NcdcRecordParser;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
public class HBaseTemperatureImporter extends Configured implements Tool { //从HDFS向observations表导入NCDC气温数据的MapReduce作业
static class HBaseTemperatureMapper<K> extends Mapper<LongWritable, Text,K, Put>{
private NcdcRecordParser parser=new NcdcRecordParser();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
parser.parse(value.toString());
if(parser.isValidTemperature()){
byte[] rowKey=RowKeyConverter.makeObservationRowKey(parser.getStationId(),parser.getObservationDate().getTime()); //用观测站ID和观测时间创建HBase表的行键
Put p=new Put(rowKey);
p.add(HBaseTemperatureQuery.DATA_COLUMNFAMILY,HBaseTemperatureQuery.AIRTEMP_QUALIFIER, Bytes.toBytes(parser.getAirTemperature())); //将有效气温值添加到HBase的observations表的data:airtemp列
context.write(null,p); //将map匹配到的输入数据写出到Put对象
}
}
}
@Override
public int run(String[] args) throws Exception {
if(args.length!=1){
System.err.println("Usage: HBaseTemperatureImporter <input>");
return -1;
}
Job job=new Job(getConf(),getClass().getSimpleName());
job.setJarByClass(getClass());
FileInputFormat.addInputPath(job,new Path(args[0]));
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE,"observations"); //设置作业输出的HBase表的名称为observations
job.setMapperClass(HBaseTemperatureMapper.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(TableOutputFormat.class);
return job.waitForCompletion(true)?0:1;
}
public static void main(String[] args) throws Exception{
int exitCode= ToolRunner.run(HBaseConfiguration.create(),new HBaseTemperatureImporter(),args); //exitCode被赋予上面run()方法中最后job.waitForCompletion()的返回值
System.exit(exitCode);
}
}
从observations表中查询前10行数据的代码如下所示:
package HBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
public class HBaseTemperatureQuery extends Configured implements Tool { //检索HBase表中某范围内气象站观测数据行
static final byte[] DATA_COLUMNFAMILY= Bytes.toBytes("data"); //设置列族
static final byte[] AIRTEMP_QUALIFIER=Bytes.toBytes("airtemp"); //设置列族冒号后面的后缀具体列,即data:airtemp
public NavigableMap<Long,Integer> getStationObservations(Table table,String stationId,long maxStamp,int maxCount) throws IOException {
byte[] startRow=RowKeyConverter.makeObservationRowKey(stationId,maxStamp); //获得第一行数据的行键,即观测站ID与逆序时间戳的组合键
NavigableMap<Long,Integer> resultMap=new TreeMap<Long,Integer>(); //创建一个已排序的并可以进行导航(如获取大于等于某对象的键值对)的键值对对象
Scan scan=new Scan(startRow); //定义扫描数据的起始行键
scan.addColumn(DATA_COLUMNFAMILY,AIRTEMP_QUALIFIER); //限制扫描HBase表所返回的列
ResultScanner scanner=table.getScanner(scan); //遍历获取整个observations表的数据,相当于hbase shell中的scan命令
try{
Result res;
int count=0;
while((res=scanner.next())!=null && count++ < maxCount){ //当还能扫描到下一行数据并且还没有超过10行
byte[] row=res.getRow(); //找到当前行键
byte[] value=res.getValue(DATA_COLUMNFAMILY,AIRTEMP_QUALIFIER); //获取当前行的data:airtemp列的单元格的值
Long stamp=Long.MAX_VALUE-Bytes.toLong(row,row.length-Bytes.SIZEOF_LONG,Bytes.SIZEOF_LONG); //将逆序时间戳还原为正常时间戳,即MAX_VALUE-(MAX_VALUE-timestamp)
Integer temp=Bytes.toInt(value);
resultMap.put(stamp,temp); //将正常时间戳和温度值放入resultMap
}
}finally{
scanner.close();
}
return resultMap;
}
public int run(String[] args) throws Exception {
if(args.length!=1){
System.err.println("Usage: HBaseTemperatureQuery <station_id>");
return -1;
}
Configuration config=HBaseConfiguration.create(); //获取HBase的配置文件实例
Connection connection=ConnectionFactory.createConnection(config); //根据配置文件创建Connection实例,可以从Connection实例中获取旧API中的HBaseAdmin和HTable对象的功能
try{
TableName tableName=TableName.valueOf("observations"); //设置表名
Table table=connection.getTable(tableName); //获取observations表的实例,通过Table对象和Connection.getTable()方法代替旧API中的HTable对象
try{
NavigableMap<Long,Integer> observations=getStationObservations(table,args[0],Long.MAX_VALUE,10).descendingMap(); //请求最近10个观测值,并调用descendingMap()使返回值以降序排列
for(Map.Entry<Long,Integer> observation:observations.entrySet()){
//打印日期,时间以及温度
System.out.printf("%1$tF %1$tR\t%2$s\n",observation.getKey(),observation.getValue());
}
return 0;
}finally{
table.close();
}
}finally{
connection.close();
}
}
public static void main(String[] args) throws Exception{
int exitCode= ToolRunner.run(HBaseConfiguration.create(),new HBaseTemperatureQuery(),args);
System.exit(exitCode);
}
}
打包jar并放好输入数据后,用如下命令运行:
hbase HBase.HBaseTemperatureImporter /input/NCDC.txt
hbase HBase.HBaseTemperatureQuery 011990-99999
如果发现报错,在http://localhost:8088/下查看该作业的历史记录,并查看logs日志的时候发现以下错误,则说明MapReduce作业没有找到HBase相关类的路径,需要导入类路径:
将HBase类路径导入Hadoop的类路径可以在$HADOOP_HOME/etc/hadoop/hadoop-env.sh文件里添加以下路径,以包括HBase的/lib目录下所有的jar:
export HBASE_HOME=/mnt/sda6/Hadoop/hbase-1.2.0-cdh5.15.0
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HBASE_HOME/lib/*
11.上述的用MapReduce以及TableOutputFormat将数据导入HBase,或者使用HBase原生Client API导入的方式在导入大量数据时效率低下,在一个新的HBase表中,一开始只有一个区域,此时所有更新都会插入到该区域,直到足够大区域开始分裂为止,这样导入数据在开始时比较慢,直到有足够多的区域分布在多个RegionServer上为止,且会频繁进行flush和split等I/O操作,对节点稳定性会有影响。因此可以使用“批量加载”使数据导入表的速度提高一个数量级。基本思想是先把输入数据通过MapReduce生成HFile这种中间数据,存放在HDFS,然后将HDFS中的HFile批量写入到HBase表中。代码例子如下所示:
package HBase;
import MapReduceApplication.NcdcRecordParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
public class HBaseTemperatureBulkImporter extends Configured implements Tool { //HBase的批量加载,以HFile格式先把数据写入HDFS,再将HFile从HDFS写入到HBase,加快导入速度
static class HBaseTemperatureMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>{
private NcdcRecordParser parser=new NcdcRecordParser();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
parser.parse(value.toString());
if(parser.isValidTemperature()){
byte[] rowKey=RowKeyConverter.makeObservationRowKey(parser.getStationId(),parser.getObservationDate().getTime()); //用观测站ID和观测时间创建HBase表的行键
Put p=new Put(rowKey);
p.add(HBaseTemperatureQuery.DATA_COLUMNFAMILY,HBaseTemperatureQuery.AIRTEMP_QUALIFIER, Bytes.toBytes(parser.getAirTemperature())); //将有效气温值添加到HBase的observations表的data:airtemp列
context.write(new ImmutableBytesWritable(rowKey),p); //将map匹配到的输入数据写出到Put对象,map输出的键为行键的不可变字节
}
}
}
@Override
public int run(String[] args) throws Exception {
if(args.length!=1){
System.err.println("Usage: HBaseTemperatureBulkImporter <input>");
return -1;
}
Configuration conf= HBaseConfiguration.create(getConf());
Job job=new Job(conf,getClass().getSimpleName()); //设置作业配置,作业名称为类名
job.setJarByClass(getClass());
FileInputFormat.addInputPath(job,new Path(args[0]));
Path tmpPath=new Path("/tmp/bulk"); //写入HFile的目录
FileOutputFormat.setOutputPath(job,tmpPath);
job.setMapperClass(HBaseTemperatureMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
HTable table=new HTable(conf,"observations"); //设置作业输出的HBase表的名称为observations
try{
HFileOutputFormat2.configureIncrementalLoad(job,table); //设置作业输出的键值对的类,设置作业的OutputFormat类,依据当前表中的Region边界自动设置TotalOrderPartitioner以进行分区,使每个HFile恰好适应一个Region
if(!job.waitForCompletion(true)){
return 1;
}
LoadIncrementalHFiles loader=new LoadIncrementalHFiles(conf);
loader.doBulkLoad(tmpPath,table); //将HDFS上的HFile写入到observations表中
FileSystem.get(conf).delete(tmpPath,true); //将HFile写入observations表后,删除HDFS上批量加载的HFile目录/tmp/bulk
return 0;
}finally{
table.close();
}
}
public static void main(String[] args) throws Exception{
int exitCode= ToolRunner.run(HBaseConfiguration.create(),new HBaseTemperatureBulkImporter(),args);
System.exit(exitCode);
}
}
打包jar并放好输入文件后,用以下命令运行:
export HBASE_CLASSPATH=/mnt/sda6/hbase-examples.jar
hbase HBase.HBaseTemperatureBulkImporter /input/NCDC.txt
再用hbase HBase.HBaseTemperatureQuery 011990-99999命令运行一次上面的查询observations表内最近前十条观测站ID为011990-99999的数据,结果如下所示:
12.HBase与RDBMS的比较:
(1)HBase是一个分布式的、面向列的数据存储系统,通过在HDFS上提供实时的随机读写来弥补HDFS读写延迟较高、RDBMS无法进行大规模可伸缩的分布式处理的问题:表可以很高(数十亿个数据行)、表可以很宽(数百万个列)、可以在上千个普通商用机上自动复制。而RDBMS是模式固定、面向行的数据库并具有ACID性质和复杂的SQL查询处理引擎。RDBMS具有二级索引,内连接和外链接等复杂连接和属性。
(2)对于大多数中小规模应用,RDBMS够用,但是如果要在数据规模和并发读写等方面进行大规模扩展,RDBMS会损失很多性能,也很难进行分布式处理,而强行进行扩展会放松或放弃ACID限制等RDBMS的原有特性。
(3)HBase没有索引的概念,行和列都是顺序存储的,不存在索引膨胀的问题,插入性能和表的大小无关。而RDBMS的写性能随着表规模变大而越来越慢。
除了无索引插入性能不受影响,HBase还有以下特定:
(1)自动分区。表增长时会自动分裂成区域,分布到不同RegionServer上。
(2)线性扩展和对于新节点自动处理。增加一个节点后表的区域会自动负载均衡。
(3)普通商用硬件支持。多个普通商用机器比一个高性能高端商用机器还便宜。而RDBMS往往为单节点,需要支持大量I/O的昂贵硬件。
(4)容错。大量节点使得单个节点失效不会有大问题。
(5)批处理。可用MapReduce并行分布式处理表数据。
13.HBase的区域文件在启动时就被打开,并在处理过程中始终保持打开状态,这样可以节省每次访问操作打开文件的开销。这样不久就可能达到系统和Hadoop设定的文件描述符数量限制,每个打开的文件在远程datanode上至少占用一个文件描述符。一个进程默认的文件描述符限制为1024。如果使用的描述符个数超过文件系统的ulimit值,会在日志中看到“Too many open files”的错误信息,需要增加文件描述符的ulimit参数值。Hadoop2的datanode上同时运行的线程数不能超过4096,可以通过在hdfs-site.xml中的dfs.datanode.max.transfer.threads属性更改。
HBase的web界面可以通过http://<HMaster-host>:60010访问HBase集群的状态,例如集群负载,请求频率,表的列表和加入的regionserver等。点击某个regionserver会列出当前regionserver所有区域的列表以及其他的属性值,例如使用的资源和请求频率,如下所示: