
MapReduce: Reading from HDFS and Writing the Results to MongoDB


Reference: http://docs.mongodb.org/ecosystem/tutorial/getting-started-with-hadoop/

The attachment contains the jar files I built and downloaded (Hadoop version: hadoop-0.20.2-cdh3u3); if you need them, you can download and use them directly.

 

First, build the MongoDB Adapter.

1. Download the source code:

https://github.com/mongodb/mongo-hadoop

 

2. Edit build.sbt and set the Hadoop release:

hadoopRelease in ThisBuild := "cdh3"

 

3. Build:

./sbt package

 

4. After the build completes:

mongo-hadoop_cdh3u3-1.1.0-SNAPSHOT.jar is generated in the target directory;

mongo-hadoop-core_cdh3u3-1.1.0-SNAPSHOT.jar is generated in the core/target directory.

 

5. Download mongo-2.7.3.jar (the MongoDB Java driver):

wget --no-check-certificate https://github.com/downloads/mongodb/mongo-java-driver/mongo-2.7.3.jar

 

6. Copy these two jar files into $HADOOP_HOME/lib on every node of the Hadoop cluster (an alternative that avoids the per-node copy is sketched after the commands):

cp mongo-2.7.3.jar $HADOOP_HOME/lib/

cp core/target/mongo-hadoop-core_cdh3u3-1.1.0-SNAPSHOT.jar $HADOOP_HOME/lib/
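
If copying the jars to every node is inconvenient, they can instead be shipped with the job through Hadoop's DistributedCache. The following is only a sketch of that alternative (not from the original post): it assumes the two jars have already been uploaded to a hypothetical /libs directory in HDFS, and the helper would be called on the Configuration before the Job is created in step 8.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;

public class AddMongoJars {

    // Sketch: put the adapter and driver jars (already in HDFS under /libs,
    // a path chosen here purely for illustration) on the job's classpath,
    // instead of copying them into $HADOOP_HOME/lib on every node.
    public static void addToClassPath(Configuration conf) throws IOException {
        DistributedCache.addFileToClassPath(
                new Path("/libs/mongo-hadoop-core_cdh3u3-1.1.0-SNAPSHOT.jar"), conf);
        DistributedCache.addFileToClassPath(
                new Path("/libs/mongo-2.7.3.jar"), conf);
    }
}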

 

7. Prepare the input file (vi input.txt) and upload it to HDFS: hadoop fs -put input.txt /tmp/input/

The contents of input.txt:

Hello,MongoDB!
MongoDB
Hello
Hello,Hadoop
Hadoop with MongoDB

 

8. The WordCount program (I connect to the Hadoop cluster from Eclipse on Windows):

 

import java.io.*;
import java.util.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import com.mongodb.hadoop.*;
import com.mongodb.hadoop.util.*;

public class WordCount {

	public static class TokenizerMapper extends
			Mapper<Object, Text, Text, IntWritable> {

		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			StringTokenizer itr = new StringTokenizer(value.toString());
			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				context.write(word, one);
			}
		}
	}

	public static class IntSumReducer extends
			Reducer<Text, IntWritable, Text, IntWritable> {

		private final IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {

			int sum = 0;
			for (final IntWritable val : values) {
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}

	public static void main(String[] args) throws Exception {

		final Configuration conf = new Configuration();
		// The driver runs outside the cluster (Eclipse on Windows), so load the
		// cluster's configuration files explicitly and point at the JobTracker.
		conf.addResource(new Path("F:/lxw-hadoop/hdfs-site.xml"));
		conf.addResource(new Path("F:/lxw-hadoop/mapred-site.xml"));
		conf.addResource(new Path("F:/lxw-hadoop/core-site.xml"));
		conf.set("mapred.job.tracker", "10.133.103.21:50021");
		// Tell the MongoDB adapter where to write the output:
		// database "test", collection "out" on 10.133.103.23.
		MongoConfigUtil.setOutputURI(conf, "mongodb://10.133.103.23/test.out");
		System.out.println("Conf: " + conf);

		final Job job = new Job(conf, "word count");

		job.setJarByClass(WordCount.class);
		job.setMapperClass(TokenizerMapper.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		// Read the input from HDFS, but write the output through MongoOutputFormat
		// instead of a FileOutputFormat, so no output path is set.
		job.setOutputFormatClass(MongoOutputFormat.class);
		FileInputFormat.addInputPath(job, new Path("/tmp/input/"));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
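
The reducer above emits a plain IntWritable, which mongo-hadoop stores as documents of the form { "_id" : <word>, "value" : <count> } (see step 10). If you want richer documents, one variation is to emit a BSON object as the value. The class below is my own illustrative sketch, not part of the original program: it assumes MongoOutputFormat accepts a BSONWritable value and stores its fields in the output document, and the driver would also have to call job.setReducerClass(BSONSumReducer.class) and job.setOutputValueClass(BSONWritable.class).

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BasicBSONObject;

import com.mongodb.hadoop.io.BSONWritable;

// Illustrative variant: write a whole BSON document per word, so each MongoDB
// record can carry more than a single integer.
public class BSONSumReducer extends Reducer<Text, IntWritable, Text, BSONWritable> {

	@Override
	public void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable val : values) {
			sum += val.get();
		}
		BasicBSONObject doc = new BasicBSONObject();
		doc.put("count", sum);                      // the word frequency
		doc.put("length", key.toString().length()); // an extra, purely illustrative field
		context.write(key, new BSONWritable(doc));
	}
}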

 

9. Run results:


[Screenshot of the job's run output]
 

10. Check the results in MongoDB:

 

> use test
switched to db test
> show collections
blog
out
system.indexes
> db.out.find()
{ "_id" : "Hadoop", "value" : 2 }
{ "_id" : "Hello", "value" : 4 }
{ "_id" : "MongoDB", "value" : 2 }
{ "_id" : "MongoDB!", "value" : 1 }
{ "_id" : "with", "value" : 1 }
{ "_id" : "world!", "value" : 1 }

 
