欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  网络运营

Hadoop对文本文件的快速全局排序实现方法及分析

程序员文章站 2022-04-06 09:41:48
一、背景 hadoop中实现了用于全局排序的inputsampler类和totalorderpartitioner类,调用示例是org.apache.hadoop.e...

一、背景

hadoop中实现了用于全局排序的inputsampler类和totalorderpartitioner类,调用示例是org.apache.hadoop.examples.sort。

但是当我们以text文件作为输入时,结果并非按text中的string列排序,而且输出结果是sequencefile。

原因:

1) hadoop在处理text文件时,key是行号longwritable类型,inputsampler抽样的是key,totalorderpartitioner也是用key去查找分区。这样,抽样得到的partition文件是对行号的抽样,结果自然是根据行号来排序。

2)大数据量时,inputsampler抽样速度会非常慢。比如,randomsampler需要遍历所有数据,intervalsampler需要遍历文件数与splits数一样。splitsampler效率比较高,但它只抽取每个文件前面的记录,不适合应用于文件内有序的情况。

二、功能

1. 实现了一种局部抽样方法partialsampler,适用于输入数据各文件是独立同分布的情况

2. 使randomsampler、intervalsampler、splitsampler支持对文本的抽样

3. 实现了针对text文件string列的totalorderpartitioner

三、实现

1. partialsampler

partialsampler从第一份输入数据中随机抽取第一列文本数据。partialsampler有两个属性:freq(采样频率),numsamples(采样总数)。

public k[] getsample(inputformat<k,v> inf, jobconf job) throws ioexception {
   inputsplit[] splits = inf.getsplits(job, job.getnummaptasks());
   arraylist<k> samples = new arraylist<k>(numsamples);
   random r = new random();
   long seed = r.nextlong();
   r.setseed(seed);
   log.debug("seed: " + seed);   
   // 对splits【0】抽样
   for (int i = 0; i < 1; i++) {
    system.out.println("partialsampler will getsample splits["+i+"]");
    recordreader<k,v> reader = inf.getrecordreader(splits[i], job,
      reporter.null);
    k key = reader.createkey();
    v value = reader.createvalue();
    while (reader.next(key, value)) {
     if (r.nextdouble() <= freq) {
      if (samples.size() < numsamples) {
        // 选择value中的第一列抽样
        text value0 = new text(value.tostring().split("\t")[0]);     
        samples.add((k) value0);        
      } else {
       // when exceeding the maximum number of samples, replace a
       // random element with this one, then adjust the frequency
       // to reflect the possibility of existing elements being
       // pushed out
       int ind = r.nextint(numsamples);
       if (ind != numsamples) {
        text value0 = new text(value.tostring().split("\t")[0]); 
        samples.set(ind, (k) value0);
       }
       freq *= (numsamples - 1) / (double) numsamples;
      }
      key = reader.createkey();
     }
    }    
    reader.close();
   }
   return (k[])samples.toarray();
  }

首先通过inputformat的getsplits方法得到所有的输入分区;

然后扫描第一个分区中的记录进行采样。

记录采样的具体过程如下:

从指定分区中取出一条记录,判断得到的随机浮点数是否小于等于采样频率freq

  如果大于则放弃这条记录;

  如果小于,则判断当前的采样数是否小于最大采样数,

    如果小于则这条记录被选中,被放进采样集合中;

    否则从【0,numsamples】中选择一个随机数,如果这个随机数不等于最大采样数numsamples,则用这条记录替换掉采样集合随机数对应位置的记录,同时采样频率freq减小变为freq*(numsamples-1)/numsamples。

然后依次遍历分区中的其它记录。

note:

1)partialsampler只适用于输入数据各文件是独立同分布的情况。

2)自带的三种sampler通过修改samples.add(key)为samples.add((k) value0); 也可以实现对第一列的抽样。

2. totalorderpartitioner

totalorderpartitioner主要改进了两点:

