通用MapReduce程序复制HBase表数据
程序员文章站
2022-06-04 18:55:34
编写mr程序,让其可以适合大部分的hbase表数据导入到hbase表数据。其中包括可以设置版本数、可以设置输入表的列导入设置(选取其中某几列)、可以设置输出表的列导出设置(...
编写mr程序,让其可以适合大部分的hbase表数据导入到hbase表数据。其中包括可以设置版本数、可以设置输入表的列导入设置(选取其中某几列)、可以设置输出表的列导出设置(选取其中某几列)。
原始表test1数据如下:
每个row key都有两个版本的数据,这里只显示了row key为1的数据
在hbase shell 中创建数据表:
create 'test2',{name => 'cf1',versions => 10} // 保存无版本、无列导入设置、无列导出设置的数据 create 'test3',{name => 'cf1',versions => 10} // 保存无版本、无列导入设置、有列导出设置的数据 create 'test4',{name => 'cf1',versions => 10} // 保存无版本、有列导入设置、无列导出设置的数据 create 'test5',{name => 'cf1',versions => 10} // 保存有版本、无列导入设置、无列导出设置的数据 create 'test6',{name => 'cf1',versions => 10} // 保存有版本、无列导入设置、有列导出设置的数据 create 'test7',{name => 'cf1',versions => 10} // 保存有版本、有列导入设置、无列导出设置的数据 create 'test8',{name => 'cf1',versions => 10} // 保存有版本、有列导入设置、有列导出设置的数据
main函数入口:
package generalhbasetohbase; import org.apache.hadoop.util.toolrunner; public class drivertest { public static void main(string[] args) throws exception { // 无版本设置、无列导入设置,无列导出设置 string[] myargs1= new string[]{ "test1", // 输入表 "test2", // 输出表 "0", // 版本大小数,如果值为0,则为默认从输入表导出最新的数据到输出表 "-1", // 列导入设置,如果为-1 ,则没有设置列导入 "-1" // 列导出设置,如果为-1,则没有设置列导出 }; toolrunner.run(hbasedriver.getconfiguration(), new hbasedriver(), myargs1); // 无版本设置、有列导入设置,无列导出设置 string[] myargs2= new string[]{ "test1", "test3", "0", "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14", "-1" }; toolrunner.run(hbasedriver.getconfiguration(), new hbasedriver(), myargs2); // 无版本设置,无列导入设置,有列导出设置 string[] myargs3= new string[]{ "test1", "test4", "0", "-1", "cf1:c1,cf1:c10,cf1:c14" }; toolrunner.run(hbasedriver.getconfiguration(), new hbasedriver(), myargs3); // 有版本设置,无列导入设置,无列导出设置 string[] myargs4= new string[]{ "test1", "test5", "2", "-1", "-1" }; toolrunner.run(hbasedriver.getconfiguration(), new hbasedriver(), myargs4); // 有版本设置、有列导入设置,无列导出设置 string[] myargs5= new string[]{ "test1", "test6", "2", "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14", "-1" }; toolrunner.run(hbasedriver.getconfiguration(), new hbasedriver(), myargs5); // 有版本设置、无列导入设置,有列导出设置 string[] myargs6= new string[]{ "test1", "test7", "2", "-1", "cf1:c1,cf1:c10,cf1:c14" }; toolrunner.run(hbasedriver.getconfiguration(), new hbasedriver(), myargs6); // 有版本设置、有列导入设置,有列导出设置 string[] myargs7= new string[]{ "test1", "test8", "2", "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14", "cf1:c1,cf1:c10,cf1:c14" }; toolrunner.run(hbasedriver.getconfiguration(), new hbasedriver(), myargs7); } }
driver:
package generalhbasetohbase; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.conf.configured; import org.apache.hadoop.hbase.client.put; import org.apache.hadoop.hbase.client.scan; import org.apache.hadoop.hbase.io.immutablebyteswritable; import org.apache.hadoop.hbase.mapreduce.tablemapreduceutil; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.util.tool; import util.jarutil; public class hbasedriver extends configured implements tool{ public static string fromtable=""; //导入表 public static string totable=""; //导出表 public static string setversion=""; //是否设置版本 // args => {fromtable,totable,setversion,columnfromtable,columntotable} @override public int run(string[] args) throws exception { if(args.length!=5){ system.err.println("usage:\n demo.job.hbasedriver <input> <inputtable> " + "<output> <outputtable>" +"< versions >" + " <set columns from inputtable> like <cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14> or <-1> " + "<set columns from outputtable> like <cf1:c1,cf1:c10,cf1:c14> or <-1>"); return -1; } configuration conf = getconf(); fromtable = args[0]; totable = args[1]; setversion = args[2]; conf.set("setversion", setversion); if(!args[3].equals("-1")){ conf.set("columnfromtable", args[3]); } if(!args[4].equals("-1")){ conf.set("columntotable", args[4]); } string jobname ="from table "+fromtable+ " ,import to "+ totable; job job = job.getinstance(conf, jobname); job.setjarbyclass(hbasedriver.class); scan scan = new scan(); // 判断是否需要设置版本 if(setversion != "0" || setversion != "1"){ scan.setmaxversions(integer.parseint(setversion)); } // 设置hbase表输入:表名、scan、mapper类、mapper输出键类型、mapper输出值类型 tablemapreduceutil.inittablemapperjob( fromtable, scan, hbasetohbasemapper.class, immutablebyteswritable.class, put.class, job); // 设置hbase表输出:表名,reducer类 tablemapreduceutil.inittablereducerjob(totable, null, job); // 没有 reducers, 直接写入到 输出文件 job.setnumreducetasks(0); return job.waitforcompletion(true) ? 0 : 1; } private static configuration configuration; public static configuration getconfiguration(){ if(configuration==null){ /** * todo 了解如何直接从windows提交代码到hadoop集群 * 并修改其中的配置为实际配置 */ configuration = new configuration(); configuration.setboolean("mapreduce.app-submission.cross-platform", true);// 配置使用跨平台提交任务 configuration.set("fs.defaultfs", "hdfs://master:8020");// 指定namenode configuration.set("mapreduce.framework.name", "yarn"); // 指定使用yarn框架 configuration.set("yarn.resourcemanager.address", "master:8032"); // 指定resourcemanager configuration.set("yarn.resourcemanager.scheduler.address", "master:8030");// 指定资源分配器 configuration.set("mapreduce.jobhistory.address", "master:10020");// 指定historyserver configuration.set("hbase.master", "master:16000"); configuration.set("hbase.rootdir", "hdfs://master:8020/hbase"); configuration.set("hbase.zookeeper.quorum", "slave1,slave2,slave3"); configuration.set("hbase.zookeeper.property.clientport", "2181"); //todo 需export->jar file ; 设置正确的jar包所在位置 configuration.set("mapreduce.job.jar",jarutil.jar(hbasedriver.class));// 设置jar包路径 } return configuration; } }
mapper:
package generalhbasetohbase; import java.io.ioexception; import java.util.arraylist; import java.util.hashmap; import java.util.hashset; import java.util.map.entry; import java.util.navigablemap; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.hbase.cell; import org.apache.hadoop.hbase.keyvalue; import org.apache.hadoop.hbase.client.put; import org.apache.hadoop.hbase.client.result; import org.apache.hadoop.hbase.io.immutablebyteswritable; import org.apache.hadoop.hbase.mapreduce.tablemapper; import org.apache.hadoop.hbase.util.bytes; import org.slf4j.logger; import org.slf4j.loggerfactory; public class hbasetohbasemapper extends tablemapper<immutablebyteswritable, put> { logger log = loggerfactory.getlogger(hbasetohbasemapper.class); private static int versionnum = 0; private static string[] columnfromtable = null; private static string[] columntotable = null; private static string column1 = null; private static string column2 = null; @override protected void setup(context context) throws ioexception, interruptedexception { configuration conf = context.getconfiguration(); versionnum = integer.parseint(conf.get("setversion", "0")); column1 = conf.get("columnfromtable",null); if(!(column1 == null)){ columnfromtable = column1.split(","); } column2 = conf.get("columntotable",null); if(!(column2 == null)){ columntotable = column2.split(","); } } @override protected void map(immutablebyteswritable key, result value, context context) throws ioexception, interruptedexception { context.write(key, resulttoput(key,value)); } /*** * 把key,value转换为put * @param key * @param value * @return * @throws ioexception */ private put resulttoput(immutablebyteswritable key, result value) throws ioexception { hashmap<string, string> ftablemap = new hashmap<>(); hashmap<string, string> ttablemap = new hashmap<>(); put put = new put(key.get()); if(! (columnfromtable == null || columnfromtable.length == 0)){ ftablemap = getfamilyandcolumn(columnfromtable); } if(! (columntotable == null || columntotable.length == 0)){ ttablemap = getfamilyandcolumn(columntotable); } if(versionnum==0){ if(ftablemap.size() == 0){ if(ttablemap.size() == 0){ for (cell kv : value.rawcells()) { put.add(kv); // 没有设置版本,没有设置列导入,没有设置列导出 } return put; } else{ return getput(put, value, ttablemap); // 无版本、无列导入、有列导出 } } else { if(ttablemap.size() == 0){ return getput(put, value, ftablemap);// 无版本、有列导入、无列导出 } else { return getput(put, value, ttablemap);// 无版本、有列导入、有列导出 } } } else{ if(ftablemap.size() == 0){ if(ttablemap.size() == 0){ return getput1(put, value); // 有版本,无列导入,无列导出 }else{ return getput2(put, value, ttablemap); //有版本,无列导入,有列导出 } }else{ if(ttablemap.size() == 0){ return getput2(put,value,ftablemap);// 有版本,有列导入,无列导出 }else{ return getput2(put,value,ttablemap); // 有版本,有列导入,有列导出 } } } } /*** * 无版本设置的情况下,对于有列导入或者列导出 * @param put * @param value * @param tablemap * @return * @throws ioexception */ private put getput(put put,result value,hashmap<string, string> tablemap) throws ioexception{ for(cell kv : value.rawcells()){ byte[] family = kv.getfamily(); if(tablemap.containskey(new string(family))){ string columnstr = tablemap.get(new string(family)); arraylist<string> columnby = tobyte(columnstr); if(columnby.contains(new string(kv.getqualifier()))){ put.add(kv); //没有设置版本,没有设置列导入,有设置列导出 } } } return put; } /*** * (有版本,无列导入,有列导出)或者(有版本,有列导入,无列导出) * @param put * @param value * @param ttablemap * @return */ private put getput2(put put,result value,hashmap<string, string> tablemap){ navigablemap<byte[], navigablemap<byte[], navigablemap<long, byte[]>>> map=value.getmap(); for(byte[] family:map.keyset()){ if(tablemap.containskey(new string(family))){ string columnstr = tablemap.get(new string(family)); log.info("@@@@@@@@@@@"+new string(family)+" "+columnstr); arraylist<string> columnby = tobyte(columnstr); navigablemap<byte[], navigablemap<long, byte[]>> familymap = map.get(family);//列簇作为key获取其中的列相关数据 for(byte[] column:familymap.keyset()){ //根据列名循坏 log.info("!!!!!!!!!!!"+new string(column)); if(columnby.contains(new string(column))){ navigablemap<long, byte[]> valuesmap = familymap.get(column); for(entry<long, byte[]> s:valuesmap.entryset()){//获取列对应的不同版本数据,默认最新的一个 system.out.println("***:"+new string(family)+" "+new string(column)+" "+s.getkey()+" "+new string(s.getvalue())); put.addcolumn(family, column, s.getkey(),s.getvalue()); } } } } } return put; } /*** * 有版本、无列导入、无列导出 * @param put * @param value * @return */ private put getput1(put put,result value){ navigablemap<byte[], navigablemap<byte[], navigablemap<long, byte[]>>> map=value.getmap(); for(byte[] family:map.keyset()){ navigablemap<byte[], navigablemap<long, byte[]>> familymap = map.get(family);//列簇作为key获取其中的列相关数据 for(byte[] column:familymap.keyset()){ //根据列名循坏 navigablemap<long, byte[]> valuesmap = familymap.get(column); for(entry<long, byte[]> s:valuesmap.entryset()){ //获取列对应的不同版本数据,默认最新的一个 put.addcolumn(family, column, s.getkey(),s.getvalue()); } } } return put; } // str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"} /*** * 得到列簇名与列名的k,v形式的map * @param str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"} * @return map => {"cf1" => "c1,c2,c10,c11,c14"} */ private static hashmap<string, string> getfamilyandcolumn(string[] str){ hashmap<string, string> map = new hashmap<>(); hashset<string> set = new hashset<>(); for(string s : str){ set.add(s.split(":")[0]); } object[] ob = set.toarray(); for(int i=0; i<ob.length;i++){ string family = string.valueof(ob[i]); string columns = ""; for(int j=0;j < str.length;j++){ if(family.equals(str[j].split(":")[0])){ columns += str[j].split(":")[1]+","; } } map.put(family, columns.substring(0, columns.length()-1)); } return map; } private static arraylist<string> tobyte(string s){ arraylist<string> b = new arraylist<>(); string[] sarr = s.split(","); for(int i=0;i<sarr.length;i++){ b.add(sarr[i]); } return b; } }
程序运行完之后,在hbase shell中查看每个表,看是否数据导入正确:
test2:(无版本、无列导入设置、无列导出设置)
test3 (无版本、有列导入设置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、无列导出设置)
test4(无版本、无列导入设置、有列导出设置("cf1:c1,cf1:c10,cf1:c14"))
test5(有版本、无列导入设置、无列导出设置)
test6(有版本、有列导入设置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、无列导出设置)
test7(有版本、无列导入设置、有列导出设置("cf1:c1,cf1:c10,cf1:c14"))
test8(有版本、有列导入设置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、有列导出设置("cf1:c1,cf1:c10,cf1:c14"))
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
上一篇: 词频统计的完善版
下一篇: 她是宋朝的杨皇后,33岁与皇上姐弟恋