欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Apache2.2+Tomcat7.0整合配置详解

程序员文章站 2022-07-05 13:37:22
本章主要介绍 MapReduce 的开发原理及应用场景,让大家初步认识 MapReduce 的使用方式,讲解如何利用 Combine、Partitioner、WritableComparable、WritableComparator 等组件对数据进行排序筛选聚合分组的功能。利用例子模仿 SQL 关系... ......

前言

本文主要介绍 mapreduce 的原理及开发,讲解如何利用 combine、partitioner、writablecomparator等组件对数据进行排序筛选聚合分组的功能。
由于文章是针对开发人员所编写的,在阅读本文前,文章假设读者已经对hadoop的工作原理、安装过程有一定的了解,因此对hadoop的安装就不多作说明。请确保源代码运行在hadoop 2.x以上版本,并以伪分布形式安装以方便进行调试(单机版会对 partitioner 功能进行限制)。
文章主要利用例子介绍如何利用 mapreduce 模仿 sql 关系数据库进行select、where、group、join 等操作,并对 groupingcomparator、sortcomparator 等功能进行说明。
希望本篇文章能对各位的学习研究有所帮助,当中有所错漏的地方敬请点评。

 

目录

一、mapreduce 工作原理简介

二、mapreduce 开发实例

三、利用 partitioner 控制键值分配

四、利用 combiner 提高系统性能

五、writablecomparatable 自定义键值说明

 

 

 

一、mapreduce 工作原理简介

对hadoop有兴趣的朋友相信对hadoop的主要工作原理已经有一定的认识,在讲解mapreduce的程序开发前,本文先针对mapper、reducer、partitioner、combiner、suhffle、sort的工作原理作简单的介绍,以帮助各位更好地了解后面的内容。

Apache2.2+Tomcat7.0整合配置详解

图 1.1

1.1 mapper 阶段

当系统对数据进行分片后,每个输入分片会分配到一个mapper任务来处理,默认情况下系统会以hdfs的一个块大小64m作为分片大小,当然也可以通过配置文件设置块的大小。随后mapper节点输出的数据将保存到一个缓冲区中(缓冲区的大小默认为512m,可通过mapreduce.task.io.sort.mb属性进行修改),缓冲区越大排序效率越高。当该缓冲区快要溢出时(缓冲区默认大小为80%,可通过mapreduce.map.sort.spill.percent属性进行修改),系统会启动一个后台线程,将数据传输到会到本地的一个文件当中。

1.2 partitioner 阶段

在mapper完成 key/value 格式的数据操作后,partitioner 将会被调用,由于真实环境中 hadoop 可能会包含几十个甚至上百个reducer ,partitioner 的主要作用就是根据自定义方式确定数据将被传输到哪一个reducer进行处理。

1.3 combiner 阶段

如果系统定义了combiner,在经过 partitioner 排序处理后将会进行 combiner处理。我们可以把 combiner 看作为一个小型的 reducer ,由于数据从 mapper 通过网络传送到 reducer ,资源开销很大,combiner 目的就是在数据传送到reducer前作出初步聚集处理,减少服务器的压力。如果数据量太大,还可以把 mapred.compress.map.out 设置为 true,就可以将数据进行压缩。(关于数据压缩的内容已经超越本文的讨论范围,以后会有独立的篇章针对数据压缩进行专题讨论,敬请期待)

1.4 shuffle 阶段

在 shuffle 阶段,每个 reducer 会启动 5 个线程(可通过 mapreduce.reduce.shuffle.parallelcopies 进行设置)通过http协议获取mapper传送过来的数据。每次数据发送到 reducer 前,都会根据键先进行排序。开发人员也可通过自定义的 sortcomparator 进行数据排序,也是根据 groupcomparator 按照数据的其他特性进行分组处理,下面章节将会详细举例介绍。对数据进行混洗、排序完成后,将传送到对应的reducer进行处理。

1.5 reducer 阶段

当 mapper 实例完成输入的数据超过设定值后(可通过mapreduce.job.reduce.slowstart.completedmaps 进行设置), reducer 就会开始执行。reducer 会接收到不同 mapper 任务传来已经过排序的数据,并通过iterable 接口进行处理。在 partitioner 阶段,系统已定义哪些数据将由个 reducer 进行管理。当 reducer 检测到 key 时发生变化时,系统就会按照已定的规则生成一个新的 reducer 对数据进行处理。
如果 reducer 端接受的数据量较小,数据则可直接存储在内存缓冲区中,方便后面的数据输出(缓冲区大小可通过mapred.job.shuffle.input.buffer.percent 进行设置)
如果数据量超过了该缓冲区大小的一定比例(可以通过 mapred.job.shuffle.merge.percent 进行设置),数据将会被合并后写到磁盘中。

二、mapreduce 开发实例

上一章节讲解了 mapreduce 的主要流程,下面将以几个简单的例子模仿 sql 关系数据库向大家介绍一下 mapreduce 的开发过程。

hdfs常用命令  (此处只介绍几个常用命令,详细内容可在网上查找)

  • 创建目录    hdfs dfs -mkdir -p 【path】 
  • 复制文件    hdfs dfs -copyfromlocal 【inputpath】【outputpath】
  • 查看目录    hdfs dfs -ls 【path】
  • 运行jar    hadoop jar 【jar名称】 【main类全名称】 【inputpath】 【outputpath】 

2.1 使用 select 获取数据

应用场景:假设在 hdfs 文件夹 / input / 20180509 路径的 *.dat 类型文件中存放在着大量不同型号的 iphone 手机当天在不同地区的销售记录,系统想对这些记录进行统计,计算出不同型号手机的销售总数。

Apache2.2+Tomcat7.0整合配置详解

计算时,在mapper中获取每一行的信息,并把iphone名称作为key插入,把数据作为value插入到context当中。
当reducer接收到相同key数据后,再作统一处理。

注意  :   当前例子当中  mapper 的输入 key 为  longwritable 长类型

在此过程中要注意几点: 例子中 salemanager 继承了 org.apache.hadoop.conf.configured 类并实现了 org.apache.hadoop.util.tool 接口的 public static int run(configuration conf,tool tool, string[] args) 方法,mapreduce的相关操作都在run里面实现。由于 configured 已经实现了 getconf() 与setconfig() 方法,创建job时相关的配置信息就可通过getconf()方法读入。

系统可以通过以下方法注册mapper及reducer处理类
job.setmapperclass(mymapper.class);
job.setreducerclass(myreducer.class);

