Hbase的API使用(create,put,delete,get)
程序员文章站
2022-05-30 14:04:42
...
构建三张表,命名空间(namespace):weibo
表1:内容表:weibo:content 列族:info(用户发布的信息),列:content K 值:content V Version:时间戳
表2:用户关系表:weibo:relation 列族1:attends(关注的人)列:attend K 值:attend V 列族2:fans(粉丝),列:uid K 值:uid V
表3:收件箱:weibo:index,列族:info,列:uid(被关注者的uid)K 值:该用户所发微博在内容表中的rowKey V
内容表(weibo:content)的行键是:uid+"_"+timestamp,即 用户ID_timestamp
用户关系表(weibo:relation)和收件箱表(weibo:index)的行键是:uid(用户ID)
Maven用到的pom 文件
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
<!-- junit:junit was declared twice (4.10 and 4.12). Maven expects each
     groupId:artifactId pair to appear once, so only the newer 4.12 is kept. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
class1:定义命名空间和三张表名称的常量类:
/**
 * Names of the HBase namespace and tables used by the Weibo demo.
 *
 * <p>Table names are written in HBase's {@code namespace:table} form.
 */
public final class Content {
    /** Namespace that holds all Weibo tables. */
    public static final String NAMESPACE = "weibo";
    /** User relation table (followed users and fans). */
    public static final String RELATION = "weibo:relation";
    /** Table holding the posted content. */
    public static final String CONTENT = "weibo:content";
    /** Inbox table. */
    public static final String INDEX = "weibo:index";
    /** Alias of {@link #INDEX}; the demo driver refers to the inbox table by this name. */
    public static final String INBOX = INDEX;

    private Content() {
        // Constants holder; not meant to be instantiated.
    }
}
class2:构建Hbase的工具类
package Weibo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.mortbay.log.Log;
import java.io.IOException;
import java.util.ArrayList;
/**
 * Static helpers for the Weibo demo on HBase: namespace/table administration plus the
 * publish / follow / unfollow / read operations on the content, relation and inbox tables.
 *
 * <p>Every method opens its own {@link Connection} and releases it (together with any
 * {@link Admin}, {@link Table} or {@link ResultScanner}) before returning, including when
 * an exception is thrown.
 */
public class HbaseUtils {
    /** Column family used by the content and inbox tables. */
    private static final byte[] FAMILY_INFO = Bytes.toBytes("info");
    /** Relation-table family listing the users a row owner follows. */
    private static final byte[] FAMILY_ATTENDS = Bytes.toBytes("attends");
    /** Relation-table family listing the fans of a row owner. */
    private static final byte[] FAMILY_FANS = Bytes.toBytes("fans");

    // Client configuration shared by all helpers; the cluster address is hard-coded.
    private static final Configuration configuration = HBaseConfiguration.create();

    static {
        configuration.set("hbase.zookeeper.quorum", "192.168.128.111");
        configuration.set("hbase.rootdir", "hdfs://192.168.128.111:9000/hbase");
    }

    private HbaseUtils() {
        // Static utility class.
    }

