HBase封装easy-hbase设计实现
新增码云地址:https://gitee.com/hanmov5/mop-hbase-template
一、写在前面
业务架构用到hbase,但由于某些不可名状原因,没有用phoniex等上层工具,开发都是用原生的hbase api来实现逻辑,原生api虽然使用不算困难,但是在复用性和可读性方便很差,在这样的背景下,根据现有业务和现在hbase的常用方式上封装了这个简易的orm,说是orm其实不是特别准确,只能算是一个轻量级的工具框架吧,我把它称之为easy-hbase,现已在本人所在事业部广泛使用。
二、设计思路
基于现有hbase存储业务的逻辑和使用方式来设计工具的思路
hbase使用列簇设计不宜过多,一般为单个固定列簇。
hbase存储的基础数据表,比如某个订单或者某个帖子之类的,rowkey类似为主键,然后固定单个列簇里面,某个column就是基础数据的一个字段,value就是对应的值,这个实际上和关系型数据库有点类似了,这样我们需要一个封装,根据主键返回一堆字段,再映射成我们需要的对象。
由于hbase是非关系型数据库,它的查询都是基于rowkey来进行的。一些关联查询需要建立相应的索引来实现(ps:复杂的hbase查询实现有多种方式,建立索引是比较常见的),比如某个用户的发帖列表,用户相关key为rowkey,column为long最大值-发帖时间,value为帖子rowkey,这部分数据的column和value都不是固定的,区别于2的固定column值。
hbase存储的基础单位也是字节,这点跟redis都是一致的,但是不同于redis客户端将value固定为string的字节数组,hbase提供的api是允许不同类型如integer|long|string等操作的,为方便管理和代码封装,实际业务上会规定尽量使用string来存储有关数据,特殊情况下用long(主要是为了计数器的原子操作)。
hbase基础数据表查询会返回指定po,而一些索引表查询会返回不同的column和value,另外在条件查询时,我们有时候会限制返回我们需要的column或者是只取指定value(或者别的笔记关系:大于或不等于等)的column,我们需要一个基础的单元格类来承载这些功能。
一般基础数据相同的属性,我们可能会放多份,区分正序或者倒叙等,还有复杂索引的数据其也没特定的table,所以我们设计的时候是将hbase的table以参数的形式传入,而非注解,建议这些配置在统一的地方维护
三、代码实现
下面会介绍框架的整体架构和核心相关类设计
- 项目结构
- easy-hbase-core:主要包括一些基础的bean和annotation,常量和工具类
- easy-hbase-dao:主要代码,包括hbase相关操作代码封装和查询映射等,可直接spring集成使用
- easy-hbase-spring-boot-starter:一个简单的spring-boot-starter,适合spring-boot项目集成使用
- easy-hbase-spring-boot-demo:一个简单的spring-boot-demo项目,演示集成使用
2. 核心类
- hbasecolumn:field注解,用于注解po的相关属性
package com.gaoxin.mop.annotation; import java.lang.annotation.*; /** * author: mr.tan * date: 2017/08/18 * <p> * used on the field associate to the column from the hbase * </p> */ @documented @retention(retentionpolicy.runtime) @target(elementtype.field) public @interface hbasecolumn { string family() default ""; string column(); boolean exist() default true; }
family:列簇属性,用于指定特殊的列簇,项目里面有默认的全局family
column::列属性,用于po属性和hbase对应column不匹配的情况,若一致无需指定
exist:若po中某个字段不在hbase存在的话,需手动设置这个属性为false,但建议po类为纯净的po
- rowkey:field注解,用于注解po中的rowkey,若po中的属性不为rowkey值的话,需手动指定这个注解,否则将会默认field为rowkey
package com.gaoxin.mop.annotation; import java.lang.annotation.*; /** * author: mr.tan * date: 2017/08/18 * <p> * used on the field associate to the rowkey from the hbase * </p> */ @documented @retention(retentionpolicy.runtime) @target(elementtype.field) public @interface rowkey { }
- columninfo:封装最基础的单元格类,columnfamily,column和对应的value
package com.gaoxin.mop.bean; import com.gaoxin.mop.constants.hbaseconstant; import org.apache.hadoop.hbase.filter.comparefilter; /** * author: mr.tan * date: 2017/08/18 * <p> * the base entity contains columnfamily and column、value、op * default value string.class,others set customize valueclass * used on limit the back columns and filter values * </p> */ public class columninfo { private string columnfamily; private string column; private string value; private comparefilter.compareop compareoperator; private class valueclass; public columninfo() { } public columninfo(string column) { this(hbaseconstant.default_family, column, comparefilter.compareop.equal); } public columninfo(string columnfamily, string column, comparefilter.compareop compareoperator) { this.columnfamily = columnfamily; this.column = column; this.compareoperator = compareoperator; } public columninfo(string columnfamily, string column, comparefilter.compareop compareoperator,class valueclass) { this(columnfamily, column, compareoperator); this.valueclass = valueclass; } public columninfo(string column, string value) { this(hbaseconstant.default_family, column, value, comparefilter.compareop.equal); } public columninfo(string columnfamily, string column, string value) { this(columnfamily, column, value, comparefilter.compareop.equal); } public columninfo(string columnfamily, string column, string value, comparefilter.compareop compareoperator) { this(columnfamily, column, compareoperator); this.value = value; } public string getcolumnfamily() { return columnfamily; } public comparefilter.compareop getcompareoperator() { return compareoperator; } public void setcompareoperator(comparefilter.compareop compareoperator) { this.compareoperator = compareoperator; } public void setcolumnfamily(string columnfamily) { this.columnfamily = columnfamily; } public string getcolumn() { return column; } public void setcolumn(string column) { this.column = column; } public string getvalue() { return value; } public void setvalue(string value) { this.value = value; } public class getvalueclass() { return valueclass; } public void setvalueclass(class valueclass) { this.valueclass = valueclass; } }
value:依据前文的设计思路,这里我们默认value为string类型,大多数情况下也应该这样做,如果有特殊的类型,如long之类的,需指定valueclass的class
compareoperator:比较器属性,可以设置这个值用于在hbase限制返回column和值过滤的时候传入,可取的值:equal|not equal|greater等,我们这个类默认equal
- hbasefactorybean:hbase的连接初始化工厂bean,用于初始化hbase连接
package com.gaoxin.mop.config; import com.gaoxin.mop.constants.hbaseconstant; import org.apache.commons.lang.stringutils; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.hbase.hbaseconfiguration; import org.apache.hadoop.hbase.client.hconnection; import org.apache.hadoop.hbase.client.hconnectionmanager; import org.springframework.stereotype.component; import java.util.arraylist; import java.util.list; /** * author: mr.tan * date: 2017/08/18 * <p> * hbaseconstant 配置载入。初始化连接 */ @component public class hbasefactorybean { private static hbasefactorybean factorybean = null; private hbasefactorybean() { } public static hbasefactorybean getinstance() { if (factorybean == null) { factorybean = new hbasefactorybean(); } return factorybean; } private static list<hconnection> connections; private list<hbaseconfig> hbaseconfigs; public static void setconnections(list<hconnection> connections) { hbasefactorybean.connections = connections; } public void sethbaseconfigs(list<hbaseconfig> hbaseconfigs) { this.hbaseconfigs = hbaseconfigs; } public void initializeconnections() throws exception { connections = new arraylist<>(); if (hbaseconfigs == null) { throw new runtimeexception("hbase config is null error"); } for (hbaseconfig config : hbaseconfigs) { configuration configuration = hbaseconfiguration.create(); configuration.set("hbase.zookeeper.quorum", config.getzookeeperquorum()); configuration.set("hbase.zookeeper.property.clientport", stringutils.isblank(config.getzookeeperclientport()) ? hbaseconstant.default_hbase_port : config.getzookeeperclientport()); hconnection connection = hconnectionmanager.createconnection(configuration); connections.add(connection); } } public static hconnection getdefaultconnection() { return connections.get(0); } public static hconnection getspecifyconnection(int index) { if (index > connections.size() - 1) { throw new runtimeexception("hbase connection is not exist"); } return connections.get(index); } }
- hbasedao:hbase基础操作核心类
package com.gaoxin.mop.dao; import com.gaoxin.mop.bean.columninfo; import java.util.list; /** * author: mr.tan * date: 2017/08/18 */ public interface hbasedao { <t> t get(string tablename, string rowkey, list<columninfo> columns, list<columninfo> filters, class<? extends t> clazz); <t> t get(string tablename, string rowkey, class<? extends t> clazz); <t> t get(string tablename, string rowkey, list<columninfo> columns, class<? extends t> clazz); string getsinglecolumnvalue(string tablename, string rowkey, string column); <t> t getsinglecolumnvalue(string tablename, string rowkey, string column, class<? extends t> clazz); list<string> getrowkeys(string tablename); list<string> getrowkeys(string tablename, string startrow, string endrow); list<string> getrowkeys(string tablename, string startrow, string endrow, integer pagesize, string separate, integer index); list<string> getrowkeys(string tablename, string startrow, string endrow, integer pagesize, string separate); list<string> getrowkeys(string tablename, string startrow, string endrow, integer pagesize); list<string> getrowkeysbyprefix(string tablename, string prefix); list<string> getrowkeysbyprefix(string tablename, string startrow, string endrow, string prefix); list<columninfo> getcolumns(string tablename, string rowkey, string columnfamily, list<columninfo> columns, list<columninfo> filters); list<columninfo> getcolumns(string tablename, string rowkey, list<columninfo> columns, list<columninfo> filters); list<columninfo> getcolumns(string tablename, string rowkey, string columnfamily); list<columninfo> getcolumns(string tablename, string rowkey); <t> list<t> getlist(string tablename, list<string> rowkeys, class<? extends t> clazz); <t> list<t> getlist(string tablename, list<string> rowkeys,list<columninfo> columns, list<columninfo> filters, class<? extends t> clazz); <t> list<t> getlist(string tablename, class<? extends t> clazz); <t> list<t> getlist(string tablename, list<columninfo> columns, list<columninfo> filters, class<? extends t> clazz); <t> list<t> getlist(string tablename, list<columninfo> columns, list<columninfo> filters, string start, string end, class<? extends t> clazz); <t> list<t> getpagelist(string tablename, string startrow, string endrow, integer pagesize, class<? extends t> clazz); list<columninfo> getcolumnsbypage(string tablename, string rowkey, integer pageno, integer pagesize); list<columninfo> getcolumnsbypage(string tablename, string rowkey, integer pageno, integer pagesize, list<columninfo> columns, list<columninfo> filters); <t> t getcolumnobj(string tablename, string rowkey, string column,class<? extends t> clazz); <t> list<t> getcolumnobjlist(string tablename, string rowkey, list<string> columns,class<? extends t> clazz); <t> list<t> getpagecolumnobjlist(string tablename, string rowkey, integer pageno,integer pagesize,class<? extends t> clazz); <t> boolean put(string tablename, list<t> objects); <t> boolean put(string tablename, t object); boolean put(string tablename, string rowkey, string column, string value); boolean put(string tablename, string rowkey, columninfo columninfo); boolean put(string tablename, string rowkey, list<columninfo> columninfos); boolean delete(string tablename, string rowkey); boolean delete(string tablename, string rowkey, list<columninfo> list); boolean delete(string tablename, string rowkey, columninfo columninfo); boolean delete(string tablename, string rowkey, string column); long addcounter(string tablename, string rowkey, string column, long num); }
上述代码为主要核心代码,封装了总共八大类的方法:
1.获取单个po的get方法,column用于限制返回的column,filter用于批量过滤
2.获取多个po的getlist方法
3.获取单个signlecolumn、多个column的getcolumns方法,支持分页(hbase的宽表分页,基于偏移量设计)
4.支持批量put的put方法
5.支持批量delete的delete方法
6.支持原子操作的addcounter计数器方法
7.支持只获取rowkey的分页方法(基于keyonlyfilter,减少数据传输,适用于仅需要rowkey情况)
8.支持getcolumsobj适用于value是一个json对象的查询方法
3. 说明
retrieve an htableinterface implementation for access to a table. the returned htableinterface is not thread safe, a new instance should be created for each using thread. this is a lightweight operation, pooling or caching of the returned htableinterface is neither required nor desired. note that the hconnection needs to be unmanaged (created with
hconnectionmanager.createconnection(configuration)
).
- 如上述引用,hbase官方推荐hconnecton全局维护,而htablepool也被废弃,不建议使用,所以我们这里也是维护了全局的hconnection,在htable的使用上市即用即关的。
- 以上是主要的核心类,其主要映射也是通过反射来建立关系的,这里就不多说了
- 由于公司使用的hbase-client版本为0.96,所以这版本也只针对0.96,如果是更高版本的,由于部分api的改变暂不支持
- 在dao的模块里面,有相应的demo用例和对应的测试用例,测试用例写的也不规范,主要是当初内部快速开发校验下,可以作为一个验证。
- spring-boot-starter版本也很简单,只是集成了一个扫描注入而已,也有相应的demo
- 最后,奉上源码地址,有不足的地方还望海涵,敬请斧正。
上一篇: 基于vue中解决v-for使用报红并出现警告的问题
下一篇: flask中过滤器的使用详解