在整个运算过程当中,数据会经过筛选与计算,所以mapper的读入信息k1,v1与reducer的输出信息k3,v3不一定是同一格式。
org.apache.hadoop.mapreduce.mapper<k1,v1,k2,v2>
org.apache.hadoop.mapreduce.reducer<k2,v2,k3,v3>

当mapper的输出的键值类型与reduces输出的键值类型相同时,系统可以通过下面方法设置转出数据的格式
job.setoutputkeyclass(k);
job.setoutputvalueclass(v);

当mapper的输出的键值类型与reduces输出的键值类型不相同时,系统则需要通过下面方法设置mapper转出格式
job.setmapoutputkeyclass(k);
job.setmapoutputvalueclass(v);

 1 public class phone {
 2     public string type;
 3     public integer count;
 4     public string area;
 5     
 6     public phone(string line){
 7        string[] data=line.split(",");
 8        this.type=data[0].tostring();
 9        this.count=integer.valueof(data[1].tostring());
10        this.area=data[2].tostring();
11     }
12     
13     public string gettype(){
14         return this.type;
15     }
16     
17     public integer getcount(){
18         return this.count;
19     }
20     
21     public string getarea(){
22         return this.area;
23     }
24 }
25 
26 public class salemanager extends configured implements tool{
27     public static class mymapper extends mapper<longwritable,text,text,intwritable>{
28         public void map(longwritable key,text value,context context)
29             throws ioexception,interruptedexception{
30             string data=value.tostring();
31             phone iphone=new phone(data);
32             //以iphone型号作为key,数量为作value传入
33             context.write(new text(iphone.gettype()), new intwritable(iphone.getcount()));
34         }
35     }
36     
37     public static class myreducer extends reducer<text,intwritable,text,intwritable>{
38         public void reduce(text key,iterable<intwritable> values,context context)
39             throws ioexception,interruptedexception{
40             int sum=0;
41             //对同一型号的iphone数量进行统计
42             for(intwritable val : values){
43                 sum+=val.get();
44             }
45             context.write(key, new intwritable(sum));
46         }
47     }
48 
49     public int run(string[] arg0) throws exception {
50         // todo 自动生成的方法存根
51         // todo auto-generated method stub
52         job job=job.getinstance(getconf());
53         job.setjarbyclass(salemanager.class);
54         //注册key/value类型为text
55         job.setoutputkeyclass(text.class);
56         job.setoutputvalueclass(intwritable.class);
57         //注册mapper及reducer处理类
58         job.setmapperclass(mymapper.class);
59         job.setreducerclass(myreducer.class);
60         //输入输出数据格式化类型为textinputformat
61         job.setinputformatclass(textinputformat.class);
62         job.setoutputformatclass(textoutputformat.class);
63         //默认情况下reducer数量为1个(可忽略)
64         job.setnumreducetasks(1);
65         //获取命令参数
66         string[] args=new genericoptionsparser(getconf(),arg0).getremainingargs();
67         //设置读入文件路径
68         fileinputformat.setinputpaths(job,new path(args[0]));
69         //设置转出文件路径
70         fileoutputformat.setoutputpath(job,new path(args[1]));
71         boolean status=job.waitforcompletion(true);
72         if(status)
73             return 0;
74         else
75             return 1;
76     }
77     
78     public static void main(string[] args) throws exception{
79         configuration conf=new configuration();
80         toolrunner.run(new salemanager(), args);
81     }
82 }

计算结果

Apache2.2+Tomcat7.0整合配置详解 

2.2 使用 where 对数据进行筛选

在计算过程中,并非所有的数据都适用于reduce的计算,由于海量数据是通过网络传输的,所消耗的 i/o 资源巨大,所以可以尝试在mapper过程中提前对数据进行筛选。以上面的数据为例,当前系统只需要计算输入参数地区的销售数据。此时只需要修改一下mapper类,重写setup方法,通过configuration类的 public string[] configuration.getstrings(参数名,默认值) 方法获取命令输入的参数,再对数据进行筛选。

 1 public static class mymapper extends mapper<longwritable,text,text,intwritable>{
 2         private string area;
 3         
 4         @override
 5         public void setup(context context){
 6             this.area=context.getconfiguration().getstrings("area", "beijing")[0];
 7         }
 8         
 9         public void map(longwritable key,text value,context context)
10             throws ioexception,interruptedexception{
11             string data=value.tostring();
12             phone iphone=new phone(data);
13             if(this.area.equals(iphone.area))
14                 context.write(new text(iphone.gettype()), new intwritable(iphone.getcount()));
15         }
16     }

执行命令 hadoop jar 【jar名称】 【main类全名称】-d 【参数名=参数值】 【inputpath】 【outputpath】
例如:hadoop jar hadooptest-0.2.jar sun.hadooptest.salemanager -d area=beijing /tmp/input/050901 /tmp/output/050901 
此时系统将选择 area 等于beijing 的数据进行统计
计算结果

Apache2.2+Tomcat7.0整合配置详解

三、利用 partitioner 控制键值分配

3.1 深入分析 partitioner

partitioner 类在 org.apache.hadoop.mapreduce.partitioner 中,通过 job.setpartitionerclass(class<? extends partitioner> cls) 方法可绑定自定义的 partitioner。若用户没有实现自定义partitioner 时,系统将自动绑定 hadoop 的默认类 org.apache.hadoop.mapreduce.lib.partiton.hashpartitioner 。partitioner 包含一个主要方法是 int getpartition(k key,v value,int numreducetasks) ,功能是控制将哪些键分配到哪个 reducer。此方法的返回值是 reducer 的索引值,若系统定义了4个reducer,其返回值为0~3。numreducetasks 侧是当前系统的 reducer 数量,此数量可通过job.setnumreducetasks(int tasks) 进行设置,在伪分布环境下,其默认值为1。 

注意:

在单机环境下,系统只会使用一个 reducer,这将导致 partitioner 缺乏意义,这也是在本文引言中强调要使用伪分布环境进行调试的原因 。

通过反编译查看 hashpartitioner ,可见系统是通过(key.hashcode() & interger.max_value )%numreducetasks 方法,根据 key 的 hashcode 对 reducer 数量求余方式,确定数据分配到哪一个 reducer 进行处理的。但如果想根据用户自定义的逻辑把数据分配到对应 reducer,单依靠 hashpartitioner 是无法实现的,此时侧需要自定义 partitioner 。

1 public class hashpartitioner<k, v> extends partitioner<k, v> {
2 
3   public int getpartition(k key, v value, int numreducetasks) {
4     return (key.hashcode() & integer.max_value) % numreducetasks;
5   }
6 }

3.2 自定义 partitioner

