欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Hbase入门(五)——客户端(Java,Shell,Thrift,Rest,MR,WebUI)

程序员文章站 2022-05-18 21:21:17
Hbase的客户端有原生java客户端,Hbase Shell,Thrift,Rest,Mapreduce,WebUI等等。 下面是这几种客户端的常见用法。 一、原生Java客户端 原生java客户端是hbase最主要,最高效的客户端。 涵盖了增删改查等API,还实现了创建,删除,修改表等DDL操作 ......

Hbase入门(五)——客户端(Java,Shell,Thrift,Rest,MR,WebUI)

hbase的客户端有原生java客户端,hbase shell,thrift,rest,mapreduce,webui等等。

下面是这几种客户端的常见用法。

Hbase入门(五)——客户端(Java,Shell,Thrift,Rest,MR,WebUI)

一、原生java客户端

原生java客户端是hbase最主要,最高效的客户端。

涵盖了增删改查等api,还实现了创建,删除,修改表等ddl操作。

配置java连接hbase

java连接hbase需要两个类:

  • hbaseconfiguration
  • connectionfactory

首先,配置一个hbase连接:

比如zookeeper的地址端口
hbase.zookeeper.quorum
hbase.zookeeper.property.clientport

更通用的做法是编写hbase-site.xml文件,实现配置文件的加载:

hbase-site.xml示例:

<configuration>

<property>
<name>hbase.master</name>
<value>hdfs://host1:60000</value>
</property>

<property>
<name>hbase.zookeeper.quorum</name>
<value>host1,host2,host3</value>
</property>

<property>
<name>hbase.zookeeper.property.clientport</name>
<value>2181</value>
</property>
</configuration>

随后我们加载配置文件,创建连接:

 config.addresource(new path(system.getenv("hbase_conf_dir"), "hbase-site.xml"));
 connection connection = connectionfactory.createconnection(config);

创建表

要创建表我们需要首先创建一个admin对象

admin admin = connection.getadmin(); //使用连接对象获取admin对象
tablename tablename = tablename.valueof("test");//定义表名

htabledescriptor htd = new htabledescriptor(tablename);//定义表对象

hcolumndescriptor hcd = new hcolumndescriptor("data");//定义列族对象

htd.addfamily(hcd); //添加

admin.createtable(htd);//创建表

hbase2.x创建表

hbase2.x 的版本中创建表使用了新的 api

tablename tablename = tablename.valueof("test");//定义表名
//tabledescriptor对象通过tabledescriptorbuilder构建;
tabledescriptorbuilder tabledescriptor = tabledescriptorbuilder.newbuilder(tablename);
columnfamilydescriptor family = columnfamilydescriptorbuilder.newbuilder(bytes.tobytes("data")).build();//构建列族对象
tabledescriptor.setcolumnfamily(family);//设置列族
admin.createtable(tabledescriptor.build());//创建表

添加数据

table table = connection.gettable(tablename);//获取table对象
try {
    byte[] row = bytes.tobytes("row1"); //定义行
    put put = new put(row);             //创建put对象
    byte[] columnfamily = bytes.tobytes("data");    //列
    byte[] qualifier = bytes.tobytes(string.valueof(1)); //列族修饰词
    byte[] value = bytes.tobytes("张三丰");    //值
    put.addcolumn(columnfamily, qualifier, value);
    table.put(put);     //向表中添加数据

} finally {
    //使用完了要释放资源
    table.close();
}

获取指定行数据

//获取数据
get get = new get(bytes.tobytes("row1"));   //定义get对象
result result = table.get(get);         //通过table对象获取数据
system.out.println("result: " + result);
//很多时候我们只需要获取“值” 这里表示获取 data:1 列族的值
byte[] valuebytes = result.getvalue(bytes.tobytes("data"), bytes.tobytes("1")); //获取到的是字节数组
//将字节转成字符串
string valuestr = new string(valuebytes,"utf-8");
system.out.println("value:" + valuestr);

扫描表中的数据

scan scan = new scan();
resultscanner scanner = table.getscanner(scan);
try {
    for (result scannerresult: scanner) {
        system.out.println("scan: " + scannerresult);
         byte[] row = scannerresult.getrow();
         system.out.println("rowname:" + new string(row,"utf-8"));
    }
} finally {
    scanner.close();
}

删除表

tablename tablename = tablename.valueof("test");
admin.disabletable(tablename);  //禁用表
admin.deletetable(tablename);   //删除表

hbase java api表ddl完整示例:

package com.example.hbase.admin;

import java.io.ioexception;