    /**
     * Creates the namespace {@code name}; does nothing when it already exists.
     *
     * @param name namespace to create
     * @throws IOException if the cluster cannot be reached or the creation fails
     */
    public static void createNS(String name) throws IOException {
        // Creating an existing namespace is rejected by HBase, so check first.
        if (nameSpaceExits(name)) {
            return;
        }
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            admin.createNamespace(NamespaceDescriptor.create(name).build());
        }
    }

    /**
     * Deletes the namespace {@code name}.
     *
     * @param name namespace to delete
     * @throws IOException if the deletion fails
     */
    public static void deleteNS(String name) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            admin.deleteNamespace(name);
        }
    }

    /**
     * Creates table {@code name} with one column family per entry of {@code strings}.
     * Returns without doing anything when the table's namespace does not exist or the
     * table already exists.
     *
     * @param name    table name, optionally qualified as {@code namespace:table}
     * @param version used as both the maximum and the minimum number of versions per cell
     * @param strings column family names
     * @throws IOException if the table cannot be created
     */
    public static void createTable(String name, int version, String... strings) throws IOException {
        TableName tableName = TableName.valueOf(name);
        // The namespace must exist before a table can be placed in it.
        if (!nameSpaceExits(tableName.getNamespaceAsString())) {
            return;
        }
        // Only create the table when it is not there yet.
        if (isTableExits(name)) {
            return;
        }
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
            // WAL durability is not configured here, so the table keeps the default level.
            for (String s : strings) {
                HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(Bytes.toBytes(s));
                // No data block encoding.
                hColumnDescriptor.setDataBlockEncoding(DataBlockEncoding.NONE);
                // Row-level bloom filter.
                hColumnDescriptor.setBloomFilterType(BloomType.ROW);
                // Time-to-live of cells; HBase interprets this value in seconds.
                hColumnDescriptor.setTimeToLive(18000);
                // Compression is left at its default (none).
                // hColumnDescriptor.setCompressionType(Compression.Algorithm.SNAPPY);
                // Do not keep cells that have been deleted.
                hColumnDescriptor.setKeepDeletedCells(false);
                // Prefer keeping this family's data in memory for faster reads.
                hColumnDescriptor.setInMemory(true);
                hColumnDescriptor.setBlockCacheEnabled(true);
                // HFile block size: 64 * 1024 bytes = 64 KB.
                hColumnDescriptor.setBlocksize(64 * 1024);
                hColumnDescriptor.setMaxVersions(version);
                hColumnDescriptor.setMinVersions(version);
                // Register the family only once it is fully configured.
                hTableDescriptor.addFamily(hColumnDescriptor);
            }
            admin.createTable(hTableDescriptor);
        }
    }

    /**
     * Disables (if still enabled) and then deletes table {@code name}.
     *
     * @param name table to drop
     * @throws IOException if the table cannot be disabled or deleted
     */
    public static void dropTable(String name) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf(name);
            // A table has to be disabled before it can be deleted.
            if (admin.isTableEnabled(tableName)) {
                admin.disableTable(tableName);
            }
            admin.deleteTable(tableName);
        }
    }

    /**
     * Publishes a post: stores {@code content} in the content table under the row key
     * {@code uid_timestamp} and records that row key in the inbox row of every fan of
     * {@code uid}.
     *
     * @param uid     author of the post
     * @param content text of the post
     * @throws IOException if any table operation fails
     */
    public static void publish(String uid, String content) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Table table = connection.getTable(TableName.valueOf(Content.CONTENT));
             Table relTable = connection.getTable(TableName.valueOf(Content.RELATION))) {
            // Row key = uid + "_" + current time in milliseconds.
            String rowKey = uid + "_" + System.currentTimeMillis();
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(FAMILY_INFO, Bytes.toBytes("content"), Bytes.toBytes(content));
            table.put(put);

            // Look up the author's fans; each qualifier in the "fans" family is a fan uid.
            Get get = new Get(Bytes.toBytes(uid));
            get.addFamily(FAMILY_FANS);
            Result result = relTable.get(get);
            if (result.isEmpty()) {
                return;
            }

            // One inbox Put per fan: row = fan uid, column = info:<author uid>, value = post row key.
            ArrayList<Put> puts = new ArrayList<Put>();
            for (Cell cell : result.rawCells()) {
                Put inboxPut = new Put(CellUtil.cloneQualifier(cell));
                inboxPut.addColumn(FAMILY_INFO, Bytes.toBytes(uid), Bytes.toBytes(rowKey));
                puts.add(inboxPut);
            }
            try (Table indexTable = connection.getTable(TableName.valueOf(Content.INDEX))) {
                indexTable.put(puts);
            }
        }
    }

    /**
     * Makes {@code uid} follow every user in {@code strings}: updates both sides of the
     * relation table and copies the row keys of the followed users' existing posts into
     * the inbox row of {@code uid}.
     *
     * @param uid     the following user
     * @param strings the users to follow; nothing happens when none are given
     * @throws IOException if any table operation fails
     */
    public static void attend(String uid, String... strings) throws IOException {
        if (strings == null || strings.length == 0) {
            return;
        }
        byte[] uidBytes = Bytes.toBytes(uid);
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Table relTable = connection.getTable(TableName.valueOf(Content.RELATION))) {
            Put put = new Put(uidBytes);
            ArrayList<Put> puts = new ArrayList<Put>();
            for (String s : strings) {
                // uid's row: attends:<followed uid>
                put.addColumn(FAMILY_ATTENDS, Bytes.toBytes(s), Bytes.toBytes(s));
                // Followed user's row: fans:<uid>. The qualifier must be the fan's uid,
                // because publish() uses it as the inbox row key and delUser() deletes by it.
                Put fanPut = new Put(Bytes.toBytes(s));
                fanPut.addColumn(FAMILY_FANS, uidBytes, uidBytes);
                puts.add(fanPut);
            }
            puts.add(put);
            relTable.put(puts);

            // Fill uid's inbox with the row keys of the followed users' posts.
            Put inPut = new Put(uidBytes);
            // Increasing timestamps give every row key its own cell version.
            long time = System.currentTimeMillis();
            try (Table conTable = connection.getTable(TableName.valueOf(Content.CONTENT))) {
                for (String s : strings) {
                    // Content rows of user s lie in [s + "_", s + "|") because '|' sorts after '_'.
                    Scan scan = new Scan(Bytes.toBytes(s + "_"), Bytes.toBytes(s + "|"));
                    try (ResultScanner results = conTable.getScanner(scan)) {
                        for (Result r : results) {
                            inPut.addColumn(FAMILY_INFO, Bytes.toBytes(s), time++, r.getRow());
                        }
                    }
                }
            }
            if (!inPut.isEmpty()) {
                try (Table index = connection.getTable(TableName.valueOf(Content.INDEX))) {
                    index.put(inPut);
                }
            }
        }
    }

    /**
     * Makes {@code uid} unfollow every user in {@code dels}: removes both sides of the
     * relation and the corresponding columns from the inbox row of {@code uid}.
     *
     * @param uid  the user who unfollows
     * @param dels the users to unfollow; nothing happens when none are given
     * @throws IOException if any table operation fails
     */
    public static void delUser(String uid, String... dels) throws IOException {
        if (dels == null || dels.length == 0) {
            return;
        }
        byte[] uidBytes = Bytes.toBytes(uid);
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Table relTable = connection.getTable(TableName.valueOf(Content.RELATION));
             Table inTable = connection.getTable(TableName.valueOf(Content.INDEX))) {
            Delete delete = new Delete(uidBytes);
            ArrayList<Delete> deles = new ArrayList<Delete>();
            for (String d : dels) {
                // addColumns removes all versions; addColumn would remove only the newest
                // one and could leave an older version visible.
                delete.addColumns(FAMILY_ATTENDS, Bytes.toBytes(d));
                // Remove uid from the fans of the unfollowed user.
                Delete ad = new Delete(Bytes.toBytes(d));
                ad.addColumns(FAMILY_FANS, uidBytes);
                deles.add(ad);
            }
            deles.add(delete);
            relTable.delete(deles);

            // Remove every inbox entry that came from the unfollowed users.
            Delete inD = new Delete(uidBytes);
            for (String d : dels) {
                inD.addColumns(FAMILY_INFO, Bytes.toBytes(d));
            }
            inTable.delete(inD);
        }
    }

    /**
     * Prints every post of {@code uid} stored in the content table.
     *
     * @param uid the user whose posts are printed
     * @throws IOException if the scan fails
     */
    public static void getData(String uid) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Table conTable = connection.getTable(TableName.valueOf(Content.CONTENT))) {
            // Row-range scan over [uid + "_", uid + "|"). A substring match on the row key
            // would also hit other users whose id merely contains uid.
            Scan scan = new Scan(Bytes.toBytes(uid + "_"), Bytes.toBytes(uid + "|"));
            try (ResultScanner results = conTable.getScanner(scan)) {
                for (Result r : results) {
                    printCells(r);
                }
            }
        }
    }

    /**
     * Prints the home page of {@code uid}: reads the content row keys stored in the
     * user's inbox row and prints the posts they refer to.
     *
     * @param uid the user whose home page is printed
     * @throws IOException if any table operation fails
     */
    public static void getInit(String uid) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Table inTable = connection.getTable(TableName.valueOf(Content.INDEX))) {
            Get get = new Get(Bytes.toBytes(uid));
            // Limits how many versions (posts) are read per followed user.
            get.setMaxVersions(1);
            Result result = inTable.get(get);
            if (result.isEmpty()) {
                // Nothing in the inbox yet.
                return;
            }
            // Each inbox cell value is the row key of a post in the content table.
            ArrayList<Get> gets = new ArrayList<Get>();
            for (Cell cell : result.rawCells()) {
                gets.add(new Get(CellUtil.cloneValue(cell)));
            }
            try (Table conTable = connection.getTable(TableName.valueOf(Content.CONTENT))) {
                for (Result r : conTable.get(gets)) {
                    printCells(r);
                }
            }
        }
    }

    /**
     * Tells whether table {@code name} exists.
     *
     * @param name table name, optionally qualified as {@code namespace:table}
     * @return {@code true} when the table exists
     * @throws IOException if the cluster cannot be queried
     */
    public static boolean isTableExits(String name) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            return admin.tableExists(TableName.valueOf(name));
        }
    }

    /**
     * Tells whether namespace {@code name} exists.
     *
     * @param name namespace to look up
     * @return {@code true} when the namespace exists, {@code false} when HBase reports it as not found
     * @throws IOException if the lookup fails for any other reason
     */
    public static boolean nameSpaceExits(String name) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            NamespaceDescriptor ns = admin.getNamespaceDescriptor(name);
            Log.info("获取表空间的信息" + ns.getConfiguration());
            return true;
        } catch (NamespaceNotFoundException e) {
            // A missing namespace is the "does not exist" answer, not an error.
            return false;
        }
    }

    /**
     * Closes the given admin and connection; either argument may be {@code null}.
     * The connection is closed even when closing the admin fails.
     *
     * @param admin      admin to close, or {@code null}
     * @param connection connection to close, or {@code null}
     * @throws IOException if closing either resource fails
     */
    public static void close(Admin admin, Connection connection) throws IOException {
        try {
            if (admin != null) {
                admin.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    /** Prints the row key and value of every cell of {@code result}; empty results print nothing. */
    private static void printCells(Result result) {
        // Guard first: the cell array is not iterated for an empty result.
        if (result == null || result.isEmpty()) {
            return;
        }
        for (Cell cell : result.rawCells()) {
            System.out.println("RowKey:" + Bytes.toString(CellUtil.cloneRow(cell)) + ",Content:" + Bytes.toString(CellUtil.cloneValue(cell)));
        }
    }
}
class3:测试类
package wang;
import java.io.IOException;
/**
 * Manual driver for the Weibo demo: each step (set-up, follow, unfollow, publish, read)
 * is enabled by uncommenting the corresponding lines in {@link #main(String[])}.
 *
 * NOTE(review): this file declares package {@code wang} while HbaseUtils declares package
 * {@code Weibo}; confirm the intended package layout before compiling.
 */
public class Weibo {
    /**
     * Creates the namespace and the three tables (content, relation, inbox).
     *
     * @throws IOException if a namespace or table cannot be created
     */
    private static void init() throws IOException {
        // Create the namespace.
        HbaseUtils.createNS(Content.NAMESPACE);
        // Create the three tables.
        HbaseUtils.createTable(Content.CONTENT, 1, "info");
        HbaseUtils.createTable(Content.RELATION, 1, "attends", "fans");
        HbaseUtils.createTable(Content.INDEX, 2, "info");
    }

    /**
     * Drops the three tables and the namespace so the demo can start over.
     *
     * @throws IOException if a table or the namespace cannot be removed
     */
    private static void dele() throws IOException {
        // Drop the tables.
        HbaseUtils.dropTable(Content.CONTENT);
        HbaseUtils.dropTable(Content.INDEX);
        HbaseUtils.dropTable(Content.RELATION);
        // Drop the namespace.
        HbaseUtils.deleteNS(Content.NAMESPACE);
    }

    public static void main(String[] args) throws Exception {
        // Reset and initialise the namespace and the three tables.
        // dele();
        // init();
        // Follow: the first argument is the follower, the remaining ones are the users being followed.
        // HbaseUtils.attend("1001", "1002", "1003");
        // HbaseUtils.attend("1002", "1001", "1003");
        // Unfollow.
        HbaseUtils.delUser("1001", "1002");
        // Publish posts.
        // HbaseUtils.publish("1001", "我是Andy一号!!");
        // HbaseUtils.publish("1001", "我是Andy二号!!");
        // HbaseUtils.publish("1001", "我是1001");
        // HbaseUtils.publish("1002", "我发了1条消息");
        // HbaseUtils.publish("1002", "我发了2条消息");
        // HbaseUtils.publish("1002", "我发了3条消息");
        // HbaseUtils.publish("1002", "我发了3条消息");
        // HbaseUtils.publish("1002", "我发了4条消息");
        // HbaseUtils.publish("1003", "我是1003一号");
        // HbaseUtils.publish("1003", "我是1003二号");
        // HbaseUtils.getData("1001");
        System.out.println("-------1001getInt-------");
        HbaseUtils.getInit("1001");
        // System.out.println("-------1002getInt-------");
        // HbaseUtils.getInit("1002");
        //
        // System.out.println("-------1001getData-------");
        // HbaseUtils.getData("1001");
        //
        // System.out.println("-------1002getData-------");
        // HbaseUtils.getData("1002");
        //
        // System.out.println("-------1003getData-------");
        // HbaseUtils.getData("1003");
    }
}