欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

【基于MapReduce的成绩分析系统】——计算每门课程学生的平均成绩,并将平均成绩从高到低输出

程序员文章站 2022-03-24 13:49:10
...

本次用 MapReduce 计算每门课程学生的平均成绩,并将平均成绩从高到低输出是我们《大数据基础》课程的期末大作业的功能需求之一。

临近期末,在这里记录一下自己的学习收获,希望大家在浏览的过程中有所收获。

由于能力有限,博客中难免会存在一些不足,有纰漏之处恳请大佬指正,不胜感激… …????

博客主页:爱跑步的mango ????

一、数据及字段说明

首先准备初始的数据,mark2.txt文件。(把该文件上传到伪分布式HDFS下,处理的输入文件路径对应为:hdfs://localhost:9000/user/hadoop/markinput2),相关数据内容如下:

mucic,85,54,86,85,80,99,85,75,48,88,85
english,57,85,48,85,85,96,85,85,75
chinese,75,76,85,85,42,81
math,76,85,48,90,54,85,89
PE,87,34,96,87,81,90,99,65,78,68,89
  • 以上是处理的数据,该数据每行数据字段个数不固定
    第一个是课程名称,总共五个课程,包括music、chinese、english、math和PE。
    后面全部是该课程每次考试的分数。

二、过程分析及解题思路

  • 需求: 计算每门课程学生的平均成绩,并将平均成绩从高到低输出
    返回结果格式举例:PE 79.5
  • 解题思路: 计算每门课程学生的平均成绩,并将平均成绩从高到低输出,先按照课程分组,然后mapper阶段准备好每门课程的平均分。map方法将课程名称course和计算的平均分作为key输出。reduce方法去拉取数据的时候会进行归并排序,这里用自定义WCsort类实现WritableComparable接口的compareTo方法定义的逻辑来将平均分进行倒序排列。然后reducer阶段直接输出即可。
  • 关键: mapper阶段和reducer阶段的输入和输出是什么?
  • 对于mapper阶段,map方法输出的key-value分别是
    key: WCsort
    value: NullWritable
  • 对于reducer阶段,reduce方法输出的key-value分别是
    key: WCsort
    values: NullWritable

三、具体代码实现

  • 自定义数据类型:WCsort 的代码
package Mapreduce.mark2;
import org.apache.hadoop.io.WritableComparable;
 
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
 
public class WCsort implements WritableComparable<WCsort> {
 
    private String course;
    private double score;
 
    public WCsort(String course, double score) {
        super();
        this.course = course;
        this.score = score;
    }
 
    public WCsort() {
    }
 
    public String getCourse() {
        return course;
    }
 
    public void setCourse(String course) {
        this.course = course;
    }
 
    public double getScore() {
        return score;
    }
 
    public void setScore(double score) {
        this.score = score;
    }
 
    @Override
    public void write(DataOutput out) throws IOException {
        // TODO Auto-generated method stub
        out.writeUTF(course);
        out.writeDouble(score);
    }
 
    @Override
    public void readFields(DataInput in) throws IOException {
        // TODO Auto-generated method stub
        this.course = in.readUTF();
        this.score = in.readDouble();
    }
    // 排序规则
     // compareTo方法充当排序用
    @Override
    public int compareTo(WCsort o) {
    	 	// 倒序排列,从大到小
    	 	return this.score > o.getScore() ? -1 : 1;
    }
    @Override
    public String toString() {
        return course + "\t" + score;
    }
}
  • MapReduce的代码
package Mapreduce.mark2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class averagesort2 {
	  public static void main(String[] args) throws Exception {
		  if (args.length<2) {
	            System.out.printf("Usage:%s <input> <output>\n");
	        }
		  //创建一个Configuration实体类对象
	        Configuration conf = new Configuration();
	        Job job = Job.getInstance(conf, "course averagesort");
	     // 指定我这个job所在的jar包
	        job.setJarByClass(averagesort2.class);
	        //指定mapper类和reducer类 等各种其他业务逻辑组件
	        job.setMapperClass(averagesort2.averagesortMapper.class);
	        job.setReducerClass(averagesort2.MyReducer.class);
	         //设置我们的业务逻辑 Reducer 类的输入key 和 value 的数据类型
	        job.setMapOutputKeyClass(WCsort.class);
	        job.setMapOutputValueClass(NullWritable.class);
	        // 指定reducetask的输出类型
	        job.setOutputKeyClass(WCsort.class);
	        job.setOutputValueClass(NullWritable.class);
	        
	        Path inputPath = new Path(args[0]);
	        Path outPutPath = new Path(args[1]);
	        FileSystem fs = FileSystem.get(conf);
	        if (fs.exists(outPutPath)) fs.delete(outPutPath,true);
	        FileInputFormat.setInputPaths(job,inputPath);
	        FileOutputFormat.setOutputPath(job,outPutPath);
	        
	       //指定处理的输入路径
	        //FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/user/hadoop/markinput2"));
			// 指定处理完成之后的结果所保存的位置
			//FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/hadoop/averagesortoutput2"));
			//最后向yarn集群提交这个job任务
			boolean waitForCompletion = job.waitForCompletion(true);
	        //System.exit(waitForCompletion ? 0 : 1);
			System.out.println(job.waitForCompletion(true)?1:0);
	    }
	  //Mapper组件
	  //输入的key  输入的value
	  //输出的key  输入的value
	  private static class averagesortMapper extends Mapper<LongWritable, Text, WCsort, NullWritable> {	  
		  //map  方法的生命周期:  框架每传一行数据就被调用一次
		    //key :  这一行的起始点在文件中的偏移量
		    //value : 这一行的内容
	        WCsort keyOut = new WCsort();
	        @Override
	        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
	            String[] splits = value.toString().split(",");// 将Text类型的value转换成 string,将这一行用 "," 切分出各个单词
	            String course = splits[0];
	            int sum = 0;
	            int num = 0;
	            for(int i=1; i<splits.length; i++){
	                sum += Integer.valueOf(splits[i]);
	                num ++;
	            }
	            double avgScore = Math.round(sum * 1D / num * 10) / 10D;
	            keyOut.setCourse(course);
	            keyOut.setScore(avgScore);	 
	            context.write(keyOut, NullWritable.get());
	        }
	    }
	  private static class MyReducer extends Reducer<WCsort, NullWritable, WCsort, NullWritable> {
	        @Override
	        protected void reduce(WCsort key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
	        			//直接输出
	        	for (NullWritable nullWritable : values) {
					context.write(key, nullWritable);
					}
	        	}
	        }
	  }

四、程序运行结果

  • 程序处理完成之后的结果所保存的位置:hdfs://localhost:9000/user/hadoop/averagesortoutput2(这是自己定义的位置)。
  • 程序的执行结果
    【基于MapReduce的成绩分析系统】——计算每门课程学生的平均成绩,并将平均成绩从高到低输出
    (可见,已经将我们的各科成绩按平均分从大到小排序了。)

本次的分享就到这里就结束了,这里要感谢这位大佬的这篇MapReduce练习学生成绩相关题目的分享,受到了启发。以上有任何错误,希望得到大佬们的指正,互相学习!✨

???? 基于MapReduce的成绩分析系统如果想看更多功能实现的学习体验,欢迎访问:

【基于MapReduce的成绩分析系统】——计算每门课程的平均成绩、最高成绩、最低成绩

【基于MapReduce的成绩分析系统】——查找(输入一个学生的姓名,输出该生姓名以及其参加考试的课程和成绩)

【基于MapReduce的成绩分析系统】——求该成绩表每门课程当中出现了相同分数的分数,出现的姓名以及该相同分数的人数


相关标签: MapReduce学习