MapReduce: A Parallel Computing Framework
1. Core idea: divide and conquer
map: process each slice of the data independently
reduce: merge the partial results
2. Data flows through the framework as <key, value> pairs
1.
The Map phase consists of a number of Map Tasks
*parse the input data: InputFormat
*process the input records: Mapper
*partition the data across reducers: Partitioner (see the sketch after this list)
2.
The Reduce phase consists of a number of Reduce Tasks
*copy map output from remote nodes (shuffle)
*sort the data by key
*process the grouped records: Reducer
*format the output: OutputFormat
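The Partitioner decides which Reduce Task receives each map output key. For reference, here is a minimal sketch equivalent to what Hadoop's default HashPartitioner does (the class name WordCountPartitioner is illustrative; the WordCount job below does not need to set one explicitly):
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class WordCountPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // mask the sign bit so the modulo is non-negative, then spread
        // keys evenly across the reduce tasks by hash
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
A custom implementation would be registered with job.setPartitionerClass(WordCountPartitioner.class).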
3. Writing a MapReduce Program
1. pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hadoop</groupId>
<artifactId>mapreduce</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>mapreduce</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
</dependency>
</dependencies>
</project>
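If you prefer the command line to the IDE export used later, this pom builds with plain Maven (assuming Maven is installed and on the PATH):
mvn clean package
The resulting jar is written under target/.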
2. Import src/main/resources into the project (as a source folder)
3. Specify the build output directory
4. Copy the four configured Hadoop XML files into src/main/resources:
cp core-site.xml hdfs-site.xml mapred-site.xml yarn-site.xml /root/workspace/mapreduce/src/main/resources/
Refresh the src/main/resources folder so the IDE picks the files up.
Finally, confirm the basic environment is working by running the example code that ships with Hadoop.
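For example, with a standard Hadoop 2.5.0 tarball layout (the jar path and arguments depend on your installation):
bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0.jar pi 2 10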
With that done, we can start on the real MapReduce code.
Writing it is formulaic (the "eight-legged essay" of MapReduce):
map: (K1, V1) -> list(K2, V2)
reduce: (K2, list(V2)) -> list(K3, V3)
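Tracing one made-up input line through both functions (the offset 0 and the words are illustrative):
map:     (0, "hadoop yarn hadoop") -> (hadoop, 1), (yarn, 1), (hadoop, 1)
shuffle: group and sort by key     -> (hadoop, [1, 1]), (yarn, [1])
reduce:  (hadoop, [1, 1]) -> (hadoop, 2); (yarn, [1]) -> (yarn, 1)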
Context is the context object through which map and reduce write their output.
Mapper<LongWritable, Text, Text, IntWritable>
LongWritable is the byte offset of the line; Text is the type of each line, corresponding to Java's String.
The second Text and the IntWritable are the output key and value types, corresponding to String and int.
Note that map's output is exactly reduce's input.
Two details in the map code are worth copying: the value is a single constant 1 (one reusable IntWritable),
and StringTokenizer is used instead of split(), which wastes memory. When writing the code, concentrate on the map and reduce functions; the rest is boilerplate.
package com.hadoop.mapreduce;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
    // step 1: map class
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text mapOutputKey = new Text();
        // the constant 1 emitted for every word; one reusable IntWritable
        // instead of a new object per record
        private final static IntWritable mapOutputValue = new IntWritable(1);
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // line value
            String lineValue = value.toString();
            // split into words; StringTokenizer avoids the per-call
            // allocations of lineValue.split(" ")
            StringTokenizer strT = new StringTokenizer(lineValue);
            // iterate over the tokens, emitting (word, 1) for each
            while (strT.hasMoreTokens()) {
                String wordValue = strT.nextToken();
                mapOutputKey.set(wordValue);
                context.write(mapOutputKey, mapOutputValue);
            }
        }
    }
    // step 2: reduce class
    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable reduceOutputValue = new IntWritable();
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // sum all counts for this word
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            reduceOutputValue.set(sum);
            context.write(key, reduceOutputValue);
        }
    }
    // step 3: driver
    public void run(String[] args) throws Exception {
        // 1. get configuration (picks up the *-site.xml files on the classpath)
        Configuration conf = new Configuration();
        // 2. create the job
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        // the jar to ship to the cluster
        job.setJarByClass(this.getClass());
        // set job input -> map -> reduce -> output
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);
        // map
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // reduce
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // output (the directory must not already exist)
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);
        // submit and wait for completion
        boolean isSuccess = job.waitForCompletion(true);
        System.out.println(isSuccess ? "success" : "fail");
    }
    public static void main(String[] args) throws Exception {
        new WordCount().run(args);
    }
}
4. Test run
Select the main class when exporting the jar; after packaging, make it executable: chmod u+x wordCount.jar
bin/yarn jar jars/wordCount.jar inpath outpath
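Note that outpath must not already exist, or FileOutputFormat will refuse to start the job. With a single reducer, the result can then be inspected with:
bin/hdfs dfs -cat outpath/part-r-00000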