欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Hbase的API使用(create,put,delete,get)

程序员文章站 2022-05-30 14:04:42
...

构建三张表,命名空间(namespace):weibo

表1:内容表:weibo:content  列族:info(用户发布的信息),列:content K      值:content V  Version:时间戳

表2:用户关系表:weibo:relation  列族1:attends(关注的人),列:attend K  值:attend V  列族2:fans(粉丝),列:uid K  值:uid V

表3:收件箱:weibo:index,列族:info,列:uid K   值:uid V    

内容表(weibo:content)的行键是:uid+"_"+timestamp;用户关系表和收件箱表的行键是:uid

内容表的rowKey设计:用户ID_timestamp

Maven用到的pom 文件


        <!-- HBase server and client libraries, both pinned to the same release. -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.10</version>
        </dependency>
        <!-- Single JUnit declaration: Maven requires each groupId:artifactId to be
             unique within <dependencies>, so only the newer 4.12 entry is kept. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

class1:定义命名空间和表名常量:

/**
 * Names of the HBase namespace and tables used by the Weibo example.
 *
 * <p>This class only holds constants and cannot be instantiated.
 */
public final class Content {
    /** Namespace that contains every table of the example. */
    public static final String NAMESPACE = "weibo";
    /** Fully qualified name of the user relation table. */
    public static final String RELATION = NAMESPACE + ":relation";
    /** Fully qualified name of the content table. */
    public static final String CONTENT = NAMESPACE + ":content";
    /** Fully qualified name of the index (inbox) table. */
    public static final String INDEX = NAMESPACE + ":index";

    private Content() {
        // Constants holder; no instances.
    }
}

class2:构建Hbase的工具类

package Weibo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.mortbay.log.Log;

import java.io.IOException;
import java.util.ArrayList;

public class HbaseUtils {
    // Client configuration shared by the methods of this class when opening connections.
    private static Configuration configuration;
    static {
        // Start from the default HBase configuration, then set the ZooKeeper
        // quorum and the HBase root directory.
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum","192.168.128.111");
        configuration.set("hbase.rootdir","hdfs://192.168.128.111:9000/hbase");
    }

