MapReduce Explained
Hadoop is built mainly on HDFS and MapReduce: HDFS is a distributed file system, and MapReduce is a computing framework. In this article I explain how MapReduce works, using Java. It covers the following parts:
MapReduce principles
WordCount basics
Sort
Reduce side join
Map side join
Custom WritableComparable
Principles
Many of us have played cards. When two decks that have already been used once are taken out again, the cards need to be counted first to check that none are missing. The check usually goes through these steps:
Deal the cards evenly into four hands, one for each of four people
Each person sorts and counts their own hand (how many aces, how many 2s, and so on)
Each person spreads their hand on the table, putting cards of the same rank into one pile and keeping different ranks apart
Finally the four people count every pile on the table to see whether each one has 8 cards; if not, cards are missing
The example above already uses the MapReduce principle. Step 1 corresponds to InputFormat, which splits a large file into smaller, easy-to-handle pieces. Step 2 corresponds to the Map phase: each task counts its own small piece in an orderly way. Step 3 corresponds to the Shuffle phase, which gathers the key/value pairs of the same category from different files into one place. Step 4 corresponds to the Reduce phase, which produces the final counts. Through this sequence of steps we can break a big job into small ones, run them in parallel, and get the result quickly. The data flow is shown in the figure below.
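To make the analogy concrete before touching Hadoop, here is a small local sketch of my own (plain Java, not Hadoop code, with made-up hands of cards) that walks through the four steps:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Minimal local sketch of the card example; not Hadoop code, just the idea. */
public class CardCheck {
    public static void main(String[] args) {
        // Step 1: four hypothetical hands after dealing (shortened for the example)
        List<String[]> hands = new ArrayList<String[]>();
        hands.add(new String[]{"A", "A", "2", "3"});
        hands.add(new String[]{"2", "A", "K", "K"});
        hands.add(new String[]{"3", "2", "A", "2"});
        hands.add(new String[]{"K", "3", "3", "K"});

        // Steps 2 and 3: "map" every card to <rank, 1> and "shuffle" equal ranks into one group
        Map<String, List<Integer>> grouped = new HashMap<String, List<Integer>>();
        for (String[] hand : hands) {
            for (String rank : hand) {
                if (!grouped.containsKey(rank)) {
                    grouped.put(rank, new ArrayList<Integer>());
                }
                grouped.get(rank).add(1);          // emit <rank, 1>
            }
        }

        // Step 4: "reduce" each group by summing; two full decks should give 8 of every rank
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int one : e.getValue()) {
                sum += one;
            }
            System.out.println(e.getKey() + "\t" + sum);
        }
    }
}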
Preparation
All the code in this article runs and can be debugged locally, so there is no need to install Hadoop. You only need to create a Maven project with the following configuration.
Maven configuration
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
        <version>2.7.3</version>
    </dependency>
</dependencies>
The example code for this article has been uploaded to GitHub: https://github.com/TreasureGitHub/hadoop
WordCount
Let's start with the simplest program: word count.
package com.practice.hadoop.mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;

import com.practice.hadoop.utils.FileOperate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * word count
 */
public class WordCount {

    public static class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text word = new Text();
        private static final IntWritable one = new IntWritable(1);

        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer st = new StringTokenizer(value.toString());
            while (st.hasMoreTokens()) {
                String str = st.nextToken();
                word.set(str);
                context.write(word, one);
            }
        }
    }

    public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable value = new IntWritable();

        protected void reduce(Text key, Iterable<IntWritable> iter, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable one : iter) {
                sum += one.get();
            }
            value.set(sum);
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Get the default configuration
        Configuration conf = new Configuration();
        // Create the job
        Job job = Job.getInstance(conf, "Wordcount");
        job.setJarByClass(WordCount.class);
        // Set the mapper class
        job.setMapperClass(WordCountMap.class);
        // Set the combiner class (the reducer works as a combiner here because summation is associative)
        job.setCombinerClass(WordCountReduce.class);
        // Set the reducer class
        job.setReducerClass(WordCountReduce.class);
        // Set the output key class
        job.setOutputKeyClass(Text.class);
        // Set the output value class
        job.setOutputValueClass(IntWritable.class);

        String input = "./src/main/resources/wc_input";
        String output = "./src/main/resources/wc_output";
        // Delete the output directory if it already exists
        FileOperate.deleteDir(output);

        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        System.out.println(job.waitForCompletion(true) ? 0 : 1);
    }
}
- InputFormat and InputSplit
An InputSplit is what Hadoop uses to hand input data to each individual map task. An InputSplit does not store the data itself; it stores a split length and an array recording where the data lives. How InputSplits are generated can be configured through the InputFormat. When data is fed to a map task, the task passes its input split to the InputFormat, which calls getRecordReader() to produce a RecordReader; the RecordReader then uses createKey() and createValue() to build the <key, value> pairs that map can process. In short, the InputFormat is what turns the input into <key, value> pairs for Map. InputFormat has many subclasses, and TextInputFormat is Hadoop's default.
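To see the splits concretely, here is a small sketch of my own (the class name PrintSplits is made up; the path reuses the wc_input directory from the WordCount example above) that asks TextInputFormat which splits it would generate:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

/** Prints the input splits TextInputFormat would generate for a directory. */
public class PrintSplits {
    public static void main(String[] args) throws IOException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        FileInputFormat.addInputPath(job, new Path("./src/main/resources/wc_input"));

        TextInputFormat inputFormat = new TextInputFormat();
        for (InputSplit split : inputFormat.getSplits(job)) {
            // A FileSplit prints as "path:startOffset+length"
            System.out.println(split);
        }
    }
}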
- OutputFormat
Every input format has a corresponding output format; the default output format is TextOutputFormat.
- map and reduce
A map program extends the Mapper class and implements its map method; a reduce program extends the Reducer class and implements its reduce method.
In map(LongWritable key, Text value, Context context), the key is the byte offset of the record, the value is one line of the input file, and the context is used to write the generated key/value pairs into the map output, in the form <k2, v2>.
In reduce(Text key, Iterable<IntWritable> iter, Context context), the key is the output key of the map above, and iter is an iterator over all the values grouped under that key. Note that the map output key/value types must match the reduce input key/value types, otherwise the job fails with an error.
After running the code above we can find the result files under the wc_output directory: part-r-00000, _SUCCESS, and the corresponding .crc files. part-r-00000 holds the computed results, _SUCCESS marks that the job completed successfully, and the .crc files store checksums used to verify data integrity. For example, if the input held the two lines "hello world" and "hello hadoop", part-r-00000 would contain hadoop 1, hello 2 and world 1, one word and its count per line separated by a tab.
To see exactly how map and reduce are invoked for each record, we can look at the Mapper source code:
/**
 * The <code>Context</code> passed on to the {@link Mapper} implementations.
 */
public abstract class Context
    implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}

/**
 * Called once at the beginning of the task.
 */
protected void setup(Context context
                     ) throws IOException, InterruptedException {
  // NOTHING
}

/**
 * Called once for each key/value pair in the input split. Most applications
 * should override this, but the default is the identity function.
 */
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value,
                   Context context) throws IOException, InterruptedException {
  context.write((KEYOUT) key, (VALUEOUT) value);
}

/**
 * Called once at the end of the task.
 */
protected void cleanup(Context context
                       ) throws IOException, InterruptedException {
  // NOTHING
}

/**
 * Expert users can override this method for more complete control over the
 * execution of the Mapper.
 * @param context
 * @throws IOException
 */
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
  } finally {
    cleanup(context);
  }
}
When a map task runs, it calls the run method: run calls setup once, then calls map once for every record in the input split, and finally calls cleanup once. If needed, we can override setup and cleanup. A reduce task works the same way; interested readers can check the Reducer source code.
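As a minimal illustration (a hypothetical mapper of my own, not part of the example project), here is a sketch that overrides setup and cleanup to emit, once per map task, how many records that task processed:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/** Hypothetical mapper that uses setup/cleanup around the per-record map calls. */
public class LineCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private int lines;

    @Override
    protected void setup(Context context) {
        // Called once per map task, before the first map() call
        lines = 0;
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) {
        // Called once per input record
        lines++;
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Called once per map task, after the last map() call
        context.write(new Text("lines"), new IntWritable(lines));
    }
}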
Sort
Reducer reduces a set of intermediate values which share a key to a smaller set of values
A Reducer has three phases: shuffle, sort and reduce.
- Shuffle
Input to the Reducer is the sorted output of the mappers. In this phase the framework fetches the relevant partition of the output of all the mappers, via HTTP.
- Sort
The framework groups Reducer inputs by keys (since different mappers may have output the same key) in this stage.
When the framework builds the reducer input, it distributes records by key. The default is HashPartitioner: data for a given key always lands on the same reducer, determined by the key's hash. We can customize the distribution by defining our own Partitioner class. The example below configures 3 reduce tasks, yet the output still comes out in ascending order; the note after the code explains why.
package com.practice.hadoop.mapreduce;

import java.io.IOException;

import com.practice.hadoop.utils.FileOperate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * sort partition example
 *
 * @author liufeifei
 * @date 2018/05/09
 */
public class SortPartition {

    private static int maxNum = 0;

    public static class Map extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

        private static IntWritable data = new IntWritable();

        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            int num = Integer.parseInt(line);
            if (num > maxNum) {
                maxNum = num;
            }
            data.set(num);
            context.write(data, new IntWritable(1));
        }
    }

    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        private static IntWritable linenum = new IntWritable(1);

        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable val : values) {
                context.write(linenum, key);
                linenum = new IntWritable(linenum.get() + 1);
            }
        }
    }

    public static class Partition extends Partitioner<IntWritable, IntWritable> {

        public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
            // numPartitions is the configured number of reduce tasks
            int bound = 1000 / numPartitions + 1;
            int keynumber = key.get();
            for (int i = 1; i <= numPartitions; i++) {
                if (keynumber < bound * i && keynumber >= bound * (i - 1)) {
                    return i - 1;
                }
            }
            return -1;
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Sort");
        job.setJarByClass(SortPartition.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(Map.class);
        // Set the custom partitioner
        job.setPartitionerClass(Partition.class);
        job.setReducerClass(Reduce.class);
        // Use 3 reduce tasks: three reducers run and three result files are produced
        job.setNumReduceTasks(3);

        String input = "./src/main/resources/sort_input";
        String output = "./src/main/resources/sort_output";
        FileOperate.deleteDir(output);

        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
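Why is the output globally sorted even with three reducers? With numPartitions = 3, the partitioner computes bound = 1000 / 3 + 1 = 334, so keys in [0, 334) go to reducer 0, keys in [334, 668) to reducer 1, and keys in [668, 1002) to reducer 2. Each reducer therefore receives a contiguous, non-overlapping range of keys, and inside each reducer the framework has already sorted the keys, so reading part-r-00000, part-r-00001 and part-r-00002 in order yields one ascending sequence. Note that this partitioner assumes the input values fall between 0 and 1001; anything outside that range makes getPartition return -1 and the job fails.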
Reduce Side Join
Join two tables on the reduce side.
The job implements the equivalent of the following SQL:
select t.factoryname
, t1.addressname
from factory t
left join address t1
on t.address = t1.addressid
The idea: on the map side, tag the records of the two tables by origin — factory records as table one and address records as table two — and use the address id as the key. On the reduce side, the input key is an address id and the values are the factoryname and addressname entries that share it; taking the Cartesian product of the factoryname list and the addressname list gives the join result. The code is as follows:
package com.practice.hadoop.mapreduce;

import java.io.IOException;
import java.util.List;

import com.practice.hadoop.utils.FileOperate;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * reduce side join example
 *
 * @date 2018/05/09
 */
public class ReduceSideJoin {

    //private static int time = 0;

    public static class ReduceSideJoinMap extends Mapper<LongWritable, Text, Text, Text> {

        private int side = 0;
        private static final String FACTORY_NAME = "factoryname";
        private static final String ADDRESS_ID = "addressid";

        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strArr = value.toString().split(",");
            // Header line: remember which table this split belongs to and skip the line
            if (FACTORY_NAME.equals(strArr[0]) || ADDRESS_ID.equals(strArr[0])) {
                side = FACTORY_NAME.equals(strArr[0]) ? 1 : 2;
                return;
            }
            if (side == 1) {
                // Left table: key by address id, tag the factory name with "1+"
                context.write(new Text(strArr[1]), new Text("1+" + strArr[0]));
            } else {
                // Right table: key by address id, tag the address name with "2+"
                context.write(new Text(strArr[0]), new Text("2+" + strArr[1]));
            }
        }
    }

    public static class ReduceSideJoinReduce extends Reducer<Text, Text, Text, Text> {

        private static int time = 0;

        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            if (time == 0) {
                // Write the header row once
                context.write(new Text("factory"), new Text("address"));
                time++;
            }
            List<String> left = Lists.newLinkedList();
            List<String> right = Lists.newLinkedList();
            for (Text value : values) {
                String word = value.toString();
                String subValue = (word == null) ? "" : word.substring(2);
                if (word.startsWith("1+")) {
                    // Value from the left table
                    left.add(subValue);
                } else {
                    // Value from the right table
                    right.add(subValue);
                }
            }
            if (left.size() == 0) {
                return;
            } else {
                // Cartesian product of the left and right values
                for (String item : left) {
                    if (right.size() == 0) {
                        context.write(new Text(item), new Text(""));
                    } else {
                        for (String item1 : right) {
                            context.write(new Text(item), new Text(item1));
                        }
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Set the output field separator
        conf.set("mapreduce.output.textoutputformat.separator", ",");

        Job job = Job.getInstance(conf);
        job.setJobName("ReduceSideJoin");
        job.setMapperClass(ReduceSideJoinMap.class);
        job.setReducerClass(ReduceSideJoinReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        String input = "./src/main/resources/rsj_input";
        String output = "./src/main/resources/rsj_output";
        FileOperate.deleteDir(output);

        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
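As a hypothetical illustration (these file contents are my own example, not taken from the article's repository), rsj_input could hold a factory file

factoryname,addressid
Beijing Red Star,1
Guangzhou Honda,2
Shenzhen Thunder,3

and an address file

addressid,addressname
1,Beijing
2,Guangzhou
3,Shenzhen

The mappers key every record by address id and tag it with "1+" or "2+" depending on the table it came from; the reducer then pairs them up, and the output file would contain

factory,address
Beijing Red Star,Beijing
Guangzhou Honda,Guangzhou
Shenzhen Thunder,Shenzhen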
Map Side Join
Perform the join on the map side.
The main idea is to cache the address table in memory. While the map tasks run, each input record is matched against the cache to find its addressname, and the joined record is written to the result file. The job.addCacheFile(new URI(cache)) call distributes the cache file to every node before the map tasks start.
Note: only a map phase is needed here; no reducer is configured. The cache path is set to /tmp/address.txt, because putting the file directly under the resources directory can cause a permission error. The implementation is as follows.
package com.practice.hadoop.mapreduce;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;

import com.google.common.collect.Maps;
import com.practice.hadoop.utils.FileOperate;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * map side join example
 *
 * @date 2018/05/09
 */
public class MapSideJoin {

    public static class MapSideJoinMap extends Mapper<LongWritable, Text, Text, Text> {

        private static final String FACTORY_NAME = "factoryname";

        private Configuration conf;
        private Map<String, String> addMap = Maps.newHashMap();

        protected void setup(Context context) throws IOException, InterruptedException {
            conf = context.getConfiguration();
            URI[] uris = Job.getInstance(conf).getCacheFiles();
            for (URI uri : uris) {
                String fileName = uri.getPath();
                BufferedReader bf = new BufferedReader(new FileReader(fileName));
                String str = null;
                while ((str = bf.readLine()) != null) {
                    String[] arr = str.split(",");
                    if (StringUtils.equals("addressid", arr[0])) {
                        // Skip the header line
                        continue;
                    } else {
                        // Cache addressid -> addressname
                        addMap.put(arr[0], arr[1]);
                    }
                }
            }
        }

        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strArr = value.toString().split(",");
            // Skip the header line of the factory table
            if (FACTORY_NAME.equals(strArr[0])) {
                return;
            }
            if (addMap.get(strArr[1]) != null) {
                context.write(new Text(strArr[0]), new Text(addMap.get(strArr[1])));
            } else {
                context.write(new Text(strArr[0]), new Text(""));
            }
        }
    }

    public static void main(String[] args)
            throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("mapreduce.output.textoutputformat.separator", ",");

        Job job = Job.getInstance(conf, "MapSideJoin");

        String cache = "/tmp/address.txt";
        String input = "./src/main/resources/msj_input";
        String output = "./src/main/resources/msj_output";
        FileOperate.deleteDir(output);

        job.setMapperClass(MapSideJoinMap.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        // Add the address table to the distributed cache
        job.addCacheFile(new URI(cache));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
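A design note of my own (not in the original code): since the join happens entirely in the mappers, you can also call job.setNumReduceTasks(0) to make the job explicitly map-only; the shuffle and sort phases are then skipped and the mappers write their output directly to part-m-00000 style files.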
Custom WritableComparable
Keys in MapReduce must implement the WritableComparable interface (values only need Writable). Ready-made classes include Text, IntWritable, LongWritable and BooleanWritable. Users can also define their own key class; it just has to implement WritableComparable.
package com.practice.hadoop.pojo;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * @author liufeifei
 * @date 2018/05/20
 */
public class User implements WritableComparable<User> {

    /**
     * name
     */
    private String name;

    /**
     * age
     */
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    /**
     * Deserialize the fields
     *
     * @param in
     * @throws IOException
     */
    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        age = in.readInt();
    }

    /**
     * Serialize the fields
     *
     * @param out
     * @throws IOException
     */
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(age);
    }

    /**
     * Comparison method
     *
     * @param o
     * @return
     */
    public int compareTo(User o) {
        int res1 = Integer.compare(this.age, o.age);
        int res2 = this.name.compareTo(o.name);
        // Compare by age first, then by name (the minus sign makes the name order descending)
        return res1 != 0 ? res1 : -res2;
    }

    public int hashCode() {
        return name.hashCode() + age;
    }

    public boolean equals(Object obj) {
        if (!(obj instanceof User)) {
            return false;
        }
        User o = (User) obj;
        // Compare field values, not references
        return this.name.equals(o.name) && this.age == o.age;
    }

    /**
     * String form used by TextOutputFormat
     *
     * @return
     */
    public String toString() {
        return name + "\t" + age;
    }
}
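Two things are worth pointing out about this class: compareTo determines the order in which keys reach the reducer (here ascending by age and, for equal ages, descending by name because of the minus sign), and hashCode is what the default HashPartitioner uses to pick a reducer, so two equal User objects must produce the same hash.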
Using it is simple; the result is sorted by age first and, when ages are equal, by name. Here is the code:
package com.practice.hadoop.mapreduce;

import java.io.IOException;

import com.practice.hadoop.pojo.User;
import com.practice.hadoop.utils.FileOperate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * User sort example
 *
 * @author liufeifei
 * @date 2018/05/20
 */
public class UserScore {

    public static class UserScoreMap extends Mapper<LongWritable, Text, User, IntWritable> {

        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] arr = value.toString().split(",");
            User user = new User();
            user.setName(arr[0]);
            user.setAge(Integer.parseInt(arr[1]));
            context.write(user, new IntWritable(Integer.parseInt(arr[3])));
        }
    }

    public static class UserScoreReducer extends Reducer<User, IntWritable, User, IntWritable> {

        protected void reduce(User key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            int cnt = 0;
            for (IntWritable value : values) {
                sum = sum + value.get();
                cnt++;
            }
            // Write the average score for each user
            context.write(key, new IntWritable(sum / cnt));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("mapreduce.output.textoutputformat.separator", ",");

        Job job = Job.getInstance(conf, "UserScore");
        job.setJarByClass(UserScore.class);
        job.setMapperClass(UserScoreMap.class);
        job.setReducerClass(UserScoreReducer.class);
        job.setOutputKeyClass(User.class);
        job.setOutputValueClass(IntWritable.class);

        String input = "./src/main/resources/userscore_input";
        String output = "./src/main/resources/userscore_output";
        FileOperate.deleteDir(output);

        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }
}
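The job expects comma-separated input; a hypothetical line such as tom,18,math,90 (my own example, not from the repository) would work: the mapper builds the User key from the first two fields, ignores the third, and the reducer writes each user's average over the fourth field.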
That concludes this introduction to MapReduce.
Reference book: 《Hadoop实战》 (Hadoop in Action)