Big Data with HBase (4) --- Rowkey design principles with simulated call logs, BloomFilter, Phoenix environment setup, Hive-HBase integration
I. Rowkey design -- simulated call logs
--------------------------------------------------
1. Create the table
$hbase> create 'ns1:calllogs' , 'f1'
2. Write the programs
a. Caller-side log writer class
package ts.calllogs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* Test writing call logs
*/
public class TsCallLogs {
public Connection conn;
public Table tb;
@Before
public void getConn() throws Exception {
//load the hbase configuration
Configuration conf = HBaseConfiguration.create();
//create the connection via the factory
conn = ConnectionFactory.createConnection(conf);
//get table
TableName tbName = TableName.valueOf("ns1:calllogs");
tb = conn.getTable(tbName);
}
/**
* Rowkey design: include every commonly queried field, and keep each field fixed-length.
* region[0-99] , 1_id[calling number] , time , flag[0/1 caller/called] , 2_id[other party's number] , duration
* region[0-99] = (1_id + time[yyyyMM]).hashCode() % 100 [number of regions]
* @throws Exception
*/
@Test
public void tsPutLog() throws Exception {
String callerId = "13777777777"; //1_id calling number
String calledId = "13888888888"; //2_id called number
SimpleDateFormat sdf = new SimpleDateFormat();
sdf.applyPattern("yyyyMMddHHmmss");
String calledTime = sdf.format(new Date()); //call time
int isCaller = 0; //0 = caller
int duration = 100; //call duration in seconds
//duration is formatted to a fixed width so the rowkey stays fixed-length
DecimalFormat df1 = new DecimalFormat();
df1.applyPattern("00000");
String durStr = df1.format(duration);
//compute the region number [0-99]: assuming 100 region servers [100 hosts], hash the number to spread rows across them
int hash = (callerId + calledTime.substring(0,6)).hashCode();
hash = (hash & Integer.MAX_VALUE) % 100; //keep the hash non-negative
DecimalFormat df = new DecimalFormat();
df.applyPattern("00");
String hashStr = df.format(hash);
//assemble the rowkey ==> region[0-99] , 1_id[calling number] , time , flag[0/1 caller/called] , 2_id[other party] , duration
String rowKey = hashStr + "," + callerId + "," +calledTime + "," + isCaller + "," + calledId + "," + durStr;
//put the row
Put put = new Put(Bytes.toBytes(rowKey));
//add the columns to the put
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callerPos"), Bytes.toBytes("河北"));
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("calledPos"), Bytes.toBytes("河南"));
tb.put(put);
System.out.println("put over");
}
}
b. Callee-side log writer class [a coprocessor / trigger class] -- whenever a caller-side record is put, it inserts the matching callee-side record
package ts.calllogs;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
/**
* Callee-side log handler.
* When a caller-side row is put, write the mirrored callee-side record.
*/
public class TsCalledLogsRegionObserver extends BaseRegionObserver {
@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
super.postPut(e, put, edit, durability);
TableName tName = TableName.valueOf("ns1:calllogs");
TableName tName1 = e.getEnvironment().getRegion().getRegionInfo().getTable();
if (tName.equals(tName1)) {
String rowKey = Bytes.toString(put.getRow());
String [] strs = rowKey.split(",");
//skip rows that are already callee-side records (flag == 1), so the observer's own put does not re-trigger it
if(strs[3].equals("1"))
{
return;
}
//resulting callee-side rowkey, e.g. 99,13777777777,20180925220228,1,13888888888,00100
String newKey = Util.getHash(strs[4],strs[2]) + "," + strs[4] + "," + strs[2] + ",1," + strs[1] + "," + strs[5];
//put the mirrored row
Put p = new Put(Bytes.toBytes(newKey));
p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("ccc"),Bytes.toBytes("nothing"));
Table tb = e.getEnvironment().getTable(tName);
tb.put(p);
System.out.println("put over");
}
}
}
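The Util.getHash helper called by the coprocessor above (and by the PrintLogs class below) is not shown in the original post. A minimal sketch, assuming it simply reuses the region-number hashing from tsPutLog (hash of number plus the yyyyMM part of the time, modulo 100, zero-padded to two digits), might look like this; the class and method names match the references, everything else is an assumption:

package ts.calllogs;
import java.text.DecimalFormat;
/**
 * Hypothetical sketch of the region-hash helper referenced elsewhere in this post.
 */
public class Util {
    /**
     * Returns the two-digit region prefix for a rowkey,
     * computed from the phone number plus the yyyyMM part of the call time.
     */
    public static String getHash(String phoneNum, String callTime) {
        int hash = (phoneNum + callTime.substring(0, 6)).hashCode();
        hash = (hash & Integer.MAX_VALUE) % 100; //keep it non-negative, 100 regions
        DecimalFormat df = new DecimalFormat();
        df.applyPattern("00");
        return df.format(hash);
    }
}

With this in place, the caller-side key built in tsPutLog and the callee-side key built in the coprocessor follow the same region scheme, so PrintLogs can reconstruct the scan range from just a number and a month.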
c. Log-printing class -- query the call records of a given number for a given month
package ts.calllogs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.NavigableMap;
import java.util.Set;
/**
* Print call records
*/
public class PrintLogs {
public Connection conn;
public Table tb;
@Before
public void getConn() throws Exception {
//load the hbase configuration
Configuration conf = HBaseConfiguration.create();
//create the connection via the factory
conn = ConnectionFactory.createConnection(conf);
//get table
TableName tbName = TableName.valueOf("ns1:calllogs");
tb = conn.getTable(tbName);
}
@Test
public void printlogs() throws Exception {
Scan scan = new Scan();
String callerId = "13888888888";
String calledTime = "201809"; //query month (yyyyMM)
String hash = Util.getHash(callerId, calledTime);
String startKey = hash + "," + callerId + "," + calledTime;
String endKey = hash + "," + callerId + "," + "201810";
scan.setStartRow(Bytes.toBytes(startKey));
scan.setStopRow(Bytes.toBytes(endKey));
ResultScanner scanner = tb.getScanner(scan);
Result result = null;
while((result = scanner.next()) != null) {
System.out.println(Bytes.toString(result.getRow()));
}
}
}
3. Package and deploy
a. Register the coprocessor and distribute the configuration to every HBase node [hbase-site.xml]
<property>
    <name>hbase.coprocessor.region.classes</name>
    <value>ts.calllogs.TsCalledLogsRegionObserver</value>
</property>
b. Copy the built jar to the hbase/lib directory on every node
4. Run the insert test

II. BloomFilter
-----------------------------------------------------------------
1. A Bloom filter can be specified when a table is created. There are three modes: NONE [default], ROW [rowkey only], ROWCOL [row and column].
2. Without one, when a client looks up a specific rowkey the server may have to load every block of every storefile to check whether it contains the key, which wastes a great deal of I/O.
3. A Bloom filter avoids this wasted I/O.
4. The principle: the Bloom filter quickly tells whether a storefile could contain a given rowkey, returning one of two answers:
-- NO: the key is definitely not in this file (100% certain), so the file can be skipped.
-- MAYBE: the key may be in this file, so it must still be read (false positives are possible, false negatives are not).
5. API demo
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
/**
* Test the Bloom filter
*/
public class TsBloomFilter {
public Table tb;
public Connection conn;
@Before
public void getConn() throws Exception {
//load the hbase configuration
Configuration conf = HBaseConfiguration.create();
//create the connection via the factory
conn = ConnectionFactory.createConnection(conf);
//get table
TableName tbName = TableName.valueOf("ns1:bloom");
tb = conn.getTable(tbName);
}
@Test
public void tsBloom() throws Exception {
Admin admin = conn.getAdmin();
TableName tableName = TableName.valueOf("ns1:bloom");
HTableDescriptor desc = new HTableDescriptor(tableName);
HColumnDescriptor hclo = new HColumnDescriptor("f1");
//enable a row-level Bloom filter on the column family
hclo.setBloomFilterType(BloomType.ROW);
desc.addFamily(hclo);
admin.createTable(desc);
System.out.println("over");
}
}
III. Phoenix environment setup
-------------------------------------------
1. Install Phoenix
a. Download apache-phoenix-4.10.0-HBase-1.2-bin.tar.gz
b. Untar it
c. Copy the Phoenix server jar (xxx-server.jar) into hbase/lib on the server side and distribute it to every node
d. Restart HBase
e. Use Phoenix
$> phoenix/bin/sqlline.py s100          //note: the host you connect to is the ZooKeeper server
$phoenix> !tables                       //list tables
$phoenix> !help                         //show help
$phoenix> !sql create table test (id varchar(20) primary key , name varchar(20))   //create a table
$phoenix> !describe test                //show the table structure
$phoenix> !drop test                    //drop the table
$phoenix> select * from test;           //full table scan
2. Install a SQL client [run SQL from a GUI]
a. Download squirrel-sql-3.7.1-standard.jar; this is the installer program.
b. $> java -jar squirrel-sql-3.7.1-standard.jar, then Next ...
c. Copy phoenix-4.10.0-HBase-1.2-client.jar into the lib directory of the SQuirreL install (c:\myprograms\squirrel).
d. Start SQuirreL (GUI): go to the install directory and run squirrel-sql.bat
e. Open the GUI
f. In the left sidebar select the "Drivers" tab and click "+" ->
URL: jdbc:phoenix:192.168.43.131  (or jdbc:phoenix:s100)
Driver class: org.apache.phoenix.jdbc.PhoenixDriver
g. Under "Aliases" create a connection alias that uses this driver and URL

IV. Using Phoenix and the SQL client
-------------------------------------------------------
//create a table
$jdbc:phoenix> create table IF NOT EXISTS test.Person (IDCardNum INTEGER not null primary key, Name varchar(20),Age INTEGER);
//insert data
$jdbc:phoenix> UPSERT INTO test.PERSON(IDCardNum , Name,Age) VALUES (1,'tom',12);
//delete data
$jdbc:phoenix> delete from test.person where idcardnum = 1 ;
//update data (UPSERT inserts or updates)
$jdbc:phoenix> upsert into test.PERSON(IDCardNum , Name,Age) VALUES (1,'tom',12);
The same statements can also be issued from Java through the Phoenix JDBC driver; see the sketch at the end of this post.

V. Hive-HBase integration: map an HBase table into Hive and query it with HiveQL
-----------------------------------------------------------------------
1. Create the HBase-backed table in Hive
$hive> CREATE TABLE t11(key string, name string) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:name") TBLPROPERTIES("hbase.table.name" = "ns1:t11");
2. Query the HBase table from Hive
$hive> select count(*) from t11 ;
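As a complement to the SQuirreL setup, the Phoenix tables above can also be reached from plain Java over JDBC. This is a minimal sketch, not part of the original post: it assumes the phoenix client jar is on the classpath, that s100 is the ZooKeeper host used earlier, and it reuses the test.PERSON table from section IV; the class name TsPhoenixJdbc is just for illustration.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class TsPhoenixJdbc {
    public static void main(String[] args) throws Exception {
        //register the Phoenix driver (usually auto-registered when the client jar is on the classpath)
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        //jdbc:phoenix:<zookeeper quorum>, the same URL format configured for SQuirreL
        Connection conn = DriverManager.getConnection("jdbc:phoenix:s100");
        conn.setAutoCommit(true); //Phoenix buffers mutations until commit; autocommit keeps the example simple
        //upsert one row into test.PERSON
        PreparedStatement upsert = conn.prepareStatement(
                "UPSERT INTO test.PERSON(IDCardNum, Name, Age) VALUES (?, ?, ?)");
        upsert.setInt(1, 1);
        upsert.setString(2, "tom");
        upsert.setInt(3, 12);
        upsert.executeUpdate();
        //read the row back
        ResultSet rs = conn.prepareStatement(
                "SELECT IDCardNum, Name, Age FROM test.PERSON").executeQuery();
        while (rs.next()) {
            System.out.println(rs.getInt(1) + "," + rs.getString(2) + "," + rs.getInt(3));
        }
        conn.close();
    }
}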