My First MapReduce Job
程序员文章站
2022-03-10 19:17:09
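A couple of days ago I set up a two-node Hadoop cluster on the company intranet. It has no real practical use yet; it is only for my own testing. The problems I ran into are all covered in 《Hadoop集群配置和使用技巧》 (Hadoop Cluster Configuration and Usage Tips), written by a fellow engineer at Alibaba. I also hit the problem of reduce tasks hanging; listing every machine in the cluster in /etc/hosts on each node fixes it.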
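As a quick sanity check after editing /etc/hosts, something like the sketch below can be run on each node. It rests on my assumption that the hang comes from nodes failing to resolve each other's hostnames; the fix above only says to list all machines in /etc/hosts. The class name ClusterHostCheck and the hostnames in the usage note are made up for illustration.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class ClusterHostCheck {

    // Returns true if this machine can resolve the given hostname
    // (via /etc/hosts, DNS, or whatever the local resolver uses).
    static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the cluster hostnames as arguments; falls back to localhost.
        String[] hosts = args.length > 0 ? args : new String[] {"localhost"};
        for (String host : hosts) {
            String status = resolves(host) ? "resolves" : "NOT resolvable";
            System.out.println(host + " -> " + status);
        }
    }
}
```

Run it as `java ClusterHostCheck node1 node2`, replacing the hypothetical names node1 and node2 with the real hostnames of the cluster machines. Any name reported as not resolvable is a candidate for a missing /etc/hosts entry.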
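Today I reimplemented a log-statistics job on the Hadoop MapReduce framework. The data volume is modest: one log file of a little over 2 GB to analyze per day. Previously I did this with Ruby together with the cat and grep commands, and one run took a bit over 50 seconds. Pure Ruby drove CPU usage very high and was unbearably slow; calling the Linux cat and grep commands through IO.popen to pre-filter the input made it much better. Here is the MapReduce job: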
package test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class GameCount extends Configured implements Tool {

    public static class MapClass extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {

        private Pattern pattern;

        // Build the regex once per task from the game name set in run().
        @Override
        public void configure(JobConf job) {
            String gameName = job.get("mapred.mapper.game");
            pattern = Pattern.compile("play\\sgame\\s" + gameName
                    + ".*uid=(\\d+),score=(-?\\d+),money=(-?\\d+)");
        }

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String text = value.toString();
            Matcher matcher = pattern.matcher(text);
            int total = 0; // number of matches in this input line
            while (matcher.find()) {
                // Emit (uid, score) for every matching record.
                int record = Integer.parseInt(matcher.group(2));
                output.collect(new Text(matcher.group(1)), new IntWritable(record));
                total += 1;
            }
            // Emitted for every line, even when total is 0.
            output.collect(new Text("total"), new IntWritable(total));
        }
    }

    // Sums the values of each key; also used as the combiner.
    public static class ReduceClass extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    static int printUsage() {
        System.out.println(
                "gamecount [-m <maps>] [-r <reduces>] <input> <output> <gamename>");
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
    }

    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), GameCount.class);
        conf.setJobName("gamecount");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(MapClass.class);
        conf.setCombinerClass(ReduceClass.class);
        conf.setReducerClass(ReduceClass.class);

        List<String> other_args = new ArrayList<String>();
        for (int i = 0; i < args.length; ++i) {
            try {
                if ("-m".equals(args[i])) {
                    conf.setNumMapTasks(Integer.parseInt(args[++i]));
                } else if ("-r".equals(args[i])) {
                    conf.setNumReduceTasks(Integer.parseInt(args[++i]));
                } else {
                    other_args.add(args[i]);
                }
            } catch (NumberFormatException except) {
                System.out.println("ERROR: Integer expected instead of " + args[i]);
                return printUsage();
            } catch (ArrayIndexOutOfBoundsException except) {
                System.out.println("ERROR: Required parameter missing from "
                        + args[i - 1]);
                return printUsage();
            }
        }
        // Make sure there are exactly 3 parameters left.
        if (other_args.size() != 3) {
            System.out.println("ERROR: Wrong number of parameters: "
                    + other_args.size() + " instead of 3.");
            return printUsage();
        }
        FileInputFormat.setInputPaths(conf, other_args.get(0));
        FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
        // Read the game name from other_args, not args: args[2] is wrong
        // whenever -m or -r is passed on the command line.
        conf.set("mapred.mapper.game", other_args.get(2));
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        int res = ToolRunner.run(new Configuration(), new GameCount(), args);
        System.out.println("running time:"
                + (System.nanoTime() - start) / 1000000 + " ms");
        System.exit(res);
    }
}
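The code needs little explanation. It parses strings such as "play game DouDiZhu result:uid=1871653,score=-720,money=0" to work out, for each day, how many games were played and how players scored: the job sums the score per uid and counts all matching records under the key total. Package it as GameCount.jar and run: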
hadoop jar GameCount.jar test.GameCount /usr/logs/test.log /usr/output DouDiZhu
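The job takes a little over 100 seconds to run. Moderately increasing the number of map and reduce tasks brought no real improvement, and CPU usage was still fairly high.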
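To see what the job computes without a cluster, the regular expression and the summing logic can be tried in plain Java. This is only a local sketch, not the Hadoop job itself: the class name GameCountLocalDemo is made up, the first input line is the sample from the post, and the other two lines are invented for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GameCountLocalDemo {

    // Same expression the mapper builds for a given game name.
    static Pattern patternFor(String gameName) {
        return Pattern.compile("play\\sgame\\s" + gameName
                + ".*uid=(\\d+),score=(-?\\d+),money=(-?\\d+)");
    }

    // Sums scores per uid and counts matches under the key "total",
    // mirroring what the map and reduce steps produce together.
    static Map<String, Integer> count(Iterable<String> lines, String gameName) {
        Pattern pattern = patternFor(gameName);
        Map<String, Integer> result = new LinkedHashMap<String, Integer>();
        int total = 0;
        for (String line : lines) {
            Matcher m = pattern.matcher(line);
            while (m.find()) {
                result.merge(m.group(1), Integer.parseInt(m.group(2)), Integer::sum);
                total++;
            }
        }
        result.put("total", total);
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                // Sample line from the post.
                "play game DouDiZhu result:uid=1871653,score=-720,money=0",
                // Invented lines, for illustration only.
                "play game DouDiZhu result:uid=1871653,score=300,money=0",
                "play game MaJiang result:uid=42,score=10,money=0");
        System.out.println(count(lines, "DouDiZhu"));
    }
}
```

With these three lines the program prints {1871653=-420, total=2}: the two DouDiZhu scores are summed for the one uid, and the MaJiang line is ignored because it does not match the game name.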