004: A Brief Introduction to WordCount, Counting Word Occurrences in Text
A Brief Introduction to MapReduce
- MapReduce is a distributed computing model, designed mainly to solve the problem of computing over massive data sets.
- An MR job consists of two phases, map and reduce; users only need to implement the map() and reduce() functions to obtain a distributed computation (the conceptual signatures are sketched below).
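In the general model, the two user-supplied functions have the following shapes (notation only, not runnable code); the framework handles input splitting, shuffling, sorting, and grouping by key in between:

map(k1, v1) -> list(k2, v2)
reduce(k2, list(v2)) -> list(k3, v3)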
The MapReduce Execution Diagram
- The MR execution flow (diagram)
2. MR Execution Diagram
- Getting a basic feel for MR through the code.
package com.lj.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the input line into words on single spaces.
        String[] arr = value.toString().split(" ");
        Text keyOut = new Text();
        IntWritable valueOut = new IntWritable();
        for (String s : arr) {
            // Emit (word, 1) for every occurrence.
            keyOut.set(s);
            valueOut.set(1);
            context.write(keyOut, valueOut);
        }
    }
}
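As a quick sanity check of the Mapper above (the input line "hello world hello" is illustrative; words are assumed to be separated by single spaces, as the split(" ") implies), one map() call would emit:

(hello, 1)
(world, 1)
(hello, 1)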
package com.lj.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum all the 1s emitted by the Mapper for this word.
        int count = 0;
        for (IntWritable iw : values) {
            count += iw.get();
        }
        context.write(key, new IntWritable(count));
    }
}
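Continuing the illustrative example: after the shuffle groups the Mapper's output by key, this Reducer receives (hello, [1, 1]) and (world, [1]) and writes out:

(hello, 2)
(world, 1)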
package com.lj.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;

public class WCApp {
    public static void main(String[] args) {
        BasicConfigurator.configure();
        Configuration conf = new Configuration();
        // Uncomment for local testing against the local file system:
        // conf.set("fs.defaultFS", "file:///d://ittools");
        try {
            // Create the job.
            Job job = Job.getInstance(conf);
            // Job name shown in the UI.
            job.setJobName("WCApp");
            // Class used to locate the containing jar.
            job.setJarByClass(WCApp.class);
            // Input format and input/output paths (taken from the command line).
            job.setInputFormatClass(TextInputFormat.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.setMapperClass(WCMapper.class);
            job.setReducerClass(WCReducer.class);
            job.setNumReduceTasks(1);

            // Map output types and final output types.
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // Submit and wait; false = do not print progress to the console.
            job.waitForCompletion(false);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
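Once the three classes are packaged into a jar (the jar name and HDFS paths below are illustrative), the job can be submitted with the standard hadoop jar command; args[0] and args[1] become the input and output paths:

hadoop jar wcapp.jar com.lj.mr.WCApp /user/hadoop/input /user/hadoop/output

Note that the output directory must not exist beforehand; FileOutputFormat refuses to overwrite an existing path and fails the job immediately.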
- A quick walkthrough of the code:
From the execution flow it is clear that we start with the Mapper and then move on to the Reducer, and that the Reducer's key (in) and value (in) types must be exactly the Mapper's key (out) and value (out) types; otherwise the types will not match and the job fails with an error.
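Concretely, lining up the generic parameters of the two classes above shows the contract: the Mapper's output types (Text, IntWritable) are exactly the Reducer's input types:

Mapper<LongWritable, Text, Text, IntWritable>    // in: (byte offset, line)     out: (word, 1)
Reducer<Text, IntWritable, Text, IntWritable>    // in: (word, list of counts)  out: (word, total)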
map: turns the raw text into (k, v) pairs, where k is a word and v is 1 for each occurrence of that word
shuffle: groups the pairs with the same k together (sorted by key)
reduce: sums the v's for each distinct k
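Putting the three phases together on a small, purely illustrative two-line input:

input:   hello world
         hello hadoop
map:     (hello,1) (world,1) (hello,1) (hadoop,1)
shuffle: (hadoop,[1]) (hello,[1,1]) (world,[1])
reduce:  (hadoop,1) (hello,2) (world,1)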