java 中自定义OutputFormat的实例详解
程序员文章站
2024-02-23 23:17:44
Java 中自定义 OutputFormat 的实例详解
实例代码:
package com.ccse.hadoop.outputformat;
imp...
Java 中自定义 OutputFormat 的实例详解
实例代码:
package com.ccse.hadoop.outputformat; import java.io.ioexception; import java.net.uri; import java.net.urisyntaxexception; import java.util.stringtokenizer; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.fsdataoutputstream; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.jobcontext; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.outputcommitter; import org.apache.hadoop.mapreduce.outputformat; import org.apache.hadoop.mapreduce.recordwriter; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.taskattemptcontext; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputcommitter; public class myselfoutputformatapp { public final static string input_path = "hdfs://chaoren1:9000/mapinput"; public final static string output_path = "hdfs://chaoren1:9000/mapoutput"; public final static string output_filename = "/abc"; public static void main(string[] args) throws ioexception, urisyntaxexception, classnotfoundexception, interruptedexception { configuration conf = new configuration(); filesystem filesystem = filesystem.get(new uri(output_path), conf); filesystem.delete(new path(output_path), true); job job = new job(conf, myselfoutputformatapp.class.getsimplename()); job.setjarbyclass(myselfoutputformatapp.class); fileinputformat.setinputpaths(job, new path(input_path)); job.setmapperclass(mymapper.class); job.setmapoutputkeyclass(text.class); job.setmapoutputvalueclass(longwritable.class); job.setreducerclass(myreducer.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(longwritable.class); job.setoutputformatclass(myselfoutputformat.class); job.waitforcompletion(true); } public static class mymapper extends mapper<longwritable, text, text, 
longwritable> { private text word = new text(); private longwritable writable = new longwritable(1); @override protected void map(longwritable key, text value, mapper<longwritable, text, text, longwritable>.context context) throws ioexception, interruptedexception { if (value != null) { string line = value.tostring(); stringtokenizer tokenizer = new stringtokenizer(line); while (tokenizer.hasmoretokens()) { word.set(tokenizer.nexttoken()); context.write(word, writable); } } } } public static class myreducer extends reducer<text, longwritable, text, longwritable> { @override protected void reduce(text key, iterable<longwritable> values, reducer<text, longwritable, text, longwritable>.context context) throws ioexception, interruptedexception { long sum = 0; for (longwritable value : values) { sum += value.get(); } context.write(key, new longwritable(sum)); } } public static class myselfoutputformat extends outputformat<text, longwritable> { private fsdataoutputstream outputstream = null; @override public recordwriter<text, longwritable> getrecordwriter( taskattemptcontext context) throws ioexception, interruptedexception { try { filesystem filesystem = filesystem.get(new uri(myselfoutputformatapp.output_path), context.getconfiguration()); //指定文件的输出路径 final path path = new path(myselfoutputformatapp.output_path + myselfoutputformatapp.output_filename); this.outputstream = filesystem.create(path, false); } catch (urisyntaxexception e) { e.printstacktrace(); } return new myselfrecordwriter(outputstream); } @override public void checkoutputspecs(jobcontext context) throws ioexception, interruptedexception { } @override public outputcommitter getoutputcommitter(taskattemptcontext context) throws ioexception, interruptedexception { return new fileoutputcommitter(new path(myselfoutputformatapp.output_path), context); } } public static class myselfrecordwriter extends recordwriter<text, longwritable> { private fsdataoutputstream outputstream = null; public 
myselfrecordwriter(fsdataoutputstream outputstream) { this.outputstream = outputstream; } @override public void write(text key, longwritable value) throws ioexception, interruptedexception { this.outputstream.writebytes(key.tostring()); this.outputstream.writebytes("\t"); this.outputstream.writelong(value.get()); } @override public void close(taskattemptcontext context) throws ioexception, interruptedexception { this.outputstream.close(); } } }
2. OutputFormat 是用于处理各种输出目的地的。
2.1 OutputFormat 需要写出去的键值对来自于 Reducer 类,由 RecordWriter 接收并负责写出。
2.2 RecordWriter 中的 write(...) 方法只有 k 和 v 两个参数,写到哪里去呢?这要通过单独传入 OutputStream 来处理。write 就是把 k 和 v 写入到 OutputStream 中。
2.3 RecordWriter 对象是由 OutputFormat 提供的。因此,我们自定义的 OutputFormat 必须继承 OutputFormat 类型,而流对象必须在 getRecordWriter(...) 方法中获得。
以上就是java 中自定义outputformat的实例,如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!