HBase Basics (5): Clients (Java, Shell, Thrift, REST, MapReduce, Web UI)
HBase offers several clients: the native Java client, the HBase shell, Thrift, REST, MapReduce, and the web UI.
Below are the common usages of each of these clients.
I. Native Java Client
The native Java client is the primary and most efficient HBase client.
It covers the data APIs (put, get, scan, delete) as well as DDL operations such as creating, deleting, and altering tables.
配置java连接hbase
java连接hbase需要两个类:
hbaseconfiguration
connectionfactory
First, configure the HBase connection. You can set properties such as the ZooKeeper quorum and client port (hbase.zookeeper.quorum and hbase.zookeeper.property.clientPort) directly on the configuration object, as sketched below.
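A minimal sketch of this direct approach (the host names are placeholders for your own ZooKeeper quorum):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class DirectConfigExample {
    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        // Placeholder quorum and port; replace with your own cluster settings
        config.set("hbase.zookeeper.quorum", "host1,host2,host3");
        config.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection connection = ConnectionFactory.createConnection(config)) {
            System.out.println("connected: " + !connection.isClosed());
        }
    }
}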
A more general approach is to write an hbase-site.xml file and load the configuration from it.
Example hbase-site.xml:
<configuration>
  <property>
    <name>hbase.master</name>
    <value>hdfs://host1:60000</value>
  </property>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>host1,host2,host3</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
  </property>
</configuration>
Then load the configuration file and create the connection:
Configuration config = HBaseConfiguration.create();
config.addResource(new Path(System.getenv("HBASE_CONF_DIR"), "hbase-site.xml"));
Connection connection = ConnectionFactory.createConnection(config);
Creating a table
To create a table we first need an Admin object:
Admin admin = connection.getAdmin();                     // obtain an Admin from the connection
TableName tableName = TableName.valueOf("test");         // table name
HTableDescriptor htd = new HTableDescriptor(tableName);  // table descriptor
HColumnDescriptor hcd = new HColumnDescriptor("data");   // column family descriptor
htd.addFamily(hcd);                                      // add the column family
admin.createTable(htd);                                  // create the table
Creating a table in HBase 2.x
HBase 2.x introduces a new API for creating tables:
TableName tableName = TableName.valueOf("test");  // table name
// TableDescriptor objects are built through TableDescriptorBuilder
TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
// build the column family descriptor
ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("data")).build();
tableDescriptor.setColumnFamily(family);     // set the column family
admin.createTable(tableDescriptor.build());  // create the table
Inserting data
Table table = connection.getTable(tableName);  // get a Table object
try {
    byte[] row = Bytes.toBytes("row1");                    // row key
    Put put = new Put(row);                                // Put for that row
    byte[] columnFamily = Bytes.toBytes("data");           // column family
    byte[] qualifier = Bytes.toBytes(String.valueOf(1));   // column qualifier
    byte[] value = Bytes.toBytes("张三丰");                 // value
    put.addColumn(columnFamily, qualifier, value);
    table.put(put);                                        // write the data to the table
} finally {
    table.close();  // release the table when done
}
Getting a single row
// read data
Get get = new Get(Bytes.toBytes("row1"));   // Get for row1
Result result = table.get(get);             // fetch the row through the Table object
System.out.println("Get result: " + result);
// often we only need the value itself; this reads the data:1 column
byte[] valueBytes = result.getValue(Bytes.toBytes("data"), Bytes.toBytes("1"));  // returned as a byte array
String valueStr = new String(valueBytes, "UTF-8");  // convert the bytes to a String
System.out.println("value: " + valueStr);
Scanning the table
Scan scan = new Scan();
ResultScanner scanner = table.getScanner(scan);
try {
    for (Result scannerResult : scanner) {
        System.out.println("Scan: " + scannerResult);
        byte[] row = scannerResult.getRow();
        System.out.println("rowName: " + new String(row, "UTF-8"));
    }
} finally {
    scanner.close();
}
Deleting a table
TableName tableName = TableName.valueOf("test");
admin.disableTable(tableName);  // disable the table first
admin.deleteTable(tableName);   // then delete it
A complete HBase Java API DDL example:
package com.example.hbase.admin;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;

public class Example {

  private static final String TABLE_NAME = "MY_TABLE_NAME_TOO";
  private static final String CF_DEFAULT = "DEFAULT_COLUMN_FAMILY";

  public static void createOrOverwrite(Admin admin, HTableDescriptor table) throws IOException {
    if (admin.tableExists(table.getTableName())) {
      admin.disableTable(table.getTableName());
      admin.deleteTable(table.getTableName());
    }
    admin.createTable(table);
  }

  public static void createSchemaTables(Configuration config) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(config);
         Admin admin = connection.getAdmin()) {

      HTableDescriptor table = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
      table.addFamily(new HColumnDescriptor(CF_DEFAULT).setCompressionType(Algorithm.NONE));

      System.out.print("Creating table. ");
      createOrOverwrite(admin, table);
      System.out.println(" Done.");
    }
  }

  public static void modifySchema(Configuration config) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(config);
         Admin admin = connection.getAdmin()) {

      TableName tableName = TableName.valueOf(TABLE_NAME);
      if (!admin.tableExists(tableName)) {
        System.out.println("Table does not exist.");
        System.exit(-1);
      }

      HTableDescriptor table = admin.getTableDescriptor(tableName);

      // add a new column family to the table
      HColumnDescriptor newColumn = new HColumnDescriptor("NEWCF");
      newColumn.setCompactionCompressionType(Algorithm.GZ);
      newColumn.setMaxVersions(HConstants.ALL_VERSIONS);
      admin.addColumn(tableName, newColumn);

      // update an existing column family
      HColumnDescriptor existingColumn = new HColumnDescriptor(CF_DEFAULT);
      existingColumn.setCompactionCompressionType(Algorithm.GZ);
      existingColumn.setMaxVersions(HConstants.ALL_VERSIONS);
      table.modifyFamily(existingColumn);
      admin.modifyTable(tableName, table);

      // disable the table
      admin.disableTable(tableName);

      // delete a column family
      admin.deleteColumn(tableName, CF_DEFAULT.getBytes("UTF-8"));

      // delete the table (it must be disabled first)
      admin.deleteTable(tableName);
    }
  }

  public static void main(String... args) throws IOException {
    Configuration config = HBaseConfiguration.create();

    // add the required configuration files (hbase-site.xml, core-site.xml)
    config.addResource(new Path(System.getenv("HBASE_CONF_DIR"), "hbase-site.xml"));
    config.addResource(new Path(System.getenv("HADOOP_CONF_DIR"), "core-site.xml"));
    createSchemaTables(config);
    modifySchema(config);
  }
}
II. Operating HBase with the HBase Shell
Run the hbase shell command in the bin/ directory of the HBase installation
to connect to a running HBase instance.
$ ./bin/hbase shell
hbase(main):001:0>
Viewing the HBase shell help text
Type help and press Enter to see an overview of the HBase shell together with some example commands.
Creating a table
Use the create command to create a table; you must specify a table name and a column family name.
hbase(main):001:0> create 'test', 'cf'
0 row(s) in 0.4170 seconds

=> Hbase::Table - test
Table information
Use the list command to confirm that the table exists:
hbase(main):002:0> list 'test'
TABLE
test
1 row(s) in 0.0180 seconds

=> ["test"]
Use the describe command to see the table's details and configuration:
hbase(main):003:0> describe 'test'
Table test is ENABLED
test
COLUMN FAMILIES DESCRIPTION
{NAME => 'cf', VERSIONS => '1', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}
1 row(s)
Took 0.9998 seconds
Inserting data
Use the put command to insert data:
hbase(main):003:0> put 'test', 'row1', 'cf:a', 'value1'
0 row(s) in 0.0850 seconds

hbase(main):004:0> put 'test', 'row2', 'cf:b', 'value2'
0 row(s) in 0.0110 seconds

hbase(main):005:0> put 'test', 'row3', 'cf:c', 'value3'
0 row(s) in 0.0100 seconds
Scanning all data
One way to read data back from HBase is to scan. Use the scan command to scan the table's data; the scan can also be restricted, for example to a range of rows or to specific columns.
hbase(main):006:0> scan 'test'
ROW                COLUMN+CELL
 row1              column=cf:a, timestamp=1421762485768, value=value1
 row2              column=cf:b, timestamp=1421762491785, value=value2
 row3              column=cf:c, timestamp=1421762496210, value=value3
3 row(s) in 0.0230 seconds
Getting a single row
Use the get command to fetch one row at a time:
hbase(main):007:0> get 'test', 'row1'
COLUMN             CELL
 cf:a              timestamp=1421762485768, value=value1
1 row(s) in 0.0350 seconds
Disabling a table
Use the disable command to disable a table, and the enable command to re-enable it:
hbase(main):008:0> disable 'test'
0 row(s) in 1.1820 seconds

hbase(main):009:0> enable 'test'
0 row(s) in 0.1770 seconds
Before dropping the table it must be disabled again:
hbase(main):010:0> disable 'test'
0 row(s) in 1.1820 seconds
Dropping a table
Use the drop command to delete the disabled table:
hbase(main):011:0> drop 'test'
0 row(s) in 0.1370 seconds
Exiting the HBase shell
Use the quit command to leave the shell and disconnect from the cluster.
III. Accessing HBase with the Thrift Client
Because HBase is written in Java, it naturally exposes a Java API. What about non-Java programmers? Fortunately HBase also ships a Thrift gateway, so clients can be written in other languages as well. This section introduces the commonly used Python interface; other languages work in much the same way.
1. Start the Thrift server
To use HBase's Thrift interface, its service must be running. Start the HBase Thrift server process as follows:
cd /app/zpy/hbase/bin
./hbase-daemon.sh start thrift

Check with the jps command:
34533 ThriftServer
Thrift listens on port 9090 by default; after a successful start you can check that the port is up.
2. Install the dependencies required by Thrift
(1) Install the build dependencies
yum install automake libtool flex bison pkgconfig gcc-c++ boost-devel libevent-devel zlib-devel python-devel ruby-devel openssl-devel
(2) Install Boost
wget http://sourceforge.net/projects/boost/files/boost/1.53.0/boost_1_53_0.tar.gz
tar xvf boost_1_53_0.tar.gz
cd boost_1_53_0
./bootstrap.sh
./b2 install
3. Install the Thrift client
Download thrift-0.11.0.tar.gz from the official site, then unpack and install it:
wget http://mirrors.hust.edu.cn/apache/thrift/0.11.0/thrift-0.11.0.tar.gz
tar xzvf thrift-0.11.0.tar.gz
cd thrift-0.11.0
mkdir /app/zpy/thrift
./configure --prefix=/app/zpy/thrift
make
make install
make may fail with an error such as:
g++: error: /usr/lib64/libboost_unit_test_framework.a: No such file or directory
Fix:
find / -name libboost_unit_test_framework.*
cp /usr/local/lib/libboost_unit_test_framework.a /usr/lib64/
4. Connect to HBase from Python 3
Install the required packages:
pip install thrift
pip install hbase-thrift
The Python script is as follows:
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase
from hbase.ttypes import *

transport = TSocket.TSocket('localhost', 9090)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Hbase.Client(protocol)
transport.open()

a = client.getTableNames()
print(a)
IV. REST Client
1. Start the REST service
a. Start a REST server in non-daemon (foreground) mode (Ctrl+C to stop):
bin/hbase rest start
b. Start the REST server as a daemon:
bin/hbase-daemon.sh start rest
By default the server listens on port 8080 (a different port can be specified at startup), and it can then be accessed with any HTTP client, such as curl.
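As a rough illustration (assuming a REST gateway running locally on the default port 8080 and using its /version/cluster endpoint), the service can also be queried from plain Java:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class RestPing {
    public static void main(String[] args) throws Exception {
        // assumes the REST gateway runs locally on the default port 8080
        URL url = new URL("http://localhost:8080/version/cluster");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestProperty("Accept", "text/plain");
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);  // version string reported by the cluster
            }
        }
    }
}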
2. Java invocation example:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.rest.client.Client;
import org.apache.hadoop.hbase.rest.client.Cluster;
import org.apache.hadoop.hbase.rest.client.RemoteHTable;
import org.apache.hadoop.hbase.util.Bytes;

import util.HBaseHelper;

/**
 * Created by root on 15-1-9.
 */
public class RestExample {

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();

        HBaseHelper helper = HBaseHelper.getHelper(conf);
        helper.dropTable("testtable");
        helper.createTable("testtable", "colfam1");
        System.out.println("Adding rows to table...");
        helper.fillTable("testtable", 1, 10, 5, "colfam1");

        Cluster cluster = new Cluster();
        cluster.add("hadoop", 8080);   // REST server host and port

        Client client = new Client(cluster);

        // RemoteHTable talks to HBase through the REST gateway instead of connecting directly
        RemoteHTable table = new RemoteHTable(client, "testtable");

        Get get = new Get(Bytes.toBytes("row-30"));
        get.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col-3"));
        Result result1 = table.get(get);
        System.out.println("Get result1: " + result1);

        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes("row-10"));
        scan.setStopRow(Bytes.toBytes("row-15"));
        scan.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col-5"));
        ResultScanner scanner = table.getScanner(scan);
        for (Result result2 : scanner) {
            System.out.println("Scan row[" + Bytes.toString(result2.getRow()) + "]: " + result2);
        }
    }
}
V. Operating HBase with MapReduce
Apache MapReduce is the software framework that Hadoop provides for large-scale data analysis. HBase ships two MapReduce API packages: org.apache.hadoop.hbase.mapred and org.apache.hadoop.hbase.mapreduce. The former follows the old-style API, the latter the new one, which is more flexible.
HBase MapReduce read example:
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleRead");
job.setJarByClass(MyReadJob.class);   // class that contains the mapper

Scan scan = new Scan();
scan.setCaching(500);         // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);   // don't set to true for MR jobs
// set other scan attrs
...

TableMapReduceUtil.initTableMapperJob(
    tableName,        // input HBase table name
    scan,             // Scan instance to control CF and attribute selection
    MyMapper.class,   // mapper
    null,             // mapper output key
    null,             // mapper output value
    job);
job.setOutputFormatClass(NullOutputFormat.class);   // because we aren't emitting anything from the mapper

boolean b = job.waitForCompletion(true);
if (!b) {
    throw new IOException("error with job!");
}

public static class MyMapper extends TableMapper<Text, Text> {

    public void map(ImmutableBytesWritable row, Result value, Context context)
            throws InterruptedException, IOException {
        // process data for the row from the Result instance
    }
}
HBase MapReduce read/write example:

Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleReadWrite");
job.setJarByClass(MyReadWriteJob.class);   // class that contains the mapper

Scan scan = new Scan();
scan.setCaching(500);         // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);   // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
    sourceTable,      // input table
    scan,             // Scan instance to control CF and attribute selection
    MyMapper.class,   // mapper class
    null,             // mapper output key
    null,             // mapper output value
    job);
TableMapReduceUtil.initTableReducerJob(
    targetTable,      // output table
    null,             // reducer class
    job);
job.setNumReduceTasks(0);

boolean b = job.waitForCompletion(true);
if (!b) {
    throw new IOException("error with job!");
}
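The read/write job above registers MyMapper but its body is not shown. A minimal sketch, modeled on the read/write example in the HBase reference guide, simply copies every cell of each source row into a Put for the target table (Cell, Put, Result, TableMapper and ImmutableBytesWritable come from the usual HBase client and mapreduce packages):

public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {

    public void map(ImmutableBytesWritable row, Result value, Context context)
            throws IOException, InterruptedException {
        // copy the row unchanged: every Cell of the source row goes into one Put
        context.write(row, resultToPut(row, value));
    }

    private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
        Put put = new Put(key.get());
        for (Cell cell : result.listCells()) {
            put.add(cell);
        }
        return put;
    }
}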
VI. HBase Web UI
HBase provides a web-based user interface through which users can inspect the cluster's properties and status. It consists of a Master status page and a ZooKeeper statistics page. The default addresses are:
ip:60010 (Master status)
ip:60030 (RegionServer status)
ip:60010/zk.jsp (ZooKeeper statistics)
The Master status page shows the details of the Master's state. It is roughly divided into cluster information, task information, table information, and RegionServer information, each with a number of specific attributes.
The RegionServer status page shows the details of a RegionServer: its node attributes, task information, and region information.
The ZooKeeper statistics page is a very simple dump of semi-structured text.