欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Hadoop的MR on Yarn

程序员文章站 2022-03-08 08:36:55
...

1.MapReduce是什么

MapReduce
Map 映射 ?个 生产
Reduce 聚合 默认1个 生产
单词统计案例代码:

map

package com.ccj.pxj.word;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * 1. Mapper  Interface:  hadoop.mapred包下 (hadoop1.x版本提供的实现)
 *  2. Mapper Class : hadoop mapreduce包下(hadoop2.x版本提供的实现)
 *          
 *      相关的组件导入Jar包,都选择com.apache.hadoop.mapreduce包下!
 *  
 *  3. MapReduce不管是在Map阶段还是Reduce阶段,处理的数据,都是key-value格式!
 *          输入数据:  key-value
 *          输出数据: key-value
 *      KEYIN:  输入数据的key类型,每行的偏移量,整数! Long===> LongWritable
 *     VALUEIN: 输入数据的value类型  , String====>  Text
 *     KEYOUT:  输出数据的key类型, String
 *     VALUEOUT: 输出数据的value类型,int===》  IntWritable
 *     
 *     key-value都是需要在多台机器中进行传输的! 要求key和value都要使用序列化类型!
 *          传统的序列化,必须实现java.io.Serrilizable接口!
 *              Hadoop提供了新的序列化机制! 就是Writable接口!
 *      
 *     总结: key 和 value的数据类型,必须实现Writable接口
 *     
 *   4.  RecordReader,负责将数据,从文件的切片中读取到Map程序中!
 *          默认情况,使用RecordReader,一次只会读取一行内容!
 *          默认情况,系统使用的RecordReader将数据的偏移量(offset)作为key
 *              hello how are you
 *   5.  map()方法是自己定义的处理逻辑,会对每一组key-value循环调用!
 *              context对象,是当前Mapper任务执行的上下文(环境和配置)
 * @author lenovo
 *
 */
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Text text=new Text();
    IntWritable one=new IntWritable();
    @Override
    protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
        System.out.println("key的值是:"+key+",value的值是:"+value);
//      //value代表一行文本
        //将当前的这行文本,根据空格,进行切分!
//      获取一行
        String line=value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            text.set(word);
            one.set(1);
            context.write(text, one);
        }
    }
}

reduce

package com.ccj.pxj.word;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
 * 1. KEYIN, VALUEIN : 对应的是Map任务的输出的key-value
 *      KEYOUT, VALUEOUT: 最总合并和汇总完结果,输出的key-value
 *          KEYOUT: Text
 *          VALUEOUT: IntWritable
 * 
 * 2.  针对每一对  KEYIN, VALUEIN ,也会循环调用 reduce方法
 * 
 * 3. Map ===Shuffle(分组,分区,排序)==Reducer
 *          Map的输出是:  hello-1,I-1,I-1,hello-1
 *          Reduce的输入:   I-{1,1}
 */
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable v=new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        int count=0;
        for (IntWritable value : values) {
            count+=value.get();
        }
        v.set(count);
        context.write(key, v);
    }
}
package com.ccj.pxj.word;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * /*
 * 1.  组件:  
 *          Job:  一个MapReduce程序,就是一个Job,一个Job包含了Map阶段和Reduce阶段
 *          MapTask:  Map阶段,进行运算的主进程;
 *                      调用Mapper类的相关方法!
 *          ReduceTask:  Reduce阶段,进行运算的主进程;
 *                      调用Reducer类的相关方法!
 * 
 *      客户端需要,将Job(MR任务)提交到YARN平台运行!
 *          Driver:  用来提交Job
 *              Driver类的主进程(main方法)中,执行Job的提交!
 * 
 * 2.  需求,是单词统计!
 *          MapTask中处理出入的数据,进行每个单词的统计!
 *          ReduceTask中,对Map阶段的结果进行合并和归总,将最终结果输出!
 * 
 * 3.  输入数据的格式:  使用UTF-8进行编码!
 *              保存时,使用UTF-8无BOM格式保存!
 * 
 * 
 * 
 *
 * @author lenovo
 *
 */
public class WCDriver {
    public static void main(String[] args) throws Exception {
        args=new String[] {"F:\\hadoopTest1\\wordInput","F:\\hadoopTest1\\wordoutput_88"};
//      args=new String[] {"hdfs://pxj104:9000/user/pxj/wordInput","file:///F:\\hadoopTest1\\wordoutput2"};
//      args=new String[] {"hdfs://pxj104:9000/user/pxj/wordInput","hdfs://pxj104:9000/user/pxj/wordoput1"};
        Configuration conf = new Configuration();
        //conf.set("fs.defaultFS", "hdfs://pxj104:9000");
        //conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
    //  System.setProperty("HADOOP_USER_NAME", "pxj");
        // 1.创建Job对象,代表此次WordCount的MR程序
        Job job=Job.getInstance(conf);
        // 2. 设置配置信息,设置到Configuration对象中
        Configuration configuration=new Configuration();
        // 3. MR运行的所有环节信息,使用哪个Mapper类,进行Map阶段运算,等等..
          // 设置Job所在Jar包的路径,和Job的名称
        job.setJobName("wordcount");
        job.setJarByClass(WCDriver.class);
        // 4.  设置Mapper类
        job.setMapperClass(WCMapper.class);
        // 5. 设置Reducer类
        job.setReducerClass(WCReducer.class);
        // 6.  设置计算数据的输入目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 7.  设置计算数据的输出目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 8.  设置Mapper阶段输出的结果的key-value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 设置自定义的输入格式
                job.setInputFormatClass(CombineTextInputFormat.class);
                
                CombineTextInputFormat.setMaxInputSplitSize(job, 3145728);
                // 设置Combiner
                job.setCombinerClass(WCReducer.class);
        // 9.  设置最终产生结果(reduce阶段)输出的结果的key-value类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 10. 将Job提交运行!
        boolean result = job.waitForCompletion(true);
        if(result) {
            System.out.println("程序执行完毕!");
        }
        System.exit(result?0:1);
        
    }
}

