欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Hadoop——使用IDEA开发WordCount on Yarn

程序员文章站 2022-04-29 10:57:21
...

首先将之前配置好的mapred-site.xml和yarn-site.xml拷贝进resources文件夹,并在pom.xml中加入

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.5</version>
        </dependency>

具体代码及解释如下:

package com.hadoop.mapreduce.wc;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

/**
 * @author Song X
 * @date 2020/02/29
 */
public class MyWordCount {


    /**
     * 用于解析配置文件
     */
    private static Configuration configuration = null;

    /**
     * 用于提交计算任务给集群
     */
    private static Job job = null;

    public static void init() throws Exception {
        configuration = new Configuration(true);
        job = Job.getInstance(configuration);
        job.setJarByClass(MyWordCount.class);
        //起一个job名字
        job.setJobName("Test_WordCount");

        //指定输入和输出数据路径
        Path inputPath = new Path("/data/wc/input");
        TextInputFormat.addInputPath(job, inputPath);

        //写这句话时要保证这个目录不存在,不然会报错
        //为防止我们事先不知道存不存在,可以加下面的代码,若存在则先删除,注意正式开发时,小心使用这个代码,防止误删数据!!!
        //if(outputPath.getFileSystem(configuration).exists(outputPath)){
        //      outputPath.getFileSystem(configuration).delete(outputPath, true);
        //}
        Path outputPath = new Path("/data/wc/output");
        TextOutputFormat.setOutputPath(job, outputPath);

    }

    private static void compute() throws Exception {
        //设置Map计算类
        job.setMapperClass(MyMapper.class);

        //需要在这里告诉Reduce端传来的key-value都是什么类型,方便Reduce端做序列化和反序列化
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //设置Reduce计算类
        job.setReducerClass(MyReduce.class);

        //提交任务,并等待完成
        job.waitForCompletion(true);
    }


    public static void main(String[] args) throws Exception{
        init();
        compute();
    }

}
package com.hadoop.mapreduce.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * 不管在map还是reduce,都会牵扯到Java的序列化和反序列化
 * Text和IntWritable就是Hadoop对String和int的序列化包装类型
 * 如果是自己开发的的类型,必须实现序列化、反序列化接口
 *
 * 在map和reduce中,依照所学,排序是很重要的,而排序实际上就是比较,所以自己开发的类型也需要实现比较器接口,定义自己的比较规则
 *
 * @author Frank
 * @date 2020/02/29
 */
public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

    /**
     * 比如这样的数据hello hadoop 1
     *              hello hadoop 2
     * 经过map完的记过应该是hello 1
     *                     hello 1
     *                     hadoop 1
     *                     hadoop 1
     *                     1 1
     *                     2 1
     * 所以你会发现,key永远在变,而value是不变的(正是因为value不变,所以定义为static final对性能是最优的)
     */


    /**
     * 这就是Map输出类型key-value的value
     */
    private static final IntWritable one = new IntWritable(1);

    /**
     * 这就是输出类型key-value的key
     */
    private Text word = new Text();

    /**
     * hello hadoop 1
     * hello hadoop 2
     * @param key 是每一行字符串第一个字符面向源文件的偏移量,比如上面第一行的h偏移量是0,第二行的h偏移量是15(换行符也算,第一个h偏移量是0)
     * @param value 字符串本身,比如hello hadoop 1。所以我们其实只是要处理value
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        //java的一个工具类,使用正则的方式对字符串进行空白切割,得到一个包含所有单词的迭代器
        StringTokenizer itr = new StringTokenizer(value.toString());

        while(itr.hasMoreTokens()){
            //将单词赋给word
            word.set(itr.nextToken());
            //将单词和1合并输出
            context.write(word, one);
        }
    }

}
package com.hadoop.mapreduce.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * Reducer的输入类型很明显就是Map的输出类型,Reduce的输出类型由我们自己指定
 * @author
 * @date 2020/02/29
 */
public class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * 回想ReduceTask,是相同的key为一组,一组数据调用一次reduce方法
     * 所以reduce收到的数据应该长这样
     *                      hello 1
     *                      hello 1
     *                      hello 1
     *                        ...
     *
     */

    /**
     * 最后key-value结果的value
     */
    private IntWritable result = new IntWritable();

    /**
     *
     * @param key
     * @param values 我们会得到一个value的迭代器
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
        //业务代码其实很简单,就是对同一个key做value的累加
        int sum = 0;
        for(IntWritable value : values){
            sum += value.get();
        }

        result.set(sum);
        context.write(key, result);
    }
}

写好代码后点击package将程序打包成jar包i
Hadoop——使用IDEA开发WordCount on Yarn
然后会在目录中生成对应的jar包Hadoop——使用IDEA开发WordCount on Yarn
将其拷贝到节点上,使用hadoop jar com.hadoop.mapreduce.wc.MyWordCount进行运行
注意:第三个参数是程序的入口类,第四个参数和第五个参数分别是数据的input path和output path,我在程序中写死了,这里就不用传了

Hadoop入门系列到此就更新结束了