AggregateWordCount源代码注释
程序员文章站
2022-06-06 17:51:03
...
package org.apache.hadoop.examples; import java.io.IOException; import java.util.ArrayList; import java.util.StringTokenizer; import java.util.Map.Entry; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorBaseDescriptor; import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorJob; /** 这个是hadoop的map/reduce的例子,是对例子WordCount利用系统已经实现的map/reduce类进行简化。系统已经实现的ValueAggregatorBaseDescriptor 和ValueAggregatorJob已经实现各种数据类型的求和最大值,最小值的算法。类型如下: UniqValueCount LongValueSum DoubleValueSum ValueHistogram LongValueMax LongValueMin StringValueMax StringValueMin 具体请看相关的源代码。 这个job的执行必须用-jarlibs执行,不然会报configured错误。 执行命令如下: hadoop jar hadoop-example.jar -libjars hadoop-example.jar shakepoems.text out_aggregate_his 3 textinputformat * This is an example Aggregated Hadoop Map/Reduce application. It reads the * text input files, breaks each line into words and counts them. The output is * a locally sorted list of words and the count of how often they occurred. * * To run: bin/hadoop jar hadoop-*-examples.jar aggregatewordcount <i>in-dir</i> * <i>out-dir</i> <i>numOfReducers</i> textinputformat * */ public class AggregateWordCount { /*继承类ValueAggregatorBaseDescriptor */ public static class WordCountPlugInClass extends ValueAggregatorBaseDescriptor { @Override public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key, Object val) { String countType = LONG_VALUE_SUM;//指定算法类型是long类型的求和 ArrayList<Entry<Text, Text>> retv = new ArrayList<Entry<Text, Text>>(); String line = val.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { Entry<Text, Text> e = generateEntry(countType, itr.nextToken(), ONE); if (e != null) { retv.add(e); } } return retv; } } /**用静态类ValueAggregatorJob执行job * The main driver for word count map/reduce program. Invoke this method to * submit the map/reduce job. * * @throws IOException * When there is communication problems with the job tracker. */ @SuppressWarnings("unchecked") public static void main(String[] args) throws IOException { JobConf conf = ValueAggregatorJob.createValueAggregatorJob(args , new Class[] {WordCountPlugInClass.class}); JobClient.runJob(conf); } }
推荐阅读