在例子当中,假设系统需要把北、上、广、深4个不同的地区的iphone销售情况分别交付给不同 reducer 进行统计处理。我们可以自定义一个 mypartitioner, 通过 job.setpartitionerclass( mypartitioner.class ) 进行绑定。通过 job.setnumreducetasks(4) 设置4个reducer 。以手机类型作为key,把销售数据与地区作为value。在 int getpartition(k key,v value,int numreducetasks) 方法中,根据 value 值的不同返回不同的索引值。

  1 public class phone {
  2     public string type;
  3     public integer count;
  4     public string area;
  5     
  6     public phone(string line){
  7        string[] data=line.split(",");
  8        this.type=data[0].tostring();
  9        this.count=integer.valueof(data[1].tostring());
 10        this.area=data[2].tostring();
 11     }
 12     
 13     public string gettype(){
 14         return this.type;
 15     }
 16     
 17     public integer getcount(){
 18         return this.count;
 19     }
 20     
 21     public string getarea(){
 22         return this.area;
 23     }
 24 }
 25 
 26 public class mypatitional extends partitioner<text,text> {
 27 
 28     @override
 29     public int getpartition(text arg0, text arg1, int arg2) {
 30         // todo 自动生成的方法存根
 31         string area=arg1.tostring().split(",")[0];
 32         // 根据不同的地区返回不同的索引值       
 33         if(area.contentequals("beijing"))
 34             return 0;
 35         if(area.contentequals("guangzhou"))
 36             return 1;
 37         if(area.contentequals("shenzhen"))
 38             return 2;
 39         if(area.contentequals("shanghai"))
 40             return 3;
 41         return 0;
 42     }
 43 }    
 44 
 45 public class salemanager extends configured implements tool{
 46     public static class mymapper extends mapper<longwritable,text,text,text>{
 47         
 48         public void map(longwritable key,text value,context context)
 49             throws ioexception,interruptedexception{
 50             string data=value.tostring();
 51             phone iphone=new phone(data);
 52             context.write(new text(iphone.gettype()), new text(iphone.getarea()+","+iphone.getcount().tostring()));
 53         }
 54     }
 55     
 56     public static class myreducer extends reducer<text,text,text,intwritable>{
 57         
 58         public void reduce(text key,iterable<text> values,context context)
 59             throws ioexception,interruptedexception{
 60             int sum=0;
 61             //对同一型号的iphone数量进行统计
 62             for(text value : values){
 63                 string count=value.tostring().split(",")[1];
 64                 sum+=integer.valueof(count).intvalue();
 65             }
 66             context.write(key, new intwritable(sum));
 67         }
 68     }
 69 
 70     public int run(string[] arg0) throws exception {
 71         // todo 自动生成的方法存根
 72          // todo auto-generated method stub
 73         job job=job.getinstance(getconf());
 74         job.setjarbyclass(salemanager.class);
 75         //注册key/value类型为text
 76         job.setoutputkeyclass(text.class);
 77         job.setoutputvalueclass(intwritable.class);
 78         //若map的转出key/value不相同是需要分别注册
 79         job.setmapoutputkeyclass(text.class);
 80         job.setmapoutputvalueclass(text.class);
 81         //注册mapper及reducer处理类
 82         job.setmapperclass(mymapper.class);
 83         job.setreducerclass(myreducer.class);
 84         //输入输出数据格式化类型为textinputformat
 85         job.setinputformatclass(textinputformat.class);
 86         job.setoutputformatclass(textoutputformat.class);
 87         //设置reduce数量为4个,伪分布式情况下不设置时默认为1
 88         job.setnumreducetasks(4);
 89         //注册自定义partitional类
 90         job.setpartitionerclass(mypatitional.class);
 91         //获取命令参数
 92         string[] args=new genericoptionsparser(getconf(),arg0).getremainingargs();
 93         //设置读入文件路径
 94         fileinputformat.setinputpaths(job,new path(args[0]));
 95         //设置转出文件路径
 96         fileoutputformat.setoutputpath(job,new path(args[1]));
 97         boolean status=job.waitforcompletion(true);
 98         if(status)
 99             return 0;
100         else
101             return 1;
102     }
103     
104     public static void main(string[] args) throws exception{
105         configuration conf=new configuration();
106         toolrunner.run(new salemanager(), args);
107     }
108 }

计算结果
Apache2.2+Tomcat7.0整合配置详解


Apache2.2+Tomcat7.0整合配置详解

四、利用 combiner 提高系统性能

在前面几节所描述的例子当中,我们都是把所有的数据完整发送到 reducer 中再作统计。试想一下,在真实环境当中,iphone 的销售记录数以千万计,如此巨大的数据需要在 mapper/reducer 当中进行传输,将会耗费多少的网络资源。这么多年来 iphone 出品的机型不过十多个,系统能否先针对同类的机型在mapper端作出初步的聚合计算,再把计算结果发送到 reducer。如此一来,传到 reducer 端的数据量将会大大减少,只要在适当的情形下使用将有利于系统的性能提升。
针对此类问题,combiner 应运而生,我们可以把 combiner 看作为一个小型的 reducer ,它的目的就是在数据传送到reducer前在mapper中作出初步聚集处理,减少服务器之间的 i/o 数据传输压力。combiner 也继承于reducer,通过job.setcombinerclass(class<? extends reducer> cls) 方法进行注册。
下面继续以第3节的例子作为参考,系统想要在同一个reducer中计算所有地区不同型号手机的销售情况。我们可以把地区名作为key,把销售数量和手机类型转换成 mapwritable 作为 value。当数据输入后,不是直接把数据传输到 reducer ,而是通过combiner 把mapper中不同的型号手机的销售数量进行聚合计算,把5种型号手机的销售总数算好后传输给reducer。在reducer中再把来源于不同 combiner 的数据进行求和,得出最后结果。

注意  :   

