hbase的增删改查
package hbase.com.cn.hbase;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.ValueFilter;
public class TestDML {
static Configuration conf=null;
static Connection connect=null;
static HTable table = null;
public static void init() throws Exception {
conf=HBaseConfiguration.create();
conf.set(“hbase.zookeeper.quorum”, “centos01:2181,centos02:2181,centos01:2181”);
connect = ConnectionFactory.createConnection(conf);
//dml操作句柄 需要给操作表
table=(HTable) connect.getTable(TableName.valueOf(“user”));
}
//数据插入
public static void insertData() throws Exception {
//put 对象 数据插入的操作对象
Put put=new Put("rk0001".getBytes());// 行簇
//参数1: 列簇 参数2:列名 参数3:值
put.add("data1".getBytes(),"name".getBytes(),"zhangsan".getBytes());
table.put(put);
}
public static void insertDatas() throws Exception {
//从本地读取文件 读取完毕插入hbase
List<Put> list=new ArrayList<>();
//从本地读取文件
BufferedReader br=new BufferedReader(new FileReader("d:/data/data/mapreduce/student.txt"));
String line=null;
int count=0;
while((line=br.readLine())!=null) {
count++;
String[] split=line.split(",");
Put p=new Put(("rk000"+count).getBytes());//行簇
if(split.length==5) {
p.add("family".getBytes(), "id".getBytes(), split[0].getBytes());
p.add("family".getBytes(), "name".getBytes(), split[1].getBytes());
p.add("family".getBytes(), "sex".getBytes(), split[2].getBytes());
p.add("family".getBytes(), "age".getBytes(), split[3].getBytes());
p.add("family".getBytes(), "dept".getBytes(), split[4].getBytes());
list.add(p);
}
}
br.close();
table.put(list);
}
public static void getData() throws Exception {
Get get=new Get("rk0001".getBytes());
Result result=table.get(get);
List<KeyValue>list=result.list();
for(KeyValue kv:list) {
System.out.print(new String(kv.getRow())+"\t");//行簇
System.out.print(new String(kv.getFamily())+"\t");//列簇
System.out.print(new String(kv.getQualifier())+"\t");//属性字段名
System.out.println(new String(kv.getValue()));
}
}
public static void scanDataFilter() throws Exception {
Scan scan=new Scan();
//Scan scan=new Scan("rk007".getBytes());//给定起始rowkwy进行扫描
/*scan.addColumn("family02".getBytes(), "id".getBytes());*/
scan.addColumn("family".getBytes(), "age".getBytes());//前面列簇 后面字段
//过滤器 前段 操作符 后面行簇
Filter fi1=new RowFilter(CompareOp.GREATER_OR_EQUAL,new BinaryComparator("rk0004".getBytes()));
Filter fi2=new ValueFilter(CompareOp.GREATER,new BinaryComparator("20".getBytes()));
List<Filter> fs=new ArrayList<>();
fs.add(fi1);
fs.add(fi2);
Filter f=new FilterList(fs);
scan.setFilter(f);//将多重过滤条件加入list集合
ResultScanner scanner = table.getScanner(scan);
//迭代器返回所有的信息
Iterator<Result> iterator = scanner.iterator();
while(iterator.hasNext()){
Result next = iterator.next();
List<KeyValue> list = next.list();
for(KeyValue kv:list){
System.out.print(new String(kv.getRow())+"\t");
System.out.print(new String(kv.getFamily())+"\t");
System.out.print(new String(kv.getQualifier())+"\t");
System.out.println(new String(kv.getValue()));
}
}
//
// Scan scan=new Scan();//全表扫描
// scan.setStartRow(“rk002”.getBytes());
// scan.setStopRow(“rk007”.getBytes());
// //scan.setFilter(filter01);//添加多次过滤器 最后一次添加的生效
//
// ResultScanner scanner = table.getScanner(scan);
// //迭代器返回的是所有信息
// Iterator iterator = scanner.iterator();
// while(iterator.hasNext()){
// Result next = iterator.next();
// List list = next.list();
// for(KeyValue kv:list){
// System.out.print(new String(kv.getRow())+"\t");
// System.out.print(new String(kv.getFamily())+"\t");
// System.out.print(new String(kv.getQualifier())+"\t");
// System.out.println(new String(kv.getValue()));
// }
// }
}
public static void main(String[] args) throws Exception {
init();
//insertData();
//insertDatas();
//getData();
scanDataFilter();
}
}
上一篇: 文件系统模块(fs)
下一篇: shell脚本返回值及其使用场景的实现