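MapReduce: Reading from HDFS and Writing the Results to MongoDB
Reference: http://docs.mongodb.org/ecosystem/tutorial/getting-started-with-hadoop/
The attachment contains the jars I compiled and downloaded (Hadoop version: hadoop-0.20.2-cdh3u3); if you need them, you can download and use them directly.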
First, build the MongoDB Adapter (mongo-hadoop).
1. Download the source code:
https://github.com/mongodb/mongo-hadoop
2. Edit build.sbt so that the Hadoop release matches the cluster:
hadoopRelease in ThisBuild := "cdh3"
3. Build:
./sbt package
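4. After the build finishes:
mongo-hadoop_cdh3u3-1.1.0-SNAPSHOT.jar is generated in the target directory
mongo-hadoop-core_cdh3u3-1.1.0-SNAPSHOT.jar is generated in the core/target directory
5. Download mongo-2.7.3.jar (the MongoDB Java driver):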
wget --no-check-certificate https://github.com/downloads/mongodb/mongo-java-driver/mongo-2.7.3.jar
6. Copy the two jars (the Java driver and mongo-hadoop-core) into $HADOOP_HOME/lib on every node of the Hadoop cluster:
cp mongo-2.7.3.jar $HADOOP_HOME/lib/
cp core/target/mongo-hadoop-core_cdh3u3-1.1.0-SNAPSHOT.jar $HADOOP_HOME/lib/
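If a jar is missing from a node, the job typically fails with a ClassNotFoundException. The following is a minimal sketch, not part of the original setup, for checking that the classes the job needs can be loaded. The class name ClasspathCheckSketch and the helper isOnClasspath are made up for this illustration; com.mongodb.hadoop.MongoOutputFormat is the class used by the WordCount program, and com.mongodb.Mongo is assumed to be the driver's entry class in mongo-2.7.3.jar.

```java
// Sketch: check whether the classes needed by the job are visible on the classpath.
class ClasspathCheckSketch {

    // Returns true if the named class can be found; the class is not initialized.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheckSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class names to check: one from mongo-hadoop-core, one from the Java driver.
        String[] needed = {
            "com.mongodb.hadoop.MongoOutputFormat",
            "com.mongodb.Mongo"
        };
        for (String name : needed) {
            System.out.println(name + ": " + (isOnClasspath(name) ? "found" : "MISSING"));
        }
    }
}
```

One way to run it on a node, assuming the compiled class is in the current directory, is java -cp "$HADOOP_HOME/lib/*:." ClasspathCheckSketch.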
7. Prepare the input file (input.txt, contents below) and upload it to HDFS:
hadoop fs -put input.txt /tmp/input/
vi input.txt
Hello,MongoDB! MongoDB Hello Hello,Hadoop Hadoop with MongoDB
8. The WordCount program (I ran it from Eclipse on Windows, connecting to the Hadoop cluster):
import java.io.*;
import java.util.*;
import org.apache.commons.logging.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.bson.*;
import com.mongodb.hadoop.*;
import com.mongodb.hadoop.util.*;

public class WordCount {

    // Mapper: emits (token, 1) for every whitespace-separated token of a line.
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    // Reducer: sums the counts emitted for each token.
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (final IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        final Configuration conf = new Configuration();
        // Cluster configuration files, copied to the local Windows machine.
        conf.addResource(new Path("F:/lxw-hadoop/hdfs-site.xml"));
        conf.addResource(new Path("F:/lxw-hadoop/mapred-site.xml"));
        conf.addResource(new Path("F:/lxw-hadoop/core-site.xml"));
        conf.set("mapred.job.tracker", "10.133.103.21:50021");
        // Results go to the "out" collection of the "test" database.
        MongoConfigUtil.setOutputURI(conf, "mongodb://10.133.103.23/test.out");
        System.out.println("Conf: " + conf);

        final Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Write the reducer output to MongoDB instead of HDFS.
        job.setOutputFormatClass(MongoOutputFormat.class);
        // Read the input from HDFS.
        FileInputFormat.addInputPath(job, new Path("/tmp/input/"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
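To see what the mapper and reducer compute without a cluster, here is a small Hadoop-free sketch of the same logic. It is only an illustration: the class LocalWordCountSketch, the method count, and the sample text are made up for this example and are not the author's input file. Like TokenizerMapper, it uses StringTokenizer with its default delimiters, so tokens are split on whitespace only and punctuation stays attached to the word.

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Local sketch of the map and reduce logic in WordCount (no Hadoop or MongoDB needed).
class LocalWordCountSketch {

    // Splits the text on whitespace and adds 1 per token, as the mapper/reducer pair does.
    static Map<String, Integer> count(String text) {
        Map<String, Integer> counts = new TreeMap<>();
        StringTokenizer itr = new StringTokenizer(text);
        while (itr.hasMoreTokens()) {
            counts.merge(itr.nextToken(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical sample text, chosen to show that "MongoDB!" and "MongoDB" are distinct keys.
        String sample = "Hello MongoDB! MongoDB Hello";
        for (Map.Entry<String, Integer> e : count(sample).entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

Because punctuation is not stripped, a word followed by "!" is counted separately from the bare word. If that is not wanted, the tokens would need to be normalized in the mapper before being emitted.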
9. Run result:
10. View the results in MongoDB:
> use test
switched to db test
> show collections
blog
out
system.indexes
> db.out.find()
{ "_id" : "Hadoop", "value" : 2 }
{ "_id" : "Hello", "value" : 4 }
{ "_id" : "MongoDB", "value" : 2 }
{ "_id" : "MongoDB!", "value" : 1 }
{ "_id" : "with", "value" : 1 }
{ "_id" : "world!", "value" : 1 }
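The database (test) and collection (out) queried in step 10 come from the URI passed to MongoConfigUtil.setOutputURI in the program. The sketch below only illustrates how a URI of the form mongodb://host/db.collection breaks down; the adapter and the driver do their own parsing, and the class OutputUriSketch and its parse method are made up for this example. It assumes a single host and no credentials or options in the URI.

```java
import java.net.URI;

// Sketch: split an output URI of the form mongodb://host/db.collection into its parts.
class OutputUriSketch {

    // Returns {host, database, collection}.
    static String[] parse(String uri) {
        URI u = URI.create(uri);
        String path = u.getPath().substring(1);   // drop the leading '/'
        int dot = path.indexOf('.');              // the database name ends at the first '.'
        return new String[] { u.getHost(), path.substring(0, dot), path.substring(dot + 1) };
    }

    public static void main(String[] args) {
        String[] p = parse("mongodb://10.133.103.23/test.out");
        System.out.println("host=" + p[0] + " db=" + p[1] + " collection=" + p[2]);
    }
}
```

Splitting at the first dot matters because a MongoDB database name cannot contain a dot, while a collection name can.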