Mapreduce的wordcount写法
程序员文章站
2024-03-22 13:56:40
...
Mapreduce的wordcount写法
Mapreduce是一种编程模型,负责海量数据的运算,会在不同的节点进行分布式数据的运算,这样就可以极大的提高运算的效率,以便于进行数据的分析.
当mapreduce运启动后,首先会运行众多的map task,当map task处理完自己的数据之后,还需要启动众多的reduce task,这个时候如果用户通过自己手动启动的话并不科学,所以这个时候需要一个自动化的调度平台,hadoop就为运行mapreduce类的分布式计算程序开发了一个自动化调度平台—yarn。
下面我们就使用java代码编写一个mr的wordcount的程序,Mapreduce程序的主要思想:map映射和reduce归约。
- map 端
package demo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* !!!!!!! 这个几个数据类型导包一定不要导到错了!!!!!!!
* mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>,KEYIN是maptask调用的读文本的工
* 具读到的数据key,默认是一行数据的偏移量,为Long类型,VALUEIN是所读取到的
* 这一行数据的值value,默认是String类型,KEYOUT,是我们自定义的mapper类中
* 逻辑将要返回的数据的key,类型根据自己逻辑设定,VALUEOUT也是同样的道理。
*
* 由于map reduce做的是一个分布式的计算,所以数据需要在各个节点之间传递持久
* 化存储,所以数据需要进行序列化,而jdk中自带的序列化机制是非常重的,效率
* 和很低,所以hadoop开发了自己的序列化机制,那么,程序中需要传递的持久化数
* 据类型,就需要实现hadoop自己的额序列化框架。
*
* hadoop为一些常见的数据类型封装了实现自己序列化机制的类型
* LongWritable ==> Long
* Text ==> String
* IntWritable ==> Integer
* DoubleWritable ==> Double
* .....
*/
public class MapWordCount extends Mapper<LongWritable, Text,Text, IntWritable> {
/**
* @param key
* @param value
* @param context
* @throws IOException
* @throws InterruptedException
* 每一个map task都会调用这个方法,一个map每读一行就会调用一次map()方法,key就是一行的起始偏移量,
* value就是行内容,
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
// 按照指定字符行内容切分成一个个的单词
String[] words = line.split(" ");
// 将每个单词的值定为1,然后交给reduce
for (String word : words) {
context.write(new Text(word),new IntWritable(1));
}
}
}
- reduce 端
package demo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
/**
* Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>,这里面的KEYIN、VALUEIN对应的就是mapper的KEYOUT、VALUEOUT,
* KEYOUT、VALUEOUT就是我们reduce逻辑想要输出的数据类型
*
*/
public class ReduceWordCount extends Reducer<Text, IntWritable,Text,IntWritable> {
/**
* 众多的reduce都会调用这个reduce()方法,每拿一组相同key的数据调用一次
* @param key
* @param values
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
Iterator<IntWritable> it = values.iterator();
while(it.hasNext()){
count += it.next().get();
}
context.write(key,new IntWritable(count));
System.out.println(key + " " + count);
}
}
- 客户端
package demo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class JobSubmitter {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// 描述在哪个平台运行,这里使用的是在本地运行mr程序
conf.set("mapreduce.framework.name", "local");
// 如果在yarn上运行需要指定yarn的位
//conf.set("yarn.resourcemanager.hostname", "lx01");
// 客户端
Job job = Job.getInstance(conf);
// 指定mr程序的jar包获取路径,通过类加载机制动态获取
job.setJarByClass(JobSubmitter.class);
job.setMapperClass(MapWordCount.class);
job.setReducerClass(ReduceWordCount.class);
// 指定map的key,value输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//指定reduce的key,value输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 指定job所要读取数据源的目录
FileInputFormat.setInputPaths(job,new Path("F:\\etl_test_data\\wordcount.txt"));
//指定job数据结果目录
FileOutputFormat.setOutputPath(job,new Path("F:\\etl_test_data\\output\\"));
// 提交job
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : -1);
}
}
- 原始数据和结果数据比对,图一为原始数据,图二为结果数据
图一
图二
可以看到,通过map reduce程序将我们想要的数据已经计算出来。
上一篇: set
下一篇: const - 常见错误
推荐阅读
-
Mapreduce实例(一):WordCount
-
Mapreduce的wordcount写法
-
3.2 Mapreduce实例—WordCount
-
直接插入排序的严格定义写法与优化算法
-
[笔记]二分查找的几种写法
-
GRAILS中Criteria的OR的写法 grailsjavasql
-
MapReduce原理(3): MapReduce的分片机制 getSplits()方法 源码解析
-
scala语言的spark实现wordcount 博客分类: sparkscala
-
javascript 对象的单继承属性和多继承属性写法
-
Hadoop 上运行基于中文分词算法的 MapReduce 程序,进行词频分析。