如何在Hadoop里面实现二次排序
程序员文章站
2022-05-25 09:41:14
...
在hadoop里面处理的数据,默认按输入内容的key进行排序的,大部分情况下,都可以满足的我们的业务需求,但有时候,可能出现类似以下的需求,输入内容:
要求输出1:
要求输出2:
注意上面的输出1,和输出2,其实都是一样的逻辑,只不过,输出的形式稍微改了下,那么今天散仙,就来分析下,怎么在hadoop里面,实现这样的需求。
其实这样的需求,就类似数据库的标准SQL分组
SELECT A,B FROM TABLE GROUP BY A,B ORDER BY A,B
当然也不一定,是2个字段分组,可能有2个或2个以上的多个字段分组。
下面,我们先来看下MapReduce内部执行2次排序的流程图,这图是散仙收集的,画的很不错。
由上图可知,Map在处理数据时,先由InputFormat组件提供输入格式,然后Split一行数据,默认的是TextInputFormat,Key为字节偏移量,Value为内容,然后把这行数据,传给Map,Map根据某种约定的分隔符,进行拆分数据,进行业务处理,如果是计数的直接在Value上输出1,在Map输出前,如果有Combine组件,则会执行Combine阶段,进行本地Reduce,一般是用来优化程序用的,Combine执行完后,会执行Partition组件,进行数据分区,默认的是HashPartition,按照输出的Key的哈希值与上Integer的最大值,然后对reduce的个数进行取余得到的值,经过Partition后,数据就会被按桶输出到本地磁盘上,在输出的时候,会按照Key进行排序,然后等所有的Map执行完毕后,就会进入Reduce阶段,这个阶段会进行一个大的混洗阶段,术语叫shuffle,每个reduce都会去每个map输出的分区里面,拉取对应的一部分数据,这个时候,是最耗网络IO,以及磁盘IO的,是影响性能的一个重要瓶颈,当Reduce把所有的数据拉取完毕后,就会进行分组并按照Key进行排序,每处理好一个分组,都会调用一次Reduce函数,进行累加,或其他的业务处理,处理完毕后,就会通过OutputFormat进行输出到HDFS上,至此,整个流程就执行完毕。
代码如下:
在eclipse下,执行,打印日志内容如下:
执行完,我们在输出目录里里面查看
执行完,内容如下:
我们发现,跟我们预期的结果一致,熟悉MapReduce的执行原理,可以帮助我们更好的使用Hive,因为Hive本身就是一个或多个MapReduce作业构成的,Hive语句的优化,对MapReduce作业的影响的性能也是不容忽视的,所以我们一定要多熟悉熟悉MapReduce编程的模型,以便于我们对它有一个更清晰的认识和了解。
秦东亮;72 秦东亮;34 秦东亮;100 三劫;899 三劫;32 三劫;1 a;45 b;567 b;12
要求输出1:
a 45 b 12,567 三劫 1,32,899 秦东亮 34,72,100
要求输出2:
a 45 b 12 b 567 三劫 1 三劫 32 三劫 899 秦东亮 34 秦东亮 72 秦东亮 100
注意上面的输出1,和输出2,其实都是一样的逻辑,只不过,输出的形式稍微改了下,那么今天散仙,就来分析下,怎么在hadoop里面,实现这样的需求。
其实这样的需求,就类似数据库的标准SQL分组
SELECT A,B FROM TABLE GROUP BY A,B ORDER BY A,B
当然也不一定,是2个字段分组,可能有2个或2个以上的多个字段分组。
下面,我们先来看下MapReduce内部执行2次排序的流程图,这图是散仙收集的,画的很不错。
由上图可知,Map在处理数据时,先由InputFormat组件提供输入格式,然后Split一行数据,默认的是TextInputFormat,Key为字节偏移量,Value为内容,然后把这行数据,传给Map,Map根据某种约定的分隔符,进行拆分数据,进行业务处理,如果是计数的直接在Value上输出1,在Map输出前,如果有Combine组件,则会执行Combine阶段,进行本地Reduce,一般是用来优化程序用的,Combine执行完后,会执行Partition组件,进行数据分区,默认的是HashPartition,按照输出的Key的哈希值与上Integer的最大值,然后对reduce的个数进行取余得到的值,经过Partition后,数据就会被按桶输出到本地磁盘上,在输出的时候,会按照Key进行排序,然后等所有的Map执行完毕后,就会进入Reduce阶段,这个阶段会进行一个大的混洗阶段,术语叫shuffle,每个reduce都会去每个map输出的分区里面,拉取对应的一部分数据,这个时候,是最耗网络IO,以及磁盘IO的,是影响性能的一个重要瓶颈,当Reduce把所有的数据拉取完毕后,就会进行分组并按照Key进行排序,每处理好一个分组,都会调用一次Reduce函数,进行累加,或其他的业务处理,处理完毕后,就会通过OutputFormat进行输出到HDFS上,至此,整个流程就执行完毕。
代码如下:
package com.qin.groupsort; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import com.qin.operadb.PersonRecoder; import com.qin.operadb.ReadMapDB; /** * @author qindongliang * * 大数据交流群:376932160 * * * **/ public class GroupSort { /** * map任务 * * */ public static class GMapper extends Mapper<LongWritable, Text, DescSort, IntWritable>{ private DescSort tx=new DescSort(); private IntWritable second=new IntWritable(); @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { System.out.println("执行map"); // System.out.println("进map了"); //mos.write(namedOutput, key, value); String ss[]=value.toString().split(";"); String mkey=ss[0]; int mvalue=Integer.parseInt(ss[1]); tx.setFirstKey(mkey); tx.setSecondKey(mvalue); second.set(mvalue); context.write(tx, second); } } /*** * Reduce任务 * * **/ public static class GReduce extends Reducer<DescSort, IntWritable, Text, Text>{ @Override protected void reduce(DescSort arg0, Iterable<IntWritable> arg1, Context ctx) throws IOException, InterruptedException { System.out.println("执行reduce"); StringBuffer sb=new StringBuffer(); for(IntWritable t:arg1){ // sb.append(t).append(","); //con ctx.write(new Text(arg0.getFirstKey()), new Text(t.toString())); /**这种写法,是这种输出 *a 45 *b 12 b 567 三劫 1 三劫 32 三劫 899 秦东亮 34 秦东亮 72 秦东亮 100 */ } if(sb.length()>0){ sb.deleteCharAt(sb.length()-1);//删除最后一位的逗号 } // 在循环里拼接,在循环外输出是这种格式 // b 12,567 // 三劫 1,32,899 // 秦东亮 34,72,100 // ctx.write(new Text(arg0.getFirstKey()), new Text(sb.toString())); } } /*** * * 自定义组合键 * **/ public static class DescSort implements WritableComparable{ public DescSort() { // TODO Auto-generated constructor stub } private String firstKey; private int secondKey; public String getFirstKey() { return firstKey; } public void setFirstKey(String firstKey) { this.firstKey = firstKey; } public int getSecondKey() { return secondKey; } public void setSecondKey(int secondKey) { this.secondKey = secondKey; } // @Override // public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, // int arg4, int arg5) { // return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);//注意使用负号来完成降序 // } // // @Override // public int compare(Object a, Object b) { // // return -super.compare(a, b);//注意使用负号来完成降序 // } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub firstKey=in.readUTF(); secondKey=in.readInt(); } @Override public void write(DataOutput out) throws IOException { out.writeUTF(firstKey); out.writeInt(secondKey); } @Override public int compareTo(Object o) { // TODO Auto-generated method stub DescSort d=(DescSort)o; //this在前代表升序 return this.getFirstKey().compareTo(d.getFirstKey()); } } /** * 主要就是对于分组进行排序,分组只按照组建键中的一个值进行分组 * * **/ public static class TextComparator extends WritableComparator{ public TextComparator() { // TODO Auto-generated constructor stub super(DescSort.class,true);//注册Comparator } @Override public int compare(WritableComparable a, WritableComparable b) { System.out.println("执行TextComparator分组排序"); DescSort d1=(DescSort)a; DescSort d2=(DescSort)b; return d1.getFirstKey().compareTo(d2.getFirstKey()); } } /** * 组内排序的策略 * 按照第二个字段排序 * * */ public static class TextIntCompartator extends WritableComparator{ public TextIntCompartator() { super(DescSort.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { DescSort d1=(DescSort)a; DescSort d2=(DescSort)b; System.out.println("执行组内排序TextIntCompartator"); if(!d1.getFirstKey().equals(d2.getFirstKey())){ return d1.getFirstKey().compareTo(d2.getFirstKey()); }else{ return d1.getSecondKey()-d2.getSecondKey();//0,-1,1 } } } /** * 分区策略 * * */ public static class KeyPartition extends Partitioner<DescSort, IntWritable>{ @Override public int getPartition(DescSort key, IntWritable arg1, int arg2) { // TODO Auto-generated method stub System.out.println("执行自定义分区KeyPartition"); return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%arg2; } } public static void main(String[] args) throws Exception{ JobConf conf=new JobConf(ReadMapDB.class); //Configuration conf=new Configuration(); conf.set("mapred.job.tracker","192.168.75.130:9001"); //读取person中的数据字段 conf.setJar("tt.jar"); //注意这行代码放在最前面,进行初始化,否则会报 /**Job任务**/ Job job=new Job(conf, "testpartion"); job.setJarByClass(GroupSort.class); System.out.println("模式: "+conf.get("mapred.job.tracker"));; // job.setCombinerClass(PCombine.class); // job.setNumReduceTasks(3);//设置为3 job.setMapperClass(GMapper.class); job.setReducerClass(GReduce.class); /**设置分区函数*/ job.setPartitionerClass(KeyPartition.class); //分组函数,Reduce前的一次排序 job.setGroupingComparatorClass(TextComparator.class); //组内排序Map输出完毕后,对key进行的一次排序 job.setSortComparatorClass(TextIntCompartator.class); //TextComparator.class //TextIntCompartator.class // job.setGroupingComparatorClass(TextIntCompartator.class); //组内排序Map输出完毕后,对key进行的一次排序 // job.setSortComparatorClass(TextComparator.class); job.setMapOutputKeyClass(DescSort.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); String path="hdfs://192.168.75.130:9000/root/outputdb"; FileSystem fs=FileSystem.get(conf); Path p=new Path(path); if(fs.exists(p)){ fs.delete(p, true); System.out.println("输出路径存在,已删除!"); } FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input"); FileOutputFormat.setOutputPath(job,p ); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
在eclipse下,执行,打印日志内容如下:
模式: 192.168.75.130:9001 输出路径存在,已删除! WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1 WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404152114_0003 INFO - JobClient.monitorAndPrintJob(1393) | map 0% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 33% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 100% INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404152114_0003 INFO - Counters.log(585) | Counters: 29 INFO - Counters.log(587) | Job Counters INFO - Counters.log(589) | Launched reduce tasks=1 INFO - Counters.log(589) | SLOTS_MILLIS_MAPS=7040 INFO - Counters.log(589) | Total time spent by all reduces waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Total time spent by all maps waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Launched map tasks=1 INFO - Counters.log(589) | Data-local map tasks=1 INFO - Counters.log(589) | SLOTS_MILLIS_REDUCES=9807 INFO - Counters.log(587) | File Output Format Counters INFO - Counters.log(589) | Bytes Written=86 INFO - Counters.log(587) | FileSystemCounters INFO - Counters.log(589) | FILE_BYTES_READ=162 INFO - Counters.log(589) | HDFS_BYTES_READ=205 INFO - Counters.log(589) | FILE_BYTES_WRITTEN=111232 INFO - Counters.log(589) | HDFS_BYTES_WRITTEN=86 INFO - Counters.log(587) | File Input Format Counters INFO - Counters.log(589) | Bytes Read=93 INFO - Counters.log(587) | Map-Reduce Framework INFO - Counters.log(589) | Map output materialized bytes=162 INFO - Counters.log(589) | Map input records=9 INFO - Counters.log(589) | Reduce shuffle bytes=162 INFO - Counters.log(589) | Spilled Records=18 INFO - Counters.log(589) | Map output bytes=138 INFO - Counters.log(589) | Total committed heap usage (bytes)=176033792 INFO - Counters.log(589) | CPU time spent (ms)=970 INFO - Counters.log(589) | Combine input records=0 INFO - Counters.log(589) | SPLIT_RAW_BYTES=112 INFO - Counters.log(589) | Reduce input records=9 INFO - Counters.log(589) | Reduce input groups=4 INFO - Counters.log(589) | Combine output records=0 INFO - Counters.log(589) | Physical memory (bytes) snapshot=258830336 INFO - Counters.log(589) | Reduce output records=9 INFO - Counters.log(589) | Virtual memory (bytes) snapshot=1461055488 INFO - Counters.log(589) | Map output records=9
执行完,我们在输出目录里里面查看
执行完,内容如下:
a 45 b 12 b 567 三劫 1 三劫 32 三劫 899 秦东亮 34 秦东亮 72 秦东亮 100
我们发现,跟我们预期的结果一致,熟悉MapReduce的执行原理,可以帮助我们更好的使用Hive,因为Hive本身就是一个或多个MapReduce作业构成的,Hive语句的优化,对MapReduce作业的影响的性能也是不容忽视的,所以我们一定要多熟悉熟悉MapReduce编程的模型,以便于我们对它有一个更清晰的认识和了解。
上一篇: 使用MapReduce对数据文件进行切分
下一篇: 一个正则贪婪和非贪婪的疑问