    /**
     * Creates the HBase namespace {@code name}.
     *
     * @param name namespace to create
     * @throws IOException if the connection cannot be opened or the namespace cannot be created
     */
    public static void createNS(String name) throws IOException {
        // try-with-resources closes the admin and then the connection, even when
        // createNamespace throws.
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            NamespaceDescriptor ns = NamespaceDescriptor.create(name).build();
            admin.createNamespace(ns);
            // Look up the namespace that was just created rather than a hard-coded one.
            HbaseUtils.nameSpaceExits(name);
        }
    }
    /**
     * Deletes the HBase namespace {@code name}.
     *
     * @param name namespace to delete
     * @throws IOException if the connection cannot be opened or the delete call fails
     */
    public static void deleteNS(String name) throws IOException {
        // Resources are released even if deleteNamespace throws.
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            admin.deleteNamespace(name);
        }
    }
    /**
     * Creates table {@code name} with one column family per entry of {@code strings}.
     * Returns without doing anything when the namespace check fails or the table
     * already exists.
     *
     * @param name    table name, e.g. {@code weibo:content}
     * @param version value used as both the minimum and the maximum number of cell versions
     * @param strings column family names
     * @throws IOException if the connection cannot be opened or the table cannot be created
     */
    public static void createTable(String name,int version,String...strings) throws IOException {
        if (!HbaseUtils.nameSpaceExits(Content.NAMESPACE)) {
            return;
        }
        // Only create the table when it is not there yet.
        if (HbaseUtils.isTableExits(name)) {
            return;
        }
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(name));
            // Note: WAL durability is not configured by this method. The available
            // levels are ASYNC_WAL (write the WAL asynchronously), SYNC_WAL (write it
            // synchronously), FSYNC_WAL (synchronously and force to disk), SKIP_WAL
            // (no WAL) and USE_DEFAULT (the default level, i.e. SYNC_WAL).
            for (String s : strings) {
                HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(Bytes.toBytes(s));
                // Data block encoding; NONE is the default (no encoding).
                hColumnDescriptor.setDataBlockEncoding(DataBlockEncoding.NONE);
                // Bloom filter type.
                hColumnDescriptor.setBloomFilterType(BloomType.ROW);
                // Time-to-live of cell contents; the value is in seconds (18000 s = 5 h).
                hColumnDescriptor.setTimeToLive(18000);
                // Do not keep cells that have been deleted.
                hColumnDescriptor.setKeepDeletedCells(false);
                // Favor keeping this family's data in memory for faster reads.
                hColumnDescriptor.setInMemory(true);
                hColumnDescriptor.setBlockCacheEnabled(true);
                // Block size in bytes: 64 * 1024 = 65536, i.e. 64 KB.
                hColumnDescriptor.setBlocksize(64 * 1024);
                hColumnDescriptor.setMaxVersions(version);
                hColumnDescriptor.setMinVersions(version);
                // Register the family once it is fully configured.
                hTableDescriptor.addFamily(hColumnDescriptor);
            }
            admin.createTable(hTableDescriptor);
        }
    }
    /**
     * Disables and then deletes table {@code name}.
     *
     * @param name table name, e.g. {@code weibo:content}
     * @throws IOException if the connection cannot be opened or the table cannot be dropped
     */
    public static void dropTable(String name) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf(name);
            // A table must be disabled before it can be deleted; skip the disable
            // step when the table is already disabled.
            if (admin.isTableEnabled(tableName)) {
                admin.disableTable(tableName);
            }
            admin.deleteTable(tableName);
        }
    }
    /**
     * Publishes a post for {@code uid}: stores the text in the content table and
     * records the new post's row key in the index row of every fan of {@code uid}.
     *
     * @param uid     id of the publishing user
     * @param content text of the post
     * @throws IOException if any HBase call fails
     */
    public static void publish(String uid,String content) throws IOException {
        // Every table and the connection are closed by try-with-resources, also on failure.
        try (Connection connection = ConnectionFactory.createConnection(configuration)) {
            // Row key of the post: uid + "_" + current time in milliseconds.
            long time = System.currentTimeMillis();
            String rowKey = uid + "_" + time;
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("content"), Bytes.toBytes(content));
            try (Table table = connection.getTable(TableName.valueOf(Content.CONTENT))) {
                table.put(put);
            }

            // Read the "fans" family of the publisher's relation row.
            Get get = new Get(Bytes.toBytes(uid));
            get.addFamily(Bytes.toBytes("fans"));
            Result result;
            try (Table relTable = connection.getTable(TableName.valueOf(Content.RELATION))) {
                result = relTable.get(get);
            }
            // No fans: nothing to deliver.
            if (result.isEmpty()) {
                return;
            }

            // Each qualifier in the "fans" family is used as the row key of an index
            // row; column info:<uid> of that row receives the new post's row key.
            ArrayList<Put> puts = new ArrayList<Put>();
            for (Cell cell : result.rawCells()) {
                byte[] qualifier = CellUtil.cloneQualifier(cell);
                Put put1 = new Put(qualifier);
                put1.addColumn(Bytes.toBytes("info"), Bytes.toBytes(uid), Bytes.toBytes(rowKey));
                puts.add(put1);
            }
            try (Table indexTable = connection.getTable(TableName.valueOf(Content.INDEX))) {
                indexTable.put(puts);
            }
        }
    }

    /**
     * Makes {@code uid} follow the given users.
     *
     * <p>Three things are written: the followed ids into the "attends" family of
     * {@code uid}'s relation row, {@code uid} into the "fans" family of each followed
     * user's relation row, and the row keys of the followed users' existing posts
     * into {@code uid}'s index row.
     *
     * @param uid     id of the user who follows
     * @param strings ids of the users to follow; nothing happens when empty
     * @throws IOException if any HBase call fails
     */
    public static void attend(String uid,String...strings) throws IOException {
        if (strings.length <= 0) {
            return;
        }
        try (Connection connection = ConnectionFactory.createConnection(configuration)) {
            try (Table relTable = connection.getTable(TableName.valueOf(Content.RELATION))) {
                Put put = new Put(Bytes.toBytes(uid));
                ArrayList<Put> puts = new ArrayList<Put>();
                for (String s : strings) {
                    put.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(s), Bytes.toBytes(s));
                    // The fan's id is the qualifier: publish() reads the qualifiers of
                    // this family as fan ids and delUser() deletes fans:<uid>.
                    Put fanPut = new Put(Bytes.toBytes(s));
                    fanPut.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid));
                    puts.add(fanPut);
                }
                puts.add(put);
                relTable.put(puts);
            }

            try (Table index = connection.getTable(TableName.valueOf(Content.INDEX));
                 Table conTable = connection.getTable(TableName.valueOf(Content.CONTENT))) {
                Put inPut = new Put(Bytes.toBytes(uid));
                // Each copied post gets its own increasing timestamp so that the
                // entries become distinct versions of column info:<followed id>.
                long time = System.currentTimeMillis();
                for (String s : strings) {
                    // Restrict the scan to rows whose key starts with "<followed id>_".
                    Scan scan = new Scan();
                    scan.setRowPrefixFilter(Bytes.toBytes(s + "_"));
                    try (ResultScanner results = conTable.getScanner(scan)) {
                        for (Result r : results) {
                            byte[] row = r.getRow();
                            inPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(s), time++, row);
                        }
                    }
                }
                // Skip the write when none of the followed users has posted yet.
                if (!inPut.isEmpty()) {
                    index.put(inPut);
                }
            }
        }
    }
    /**
     * Makes {@code uid} unfollow the given users.
     *
     * <p>Removes the followed ids from the "attends" family of {@code uid}'s relation
     * row, removes {@code uid} from the "fans" family of each unfollowed user's
     * relation row, and removes the matching columns from {@code uid}'s index row.
     *
     * @param uid  id of the user who unfollows
     * @param dels ids of the users to unfollow; nothing happens when empty
     * @throws IOException if any HBase call fails
     */
    public static void delUser(String uid,String...dels) throws IOException {
        if (dels.length <= 0) {
            return;
        }
        try (Connection connection = ConnectionFactory.createConnection(configuration)) {
            try (Table relTable = connection.getTable(TableName.valueOf(Content.RELATION))) {
                Delete delete = new Delete(Bytes.toBytes(uid));
                ArrayList<Delete> deles = new ArrayList<Delete>();
                for (String d : dels) {
                    // Family name "attends" matches the one attend() writes to.
                    // addColumns removes every version of the column.
                    delete.addColumns(Bytes.toBytes("attends"), Bytes.toBytes(d));
                    // Remove this user from the unfollowed user's fans.
                    Delete ad = new Delete(Bytes.toBytes(d));
                    ad.addColumns(Bytes.toBytes("fans"), Bytes.toBytes(uid));
                    deles.add(ad);
                }
                deles.add(delete);
                relTable.delete(deles);
            }

            try (Table inTable = connection.getTable(TableName.valueOf(Content.INDEX))) {
                Delete inD = new Delete(Bytes.toBytes(uid));
                for (String d : dels) {
                    // The index column can hold several versions (one per post);
                    // all of them must go, not just the newest.
                    inD.addColumns(Bytes.toBytes("info"), Bytes.toBytes(d));
                }
                inTable.delete(inD);
            }
        }
    }
    /**
     * Prints every cell of the content-table rows whose key starts with
     * {@code uid + "_"}.
     *
     * @param uid id of the user whose posts are printed
     * @throws IOException if any HBase call fails
     */
    public static void getData(String uid) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Table conTable = connection.getTable(TableName.valueOf(Content.CONTENT))) {
            // Match on the row-key prefix. A substring match would also return rows
            // of other users whose id merely ends with this uid (e.g. "1" vs "1001").
            Scan scan = new Scan();
            scan.setRowPrefixFilter(Bytes.toBytes(uid + "_"));
            try (ResultScanner results = conTable.getScanner(scan)) {
                for (Result r : results) {
                    for (Cell cell : r.rawCells()) {
                        System.out.println("RowKey:"+Bytes.toString(CellUtil.cloneRow(cell))+",Content:"+Bytes.toString(CellUtil.cloneValue(cell)));
                    }
                }
            }
        }
    }
    /**
     * Prints the posts referenced by {@code uid}'s index row.
     *
     * <p>The values stored in the index row are used as row keys of the content
     * table; every cell of the rows found there is printed.
     *
     * @param uid id of the user whose index row is read
     * @throws IOException if any HBase call fails
     */
    public static void getInit(String uid) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Table inTable = connection.getTable(TableName.valueOf(Content.INDEX));
             Table conTable = connection.getTable(TableName.valueOf(Content.CONTENT))) {
            Get get = new Get(Bytes.toBytes(uid));
            // Read only the newest version of each index column.
            get.setMaxVersions(1);
            Result result = inTable.get(get);
            // rawCells() may be null for an empty result, so stop here when the
            // user has no index row.
            if (result.isEmpty()) {
                return;
            }
            ArrayList<Get> gets = new ArrayList<Get>();
            for (Cell cell : result.rawCells()) {
                gets.add(new Get(CellUtil.cloneValue(cell)));
            }
            Result[] results = conTable.get(gets);
            for (Result r : results) {
                // A referenced content row that no longer exists yields an empty result.
                if (r.isEmpty()) {
                    continue;
                }
                for (Cell cell : r.rawCells()) {
                    System.out.println("RowKey:"+Bytes.toString(CellUtil.cloneRow(cell))+",Content:"+Bytes.toString(CellUtil.cloneValue(cell)));
                }
            }
        }
    }
    /**
     * Reports whether table {@code name} exists.
     *
     * @param name table name, e.g. {@code weibo:content}
     * @return {@code true} if the table exists
     * @throws IOException if the connection cannot be opened or the lookup fails
     */
    public static boolean isTableExits(String name) throws IOException {
        // Resources are released even if tableExists throws.
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            return admin.tableExists(TableName.valueOf(name));
        }
    }

    /**
     * Reports whether namespace {@code name} exists, logging its configuration
     * when it does.
     *
     * @param name namespace to look up
     * @return {@code true} if the namespace exists, {@code false} if HBase reports
     *         it as not found
     * @throws IOException if the connection cannot be opened or the lookup fails
     *                     for another reason
     */
    public static boolean nameSpaceExits(String name) throws IOException {
        // The admin and the connection are closed on every path.
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            NamespaceDescriptor ns = admin.getNamespaceDescriptor(name);
            Log.info("获取表空间的信息"+ns.getConfiguration());
            return true;
        } catch (NamespaceNotFoundException e) {
            // A missing namespace is the "false" answer of this check, not an error.
            return false;
        }
    }
    /**
     * Closes the given admin and connection; either argument may be {@code null}.
     *
     * @param admin      admin to close, or {@code null}
     * @param connection connection to close, or {@code null}
     * @throws IOException if closing either resource fails
     */
    public static void close(Admin admin,Connection connection) throws IOException {
        try {
            if (admin != null) {
                admin.close();
            }
        } finally {
            // Close the connection even when closing the admin failed.
            if (connection != null) {
                connection.close();
            }
        }
    }

}

