
MapReduce Explained

Hadoop is built mainly on HDFS and MapReduce: HDFS is a distributed file system, and MapReduce is a computation framework. In this article I walk through MapReduce using Java, covering the following topics:
  • MapReduce principles

  • wordcount basics

  • sort

  • reduce side join

  • map side join

  • Custom WritableComparable

Principles

Many of us like playing cards. After two decks have been used once, you usually check them before the next game to make sure no cards are missing. The check typically goes through these steps:

  1. Deal the cards evenly into four piles, one for each of four people

  2. Each person sorts and counts their own pile (how many aces, how many 2s, and so on)

  3. Each person spreads their cards on the table, putting cards of the same rank into the same stack and different ranks into separate stacks

  4. Finally, the four people check each stack on the table: every rank should have 8 cards; if a stack is short, cards are missing

This little example already uses the MapReduce idea. Step 1 corresponds to the InputFormat, which splits a large input into smaller, easier-to-handle pieces. Step 2 corresponds to the Map phase, where each task independently processes its own piece and produces ordered partial counts. Step 3 corresponds to the Shuffle phase, which brings together key/value pairs with the same key from different outputs. Step 4 corresponds to the Reduce phase, which produces the final totals. Through this pipeline we can split a big job into small ones, run them in parallel, and aggregate the results quickly. The data flow is shown in the figure below.

[Figure: MapReduce data flow]
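
To make that data flow concrete in code terms, the generic type parameters of Hadoop's Mapper and Reducer mirror the four steps above. The following is only a type-level sketch of the org.apache.hadoop.mapreduce API used throughout this article:

  // Conceptual data flow of a single MapReduce job:
  //
  //   map:     (K1, V1)           -> list of (K2, V2)    (called once per input record)
  //   shuffle: groups all (K2, V2) pairs by K2            (handled by the framework)
  //   reduce:  (K2, Iterable<V2>) -> list of (K3, V3)     (called once per distinct key)
  //
  // In the Java API this corresponds to extending
  //   Mapper<K1, V1, K2, V2>   and   Reducer<K2, V2, K3, V3>,
  // where the map output types (K2, V2) must match the reduce input types.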

Preparation

All of the code here runs and can be debugged locally; there is no need to install Hadoop. Just create a Maven project with the following configuration:

Maven configuration

  <dependencies>
          <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
          <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-mapreduce-client-core</artifactId>
              <version>2.7.3</version>
          </dependency>
  
          <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-common</artifactId>
              <version>2.7.3</version>
          </dependency>
  
          <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
              <version>2.7.3</version>
          </dependency>
  
      </dependencies>

The example code for this article has been uploaded to GitHub: https://github.com/TreasureGitHub/hadoop
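
The examples below all call FileOperate.deleteDir(output) before submitting a job, because MapReduce refuses to start if the output directory already exists. The real helper lives in the GitHub repository above; as a rough local-filesystem sketch of what such a helper might look like (my assumption, not the repository's exact code):

  package com.practice.hadoop.utils;

  import java.io.File;

  /**
   * Small helper for local runs: recursively deletes a job's output directory.
   */
  public class FileOperate {

      public static void deleteDir(String path) {
          deleteDir(new File(path));
      }

      private static void deleteDir(File dir) {
          if (!dir.exists()) {
              return;
          }
          File[] children = dir.listFiles();
          if (children != null) {
              for (File child : children) {
                  // delete the contents first, then the directory itself
                  deleteDir(child);
              }
          }
          dir.delete();
      }
  }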

wordcount

Let's start with the simplest possible program, word count:

  package com.practice.hadoop.mapreduce;
  
  import java.io.IOException;
  import java.util.StringTokenizer;
  
  import com.practice.hadoop.utils.FileOperate;
  
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.RawComparator;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  
  /**
   * word count
   */
  public class WordCount {
  
      public static class WordCountMap extends Mapper<LongWritable,Text,Text,IntWritable> {
  
          private Text word = new Text();
          private static final IntWritable one = new IntWritable(1);
  
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              StringTokenizer st = new StringTokenizer(value.toString());
              while(st.hasMoreTokens()) {
                  String str = st.nextToken();
                  word.set(str);
                  context.write(word,one);
              }
          }
      }
  
      public static class WordCountReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
  
          private IntWritable value = new IntWritable();
  
          @Override
          protected void reduce(Text key, Iterable<IntWritable> iter, Context context) throws IOException, InterruptedException {
              int sum = 0;
              for(IntWritable one:iter) {
                  sum += one.get();
              }
  
              value.set(sum);
              context.write(key,value);
          }
      }
  
      public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
          // Load the default configuration
          Configuration conf = new Configuration();
          // Create the job
          Job job = Job.getInstance(conf,"Wordcount");

          job.setJarByClass(WordCount.class);
          // Set the mapper class
          job.setMapperClass(WordCountMap.class);
          // Set the combiner class (the reducer doubles as a combiner here)
          job.setCombinerClass(WordCountReduce.class);
          // Set the reducer class
          job.setReducerClass(WordCountReduce.class);

          // Set the output key type
          job.setOutputKeyClass(Text.class);
          // Set the output value type
          job.setOutputValueClass(IntWritable.class);

          String input = "./src/main/resources/wc_input";
          String output = "./src/main/resources/wc_output";
          // Delete any previous output directory (the job fails if the output path already exists)
          FileOperate.deleteDir(output);
  
          FileInputFormat.addInputPath(job,new Path(input));
          FileOutputFormat.setOutputPath(job, new Path(output));
  
          System.out.println(job.waitForCompletion(true) ? 0 : 1);
      }
  }
  • InputFormat and InputSplit

    An InputSplit is what Hadoop hands to each individual map task. It does not store the data itself; it stores a split length and an array of locations describing where the data lives. How splits are generated is controlled by the InputFormat. When a split is fed to a map task, the InputFormat supplies a RecordReader (via createRecordReader() in the org.apache.hadoop.mapreduce API), and the RecordReader turns the split's records into the <key, value> pairs that map() consumes. In short, the InputFormat is what produces the <key, value> pairs a Mapper can process. InputFormat has many subclasses; TextInputFormat is Hadoop's default. (A configuration sketch follows this list.)

  • OutputFormat

    Every input format has a corresponding output format; the default output format is TextOutputFormat.

  • map and reduce: the map program implements the map method of the Mapper class, and the reduce program implements the reduce method of the Reducer class

    In map(LongWritable key, Text value, Context context), key is the byte offset of the record, value is one line of the input file, and context is used to write the generated key/value pairs to the map output, in the form <k2, v2>.

    In reduce(Text key, Iterable<IntWritable> iter, Context context), key is one of the keys emitted by map, and iter iterates over all values grouped under that key. Note that the map output key/value types must match the reduce input key/value types, otherwise the job fails (see the sketch below).
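
For reference, all of these defaults can be set explicitly on the Job. WordCount above relies on them implicitly; the snippet below (a sketch of standard Job calls that would go into main() after Job.getInstance, not something the original program needs) spells them out, including the separate map output types that must match the Reducer's input types:

  // imports: org.apache.hadoop.mapreduce.lib.input.TextInputFormat,
  //          org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

  // Explicit versions of the defaults WordCount relies on:
  job.setInputFormatClass(TextInputFormat.class);    // default InputFormat: one line per record
  job.setOutputFormatClass(TextOutputFormat.class);  // default OutputFormat: "key<TAB>value" lines

  // If the map output types differed from the final output types, they would have to be
  // declared separately and must match the Reducer's input types:
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);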

Run the code above and you will find the result files under wc_output: part-r-00000, _SUCCESS, and the corresponding .crc files. part-r-00000 holds the computed counts, _SUCCESS is an empty marker file indicating the job completed successfully, and the .crc files store checksums used to verify data integrity.

To see exactly how map is invoked for each line, we can look at the Mapper source code:

  
  /**
     * The <code>Context</code> passed on to the {@link Mapper} implementations.
     */
    public abstract class Context
      implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    }
    
    /**
     * Called once at the beginning of the task.
     */
    protected void setup(Context context
                         ) throws IOException, InterruptedException {
      // NOTHING
    }
  
    /**
     * Called once for each key/value pair in the input split. Most applications
     * should override this, but the default is the identity function.
     */
    @SuppressWarnings("unchecked")
    protected void map(KEYIN key, VALUEIN value, 
                       Context context) throws IOException, InterruptedException {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  
    /**
     * Called once at the end of the task.
     */
    protected void cleanup(Context context
                           ) throws IOException, InterruptedException {
      // NOTHING
    }
    
    /**
     * Expert users can override this method for more complete control over the
     * execution of the Mapper.
     * @param context
     * @throws IOException
     */
    public void run(Context context) throws IOException, InterruptedException {
      setup(context);
      try {
        while (context.nextKeyValue()) {
          map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
      } finally {
        cleanup(context);
      }
    }

When a map task runs, it calls run(), which calls setup() once, then calls map() once for every record in the input split, and finally calls cleanup() once. If necessary we can override setup() and cleanup(). Reduce tasks work the same way; interested readers can check the Reducer source themselves.
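
As a small, hedged illustration (not part of the original WordCount), a mapper that uses setup() to initialize per-task state and cleanup() to report a counter could look like this; it would sit alongside WordCountMap and reuse the same imports:

      public static class CountingWordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {

          private static final IntWritable ONE = new IntWritable(1);
          private final Text word = new Text();
          private long emptyLines;

          @Override
          protected void setup(Context context) {
              // Called once per map task, before the first map() call:
              // a good place to read configuration or initialize shared state.
              emptyLines = 0;
          }

          @Override
          protected void map(LongWritable key, Text value, Context context)
                  throws IOException, InterruptedException {
              String line = value.toString().trim();
              if (line.isEmpty()) {
                  emptyLines++;   // track blank lines, emit nothing for them
                  return;
              }
              for (String token : line.split("\\s+")) {
                  word.set(token);
                  context.write(word, ONE);
              }
          }

          @Override
          protected void cleanup(Context context) {
              // Called once per map task, after the last map() call.
              context.getCounter("wordcount", "empty_lines").increment(emptyLines);
          }
      }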

Sort

Reducer reduces a set of intermediate values which share a key to a smaller set of values

A Reducer has three phases: shuffle, sort, and reduce.

  • Shuffle

    Input to the Reducer is the sorted output of the mappers. In this phase the framework fetches the relevant partition of the output of all the mappers, via HTTP.

  • Sort

    The framework groups Reducer inputs by keys (since different mappers may have output the same key) in this stage.

When handing map output to the reducers, the framework partitions records by key. The default is HashPartitioner: the key's hash decides the reducer, so records with the same key always land on the same reducer. We can control the distribution ourselves by defining a Partitioner class. The example below configures 3 reduce tasks, yet the combined output is still in ascending order, because the custom partitioner assigns contiguous key ranges to the reducers in order.
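
For comparison, the default HashPartitioner essentially does the following (paraphrased from the Hadoop source), which is what guarantees that identical keys meet on the same reducer:

  import org.apache.hadoop.mapreduce.Partitioner;

  public class HashPartitioner<K, V> extends Partitioner<K, V> {

      @Override
      public int getPartition(K key, V value, int numReduceTasks) {
          // Mask off the sign bit so the result is non-negative, then bucket by modulo.
          return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }
  }

The full example with the custom range-based partitioner: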

  
  package com.practice.hadoop.mapreduce;
  
  import java.io.IOException;
  
  import com.practice.hadoop.utils.FileOperate;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.Partitioner;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  
  /**
   * Sort with a custom Partitioner
   *
   * @author liufeifei
   * @date 2018/05/09
   */
  public class SortPartition {
  
      private static int maxNum = 0;
  
      public static class Map extends Mapper<LongWritable,Text,IntWritable,IntWritable> {
  
          private static IntWritable data = new IntWritable();
  
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  
              String line = value.toString();
              int num = Integer.parseInt(line);
  
              if(num > maxNum) {
                  maxNum = num;
              }
  
              data.set(num);
              context.write(data,new IntWritable(1));
          }
      }
  
      public static class Reduce extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> {
  
          private static IntWritable linenum = new IntWritable(1);
  
          @Override
          protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
              throws IOException, InterruptedException {
              for(IntWritable val:values) {
                  context.write(linenum,key);
                  linenum = new IntWritable(linenum.get() + 1);
              }
          }
      }
  
      public static class Partition extends Partitioner<IntWritable,IntWritable> {
  
          @Override
          public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
              // numPartitions is the number of reduce tasks configured on the job.
              // Keys are assumed to fall in [0, 1000); each reducer gets one contiguous range,
              // so concatenating the output files yields a globally sorted result.
              int bound = 1000 / numPartitions + 1;
              int keynumber = key.get();
              for(int i = 1 ; i <= numPartitions ; i++) {
                  if(keynumber < bound * i && keynumber >= bound * (i - 1)) {
                      return i - 1;
                  }
              }
              // Fall back to the last partition for unexpectedly large keys
              // (a negative partition number would make the job fail).
              return numPartitions - 1;
          }
      }
  
      public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
          Configuration conf = new Configuration() ;
          Job job = Job.getInstance(conf, "Sort");
  
          job.setJarByClass(SortPartition.class);
          job.setOutputKeyClass(IntWritable.class);
          job.setOutputValueClass(IntWritable.class);
  
          job.setMapperClass(Map.class);
          // Use the custom partitioner
          job.setPartitionerClass(Partition.class);
          job.setReducerClass(Reduce.class);
  
          // Use 3 reduce tasks: three reducers run and three result files are produced
          job.setNumReduceTasks(3);
  
          String input = "./src/main/resources/sort_input";
          String output = "./src/main/resources/sort_output";
          FileOperate.deleteDir(output);
  
          FileInputFormat.addInputPath(job,new Path(input));
          FileOutputFormat.setOutputPath(job, new Path(output));
  
          System.exit(job.waitForCompletion(true) ? 0 :1);
      }
  }

Reduce Side Join

A reduce-side join performs the join between two tables on the reduce side.

The goal is the equivalent of the following SQL:

  
  select t.factoryname
       , t1.addressname
    from factory t
    left join address t1
      on t.address = t1.addressid

The idea: in the map phase, tag the records of the two tables (factory records as table 1, address records as table 2) and use the address id as the output key. On the reduce side, each key (address id) arrives together with all of its tagged factoryname and addressname values; taking the Cartesian product of the factory names and address names under that key yields the join result. The code:

  
  package com.practice.hadoop.mapreduce;
  
  import java.io.IOException;
  import java.util.List;
  
  import com.practice.hadoop.utils.FileOperate;
  
  import com.google.common.collect.Lists;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  
  /**
   *
   * Reduce-side join example
   *
   * @date 2018/05/09
   */
  public class ReduceSideJoin {
  
      //private static int time = 0;
  
      public static class ReduceSideJoinMap extends Mapper<LongWritable,Text,Text,Text> {
  
          private int side = 0;
  
          private static final String FACTORY_NAME = "factoryname";
  
          private static final String ADDRESS_ID = "addressid";
  
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
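              // Input files are assumed to be CSV with a header row:
              //   factory file: factoryname,addressid
              //   address file: addressid,addressname
              // The header row tells us which file this split belongs to (side = 1 or 2).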
              String[] strArr = value.toString().split(",");
  
              if(FACTORY_NAME.equals(strArr[0]) || ADDRESS_ID.equals(strArr[0])){
                  side = FACTORY_NAME.equals(strArr[0]) ? 1 : 2;
                  return ;
              }
  
              if(side == 1) {
                  // Left table (factory): key = address id, value = factory name tagged with "1+"
                  context.write(new Text(strArr[1]),new Text("1+" + strArr[0]));
              } else {
                  // Right table (address): key = address id, value = address name tagged with "2+"
                  context.write(new Text(strArr[0]),new Text("2+" + strArr[1]));
              }
          }
      }
  
      public static class ReduceSideJoinReduce extends Reducer<Text,Text,Text,Text> {
  
          private static int time = 0;
  
          @Override
          protected void reduce(Text key, Iterable<Text> values, Context context)
              throws IOException, InterruptedException {
  
              if(time == 0 ) {
                  // Write a header row once per reducer (this job uses the default single reducer)
                  context.write(new Text("factory"),new Text("address"));
                  time ++;
              }
  
  
              List<String> left = Lists.newLinkedList();
              List<String> right = Lists.newLinkedList();
  
              for(Text value:values) {
  
                  String word = value.toString();
                  String subValue = (word == null) ? "":word.substring(2) ;
  
                  if(word.startsWith("1+")) {
                      // value came from the left table (factory name)
                      left.add(subValue);
                  } else {
                      // value came from the right table (address name)
                      right.add(subValue);
                  }
  
              }
  
              if(left.size() == 0) {
                  return;
              } else {
                  // Cartesian product of the left-table and right-table values for this key
                  for(String item:left) {
                      if(right.size() == 0) {
                          context.write(new Text(item),new Text(""));
                      } else {
                          for (String item1 : right) {
                              context.write(new Text(item), new Text(item1));
                          }
                      }
                  }
              }
  
          }
      }
  
      public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  
          Configuration conf = new Configuration();
          // Set the key/value separator used in the output files
          conf.set("mapreduce.output.textoutputformat.separator",",");
  
          Job job = Job.getInstance(conf);
  
          job.setJobName("ReduceSideJoin");
          job.setMapperClass(ReduceSideJoinMap.class);
          job.setReducerClass(ReduceSideJoinReduce.class);
  
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(Text.class);
  
          String input = "./src/main/resources/rsj_input";
          String output = "./src/main/resources/rsj_output";
          FileOperate.deleteDir(output);
  
          FileInputFormat.addInputPath(job, new Path(input));
          FileOutputFormat.setOutputPath(job, new Path(output));
  
          System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
  }

Map Side Join

A map-side join performs the join during the map phase.

The main idea is to cache the address table in memory: while processing each map record, we look up the matching address name in the cache and write it to the result file. job.addCacheFile(new URI(cache)) distributes the cached file to every node before the map tasks start.

Note: only the map phase is needed here; no reducer is set. The cache file path is /tmp/address.txt; placing it directly under the resources directory may fail with a permission error. The code:

  
  package com.practice.hadoop.mapreduce;
  
  import java.io.BufferedReader;
  import java.io.FileReader;
  import java.io.IOException;
  import java.net.URI;
  import java.net.URISyntaxException;
  import java.util.Map;
  
  import com.google.common.collect.Maps;
  import com.practice.hadoop.utils.FileOperate;
  import org.apache.commons.lang.StringUtils;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  
  /**
   *
   * Map-side join example
   *
   * @date 2018/05/09
   */
  public class MapSideJoin {
  
      public static class MapSideJoinMap extends Mapper<LongWritable,Text,Text,Text> {
  
          private static final String FACTORY_NAME = "factoryname";
  
          private Configuration conf;
  
          private Map<String,String> addMap = Maps.newHashMap();
  
          @Override
          protected void setup(Context context) throws IOException, InterruptedException {
              conf = context.getConfiguration();
              URI[] uris = Job.getInstance(conf).getCacheFiles();
              for(URI uri : uris) {
                  String fileName = uri.getPath();
  
                  BufferedReader bf = new BufferedReader(new FileReader(fileName));
  
                  String str = null;
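                  // Each cached line is assumed to look like "addressid,addressname";
                  // the header row is skipped below.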
                  while( (str = bf.readLine()) != null) {
                      String[] arr =str.split(",");
                      if(StringUtils.equals("addressid",arr[0])) {
                          continue;
                      } else {
                          addMap.put(arr[0],arr[1]);
                      }
                  }
  
              }
          }
  
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              String[] strArr = value.toString().split(",");
  
              if(FACTORY_NAME.equals(strArr[0])){
                  return;
              }
  
              if( addMap.get(strArr[1]) != null) {
                  context.write(new Text(strArr[0]),new Text(addMap.get(strArr[1])));
              } else {
                  context.write(new Text(strArr[0]),new Text(""));
              }
  
  
          }
      }
  
      public static void main(String[] args)
          throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
          Configuration conf = new Configuration();
          conf.set("mapreduce.output.textoutputformat.separator",",");
  
          Job job = Job.getInstance(conf,"MapSideJoin");
  
          String cache = "/tmp/address.txt";
          String input = "./src/main/resources/msj_input";
          String output = "./src/main/resources/msj_output";
          FileOperate.deleteDir(output);
  
          job.setMapperClass(MapSideJoinMap.class);
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(Text.class);
  
          FileInputFormat.addInputPath(job,new Path(input));
          FileOutputFormat.setOutputPath(job,new Path(output));
  
          // Add the address file to the distributed cache
          job.addCacheFile(new URI(cache));
  
          System.exit(job.waitForCompletion(true)? 0:1);
  
      }
  }
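
One detail worth flagging: even when no reducer class is set, Hadoop still runs the reduce phase with the default identity Reducer, which shuffles and sorts the map output before writing it out. To make this a genuinely map-only job, where each mapper writes its output file directly, the number of reduce tasks can be set to zero (a one-line addition to the main method above, not in the original listing):

          // Skip shuffle/sort and reduce entirely; mappers write part-m-xxxxx files directly.
          job.setNumReduceTasks(0);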

Custom WritableComparable

In MapReduce, key types must implement the WritableComparable interface (value types only need Writable). Built-in implementations include Text, IntWritable, LongWritable, BooleanWritable, and so on. To use your own class as a key, implement WritableComparable yourself:

  
  package com.practice.hadoop.pojo;
  
  import java.io.DataInput;
  import java.io.DataOutput;
  import java.io.IOException;
  
  import org.apache.hadoop.io.WritableComparable;
  
  /**
   * @author liufeifei
   * @date 2018/05/20
   */
  public class User implements WritableComparable<User> {
  
      /**
       * Name
       */
      private String name;
  
      /**
       * Age
       */
      private int age;
  
      public String getName() {
          return name;
      }
  
      public void setName(String name) {
          this.name = name;
      }
  
      public int getAge() {
          return age;
      }
  
      public void setAge(int age) {
          this.age = age;
      }
  
      /**
       * Deserialize: read the fields back in the same order they were written
       *
       * @param in
       * @throws IOException
       */
      @Override
      public void readFields(DataInput in) throws IOException {
          name = in.readUTF();
          age = in.readInt();
      }
  
      /**
       * Serialize: write the fields out
       *
       * @param out
       * @throws IOException
       */
      @Override
      public void write(DataOutput out) throws IOException {
          out.writeUTF(name);
          out.writeInt(age);
      }
  
      /**
       * Comparison used when sorting keys
       *
       * @param o
       * @return
       */
      @Override
      public int compareTo(User o) {
          int res1 = Integer.compare(this.age,o.age);
          int res2 = this.name.compareTo(o.name);
  
          // Sort by age first; for equal ages, sort by name (negated, so names end up in descending order)
          return res1 != 0 ? res1 : - res2;
      }
  
      @Override
      public int hashCode() {
          return name.hashCode() + age;
      }
  
      @Override
      public boolean equals(Object obj) {
  
          if( !(obj instanceof User)) {
              return false;
          }
  
          User o = (User) obj;
          // Compare string contents, not references (null-safe)
          return this.age == o.age && java.util.Objects.equals(this.name, o.name);
      }
  
      /**
       * Text form written to the output file
       *
       * @return
       */
      @Override
      public String toString() {
          return name + "\t" +  age;
      }
  
  }

Using it is straightforward: the result is sorted by age first, then by name when ages are equal. The driver code:

  
  package com.practice.hadoop.mapreduce;
  
  import java.io.IOException;
  
  import com.practice.hadoop.pojo.User;
  import com.practice.hadoop.utils.FileOperate;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.InputFormat;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  
  /**
   * Sort by the custom User key
   *
   * @author liufeifei
   * @date 2018/05/20
   */
  public class UserScore {
  
      public static class UserScoreMap extends Mapper<LongWritable,Text,User,IntWritable> {
  
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
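              // Each input line is assumed to look like: name,age,<unused>,score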
              String[] arr = value.toString().split(",");
              User user = new User();
              user.setName(arr[0]);
              user.setAge(Integer.parseInt(arr[1]));
              context.write(user,new IntWritable(Integer.parseInt(arr[3])));
          }
      }
  
      public static class UserScoreReducer extends Reducer<User,IntWritable,User,IntWritable> {
          @Override
          protected void reduce(User key, Iterable<IntWritable> values, Context context)
              throws IOException, InterruptedException {
              int sum = 0;
              int cnt = 0;
              for(IntWritable value: values) {
                  sum = sum + value.get();
                  cnt ++;
              }
              context.write(key, new IntWritable(sum / cnt));
          }
      }
  
      public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
          Configuration conf = new Configuration();
          conf.set("mapreduce.output.textoutputformat.separator",",");
  
          Job job = Job.getInstance(conf,"UserScore");
  
          job.setJarByClass(UserScore.class);
          job.setMapperClass(UserScoreMap.class);
          job.setReducerClass(UserScoreReducer.class);
  
          job.setOutputKeyClass(User.class);
          job.setOutputValueClass(IntWritable.class);
  
          String input = "./src/main/resources/userscore_input";
          String output = "./src/main/resources/userscore_output";
          FileOperate.deleteDir(output);
  
          FileInputFormat.addInputPath(job,new Path(input));
          FileOutputFormat.setOutputPath(job, new Path(output));
  
          job.waitForCompletion(true);
      }
  }

That wraps up this introduction to MapReduce.

Reference book: 《Hadoop实战》 (Hadoop in Action)

Hadoop documentation: http://hadoop.apache.org/docs/r2.7.6/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html
