欢迎您访问程序员文章站！本站旨在为大家提供、分享程序员计算机编程知识。
您现在的位置是: 首页  >  IT编程

HBase 与 HDFS 的交互

程序员文章站 2022-11-25 14:16:12
HDFS 和 HBase 的交互与编写 MapReduce 程序类似，只需修改输入输出数据，并使用 HBase 的 Java API 对其进行操作处理即可。（摘要，完整代码见下文）
HDFS 和 HBase 的交互与编写 MapReduce 程序类似，只需修改输入输出数据，并使用 HBase 的 Java API 对其进行操作处理即可。下面的示例通过 MapReduce 将 HBase 表 `ns3:t5` 中每一行的单元格值导出为 HDFS 上的文本文件。

public class hbasetohdfs extends toolrunner implements tool {

    private configuration configuration;
    //配置文件需要配置的属性
    private static final string hdfs_name = "fs.defaultfs";
    private static final string hdfs_value = "hdfs://mycluster";
    private static final string mapreduce_name = "mapreduce.framework.name";
    private static final string mapreduce_value = "yarn";
    private static final string hbase_name = "hbase.zookeeper.quorum";
    private static final string hbase_value = "qiaojunlong3:2181,qiaojunlong4:2181,qiaojunlong5:2181";

    //获取hbase表的扫描对象
    private scan getscan() {
        return new scan();
    }

    @override
    public int run(string[] args) throws exception {

        getconf();

        //获取job实例对象
        job job = job.getinstance(configuration, "copy_move");

        //map/reduce的class链接
        job.setmapperclass(hbase_to_hdfs.class);
        job.setmapoutputkeyclass(text.class);
        job.setmapoutputvalueclass(nullwritable.class);

        //设置输入输出
        //由hbase导数据到hdfs故输入端需要使用tablemapreduceutil类
        tablemapreduceutil.inittablemapperjob("ns3:t5", getscan(), hbase_to_hdfs.class, text.class, nullwritable.class, job);
        fileoutputformat.setoutputpath(job, new path(args[0]));

        //设置jar包
        job.setjarbyclass(hbasetohdfs.class);

        //提交作业
        int b = job.waitforcompletion(true) ? 0 : 1;

        return b;
    }

    @override
    public void setconf(configuration configuration) {
        configuration.set(hdfs_name, hdfs_value);
        configuration.set(mapreduce_name, mapreduce_value);
        configuration.set(hbase_name, hbase_value);
        this.configuration = configuration;
    }

    @override
    public configuration getconf() {
        return configuration;
    }

    public static void main(string[] args) throws exception {
        toolrunner.run(hbaseconfiguration.create(),new hbasetohdfs() , args);
    }

    // 创建map程序
    private static text mkey = new text();
static class hbase_to_hdfs extends tablemapper<text, nullwritable> { @override protected void map(immutablebyteswritable key, result value, context context) throws ioexception, interruptedexception { //定义字符串拼接 stringbuffer stringbuffer = new stringbuffer(); /** * 使用value获取扫描器,获取hbase表的列名/列值等信息 * 使用stringbuffer来对需要的信息进行字符串拼接 */ cellscanner cellscanner = value.cellscanner(); while (cellscanner.advance()) { cell cell = cellscanner.current(); stringbuffer.append(new string(cellutil.clonevalue(cell))).append("\t"); } mkey.set(stringbuffer.tostring()); context.write(mkey, nullwritable.get()); } } }