mapwritable 是 系统自带的 writable 集合类中的其中一个,它实现了  java.util.map<writable,writable> 接口,以单字节充当类型数据的索引,常用于枚举集合的元素。

 

  1 public class salemanager extends configured implements tool{
  2     private static intwritable type=new intwritable(0);
  3     private static intwritable value=new intwritable(1);
  4     private static intwritable iphone7=new intwritable(2);
  5     private static intwritable iphone7_plus=new intwritable(3);
  6     private static intwritable iphone8=new intwritable(4);
  7     private static intwritable iphone8_plus=new intwritable(5);
  8     private static intwritable iphonex=new intwritable(6);
  9     
 10     public static class mymapper extends mapper<longwritable,text,text,mapwritable>{
 11         
 12         public void map(longwritable key,text value,context context)
 13             throws ioexception,interruptedexception{
 14             string data=value.tostring();
 15             phone iphone=new phone(data);
 16             context.write(new text(iphone.getarea()), getmapwritable(iphone.gettype(), iphone.getcount()));
 17         }      
 18         
 19         private mapwritable getmapwritable(string type,integer count){
 20             text _type=new text(type);
 21             text _count=new text(count.tostring());
 22             mapwritable mapwritable=new mapwritable();
 23             mapwritable.put(type,_type);
 24             mapwritable.put(value,_count);
 25             return mapwritable;
 26         }
 27     }
 28     
 29     public static class mycombiner extends reducer<text,mapwritable,text,mapwritable> {
 30         public void reduce(text key,iterable<mapwritable> values,context context) 
 31             throws ioexception, interruptedexception{
 32                int iphone7=0;
 33                int iphone7_plus=0;
 34                int iphone8=0;
 35                int iphone8_plus=0;
 36                int iphonex=0;
 37                //对同一个mapper所处理的不同型号的手机数据进行初步统计
 38                for(mapwritable value:values){
 39                     string type=value.get(type).tostring();
 40                     integer count=integer.valueof(value.get(value).tostring());
 41                     if(type.contentequals("iphone7"))
 42                         iphone7+=count;
 43                     if(type.contentequals("iphone7_plus"))
 44                         iphone7_plus+=count;
 45                     if(type.contentequals("iphone8"))
 46                         iphone8+=count;
 47                     if(type.contentequals("iphone8_plus"))
 48                         iphone8_plus+=count;
 49                     if(type.contentequals("iphonex"))
 50                         iphonex+=count;
 51                 }                
 52                 mapwritable mapwritable=new mapwritable();
 53                 mapwritable.put(iphone7, new intwritable(iphone7));
 54                 mapwritable.put(iphone7_plus, new intwritable(iphone7_plus));
 55                 mapwritable.put(iphone8, new intwritable(iphone8));
 56                 mapwritable.put(iphone8_plus, new intwritable(iphone8_plus));
 57                 mapwritable.put(iphonex, new intwritable(iphonex));
 58                 context.write(key,mapwritable);
 59         }
 60    }
 61     
 62     public static class myreducer extends reducer<text,mapwritable,text,text>{
 63         public void reduce(text key,iterable<mapwritable> values,context context)
 64             throws ioexception,interruptedexception{
 65                int iphone7=0;
 66                int iphone7_plus=0;
 67                int iphone8=0;
 68                int iphone8_plus=0;
 69                int iphonex=0;
 70 
 71             //对同一地区不同型的iphone数量进行统计
 72             for(mapwritable value : values){
 73                 iphone7+=integer.parseint(value.get(iphone7).tostring());
 74                 iphone7_plus+=integer.parseint(value.get(iphone7_plus).tostring());
 75                 iphone8+=integer.parseint(value.get(iphone8).tostring());
 76                 iphone8_plus+=integer.parseint(value.get(iphone8_plus).tostring());
 77                 iphonex+=integer.parseint(value.get(iphonex).tostring());
 78             }
 79             
 80             stringbuffer data=new stringbuffer();
 81             data.append("iphone7:"+iphone7+"   ");
 82             data.append("iphone7_plus:"+iphone7_plus+"   ");
 83             data.append("iphone8:"+iphone8+"   ");
 84             data.append("iphone8_plus:"+iphone8_plus+"   ");
 85             data.append("iphonex:"+iphonex+"   ");
 86             context.write(key, new text(data.tostring()));
 87         }
 88     }
 89 
 90     public int run(string[] arg0) throws exception {
 91         // todo 自动生成的方法存根
 92         // todo auto-generated method stub
 93         job job=job.getinstance(getconf());
 94         job.setjarbyclass(salemanager.class);
 95         //注册key/value类型为text
 96         job.setoutputkeyclass(text.class);
 97         job.setoutputvalueclass(text.class);
 98         //若map的转出key/value不相同是需要分别注册
 99         job.setmapoutputkeyclass(text.class);
100         job.setmapoutputvalueclass(mapwritable.class);
101         //注册mapper及reducer处理类
102         job.setmapperclass(mymapper.class);
103         job.setreducerclass(myreducer.class);
104         //注册combiner处理类
105         job.setcombinerclass(mycombiner.class);
106         //输入输出数据格式化类型为textinputformat
107         job.setinputformatclass(textinputformat.class);
108         job.setoutputformatclass(textoutputformat.class);
109         //伪分布式情况下不设置时默认为1
110         job.setnumreducetasks(1);
111         //获取命令参数
112         string[] args=new genericoptionsparser(getconf(),arg0).getremainingargs();
113         //设置读入文件路径
114         fileinputformat.setinputpaths(job,new path(args[0]));
115         //设置转出文件路径
116         fileoutputformat.setoutputpath(job,new path(args[1]));
117         boolean status=job.waitforcompletion(true);
118         if(status)
119             return 0;
120         else
121             return 1;
122     }
123     
124     public static void main(string[] args) throws exception{
125         configuration conf=new configuration();
126         toolrunner.run(new salemanager(), args);
127     }
128 }

计算结果

Apache2.2+Tomcat7.0整合配置详解

五、writablecomparable自定义键值说明

5.1 writable、comparable、writablecomparable 之间关系

在 mapper 与 reducer 中使用到的键类型、值类型都必须实现 writable 接口,而键类型侧需要实现 writablecomparable,它们之间的关系如下图:

Apache2.2+Tomcat7.0整合配置详解

 

writable 接口有两个方法

  • write(java.io.dataoutput out) 将实例的原始属性写到 dataoutput 输出流中,其作用是序列化基础数据
  • readfields(java.io.datainput in) 从 datainput 对象中抓取数据并重新创建 writable

comparable 接口中 int compareto(object) 方法侧定义了排序的方式,如果返回值为0(判断为两个对象相等),侧被同一个 reduce 方法处理,一旦两个对象不相等,系统就会生成另一个 reduce 处理。

5.2 自定义值类型

