
Big Data with Hadoop: Inverted Index and Average Calculation


I. Problem Statement
Inverted Index
Data preparation
file_1
Welcome to MapReduce World

file_2
MapReduce is simple

file_3
MapReduce is powerful and simple

file_4
Hello MapReduce and Bye MapReduce

Requirements
For each word, output the number of times it appears in each file, in the following format:
word file_1:count, file_2:count, file_3:count, file_4:count
Example:
is file_1:0, file_2:1, file_3:1, file_4:0
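
To make the data flow concrete, here is a sketch of how the word MapReduce travels through the job implemented in Section II (note that the actual program separates entries with semicolons and omits files with a zero count):

map:     (MapReduce:file_1, 1)  (MapReduce:file_2, 1)  (MapReduce:file_3, 1)  (MapReduce:file_4, 1)  (MapReduce:file_4, 1)
combine: (MapReduce, file_1:1)  (MapReduce, file_2:1)  (MapReduce, file_3:1)  (MapReduce, file_4:2)
reduce:  (MapReduce, file_1:1;file_2:1;file_3:1;file_4:2;)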

Average Score Calculation
Data preparation
subject1
a|李一|88
a|王二|26
a|张三|99
a|刘四|58
a|陈五|45
a|杨六|66
a|赵七|78
a|黄八|100
a|周九|62
a|吴十|12

subject2
b|李一|36
b|王二|66
b|张三|86
b|刘四|56
b|陈五|43
b|杨六|86
b|赵七|99
b|黄八|80
b|周九|70
b|吴十|33

subject3
c|李一|83
c|王二|36
c|张三|92
c|刘四|58
c|陈五|75
c|杨六|66
c|赵七|63
c|黄八|60
c|周九|62
c|吴十|72

Data description:
Scores for three subjects; fields are separated by |. The first column is the subject code, the second is the student's name, the third is the score.
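
One parsing detail worth noting before the code: | is a regex metacharacter, so Java's String.split must be given the escaped pattern \\| to split on a literal pipe. A minimal check:

String[] fields = "a|李一|88".split("\\|");
// fields[0] == "a", fields[1] == "李一", fields[2] == "88"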

Requirements
Output each person's total score and average score. Averages use integer division: for example, 58 + 56 + 58 = 172, and 172 / 3 truncates to 57.
Sample output:
刘四 a:58 b:56 c:58 Total:172 Avg:57
吴十 a:12 b:33 c:72 Total:117 Avg:39

II. Experiment Code
1. Inverted Index

package Hadoop1;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InvertedIndex {
    

    public static class Map extends Mapper<Object, Text, Text, Text> {

        private Text keyInfo = new Text(); // holds the "word:file" composite key
        private Text valueInfo = new Text(); // holds the term count (always "1" at this stage)
        private FileSplit split; // the input split this mapper is reading

        // Map: emit ("word:filename", "1") for every token
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Get the FileSplit this record came from; the file name becomes part
            // of the key, and the count ("1") becomes the value
            split = (FileSplit) context.getInputSplit();
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                // Extract the short file name from the path (assumes inputs are named file_*)
                int splitIndex = split.getPath().toString().indexOf("file");
                keyInfo.set(itr.nextToken() + ":" + split.getPath().toString().substring(splitIndex));
                valueInfo.set("1");
                context.write(keyInfo, valueInfo);
            }
        }
    }

    // Combiner: collapses the per-occurrence counts into a per-file count and
    // rewrites the key from "word:file" to "word". Rewriting the key in a combiner
    // is only safe here with a single reduce task, because partitions were already
    // assigned using the composite "word:file" key.
    public static class Combine extends Reducer<Text, Text, Text, Text> {
        private Text info = new Text();

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Sum this word's occurrences within one file
            int sum = 0;
            for (Text value : values) {
                sum += Integer.parseInt(value.toString());
            }

            int splitIndex = key.toString().indexOf(":");
            // Re-set the value to "file:count"
            info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
            // Re-set the key to just the word
            key.set(key.toString().substring(0, splitIndex));
            context.write(key, info);
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text();

        // Reduce: concatenate the per-file counts into one posting list
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Build the posting list, e.g. "file_1:1;file_2:1;"
            StringBuilder fileList = new StringBuilder();
            for (Text value : values) {
                fileList.append(value.toString()).append(";");
            }
            result.set(fileList.toString());

            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "Inverted Index");
        job.setJarByClass(InvertedIndex.class);

        // Set the Mapper, Combiner and Reducer classes
        job.setMapperClass(Map.class);
        job.setCombinerClass(Combine.class);
        job.setReducerClass(Reduce.class);
        // Map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Final (reduce) output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input and output paths from the command line
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
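
With the four sample files as input, the job's output looks roughly as follows (a sketch: the order of postings within a line depends on the order in which values reach the reducer, and, unlike the sample format in Section I, the program omits zero counts and separates postings with semicolons):

Bye	file_4:1;
Hello	file_4:1;
MapReduce	file_1:1;file_2:1;file_3:1;file_4:2;
Welcome	file_1:1;
World	file_1:1;
and	file_3:1;file_4:1;
is	file_2:1;file_3:1;
powerful	file_3:1;
simple	file_2:1;file_3:1;
to	file_1:1;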


2. Average Score

package Hadoop2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.StringTokenizer;

public class AverageCore {

    public static class Map extends Mapper<Object, Text, Text, Text> {
        private Text keyInfo = new Text();   // holds the student's name
        private Text valueInfo = new Text(); // holds "subject:score"

        // Map: parse "subject|name|score" lines and emit (name, "subject:score")
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                // "|" is a regex metacharacter, so split on the escaped pattern
                String[] str = itr.nextToken().split("\\|");
                keyInfo.set(str[1]);                  // name
                valueInfo.set(str[0] + ":" + str[2]); // subject:score
                context.write(keyInfo, valueInfo);
            }
        }
    }

    // Combiner: each map task reads a single subject file, so each
    // (name, "subject:score") pair occurs at most once per task;
    // pass the records through unchanged
    public static class Combine extends Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text();
        // Reduce: sum the per-subject scores and compute the average
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            int count = 0;
            StringBuilder sb = new StringBuilder(key.toString() + "   ");
            for (Text value : values) {
                String str = value.toString();              // e.g. "a:58"
                sum += Integer.parseInt(str.split(":")[1]); // accumulate the score
                count++;
                sb.append(str).append(" ");
            }
            // Integer division truncates: e.g. 172 / 3 = 57
            result.set("Total:" + sum + " Avg:" + sum / count);
            // Prepend the name and per-subject scores to the output key
            key.set(sb.toString());
            context.write(key, result);
        }
    }


    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Average Score");
        job.setJarByClass(AverageCore.class);
        // Set the Mapper, Combiner and Reducer classes
        job.setMapperClass(Map.class);
        job.setCombinerClass(Combine.class);
        job.setReducerClass(Reduce.class);
        // Map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Final (reduce) output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input and output paths from the command line
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
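
If a fractional average is preferred, the reducer above can compute it with floating-point division when formatting the result, along these lines:

// Inside Reduce.reduce(), replacing the integer division:
double avg = (double) sum / count;
result.set(String.format("Total:%d Avg:%.1f", sum, avg));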

III. Summary
1. Write the code in Eclipse on the host machine and import the Hadoop dependencies into the project by hand: create a lib folder and copy the dependency jars into it.
2. Package the finished project as a jar and share it into the virtual machine. Create the input/input1 folders, put the data files into them, and run the jobs from the Linux command line (see the example below). When a job finishes, the output/output1 folders are created with the results stored inside.
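
For example, assuming the data files are named as in Section I and the jars are named after their main classes (placeholder names; adjust to your build):

hdfs dfs -mkdir -p input input1
hdfs dfs -put file_* input/
hdfs dfs -put subject* input1/
hadoop jar InvertedIndex.jar Hadoop1.InvertedIndex input output
hadoop jar AverageCore.jar Hadoop2.AverageCore input1 output1
hdfs dfs -cat output/part-r-00000 output1/part-r-00000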