MapReduce Parallel Computing Framework

1. Core idea: divide and conquer

map: process each portion of the data independently

reduce: merge the partial results
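
For example, in word count (the program built below), each map task counts the words in its own input split, and reduce then merges the per-word counts from all the splits.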

2. Data flows through the framework in the form of <key, value> pairs

1. The Map phase consists of a number of Map Tasks:

* Input format parsing: InputFormat

* Input data processing: Mapper

* Data partitioning: Partitioner (see the sketch after this list)

2. The Reduce phase consists of a number of Reduce Tasks:

* Remote copy of map output (shuffle)

* Sorting of data by key

* Data processing: Reducer

* Output format: OutputFormat
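
The Partitioner referenced in the Map-phase list decides which reduce task receives each map output key. Below is a minimal sketch of a custom Partitioner (an illustration, not part of the original post; it reproduces the behavior of Hadoop's default HashPartitioner):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Routes each word to a reduce task by hashing the key; this mirrors
// what the default HashPartitioner already does, so it is purely illustrative.
public class WordPartitioner extends Partitioner<Text, IntWritable> {
	@Override
	public int getPartition(Text key, IntWritable value, int numPartitions) {
		// Mask the sign bit so the modulo result is never negative
		return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
	}
}

A job would opt in with job.setPartitionerClass(WordPartitioner.class); without it, HashPartitioner is used by default.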

3. Writing a MapReduce program

1. pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.hadoop</groupId>
  <artifactId>mapreduce</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>mapreduce</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
    </dependency>
  </dependencies>
</project>
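
With this pom in place, the jar can be built from the project root with the standard Maven command (assuming Maven is installed and on the PATH):

mvn clean package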

2. src/main/resources

Import the directory (add it as a source folder in the IDE).

3. Specify the output folder.

4. Copy the four configured XML files from the Hadoop installation into src/main/resources:

cp core-site.xml hdfs-site.xml mapred-site.xml yarn-site.xml /root/workspace/mapreduce/src/main/resources/
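
In a typical Hadoop 2.x installation these four files live under $HADOOP_HOME/etc/hadoop, so the cp above would be run from that directory; adjust the paths to your own layout.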

Then refresh the src/main/resources folder in the IDE.

Finally, confirm that the basic environment works by running the bundled sample code.

With that done, the real MapReduce code can begin.

MapReduce code is written to a fixed, boilerplate pattern:

map: (K1, V1) -> list(K2, V2)

reduce: (K2, list(V2)) -> list(K3, V3)
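
For word count, the flow over a single input line looks like this (illustrative values):

map: (0, "hello world hello") -> [(hello, 1), (world, 1), (hello, 1)]

shuffle/sort: (hello, [1, 1]), (world, [1])

reduce: (hello, [1, 1]) -> (hello, 2); (world, [1]) -> (world, 1)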

Context is the context object through which map and reduce emit their output.

Mapper<LongWritable, Text, Text, IntWritable>

LongWritable is the byte offset of the line; Text is the type of each line, corresponding to Java's String.

The second Text and IntWritable are the output key and value types, corresponding to String and int.

Note that the output of map is exactly the input of reduce.

Two good points in the map code: the value is set to the constant 1, held in a single reusable IntWritable, and StringTokenizer is used instead of the split() function, which is too memory-hungry.

When writing the code, just concentrate on the map and reduce functions; everything else is boilerplate.

package com.hadoop.mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
	//step1 map class
	public static class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
		private Text mapOutputKey = new Text();
		private final static IntWritable mapOutputValue = new IntWritable(1);
		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			//line value
			String lineValue = value.toString();
			//split
			//lineValue.split(" "); 
			StringTokenizer strT = new StringTokenizer(lineValue);
			//iterator
			while(strT.hasMoreTokens()){
				String wordValue= strT.nextToken();
				mapOutputKey.set(wordValue);
				context.write(mapOutputKey, mapOutputValue);
			}
			
		}
	}
	//step2 reduce class
	public static class WordCountReducer  extends Reducer<Text,IntWritable,Text,IntWritable>{
		private IntWritable reduceOutputValue = new IntWritable();
		@Override
		public void reduce(Text key, Iterable<IntWritable> values,
				Context context)
				throws IOException, InterruptedException {
			//sum
			int sum = 0;
			//iterator
			for (IntWritable value:values){
				sum += value.get();
			}
			reduceOutputValue.set(sum);
			context.write(key, reduceOutputValue);
		}
	}
	//step3 driver class 
	public void run(String[] args) throws Exception{
		//1.get configuration
		Configuration conf = new Configuration();
		//2.create job
		Job job = Job.getInstance(conf,this.getClass().getSimpleName());
		//run jar
		job.setJarByClass(this.getClass());
		//set job input->map->reduce->output
		Path inpath = new Path(args[0]);
		FileInputFormat.addInputPath(job, inpath);
		//map
		job.setMapperClass(WordCountMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		//reduce
		job.setReducerClass(WordCountReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		//output
		Path outPath = new Path(args[1]);
		FileOutputFormat.setOutputPath(job, outPath);
		//submit
		boolean isSuccess = job.waitForCompletion(true);
		if (isSuccess){
			System.out.println("success");
		}else{
			System.out.println("fail");
		}
	}
	public static void main(String[] args) throws Exception {
		new WordCount().run(args);
		
	}
}

4. Test run

Choose the main class when exporting the jar. After packaging, add execute permission: chmod u+x wordCount.jar

bin/yarn jar jars/wordCount.jar inpath outpath
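
If the job succeeds, the default output format writes one file per reduce task under outpath, named like part-r-00000; it can be inspected with the HDFS shell:

bin/hdfs dfs -cat outpath/part-r-00000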
