欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

hadoop(二MapReduce)

程序员文章站 2023-01-22 08:27:32
hadoop(二MapReduce) 介绍 MapReduce:其实就是把数据分开处理后再将数据合在一起. Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。Reduce负责“合”,即对map阶段的结果进行全局汇 ......

hadoop(二mapreduce)


介绍

mapreduce:其实就是把数据分开处理后再将数据合在一起.

  • map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
  • reduce负责“合”,即对map阶段的结果进行全局汇总。
  • mapreduce运行在yarn集群

hadoop(二MapReduce)

mapreduce中定义了如下的mapreduce两个抽象的编程接口,由用户去编程实现.map和reduce,

mapreduce处理的数据类型是键值对

hadoop(二MapReduce)

hadoop(二MapReduce)

hadoop(二MapReduce)


代码处理

mapreduce 的开发一共有八个步骤, 其中 map 阶段分为 2 个步骤,shuwle 阶段 4 个步 
骤,reduce 阶段分为 2 个步骤

​ map 阶段 2 个步骤

  1. 设置 inputformat 类, 将数据切分为 key-value(k1和v1) 对, 输入到第二步
  2. 自定义 map 逻辑, 将第一步的结果转换成另外的 key-value(k2和v2) 对, 输出结果 
    shuwle 阶段 4 个步骤
  3. 对输出的 key-value 对进行分区
  4. 对不同分区的数据按照相同的 key 排序
  5. (可选) 对分组过的数据初步规约, 降低数据的网络拷贝
  6. 对数据进行分组, 相同 key 的 value 放入一个集合中 
    reduce 阶段 2 个步骤
  7. 对多个 map 任务的结果进行排序以及合并, 编写 reduce 函数实现自己的逻辑, 对输入的 
    key-value 进行处理, 转为新的 key-value(k3和v3)输出
  8. 设置 outputformat 处理并保存 reduce 输出的 key-value 数据

常用maven依赖

<packaging>jar</packaging> <dependencies> <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-common</artifactid> <version>2.7.5</version> </dependency> <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-client</artifactid> <version>2.7.5</version> </dependency> <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-hdfs</artifactid> <version>2.7.5</version> </dependency> <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-mapreduce-client-core</artifactid> <version>2.7.5</version> </dependency> <dependency> <groupid>junit</groupid> <artifactid>junit</artifactid> <version>release</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupid>org.apache.maven.plugins</groupid> <artifactid>maven-compiler-plugin</artifactid> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>utf-8</encoding> <!--    <verbal>true</verbal>--> </configuration> </plugin> <plugin> <groupid>org.apache.maven.plugins</groupid> <artifactid>maven-shade-plugin</artifactid> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <minimizejar>true</minimizejar> </configuration> </execution> </executions> </plugin> </plugins> </build>

入门---统计

结构

hadoop(二MapReduce)

