欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Hadoop 统计单词个数

程序员文章站 2024-03-12 13:23:08
...



创建项目

按下图所示在resources目录下创建文件夹input,在其中提供文件wc.txt:
Hadoop 统计单词个数
注意:不要创建output目录,系统会自动创建。否则会报目录已存在的错。wc.txt文件的内容:

hello hadoop and hello java
I love java
Liang He Cai

Mapper类

public class TokenizerMapper extends Mapper<Object,Text,Text,IntWritable> { 
  @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();//读取一行数据
        StringTokenizer st = new StringTokenizer(line);//使用空格分隔
        while (st.hasMoreTokens()) {
            String word = st.nextToken();//单词
            context.write(new Text(word), new IntWritable(1));//单词---> 1
        }
    }
}

Mapper的4个类型分别是:输入key的类型、输入的value类型、输出key的类型、输出value的类型。
mapper类的map()方法,MapReduce框架每读到一行数据,就会调用一次这个map方法。map的处理流程就是接收一个key value对儿,然后进行业务逻辑处理,最后输出一个key value对儿。map()方法的三个参数key、value、context分别代表map输入的键、map输入的值、map执行的上下文。

MapReduce框架将读到的一行数据侯以key value形式传进来,key默认情况下是MapReduce所读到一行文本的起始偏移量(Long类型),value默认情况下是MapReduce所读到的一行的数据内容(String类型)。
输出也是key value形式的,key是用户自定义逻辑处理完成后定义的key,用户自己决定用什么作为key,value是用户自定义逻辑处理完成后的value,内容和类型也是用户自己决定。统计单词个数案例中,输出key就是word(字符串类型),输出value就是单词数量(整型)。
这里的数据类型和我们常用的不一样,因为MapReduce程序的输出数据需要在不同机器间传输,所以必须是可序列化的,例如Long类型,Hadoop中定义了自己的可序列化类型LongWritable,String对应的是Text,int对应的是IntWritable。

Reducer类

public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;//汇总
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));//单词--->数量
    }
}

上面代码定义了一个Reducer类和一个reduce方法。
Reducer<Text, IntWritable, Text, IntWritable>4个类型分别指:输入key的类型、输入value的类型、输出key的类型、输出value的类型。
需要注意,reduce方法接收的是:一个字符串类型的key、一个可迭代的数据集。因为reduce任务读取到map任务处理结果是:(good,1)(good,1)(good,1)(good,1)
当传给reduce方法时,就变为:

key:good
value:(1,1,1,1)

所以,reduce方法接收到的是同一个key的一组value。

运行部署

Driver编程套路:

  1. 获取配置信息,获取job对象实例
  2. 指定本程序的jar包所在的本地路径
  3. 关联Mapper/Reducer业务类
  4. 指定Mapper输出数据的KV类型
  5. 指定最终输出数据的KV类型
  6. 指定job的输入原始文件所在目录
  7. 指定job的输出结果所在目录
  8. 提交作业

本地模式部署

本地运行模式下:mapreduce程序是被提交给LocalJobRunner在本地以单进程的形式运行,而处理的数据及输出结果可以在本地文件系统,也可以在hdfs上。
实现本地运行,不要带集群的配置文件(本质是mr程序的conf中是否有mapreduce.framework.name=local以及yarn.resourcemanager.hostname参数)。
本地模式非常便于进行业务逻辑的debug,只要在IDE中打断点即可。在windows下想运行本地模式来测试程序逻辑,需要在windows中配置环境变量,将hadoop的lib和bin目录替换成windows平台编译的版本

Configuration conf = new Configuration();
conf.set("mapreduce.framework.name","local");
//本地模式运行mr,输入输出的数据可以在本地,也可以在hdfs
//conf.set("fs.defaultFS","hdfs://hcmaster:9000");
conf.set("fs.defaultFS","file:///");

具体代码:

public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();//创建配置对象
        Job job = Job.getInstance(cfg, "word count");//创建job对象
        
        job.setJarByClass(WordCount.class);//创建运行job的类
        
        job.setMapperClass(TokenizerMapper.class);//设置mapper类
        job.setReducerClass(IntSumReducer.class); //设置Reduce类
        
        job.setOutputKeyClass(Text.class);//设置Reduce输出的key
        job.setOutputValueClass(IntWritable.class);//设置Reduce输出的value

        FileInputFormat.addInputPath(job, new Path(args[0]));//设置输入路径

        Path op1 = new Path(args[1]);
        FileSystem fs = FileSystem.get(cfg);

        if (fs.exists(op1)) {
            fs.delete(op1, true);
            System.out.println("存在此输出路径,已删除!!!");
        }

        FileOutputFormat.setOutputPath(job, op1);//设置输出路径
        boolean b = job.waitForCompletion(true); //提交job
        System.exit(b ? 0 : 1);
    }
}