class3:测试类

package wang;

import java.io.IOException;

/**
 * Manual driver for the Weibo example. {@link #main} runs one scenario; the other
 * steps are kept as helper methods that can be called from {@code main} as needed.
 */
public class Weibo {

    /**
     * Creates the namespace and the three tables.
     *
     * @throws IOException if an HBase call fails
     */
    private static void init() throws IOException {
        HbaseUtils.createNS(Content.NAMESPACE);

        HbaseUtils.createTable(Content.CONTENT, 1, "info");
        HbaseUtils.createTable(Content.RELATION, 1, "attends", "fans");
        HbaseUtils.createTable(Content.INDEX, 2, "info");
    }

    /**
     * Drops the three tables and then the namespace, to start over.
     *
     * @throws Exception if an HBase call fails
     */
    private static void dele() throws Exception {
        HbaseUtils.dropTable(Content.CONTENT);
        HbaseUtils.dropTable(Content.INDEX);
        HbaseUtils.dropTable(Content.RELATION);
        HbaseUtils.deleteNS(Content.NAMESPACE);
    }

    /**
     * Sample follow relations. The first argument of {@code attend} is the user who
     * follows; the remaining arguments are the users being followed.
     *
     * @throws IOException if an HBase call fails
     */
    private static void followSamples() throws IOException {
        HbaseUtils.attend("1001", "1002", "1003");
        HbaseUtils.attend("1002", "1001", "1003");
    }