以第三节的例子作为讨论,假设系统需要把北、上、广、深4个不同的地区的iphone销售情况分别交付给4个不同 reducer 节点进行统计处理。使用地区 area 作为key,使用继承 writable 接口的phonevalue作为 value 值类型,实现 write 方法与 readfields 方法,最后在 reduce 方法区分不同的型号进行计算。

  1 public class phone {
  2     public string type;
  3     public integer count;
  4     public string area;
  5     
  6     public phone(string line){
  7        string[] data=line.split(",");
  8        this.type=data[0].tostring().trim();
  9        this.count=integer.valueof(data[1].tostring().trim());
 10        this.area=data[2].tostring().trim();
 11     }
 12     
 13     public string gettype(){
 14         return this.type;
 15     }
 16     
 17     public integer getcount(){
 18         return this.count;
 19     }
 20     
 21     public string getarea(){
 22         return this.area;
 23     }
 24 }
 25 
 26 public class phonevalue implements writable {
 27     public text type=new text();
 28     public intwritable count=new intwritable();
 29     public text area=new text();
 30     
 31     public phonevalue(){
 32         
 33     }
 34 
 35     public phonevalue(string type,integer count,string area){        
 36         this.type=new text(type);
 37         this.count=new intwritable(count);
 38         this.area=new text(area);
 39     }
 40     
 41     public text gettype() {
 42         return type;
 43     }
 44 
 45     public void settype(text type) {
 46         this.type = type;
 47     }
 48 
 49     public intwritable getcount() {
 50         return count;
 51     }
 52 
 53     public void setcount(intwritable count) {
 54         this.count = count;
 55     }
 56 
 57     public text getarea() {
 58         return area;
 59     }
 60 
 61     public void setarea(text area) {
 62         this.area = area;
 63     }
 64 
 65     @override
 66     public void readfields(datainput arg0) throws ioexception {
 67         // todo 自动生成的方法存根
 68         this.type.readfields(arg0);
 69         this.count.readfields(arg0);
 70         this.area.readfields(arg0);
 71     }
 72 
 73     @override
 74     public void write(dataoutput arg0) throws ioexception {
 75         // todo 自动生成的方法存根
 76         this.type.write(arg0);
 77         this.count.write(arg0);
 78         this.area.write(arg0);
 79     }
 80 }
 81 
 82 public class salemanager extends configured implements tool{
 83     
 84     public static class mymapper extends mapper<longwritable,text,text,phonevalue>{
 85         
 86         public void map(longwritable key,text value,context context)
 87             throws ioexception,interruptedexception{
 88             string data=value.tostring();
 89             phone iphone=new phone(data);
 90             phonevalue phonevalue=new phonevalue(iphone.gettype(),iphone.getcount(),iphone.getarea());
 91             context.write(new text(iphone.getarea()), phonevalue);
 92         }      
 93     }
 94     
 95     public static class myreducer extends reducer<text,phonevalue,text,text>{
 96         integer iphone7=new integer(0);
 97         integer iphone7_plus=new integer(0);
 98         integer iphone8=new integer(0);
 99         integer iphone8_plus=new integer(0);
100         integer iphonex=new integer(0);
101         
102         public void reduce(text key,iterable<phonevalue> values,context context)
103             throws ioexception,interruptedexception{
104             //对不同类型iphone数量进行统计
105             for(phonevalue phone : values){
106                 int count=phone.getcount().get();
107                 
108                 if(phone.type.tostring().equals("iphone7"))
109                     iphone7+=count;
110                 if(phone.type.tostring().equals("iphone7_plus"))
111                     iphone7_plus+=count;
112                 if(phone.type.tostring().equals("iphone8"))
113                     iphone8+=count;
114                 if(phone.type.tostring().equals("iphone8_plus"))
115                     iphone8_plus+=count;
116                 if(phone.type.tostring().equals("iphonex"))
117                     iphonex+=count;
118             }
119             
120             context.write(new text("iphone7"), new text(iphone7.tostring()));
121             context.write(new text("iphone7_plus"), new text(iphone7_plus.tostring()));
122             context.write(new text("iphone8"), new text(iphone8.tostring()));
123             context.write(new text("iphone8_plus"), new text(iphone8_plus.tostring()));
124             context.write(new text("iphonex"), new text(iphonex.tostring()));
125         }
126     }
127 
128     public int run(string[] arg0) throws exception {
129         // todo 自动生成的方法存根
130          // todo auto-generated method stub
131         job job=job.getinstance(getconf());
132         job.setjarbyclass(salemanager.class);
133         //注册key/value类型为text
134         job.setoutputkeyclass(text.class);
135         job.setoutputvalueclass(text.class);
136         //若map的转出key/value不相同是需要分别注册
137         job.setmapoutputkeyclass(text.class);
138         job.setmapoutputvalueclass(phonevalue.class);
139         //注册mapper及reducer处理类
140         job.setmapperclass(mymapper.class);
141         job.setreducerclass(myreducer.class);
142         //输入输出数据格式化类型为textinputformat
143         job.setinputformatclass(textinputformat.class);
144         job.setoutputformatclass(textoutputformat.class);
145         //伪分布式情况下不设置时默认为1
146         job.setnumreducetasks(4);
147         //获取命令参数
148         string[] args=new genericoptionsparser(getconf(),arg0).getremainingargs();
149         //设置读入文件路径
150         fileinputformat.setinputpaths(job,new path(args[0]));
151         //设置转出文件路径
152         fileoutputformat.setoutputpath(job,new path(args[1]));
153         boolean status=job.waitforcompletion(true);
154         if(status)
155             return 0;
156         else
157             return 1;
158     }
159     
160     public static void main(string[] args) throws exception{
161         configuration conf=new configuration();
162         toolrunner.run(new salemanager(), args);      
163     } 
164 }

计算结果与第三节相同
Apache2.2+Tomcat7.0整合配置详解

Apache2.2+Tomcat7.0整合配置详解

 

5.3 自定义键类型

hadoop 常用的类 intwritable、longwritable、text、booleanwritable 等都实现了writablecomparable 接口,当用户需要自定义键类型时,只需要实现writablecomparable接口即可。public boolean equals(object o) 与 public int hashcode() 都是 object 的方法,回顾本文的第三节可以看到 hashcode 会被系统默认的 partitioner 即 hashpartitioner 类所使用。在使用系统默认的 hashpartitioner 类时,一旦 hashcode 相等,数据将返回到同一个reducer 节点,因此应该按业务的需求重新定义键类型的 hashcode。同时 equals 方法应该按照 hashcode 逻辑统一修改,避免在使用 hash 散列时出现逻辑错误。