MR2.x 架构设计(MR on Yarn流程、mr提交job)

如图:
Hadoop的MR on Yarn

Yarn: RM NM

RM:
applications Manager 应用程序管理器
resource scheduler 资源memory+cpu调度器

红色框是什么?
container 虚拟的概念 属于NM节点上,专门用来MR、spark等技术的最小单元
map task
reduce task

2.1.用户向Yarn提交应用程序(job app application),jar文件、sql;
其中包裹ApplicationMaster程序、启动ApplicationMaster的命令等等

2.2.RM为该job分配第一个container,运行job的ApplicationMaster
2.3.App Master向applications Manager注册,这样就可以
在RM WEB界面查询这个job的运行状态

2.4.App Master采用轮询的方式通过RPC协议向RM申请和领取资源

2.5.一旦App Master拿到资源,就对应的与NM通信,要求启动任务
2.6.NM为任务设置好运行环境(jar包等),将任务启动命令写在一个脚本里。
并通过该脚本启动任务 task
2.7.各个task通过rpc协议向App Master汇报自己的状态和进度,以此让App Master随时掌握各个task的运行状态。
从而在task运行失败重启任务。

2.8.App Master向applications Manager注销且关闭自己

简单地说:

分为两步:
启动App Master,申请资源;
运行任务,直到任务运行完成。
三、map task个数
Map 映射 ?个 生产
Reduce 聚合 默认1个 生产

参考博客:

http://blog.itpub.net/30089851/viewspace-2095837/

[aaa@qq.com /home/pxj/app/hadoop]$hadoop jar \
> ./share/hadoop/mapreduce2/hadoop-mapreduce-examples-2.6.0-cdh5.16.2.jar \
>  wordcount \
>  /wordcount/input  /wordcount/output
19/12/09 00:02:01 INFO input.FileInputFormat: Total input paths to process : 1
19/12/09 00:02:01 INFO mapreduce.JobSubmitter: number of splits:1
Job Counters 
  Launched map tasks=1
  Launched reduce tasks=1
[aaa@qq.com /home/pxj/app/hadoop]$hadoop fs -put /home/pxj/2.log  /wordcount/input
19/12/09 00:07:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/12/09 00:07:04 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/12/09 00:07:05 INFO input.FileInputFormat: Total input paths to process : 2
19/12/09 00:07:05 INFO mapreduce.JobSubmitter: number of splits:2
    Launched map tasks=2
        Launched reduce tasks=1

map task:
1.log 86字节 1块
m 1份

2.log 20字节 1块
m 2份

认为: map task个数和 文件的数量有关

3.log 130M 2块
m : 4份?

尽量在生产上控制一个文件的大小稍微小于一个blocksize
比如128M 文件为120M

假如我们设定hdfs的块的大小是64mb,如果我们输入有三个文件,
大小分别是3mb、65mb和127mb,那么mapreduce会把3mb文件分为一个输入分片
(input split),65mb则是两个输入分片(input split)
而127mb也是两个输入分片(input split),
换句话说我们如果在map计算前做输入分片调整,
例如合并小文件,那么就会有5个map任务将执行,
而且每个map执行的数据大小不均,这个也是mapreduce优化计算的一个关键点。
???
应该在生产要考虑你的文件
是什么文件格式: txtfile orc parquet
压缩格式: gzip snappy lzo

hive: orc/parquet + bzip2
hbase: hfile + snappy
参考网站
https://ruozedata.github.io/2018/04/18/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%8E%8B%E7%BC%A9%EF%BC%8C%E4%BD%A0%E4%BB%AC%E7%9C%9F%E7%9A%84%E4%BA%86%E8%A7%A3%E5%90%97%EF%BC%9F/

四、shuffle过程

Hadoop的MR on Yarn

解释:

Map方法之后,数据首先进入到分区方法,把数据标记好分区,然后把数据发送到环形缓冲区;环形缓冲区默认大小100m,环形缓冲区达到80%时,进行溢写;溢写前对数据进行排序,排序按照对key的索引进行字典顺序排序,排序的手段快排;溢写产生大量溢写文件,需要对溢写文件进行归并排序;对溢写的文件也可以进行Combiner操作,前提是汇总操作,求平均值不行。最后将文件按照分区存储到磁盘,等待Reduce端拉取。
每个Reduce拉取Map端对应分区的数据。拉取数据后先存储到内存中,内存不够了,再存储到磁盘。拉取完所有数据后,采用归并排序将内存和磁盘中的数据都进行排序。在进入Reduce方法前,可以对数据进行分组操作。

五、压缩格式和文件格式

文件格式

Hadoop的MR on Yarn

压缩格式

Hadoop的MR on Yarn
Hadoop的MR on Yarn

Compression
压缩比
解压速度
压缩比=压缩/解压速度