import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.hbase.hbaseconfiguration;
import org.apache.hadoop.hbase.hcolumndescriptor;
import org.apache.hadoop.hbase.hconstants;
import org.apache.hadoop.hbase.htabledescriptor;
import org.apache.hadoop.hbase.tablename;
import org.apache.hadoop.hbase.client.admin;
import org.apache.hadoop.hbase.client.connection;
import org.apache.hadoop.hbase.client.connectionfactory;
import org.apache.hadoop.hbase.io.compress.compression.algorithm;

public class example {

  private static final string table_name = "my_table_name_too";
  private static final string cf_default = "default_column_family";

  public static void createoroverwrite(admin admin, htabledescriptor table) throws ioexception {
    if (admin.tableexists(table.gettablename())) {
      admin.disabletable(table.gettablename());
      admin.deletetable(table.gettablename());
    }
    admin.createtable(table);
  }

  public static void createschematables(configuration config) throws ioexception {
    try (connection connection = connectionfactory.createconnection(config);
         admin admin = connection.getadmin()) {

      htabledescriptor table = new htabledescriptor(tablename.valueof(table_name));
      table.addfamily(new hcolumndescriptor(cf_default).setcompressiontype(algorithm.none));

      system.out.print("creating table. ");
      createoroverwrite(admin, table);
      system.out.println(" done.");
    }
  }

  public static void modifyschema (configuration config) throws ioexception {
    try (connection connection = connectionfactory.createconnection(config);
         admin admin = connection.getadmin()) {

      tablename tablename = tablename.valueof(table_name);
      if (!admin.tableexists(tablename)) {
        system.out.println("table does not exist.");
        system.exit(-1);
      }

      htabledescriptor table = admin.gettabledescriptor(tablename);

      // 更新表格
      hcolumndescriptor newcolumn = new hcolumndescriptor("newcf");
      newcolumn.setcompactioncompressiontype(algorithm.gz);
      newcolumn.setmaxversions(hconstants.all_versions);
      admin.addcolumn(tablename, newcolumn);

      // 更新列族
      hcolumndescriptor existingcolumn = new hcolumndescriptor(cf_default);
      existingcolumn.setcompactioncompressiontype(algorithm.gz);
      existingcolumn.setmaxversions(hconstants.all_versions);
      table.modifyfamily(existingcolumn);
      admin.modifytable(tablename, table);

      // 禁用表格
      admin.disabletable(tablename);

      // 删除列族
      admin.deletecolumn(tablename, cf_default.getbytes("utf-8"));

      // 删除表格(需提前禁用)
      admin.deletetable(tablename);
    }
  }

  public static void main(string... args) throws ioexception {
    configuration config = hbaseconfiguration.create();

    //添加必要配置文件(hbase-site.xml, core-site.xml)
    config.addresource(new path(system.getenv("hbase_conf_dir"), "hbase-site.xml"));
    config.addresource(new path(system.getenv("hadoop_conf_dir"), "core-site.xml"));
    createschematables(config);
    modifyschema(config);
  }
}

二、使用hbase shell工具操作hbase

在 hbase 安装目录 bin/ 目录下使用hbase shell命令连接正在运行的 hbase 实例。

$ ./bin/hbase shell
hbase(main):001:0>
预览 hbase shell 的帮助文本

输入help并回车, 可以看到 hbase shell 的基本信息和一些示例命令.

创建表

使用 create创建一个表 必须指定一个表名和列族名

hbase(main):001:0> create 'test', 'cf'
0 row(s) in 0.4170 seconds

=> hbase::table - test
表信息

使用 list 查看存在表

hbase(main):002:0> list 'test'
table
test
1 row(s) in 0.0180 seconds

=> ["test"]
使用 describe 查看表细节及配置
hbase(main):003:0> describe 'test'
table test is enabled
test
column families description
{name => 'cf', versions => '1', evict_blocks_on_close => 'false', new_version_behavior => 'false', keep_deleted_cells => 'false', cache_data_on_write =>
'false', data_block_encoding => 'none', ttl => 'forever', min_versions => '0', replication_scope => '0', bloomfilter => 'row', cache_index_on_write => 'f
alse', in_memory => 'false', cache_blooms_on_write => 'false', prefetch_blocks_on_open => 'false', compression => 'none', blockcache => 'true', blocksize
 => '65536'}
1 row(s)
took 0.9998 seconds
插入数据

使用 put 插入数据

hbase(main):003:0> put 'test', 'row1', 'cf:a', 'value1'
0 row(s) in 0.0850 seconds

hbase(main):004:0> put 'test', 'row2', 'cf:b', 'value2'
0 row(s) in 0.0110 seconds

