欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Hadoop MultipleOutputs输出到多个文件中的实现方法

程序员文章站 2024-02-20 16:18:46
 hadoop multipleoutputs输出到多个文件中的实现方法 1.输出到多个文件或多个文件夹: 驱动中不需要额外改变,只需要在mapclas...

 hadoop multipleoutputs输出到多个文件中的实现方法

1.输出到多个文件或多个文件夹:

驱动中不需要额外改变,只需要在mapclass或reduce类中加入如下代码

private multipleoutputs<text,intwritable> mos;
public void setup(context context) throws ioexception,interruptedexception {
  mos = new multipleoutputs(context);
}
public void cleanup(context context) throws ioexception,interruptedexception {
  mos.close();
}

  然后就可以用mos.write(key key,value value,string baseoutputpath)代替context.write(key, value);

  在mapclass或reduce中使用,输出时也会有默认的文件part-m-00*或part-r-00*,不过这些文件是无内容的,大小为0. 而且只有part-m-00*会传给reduce。

注意:multipleoutputs.write(key, value, baseoutputpath)方法的第三个函数表明了该输出所在的目录(相对于用户指定的输出目录)。

如果baseoutputpath不包含文件分隔符“/”,那么输出的文件格式为baseoutputpath-r-nnnnn(name-r-nnnnn);
如果包含文件分隔符“/”,例如baseoutputpath=“029070-99999/1901/part”,那么输出文件则为029070-99999/1901/part-r-nnnnn

2.案例-需求

需求,下面是有些测试数据,要对这些数据按类目输出到output中:

1512,iphone5s,4英寸,指纹识别,a7处理器,64位,m7协处理器,低功耗

1512,iphone5,4英寸,a6处理器,ios7

1512,iphone4s,3.5英寸,a5处理器,双核,经典

50019780,ipad,9.7英寸,retina屏幕,丰富的应用

50019780,yoga,联想,待机18小时,外形独特

50019780,nexus 7,华硕&google,7英寸

50019780,ipad mini 2,retina显示屏,苹果,7.9英寸

1101,macbook air,苹果超薄,os x mavericks

1101,macbook pro,苹果,os x lion

1101,thinkpad yoga,联想,windows 8,超级本

3.mapper程序:

package cn.edu.bjut.multioutput;

import java.io.ioexception;

import org.apache.hadoop.io.intwritable;
import org.apache.hadoop.io.longwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.mapper;

public class multioutputmapper extends mapper<longwritable, text, intwritable, text> {

  @override
  protected void map(longwritable key, text value, context context)
      throws ioexception, interruptedexception {
    string line = value.tostring().trim();
    if(null != line && 0 != line.length()) {
      string[] arr = line.split(",");
      context.write(new intwritable(integer.parseint(arr[0])), value);
    }
  }

}

4.reducer程序:

package cn.edu.bjut.multioutput;

import java.io.ioexception;

import org.apache.hadoop.io.intwritable;
import org.apache.hadoop.io.nullwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.reducer;
import org.apache.hadoop.mapreduce.lib.output.multipleoutputs;

public class multioutputreducer extends
    reducer<intwritable, text, nullwritable, text> {

  private multipleoutputs<nullwritable, text> multipleoutputs = null;

  @override
  protected void reduce(intwritable key, iterable<text> values, context context)
      throws ioexception, interruptedexception {
    for(text text : values) {
      multipleoutputs.write("keyspilt", nullwritable.get(), text, key.tostring()+"/");
      multipleoutputs.write("allpart", nullwritable.get(), text);
    }
  }

  @override
  protected void setup(context context)
      throws ioexception, interruptedexception {
    multipleoutputs = new multipleoutputs<nullwritable, text>(context);
  }

  @override
  protected void cleanup(context context)
      throws ioexception, interruptedexception {
    if(null != multipleoutputs) {
      multipleoutputs.close();
      multipleoutputs = null;
    }
  }


}

5.主程序:

package cn.edu.bjut.multioutput;

import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.filesystem;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.io.intwritable;
import org.apache.hadoop.io.nullwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.job;
import org.apache.hadoop.mapreduce.lib.input.fileinputformat;
import org.apache.hadoop.mapreduce.lib.output.fileoutputformat;
import org.apache.hadoop.mapreduce.lib.output.multipleoutputs;
import org.apache.hadoop.mapreduce.lib.output.textoutputformat;

public class mainjob {
  public static void main(string[] args) throws exception {
    configuration conf = new configuration();
    job job = new job(conf, "aaa");
    job.setjarbyclass(mainjob.class);

    job.setmapperclass(multioutputmapper.class);
    job.setmapoutputkeyclass(intwritable.class);
    job.setmapoutputvalueclass(text.class);

    job.setreducerclass(multioutputreducer.class);
    job.setoutputkeyclass(nullwritable.class);
    job.setoutputvalueclass(text.class);

    fileinputformat.addinputpath(job, new path(args[0]));

    multipleoutputs.addnamedoutput(job, "keyspilt", textoutputformat.class, nullwritable.class, text.class);
    multipleoutputs.addnamedoutput(job, "allpart", textoutputformat.class, nullwritable.class, text.class);

    path outpath = new path(args[1]);
    filesystem fs = filesystem.get(conf);
    if(fs.exists(outpath)) {
      fs.delete(outpath, true);
    }
    fileoutputformat.setoutputpath(job, outpath);

    job.waitforcompletion(true);
  }
}

如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!