欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Hadoop学习之路(6)MapReduce自定义分区实现

程序员文章站 2022-04-18 18:11:15
MapReduce自带的分区器是HashPartitioner 原理:先对map输出的key求hash值,再模上reduce task个数,根据结果,决定此输出kv对,被匹配的reduce任务取走。 自定义分分区需要继承Partitioner,复写getpariton()方法 自定义分区类: 注意: ......

mapreduce自带的分区器是hashpartitioner
原理:先对map输出的key求hash值,再模上reduce task个数,根据结果,决定此输出kv对,被匹配的reduce任务取走。
Hadoop学习之路(6)MapReduce自定义分区实现
自定义分分区需要继承partitioner,复写getpariton()方法
自定义分区类:
Hadoop学习之路(6)MapReduce自定义分区实现
注意:map的输出是<k,v>键值对
其中int partitionindex = dict.get(text.tostring())partitionindex是获取k的值

附:被计算的的文本

dear dear bear bear river car dear dear  bear rive
dear dear bear bear river car dear dear  bear rive

需要在main函数中设置,指定自定义分区类
Hadoop学习之路(6)MapReduce自定义分区实现
自定义分区类:

import org.apache.hadoop.io.intwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.partitioner;
import java.util.hashmap;
public class custompartitioner extends partitioner<text, intwritable> {
    public static hashmap<string, integer> dict = new hashmap<string, integer>();
    //text代表着map阶段输出的key,intwritable代表着输出的值
    static{
        dict.put("dear", 0);
        dict.put("bear", 1);
        dict.put("river", 2);
        dict.put("car", 3);
    }
    public int getpartition(text text, intwritable intwritable, int i) {
        //
        int partitionindex = dict.get(text.tostring());
        return partitionindex;
    }
}

注意:map的输出结果是键值对<k,v>,int partitionindex = dict.get(text.tostring());中的partitionindex是map输出键值对中的键的值,也就是k的值。
maper类:

import org.apache.hadoop.io.intwritable;
import org.apache.hadoop.io.longwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.mapper;

import java.io.ioexception;

public class wordcountmap extends mapper<longwritable, text, text, intwritable> {
    public void map(longwritable key, text value, context context)
            throws ioexception, interruptedexception {
        string[] words = value.tostring().split("\t");
        for (string word : words) {
            // 每个单词出现1次,作为中间结果输出
            context.write(new text(word), new intwritable(1));
        }
    }
}

reducer类:

import org.apache.hadoop.io.intwritable;
import org.apache.hadoop.io.longwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.mapper;
import java.io.ioexception;
public class wordcountmap extends mapper<longwritable, text, text, intwritable> {
    public void map(longwritable key, text value, context context)
            throws ioexception, interruptedexception {
        string[] words = value.tostring().split("\t");
        for (string word : words) {
            // 每个单词出现1次,作为中间结果输出
            context.write(new text(word), new intwritable(1));
        }
    }
}

main函数:

import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.io.intwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.job;
import org.apache.hadoop.mapreduce.lib.input.fileinputformat;
import org.apache.hadoop.mapreduce.lib.output.fileoutputformat;
import java.io.ioexception;
public class wordcountmain {
    public static void main(string[] args) throws ioexception,
            classnotfoundexception, interruptedexception {
        if (args.length != 2 || args == null) {
            system.out.println("please input path!");
            system.exit(0);
        }
        configuration configuration = new configuration();
        configuration.set("mapreduce.job.jar","/home/bruce/project/kkbhdp01/target/com.kaikeba.hadoop-1.0-snapshot.jar");
        job job = job.getinstance(configuration, wordcountmain.class.getsimplename());
        // 打jar包
        job.setjarbyclass(wordcountmain.class);
        // 通过job设置输入/输出格式
        //job.setinputformatclass(textinputformat.class);
        //job.setoutputformatclass(textoutputformat.class);
        // 设置输入/输出路径
        fileinputformat.setinputpaths(job, new path(args[0]));
        fileoutputformat.setoutputpath(job, new path(args[1]));
        // 设置处理map/reduce阶段的类
        job.setmapperclass(wordcountmap.class);
        //map combine
        //job.setcombinerclass(wordcountreduce.class);
        job.setreducerclass(wordcountreduce.class);
        //如果map、reduce的输出的kv对类型一致,直接设置reduce的输出的kv对就行;如果不一样,需要分别设置map, reduce的输出的kv类型
        //job.setmapoutputkeyclass(.class)
        // 设置最终输出key/value的类型m
        job.setoutputkeyclass(text.class);
        job.setoutputvalueclass(intwritable.class);
        job.setpartitionerclass(custompartitioner.class);
        job.setnumreducetasks(4);
        // 提交作业
        job.waitforcompletion(true);

    }
}

main函数参数设置:
Hadoop学习之路(6)MapReduce自定义分区实现