欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

使用IDEA编写的第一个MapReduce程序

程序员文章站 2022-03-22 23:22:15
...

一. 创建MAVEN程序

使用IDEA编写的第一个MapReduce程序

二.添加项目依赖包

创建完成后,要给项目添加相关依赖包,否则会出错。
点击Idea的File菜单,然后点击“Project Structure”菜单,如下图所示:

使用IDEA编写的第一个MapReduce程序

导入jar包:
选择hadoop的包,我用得是hadoop3.*。把下面的依赖包都加入到工程中,否则会出现某个类找不到的错误。

(1)”/usr/local/hadoop/share/hadoop/common”目录下的hadoop-common-3.2.1.jar和haoop-nfs-3.2.1.jar;

(2)/usr/local/hadoop/share/hadoop/common/lib”目录下的所有JAR包;

(3)“/usr/local/hadoop/share/hadoop/hdfs”目录下的haoop-hdfs-3.2.1.jar和haoop-hdfs-nfs-3.2.1.jar;

(4)“/usr/local/hadoop/share/hadoop/hdfs/lib”目录下的所有JAR包。

可以再加上一个haoop-hdfs-client-3.2.1.jar

如果之后项目import报错,则再配置一下pom.xml文件中的依赖项:

使用IDEA编写的第一个MapReduce程序

三.编写程序

需要处理的文件是一个日志文件,我需要从中提取出ID、邮件发送状态、邮件发送地址,这三个信息。日志文件截图如下:

使用IDEA编写的第一个MapReduce程序

基本思路类似于wordcount程序。ID、邮件发送状态、邮件发送地址,观察发现这三个信息存在于starting和delivery开头的行数据中,所以对行数据按空格切片,接下来在mapper中取所需内容即可,然后在reducer中对相同key值的数据的value进行拼接,基本上一个简单的程序就完成了。

3.1先编写mapper类:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogAnalysisMap extends Mapper<LongWritable,Text,Text,Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)throws IOException,InterruptedException{

        String line = value.toString();//读取一行数据

        String[] str = line.split(" ");//因为英文字母是以“ ”为间隔的,因此使用“ ”分隔符将一行数据切成多个单词并存在数组中

        String[] res = new String[2];

        if (str[1].equals("starting")) {
            String[] id = str[3].split(":");
            res[0] = id[0];
            res[1] = str[8];
            context.write(new Text(res[0]),new Text(res[1]));
        }else if (str[1].equals("delivery")) {
            String[] id = str[2].split(":");
            res[0] = id[0];
            String[] status = str[3].split(":");
            res[1] = status[0];
            context.write(new Text(res[0]),new Text(res[1]));
        }
    }
}

3.2编写Reduce类:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class LogAnalysisReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values,Context context)throws IOException,InterruptedException{
        String result = "";
        for(Text value: values) {
            if(value.find("@") == -1){
                result =  value + "\t" + result;
            }else {
                result =  result + "\t" + value;
            }
        }
        context.write(key,new Text(result));
    }
}

3.3编写入口类:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.example.mapper.LogAnalysisMap;
import org.example.reducer.LogAnalysisReducer;

public class LogAnalysisRunner {
    public static void main(String[] args)throws Exception{
        Configuration conf = new Configuration();

        Job wcJob = Job.getInstance(conf);

        wcJob.setJarByClass(LogAnalysisRunner.class);

        //本job使用的mapper和reducer类
        wcJob.setMapperClass(LogAnalysisMap.class);
        wcJob.setReducerClass(LogAnalysisReducer.class);

        // 指定reduce的输出数据kv类型
        wcJob.setOutputKeyClass(Text.class);
        wcJob.setOutputValueClass(Text.class);

        // 指定mapper的输出数据kv类型
        wcJob.setMapOutputKeyClass(Text.class);
        wcJob.setMapOutputValueClass(Text.class);

        //指定要处理的输入数据存放路径
        FileInputFormat.setInputPaths(wcJob, new Path("指定要处理的输入数据存放路径"));

        //指定处理结果的输出数据存放路径
        FileOutputFormat.setOutputPath(wcJob, new Path("指定处理结果的输出数据存放路径"));

        //将job提交给集群运行
        wcJob.waitForCompletion(true);
    }
}

编写完成后可直接在IDEA中运行入口类,生成文件会存在于入口类中指定的路径下。

到此为止,我们的程序就写完啦!输出文件中的数据如下所示:使用IDEA编写的第一个MapReduce程序

或者也可以修改一下入口类,把整个程序打成jar包,上传到集群环境中运行。这个环节与上一篇中对wordcount程序的处理差不多,就不赘述啦。