HBase 与 HDFS 的交互
程序员文章站
2022-11-25 14:16:12
hdfs和hbase的交互,和写MapReduce程序类似,只是需要修改输入输出数据和使用hbase的javaAPI对其进行操作处理即可 public class HBaseToHdfs extends ToolRunner implements Tool { private Configurati... ......
HDFS 和 HBase 的交互与编写 MapReduce 程序类似,只需要修改输入输出数据,并使用 HBase 的 Java API 对数据进行操作处理即可。
public class hbasetohdfs extends toolrunner implements tool { private configuration configuration; //配置文件需要配置的属性 private static final string hdfs_name = "fs.defaultfs"; private static final string hdfs_value = "hdfs://mycluster"; private static final string mapreduce_name = "mapreduce.framework.name"; private static final string mapreduce_value = "yarn"; private static final string hbase_name = "hbase.zookeeper.quorum"; private static final string hbase_value = "qiaojunlong3:2181,qiaojunlong4:2181,qiaojunlong5:2181"; //获取hbase表的扫描对象 private scan getscan() { return new scan(); } @override public int run(string[] args) throws exception { getconf(); //获取job实例对象 job job = job.getinstance(configuration, "copy_move"); //map/reduce的class链接 job.setmapperclass(hbase_to_hdfs.class); job.setmapoutputkeyclass(text.class); job.setmapoutputvalueclass(nullwritable.class); //设置输入输出 //由hbase导数据到hdfs故输入端需要使用tablemapreduceutil类 tablemapreduceutil.inittablemapperjob("ns3:t5", getscan(), hbase_to_hdfs.class, text.class, nullwritable.class, job); fileoutputformat.setoutputpath(job, new path(args[0])); //设置jar包 job.setjarbyclass(hbasetohdfs.class); //提交作业 int b = job.waitforcompletion(true) ? 0 : 1; return b; } @override public void setconf(configuration configuration) { configuration.set(hdfs_name, hdfs_value); configuration.set(mapreduce_name, mapreduce_value); configuration.set(hbase_name, hbase_value); this.configuration = configuration; } @override public configuration getconf() { return configuration; } public static void main(string[] args) throws exception { toolrunner.run(hbaseconfiguration.create(),new hbasetohdfs() , args); } // 创建map程序 private static text mkey = new text();
static class hbase_to_hdfs extends tablemapper<text, nullwritable> { @override protected void map(immutablebyteswritable key, result value, context context) throws ioexception, interruptedexception { //定义字符串拼接 stringbuffer stringbuffer = new stringbuffer(); /** * 使用value获取扫描器,获取hbase表的列名/列值等信息 * 使用stringbuffer来对需要的信息进行字符串拼接 */ cellscanner cellscanner = value.cellscanner(); while (cellscanner.advance()) { cell cell = cellscanner.current(); stringbuffer.append(new string(cellutil.clonevalue(cell))).append("\t"); } mkey.set(stringbuffer.tostring()); context.write(mkey, nullwritable.get()); } } }