1)读partition时指定keyclass为text.class

因为partition文件中的key类型为text

在configure函数中,修改:

//class<k> keyclass = (class<k>)job.getmapoutputkeyclass();
class<k> keyclass = (class<k>)text.class;

2)查找分区时,改用value查

public int getpartition(k key, v value, int numpartitions) {
  text value0 = new text(value.tostring().split("\t")[0]); 
  return partitions.findpartition((k) value0);
 }

3. sort

1)设置inputformat、outputformat、outputkeyclass、outputvalueclass、mapoutputkeyclass

2)初始化inputsampler对象,抽样

3)partitionfile通过cachefile传给totalorderpartitioner,执行mapreduce任务

 class<? extends inputformat> inputformatclass = textinputformat.class;
  class<? extends outputformat> outputformatclass = textoutputformat.class;
  class<? extends writablecomparable> outputkeyclass = text.class;
  class<? extends writable> outputvalueclass = text.class;
  jobconf.setmapoutputkeyclass(longwritable.class);
  // set user-supplied (possibly default) job configs
  jobconf.setnumreducetasks(num_reduces);
  jobconf.setinputformat(inputformatclass);
  jobconf.setoutputformat(outputformatclass);
  jobconf.setoutputkeyclass(outputkeyclass);
  jobconf.setoutputvalueclass(outputvalueclass);
  if (sampler != null) {
   system.out.println("sampling input to effect total-order sort...");
   jobconf.setpartitionerclass(totalorderpartitioner.class);
   path inputdir = fileinputformat.getinputpaths(jobconf)[0];
   inputdir = inputdir.makequalified(inputdir.getfilesystem(jobconf));
   //path partitionfile = new path(inputdir, "_sortpartitioning");
   totalorderpartitioner.setpartitionfile(jobconf, partitionfile);
   inputsampler.<k,v>writepartitionfile(jobconf, sampler);
   uri partitionuri = new uri(partitionfile.tostring() + "#" + "_sortpartitioning");
   distributedcache.addcachefile(partitionuri, jobconf);
   distributedcache.createsymlink(jobconf);
  }
  filesystem hdfs = filesystem.get(jobconf);
  hdfs.delete(outputpath);
  hdfs.close();
  system.out.println("running on " +
    cluster.gettasktrackers() +
    " nodes to sort from " + 
    fileinputformat.getinputpaths(jobconf)[0] + " into " +
    fileoutputformat.getoutputpath(jobconf) +
    " with " + num_reduces + " reduces.");
  date starttime = new date();
  system.out.println("job started: " + starttime);
  jobresult = jobclient.runjob(jobconf);

四、执行

usage:

hadoop jar yitengfei.jar com.yitengfei.sort [-m <maps>] [-r <reduces>]
[-splitrandom <double pcnt> <numsamples> <maxsplits> | // sample from random splits at random (general)
-splitsample <numsamples> <maxsplits> | // sample from first records in splits (random data)
-splitinterval <double pcnt> <maxsplits>] // sample from splits at intervals (sorted data)
-splitpartial <double pcnt> <numsamples> <maxsplits> | // sample from partial splits at random (general) ]
<input> <output> <partitionfile>

example:

hadoop jar yitengfei.jar com.yitengfei.sort -r 10 -splitpartial 0.1 10000 10 /user/rp-rd/yitengfei/sample/input /user/rp-rd/yitengfei/sample/output /user/rp-rd/yitengfei/sample/partition

五、性能

200g输入数据,15亿条url,1000个分区,排序时间只用了6分钟

总结

以上就是本文关于hadoop对文本文件的快速全局排序实现方法及分析的全部内容,希望对大家有所帮助 ,感兴趣的朋友可以继续参阅本站:hadoop重新格式化hdfs步骤解析浅谈七种常见的hadoop和spark项目案例。如有不足之处,欢迎留言指出,感谢朋友们对本站的支持!