Hadoop Learning Path (6): Implementing a Custom MapReduce Partitioner
The partitioner that ships with MapReduce is HashPartitioner.
Principle: it hashes the map output key and takes the result modulo the number of reduce tasks; the result decides which reduce task pulls that output key-value pair.
A custom partitioner must extend Partitioner and override the getPartition() method.
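For reference, the default logic is essentially the following (a minimal sketch of what HashPartitioner does; the class name here is my own):

    import org.apache.hadoop.mapreduce.Partitioner;

    // sketch of the default hash-based partitioning
    public class HashPartitionerSketch<K, V> extends Partitioner<K, V> {
        @Override
        public int getPartition(K key, V value, int numReduceTasks) {
            // hash the map output key and take it modulo the number of reduce tasks;
            // masking with Integer.MAX_VALUE keeps the result non-negative
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }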
Custom partitioner class:
Note: the map output is a <k, v> key-value pair. In int partitionIndex = dict.get(text.toString()), partitionIndex is the value that dict returns for the output key k (text.toString()).
Appendix: the text to be processed
dear dear bear bear river car dear dear bear rive dear dear bear bear river car dear dear bear rive
The custom partitioner class must be registered in the main function (the job driver), as shown below.
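Concretely, the two calls involved (shown in full in the main function later) are:

    // register the custom partitioner and match the number of reduce tasks
    // to the number of partitions defined in dict
    job.setPartitionerClass(CustomPartitioner.class);
    job.setNumReduceTasks(4);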
Custom partitioner class:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;

// Text is the key type output by the map phase, IntWritable is the value type
public class CustomPartitioner extends Partitioner<Text, IntWritable> {

    public static HashMap<String, Integer> dict = new HashMap<String, Integer>();

    // every word that can appear as a map output key must be present in dict,
    // otherwise dict.get() returns null and unboxing throws a NullPointerException
    static {
        dict.put("dear", 0);
        dict.put("bear", 1);
        dict.put("river", 2);
        dict.put("car", 3);
    }

    @Override
    public int getPartition(Text text, IntWritable intWritable, int i) {
        // look up the partition number assigned to this key
        int partitionIndex = dict.get(text.toString());
        return partitionIndex;
    }
}
Note: the map output is a <k, v> key-value pair. In int partitionIndex = dict.get(text.toString());, partitionIndex is the partition number that dict maps the output key k (text.toString()) to.
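Note that the sample text above also contains the word rive, which is not in dict; with the code as written, dict.get() returns null for it and unboxing throws a NullPointerException. A defensive variant of getPartition (my own sketch, not part of the original post) sends unknown words to the last partition instead:

    // replaces getPartition() in the CustomPartitioner class above
    @Override
    public int getPartition(Text text, IntWritable intWritable, int numReduceTasks) {
        Integer partitionIndex = dict.get(text.toString());
        // unknown words fall back to the last partition instead of causing an NPE
        return partitionIndex != null ? partitionIndex : numReduceTasks - 1;
    }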
Mapper class:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // split each line into words; the delimiter must match the input format
        // (the sample text shown above is separated by spaces)
        String[] words = value.toString().split("\t");
        for (String word : words) {
            // emit each word with a count of 1 as the intermediate result
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
Reducer class:
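The driver below references a WordCountReduce class; a minimal sketch of it, assuming the standard word-count reducer that sums the per-word counts:

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // sum the 1s emitted by the mapper for this word
            int sum = 0;
            for (IntWritable count : values) {
                sum += count.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }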
Main function (job driver):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args == null || args.length != 2) {
            System.out.println("please input path!");
            System.exit(0);
        }

        Configuration configuration = new Configuration();
        configuration.set("mapreduce.job.jar", "/home/bruce/project/kkbhdp01/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar");
        Job job = Job.getInstance(configuration, WordCountMain.class.getSimpleName());

        // package the job as a jar
        job.setJarByClass(WordCountMain.class);

        // optionally set the input/output format classes through the job
        //job.setInputFormatClass(TextInputFormat.class);
        //job.setOutputFormatClass(TextOutputFormat.class);

        // set the input/output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // set the classes that handle the map/reduce phases
        job.setMapperClass(WordCountMap.class);
        // map-side combine
        //job.setCombinerClass(WordCountReduce.class);
        job.setReducerClass(WordCountReduce.class);

        // if the map and reduce output k/v types are the same, setting the reduce
        // output types is enough; otherwise set the map output types separately
        //job.setMapOutputKeyClass(.class)
        // set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setPartitionerClass(CustomPartitioner.class);
        job.setNumReduceTasks(4);

        // submit the job
        job.waitForCompletion(true);
    }
}
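With setNumReduceTasks(4) and the dict above, the output directory contains one file per reduce task, and each word's count lands in the file whose index matches its dict entry (expected layout, not shown in the original post):

    part-r-00000   <- counts for "dear"  (partition 0)
    part-r-00001   <- counts for "bear"  (partition 1)
    part-r-00002   <- counts for "river" (partition 2)
    part-r-00003   <- counts for "car"   (partition 3)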
Main function argument settings:
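The driver expects exactly two program arguments: the input path and the output path (which must not already exist). For example, with hypothetical HDFS paths:

    hdfs://node01:8020/wordcount/input hdfs://node01:8020/wordcount/output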