以第三节的例子作为讨论,假设系统需要把北、上、广、深4个不同的地区的iphone销售情况分别交付给不同 reducer 节点进行统计处理,我们只需要定义 phonekey 作为键类型,当中包含地区号 area 和型号 type。在 hashcode 中以地区号 area 作为指标,在 compareto 方法中我们以手机的类型 type 进行排序。系统就可在不同的 reducer 节点中计算出同一地点不同类型手机的销售情况。

  1 public class phone {
  2     public string type;
  3     public integer count;
  4     public string area;
  5     
  6     public phone(string line){
  7        string[] data=line.split(",");
  8        this.type=data[0].tostring().trim();
  9        this.count=integer.valueof(data[1].tostring().trim());
 10        this.area=data[2].tostring().trim();
 11     }
 12     
 13     public string gettype(){
 14         return this.type;
 15     }
 16     
 17     public integer getcount(){
 18         return this.count;
 19     }
 20     
 21     public string getarea(){
 22         return this.area;
 23     }
 24 }
 25 
 26 public class phonekey implements writablecomparable<phonekey> {
 27     public text type=new text();
 28     public text area=new text();
 29     
 30     public phonekey(){
 31         
 32     }
 33 
 34     public phonekey(string type,string area){        
 35         this.type=new text(type);
 36         this.area=new text(area);
 37     }
 38     
 39     public text gettype() {
 40         return type;
 41     }
 42 
 43     public void settype(text type) {
 44         this.type = type;
 45     }
 46 
 47     public text getarea() {
 48         return area;
 49     }
 50 
 51     public void setarea(text area) {
 52         this.area = area;
 53     }
 54 
 55     @override
 56     public void readfields(datainput arg0) throws ioexception {
 57         // todo 自动生成的方法存根
 58         this.type.readfields(arg0);
 59         this.area.readfields(arg0);
 60     }
 61 
 62     @override
 63     public void write(dataoutput arg0) throws ioexception {
 64         // todo 自动生成的方法存根
 65         this.type.write(arg0);
 66         this.area.write(arg0);
 67     }
 68 
 69     @override
 70     public int compareto(phonekey o) {
 71         // todo 自动生成的方法存根
 72         return this.type.compareto(o.type);
 73     }
 74  
 75     @override
 76     public boolean equals(object o){
 77         if(!(o instanceof phonekey)){
 78             return false;
 79         }
 80         phonekey phone=(phonekey) o;
 81         return this.area.equals(phone.area);
 82     }
 83     
 84     @override
 85     public int hashcode(){
 86         return this.area.hashcode();
 87     }
 88 }
 89 
 90 public class salemanager extends configured implements tool{
 91     
 92     public static class mymapper extends mapper<longwritable,text,phonekey,intwritable>{
 93         
 94         public void map(longwritable key,text value,context context)
 95             throws ioexception,interruptedexception{
 96             string data=value.tostring();
 97             phone iphone=new phone(data);
 98             phonekey phonekey=new phonekey(iphone.gettype(),iphone.getarea());
 99             context.write(phonekey, new intwritable(iphone.count));
100         }      
101     }
102     
103     public static class myreducer extends reducer<phonekey,intwritable,text,text>{        
104         public void reduce(phonekey phonekey,iterable<intwritable> values,context context)
105             throws ioexception,interruptedexception{
106             string type=phonekey.gettype().tostring();
107             integer total=new integer(0);
108             //对不同类型iphone数量进行统计
109             for(intwritable count : values){
110                 total+=count.get();
111             }            
112             context.write(new text(type),new text(total.tostring()));
113         }
114     }
115 
116     public int run(string[] arg0) throws exception {
117         // todo 自动生成的方法存根
118          // todo auto-generated method stub
119         job job=job.getinstance(getconf());
120         job.setjarbyclass(salemanager.class);
121         //注册key/value类型为text
122         job.setoutputkeyclass(text.class);
123         job.setoutputvalueclass(text.class);
124         //若map的转出key/value不相同是需要分别注册
125         job.setmapoutputkeyclass(phonekey.class);
126         job.setmapoutputvalueclass(intwritable.class);
127         //注册mapper及reducer处理类
128         job.setmapperclass(mymapper.class);
129         job.setreducerclass(myreducer.class);
130         //输入输出数据格式化类型为textinputformat
131         job.setinputformatclass(textinputformat.class);
132         job.setoutputformatclass(textoutputformat.class);
133         //伪分布式情况下不设置时默认为1
134         job.setnumreducetasks(4);
135         //获取命令参数
136         string[] args=new genericoptionsparser(getconf(),arg0).getremainingargs();
137         //设置读入文件路径
138         fileinputformat.setinputpaths(job,new path(args[0]));
139         //设置转出文件路径
140         fileoutputformat.setoutputpath(job,new path(args[1]));
141         boolean status=job.waitforcompletion(true);
142         if(status)
143             return 0;
144         else
145             return 1;
146     }
147     
148     public static void main(string[] args) throws exception{
149         configuration conf=new configuration();
150         toolrunner.run(new salemanager(), args);      
151     } 
152 }

计算结果与第三节相同
Apache2.2+Tomcat7.0整合配置详解

Apache2.2+Tomcat7.0整合配置详解

在这个例子中只是为了让大家更好地了解自定义键类型的使用方法,而在真实环境中,自定义键类型,主要作为区分数据的标准。如果需要更好地平衡服务器资源,分配 reducer 数据处理的负荷,还是要通过自定义的 partitioner 进行管理。

六、实现数据排序与分组处理

6.1 rawcomparator 接口介绍

在实际的应用场景当中,很可能会用到第三方类库作为键类型,但我们无法直接对源代码进行修改。为此系统定义了 rawcomparator 接口,假设第三方类已实现了 writable 接口,用户可通过自定义类实现 rawcomparator 接口,通过 job.setsortcomparatorclass(rawcomparator.class) 设置即可。rawcomparator 继承了  java.util.comparator 接口,并添加了 int compare(byte[] b1,int s1, int l1,byte[] b2 ,int s2, int l2) 方法。此方法最简单的实现方式是通过 writable 实例中的 readfield 重构对象,然后使用通用类的 compareto 完成排序。

public interface rawcomparator<t> extends comparator<t> {
            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}

