A Detailed Example of a Custom OutputFormat in Java

程序员文章站 2024-02-20 20:37:22

Example code:

package com.ccse.hadoop.outputformat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;


public class MyselfOutputFormatApp {

  public final static String INPUT_PATH = "hdfs://chaoren1:9000/mapinput";
  public final static String OUTPUT_PATH = "hdfs://chaoren1:9000/mapoutput";
  public final static String OUTPUT_FILENAME = "/abc";

  public static void main(String[] args) throws IOException, URISyntaxException,
    ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    // Delete any previous output directory so the job can be rerun
    FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
    fileSystem.delete(new Path(OUTPUT_PATH), true);

    Job job = new Job(conf, MyselfOutputFormatApp.class.getSimpleName());
    job.setJarByClass(MyselfOutputFormatApp.class);

    FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
    job.setMapperClass(MyMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);

    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    // Use the custom output format defined below
    job.setOutputFormatClass(MyselfOutputFormat.class);

    job.waitForCompletion(true);
  }

  // Emits (word, 1) for every whitespace-separated token of each input line
  public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    private Text word = new Text();
    private LongWritable writable = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value,
        Mapper<LongWritable, Text, Text, LongWritable>.Context context)
        throws IOException, InterruptedException {
      if (value != null) {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
          word.set(tokenizer.nextToken());
          context.write(word, writable);
        }
      }
    }

  }

  // Sums the counts of each word
  public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
        Reducer<Text, LongWritable, Text, LongWritable>.Context context)
        throws IOException, InterruptedException {
      long sum = 0;
      for (LongWritable value : values) {
        sum += value.get();
      }
      context.write(key, new LongWritable(sum));
    }
  }

  public static class MyselfOutputFormat extends OutputFormat<Text, LongWritable> {

    private FSDataOutputStream outputStream = null;

    @Override
    public RecordWriter<Text, LongWritable> getRecordWriter(
        TaskAttemptContext context) throws IOException,
        InterruptedException {
      try {
        FileSystem fileSystem = FileSystem.get(new URI(MyselfOutputFormatApp.OUTPUT_PATH), context.getConfiguration());
        // Specify the output path of the file
        final Path path = new Path(MyselfOutputFormatApp.OUTPUT_PATH
                     + MyselfOutputFormatApp.OUTPUT_FILENAME);
        this.outputStream = fileSystem.create(path, false);
      } catch (URISyntaxException e) {
        // Fail the task instead of handing a null stream to the writer
        throw new IOException(e);
      }
      return new MyselfRecordWriter(outputStream);
    }

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException,
        InterruptedException {
      // No validation here: main() deletes the output directory before the job starts
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
        throws IOException, InterruptedException {
      return new FileOutputCommitter(new Path(MyselfOutputFormatApp.OUTPUT_PATH), context);
    }

  }

  public static class MyselfRecordWriter extends RecordWriter<Text, LongWritable> {

    private FSDataOutputStream outputStream = null;

    public MyselfRecordWriter(FSDataOutputStream outputStream) {
      this.outputStream = outputStream;
    }

    @Override
    public void write(Text key, LongWritable value) throws IOException,
        InterruptedException {
      this.outputStream.writeBytes(key.toString());
      this.outputStream.writeBytes("\t");
      // writeLong emits the count as 8 raw bytes, not as decimal text
      this.outputStream.writeLong(value.get());
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException,
        InterruptedException {
      this.outputStream.close();
    }

  }

}

2. OutputFormat is responsible for handling the various output destinations.

2.1 The key-value pairs that OutputFormat writes out come from the Reducer class, and they reach the output through a RecordWriter.
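The flow described in 2.1 can be imitated without a cluster. The following is only a plain-Java sketch, not Hadoop code: the class ReducerToWriterSketch, its methods, and the BiConsumer standing in for the RecordWriter are all hypothetical names chosen for illustration. It tokenizes lines as the mapper above does, sums each group as the reducer does, and passes every (key, sum) pair to the writer callback.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Hypothetical, Hadoop-free illustration of how reducer output reaches a writer.
public class ReducerToWriterSketch {

    // Stand-in for the map and shuffle phases: emit (word, 1) per token,
    // grouped by word and sorted by key.
    public static Map<String, List<Long>> mapAndGroup(List<String> lines) {
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (String line : lines) {
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                grouped.computeIfAbsent(tokenizer.nextToken(), k -> new ArrayList<>()).add(1L);
            }
        }
        return grouped;
    }

    // Stand-in for the reduce phase: sum each group, then pass (key, sum)
    // to the writer callback, which plays the role of the RecordWriter.
    public static void reduceAll(Map<String, List<Long>> grouped,
                                 BiConsumer<String, Long> recordWriter) {
        for (Map.Entry<String, List<Long>> entry : grouped.entrySet()) {
            long sum = 0;
            for (long count : entry.getValue()) {
                sum += count;
            }
            recordWriter.accept(entry.getKey(), sum);
        }
    }

    // Collects what the "writer" receives as tab-separated text lines.
    public static List<String> run(List<String> lines) {
        List<String> written = new ArrayList<>();
        reduceAll(mapAndGroup(lines), (key, value) -> written.add(key + "\t" + value));
        return written;
    }

    public static void main(String[] args) {
        for (String record : run(Arrays.asList("a b a", "b a"))) {
            System.out.println(record);
        }
    }
}
```

The reduce step never decides where a record ends up; it only calls the writer, which is the separation the custom OutputFormat relies on.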

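2.2 The write(...) method of RecordWriter receives only the key (k) and the value (v), so where does the data go? The destination is supplied separately, by passing an output stream to the writer. write(...) simply writes k and v to that stream.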
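To see what one call to write(...) actually puts on the stream, the same three calls can be run against an in-memory java.io.DataOutputStream (the class that FSDataOutputStream extends). This is a minimal sketch under that assumption; RecordLayoutDemo, writeRecord and readRecord are hypothetical names, and no Hadoop classes are involved.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical demo of the byte layout produced by writeBytes + writeLong.
public class RecordLayoutDemo {

    // Mirrors the three calls in write(...): key bytes, one tab, then the long.
    public static byte[] writeRecord(String key, long value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeBytes(key);   // low-order byte of each character
            out.writeBytes("\t");  // single separator byte
            out.writeLong(value);  // 8 bytes, high byte first (binary, not decimal text)
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads one record back: bytes up to the first tab form the key,
    // the following 8 bytes are the value.
    public static String readRecord(byte[] record) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(record))) {
            StringBuilder key = new StringBuilder();
            int b;
            while ((b = in.read()) != '\t' && b != -1) {
                key.append((char) b);
            }
            return key + "=" + in.readLong();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] record = writeRecord("hello", 3L);
        System.out.println(record.length);       // 5 key bytes + 1 tab + 8 value bytes = 14
        System.out.println(readRecord(record));  // hello=3
    }
}
```

Because the value is stored as raw bytes and no line separator is written, the resulting file is not plain text. A reader has to consume it record by record as readRecord does, or the writer could emit Long.toString(value) followed by a newline if text output is wanted.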

2.3 A RecordWriter is supplied by the OutputFormat. Our custom OutputFormat must therefore extend the OutputFormat type, and the stream object must be obtained inside the getRecordWriter(...) method.
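The division of labour in 2.3 can be sketched in plain Java as well. SketchFormat and SketchWriter below are hypothetical stand-ins, not Hadoop types: the format knows the destination and opens the stream in getRecordWriter(), while the writer only uses the stream it was handed and closes it at the end.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical, Hadoop-free illustration of who opens the stream and who uses it.
public class StreamHandoffSketch {

    // Stand-in for a RecordWriter: it never chooses a destination itself.
    public static class SketchWriter implements AutoCloseable {
        private final DataOutputStream out;

        public SketchWriter(DataOutputStream out) {
            this.out = out;
        }

        public void write(String key, long value) {
            try {
                out.writeBytes(key);
                out.writeBytes("\t");
                out.writeLong(value);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        @Override
        public void close() {
            try {
                out.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    // Stand-in for an OutputFormat: it owns the destination and opens the stream.
    public static class SketchFormat {
        private final OutputStream destination;

        public SketchFormat(OutputStream destination) {
            this.destination = destination;
        }

        public SketchWriter getRecordWriter() {
            return new SketchWriter(new DataOutputStream(destination));
        }
    }

    // Writes two records through the format/writer pair and reports the byte count.
    public static int writeTwoRecords() {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (SketchWriter writer = new SketchFormat(sink).getRecordWriter()) {
            writer.write("a", 3L);
            writer.write("b", 2L);
        }
        return sink.size();
    }

    public static void main(String[] args) {
        System.out.println(writeTwoRecords()); // 2 records x (1 + 1 + 8) bytes = 20
    }
}
```

Swapping the ByteArrayOutputStream for any other OutputStream changes the destination without touching the writer, which is why the stream has to be created on the format side.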

That concludes this example of a custom OutputFormat in Java.