Hadoop custom sorting and grouped sorting

In the data below, the numbers after each name are that student's scores in the individual subjects:
1303,3012,肖芷青,118,78,136,55,64,58,81,83,48
1303,3013,曹娴瑶,117,106,130,46,94,70,75,77,56
1303,3014,黄嘉炜,104,128,105,56,92,89,65,84,72
1303,3015,张旭,103,124,129,67,88,81,78,71,78
1303,3016,张峻闻,113,103,124,51,74,60,78,65,76
1303,3017,姚涵,97,141,42,95,90,92,38,55,64
1303,3018,宋洋,114,98,132,26,46,57,62,67,48
1303,3019,王天灿,113,119,137,67,86,79,78,88,48
1303,3020,张世川,106,67,84,36,62,54,81,77,54
1304,4052,吴熔宇,102,105,111,9,57,45,56,66,46
1304,4053,雷磊,111,132,123,88,72,74,73,73,80
1304,4054,谭凌云,97,108,105,15,41,65,72,73,54
1304,4055,游槟羽,119,104,120,70,72,0,72,68,60
1304,4056,赵培廷,97,88,79,47,34,41,41,45,44
1304,4057,吴锦添,75,51,42,27,28,34,39,25,34
1305,5001,田景锋,90,84,111,29,58,77,76,70,74
1305,5002,彭圣颖,109,96,126,27,62,69,74,79,72
1305,5003,何亚男,112,58,120,40,40,57,78,57,56
1305,5004,林紫琪,107,82,119,36,80,55,68,50,54
1305,5005,杨珽杰,111,119,97,69,64,71,76,83,76
1305,5006,晏峰,96,88,99,47,72,57,71,72,60
1305,5007,聂鑫,89,97,105,34,24,41,78,64,70
1305,5008,李彦,115,116,96,55,60,60,78,81,68
1305,5009,冯志超,111,126,129,79,78,83,86,82,58
1305,5010,林硕,94,92,91,52,74,79,70,71,70
1305,5011,林清怡,103,65,97,20,28,52,71,74,49
1305,5012,成天,103,94,97,25,46,66,50,44,56
1305,5013,蒋婉,109,77,91,20,32,49,74,84,58
1305,5014,陈智杰,105,102,111,30,46,46,78,48,32
1305,5015,曹瑜洁,92,38,46,22,28,40,52,56,44
1305,5016,彭紫瑛,109,31,76,19,16,33,45,45,38
1305,5017,曹静文,109,77,86,33,58,43,64,51,36
1305,5019,陈汇,84,92,83,18,78,47,62,47,70
1305,5032,李杨,102,84,117,48,68,52,73,78,76
1305,5033,胡启利,86,67,38,35,20,34,46,36,34
1305,5034,谢靓,104,43,96,9,26,33,65,60,58
1305,5035,刘睿,95,69,107,17,44,51,76,74,38
1305,5036,钟帅,98,61,61,40,67,43,62,64,42
1305,5037,刘松,100,70,63,23,64,43,61,31,40
1306,6024,杨宗灏,101,90,72,40,92,78,75,52,60
1306,6025,龚珈明,95,94,41,44,64,70,69,71,46
1306,6026,余婕,97,58,85,17,52,45,64,58,38
1307,7001,邓思维,104,103,107,42,46,48,72,57,44
1307,7002,张佳琪,108,81,109,18,40,50,71,48,38
1307,7003,刘鹏,114,86,97,30,42,68,62,67,60
1307,7004,周舟,106,66,81,19,22,55,46,56,24
1307,7005,朱星宇,91,77,90,27,30,52,68,65,50
1307,7006,苏新兴,111,119,99,27,66,62,68,78,75
1307,7007,刘俊辉,99,100,109,21,52,63,66,73,32
1307,7008,周沁心,104,52,76,20,36,54,56,77,0
1307,7009,范祥,120,85,120,68,67,88,71,91,92
1307,7010,丁子建,101,78,103,38,48,74,57,75,68
1307,7011,胡文轩,103,98,73,25,18,39,63,61,60
1307,7012,刘万诚,102,99,110,24,35,50,37,43,46
1307,7013,李怡娴,74,22,54,14,8,25,29,51,24

Add up the numbers after each name to get a total score (I append it at the end of the output line), then sort the students within each class.
Result (the original post shows a screenshot here): each class's students listed in descending order of total score.

The idea is to exploit the shuffle phase of MapReduce.
During shuffle, records are sorted by the map output key,
so the key class has to implement WritableComparable.

Take Hadoop's LongWritable, for example:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class LongWritable implements WritableComparable<LongWritable> {
  private long value;
  public LongWritable() {}
  public LongWritable(long value) { set(value); }
  public void set(long value) { this.value = value; }  // referenced by the constructor above
  @Override
  public void readFields(DataInput in) throws IOException {
    value = in.readLong();
  }
  @Override
  public void write(DataOutput out) throws IOException {
    out.writeLong(value);
  }
  /** Compares two LongWritables. */
  @Override
  public int compareTo(LongWritable o) {
    long thisValue = this.value;
    long thatValue = o.value;
    return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
  }
}

Writable handles serialization: it turns the map output into a binary stream, and during shuffle the framework deserializes objects back from that stream.

PS: the key class must explicitly declare a no-argument constructor so the serialization framework can instantiate it. I got lazy here, assumed the parameterized constructor I already had was enough, and omitted the default one; the job then failed at runtime with a constructor error. In hindsight it is obvious: during deserialization nothing supplies constructor arguments, so the framework can only call the no-arg constructor, and in Java, once you write a parameterized constructor, the implicit default constructor is no longer generated.
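
A minimal plain-Java sketch of that pitfall (the class name Key is invented for illustration):

public class Key {
    private long value;
    // public Key() {}                  // forgetting this line is the bug
    public Key(long value) { this.value = value; }
}
// During deserialization the framework does the equivalent of:
// Key k = Key.class.newInstance();    // fails: no accessible no-arg constructor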
Comparable is the real focus of this post: the shuffle-phase sort follows whatever ordering your custom compareTo defines.

  1. Define a custom class Student that implements the interface
// Student: keys sort by class number, then by descending total score
public class Student implements WritableComparable<Student> {
    int classnum;
    String name;
    int sum;

    public int getClassnum() {
        return classnum;
    }

    public void setClassnum(int classnum) {
        this.classnum = classnum;
    }

    @Override
    public String toString() {
        return name + " sum:" + sum;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getSum() {
        return sum;
    }

    public void setSum(int sum) {
        this.sum = sum;
    }

    public Student() {  // the no-arg constructor is required by the framework
        System.out.println("default constructor");
    }

    public Student(String name, int classnum, int sum) {
        System.out.println("parameterized constructor");
        this.name = name;
        this.classnum = classnum;
        this.sum = sum;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        System.out.println("write: serializing");
        out.writeUTF(name);
        out.writeInt(classnum);
        out.writeInt(sum);
        System.out.println("write: done");
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        System.out.println("readFields: deserializing");
        name = in.readUTF();
        classnum = in.readInt();
        sum = in.readInt();
        System.out.println("readFields: done");
    }

    @Override
    public int compareTo(Student o) {
        if (classnum != o.getClassnum())
            return classnum > o.getClassnum() ? 1 : -1;  // ascending by class number

        if (sum == o.sum) {
            return 0;
        }
        return sum > o.sum ? -1 : 1;  // descending by sum so higher scores come first
    }
    }
}
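
As a quick sanity check (my own sketch, not part of the original post), the Writable methods can be exercised without a cluster by round-tripping a Student through a byte buffer:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class StudentRoundTrip {
    public static void main(String[] args) throws IOException {
        // serialize a Student the way the map output stage would
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        new Student("张旭", 1303, 819).write(new DataOutputStream(buffer));

        // deserialize into a fresh instance, as the shuffle does;
        // this is exactly where the no-arg constructor is needed
        Student copy = new Student();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(copy);  // 张旭 sum:819
    }
}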

In a.compareTo(b), the return value means:

Return value   Meaning
 0             the two keys are equal
-1             a is less than b
 1             b is less than a

The shuffle phase sorts in ascending order of these return values by default: items that compare as -1 come first, then 0, then 1.
So to sort descending, just return -1 for the larger value and 1 for the smaller one:

return sum>o.sum?-1:1; 
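
Equivalently, a small sketch of my own: writing both directions with Integer.compare makes the intent explicit and avoids hand-rolled ternaries.

// ascending by sum (the default direction):
return Integer.compare(this.sum, o.sum);
// descending by sum, equivalent to the ternary above:
return Integer.compare(o.sum, this.sum);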

The keys arriving at the reducer actually look like this:

Class   Total score
1305    600
1305    599
1306    700
1306    500
1307    600
1307    360

Here I made another mistake, in the reduce phase: I wanted to tag each result with its rank within the class, like this:

Class   Total score   Rank in class
1305    600           1
1305    599           2
1306    700           1
1306    500           2
1307    600           1
1307    360           2

It all looks like one piece of code, but each reduce() call only processes the values that share a single key, i.e. the classmates whose total score is identical to yours, so a rank counter inside reduce() never sees a whole class.
I eventually gave up on that approach and instead extracted each class's records and sorted them separately.
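
For the record, there is a standard way to make in-reduce ranking work, sketched below under my own assumptions (ClassGroupingComparator is my name, not from the post): register a grouping comparator that compares keys by class number only, so a single reduce() call receives an entire class, already in descending-score order, and can count ranks while iterating.

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// groups map-output keys by class number only, so each reduce() call
// sees every record of one class (already sorted by descending sum)
public class ClassGroupingComparator extends WritableComparator {
    protected ClassGroupingComparator() {
        super(Student.class, true);  // true: instantiate keys for comparing
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return Integer.compare(((Student) a).getClassnum(),
                               ((Student) b).getClassnum());
    }
}

After wiring it in with job.setGroupingComparatorClass(ClassGroupingComparator.class), the reducer can rank with a simple counter; Hadoop advances the key instance in step with the value iterator, so key.getSum() always matches the current value:

// inside reduce(Student key, Iterable<Text> values, Context context):
int rank = 1;
for (Text t : values) {
    context.write(NullWritable.get(),
            new Text(t.toString() + "," + key.getSum() + "," + rank++));
}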

Below is the MapReduce driver code; adjust the concrete file paths and the package name for your own environment.

package student;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RankClass2 {

    public static class MaxMapper extends Mapper<LongWritable, Text, Student, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String valueline = value.toString().trim();
            if (valueline.length() < 1)
                return;
            String[] list = valueline.split(",");
            int sum = 0;
            for (int i = 3; i <= 11; i++) {  // columns 3..11 hold the nine subject scores
                sum += Integer.parseInt(list[i]);
            }
            // key: Student(name, class number, total); value: the original input line
            context.write(new Student(list[2], Integer.parseInt(list[0]), sum), new Text(valueline));
        }
    }

    public static class MaxReducer extends Reducer<Student, Text, NullWritable, Text> {

        @Override
        protected void reduce(Student key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // append the total score to the end of the original input line
            for (Text t : values)
                context.write(NullWritable.get(), new Text(t.toString() + "," + key.getSum()));
        }

    }

    public static void main(String[] args)
            throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(RankClass2.class);  // needed when running from a jar on a cluster

        job.setMapperClass(MaxMapper.class);
        job.setReducerClass(MaxReducer.class);

        job.setMapOutputKeyClass(Student.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // job.setSortComparatorClass(LongWritable.DecreasingComparator.class);

        String url = "hdfs://192.168.133.32:9000/user/SJ-PC/";
        // delete the previous run's output directory, otherwise the job fails on startup
        FileSystem fs = FileSystem.get(new URI(url), conf);
        boolean result = fs.delete(new Path(url + "rankclass2/"), true);  // recursive delete
        System.out.println(result);
        fs.close();

        FileInputFormat.addInputPath(job, new Path(url + "/data/*"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.133.32:9000/user/SJ-PC/rankclass2/"));
        job.waitForCompletion(true);
    }

}
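
One caveat the post leaves implicit: this works because the job runs with Hadoop's default single reducer, so all classes arrive in one sorted stream. With more reducers, the default HashPartitioner would scatter a class's keys (Student does not override hashCode()), so you would also need a partitioner by class. A minimal sketch under that assumption (ClassPartitioner is my name, not from the post):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// routes every record of a given class to the same reducer
public class ClassPartitioner extends Partitioner<Student, Text> {
    @Override
    public int getPartition(Student key, Text value, int numPartitions) {
        return (key.getClassnum() & Integer.MAX_VALUE) % numPartitions;
    }
}

It would be registered in the driver with job.setPartitionerClass(ClassPartitioner.class).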