一、需求说明
1、数据文件说明
hdfs中有一些存储温度的数据文件,以文本形式存储,示例如下:
日期和时间中间是空格,为整体,表示检测站点监测的时间,后面是检测的温度,中间通过制表符 t 相隔。
2、需求
- 计算在1949-1955年中,每年的温度降序排序且每年单独一个文件输出存储
需要进行自定义分区、自定义分组、自定义排序。
二、解决
1、思路
- 按照年份升序排序再按照每年的温度降序排序
- 按照年份进行分组,每一年份对应一个reduce task
2、自定义mapper输出类型keypair
可以看出,每一行温度姑且称为一个数据,每个数据中有两部分,一部分是时间,另一部分是温度。
因此map输出必须使用自定义的格式输出,并且输出之后需要自定义进行排序和分组等操作,默认的那些都不管用了。
定义keypair
自定义的输出类型因为要将map的输出放到reduce中去运行,因此需要实现hadoop的writablecomparable的接口,并且该接口的模板变量也得是keypair,就像是longwritable一个意思(查看longwritable的定义就可以知道)
实现writablecomparable 的接口,就必须重写write/readfileds/compareto三个方法,依次作用于序列化/反序列化/比较
同时需要重写tostring和hashcode避免equals的问题。
keypair定义如下
值得注意的是:在进行序列化输出的时候也就是write,里面用了将标准格式的时间(文件中显示的格式时间)进行的时间的转换,用了datainput和dataoutput
import org.apache.hadoop.io.writablecomparable; import java.io.datainput; import java.io.dataoutput; import java.io.ioexception; /** * project : hadooptest2 * package : com.mapreducetest.temp * user : postbird @ http://www.ptbird.cn * time : 2017-01-19 21:53 */ /** * 为温度和年份封装成对象 * year表示年份 而temp为温度 */ public class keypair implements writablecomparable<keypair>{ //年份 private int year; //温度 private int temp; public void setyear(int year) { this.year = year; } public void settemp(int temp) { this.temp = temp; } public int getyear() { return year; } public int gettemp() { return temp; } @override public int compareto(keypair o) { //传过来的对象和当前的year比较 相等为0 不相等为1 int result=integer.compare(year,o.getyear()); if(result != 0){ //两个year不相等 return 0; } //如果年份相等 比较温度 return integer.compare(temp,o.gettemp()); } @override //序列化 public void write(dataoutput dataoutput) throws ioexception { dataoutput.writeint(year); dataoutput.writeint(temp); } @override //反序列化 public void readfields(datainput datainput) throws ioexception { this.year=datainput.readint(); this.temp=datainput.readint(); } @override public string tostring() { return year+"\t"+temp; } @override public int hashcode() { return new integer(year+temp).hashcode(); } }
3、自定义分组
将同一年监测的温度放到一起,因此需要对年份进行比较。
因此比较输入的数据中的年份即可,注意此时比较的都是keypair的类型,map出来的输出也是这个类型。
因为继承了writablecomparator,因此需要重写compare方法,比较的是keypair(keypair实现了writablecomparable接口),实际比较的使他们的年份,年份相同则得到0
/** * project : hadooptest2 * package : com.mapreducetest.temp * user : postbird @ http://www.ptbird.cn * time : 2017-01-19 22:08 */ import org.apache.hadoop.io.writablecomparable; import org.apache.hadoop.io.writablecomparator; /** * 为温度分组 比较年份即可 */ public class grouptemp extends writablecomparator{ public grouptemp() { super(keypair.class,true); } @override public int compare(writablecomparable a, writablecomparable b) { //年份相同返回的是0 keypair o1=(keypair)a; keypair o2=(keypair)b; return integer.compare(o1.getyear(),o2.getyear()); } }
4、自定义分区
自定义分区的目的是在根据年份分好了组之后,将不同的年份创建不同的reduce task任务,因此需要对年份处理。
import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.partitioner; /** * project : hadooptest2 * package : com.mapreducetest.temp * user : postbird @ http://www.ptbird.cn * time : 2017-01-19 22:17 */ //自定义分区 //每一个年份生成一个reduce任务 public class firstpartition extends partitioner<keypair,text>{ @override public int getpartition(keypair key, text value, int num) { //按照年份进行分区 年份相同,返回的是同一个值 return (key.getyear()*127)%num; } }
5、自定义排序
最终还是比较的是温度的排序,因此这部分也是非常重要的。
根据上面的需求,需要对年份进行生序排序,而对温度进行降序排序,首选比较条件是年份.
/** * project : hadooptest2 * package : com.mapreducetest.temp * user : postbird @ http://www.ptbird.cn * time : 2017-01-19 22:08 */ import org.apache.hadoop.io.writablecomparable; import org.apache.hadoop.io.writablecomparator; /** * 为温度排序的封装类 */ public class sorttemp extends writablecomparator{ public sorttemp() { super(keypair.class,true); } //自定义排序 @override public int compare(writablecomparable a, writablecomparable b) { //按照年份升序排序 按照温度降序排序 keypair o1=(keypair)a; keypair o2=(keypair)b; int result=integer.compare(o1.getyear(),o2.getyear()); //比较年份 如果年份不相等 if(result != 0){ return result; } //两个年份相等 对温度进行降序排序,注意 - 号 return -integer.compare(o1.gettemp(),o2.gettemp()); } }
6、mapreduce程序的编写
几个值得注意的点:
- 数据文件中前面的时间是字符串,但是我们的keypair的set却不是字符串,因此需要进行字符串转日期的format操作,使用的是simpledateformat,格式自然是"yyyy-mm-dd hh:mm:ss"了。
- 输入每行数据之后,通过正则匹配"t"的制表符,然后将温度和时间分开,将时间format并得到年份,将第二部分字符串去掉“℃”的符号得到数字,然后创建keypair类型的数据,在输出即可。
- 每个年份都生成一个reduce task依据就是自定义分区中对年份进行了比较处理,为了简单就把map的输出结果在reduce中再输出一次,三个reduce task,就会生成三个输出文件。
- 因为使用了自定义的排序,分组,分区,因此就需要进行指定相关的class,同时也需要执行reduce task的数量。
- 其实最后客户端还是八股文的固定形式而已,只不过多了自定义的指定,没有别的。
import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.util.genericoptionsparser; import java.io.ioexception; import java.net.uri; import java.text.simpledateformat; import java.util.calendar; import java.util.date; /** * project : hadooptest2 * package : com.mapreducetest.temp * user : postbird @ http://www.ptbird.cn * time : 2017-01-19 22:28 */ public class runtempjob { //字符串转日期format public static simpledateformat sdf=new simpledateformat("yyyy-mm-dd hh:mm:ss"); /** * mapper * 输出的key是自定义的keypair */ static class tempmapper extends mapper<longwritable,text,keypair,text>{ protected void map(longwritable key,text value,context context) throws ioexception,interruptedexception{ string line=value.tostring(); //1949-10-01 14:21:02 34℃ // 前面是空格 时间和温度通过\t分割 string[] ss=line.split("\t"); // system.err.println(ss.length); if(ss.length==2){ try{ //获得日期 date date=sdf.parse(ss[0]); calendar c=calendar.getinstance(); c.settime(date); int year=c.get(1);//得到年份 //字符串截取得到温度,去掉℃ string temp = ss[1].substring(0,ss[1].indexof("℃")); //创建输出key 类型为keypair keypair kp=new keypair(); kp.setyear(year); kp.settemp(integer.parseint(temp)); //输出 context.write(kp,value); }catch(exception ex){ ex.printstacktrace(); } } } } /** * reduce 区域 * map的输出是reduce的输出 */ static class tempreducer extends reducer<keypair,text,keypair,text> { @override protected void reduce(keypair kp, iterable<text> values, context context) throws ioexception, interruptedexception { for (text value:values){ context.write(kp,value); } } } //client public static void main(string args[]) throws ioexception, interruptedexception{ //获取配置 configuration conf=new configuration(); //修改命令行的配置 string[] otherargs = new genericoptionsparser(conf, args).getremainingargs(); if (otherargs.length != 2) { system.err.println("usage: temp <in> <out>"); system.exit(2); } //创建job job job=new job(conf,"temp"); //1.设置job运行的类 job.setjarbyclass(runtempjob.class); //2.设置map和reduce的类 job.setmapperclass(runtempjob.tempmapper.class); job.setreducerclass(runtempjob.tempreducer.class); //3.设置map的输出的key和value 的类型 job.setmapoutputkeyclass(keypair.class); job.setmapoutputvalueclass(text.class); //4.设置输入文件的目录和输出文件的目录 fileinputformat.addinputpath(job,new path(otherargs[0])); fileoutputformat.setoutputpath(job,new path(otherargs[1])); //5.设置reduce task的数量 每个年份对应一个reduce task job.setnumreducetasks(3);//3个年份 //5.设置partition sort group的class job.setpartitionerclass(firstpartition.class); job.setsortcomparatorclass(sorttemp.class); job.setgroupingcomparatorclass(grouptemp.class); //6.提交job 等待运行结束并在客户端显示运行信息 boolean issuccess= false; try { issuccess = job.waitforcompletion(true); } catch (classnotfoundexception e) { e.printstacktrace(); } //7.结束程序 system.exit(issuccess ?0:1); } }
三、生成效果:
hdfs中三个reduce task会生成三个输出。
每个输出文件都是每年中的温度的排序结果:
可以看出,1951是map(也可以说是keypair)输出的年份,46是温度,而后面是将text又输出了一次,每一年都是根据需求降序排序的。)