/*  四个泛型解释:    keyin :k1的类型    valuein: v1的类型    keyout: k2的类型    valueout: v2的类型*/public class wordcountmapper extends mapper<longwritable,text, text , longwritable> { //map方法就是将k1和v1 转为 k2和v2 /*      参数:         key    : k1   行偏移量(默认几乎一直固定为longwritable)         value  : v1   每一行的文本数据         context :表示上下文对象     */ /*      如何将k1和v1 转为 k2和v2        k1         v1        0   hello,world,hadoop        15  hdfs,hive,hello       ---------------------------        k2            v2        hello         1        world         1        hdfs          1        hadoop        1        hello         1     */ @override protected void map(longwritable key, text value, context context) throws ioexception, interruptedexception { text text = new text(); longwritable longwritable = new longwritable(); //1:将一行的文本数据进行拆分 string[] split = value.tostring().split(","); //2:遍历数组,组装 k2 和 v2 for (string word : split) { //3:将k2和v2写入上下文            text.set(word);            longwritable.set(1);            context.write(text, longwritable); } }}

hadoop(二MapReduce)


/*  四个泛型解释:    keyin:  k2类型    valulein: v2类型    keyout: k3类型    valueout:v3类型*/public class wordcountreducer extends reducer<text,longwritable,text,longwritable> { //reduce方法作用: 将新的k2和v2转为 k3和v3 ,将k3和v3写入上下文中 /*      参数:        key : 新k2        values: 集合 新 v2        context :表示上下文对象        ----------------------        如何将新的k2和v2转为 k3和v3        新  k2         v2            hello      <1,1,1>            world      <1,1>            hadoop     <1>        ------------------------           k3        v3           hello     3           world     2           hadoop    1     */ @override protected void reduce(text key, iterable<longwritable> values, context context) throws ioexception, interruptedexception { long count = 0; //1:遍历集合,将集合中的数字相加,得到 v3 for (longwritable value : values) {             count += value.get(); } //2:将k3和v3写入上下文中        context.write(key, new longwritable(count)); }}

hadoop(二MapReduce)

public class jobmain extends configured implements tool { //该方法用于指定一个job任务 @override public int run(string[] args) throws exception { //1:创建一个job任务对象 job job = job.getinstance(super.getconf(), "wordcount"); //如果打包运行出错,则需要加该配置        job.setjarbyclass(jobmain.class); //2:配置job任务对象(八个步骤) //第一步:指定文件的读取方式和读取路径        job.setinputformatclass(textinputformat.class); textinputformat.addinputpath(job, new path("hdfs://node01:8020/wordcount")); //textinputformat.addinputpath(job, new path("file:///d:\\mapreduce\\input")); //第二步:指定map阶段的处理方式和数据类型         job.setmapperclass(wordcountmapper.class); //设置map阶段k2的类型          job.setmapoutputkeyclass(text.class); //设置map阶段v2的类型          job.setmapoutputvalueclass(longwritable.class); //第三,四,五,六 采用默认的方式 //第七步:指定reduce阶段的处理方式和数据类型          job.setreducerclass(wordcountreducer.class); //设置k3的类型           job.setoutputkeyclass(text.class); //设置v3的类型           job.setoutputvalueclass(longwritable.class); //第八步: 设置输出类型           job.setoutputformatclass(textoutputformat.class); //设置输出的路径 path path = new path("hdfs://node01:8020/wordcount_out"); textoutputformat.setoutputpath(job, path); //textoutputformat.setoutputpath(job, new path("file:///d:\\mapreduce\\output")); //获取filesystem filesystem filesystem = filesystem.get(new uri("hdfs://node01:8020"), new configuration()); //判断目录是否存在 boolean bl2 = filesystem.exists(path); if(bl2){ //删除目标目录                 filesystem.delete(path, true); } //等待任务结束 boolean bl = job.waitforcompletion(true); return bl ? 0:1; } public static void main(string[] args) throws exception { configuration configuration = new configuration(); //启动job任务 int run = toolrunner.run(configuration, new jobmain(), args); system.exit(run); }}

shuwle阶段

分区

分区实则目的是按照我们的需求,将不同类型的数据分开处理,最终分开获取

代码实现

结构

hadoop(二MapReduce)

public class mypartitioner extends partitioner<text,nullwritable> { /*      1:定义分区规则      2:返回对应的分区编号     */ @override public int getpartition(text text, nullwritable nullwritable, int i) { //1:拆分行文本数据(k2),获取中奖字段的值 string[] split = text.tostring().split("\t"); string numstr = split[5]; //2:判断中奖字段的值和15的关系,然后返回对应的分区编号 if(integer.parseint(numstr) > 15){ return 1; }else{ return 0; } }}

 //第三步,指定分区类            job.setpartitionerclass(mypartitioner.class); //第四, 五,六步 //设置reducetask的个数            job.setnumreducetasks(2);

mapreduce 中的计数器

计数器是收集作业统计信息的有效手段之一,用于质量控制或应用级统计

可辅助诊断系统故障

看能否用一个计数器值来记录某一特定事件的发生 ,比分析一堆日志文件容易

hadoop(二MapReduce)

通过enum枚举类型来定义计数器 统计reduce端数据的输入的key有多少个

public class partitionerreducer extends reducer<text,nullwritable,text,nullwritable> { public static enum counter{            my_input_recoreds,my_input_bytes } @override protected void reduce(text key, iterable<nullwritable> values, context context) throws ioexception, interruptedexception { //方式2:使用枚枚举来定义计数器        context.getcounter(counter.my_input_recoreds).increment(1l);       context.write(key, nullwritable.get()); }}

排序(包含序列化)

  • 序列化 (serialization) 是指把结构化对象转化为字节流
  • 反序列化 (deserialization) 是序列化的逆过程. 把字节流转为结构化对象. 当要在进程间传 
    递对象或持久化对象的时候, 就需要序列化对象成字节流, 反之当要将接收到或从磁盘读取 
    的字节流转换为对象, 就要进行反序列化
  • java 的序列化 (serializable) 是一个重量级序列化框架, 一个对象被序列化后, 会附带很多额 
    外的信息 (各种校验信息, header, 继承体系等), 不便于在网络中高效传输. 所以, hadoop 
    自己开发了一套序列化机制(writable), 精简高效. 不用像 java 对象类一样传输多层的父子 
    关系, 需要哪个属性就传输哪个属性值, 大大的减少网络传输的开销
  • writable 是 hadoop 的序列化格式, hadoop 定义了这样一个 writable 接口. 一个类要支持可 
    序列化只需实现这个接口即可
  • 另外 writable 有一个子接口是 writablecomparable, writablecomparable 是既可实现序列 
    化, 也可以对key进行比较, 我们这里可以通过自定义 key 实现 writablecomparable 来实现 
    我们的排序功能

hadoop(二MapReduce)

hadoop(二MapReduce)

public class sortbean implements writablecomparable<sortbean>{ private string word; private int  num; public string getword() { return word; } public void setword(string word) { this.word = word; } public int getnum() { return num; } public void setnum(int num) { this.num = num; } @override public string tostring() { return   word + "\t"+ num ; } //实现比较器,指定排序的规则 /*      规则:        第一列(word)按照字典顺序进行排列    //  aac   aad        第一列相同的时候, 第二列(num)按照升序进行排列     */ @override public int compareto(sortbean sortbean) { //先对第一列排序: word排序 int result = this.word.compareto(sortbean.word); //如果第一列相同,则按照第二列进行排序 if(result == 0){ return this.num - sortbean.num; } return result; } //实现序列化 @override public void write(dataoutput out) throws ioexception {        out.writeutf(word);        out.writeint(num); } //实现反序列 @override public void readfields(datainput in) throws ioexception { this.word = in.readutf(); this.num = in.readint(); }}
public class sortmapper extends mapper<longwritable,text,sortbean,nullwritable> { /*      map方法将k1和v1转为k2和v2:      k1            v1      0            a  3      5            b  7      ----------------------      k2                         v2      sortbean(a  3)         nullwritable      sortbean(b  7)         nullwritable     */ @override protected void map(longwritable key, text value, context context) throws ioexception, interruptedexception { //1:将行文本数据(v1)拆分,并将数据封装到sortbean对象,就可以得到k2 string[] split = value.tostring().split("\t"); sortbean sortbean = new sortbean();        sortbean.setword(split[0]);        sortbean.setnum(integer.parseint(split[1])); //2:将k2和v2写入上下文中        context.write(sortbean, nullwritable.get()); }}
public class sortreducer extends reducer<sortbean,nullwritable,sortbean,nullwritable> { //reduce方法将新的k2和v2转为k3和v3 @override protected void reduce(sortbean key, iterable<nullwritable> values, context context) throws ioexception, interruptedexception {       context.write(key, nullwritable.get()); }}

job略


规约combiner

在三大阶段的第一阶段map处理完后,可能数据过多,利用分布式思想,抢在reduce前先做一次合并,后再由reduce合并,目的是:提高网络io 性能

实现步骤

hadoop(二MapReduce)

hadoop(二MapReduce)

 //第三(分区),四 (排序) //第五步: 规约(combiner)      job.setcombinerclass(mycombiner.class); //第六步 分布

hadoop(二MapReduce)


案例:流量统计(key相同则++++++++)

hadoop(二MapReduce)

public class flowbean implements writable { private integer upflow; //上行数据包数 private integer downflow; //下行数据包数 private integer upcountflow; //上行流量总和 private integer downcountflow;//下行流量总和 //下略get   set   序列化  反序列化
public class flowcountmapper extends mapper<longwritable,text,text,flowbean> { /*      将k1和v1转为k2和v2:      k1              v1      0               1363157985059     13600217502    00-1f-64-e2-e8-b1:cmcc    120.196.100.55    www.baidu.com    综合门户    19    128    1177    16852    200     ------------------------------      k2              v2      13600217502     flowbean(19    128    1177    16852)     */ @override protected void map(longwritable key, text value, context context) throws ioexception, interruptedexception { //1:拆分行文本数据,得到手机号--->k2 string[] split = value.tostring().split("\t"); string phonenum = split[1]; //2:创建flowbean对象,并从行文本数据拆分出流量的四个四段,并将四个流量字段的值赋给flowbean对象 flowbean flowbean = new flowbean();        flowbean.setupflow(integer.parseint(split[6]));        flowbean.setdownflow(integer.parseint(split[7]));        flowbean.setupcountflow(integer.parseint(split[8]));        flowbean.setdowncountflow(integer.parseint(split[9])); //3:将k2和v2写入上下文中        context.write(new text(phonenum), flowbean); }}
public class flowcountreducer extends reducer<text,flowbean,text,flowbean> { @override protected void reduce(text key, iterable<flowbean> values, context context) throws ioexception, interruptedexception { //1:遍历集合,并将集合中的对应的四个字段累计 integer upflow = 0; //上行数据包数 integer downflow = 0; //下行数据包数 integer upcountflow = 0; //上行流量总和 integer downcountflow = 0;//下行流量总和 for (flowbean value : values) {            upflow += value.getupflow();            downflow += value.getdownflow();            upcountflow += value.getupcountflow();            downcountflow += value.getdowncountflow(); } //2:创建flowbean对象,并给对象赋值  v3 flowbean flowbean = new flowbean();        flowbean.setupflow(upflow);        flowbean.setdownflow(downflow);        flowbean.setupcountflow(upcountflow);        flowbean.setdowncountflow(downcountflow); //3:将k3和v3下入上下文中        context.write(key, flowbean); }}
public class jobmain extends configured implements tool { //该方法用于指定一个job任务 @override public int run(string[] args) throws exception { //1:创建一个job任务对象 job job = job.getinstance(super.getconf(), "mapreduce_flowcount"); //如果打包运行出错,则需要加该配置        job.setjarbyclass(jobmain.class); //2:配置job任务对象(八个步骤) //第一步:指定文件的读取方式和读取路径        job.setinputformatclass(textinputformat.class); //textinputformat.addinputpath(job, new path("hdfs://node01:8020/wordcount")); textinputformat.addinputpath(job, new path("file:///d:\\input\\flowcount_input")); //第二步:指定map阶段的处理方式和数据类型         job.setmapperclass(flowcountmapper.class); //设置map阶段k2的类型          job.setmapoutputkeyclass(text.class); //设置map阶段v2的类型          job.setmapoutputvalueclass(flowbean.class); //第三(分区),四 (排序) //第五步: 规约(combiner) //第六步 分组 //第七步:指定reduce阶段的处理方式和数据类型          job.setreducerclass(flowcountreducer.class); //设置k3的类型           job.setoutputkeyclass(text.class); //设置v3的类型           job.setoutputvalueclass(flowbean.class); //第八步: 设置输出类型           job.setoutputformatclass(textoutputformat.class); //设置输出的路径 textoutputformat.setoutputpath(job, new path("file:///d:\\out\\flowcount_out")); //等待任务结束 boolean bl = job.waitforcompletion(true); return bl ? 0:1; } public static void main(string[] args) throws exception { configuration configuration = new configuration(); //启动job任务 int run = toolrunner.run(configuration, new jobmain(), args); system.exit(run); }}

如增加需求:

上行流量倒序排序

public class flowbean implements writablecomparable<flowbean> { //指定排序的规则 @override public int compareto(flowbean flowbean) { // return this.upflow.compareto(flowbean.getupflow()) * -1; return flowbean.upflow - this.upflow ; }}

需求:手机号码分区

hadoop(二MapReduce)

public class flowcountpartition extends partitioner<text,flowbean> { /*      该方法用来指定分区的规则:        135 开头数据到一个分区文件        136 开头数据到一个分区文件        137 开头数据到一个分区文件        其他分区       参数:         text : k2   手机号         flowbean: v2         i   : reducetask的个数     */ @override public int getpartition(text text, flowbean flowbean, int i) { //1:获取手机号 string phonenum = text.tostring(); //2:判断手机号以什么开头,返回对应的分区编号(0-3) if(phonenum.startswith("135")){ return 0; }else if(phonenum.startswith("136")){ return 1; }else if(phonenum.startswith("137")){ return 2; }else{ return 3; } }}
 //第三(分区),四 (排序)            job.setpartitionerclass(flowcountpartition.class); //第五步: 规约(combiner) //第六步 分组 //设置reduce个数            job.setnumreducetasks(4);