hbase(main):005:0> put 'test', 'row3', 'cf:c', 'value3'
0 row(s) in 0.0100 seconds
扫描全部数据

从 hbase 获取数据的途径之一就是 scan 。使用 scan 命令扫描表数据。你可以对扫描做限制。

hbase(main):006:0> scan 'test'
row                                      column+cell
 row1                                    column=cf:a, timestamp=1421762485768, value=value1
 row2                                    column=cf:b, timestamp=1421762491785, value=value2
 row3                                    column=cf:c, timestamp=1421762496210, value=value3
3 row(s) in 0.0230 seconds
获取一条数据

使用 get 命令一次获取一条数据

hbase(main):007:0> get 'test', 'row1'
column                                   cell
 cf:a                                    timestamp=1421762485768, value=value1
1 row(s) in 0.0350 seconds
禁用表

使用 disable 命令禁用表

hbase(main):008:0> disable 'test'
0 row(s) in 1.1820 seconds

hbase(main):009:0> enable 'test'
0 row(s) in 0.1770 seconds

使用 enable 命令启用表

hbase(main):010:0> disable 'test'
0 row(s) in 1.1820 seconds
删除表
hbase(main):011:0> drop 'test'
0 row(s) in 0.1370 seconds
退出 hbase shell

使用quit命令退出命令行并从集群断开连接。

三、使用thrift客户端访问hbase

由于hbase是用java写的,因此它原生地提供了java接口,对非java程序人员,怎么办呢?幸好它提供了thrift接口服务器,因此也可以采用其他语言来编写hbase的客户端,这里是常用的hbase python接口的介绍。其他语言也类似。

1.启动thrift-server

要使用hbase的thrift接口,必须将它的服务启动,启动hbase的thrift-server进程如下:

cd /app/zpy/hbase/bin
./hbase-daemon.sh start thrift 
执行jps命令检查:
34533 thriftserver

thrift默认端口是9090,启动成功后可以查看端口是否起来。

2.安装thrift所需依赖

(1)安装依赖

yum install automake libtool flex bison pkgconfig gcc-c++ boost-devel libevent-devel zlib-devel python-devel ruby-devel openssl-devel

(2)安装boost

wget http://sourceforge.net/projects/boost/files/boost/1.53.0/boost_1_53_0.tar.gz 
tar xvf boost_1_53_0.tar.gz 
cd boost_1_53_0 
./bootstrap.sh 
./b2 install

3.安装thrift客户端

官网下载 thrift-0.11.0.tar.gz,解压并安装

wget http://mirrors.hust.edu.cn/apache/thrift/0.11.0/thrift-0.11.0.tar.gz
tar xzvf thrift-0.11.0.tar.gz
cd thrift-0.11.0
mkdir /app/zpy/thrift
./configure --prefix=/app/zpy/thrift
make 
make install

make可能报错如下:

g++: error: /usr/lib64/libboost_unit_test_framework.a: no such file or directory

解决:

find / -name libboost_unit_test_framework.*
cp /usr/local/lib/libboost_unit_test_framework.a  /usr/lib64/

4.使用python3连接hbase

安装所需包

pip install thrift
pip install hbase-thrift

python 脚本如下:

from thrift import thrift
from thrift.transport import tsocket
from thrift.transport import ttransport
from thrift.protocol import tbinaryprotocol

from hbase import hbase
from hbase.ttypes import *

transport = tsocket.tsocket('localhost', 9090)
protocol = tbinaryprotocol.tbinaryprotocol(transport)

client = hbase.client(protocol)
transport.open()
a = client.gettablenames()
print(a)

四、rest客户端

1、启动rest服务 

  a.启动一个非守护进程模式的rest服务器(ctrl+c 终止)

     bin/hbase rest start 

  b.启动守护进程模式的rest服务器

     bin/hbase-daemon.sh start rest

默认启动的是8080端口(可以使用参数在启动时指定端口),可以被访问。curl  :8080/

2、java调用示例:

import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.hbase.hbaseconfiguration;
import org.apache.hadoop.hbase.client.get;
import org.apache.hadoop.hbase.client.result;
import org.apache.hadoop.hbase.client.resultscanner;
import org.apache.hadoop.hbase.client.scan;
import org.apache.hadoop.hbase.rest.client.client;
import org.apache.hadoop.hbase.rest.client.cluster;
import org.apache.hadoop.hbase.rest.client.remotehtable;
import org.apache.hadoop.hbase.util.bytes;
import util.hbasehelper;
import java.io.ioexception;

