
A Total Sort and Custom Partitioner Example


Since Partitioner-based partitioning and total sort have each been covered before, putting the two together here should make both easier to understand. The earlier total-sort example was never actually run on a Hadoop cluster (I was a bit busy), so this time the job is packaged as a jar and tested on the cluster:

1). Encapsulating the fields of the test file

package com.Sort.Whole;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {
    private Float Na_Sales;    // NA_Sales field
    private Float Other_Sales; // Other_Sales field
    private Float sum_Sales;   // sum_Sales field

    public FlowBean() {
        super();
    }

    public FlowBean(Float na_Sales, Float other_Sales) {
        Na_Sales = na_Sales;
        Other_Sales = other_Sales;
        sum_Sales = Na_Sales + Other_Sales;
    }

    // Comparison: order by sum_Sales, descending
    @Override
    public int compareTo(FlowBean flowBean) {
        int result;
        if (sum_Sales > flowBean.getSum_Sales()){
            result = -1;
        }else if(sum_Sales < flowBean.getSum_Sales()){
            result = 1;
        }else{
            result = 0;
        }
        return result;
    }

    // Serialization
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeFloat(Na_Sales);
        dataOutput.writeFloat(Other_Sales);
        dataOutput.writeFloat(sum_Sales);
    }

    // Deserialization (fields must be read in the same order they were written)
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        Na_Sales = dataInput.readFloat();
        Other_Sales = dataInput.readFloat();
        sum_Sales = dataInput.readFloat();
    }

    public Float getNa_Sales() {
        return Na_Sales;
    }

    public void setNa_Sales(Float na_Sales) {
        Na_Sales = na_Sales;
    }

    public Float getOther_Sales() {
        return Other_Sales;
    }

    public void setOther_Sales(Float other_Sales) {
        Other_Sales = other_Sales;
    }

    public Float getSum_Sales() {
        return sum_Sales;
    }

    public void setSum_Sales(Float sum_Sales) {
        this.sum_Sales = sum_Sales;
    }

    @Override
    public String toString() {
        return  Na_Sales +
                "\t" + Other_Sales +
                "\t" + sum_Sales
                ;
    }
}
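Since compareTo drives the sort order during the shuffle, it is worth sanity-checking it in isolation. The snippet below is a minimal sketch, not part of the original job; the demo class name and the sample figures are made up:

package com.Sort.Whole;

// Minimal sketch: verify that FlowBean sorts by sum_Sales in descending order.
public class FlowBeanCompareDemo {
    public static void main(String[] args) {
        FlowBean action = new FlowBean(877.83f, 1751.18f); // sum_Sales = 2629.01
        FlowBean puzzle = new FlowBean(10.0f, 20.0f);      // sum_Sales = 30.0
        System.out.println(action.compareTo(puzzle)); // -1: larger sum sorts first
        System.out.println(puzzle.compareTo(action)); // 1
        System.out.println(action.compareTo(action)); // 0: equal sums compare equal
    }
}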

2). Mapper program

package com.Sort.Whole;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowCountSortMappper extends Mapper<LongWritable, Text,FlowBean,Text> {

    FlowBean k = new FlowBean();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Sample input record: Action,877.83,1751.18,2629.01

        // 1. Read one line
        String line = value.toString();

        // 2. Split the line on commas
        String[] fields = line.split(",");

        // 3. Extract the fields: the last three columns are NA_Sales, Other_Sales, and the total
        String Genre = fields[0];
        Float Na_Sales = Float.parseFloat(fields[fields.length-3]);
        Float Other_Sales = Float.parseFloat(fields[fields.length-2]);
        Float sum_Sales = Float.parseFloat(fields[fields.length-1]);

        k.setNa_Sales(Na_Sales);
        k.setOther_Sales(Other_Sales);
        k.setSum_Sales(sum_Sales);
        v.set(Genre);

        // 4. Emit with FlowBean as the key, so the shuffle sorts records by sales
        context.write(k, v);
    }
}
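Because the mapper indexes fields from the end of the array (fields.length - 3 and so on), it tolerates rows with a variable number of middle columns. A standalone sketch of that extraction logic, with hypothetical sample rows that are not from the original dataset:

package com.Sort.Whole;

// Standalone sketch of the mapper's field extraction; the sample rows are made up.
public class FieldExtractDemo {
    public static void main(String[] args) {
        String[] rows = {
                "Action,877.83,1751.18,2629.01",
                "Puzzle,extraColumn,10.5,20.5,31.0" // extra middle column still parses
        };
        for (String line : rows) {
            String[] fields = line.split(",");
            String genre = fields[0];
            // The last three columns are always NA_Sales, Other_Sales, sum_Sales
            Float naSales = Float.parseFloat(fields[fields.length - 3]);
            Float otherSales = Float.parseFloat(fields[fields.length - 2]);
            Float sumSales = Float.parseFloat(fields[fields.length - 1]);
            System.out.println(genre + "\t" + naSales + "\t" + otherSales + "\t" + sumSales);
        }
    }
}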

3). Reducer program

package com.Sort.Whole;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        // Output format: Genre \t NA_Sales \t Other_Sales \t sum_Sales

        // Keys that compare equal (same sum_Sales) are grouped into a single call,
        // so emit every genre in the group with the shared key.
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
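Because the shuffle delivers keys to the reducer in compareTo order, the reducer's input is already sorted by total sales. A rough local illustration of that ordering, not part of the job (note that a TreeMap overwrites keys that compare equal, unlike the real shuffle, so the sample sums are kept distinct):

package com.Sort.Whole;

import java.util.TreeMap;

// Rough illustration: a TreeMap orders FlowBean keys with compareTo, which is
// how reducer input ends up sorted by sum_Sales descending. Sample data is made up.
public class ShuffleOrderDemo {
    public static void main(String[] args) {
        TreeMap<FlowBean, String> byKey = new TreeMap<>();
        byKey.put(new FlowBean(100.0f, 50.0f), "Puzzle");
        byKey.put(new FlowBean(877.83f, 1751.18f), "Action");
        byKey.put(new FlowBean(300.0f, 400.0f), "Sports");
        // Prints Action, Sports, Puzzle: descending by total sales.
        byKey.forEach((bean, genre) -> System.out.println(genre + "\t" + bean));
    }
}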

My test file looks like this:

[Screenshot: contents of the test file]

4). The custom Partitioner class:

package com.Sort.Whole;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PartitionSort extends Partitioner<FlowBean, Text> {

    @Override
    public int getPartition(FlowBean flowBean, Text text, int i) {
        // Route by the first letter of the Genre field: records sharing a
        // first letter go to the same partition.
        String val = text.toString().substring(0, 1);
        int result;
        if ("A".equals(val)) {          // partition 1: part-r-00000
            result = 0;
        } else if ("P".equals(val)) {   // partition 2: part-r-00001
            result = 1;
        } else if ("S".equals(val)) {   // partition 3: part-r-00002
            result = 2;
        } else if ("R".equals(val)) {   // partition 4: part-r-00003
            result = 3;
        } else {                        // any other first letter: part-r-00004
            result = 4;
        }
        return result;
    }
}
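A quick local check of the routing logic; this is a hypothetical sketch, and the demo class and sample genre names are not part of the job:

package com.Sort.Whole;

import org.apache.hadoop.io.Text;

// Hypothetical sketch: call getPartition directly to confirm the routing.
public class PartitionSortDemo {
    public static void main(String[] args) {
        PartitionSort partitioner = new PartitionSort();
        FlowBean ignored = new FlowBean(); // the key is not used by getPartition
        System.out.println(partitioner.getPartition(ignored, new Text("Action"), 5)); // 0
        System.out.println(partitioner.getPartition(ignored, new Text("Puzzle"), 5)); // 1
        System.out.println(partitioner.getPartition(ignored, new Text("Sports"), 5)); // 2
        System.out.println(partitioner.getPartition(ignored, new Text("Racing"), 5)); // 3
        System.out.println(partitioner.getPartition(ignored, new Text("Misc"), 5));   // 4
    }
}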

The custom Partitioner above routes each record by the first letter of the first column: part-r-00000 holds genres starting with A, part-r-00001 those starting with P, part-r-00002 those starting with S, part-r-00003 those starting with R, and part-r-00004 everything else.

5). Driver program

package com.Sort.Whole;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowCountSortDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Hard-coded local paths for IDE testing; remove this line when submitting
        // the jar to the Hadoop cluster (the paths then come from the command line).
        args = new String[]{"E:\\谷歌文件\\谷歌数据集\\", "F:\\scala\\Workerhdfs\\output5"};
        // 1. Get the Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Set the jar by its driver class
        job.setJarByClass(FlowCountSortDriver.class);

        // 3. Wire up the mapper and reducer
        job.setMapperClass(FlowCountSortMappper.class);
        job.setReducerClass(FlowCountSortReducer.class);
        // 4. Map output key/value types
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 5. Final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Plug in the custom partitioner; setNumReduceTasks must match the
        // number of partitions getPartition can return (5 here).
        job.setPartitionerClass(PartitionSort.class);
        job.setNumReduceTasks(5);
        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7. Submit the job and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

The result of the local run:

[Screenshot: local run output and partition files]

Running on the Hadoop cluster:

[Screenshots: the job running on the cluster and the resulting partition files]
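With the hard-coded args line removed from the driver, the jar is submitted with the standard hadoop jar invocation. The jar name and HDFS paths below are placeholders, not the ones I actually used:

hadoop jar FlowSort.jar com.Sort.Whole.FlowCountSortDriver /input/vgsales /output/sort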

While the job was running, the following warning appeared in the logs:

WARN io.ReadaheadPool: Failed readahead on ifile
EBADF: Bad file descriptor
	at org.apache.hadoop.io.nativeio.NativeIO$POSIX.posix_fadvise(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$POSIX.posixFadviseIfPossible(NativeIO.java:267)
	at org.apache.hadoop.io.nativeio.NativeIO$POSIX$CacheManipulator.posixFadviseIfPossible(NativeIO.java:146)
	at org.apache.hadoop.io.ReadaheadPool$ReadaheadRequestImpl.run(ReadaheadPool.java:206)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
	

I haven't figured out the cause yet, and the partition files still come out correct. If anyone knows how to resolve this warning, please share.

[Screenshot: the warning in the job logs]
