Hbase(二)Client客户端
对于使用HBase的业务方来说,从HBase客户端到HBase服务端,再到HDFS客户端,最后到HDFS服务端,这是一整条路径,其中任何一个环节出现问题,都会影响业务的可用性并造成延迟。因此,HBase的业务方需要对HBase客户端有较好地理解,以便优化服务体验。而事实上,由于HBase本身功能的复杂性以及Region定位功能设计在客户端上,导致HBase客户端并不足够轻量级。
1、 HBase客户端实现
HBase提供了面向Java、C/C++、Python等多种语言的客户端。由于HBase本身是Java开发的,所以非Java语言的客户端需要先访问ThriftServer,然后通过ThriftServer的Java HBase客户端来请求HBase集群。对其他语言的客户端,推荐使用ThriftServer的方式来访问HBase服务。
这里主要探讨HBase社区Java客户端。
下面通过一个访问HBase集群的典型示例代码,阐述HBase客户端的用法和设计,代码如下所示:
public class TestDemo {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
public static final TableName tableName = TableName.valueOf("testTable");
public static final byte[] ROW_KEY0 = Bytes.toBytes("rowkey0");
public static final byte[] ROW_KEY1 = Bytes.toBytes("rowkey1");
public static final byte[] FAMILY = Bytes.toBytes("family");
public static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
public static final byte[] VALUE = Bytes.toBytes("value");
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void test() throws IOException {
Configuration conf = TEST_UTIL.getConfiguration();
try (Connection conn = ConnectionFactory.createConnection(conf)) {
try (Table table = conn.getTable(tableName)) {
for (byte[] rowkey : new byte[][] { ROW_KEY0, ROW_KEY1 }) {
Put put = new Put(rowkey).addColumn(FAMILY, QUALIFIER, VALUE);
table.put(put);
}
Scan scan = new Scan().withStartRow(ROW_KEY1).setLimit(1);
try (ResultScanner scanner = table.getScanner(scan)) {
List<Cell> cells = new ArrayList<>();
for (Result result : scanner) {
cells.addAll(result.listCells());
}
Assert.assertEquals(cells.size(), 1);
Cell firstCell = cells.get(0);
Assert.assertArrayEquals(CellUtil.cloneRow(firstCell), ROW_KEY1);
Assert.assertArrayEquals(CellUtil.cloneFamily(firstCell), FAMILY);
Assert.assertArrayEquals(CellUtil.cloneQualifier(firstCell), QUALIFIER);
Assert.assertArrayEquals(CellUtil.cloneValue(firstCell), VALUE);
}
}
}
}
}
这个示例是一个访问HBase的单元测试代码。我们在类TestDemo初始化前,通过HBase的HBaseTestingUtility工具启动一个运行在本地的Mini HBase集群,最后跑完所有的单元测试样例之后,同样通过HBaseTestingUtility工具清理相关资源,并关闭集群。
下面重点讲解TestDemo#test方法的实现。主要步骤如下。
步骤1:获取集群的Configuration对象。
对访问HBase集群的客户端来说,一般需要3个配置文件:hbase-site.xml、core-site.xml、hdfs-site.xml。只需把这3个配置文件放到JVM能加载的classpath下即可,然后通过如下代码即可加载到Configuration对象:
Configuration conf = HBaseConfiguration.create();
在示例中,由于HBaseTestingUtility拥有API可以方便地获取到Configuration对象,所以省去了加载Configuration对象的步骤。
步骤2:通过Configuration初始化集群Connection。
Connection是HBase客户端进行一切操作的基础,它维持了客户端到整个HBase集群的连接,例如一个HBase集群中有2个Master、5个RegionServer,那么一般来说,这个Connection会维持一个到Active Master的TCP连接和5个到RegionServer的TCP连接。
通常,一个进程只需要为一个独立的集群建立一个Connection即可,并不需要建立连接池。建立多个连接,是为了提高客户端的吞吐量,连接池是为了减少建立和销毁连接的开销,而HBase的Connection本质上是由连接多个节点的TCP链接组成,客户端的请求分发到各个不同的物理节点,因此吞吐量并不存在问题;另外,客户端主要负责收发请求,而大部分请求的响应耗时都花在服务端,所以使用连接池也不一定能带来更高的效益。
Connection还缓存了访问的Meta信息,这样后续的大部分请求都可以通过缓存的Meta信息定位到对应的RegionServer。
步骤3:通过Connection初始化Table。
Table是一个非常轻量级的对象,它实现了用户访问表的所有API操作,例如Put、Get、Delete、Scan等。本质上,它所使用的连接资源、配置信息、线程池、Meta缓存等,都来自步骤2创建的Connection对象。因此,由同一个Connection创建的多个Table,都会共享连接、配置信息、线程池、Meta缓存这些资源。在branch-1以及之前的版本中,Table并不是线程安全的类,所以并不建议在多个线程之间共享使用同一个Table实例。在HBase 2.0.0及之后的版本中,Table已经实现为线程安全类。总体上,由于Table是一个非常轻量级的对象,所以可以通过同一个Connection为每个请求创建一个Table,但要记住,在该请求执行完之后需关闭Table对象。步骤4:通过Table执行Put和Scan操作。
从示例代码中可以明显看出,HBase操作的rowkey、family、column、value等都需要先序列化成byte[],同样读取的每一个cell也是用byte[]来表示的。
以上就是访问HBase表数据的全过程。
2、 定位Meta表
HBase一张表的数据是由多个Region构成,而这些Region是分布在整个集群的RegionServer上的。那么客户端在做任何数据操作时,都要先确定数据在哪个Region上,然后再根据Region的RegionServer信息,去对应的RegionServer上读取数据。因此,HBase系统内部设计了一张特殊的表——hbase:meta表,专门用来存放整个集群所有的Region信息。hbase:meta中的hbase指的是namespace,HBase容许针对不同的业务设计不同的namespace,系统表采用统一的namespace,即hbase;meta指的是hbase这个namespace下的表名。
首先,我们来介绍一下hbase:meta表的基本结构,打开HBase Shell,我们可以看到hbase:meta表的结构如下:
./bin/hbase shell
hbase(main):001:0> describe 'hbase:meta'
hbase:meta, {TABLE_ATTRIBUTES => {IS_META => 'true', REGION_REPLICATION => '1',
coprocessor$1 => '|org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoi
nt|536870911|'}}
COLUMN FAMILIES DESCRIPTION
{NAME => 'info', BLOOMFILTER => 'NONE', VERSIONS => '10',
IN_MEMORY => 'true', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING =>'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', CACHE_DATA_IN_L1 => 'true',
MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '8192',
REPLICATION_SCOPE => '0'}
1 row(s) in 0.2350 seconds
hbase:meta表的结构非常简单,整个表只有一个名为info的ColumnFamily。而且HBase保证hbase:meta表始终只有一个Region,这是为了确保meta表多次操作的原子性,因为HBase本质上只支持Region级别的事务(注意表结构中用到了MultiRowMutationEndpoint这个coprocessor,就是为了实现Region级别事务)。
那么,hbase:meta表内具体存放的是哪些信息呢?下图较为清晰地描述了hbase:meta表内存储的信息。
总体来说,hbase:meta的一个rowkey就对应一个Region,rowkey主要由TableName(业务表名)、StartRow(业务表Region区间的起始rowkey)、Timestamp(Region创建的时间戳)、EncodedName(上面3个字段的MD5Hex值)4个字段拼接而成。每一行数据又分为4列,分别是info:regioninfo、info:seqnumDuringOpen、info:server、info:serverstartcode。
info:regioninfo:该列对应的Value主要存储4个信息,即EncodedName、RegionName、Region的StartRow、Region的StopRow。
info:seqnumDuringOpen:该列对应的Value主要存储Region打开时的sequenceId。
info:server:该列对应的Value主要存储Region落在哪个RegionServer上。
info:serverstartcode:该列对应的Value主要存储所在RegionServer的启动Timestamp。
理解了hbase:meta表的基本信息后,就可以根据rowkey来查找业务的Region了。例如,现在需要查找micloud:note表中rowkey=userid334452'所在的Region,可以设计如下查询语句:
scan 'hbase:meta', {STARTROW=>'micloud:note,userid334452,9999999999999',REVERSED=>true,LIMIT=>1}
这里,读者可能会感到奇怪:为什么需要用一个9999999999999的timestamp,以及为什么要用反向查询Reversed Scan呢?
首先,9999999999999是13位时间戳中最大值。其次因为HBase在设计hbase:meta表的rowkey时,把业务表的StartRow(而不是StopRow)放在hbase:meta表的rowkey上。这样,如果某个Region对应的区间是[bbb,ccc),为了定位rowkey=bc的Region,通过正向Scan只会找到[bbb,ccc)这个区间的下一个区间,但是,即使我们找到了[bbb,ccc)的下一个区间,也没法快速找到[bbb,ccc)这个Region的信息。所以,采用Reversed Scan是比较合理的方案。
在理解了如何根据rowkey去hbase:meta表中定位业务表的Region之后,试着思考另外一个问题:HBase作为一个分布式数据库系统,一个大的集群可能承担数千万的查询写入请求,而hbase:meta表只有一个Region,如果所有的流量都先请求hbase:meta表找到Region,再请求Region所在的RegionServer,那么hbase:meta表的将承载巨大的压力,这个Region将马上成为热点Region,且根本无法承担数千万的流量。那么,如何解决这个问题呢?
事实上,解决思路很简单:把hbase:meta表的Region信息缓存在HBase客户端,如图所示。
HBase客户端有一个叫做MetaCache的缓存,在调用HBase API时,客户端会先去MetaCache中找到业务rowkey所在的Region,这个Region可能有以下三种情况:
·Region信息为空,说明MetaCache中没有这个rowkey所在Region的任何Cache。此时直接用上述查询语句去hbase:meta表中Reversed Scan即可,注意首次查找时,需要先读取ZooKeeper的/hbase/meta-region-server这个ZNode,以便确定hbase:meta表所在的RegionServer。在hbase:meta表中找到业务rowkey所在的Region之后,将(regionStartRow,region)这样的二元组信息存放在一个MetaCache中。这种情况极少出现,一般发生在HBase客户端到服务端连接第一次建立后的少数几个请求内,所以并不会对HBase服务端造成巨大压力。
·Region信息不为空,但是调用RPC请求对应RegionServer后发现Region并不在这个RegionServer上。这说明MetaCache信息过期了,同样直接Reversed Scan hbase:meta表,找到正确的Region并缓存。通常,某些Region在两个RegionServer之间移动后会发生这种情况。但事实上,无论是RegionServer宕机导致Region移动,还是由于Balance导致Region移动,发生的几率都极小。而且,也只会对Region移动后的极少数请求产生影响,这些请求只需要通过HBase客户端自动重试locate meta即可成功。
·Region信息不为空,且调用RPC请求到对应RegionSsrver后,发现是正确的RegionServer。绝大部分的请求都属于这种情况,也是代价极小的方案。
由于MetaCache的设计,客户端分摊了几乎所有定位Region的流量压力,避免出现所有流量都打在hbase:meta的情况,这也是HBase具备良好拓展性的基础。所谓Region级别事务,就是当多个操作落在同一个Region内时,HBase能保证这一批操作执行的原子性。如果多个操作分散在不同的Region,则无法保证这批操作的原子性。
3 Scan的复杂之处
HBase客户端的Scan操作应该是比较复杂的RPC操作。为了满足客户端多样化的数据库查询需求,Scan必须能设置众多维度的属性。常用的有startRow、endRow、Filter、caching、batch、reversed、maxResultSize、version、timeRange等。
为便于理解,我们先来看一下客户端Scan的核心流程。在上面的代码示例中,我们已经知道table.getScanner(scan)可以拿到一个scanner,然后只要不断地执行scanner.next()就能拿到一个Result,用户每次执行scanner.next(),都会尝试去名为cache的队列中拿result。如果cache队列已经为空,则会发起一次RPC向服务端请求当前scanner的后续result数据。客户端收到result列表之后,通过scanResultCache把这些results内的多个cell进行重组,最终组成用户需要的result放入到Cache中。为什么需要对RPC response中的result进行重组呢?这是因为RegionServer为了避免被当前RPC请求耗尽资源,实现了多个维度的资源限制(例如timeout、单次RPC响应最大字节数等),一旦某个维度资源达到阈值,就马上把当前拿到的cell返回给客户端。这样客户端拿到的result可能就不是一行完整的数据,因此需要对result进行重组。
理解了scanner的执行流程之后,再来理解Scan的几个重要的概念。
·caching:每次loadCache操作最多放caching个result到cache队列中。控制caching,也就能控制每次loadCache向服务端请求的数据量,避免出现某一次scanner.next()操作耗时极长的情况。
·batch:用户拿到的result中最多含有一行数据中的batch个cell。如果某一行有5个cell,Scan设的batch为2,那么用户会拿到3个result,每个result中cell个数依次为2,2,1。
·allowPartial:用户能容忍拿到一行部分cell的result。设置了这个属性,将跳过重组流程,直接把服务端收到的result返回给用户。
·maxResultSize:loadCache时单次RPC操作最多拿到maxResultSize字节的结果集。
4 HBase客户端其他要点
4.1 RPC重试配置要点
在HBase客户端到服务端的通信过程中,可能会碰到各种各样的异常。例如有以下几种导致重试的常见异常:
·待访问Region所在的RegionServer发生宕机,此时Region已经被移到一个新的RegionServer上,但由于客户端存在meta缓存,首次RPC请求仍然访问到了旧的RegionServer。后续将重试发起RPC。
·服务端负载较大,导致单次RPC响应超时。客户端后续将继续重试,直到RPC成功或者超过客户容忍最大延迟。
·访问meta表或者ZooKeeper异常。
下面我们了解一下HBase常见的几个超时参数。
1)hbase.rpc.timeout:表示单次RPC请求的超时时间,一旦单次RPC超过该时间,上层将收到TimeoutException。默认为60000ms。
2)hbase.client.retries.number:表示调用API时最多容许发生多少次RPC重试操作。默认为35次。
3)hbase.client.pause:表示连续两次RPC重试之间的休眠时间,默认为100ms。注意,HBase的重试休眠时间是按照随机退避算法计算的,若hbase.client.pause=100,则第一次RPC重试前将休眠100ms左右,第二次RPC重试前将休眠200ms左右,第三次RPC重试前将休眠300ms左右,第四次重试前将休眠500ms左右,第五次重试前将休眠1000ms左右,第六次重试则将休眠2000ms左右……也就是重试次数越多,则休眠的时间会越长。因此,若按照默认的hbase.client.retries.number=35,则可能长期卡在休眠和重试两个步骤中。
4)hbase.client.operation.timeout:表示单次API的超时时间,默认值为1200000ms。注意,get/put/delete等表操作称为一次API操作,一次API可能会有多次RPC重试,这个operation.timeout限制的是API操作的总超时。
假设某业务要求单次HBase的读请求延迟不超过1s,那么该如何设置上述4个超时参数呢?
首先,hbase.client.operation.timeout应该设成1s。其次,在SSD集群上,如果集群参数设置合适且集群服务正常,则基本可以保证p99延迟在100ms以内,因此hbase.rpc.timeout设成100ms。这里,hbase.client.pause用默认的100ms。
最后,在1s之内,第一次RPC耗时100ms,休眠100ms;第二次RPC耗时100ms,休眠200ms;第三次RPC耗时100ms,休眠300ms;第四次RPC耗时100ms,休眠500ms(不是完全线性递增的)。因此,在hbase.client.operation.timeout内,至少可执行4次RPC重试,实际中单次RPC耗时可能更短(因为有hbase.rpc.timeout保证了单次RPC最长耗时),所以hbase.client.retries.number可以稍微设大一点(保证在1s内有更多的重试,从而提高请求成功的概率),比如设成6次。
4.2 CAS接口是Region级别串行执行的,吞吐受限
HBase客户端提供一些重要的CAS(Compare And Swap)接口,例如:
boolean checkAndPut(byte[] row, byte[] family,byte[] qualifier,byte[] value, Put put)
long incrementColumnValue(byte[] row,byte[] family,byte[] qualifier,long amount)
这些接口在高并发场景下,能很好地保证读取与写入操作的原子性。例如,有多个分布式的客户端同时更新一个计数器count,可以通过increment接口来保证任意时刻只有一个客户端能成功原子地执行count++操作。
需要特别注意的是,这些CAS接口在RegionServer上是Region级别串行执行的,也就是说,同一个Region内部的多个CAS操作是严格串行执行的,不同Region间的多个CAS操作可以并行执行。
以checkAndPut为例,简要说明一下CAS的运行步骤:
1)服务端拿到Region的行锁(row lock),避免出现两个线程同时修改一行数据,从而破坏了行级别原子性的情况。
2)等待该Region内的所有写入事务都已经成功提交并在mvcc上可见。
3)通过Get操作拿到需要check的行数据,进行条件检查。若条件不符合,则终止CAS。
4)将checkAndPut的put数据持久化。
5)释放第1)步拿到的行锁。
关键在于第2)步,必须要等所有正在写入的事务成功提交并在mvcc上可见。由于branch-1的HBase是写入完成时,即先释放行锁,再sync WAL,最后推mvcc(写入吞吐更高)。所以,第1)步拿到行锁之后,若跳过第2)步则可能未读取到最新的版本。例如:两个客户端并发对x=100这行数据进行increment操作:
·客户端A读取到x=100,开始进行increment操作,将x设成101。
·注意此时客户端A行锁已释放,但A的put操作mvcc仍不可见。客户端B依旧读到老版本x=100,进行increment操作,又将x设成101。
这样,客户端认为成功执行了两次increment操作,但是服务端却只increment了一次,导致语义矛盾。
因此,对那些依赖CAS(Compare-And-Swap:指increment/append这样的读后写原子操作)接口的服务,需要意识到这个操作的吞吐是受限的,因为CAS操作本质上是Region级别串行执行的。当然,在HBase 2.x版已经调整设计,对同一个Region内的不同行可以并行执行CAS,这大大提高了Region内的CAS吞吐。
4.3 Scan Filter设置
HBase作为一个数据库系统,提供了多样化的查询过滤手段。最常用的就是Filter,例如一个表有很多个列簇,用户想找到那些列簇不为C的数据。那么,可设计一个如下的Scan:
Scan scan = new Scan();
scan.setFilter(new FamilyFilter(CompareOp.NOT_EQUAL,
new BinaryComparator(Bytes.toBytes("C"))));
如果想查询列簇不为C且Qualifier在[a,z]区间的数据,可以设计一个如下的Scan:
Scan scan = new Scan();
FamilyFilter ff =
new FamilyFilter(CompareOp.NOT_EQUAL, new BinaryComparator(Bytes.toBytes("C")));
ColumnRangeFilter qf =
new ColumnRangeFilter(Bytes.toBytes("a"), true, Bytes.toBytes("b"), true);
filterList ffilterList = new filterList(Operator.MUST_PASS_ALL, ff, qf);
scan.setFilter(ffilterList);
上面代码使用了一个带AND的FilterList来连接FamilyFilter和ColumnRangeFilter。
有了Filter,大量无效数据可以在服务端内部过滤,相比直接返回全表数据到客户端然后在客户端过滤,要高效很多。但是,HBase的Filter本身也有不少局限,如果使用不当,仍然可能出现极其低效的查询,甚至对线上集群造成很大负担。
4.4 少量写和批量写
HBase是一种对写入操作非常友好的系统,但是当业务有大批量的数据要写入HBase中时,仍会碰到写入瓶颈。为了适应不同数据量的写入场景,HBase提供了3种常见的数据写入API,如下所示。
·table.put(put):这是最常见的单行数据写入API,在服务端先写WAL,然后写MemStore,一旦MemStore写满就flush到磁盘上。这种写入方式的特点是,默认每次写入都需要执行一次RPC和磁盘持久化。因此,写入吞吐量受限于磁盘带宽、网络带宽以及flush的速度。但是,它能保证每次写入操作都持久化到磁盘,不会有任何数据丢失。最重要的是,它能保证put操作的原子性。
·table.put(List<Put>puts):HBase还提供了批量写入的接口,即在客户端缓存put,等凑足了一批put,就将这些数据打包成一次RPC发送到服务端,一次性写WAL,并写MemStore。相比第一种方式,此方法省去了多次往返RPC以及多次刷盘的开销,吞吐量大大提升。不过,这个RPC操作耗时一般都会长一点,因为一次写入了多行数据。另外,如果List内的put分布在多个Region内,则不能保证这一批put的原子性,因为HBase并不提供跨Region的多行事务,换句话说,这些put中,可能有一部分失败,一部分成功,失败的那些put操作会经历若干次重试。
·bulk load:通过HBase提供的工具直接将待写入数据生成HFile,将这些HFile直接加载到对应的Region下的CF内。在生成HFile时,在HBase服务端没有任何RPC调用,只在load HFile时会调用RPC,这是一种完全离线的快速写入方式。bulk load应该是最快的批量写手段,同时不会对线上的集群产生巨大压力。当然,在load完HFile之后,CF内部会进行Compaction,但是Compaction是异步的且可以限速,所以产生的IO压力是可控的。因此,bulk load对线上集群非常友好。
例如,我们之前碰到过一种情况,有两个集群,互为主备,其中一个集群由于工具bug导致数据缺失,想通过另一个备份集群的数据来修复异常集群。最快的方式就是,把备份集群的数据导一个快照拷贝到异常集群,然后通过CopyTable工具扫快照生成HFile,最后bulk load到异常集群,完成数据的修复。
另外的一种场景是,用户在写入大量数据后,发现选择的split keys不合适,想重新选择split keys建表。这时,也可以通过Snapshot生成HFile再bulk load的方式生成新表。
推荐阅读
-
Hbase入门(五)——客户端(Java,Shell,Thrift,Rest,MR,WebUI)
-
Windows Server 2016-Netdom Join之客户端加域(二)
-
swoole学习(二)----搭建server和client
-
Node.js中的http请求客户端示例(request client)
-
Hbase入门(二)——安装与配置
-
Elasticsearch Java Rest Client API 整理总结 (二) —— SearchAPI
-
分享Oracle 11G Client 客户端安装步骤(图文详解)
-
Oracle 11g Client客户端安装教程
-
HBase 二级索引与Coprocessor协处理器
-
通过phoenix在hbase上创建二级索引,Secondary Indexing