/**
 * created by root on 15-1-9.
 */
public class restexample {
    public static void main(string[] args) throws ioexception {
       configuration conf = hbaseconfiguration.create();

       hbasehelper helper = hbasehelper.gethelper(conf);
       helper.droptable("testtable");
       helper.createtable("testtable", "colfam1");
       system.out.println("adding rows to table...");
       helper.filltable("testtable", 1, 10, 5, "colfam1");

       cluster cluster=new cluster();
       cluster.add("hadoop",8080);

       client client=new client(cluster);
 

       get get = new get(bytes.tobytes("row-30")); 
       get.addcolumn(bytes.tobytes("colfam1"), bytes.tobytes("col-3"));
       result result1 = table.get(get);
 
       system.out.println("get result1: " + result1);

       scan scan = new scan();
       scan.setstartrow(bytes.tobytes("row-10"));
       scan.setstoprow(bytes.tobytes("row-15"));
       scan.addcolumn(bytes.tobytes("colfam1"), bytes.tobytes("col-5"));
       resultscanner scanner = table.getscanner(scan); 
       for (result result2 : scanner) {
         system.out.println("scan row[" + bytes.tostring(result2.getrow()) +
                    "]: " + result2);
        }
    }
}

五、mapreduce操作hbase

apache mapreduce 是hadoop提供的软件框架,用来进行大规模数据分析.

mapred and mapreduce

与 mapreduce 一样,在 hbase 中也有 2 种 mapreduce api 包.org.apache.hadoop.hbase.mapred and org.apache.hadoop.hbase.mapreduce.前者使用旧式风格的 api,后者采用新的模式.相比于前者,后者更加灵活。

hbase mapreduce 示例

hbase mapreduce 读示例

configuration config = hbaseconfiguration.create();
job job = new job(config, "exampleread");
job.setjarbyclass(myreadjob.class);     // class that contains mapper

scan scan = new scan();
scan.setcaching(500);        // 1 is the default in scan, which will be bad for mapreduce jobs
scan.setcacheblocks(false);  // don't set to true for mr jobs
// set other scan attrs
...

tablemapreduceutil.inittablemapperjob(
  tablename,        // input hbase table name
  scan,             // scan instance to control cf and attribute selection
  mymapper.class,   // mapper
  null,             // mapper output key
  null,             // mapper output value
  job);
job.setoutputformatclass(nulloutputformat.class);   // because we aren't emitting anything from mapper

boolean b = job.waitforcompletion(true);
if (!b) {
  throw new ioexception("error with job!");
}
public static class mymapper extends tablemapper<text, text> {

  public void map(immutablebyteswritable row, result value, context context) throws interruptedexception, ioexception {
    // process data for the row from the result instance.
   }
}

hbase mapreduce 读写示例

configuration config = hbaseconfiguration.create();
job job = new job(config,"examplereadwrite");
job.setjarbyclass(myreadwritejob.class);    // class that contains mapper

scan scan = new scan();
scan.setcaching(500);        // 1 is the default in scan, which will be bad for mapreduce jobs
scan.setcacheblocks(false);  // don't set to true for mr jobs
// set other scan attrs

tablemapreduceutil.inittablemapperjob(
  sourcetable,      // input table
  scan,             // scan instance to control cf and attribute selection
  mymapper.class,   // mapper class
  null,             // mapper output key
  null,             // mapper output value
  job);
tablemapreduceutil.inittablereducerjob(
  targettable,      // output table
  null,             // reducer class
  job);
job.setnumreducetasks(0);

boolean b = job.waitforcompletion(true);
if (!b) {
    throw new ioexception("error with job!");
}

六、hbase web ui

hbase提供了一种web方式的用户接口,用户可以通过web界面查看hbase集群的属性等状态信息,web页面分为:master状态界面,和zookeeper统计信息页面。

默认访问地址分别是:

ip:60010

ip::60030

ip:60010/zk.jsp

master状态界面会看到master状态的详情。

该页面大概分hbase集群信息,任务信息,表信息,regionserver信息。每一部分又包含了一些具体的属性。

regionserver状态界面会看到regionserver状态的详情。

regionserver的节点属性信息,任务信息和region信息。

zookeeper统计信息页面是非常简单的半结构化文本打印信息。

Hbase入门(五)——客户端(Java,Shell,Thrift,Rest,MR,WebUI)

更多实时计算,hbase,flink,kafka等相关技术博文,欢迎关注实时流式计算

Hbase入门(五)——客户端(Java,Shell,Thrift,Rest,MR,WebUI)

本文由博客一文多发平台 openwrite 发布!