Total Sort and Custom Partitioner Example
I have already covered Partitioner partitioning and the total sort separately, so putting the two together here should be easy to follow. The earlier total-sort example was never tested on the Hadoop cluster (I was a bit busy), so this time the jar will be run on the cluster. The key idea is that FlowBean is the map output key, so the shuffle phase sorts records by its compareTo (descending by sum_Sales), while the custom Partitioner decides which reduce task, and therefore which output file, each record ends up in:
1). Encapsulate each field of the test file
package com.Sort.Whole;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private Float Na_Sales;    // NA_Sales field
    private Float Other_Sales; // Other_Sales field
    private Float sum_Sales;   // sum_Sales field

    public FlowBean() {
        super();
    }

    public FlowBean(Float na_Sales, Float other_Sales) {
        Na_Sales = na_Sales;
        Other_Sales = other_Sales;
        sum_Sales = Na_Sales + Other_Sales;
    }

    // Compare by sum_Sales in descending order
    public int compareTo(FlowBean flowBean) {
        int result;
        if (sum_Sales > flowBean.getSum_Sales()) {
            result = -1;
        } else if (sum_Sales < flowBean.getSum_Sales()) {
            result = 1;
        } else {
            result = 0;
        }
        return result;
    }

    // Serialization
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeFloat(Na_Sales);
        dataOutput.writeFloat(Other_Sales);
        dataOutput.writeFloat(sum_Sales);
    }

    // Deserialization
    public void readFields(DataInput dataInput) throws IOException {
        Na_Sales = dataInput.readFloat();
        Other_Sales = dataInput.readFloat();
        sum_Sales = dataInput.readFloat();
    }

    public Float getNa_Sales() {
        return Na_Sales;
    }

    public void setNa_Sales(Float na_Sales) {
        Na_Sales = na_Sales;
    }

    public Float getOther_Sales() {
        return Other_Sales;
    }

    public void setOther_Sales(Float other_Sales) {
        Other_Sales = other_Sales;
    }

    public Float getSum_Sales() {
        return sum_Sales;
    }

    public void setSum_Sales(Float sum_Sales) {
        this.sum_Sales = sum_Sales;
    }

    @Override
    public String toString() {
        return Na_Sales + "\t" + Other_Sales + "\t" + sum_Sales;
    }
}
2). The Mapper
package com.Sort.Whole;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowCountSortMappper extends Mapper<LongWritable, Text, FlowBean, Text> {

    FlowBean k = new FlowBean();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Example record: Action 877.83 1751.18 2629.01
        // 1. Get one line
        String line = value.toString();
        // 2. Split
        String[] fields = line.split(",");
        // 3. Wrap the fields into the key/value objects
        String Genre = fields[0];
        Float Na_Sales = Float.parseFloat(fields[fields.length - 3]);
        Float Other_Sales = Float.parseFloat(fields[fields.length - 2]);
        Float sum_Sales = Float.parseFloat(fields[fields.length - 1]);
        k.setNa_Sales(Na_Sales);
        k.setOther_Sales(Other_Sales);
        k.setSum_Sales(sum_Sales);
        v.set(Genre);
        // 4. Write out: FlowBean is the key, so the shuffle sorts by sum_Sales
        context.write(k, v);
    }
}
3). The Reducer
package com.Sort.Whole;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Output line example: Action 877.83 1751.18 2629.01
        // Swap key and value so the Genre comes first in the output
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
The contents of my test file are as follows:
4). The custom Partitioner class:
package com.Sort.Whole;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PartitionSort extends Partitioner<FlowBean, Text> {

    @Override
    public int getPartition(FlowBean flowBean, Text text, int i) {
        // Take the first letter of the Genre field; records whose Genre
        // starts with the same letter go to the same partition
        String val = text.toString().substring(0, 1);
        int result;
        if ("A".equals(val)) {        // first partition, part-r-00000
            result = 0;
        } else if ("P".equals(val)) { // second partition, part-r-00001
            result = 1;
        } else if ("S".equals(val)) { // third partition, part-r-00002
            result = 2;
        } else if ("R".equals(val)) { // fourth partition, part-r-00003
            result = 3;
        } else {                      // any first letter other than A/P/S/R goes to part-r-00004
            result = 4;
        }
        return result;
    }
}
The custom Partitioner above splits the data by the first letter of each record's first column (the Genre field): the first partition, part-r-00000, holds genres starting with A; the second, part-r-00001, those starting with P; the third, part-r-00002, those starting with S; the fourth, part-r-00003, those starting with R; and genres whose first letter is none of A, P, S or R go to the fifth partition, part-r-00004.
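Because getPartition only does string work and needs no Hadoop context, the routing logic can be checked in isolation. The following is a minimal sketch (the class name PartitionSortDemo and the sample genres are only for illustration, not part of the job) that prints which output file each sample Genre would land in:
package com.Sort.Whole;

import org.apache.hadoop.io.Text;

public class PartitionSortDemo {

    public static void main(String[] args) {
        PartitionSort partitioner = new PartitionSort();
        // getPartition ignores the FlowBean key, so a dummy key is enough
        FlowBean dummyKey = new FlowBean(0f, 0f);
        String[] genres = {"Action", "Puzzle", "Sports", "Racing", "Misc"};
        for (String genre : genres) {
            int partition = partitioner.getPartition(dummyKey, new Text(genre), 5);
            // Expected: Action -> 0, Puzzle -> 1, Sports -> 2, Racing -> 3, Misc -> 4
            System.out.println(genre + " -> part-r-0000" + partition);
        }
    }
}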
5). The main program (Driver)
package com.Sort.Whole;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowCountSortDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Local test paths; remove this line when packaging the jar to run on the Hadoop cluster
        args = new String[]{"E:\\谷歌文件\\谷歌数据集\\", "F:\\scala\\Workerhdfs\\output5"};
        // 1. Get the job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Set the jar path
        job.setJarByClass(FlowCountSortDriver.class);
        // 3. Associate the mapper and reducer
        job.setMapperClass(FlowCountSortMappper.class);
        job.setReducerClass(FlowCountSortReducer.class);
        // 4. Set the mapper output key and value types
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        // 5. Set the final output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // Use the custom partitioner with five reduce tasks (one per partition)
        job.setPartitionerClass(PartitionSort.class);
        job.setNumReduceTasks(5);
        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7. Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
The result is as follows:
Test run on the Hadoop cluster:
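For reference, a job like this is packaged into a jar and submitted with the hadoop jar command; the jar name and HDFS paths below are only placeholders, and the hard-coded args line in main has to be removed before packaging:
hadoop jar FlowSort.jar com.Sort.Whole.FlowCountSortDriver /input/vgsales /output/flowsort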
While the job was running, the following warning appeared:
WARN io.ReadaheadPool: Failed readahead on ifile
EBADF: Bad file descriptor
at org.apache.hadoop.io.nativeio.NativeIO$POSIX.posix_fadvise(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$POSIX.posixFadviseIfPossible(NativeIO.java:267)
at org.apache.hadoop.io.nativeio.NativeIO$POSIX$CacheManipulator.posixFadviseIfPossible(NativeIO.java:146)
at org.apache.hadoop.io.ReadaheadPool$ReadaheadRequestImpl.run(ReadaheadPool.java:206)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I have not figured out the cause of this warning yet; the partition output files still come out correct. If anyone knows how to resolve it, please share.