下面例子假设 phonewritable 是第三方类库中的值类型,我们无法直接修改,但系统需要把 phonewritable 用作 key 处理,按照不同地区不同型号进行排序计算出手机的销售情况。此时可建立 phonecomparator 类并实现 rawcomparator 接口,在主程序中通过 job.setsortcomparatorclass(phonecomparator.class) 设置此接口的实现类。

  1 public class phone {
  2     public string type;
  3     public integer count;
  4     public string area;
  5     
  6     public phone(string line){
  7        string[] data=line.split(",");
  8        this.type=data[0].tostring().trim();
  9        this.count=integer.valueof(data[1].tostring().trim());
 10        this.area=data[2].tostring().trim();
 11     }
 12     
 13     public string gettype(){
 14         return this.type;
 15     }
 16     
 17     public integer getcount(){
 18         return this.count;
 19     }
 20     
 21     public string getarea(){
 22         return this.area;
 23     }
 24 }
 25 
 26 public class phonewritable implements writable {
 27     public text type=new text();
 28     public intwritable count=new intwritable();
 29     public text area=new text();
 30     
 31     public phonewritable(){
 32         
 33     }
 34 
 35     public phonewritable(string type,integer count,string area){        
 36         this.type=new text(type);
 37         this.count=new intwritable(count);
 38         this.area=new text(area);
 39     }
 40     
 41     public text gettype() {
 42         return type;
 43     }
 44 
 45     public void settype(text type) {
 46         this.type = type;
 47     }
 48 
 49     public intwritable getcount() {
 50         return count;
 51     }
 52 
 53     public void setcount(intwritable count) {
 54         this.count = count;
 55     }
 56 
 57     public text getarea() {
 58         return area;
 59     }
 60 
 61     public void setarea(text area) {
 62         this.area = area;
 63     }
 64 
 65     @override
 66     public void readfields(datainput arg0) throws ioexception {
 67         // todo 自动生成的方法存根
 68         this.type.readfields(arg0);
 69         this.count.readfields(arg0);
 70         this.area.readfields(arg0);
 71     }
 72 
 73     @override
 74     public void write(dataoutput arg0) throws ioexception {
 75         // todo 自动生成的方法存根
 76         this.type.write(arg0);
 77         this.count.write(arg0);
 78         this.area.write(arg0);
 79     }
 80 }
 81 
 82 public class phonecomparator implements rawcomparator<phonewritable> {
 83     private datainputbuffer buffer=null;
 84     private phonewritable phone1=null;
 85     private phonewritable phone2=null;
 86     
 87     public phonecomparator(){
 88         buffer=new datainputbuffer();
 89         phone1=new phonewritable();
 90         phone2=new phonewritable();
 91     }
 92     
 93     @override
 94     public int compare(phonewritable o1, phonewritable o2) {
 95         // todo 自动生成的方法存根
 96         if(!o1.getarea().equals(o2.getarea()))
 97             return o1.getarea().compareto(o2.getarea());
 98         else
 99             return o1.gettype().compareto(o2.gettype());
100     }
101 
102     @override
103     public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
104         // todo 自动生成的方法存根
105         try {
106             buffer.reset(arg0,arg1,arg2);
107             phone1.readfields(buffer);
108             buffer.reset(arg3,arg4,arg5);
109             phone2.readfields(buffer);
110         } catch (ioexception e) {
111             // todo 自动生成的 catch 块
112             e.printstacktrace();
113         }
114         return this.compare(phone1, phone2);
115     }
116 
117 }
118 
119 public class salemanager extends configured implements tool{
120     
121     public static class mymapper extends mapper<longwritable,text,phonewritable,intwritable>{
122         
123         public void map(longwritable key,text value,context context)
124             throws ioexception,interruptedexception{
125             string data=value.tostring();
126             phone iphone=new phone(data);
127             phonewritable phone=new phonewritable(iphone.gettype(),iphone.getcount(),iphone.getarea());
128             context.write(phone, phone.getcount());
129         }      
130     }
131     
132     public static class myreducer extends reducer<phonewritable,intwritable,text,text>{        
133         public void reduce(phonewritable phone,iterable<intwritable> values,context context)
134             throws ioexception,interruptedexception{
135             //对不同类型iphone数量进行统计
136             integer total=new integer(0);
137             
138             for(intwritable count : values){
139                 total+=count.get();
140             }
141             context.write(new text(phone.getarea()+" "+phone.gettype()),new text(total.tostring()));
142         }
143     }
144 
145     public int run(string[] arg0) throws exception {
146         // todo 自动生成的方法存根
147          // todo auto-generated method stub
148         job job=job.getinstance(getconf());
149         job.setjarbyclass(salemanager.class);
150         //注册key/value类型为text
151         job.setoutputkeyclass(text.class);
152         job.setoutputvalueclass(text.class);
153         //若map的转出key/value不相同是需要分别注册
154         job.setmapoutputkeyclass(phonewritable.class);
155         job.setsortcomparatorclass(phonecomparator.class);
156         job.setmapoutputvalueclass(intwritable.class);
157         //注册mapper及reducer处理类
158         job.setmapperclass(mymapper.class);
159         job.setreducerclass(myreducer.class);
160         //输入输出数据格式化类型为textinputformat
161         job.setinputformatclass(textinputformat.class);
162         job.setoutputformatclass(textoutputformat.class);
163         //伪分布式情况下不设置时默认为1
164         job.setnumreducetasks(1);
165         //获取命令参数
166         string[] args=new genericoptionsparser(getconf(),arg0).getremainingargs();
167         //设置读入文件路径
168         fileinputformat.setinputpaths(job,new path(args[0]));
169         //设置转出文件路径
170         fileoutputformat.setoutputpath(job,new path(args[1]));
171         boolean status=job.waitforcompletion(true);
172         if(status)
173             return 0;
174         else
175             return 1;
176     }
177     
178     public static void main(string[] args) throws exception{
179         configuration conf=new configuration();
180         toolrunner.run(new salemanager(), args);      
181     } 
182 }

计算结果

Apache2.2+Tomcat7.0整合配置详解

 

6.2 writablecomparator 类介绍

writablecomparator 是系统自带的接口 rawcomparartor 实现类,它实现了 rawcomparator 接口的两个基础方法 int compare(object , object ) 与 int compare(byte[] b1,int s1, int l1,byte[] b2 ,int s2, int l2)

Apache2.2+Tomcat7.0整合配置详解

通过反编译查看源代码可知道,系统也是通过 writablecomparable 接口的 readfield 方法重构对象,然后调用 int compareto (writablecomparable,writablecomparable) 方法完成排序的。因此一般情况下我们在继承 writablecomparator 类实现排序时,只需要重构此方法实现业务逻辑即可。

Apache2.2+Tomcat7.0整合配置详解

 

6.3 利用 writablecomparator 实现数据排序