上面代码args[0]的值为src/main/resources/input/,args[1]的值为src/main/resources/output,可以按照下图所示进行配置:
Hadoop 统计单词个数
运行程序,在resources目录下生成output文件夹,其中part-r-0000是任务结果,_SUCCESS是任务成功标志。打开part-r-00000,结果如下图所示:
Hadoop 统计单词个数

将Mapper中间结果作为最终结果输出

在src/main/resources/input/目录下面准备三个写有单词的文本文件,

public static void main(String[] args) throws Exception {
    args=new String[2];
    args[0]="src/main/resources/input/";
    args[1]="src/main/resources/output";

    Configuration cfg = new Configuration();

    Job job = Job.getInstance(cfg);

    job.setJobName( "word count");//作业名称
    job.setJarByClass(WordCountTest2.class);

    job.setInputFormatClass(TextInputFormat.class);//设置输入格式
    job.setOutputFormatClass(TextOutputFormat.class);

    job.setMapperClass(WordCountMapper.class);

    job.setNumReduceTasks(0); // 设置Reducer的个数为0

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
小知识:

当job.setNumReduceTasks(0)时,即没有reduce阶段,此时唯一影响的就是map结果的输出方式。

  • 如果有reduce阶段,map的结果被flush到硬盘,作为reduce的输入;reduce的结果将被OutputFormat的RecordWriter写到指定的地方(setOutputPath),作为整个程序的输出。
  • 如果没有reduce阶段,map的结果将直接被OutputFormat的RecordWriter写到指定的地方(setOutputPath),作为整个程序的输出。

OutputFormat可以是普通的FileOutputFormat,也可以是一个空的OutputFormat如NullOutputFormat。所以有无reduce和OutputFormat的多样性将组合出现以下情形(这个组合其实没什么意义,只是为了更加清楚而已)

  • 有reduce
    reduce的结果不需要输出到文件,如reduce里直接将结果插入HBase,此时可以采用NullOutputFormat,当然就不需要setOutputPath。reduce的结果需要输出到文件,如采用FileOutputFormat,需要setOutputPath。
  • 无reduce
    map的结果需要不输出到文件,如map里直接将结果插入HBase,此时可以采用NullOutputFormat,当然就不需要setOutputPath。map的结果需要输出到文件,如采用FileOutputFormat,需要setOutputPath。

有无reduce决定map结果的输出方式。有reduce时reduce的结果作为整个程序的输出;无reduce时,map的结果作为整个程序的输出。如果能在map阶段解决的问题尽量不要丢给直接输出的reduce。如NullOutputFormat层面上OutputFormat的不需要指定OutputPath;其他如FileOutputFormat需要指定,不然报错。
运行程序,结果如下图所示:
Hadoop 统计单词个数

集群模式部署

在资源路径下面添加core-site.xml:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
	<property>
		<name>fs.defaultFS</name>
		<value>hdfs://hcmaster:9000</value>
	</property>
	<property>
		<name>hadoop.tmp.dir</name>
		<value>/usr/local/hadoop/hadoop-3.1.2/data/tmp/</value>
	</property>
</configuration>
修改测试代码如下所示:
public class WordCountTest extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration cfg = new Configuration();   //获取我们的配置

        Job job = Job.getInstance(cfg, this.getClass().getSimpleName());
        job.setJarByClass(WordCountTest.class);

        //设置map与需要设置的内容类 + 输出key与value
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //设置reduce
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //设置input与output
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path op1 = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, op1);

        FileSystem fs = FileSystem.get(cfg);
        if (fs.exists(op1)) {
            fs.delete(op1, true);
            System.out.println("存在此输出路径,已删除!!!");
        }

        //将job交给Yarn
        boolean issucess = job.waitForCompletion(true);
        return issucess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
       //需要在resources下面提供core-site.xml文件
        args = new String[]{ "test/", "test/output/"};
        int status = new WordCountTest().run(args);   //跑我们的任务
        System.exit(status);
    }
}

在pom.xml中添加如下语句:

<packaging>jar</packaging>
打包:

Hadoop 统计单词个数
找到打包好的jar包,并将上传到Linux中,然后运行

Hadoop 统计单词个数