MapReduce案例之寻找共同好友
以下是博客的好友列表数据,冒号前是一个用户,冒号后是该用户的所有好友(数据中的好友关系是单向的)
求出哪些人两两之间有共同好友,及他俩的共同好友都有谁?
输出格式:
A-B:C,E
(用户-用户:共同好友…)
需求分析
分为两个job
第一次输出结果,先求出A、B、C、….等是谁的好友
Job1:Mapper
:keyin-valuein
: (A:B,C,D,F,E,O)map()
: 将valuein拆分为若干好友,作为keyout写出
将keyin作为valueoutkeyout-valueout
: (友:用户)
(c:A),(C:B),(C:E)
Reducer
:keyin-valuein
: (友:用户)
(c:A),(C:B),(C:E)reduce()
: keyout-valueout
:(友:用户,用户,用户,用户)
A I,K,C,B,G,F,H,O,D,
B A,F,J,E,
C A,E,B,H,F,G,K,
D G,C,K,A,L,F,E,H,
E G,M,L,H,A,F,B,D,
F L,M,D,C,G,A,
G M,
H O,
I O,C,
J O,
K B,
L D,E,
M E,F,
O A,H,I,J,F,
第二次输出结果,输出每两个人的共同好友
Job2
:Mapper
:keyin-valuein
: (友:用户,用户,用户,用户)map()
: 使用keyin作为valueout
将valuein切分后,两两拼接,作为keyoutkeyout-valueout
: (用户-用户,友)
(A-B,C),(A-B,E)
(A-E,C), (A-G,C), (A-F,C), (A-K,C)
(B-E,C ),(B-G,C)
--------------------
(B-E,C)
(E-B,G)
B-E: C,G
A-B E C
A-C D F
A-D E F
A-E D B C
A-F O B C D E
A-G F E C D
A-H E C D O
A-I O
A-J O B
A-K D C
A-L F E D
A-M E F
B-C A
B-D A E
B-E C
B-F E A C
B-G C E A
B-H A E C
B-I A
B-K C A
B-L E
B-M E
B-O A
C-D A F
C-E D
C-F D A
C-G D F A
C-H D A
C-I A
C-K A D
C-L D F
C-M F
C-O I A
D-E L
D-F A E
D-G E A F
D-H A E
D-I A
D-K A
D-L E F
D-M F E
D-O A
E-F D M C B
E-G C D
E-H C D
E-J B
E-K C D
E-L D
F-G D C A E
F-H A D O E C
F-I O A
F-J B O
F-K D C A
F-L E D
F-M E
F-O A
G-H D C E A
G-I A
G-K D A C
G-L D F E
G-M E F
G-O A
H-I O A
H-J O
H-K A C D
H-L D E
H-M E
H-O A
I-J O
I-K A
I-O A
K-L D
K-O A
L-M E F
Reducer
:keyin-valuein
: (A-B,C),(A-B,E)reduce()
: keyout-valueout
: (A-B:C,E)
代码实现
(1)第一次Mapper类
public class OneShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
// 1 获取一行 A:B,C,D,F,E,O
String line = value.toString();
// 2 切割,冒号前面的是用户,冒号后面的是好友
String[] fields = line.split(":");
// 3 获取person和好友
String person = fields[0];
String[] friends = fields[1].split(",");
// 4写出
for(String friend: friends){
// 输出 <好友,用户>
context.write(new Text(friend), new Text(person));
}
}
}
(2)第一次Reducer类
public class OneShareFriendsReducer extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer();
//拼接
for(Text person: values){
sb.append(person).append(",");
}
//写出
context.write(key, new Text(sb.toString()));
}
}
(3)第一次Driver类
public class OneShareFriendsDriver {
public static void main(String[] args) throws Exception {
// 1 获取job对象
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 指定jar包运行的路径
job.setJarByClass(OneShareFriendsDriver.class);
// 3 指定map/reduce使用的类
job.setMapperClass(OneShareFriendsMapper.class);
job.setReducerClass(OneShareFriendsReducer.class);
// 4 指定map输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 5 指定最终输出的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 6 指定job的输入原始所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 提交
boolean result = job.waitForCompletion(true);
System.exit(result?0:1);
}
}
(4)第二次Mapper类
public class TwoShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// A I,K,C,B,G,F,H,O,D,
// 友 人,人,人
String line = value.toString();
String[] friend_persons = line.split("\t");
String friend = friend_persons[0];
String[] persons = friend_persons[1].split(",");
Arrays.sort(persons);
for (int i = 0; i < persons.length - 1; i++) {
for (int j = i + 1; j < persons.length; j++) {
// 发出 <人-人,好友> ,这样,相同的“人-人”对的所有好友就会到同1个reduce中去
context.write(new Text(persons[i] + "-" + persons[j]), new Text(friend));
}
}
}
}
(5)第二次Reducer类
public class TwoShareFriendsReducer extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer();
for (Text friend : values) {
sb.append(friend).append(" ");
}
context.write(key, new Text(sb.toString()));
}
}
(6)第二次Driver类
public class TwoShareFriendsDriver {
public static void main(String[] args) throws Exception {
// 1 获取job对象
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 指定jar包运行的路径
job.setJarByClass(TwoShareFriendsDriver.class);
// 3 指定map/reduce使用的类
job.setMapperClass(TwoShareFriendsMapper.class);
job.setReducerClass(TwoShareFriendsReducer.class);
// 4 指定map输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 5 指定最终输出的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 6 指定job的输入原始所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 提交
boolean result = job.waitForCompletion(true);
System.exit(result?0:1);
}
}
代码实现方案二
mapper1.java
/*
* keyin-valuein: (A:B,C,D,F,E,O)
map(): 将valuein拆分为若干好友,作为keyout写出
将keyin作为valueout
keyout-valueout: (友:用户)
(c:A),(C:B),(C:E)
*/
public class Example3Mapper1 extends Mapper<Text, Text, Text, Text>{
private Text out_key=new Text();
@Override
protected void map(Text key, Text value, Mapper<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
String[] friends = value.toString().split(",");
for (String friend : friends) {
out_key.set(friend);
context.write(out_key, key);
}
}
}
reducer1.java
/*
* keyin-valuein : (友:用户)
(c:A),(C:B),(C:E)
reduce():
keyout-valueout :(友:用户,用户,用户,用户)
*/
public class Example3Reducer extends Reducer<Text, Text, Text, Text>{
private Text out_value=new Text();
@Override
protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer();
for (Text text : value) {
sb.append(text.toString()+",");
}
out_value.set(sb.toString());
context.write(key, out_value);
}
}
mapper2.java
/*
keyin-valuein: (友\t用户,用户,用户,用户)
map(): 使用keyin作为valueout
将valuein切分后,两两拼接,作为keyout
keyout-valueout: (用户-用户,友)
(A-B,C),(A-B,E)
(A-E,C), (A-G,C), (A-F,C), (A-K,C)
(B-E,C),(B-G,C)
*/
public class Example3Mapper2 extends Mapper<Text, Text, Text, Text>{
private Text out_key=new Text();
@Override
protected void map(Text key, Text value, Mapper<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
String[] users = value.toString().split(",");
//保证数组中的用户名有序
Arrays.sort(users);
//将valuein切分后,两两拼接,作为keyout
for (int i = 0; i < users.length-1; i++) {
for (int j = i+1; j < users.length; j++) {
out_key.set(users[i]+"-"+users[j]);
context.write(out_key, key);
}
}
}
}
reducer2.java
/*
*keyin-valuein : (A-B,C),(A-B,E)
reduce():
keyout-valueout : (A-B:C,E)
*/
public class Example3Reducer2 extends Reducer<Text, Text, Text, Text>{
private Text out_value=new Text();
@Override
protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer();
for (Text text : value) {
sb.append(text.toString()+",");
}
out_value.set(sb.toString());
context.write(key, out_value);
}
}
driver.java
/*
* 1. Example1Driver 提交两个Job
* Job2 必须 依赖于 Job1,必须在Job1已经运行完成之后,生成结果后,才能运行!
*
* 2. JobControl: 定义一组MR jobs,还可以指定其依赖关系
* 可以通过addJob(ControlledJob aJob)向一个JobControl中添加Job对象!
*
* 3. ControlledJob: 可以指定依赖关系的Job对象
* addDependingJob(ControlledJob dependingJob): 为当前Job添加依赖的Job
* public ControlledJob(Configuration conf) : 基于配置构建一个ControlledJob
*
*/
public class Example3Driver {
public static void main(String[] args) throws Exception {
//定义路径
Path inputPath=new Path("e:/mrinput/friend");
Path outputPath=new Path("e:/mroutput/friend");
Path finalOutputPath=new Path("e:/mroutput/finalfriend");
//作为整个Job的配置
Configuration conf1 = new Configuration();
conf1.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ":");
Configuration conf2 = new Configuration();
//保证输出目录不存在
FileSystem fs=FileSystem.get(conf1);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
if (fs.exists(finalOutputPath)) {
fs.delete(finalOutputPath, true);
}
// ①创建Job
Job job1 = Job.getInstance(conf1);
Job job2 = Job.getInstance(conf2);
// 设置Job名称
job1.setJobName("index1");
job2.setJobName("index2");
// ②设置Job1
job1.setMapperClass(Example3Mapper1.class);
job1.setReducerClass(Example3Reducer.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(Text.class);
// 设置输入目录和输出目录
FileInputFormat.setInputPaths(job1, inputPath);
FileOutputFormat.setOutputPath(job1, outputPath);
job1.setInputFormatClass(KeyValueTextInputFormat.class);
// ②设置Job2
job2.setMapperClass(Example3Mapper2.class);
job2.setReducerClass(Example3Reducer2.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
// 设置输入目录和输出目录
FileInputFormat.setInputPaths(job2, outputPath);
FileOutputFormat.setOutputPath(job2, finalOutputPath);
// 设置job2的输入格式
job2.setInputFormatClass(KeyValueTextInputFormat.class);
//--------------------------------------------------------
//构建JobControl
JobControl jobControl = new JobControl("friends");
//创建运行的Job
ControlledJob controlledJob1 = new ControlledJob(job1.getConfiguration());
ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());
//指定依赖关系
controlledJob2.addDependingJob(controlledJob1);
// 向jobControl设置要运行哪些job
jobControl.addJob(controlledJob1);
jobControl.addJob(controlledJob2);
//运行JobControl
Thread jobControlThread = new Thread(jobControl);
//设置此线程为守护线程
jobControlThread.setDaemon(true);
jobControlThread.start();
//获取JobControl线程的运行状态
while(true) {
//判断整个jobControl是否全部运行结束
if (jobControl.allFinished()) {
System.out.println(jobControl.getSuccessfulJobList());
return;
}
}
}
}
推荐阅读
-
MapReduce编程实战之WordCount简单案例分析
-
Hadoop 之Mapreduce wordcount词频统计案例
-
MapReduce程序寻找两个用户之间共同好友
-
MapReduce案例之寻找共同好友
-
彷徨 | MapReduce实例三 | 求共同好友
-
Mapreduce 找博客共同好友案例
-
mapreduce求共同好友案例示例
-
hadoop求共同好友案例
-
大数据技术之_05_Hadoop学习_04_MapReduce_Hadoop企业优化+HDFS小文件优化方法+MapReduce扩展案例+倒排索引案例(多job串联)+TopN案例+找博客案例
-
MapReduce编程实战之WordCount简单案例分析