假设系统原有的键类型 phonekey 是以手机类型 type作为排序标准,现在我们需要通过 writablecomparator 把排序标准修改为按先按地区 area 再按类型 type 排序。按照第上节所述,我们只需要继承 writablecomparator 类,重写 int compareto (writablecomparable,writablecomparable),按照地区 area 及 类型 type 进行排序,最后使用 job.setsortcomparatorclass(class<? extends rawcomparator> cls) 设置排序方式即可。

  1 public class phone {
  2     public string type;
  3     public integer count;
  4     public string area;
  5     
  6     public phone(string line){
  7        string[] data=line.split(",");
  8        this.type=data[0].tostring().trim();
  9        this.count=integer.valueof(data[1].tostring().trim());
 10        this.area=data[2].tostring().trim();
 11     }
 12     
 13     public string gettype(){
 14         return this.type;
 15     }
 16     
 17     public integer getcount(){
 18         return this.count;
 19     }
 20     
 21     public string getarea(){
 22         return this.area;
 23     }
 24 }
 25 
 26 public class phonekey implements writablecomparable<phonekey> {
 27     public text type=new text();
 28     public text area=new text();
 29     
 30     public phonekey(){
 31         
 32     }
 33 
 34     public phonekey(string type,string area){        
 35         this.type=new text(type);
 36         this.area=new text(area);
 37     }
 38     
 39     public text gettype() {
 40         return type;
 41     }
 42 
 43     public void settype(text type) {
 44         this.type = type;
 45     }
 46 
 47     public text getarea() {
 48         return area;
 49     }
 50 
 51     public void setarea(text area) {
 52         this.area = area;
 53     }
 54 
 55     @override
 56     public void readfields(datainput arg0) throws ioexception {
 57         // todo 自动生成的方法存根
 58         this.type.readfields(arg0);
 59         this.area.readfields(arg0);
 60     }
 61 
 62     @override
 63     public void write(dataoutput arg0) throws ioexception {
 64         // todo 自动生成的方法存根
 65         this.type.write(arg0);
 66         this.area.write(arg0);
 67     }
 68 
 69     @override
 70     public int compareto(phonekey o) {
 71         // todo 自动生成的方法存根
 72         return this.type.compareto(o.type);
 73     }
 74  
 75     @override
 76     public boolean equals(object o){
 77         if(!(o instanceof phonekey)){
 78             return false;
 79         }
 80         phonekey phone=(phonekey) o;
 81         return this.area.equals(phone.area);
 82     }
 83     
 84     @override
 85     public int hashcode(){
 86         return this.area.hashcode();
 87     }
 88 }
 89 
 90 public class phonesortcomparator extends writablecomparator{
 91 
 92     public phonesortcomparator(){
 93         super(phonekey.class,true);
 94     }
 95     
 96     @override
 97     public int compare(writablecomparable a,writablecomparable b){
 98         phonekey key1=(phonekey) a;
 99         phonekey key2=(phonekey) b;
100         if(!key1.getarea().equals(key2.getarea()))
101             return key1.getarea().compareto(key2.getarea());
102         else
103             return key1.gettype().compareto(key2.gettype());
104     }
105 }
106 
107 public class salemanager extends configured implements tool{
108     
109     public static class mymapper extends mapper<longwritable,text,phonekey,intwritable>{
110         
111         public void map(longwritable key,text value,context context)
112             throws ioexception,interruptedexception{
113             string data=value.tostring();
114             phone iphone=new phone(data);
115             phonekey phone=new phonekey(iphone.gettype(),iphone.getarea());
116             context.write(phone, new intwritable(iphone.getcount()));
117         }      
118     }
119     
120     public static class myreducer extends reducer<phonekey,intwritable,text,text>{        
121         public void reduce(phonekey phone,iterable<intwritable> values,context context)
122             throws ioexception,interruptedexception{
123             //对不同类型iphone数量进行统计
124             integer total=new integer(0);
125             
126             for(intwritable count : values){
127                 total+=count.get();
128             }
129             context.write(new text(phone.getarea()+" "+phone.gettype()+": "),new text(total.tostring()));
130         }
131     }
132 
133     public int run(string[] arg0) throws exception {
134         // todo 自动生成的方法存根
135          // todo auto-generated method stub
136         job job=job.getinstance(getconf());
137         job.setjarbyclass(salemanager.class);
138         //注册key/value类型为text
139         job.setoutputkeyclass(text.class);
140         job.setoutputvalueclass(text.class);
141         //若map的转出key/value不相同是需要分别注册
142         job.setmapoutputkeyclass(phonekey.class);
143         job.setmapoutputvalueclass(intwritable.class);
144         //设置排序类型 sortcomparator
145         job.setsortcomparatorclass(phonesortcomparator.class);
146         //注册mapper及reducer处理类
147         job.setmapperclass(mymapper.class);
148         job.setreducerclass(myreducer.class);
149         //输入输出数据格式化类型为textinputformat
150         job.setinputformatclass(textinputformat.class);
151         job.setoutputformatclass(textoutputformat.class);
152         //伪分布式情况下不设置时默认为1
153         job.setnumreducetasks(1);
154         //获取命令参数
155         string[] args=new genericoptionsparser(getconf(),arg0).getremainingargs();
156         //设置读入文件路径
157         fileinputformat.setinputpaths(job,new path(args[0]));
158         //设置转出文件路径
159         fileoutputformat.setoutputpath(job,new path(args[1]));
160         boolean status=job.waitforcompletion(true);
161         if(status)
162             return 0;
163         else
164             return 1;
165     }
166     
167     public static void main(string[] args) throws exception{
168         configuration conf=new configuration();
169         toolrunner.run(new salemanager(), args);      
170     } 
171 }

从计算结果可以看到数据是先按照地区 area 再按手机型号 type 进行排序的

Apache2.2+Tomcat7.0整合配置详解

 

6.4 利用 writablecomparator 实现数据分组

在 6.3 节的例子中,数据是先按照地区号 area 再按手机类型 type 进行排序的,因此在 reduce 方法中根据 iterable 集合计算出来的将会同一地区同一类型的手机。若此时需要对同一地区所有手机类型的销售情况进行合计,可以使用 groupingcomparator 分组计算方式 。其方法是继承 writablecomparator 类,重写 int compareto (writablecomparable,writablecomparable),以地区号 area 作为分组标识,最后使用 job.setgroupcomparatorclass(class<? extends rawcomparator> cls) 设置分组方式即可。

  1 public class phone {
  2     public string type;
  3     public integer count;
  4     public string area;
  5     
  6     public phone(string line){
  7        string[] data=line.split(",");
  8        this.type=data[0].tostring().trim();
  9        this.count=integer.valueof(data[1].tostring().trim());
 10        this.area=data[2].tostring().trim();
 11     }
 12     
 13     public string gettype(){
 14         return this.type;
 15     }
 16     
 17     public integer getcount(){
 18         return this.count;
 19     }
 20     
 21     public string getarea(){
 22         return this.area;
 23     }
 24 }
 25 
 26 public class phonekey

                    
                
(0)
打赏 Apache2.2+Tomcat7.0整合配置详解 微信扫一扫

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

Apache2.2+Tomcat7.0整合配置详解
验证码: Apache2.2+Tomcat7.0整合配置详解