欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  数据库

Hadoop : 一个目录下的数据只由一个map处理

程序员文章站 2024-04-04 23:29:59
...

有这么个需求:一个目录下的数据只能由一个map来处理。如果多个map处理了同一个目录下的数据会导致数据错乱。 刚开始google了下,以为网上都有现成的InputFormat,找到的答案类似我之前写的 mapreduce job让一个文件只由一个map来处理。 或者是把目录写在文

有这么个需求:一个目录下的数据只能由一个map来处理。如果多个map处理了同一个目录下的数据会导致数据错乱。

刚开始google了下,以为网上都有现成的InputFormat,找到的答案类似我之前写的 “mapreduce job让一个文件只由一个map来处理“。

或者是把目录写在文件里面,作为输入:

/path/to/directory1
/path/to/directory2
/path/to/directory3

代码里面按行读取:

 @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            FileSystem fs = FileSystem.get(context.getConfiguration());
            for (FileStatus status : fs.listStatus(new Path(value.toString()))) {
                // process file
            }
        }

都不能满足需求,还是自己实现一个 OneMapOneDirectoryInputFormat 吧,也很简单:

import java.io.IOException;
import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
/**
 * 一个map处理一个目录的数据
 */
public abstract class OneMapOneDirectoryInputFormat extends CombineFileInputFormat {
    private static final Log LOG = LogFactory.getLog(OneMapOneDirectoryInputFormat.class);
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
    @Override
    public List getSplits(JobContext job) throws IOException {
        // get all the files in input path
        List stats = listStatus(job);
        List splits = new ArrayList();
        if (stats.size() == 0) {
            return splits;
        }
        LOG.info("fileNums=" + stats.size());
        Map> map = new HashMap>();
        for (FileStatus stat : stats) {
            String directory = stat.getPath().getParent().toString();
            if (map.containsKey(directory)) {
                map.get(directory).add(stat);
            } else {
                List fileList = new ArrayList();
                fileList.add(stat);
                map.put(directory, fileList);
            }
        }
        // 设置inputSplit
        long currentLen = 0;
        List pathLst = new ArrayList();
        List offsetLst = new ArrayList();
        List lengthLst = new ArrayList();
        Iterator itr = map.keySet().iterator();
        while (itr.hasNext()) {
            String dir = itr.next();
            List fileList = map.get(dir);
            for (int i = 0; i  path[" + i + "]=" + pathArray[i].toString());
            }
            splits.add(thissplit);
            pathLst.clear();
            offsetLst.clear();
            lengthLst.clear();
            currentLen = 0;
        }
        return splits;
    }
    private long[] getLongArray(List lst) {
        long[] rst = new long[lst.size()];
        for (int i = 0; i 

这个InputFormat的具体使用方法就不说了。其实与“一个Hadoop程序的优化过程 – 根据文件实际大小实现CombineFileInputFormat”中的MultiFileInputFormat比较类似。