    /**
     * Publishes the sample posts and then prints the posts of user 1001.
     *
     * @throws IOException if an HBase call fails
     */
    private static void publishSamples() throws IOException {
        HbaseUtils.publish("1001", "我是Andy一号!!");
        HbaseUtils.publish("1001", "我是Andy二号!!");
        HbaseUtils.publish("1001", "我是1001");
        HbaseUtils.publish("1002", "我发了1条消息");
        HbaseUtils.publish("1002", "我发了2条消息");
        HbaseUtils.publish("1002", "我发了3条消息");
        HbaseUtils.publish("1002", "我发了3条消息");
        HbaseUtils.publish("1002", "我发了4条消息");
        HbaseUtils.publish("1003", "我是1003一号");
        HbaseUtils.publish("1003", "我是1003二号");
        HbaseUtils.getData("1001");
    }

    /**
     * Prints the home page of user 1002 and the posts of users 1001, 1002 and 1003.
     *
     * @throws IOException if an HBase call fails
     */
    private static void printOtherViews() throws IOException {
        System.out.println("-------1002getInt-------");
        HbaseUtils.getInit("1002");

        System.out.println("-------1001getData-------");
        HbaseUtils.getData("1001");

        System.out.println("-------1002getData-------");
        HbaseUtils.getData("1002");

        System.out.println("-------1003getData-------");
        HbaseUtils.getData("1003");
    }

    public static void main(String[] args) throws Exception {
        // Optional setup steps, disabled by default:
        // dele();
        // init();
        // followSamples();

        // User 1001 unfollows user 1002.
        HbaseUtils.delUser("1001", "1002");

        // Optional, disabled by default:
        // publishSamples();

        System.out.println("-------1001getInt-------");
        HbaseUtils.getInit("1001");

        // Optional, disabled by default:
        // printOtherViews();
    }
}

 

相关标签: Hbase