欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Hadoop学习笔记 2 - MapReduce 简单实例

程序员文章站 2022-03-16 11:04:21
...

1.2 MapReduce开发实例

 

MapReduce 执行过程,如下图,(先由Mapper进行map计算,将数据进行分组,然后在由Reduce进行结果汇总计算)

Hadoop学习笔记 2 - MapReduce 简单实例

 

直接上代码

package com.itbuilder.hadoop.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static void main(String[] args) throws Exception {

		//构建一个JOB对象
		Job job = Job.getInstance(new Configuration());
		
		//注意:main方法所在的类
		job.setJarByClass(WordCount.class);
		
		//设置Mapper相关属性
		job.setMapperClass(WCMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		
		//设置Reducer相关属性
		job.setReducerClass(WCReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		//提交任务
		job.waitForCompletion(true);
	}

	
	
	public static class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

		
		public WCMapper() {
			super();
		}

		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, Text, LongWritable>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			
			String words[] = line.split(" ");
			
			for (String word : words) {
				context.write(new Text(word), new LongWritable(1));
			}
			
		}
		
		
	}
	
	public static class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

		
		public WCReducer() {
			super();
		}

		@Override
		protected void reduce(Text k2, Iterable<LongWritable> v2,
				Reducer<Text, LongWritable, Text, LongWritable>.Context arg2)
				throws IOException, InterruptedException {
			long counter = 0;
			for (LongWritable count : v2) {
				counter += count.get();
			}
			arg2.write(k2, new LongWritable(counter));
		}

	}
	
	
}

 

需要注意:

WCMapper、WCReducer 作为内部类,必须是静态的内部类

 

 

pom.xml 中的jar包依赖

<dependencies>
  	<dependency>
  		<groupId>junit</groupId>
  		<artifactId>junit</artifactId>
  		<version>4.11</version>
  		<scope>test</scope>
  	</dependency>
  	<dependency>
  		<groupId>org.apache.hadoop</groupId>
  		<artifactId>hadoop-mapreduce-client-core</artifactId>
  		<version>2.7.1</version>
  	</dependency>
  	<dependency>
  		<groupId>org.apache.hadoop</groupId>
  		<artifactId>hadoop-common</artifactId>
  		<version>2.7.1</version>
  	</dependency>
  	<dependency>
  		<groupId>org.apache.hadoop</groupId>
  		<artifactId>hadoop-hdfs</artifactId>
  		<version>2.7.1</version>
  	</dependency>
  	
  	<dependency>
  		<groupId>org.apache.hadoop</groupId>
  		<artifactId>hadoop-yarn-common</artifactId>
  		<version>2.7.1</version>
  	</dependency>
  	<dependency>
  		<groupId>org.apache.hadoop</groupId>
  		<artifactId>hadoop-yarn-client</artifactId>
  		<version>2.7.1</version>
  	</dependency>
  </dependencies>

 

相关标签: mapreduce