Hadoop学习之路(7)MapReduce自定义排序
程序员文章站
2022-08-02 17:54:32
本文测试文本: tom 20 8000 nancy 22 8000 ketty 22 9000 stone 19 10000 green 19 11000 white 39 29000 socrates 30 40000 MapReduce中,根据key进行分区、排序、分组 MapReduce会按照 ......
本文测试文本:
tom 20 8000 nancy 22 8000 ketty 22 9000 stone 19 10000 green 19 11000 white 39 29000 socrates 30 40000
mapreduce中,根据key进行分区、排序、分组
mapreduce会按照基本类型对应的key进行排序,如int类型的intwritable,long类型的longwritable,text类型,默认升序排序
为什么要自定义排序规则?现有需求,需要自定义key类型,并自定义key的排序规则,如按照人的salary降序排序,若相同,则再按age升序排序
以text类型为例:
text类实现了writablecomparable
接口,并且有write()
、readfields()
和compare()
方法readfields()
方法:用来反序列化操作write()
方法:用来序列化操作
所以要想自定义类型用来排序需要有以上的方法
自定义类代码:
import org.apache.hadoop.io.writablecomparable; import java.io.datainput; import java.io.dataoutput; import java.io.ioexception; public class person implements writablecomparable<person> { private string name; private int age; private int salary; public person() { } public person(string name, int age, int salary) { //super(); this.name = name; this.age = age; this.salary = salary; } public string getname() { return name; } public void setname(string name) { this.name = name; } public int getage() { return age; } public void setage(int age) { this.age = age; } public int getsalary() { return salary; } public void setsalary(int salary) { this.salary = salary; } @override public string tostring() { return this.salary + " " + this.age + " " + this.name; } //先比较salary,高的排序在前;若相同,age小的在前 public int compareto(person o) { int compareresult1= this.salary - o.salary; if(compareresult1 != 0) { return -compareresult1; } else { return this.age - o.age; } } //序列化,将newkey转化成使用流传送的二进制 public void write(dataoutput dataoutput) throws ioexception { dataoutput.writeutf(name); dataoutput.writeint(age); dataoutput.writeint(salary); } //使用in读字段的顺序,要与write方法中写的顺序保持一致 public void readfields(datainput datainput) throws ioexception { //read string this.name = datainput.readutf(); this.age = datainput.readint(); this.salary = datainput.readint(); } }
mapreuduce程序:
import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.nullwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import java.io.ioexception; import java.net.uri; public class secondarysort { public static void main(string[] args) throws exception { system.setproperty("hadoop_user_name","hadoop2.7"); configuration configuration = new configuration(); //设置本地运行的mapreduce程序 jar包 configuration.set("mapreduce.job.jar","c:\\users\\tanglei1\\ideaprojects\\hadooptang\\target\\com.kaikeba.hadoop-1.0-snapshot.jar"); job job = job.getinstance(configuration, secondarysort.class.getsimplename()); filesystem filesystem = filesystem.get(uri.create(args[1]), configuration); if (filesystem.exists(new path(args[1]))) { filesystem.delete(new path(args[1]), true); } fileinputformat.setinputpaths(job, new path(args[0])); job.setmapperclass(mymap.class); job.setmapoutputkeyclass(person.class); job.setmapoutputvalueclass(nullwritable.class); //设置reduce的个数 job.setnumreducetasks(1); job.setreducerclass(myreduce.class); job.setoutputkeyclass(person.class); job.setoutputvalueclass(nullwritable.class); fileoutputformat.setoutputpath(job, new path(args[1])); job.waitforcompletion(true); } public static class mymap extends mapper<longwritable, text, person, nullwritable> { //longwritable:输入参数键类型,text:输入参数值类型 //persion:输出参数键类型,nullwritable:输出参数值类型 @override //map的输出值是键值对<k,v>,nullwritable说关心v的值 protected void map(longwritable key, text value, context context) throws ioexception, interruptedexception { //longwritable key:输入参数键值对的键,text value:输入参数键值对的值 //获得一行数据,输入参数的键(距首行的位置),hadoop读取数据的时候逐行读取文本 //fields:代表着文本一行的的数据 string[] fields = value.tostring().split(" "); // 本列中文本一行数据:nancy 22 8000 string name = fields[0]; //字符串转换成int int age = integer.parseint(fields[1]); int salary = integer.parseint(fields[2]); //在自定义类中进行比较 person person = new person(name, age, salary); context.write(person, nullwritable.get()); } } public static class myreduce extends reducer<person, nullwritable, person, nullwritable> { @override protected void reduce(person key, iterable<nullwritable> values, context context) throws ioexception, interruptedexception { context.write(key, nullwritable.get()); } } }
运行结果:
40000 30 socrates 29000 39 white 11000 19 green 10000 19 stone 9000 22 ketty 8000 20 tom 8000 22 nancy