欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Hadoop学习之路(7)MapReduce自定义排序

程序员文章站 2022-05-07 09:34:01
本文测试文本: tom 20 8000 nancy 22 8000 ketty 22 9000 stone 19 10000 green 19 11000 white 39 29000 socrates 30 40000 MapReduce中,根据key进行分区、排序、分组 MapReduce会按照 ......

本文测试文本:

tom 20 8000
nancy 22 8000
ketty 22 9000
stone 19 10000
green 19 11000
white 39 29000
socrates 30 40000

   mapreduce中,根据key进行分区、排序、分组
mapreduce会按照基本类型对应的key进行排序,如int类型的intwritable,long类型的longwritable,text类型,默认升序排序
   为什么要自定义排序规则?现有需求,需要自定义key类型,并自定义key的排序规则,如按照人的salary降序排序,若相同,则再按age升序排序
以text类型为例:
Hadoop学习之路(7)MapReduce自定义排序
Hadoop学习之路(7)MapReduce自定义排序
Hadoop学习之路(7)MapReduce自定义排序
Hadoop学习之路(7)MapReduce自定义排序
text类实现了writablecomparable接口,并且有write()readfields()compare()方法
readfields()方法:用来反序列化操作
write()方法:用来序列化操作
所以要想自定义类型用来排序需要有以上的方法
自定义类代码

import org.apache.hadoop.io.writablecomparable;
import java.io.datainput;
import java.io.dataoutput;
import java.io.ioexception;
public class person implements writablecomparable<person> {
    private string name;
    private int age;
    private int salary;
    public person() {
    }
    public person(string name, int age, int salary) {
        //super();
        this.name = name;
        this.age = age;
        this.salary = salary;
    }
    public string getname() {
        return name;
    }
    public void setname(string name) {
        this.name = name;
    }
    public int getage() {
        return age;
    }
    public void setage(int age) {
        this.age = age;
    }
    public int getsalary() {
        return salary;
    }
    public void setsalary(int salary) {
        this.salary = salary;
    }
    @override
    public string tostring() {
        return this.salary + "  " + this.age + "    " + this.name;
    }
    //先比较salary,高的排序在前;若相同,age小的在前
    public int compareto(person o) {
        int compareresult1= this.salary - o.salary;
        if(compareresult1 != 0) {
            return -compareresult1;
        } else {
            return this.age - o.age;
        }
    }
    //序列化,将newkey转化成使用流传送的二进制
    public void write(dataoutput dataoutput) throws ioexception {
        dataoutput.writeutf(name);
        dataoutput.writeint(age);
        dataoutput.writeint(salary);
    }
    //使用in读字段的顺序,要与write方法中写的顺序保持一致
    public void readfields(datainput datainput) throws ioexception {
        //read string
        this.name = datainput.readutf();
        this.age = datainput.readint();
        this.salary = datainput.readint();
    }

}

mapreuduce程序:

import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.filesystem;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.io.longwritable;
import org.apache.hadoop.io.nullwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.job;
import org.apache.hadoop.mapreduce.mapper;
import org.apache.hadoop.mapreduce.reducer;
import org.apache.hadoop.mapreduce.lib.input.fileinputformat;
import org.apache.hadoop.mapreduce.lib.output.fileoutputformat;
import java.io.ioexception;
import java.net.uri;
public class  secondarysort {
	public static void main(string[] args) throws exception {
		system.setproperty("hadoop_user_name","hadoop2.7");
		configuration configuration = new configuration();
        //设置本地运行的mapreduce程序 jar包
        configuration.set("mapreduce.job.jar","c:\\users\\tanglei1\\ideaprojects\\hadooptang\\target\\com.kaikeba.hadoop-1.0-snapshot.jar");
		job job = job.getinstance(configuration, secondarysort.class.getsimplename());
		filesystem filesystem = filesystem.get(uri.create(args[1]), configuration);
		if (filesystem.exists(new path(args[1]))) {
			filesystem.delete(new path(args[1]), true);
		}
		fileinputformat.setinputpaths(job, new path(args[0]));
		job.setmapperclass(mymap.class);
		job.setmapoutputkeyclass(person.class);
		job.setmapoutputvalueclass(nullwritable.class);
		//设置reduce的个数
		job.setnumreducetasks(1);
		job.setreducerclass(myreduce.class);
		job.setoutputkeyclass(person.class);
		job.setoutputvalueclass(nullwritable.class);
		fileoutputformat.setoutputpath(job, new path(args[1]));
		job.waitforcompletion(true);
	}
	public static class mymap extends
			mapper<longwritable, text, person, nullwritable> {
		//longwritable:输入参数键类型,text:输入参数值类型
		//persion:输出参数键类型,nullwritable:输出参数值类型
		@override
		//map的输出值是键值对<k,v>,nullwritable说关心v的值
		protected void map(longwritable key, text value,
				context context)
				throws ioexception, interruptedexception {
			//longwritable key:输入参数键值对的键,text value:输入参数键值对的值
			//获得一行数据,输入参数的键(距首行的位置),hadoop读取数据的时候逐行读取文本
			//fields:代表着文本一行的的数据
			string[] fields = value.tostring().split(" ");
			// 本列中文本一行数据:nancy 22 8000
			string name = fields[0];
			//字符串转换成int
			int age = integer.parseint(fields[1]);
			int salary = integer.parseint(fields[2]);
			//在自定义类中进行比较
			person person = new person(name, age, salary);
			context.write(person, nullwritable.get());
		}
	}
	public static class myreduce extends
			reducer<person, nullwritable, person, nullwritable> {
		@override
		protected void reduce(person key, iterable<nullwritable> values, context context) throws ioexception, interruptedexception {
			context.write(key, nullwritable.get());
		}
	}
}

运行结果:

40000  30    socrates
29000  39    white
11000  19    green
10000  19    stone
9000  22    ketty
8000  20    tom
8000  22    nancy