hadoop(二MapReduce)
程序员文章站
2022-05-26 15:45:43
hadoop(二MapReduce) 介绍 MapReduce:其实就是把数据分开处理后再将数据合在一起. Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。Reduce负责“合”,即对map阶段的结果进行全局汇 ......
hadoop(二mapreduce)
介绍
mapreduce:其实就是把数据分开处理后再将数据合在一起.
- map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
- reduce负责“合”,即对map阶段的结果进行全局汇总。
- mapreduce运行在yarn集群
mapreduce中定义了如下的map和reduce两个抽象的编程接口,由用户去编程实现.map和reduce,
mapreduce处理的数据类型是
代码处理
mapreduce 的开发一共有八个步骤, 其中 map 阶段分为 2 个步骤,shuwle 阶段 4 个步
骤,reduce 阶段分为 2 个步骤
map 阶段 2 个步骤
- 设置 inputformat 类, 将数据切分为 key-value(k1和v1) 对, 输入到第二步
- 自定义 map 逻辑, 将第一步的结果转换成另外的 key-value(k2和v2) 对, 输出结果
shuwle 阶段 4 个步骤
- 对输出的 key-value 对进行分区
- 对不同分区的数据按照相同的 key 排序
- (可选) 对分组过的数据初步规约, 降低数据的网络拷贝
- 对数据进行分组, 相同 key 的 value 放入一个集合中
reduce 阶段 2 个步骤
- 对多个 map 任务的结果进行排序以及合并, 编写 reduce 函数实现自己的逻辑, 对输入的
key-value 进行处理, 转为新的 key-value(k3和v3)输出 - 设置 outputformat 处理并保存 reduce 输出的 key-value 数据
常用maven依赖
<packaging>jar</packaging> <dependencies> <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-common</artifactid> <version>2.7.5</version> </dependency> <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-client</artifactid> <version>2.7.5</version> </dependency> <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-hdfs</artifactid> <version>2.7.5</version> </dependency> <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-mapreduce-client-core</artifactid> <version>2.7.5</version> </dependency> <dependency> <groupid>junit</groupid> <artifactid>junit</artifactid> <version>release</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupid>org.apache.maven.plugins</groupid> <artifactid>maven-compiler-plugin</artifactid> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>utf-8</encoding> <!-- <verbal>true</verbal>--> </configuration> </plugin> <plugin> <groupid>org.apache.maven.plugins</groupid> <artifactid>maven-shade-plugin</artifactid> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <minimizejar>true</minimizejar> </configuration> </execution> </executions> </plugin> </plugins> </build>
入门---统计
结构
/* 四个泛型解释: keyin :k1的类型 valuein: v1的类型 keyout: k2的类型 valueout: v2的类型*/public class wordcountmapper extends mapper<longwritable,text, text , longwritable> { //map方法就是将k1和v1 转为 k2和v2 /* 参数: key : k1 行偏移量(默认几乎一直固定为longwritable) value : v1 每一行的文本数据 context :表示上下文对象 */ /* 如何将k1和v1 转为 k2和v2 k1 v1 0 hello,world,hadoop 15 hdfs,hive,hello --------------------------- k2 v2 hello 1 world 1 hdfs 1 hadoop 1 hello 1 */ @override protected void map(longwritable key, text value, context context) throws ioexception, interruptedexception { text text = new text(); longwritable longwritable = new longwritable(); //1:将一行的文本数据进行拆分 string[] split = value.tostring().split(","); //2:遍历数组,组装 k2 和 v2 for (string word : split) { //3:将k2和v2写入上下文 text.set(word); longwritable.set(1); context.write(text, longwritable); } }}
/* 四个泛型解释: keyin: k2类型 valulein: v2类型 keyout: k3类型 valueout:v3类型*/public class wordcountreducer extends reducer<text,longwritable,text,longwritable> { //reduce方法作用: 将新的k2和v2转为 k3和v3 ,将k3和v3写入上下文中 /* 参数: key : 新k2 values: 集合 新 v2 context :表示上下文对象 ---------------------- 如何将新的k2和v2转为 k3和v3 新 k2 v2 hello <1,1,1> world <1,1> hadoop <1> ------------------------ k3 v3 hello 3 world 2 hadoop 1 */ @override protected void reduce(text key, iterable<longwritable> values, context context) throws ioexception, interruptedexception { long count = 0; //1:遍历集合,将集合中的数字相加,得到 v3 for (longwritable value : values) { count += value.get(); } //2:将k3和v3写入上下文中 context.write(key, new longwritable(count)); }}
public class jobmain extends configured implements tool { //该方法用于指定一个job任务 @override public int run(string[] args) throws exception { //1:创建一个job任务对象 job job = job.getinstance(super.getconf(), "wordcount"); //如果打包运行出错,则需要加该配置 job.setjarbyclass(jobmain.class); //2:配置job任务对象(八个步骤) //第一步:指定文件的读取方式和读取路径 job.setinputformatclass(textinputformat.class); textinputformat.addinputpath(job, new path("hdfs://node01:8020/wordcount")); //textinputformat.addinputpath(job, new path("file:///d:\\mapreduce\\input")); //第二步:指定map阶段的处理方式和数据类型 job.setmapperclass(wordcountmapper.class); //设置map阶段k2的类型 job.setmapoutputkeyclass(text.class); //设置map阶段v2的类型 job.setmapoutputvalueclass(longwritable.class); //第三,四,五,六 采用默认的方式 //第七步:指定reduce阶段的处理方式和数据类型 job.setreducerclass(wordcountreducer.class); //设置k3的类型 job.setoutputkeyclass(text.class); //设置v3的类型 job.setoutputvalueclass(longwritable.class); //第八步: 设置输出类型 job.setoutputformatclass(textoutputformat.class); //设置输出的路径 path path = new path("hdfs://node01:8020/wordcount_out"); textoutputformat.setoutputpath(job, path); //textoutputformat.setoutputpath(job, new path("file:///d:\\mapreduce\\output")); //获取filesystem filesystem filesystem = filesystem.get(new uri("hdfs://node01:8020"), new configuration()); //判断目录是否存在 boolean bl2 = filesystem.exists(path); if(bl2){ //删除目标目录 filesystem.delete(path, true); } //等待任务结束 boolean bl = job.waitforcompletion(true); return bl ? 0:1; } public static void main(string[] args) throws exception { configuration configuration = new configuration(); //启动job任务 int run = toolrunner.run(configuration, new jobmain(), args); system.exit(run); }}
shuwle阶段
分区
分区实则目的是按照我们的需求,将不同类型的数据分开处理,最终分开获取
代码实现
结构
public class mypartitioner extends partitioner<text,nullwritable> { /* 1:定义分区规则 2:返回对应的分区编号 */ @override public int getpartition(text text, nullwritable nullwritable, int i) { //1:拆分行文本数据(k2),获取中奖字段的值 string[] split = text.tostring().split("\t"); string numstr = split[5]; //2:判断中奖字段的值和15的关系,然后返回对应的分区编号 if(integer.parseint(numstr) > 15){ return 1; }else{ return 0; } }}
//第三步,指定分区类 job.setpartitionerclass(mypartitioner.class); //第四, 五,六步 //设置reducetask的个数 job.setnumreducetasks(2);
mapreduce 中的计数器
计数器是收集作业统计信息的有效手段之一,用于质量控制或应用级统计
可辅助诊断系统故障
看能否用一个计数器值来记录某一特定事件的发生 ,比分析一堆日志文件容易
通过enum枚举类型来定义计数器 统计reduce端数据的输入的key有多少个
public class partitionerreducer extends reducer<text,nullwritable,text,nullwritable> { public static enum counter{ my_input_recoreds,my_input_bytes } @override protected void reduce(text key, iterable<nullwritable> values, context context) throws ioexception, interruptedexception { //方式2:使用枚枚举来定义计数器 context.getcounter(counter.my_input_recoreds).increment(1l); context.write(key, nullwritable.get()); }}
排序(包含序列化)
- 序列化 (serialization) 是指把结构化对象转化为字节流
- 反序列化 (deserialization) 是序列化的逆过程. 把字节流转为结构化对象. 当要在进程间传
递对象或持久化对象的时候, 就需要序列化对象成字节流, 反之当要将接收到或从磁盘读取
的字节流转换为对象, 就要进行反序列化 - java 的序列化 (serializable) 是一个重量级序列化框架, 一个对象被序列化后, 会附带很多额
外的信息 (各种校验信息, header, 继承体系等), 不便于在网络中高效传输. 所以, hadoop
自己开发了一套序列化机制(writable), 精简高效. 不用像 java 对象类一样传输多层的父子
关系, 需要哪个属性就传输哪个属性值, 大大的减少网络传输的开销 - writable 是 hadoop 的序列化格式, hadoop 定义了这样一个 writable 接口. 一个类要支持可
序列化只需实现这个接口即可 - 另外 writable 有一个子接口是 writablecomparable, writablecomparable 是既可实现序列
化, 也可以对key进行比较, 我们这里可以通过自定义 key 实现 writablecomparable 来实现
我们的排序功能
public class sortbean implements writablecomparable<sortbean>{ private string word; private int num; public string getword() { return word; } public void setword(string word) { this.word = word; } public int getnum() { return num; } public void setnum(int num) { this.num = num; } @override public string tostring() { return word + "\t"+ num ; } //实现比较器,指定排序的规则 /* 规则: 第一列(word)按照字典顺序进行排列 // aac aad 第一列相同的时候, 第二列(num)按照升序进行排列 */ @override public int compareto(sortbean sortbean) { //先对第一列排序: word排序 int result = this.word.compareto(sortbean.word); //如果第一列相同,则按照第二列进行排序 if(result == 0){ return this.num - sortbean.num; } return result; } //实现序列化 @override public void write(dataoutput out) throws ioexception { out.writeutf(word); out.writeint(num); } //实现反序列 @override public void readfields(datainput in) throws ioexception { this.word = in.readutf(); this.num = in.readint(); }}
public class sortmapper extends mapper<longwritable,text,sortbean,nullwritable> { /* map方法将k1和v1转为k2和v2: k1 v1 0 a 3 5 b 7 ---------------------- k2 v2 sortbean(a 3) nullwritable sortbean(b 7) nullwritable */ @override protected void map(longwritable key, text value, context context) throws ioexception, interruptedexception { //1:将行文本数据(v1)拆分,并将数据封装到sortbean对象,就可以得到k2 string[] split = value.tostring().split("\t"); sortbean sortbean = new sortbean(); sortbean.setword(split[0]); sortbean.setnum(integer.parseint(split[1])); //2:将k2和v2写入上下文中 context.write(sortbean, nullwritable.get()); }}
public class sortreducer extends reducer<sortbean,nullwritable,sortbean,nullwritable> { //reduce方法将新的k2和v2转为k3和v3 @override protected void reduce(sortbean key, iterable<nullwritable> values, context context) throws ioexception, interruptedexception { context.write(key, nullwritable.get()); }}
job略
规约combiner
在三大阶段的第一阶段map处理完后,可能数据过多,利用分布式思想,抢在reduce前先做一次合并,后再由reduce合并,目的是:提高网络io 性能
实现步骤
//第三(分区),四 (排序) //第五步: 规约(combiner) job.setcombinerclass(mycombiner.class); //第六步 分布
案例:流量统计(key相同则++++++++)
public class flowbean implements writable { private integer upflow; //上行数据包数 private integer downflow; //下行数据包数 private integer upcountflow; //上行流量总和 private integer downcountflow;//下行流量总和 //下略get set 序列化 反序列化
public class flowcountmapper extends mapper<longwritable,text,text,flowbean> { /* 将k1和v1转为k2和v2: k1 v1 0 1363157985059 13600217502 00-1f-64-e2-e8-b1:cmcc 120.196.100.55 www.baidu.com 综合门户 19 128 1177 16852 200 ------------------------------ k2 v2 13600217502 flowbean(19 128 1177 16852) */ @override protected void map(longwritable key, text value, context context) throws ioexception, interruptedexception { //1:拆分行文本数据,得到手机号--->k2 string[] split = value.tostring().split("\t"); string phonenum = split[1]; //2:创建flowbean对象,并从行文本数据拆分出流量的四个四段,并将四个流量字段的值赋给flowbean对象 flowbean flowbean = new flowbean(); flowbean.setupflow(integer.parseint(split[6])); flowbean.setdownflow(integer.parseint(split[7])); flowbean.setupcountflow(integer.parseint(split[8])); flowbean.setdowncountflow(integer.parseint(split[9])); //3:将k2和v2写入上下文中 context.write(new text(phonenum), flowbean); }}
public class flowcountreducer extends reducer<text,flowbean,text,flowbean> { @override protected void reduce(text key, iterable<flowbean> values, context context) throws ioexception, interruptedexception { //1:遍历集合,并将集合中的对应的四个字段累计 integer upflow = 0; //上行数据包数 integer downflow = 0; //下行数据包数 integer upcountflow = 0; //上行流量总和 integer downcountflow = 0;//下行流量总和 for (flowbean value : values) { upflow += value.getupflow(); downflow += value.getdownflow(); upcountflow += value.getupcountflow(); downcountflow += value.getdowncountflow(); } //2:创建flowbean对象,并给对象赋值 v3 flowbean flowbean = new flowbean(); flowbean.setupflow(upflow); flowbean.setdownflow(downflow); flowbean.setupcountflow(upcountflow); flowbean.setdowncountflow(downcountflow); //3:将k3和v3下入上下文中 context.write(key, flowbean); }}
public class jobmain extends configured implements tool { //该方法用于指定一个job任务 @override public int run(string[] args) throws exception { //1:创建一个job任务对象 job job = job.getinstance(super.getconf(), "mapreduce_flowcount"); //如果打包运行出错,则需要加该配置 job.setjarbyclass(jobmain.class); //2:配置job任务对象(八个步骤) //第一步:指定文件的读取方式和读取路径 job.setinputformatclass(textinputformat.class); //textinputformat.addinputpath(job, new path("hdfs://node01:8020/wordcount")); textinputformat.addinputpath(job, new path("file:///d:\\input\\flowcount_input")); //第二步:指定map阶段的处理方式和数据类型 job.setmapperclass(flowcountmapper.class); //设置map阶段k2的类型 job.setmapoutputkeyclass(text.class); //设置map阶段v2的类型 job.setmapoutputvalueclass(flowbean.class); //第三(分区),四 (排序) //第五步: 规约(combiner) //第六步 分组 //第七步:指定reduce阶段的处理方式和数据类型 job.setreducerclass(flowcountreducer.class); //设置k3的类型 job.setoutputkeyclass(text.class); //设置v3的类型 job.setoutputvalueclass(flowbean.class); //第八步: 设置输出类型 job.setoutputformatclass(textoutputformat.class); //设置输出的路径 textoutputformat.setoutputpath(job, new path("file:///d:\\out\\flowcount_out")); //等待任务结束 boolean bl = job.waitforcompletion(true); return bl ? 0:1; } public static void main(string[] args) throws exception { configuration configuration = new configuration(); //启动job任务 int run = toolrunner.run(configuration, new jobmain(), args); system.exit(run); }}
如增加需求:
上行流量倒序排序
public class flowbean implements writablecomparable<flowbean> { //指定排序的规则 @override public int compareto(flowbean flowbean) { // return this.upflow.compareto(flowbean.getupflow()) * -1; return flowbean.upflow - this.upflow ; }}
需求:手机号码分区
public class flowcountpartition extends partitioner<text,flowbean> { /* 该方法用来指定分区的规则: 135 开头数据到一个分区文件 136 开头数据到一个分区文件 137 开头数据到一个分区文件 其他分区 参数: text : k2 手机号 flowbean: v2 i : reducetask的个数 */ @override public int getpartition(text text, flowbean flowbean, int i) { //1:获取手机号 string phonenum = text.tostring(); //2:判断手机号以什么开头,返回对应的分区编号(0-3) if(phonenum.startswith("135")){ return 0; }else if(phonenum.startswith("136")){ return 1; }else if(phonenum.startswith("137")){ return 2; }else{ return 3; } }}
//第三(分区),四 (排序) job.setpartitionerclass(flowcountpartition.class); //第五步: 规约(combiner) //第六步 分组 //设置reduce个数 job.setnumreducetasks(4);