Mapreduce 找博客共同好友案例
以下是博客的好友列表数据,冒号前是一个用户,冒号后是该用户的所有好友(数据中的好友关系是单向的)
求出哪些人两两之间有共同好友,及他俩的共同好友都有谁?
输入:
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D1
L:D,E,F
M:E,F,G
O:A,H,I,J
输出:
A-B E C
A-D E
A-E D C B
A-F C D B O E
A-G C
A-H D O C
A-I O
A-J O B
A-K C
A-L D
B-D E A
B-F C A
B-G C A
B-H C A
B-K C
B-O A
C-A D F
C-B A
C-D A
C-E D
C-F D A
C-G F A
C-H A D
C-K D
C-L D
C-O A
D-A F
D-C F
D-E L
D-G F
E-B C
E-F M C
E-G C
E-H C D
E-K C
F-B E
F-D E A
F-E B D
F-G C
F-H D A
F-J B
F-K C
F-O A
G-A E D F
G-B E
G-C D
G-D E A
G-E D
G-F D E A
G-H E A D
G-K D C
G-L D E
G-M E
G-O A
H-A E
H-B E
H-D A E
H-F C E O
H-G C
H-I O
H-J O
H-K C
H-O A
I-B A
I-C A
I-D A
I-F A O
I-G A
I-H A
I-J O
I-K A
I-O A
J-E B
J-F O
K-A D
K-B A
K-C A
K-D A
K-E D
K-F D A
K-G A
K-H D A
K-L D
K-O A
L-A F E
L-B E
L-C F
L-D E F
L-E D
L-F E D
L-G F
L-H E D
L-M F
M-A F E
M-B E
M-C F
M-D F E
M-F E
M-G F
M-H E
M-L E
O-C I
O-D A
思路:
将其看做一副图。然后用矩阵法表示图。第2横行就是a的好友(a加了谁),也就是我们的输入数据。第2列就是。所有
加了 A的人。B->A 等于1.那么就是B加了A。那么第二列为1的他们的共同好友就是A,第三列为1的他们的共同好友就是B
以此类推下去。然后将他们按照列两两组合起来。BC 的共同好友是 A,BD的共同好友也是A。这样就的到了他们所有人的共同好友。然后在进行一个汇总。就可以了。
那么输入的数据是行。我们怎么的到每个列呢?
每一列的数据,有一个公共的 就是 每一个竖坐标 都是相同的。例如
BA 表示B的好友是A
CA
DA
FA
GA
HA
IA
KA
OA
这些的共同点就是竖坐标相同。那么我们就map端就以竖坐标做为key(也就是输入数据的冒号后面的数据) 横坐标做为value(也就是输入数据的冒号前面的数据)。
传入reduce端。这样reduce端就会自动给他们分组。相同key的被分为一组。然后我们进行一次汇总。就得到一竖行。
一竖行。就是这些人都加了A。那么这些人的共同好友就是A。这是第一次job。结果如下
A I,K,C,B,G,F,H,O,D,
B A,F,J,E,
C A,E,B,H,F,G,K,
D G,C,K,A,L,F,E,H,
E G,M,L,H,A,F,B,D,
F L,M,D,C,G,A,
G M,
H O,
I O,C,
J O,
K B,
L D,E,
M E,F,
O A,H,I,J,F,
第二个job:
map端:他们的公共好友已经出来了。只不过没有以两两的形式显示公共好友。所以我们对他切割 tab前面的数据为value。然后在对tab后面的数据进行切割。然后遍历他们。对他们两两组合。组合成最终结果的样子。作为key。然后输出。
reduce端:因为是以 两两之间可能有很多个共同。好友。然后他们的key都是相同的,共同好友有几个。所以。他们会进入同一个组。然后一起进入reduce。接着对他们进行一个组合就可以了。
代码:第一个job
map类
package com.lw.mapreduce.friends;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import com.codahale.metrics.EWMA;
public class FriendsOneMapper extends Mapper<LongWritable, Text, Text, Text>{
Text k = new Text();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
//1.获取一行
//A:B,C,D,F,E,O
String line = value.toString();
//2.先以冒号切割
String[] oneSplit = line.split(":");
//3.将第一次切割的第二部分以逗号切割
String[] twoSplit = oneSplit[1].split(",");
//4.将第一次切割的第一部分。和第二次切割的每一部分写出去
//第二次切割的每一部分为key 第一次切割的部分为value
for (String string : twoSplit) {
k.set(string);
v.set(oneSplit[0]);
context.write(k, v);
}
}
}
reduce类
package com.lw.mapreduce.friends;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class FriendsOneReducer extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
StringBuffer kstr = new StringBuffer();
for (Text text : values) {
kstr.append(text.toString()+",");
}
context.write(key, new Text(kstr.toString()));
}
}
driver类
package com.lw.mapreduce.friends;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FriendsOneDirver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1.获取job
Job job = Job.getInstance(new Configuration());
//2.设置jar file
job.setJarByClass(FriendsOneDirver.class);
//3.设置map class
job.setMapperClass(FriendsOneMapper.class);
//4.设置reducer class
job.setReducerClass(FriendsOneReducer.class);
//5.设置map的输出
job.setMapOutputKeyClass(Text.class);
//6设置reduce 的输出
job.setMapOutputValueClass(Text.class);
//7.设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path("e:/friendInput/*"));
FileOutputFormat.setOutputPath(job, new Path("e:/friendsOutput/"));
//8.提交任务
job.waitForCompletion(true);
}
}
第二个job
map类
package com.lw.mapreduce.friends;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class FriendsTwoMapper extends Mapper<LongWritable, Text, Text, Text> {
Text k = new Text();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
// 1.获取一行
String line = value.toString();
// 2.切割
String[] oneSplit = line.split("\t");
String[] twoSplit = oneSplit[1].split(",");
// 3.输出
for (int i = 0; i < twoSplit.length; i++) {
for (int j = i + 1; j < twoSplit.length; j++) {
k.set(twoSplit[i] + "-" + twoSplit[j]);
v.set(oneSplit[0]);
context.write(k, v);
}
}
}
}
reduce类
package com.lw.mapreduce.friends;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class FriendsTwoReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key , Iterable<Text> values,
Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
StringBuffer valuesBuffer = new StringBuffer();
for (Text text : values) {
valuesBuffer.append(text.toString()+" ");
}
Text v = new Text();
v.set(valuesBuffer.toString());
context.write(key, v);
}
}
driver类
package com.lw.mapreduce.friends;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FriendsTwoDirver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1.获取job
Job job = Job.getInstance(new Configuration());
//2.设置jar file
job.setJarByClass(FriendsTwoDirver.class);
//3.设置map class
job.setMapperClass(FriendsTwoMapper.class);
//4.设置reducer class
job.setReducerClass(FriendsTwoReducer.class);
//5.设置map的输出
job.setMapOutputKeyClass(Text.class);
//6设置reduce 的输出
job.setMapOutputValueClass(Text.class);
//7.设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path("E:/friends2Input/*"));
FileOutputFormat.setOutputPath(job, new Path("e:/friends2Output/"));
//8.提交任务
job.waitForCompletion(true);
}
}
上一篇: [Spark共同好友查找]
下一篇: Kylin3.1.1集成CDH6.2.1