Big Data with HBase (IV) --- rowkey design with a simulated call log, BloomFilter, Phoenix environment deployment, hive-hbase integration

I. Rowkey design -- simulated call logs
--------------------------------------------------
    1. Create the table
        $hbase> create 'ns1:calllogs' , 'f1'

    2. Write the code
        a. Caller-log writer class
           
           package ts.calllogs;

           import org.apache.hadoop.conf.Configuration;
           import org.apache.hadoop.hbase.HBaseConfiguration;
           import org.apache.hadoop.hbase.TableName;
           import org.apache.hadoop.hbase.client.Connection;
           import org.apache.hadoop.hbase.client.ConnectionFactory;
           import org.apache.hadoop.hbase.client.Put;
           import org.apache.hadoop.hbase.client.Table;
           import org.apache.hadoop.hbase.util.Bytes;
           import org.junit.Before;
           import org.junit.Test;

           import java.io.IOException;
           import java.text.DecimalFormat;
           import java.text.SimpleDateFormat;
           import java.util.Date;

           /**
            * Test writing call-log rows.
            */
           public class TsCallLogs {

               public Connection conn;
               public Table tb;

               @Before
               public void getConn() throws Exception {
                   // load the configuration (hbase-site.xml from the classpath)
                   Configuration conf = HBaseConfiguration.create();
                   // create a connection via the factory
                   conn = ConnectionFactory.createConnection(conf);
                   // get the table
                   TableName tbName = TableName.valueOf("ns1:calllogs");
                   tb = conn.getTable(tbName);
               }


               /**
                * Rowkey design: encode the fields most queries need, and keep every field fixed-length.
                * region no. [0-99], 1_id [caller number], time, flag [0/1 caller/called], 2_id [other party], duration
                * region no. [0-99] = (1_id + time[yyyyMM]).hashCode() % 100 [number of regions]
                * @throws Exception
                */
               @Test
               public void tsPutLog() throws Exception {

                   String callerId = "13777777777";            // 1_id, the caller
                   String calledId = "13888888888";            // 2_id, the called party
                   SimpleDateFormat sdf = new SimpleDateFormat();
                   sdf.applyPattern("yyyyMMddHHmmss");         // dd, not DD (DD is day-of-year)
                   String calledTime = sdf.format(new Date()); // call time
                   int isCaller = 0;                           // 0 = caller
                   int duration = 100;                         // call duration

                   // pad duration so it stays fixed-length
                   DecimalFormat df1 = new DecimalFormat();
                   df1.applyPattern("00000");
                   String durStr = df1.format(duration);

                   // region number [0-99]: assume 100 region servers [100 hosts]; hash the key to spread the numbers across them
                   int hash = (callerId + calledTime.substring(0,6)).hashCode();
                   hash =  (hash & Integer.MAX_VALUE) % 100;  // keep the hash non-negative

                   DecimalFormat df = new DecimalFormat();
                   df.applyPattern("00");
                   String hashStr = df.format(hash);

                   // assemble the rowkey ==> region no. [0-99], 1_id [caller], time, flag [0/1 caller/called], 2_id [other party], duration
                   String rowKey = hashStr + "," + callerId + "," +calledTime + "," + isCaller + "," + calledId + "," + durStr;

                   // put the row
                   Put put = new Put(Bytes.toBytes(rowKey));
                   // add the column cells
                   put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callerPos"), Bytes.toBytes("河北"));
                   put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("calledPos"), Bytes.toBytes("河南"));
                   tb.put(put);
                   System.out.println("put over");
               }
           }

        b. Called-party log class [a coprocessor]: when a caller row is put, mirror it as a called-party row
           
             package ts.calllogs;

             import org.apache.hadoop.hbase.TableName;
            import org.apache.hadoop.hbase.TableName;
            import org.apache.hadoop.hbase.client.Durability;
            import org.apache.hadoop.hbase.client.Put;
            import org.apache.hadoop.hbase.client.Table;
            import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
            import org.apache.hadoop.hbase.coprocessor.ObserverContext;
            import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
            import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
            import org.apache.hadoop.hbase.util.Bytes;

            import java.io.IOException;

             /**
              * Called-party log coprocessor.
              * When a caller row is put, write the mirrored called-party row.
              */
            public class TsCalledLogsRegionObserver extends BaseRegionObserver {

                @Override
                public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
                    super.postPut(e, put, edit, durability);

                    TableName tName = TableName.valueOf("ns1:calllogs");

                    TableName tName1 = e.getEnvironment().getRegion().getRegionInfo().getTable();

                    if (tName.equals(tName1)) {
                        String rowKey = Bytes.toString(put.getRow());
                        String [] strs = rowKey.split(",");
                        // rows written by this coprocessor carry flag 1; skip them to avoid recursion
                        if (strs[3].equals("1")) {
                            return;
                        }
                        // e.g. 99,13777777777,20180925220228,1,13888888888,00100
                        String newKey = Util.getHash(strs[4], strs[2]) + "," + strs[4] + "," + strs[2] + ",1," + strs[1] + "," + strs[5];
                        // put the mirrored row
                        Put p = new Put(Bytes.toBytes(newKey));
                        p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("ccc"),Bytes.toBytes("nothing"));
                        Table tb = e.getEnvironment().getTable(tName);
                        tb.put(p);
                        System.out.println("put over");
                    }
                }
            }
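
        The Util.getHash helper used above (and again in PrintLogs below) is not shown in the original; a minimal sketch, assuming it simply mirrors the hashing done in TsCallLogs.tsPutLog():

            package ts.calllogs;

            import java.text.DecimalFormat;

            public class Util {
                // region number = hash(id + yyyyMM) % 100, zero-padded to two digits,
                // exactly as computed when the caller row was written
                public static String getHash(String id, String time) {
                    int hash = (id + time.substring(0, 6)).hashCode();
                    hash = (hash & Integer.MAX_VALUE) % 100;   // keep it non-negative
                    return new DecimalFormat("00").format(hash);
                }
            }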

        c. Log-printing class -- query one number's call records for a given month (the month is part of the hash seed, so a whole month's records share one rowkey prefix)
           
             package ts.calllogs;

            import org.apache.hadoop.conf.Configuration;
            import org.apache.hadoop.hbase.HBaseConfiguration;
            import org.apache.hadoop.hbase.TableName;
            import org.apache.hadoop.hbase.client.*;
            import org.apache.hadoop.hbase.util.Bytes;
            import org.junit.Before;
            import org.junit.Test;

            import java.io.IOException;
            import java.text.SimpleDateFormat;
            import java.util.Date;
            import java.util.NavigableMap;
            import java.util.Set;

             /**
              * Print the call records for a given number and month.
              */
            public class PrintLogs {

                public Connection conn;
                public Table tb;

                @Before
                public void getConn() throws Exception {
                     // load the configuration (hbase-site.xml from the classpath)
                     Configuration conf = HBaseConfiguration.create();
                     // create a connection via the factory
                     conn = ConnectionFactory.createConnection(conf);
                     // get the table
                    TableName tbName = TableName.valueOf("ns1:calllogs");
                    tb = conn.getTable(tbName);
                }

                @Test
                public void printlogs() throws Exception {
                    Scan scan = new Scan();
                    String callerId = "13888888888";
                    String calledTime = "201809";               // query month
                    String hash = Util.getHash(callerId, calledTime);
                    // all of this number's September rows sort between these two keys
                    String startKey = hash + "," + callerId + "," + calledTime;
                    String endKey = hash + "," + callerId + "," + "201810";
                    scan.setStartRow(Bytes.toBytes(startKey));
                    scan.setStopRow(Bytes.toBytes(endKey));
                    ResultScanner scanner = tb.getScanner(scan);
                    Result result = null;
                    while((result =  scanner.next()) != null) {
                        System.out.println(Bytes.toString(result.getRow()));
                    }
                }
            }


    3. Package and deploy
        a. Register the coprocessor and distribute the config to every HBase node
            [hbase-site.xml]
            <property>
                <name>hbase.coprocessor.region.classes</name>
                <value>ts.calllogs.TsCalledLogsRegionObserver</value>
            </property>

        b. Copy the built jar into the /hbase/lib directory on every node, then restart HBase

    4. Run the insert test
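
        After running tsPutLog, both the caller row and the coprocessor-generated called-party row can be checked from the HBase shell:

            $hbase> scan 'ns1:calllogs'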


II. BloomFilter
-----------------------------------------------------------------
    1. A Bloom filter type can be set per column family at table-creation time. There are three types: NONE, ROW [keyed on rowkey], ROWCOL [keyed on rowkey + column]. (NONE was the old default; since HBase 0.96 the default is ROW.)

    2. Without one, a get for a specific rowkey can force the server to load blocks from every storefile just to check whether it contains the key, wasting a great deal of I/O.

    3. A Bloom filter avoids most of this wasted I/O.

    4. Principle: for each storefile, the Bloom filter quickly answers whether the file might contain a given rowkey. It gives one of two answers:
        -- NO: the file definitely does not contain the key (100% accurate), so the file is skipped.
        -- MAYBE: the file may contain the key; false positives are possible, so the file still has to be read.

    5. API demo
        
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.hbase.HBaseConfiguration;
        import org.apache.hadoop.hbase.HColumnDescriptor;
        import org.apache.hadoop.hbase.HTableDescriptor;
        import org.apache.hadoop.hbase.TableName;
        import org.apache.hadoop.hbase.client.Admin;
        import org.apache.hadoop.hbase.client.Connection;
        import org.apache.hadoop.hbase.client.ConnectionFactory;
        import org.apache.hadoop.hbase.client.Table;
        import org.apache.hadoop.hbase.regionserver.BloomType;
        import org.junit.Before;
        import org.junit.Test;

        import java.io.IOException;

        /**
         * Create a table with a ROW Bloom filter on its column family.
         */
        public class TsBloomFilter {

            public Table tb;
            public Connection conn;

            @Before
            public void getConn() throws Exception {
                // load the configuration (hbase-site.xml from the classpath)
                Configuration conf = HBaseConfiguration.create();
                // create a connection via the factory
                conn = ConnectionFactory.createConnection(conf);
                // get the table
                TableName tbName = TableName.valueOf("ns1:bloom");
                tb = conn.getTable(tbName);
            }


            @Test
            public void tsBloom() throws Exception {

                Admin admin = conn.getAdmin();
                TableName tableName = TableName.valueOf("ns1:bloom");
                HTableDescriptor desc = new HTableDescriptor(tableName);
                HColumnDescriptor hclo = new HColumnDescriptor("f1");
                hclo.setBloomFilterType(BloomType.ROW);
                desc.addFamily(hclo);
                admin.createTable(desc);
                System.out.println("over");
            }

        }
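
        The same table can also be created straight from the HBase shell; a one-line equivalent of the API demo above:

            $hbase> create 'ns1:bloom', {NAME => 'f1', BLOOMFILTER => 'ROW'}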


III. Phoenix environment deployment
-------------------------------------------
    1. Install Phoenix
        a. Download apache-phoenix-4.10.0-HBase-1.2-bin.tar.gz
        b. Untar it
        c. Copy the xxx-server.jar into hbase/lib on the server side, and distribute it to all nodes
        d. Restart HBase
        e. Use Phoenix
            $> phoenix/bin/sqlline.py s100   // note: this connects to the ZooKeeper server
            $phoenix> !tables                // list tables
            $phoenix> !help                  // show help
            $phoenix> !sql create table test (id varchar(20) primary key, name varchar(20))     // create a table
            $phoenix> !describe test         // describe the table
            $phoenix> drop table test;       // drop the table (sqlline has no !drop command)
            $phoenix> select * from test;    // full-table scan


    2. Install a SQL client [GUI for running SQL]
        a. Download squirrel-sql-3.7.1-standard.jar; this jar is the installer.
        b. $> java -jar squirrel-sql-3.7.1-standard.jar
           then click through the installer...
        c. Copy phoenix-4.10.0-HBase-1.2-client.jar into the lib directory of the SQuirreL install (c:\myprograms\squirrel).
        d. Start SQuirreL (GUI): go to the install directory and run squirrel-sql.bat
        e. The GUI opens
        f. In the left sidebar select the "Drivers" tab and click "+" ->
            URL          : jdbc:phoenix:192.168.43.131
            Driver class : org.apache.phoenix.jdbc.PhoenixDriver
            (the URL format is jdbc:phoenix:<zk host>, e.g. jdbc:phoenix:s100)
        g. Under "Aliases" create an alias that points at the database


IV. Using Phoenix and the SQL client
-------------------------------------------------------
    // create a table
    $jdbc:phoenix> create table IF NOT EXISTS test.Person (IDCardNum INTEGER not null primary key, Name varchar(20), Age INTEGER);

    // insert a row
    $jdbc:phoenix> UPSERT INTO test.PERSON(IDCardNum, Name, Age) VALUES (1,'tom',12);

    // delete a row
    $jdbc:phoenix> delete from test.person where idcardnum = 1;

    // update a row (UPSERT overwrites the existing row)
    $jdbc:phoenix> upsert into test.PERSON(IDCardNum, Name, Age) VALUES (1,'tom',12);
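
    The same statements can also be issued from plain JDBC, without the GUI. A minimal sketch, assuming the phoenix client jar is on the classpath and s100 is the ZooKeeper host (both taken from the setup above):

        import java.sql.Connection;
        import java.sql.DriverManager;
        import java.sql.ResultSet;
        import java.sql.Statement;

        public class PhoenixJdbcDemo {
            public static void main(String[] args) throws Exception {
                // URL format: jdbc:phoenix:<zookeeper quorum>, as in the SQuirreL driver config
                Connection conn = DriverManager.getConnection("jdbc:phoenix:s100");
                Statement st = conn.createStatement();
                st.executeUpdate("UPSERT INTO test.PERSON(IDCardNum, Name, Age) VALUES (1, 'tom', 12)");
                conn.commit();   // Phoenix connections do not auto-commit by default
                ResultSet rs = st.executeQuery("SELECT * FROM test.PERSON");
                while (rs.next()) {
                    System.out.println(rs.getInt("IDCARDNUM") + "," + rs.getString("NAME"));
                }
                conn.close();
            }
        }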


V. hive-hbase integration: map an HBase table into Hive and query it with Hive SQL.
-----------------------------------------------------------------------
    1. Create the HBase-backed table from Hive
        $hive> CREATE TABLE t11(key string, name string) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
        WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:name")
        TBLPROPERTIES("hbase.table.name" = "ns1:t11");

    2. Query the HBase table from Hive
        $hive> select count(*) from t11;
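
    3. The CREATE TABLE above creates a new HBase table. To map an HBase table that already exists (for example ns1:calllogs from part I), the usual pattern is CREATE EXTERNAL TABLE; a sketch, assuming the f1:callerPos column from the call-log example:

        $hive> CREATE EXTERNAL TABLE calllogs(key string, callerPos string)
               STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
               WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,f1:callerPos")
               TBLPROPERTIES("hbase.table.name" = "ns1:calllogs");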