如何使用Spark计算共同好友?
文章目录
写在前面
你们好我是啊晨 ,一个大数据分享者兼一个努力成为大垃圾的小垃圾
本章介绍,使用spark计算共同好友,相信看这篇文章之前都有了解做过MapReduce的共同好友,文章后会有MapReduce的方法,大家自行比较一下哈。
如有其它需要请阅读我的其它大数据文章,谢谢
中间有什么问题请留言,请珍惜现在的时间:
描述
如网站有如下关系数据:
friends.txt
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
数据说明:
A:B,C,D,F,E,O
每行数据以冒号为分隔符:
1.冒号左边是网站的一个用户A;
2.冒号右边是网站A的好友列表(各好友间以逗号为分隔符);
现在需要对网站的用户进行分析,找出那些用户两两之间有共同好友,以及他俩的共同好友都有那些人。
如:A、B两个用户拥有共同好友C和E;
(F-H,D,O,A,C,E)
(A-F,B,D,O,C,E)
(B-H,A,C,E)
(F-I,O,A)
(G-O,A)
(B-C,A)
(D-F,A,E)
(B-M,E)
(C-G,F,D,A)
....
计算
完整代码如下:
package spark
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//共同好友聚合
object CommonFriend {
def main(args: Array[String]): Unit = {
//conf
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName(this.getClass.getName)
//sc
val sc = new SparkContext(conf)
//读取数据
val source: RDD[String] = sc.textFile("file:///E:\\work\\friends.txt")
//处理数据
val friendAndUser: RDD[(String, List[String])] = source.flatMap(line => {
//按照:进行切分
val infos: Array[String] = line.split(":", -1)
//获取用户
val user = infos(0)
//获取好友列表
val friends = infos(1).split(",", -1)
//好友为k,用户作为v
friends.map(friend => (friend, List(user)))
})
// 好友,用户,聚合,如第一次mapreduce完成
val friendAndUsers = friendAndUser.reduceByKey((list1, list2) => list1 ++ list2)
val userAndUserAndFriend: RDD[(String, String)] = friendAndUsers.flatMap(tp => {
//为用户正序排序,避免重复获取
val list: List[String] = tp._2.sortBy(x => x)
//创建容器存放用户--用户,好友
var listFriend = List[(String, String)]()
//遍历用户
for (i <- 0 until list.size - 1) {
for (j <- i + 1 until list.size) {
listFriend :+= (list(i) + "-" + list(j), tp._1)
}
}
listFriend
})
userAndUserAndFriend.reduceByKey((x, y) => x.concat(",").concat(y)).foreach(println)
//关闭资源
sc.stop()
}
}
MapReduce计算共同好友
参考文章https://zhuanlan.zhihu.com/p/50236955
job1的mapper类
map函数中的逻辑相对简单,只需要对原始数据按分隔符切分,然后将“好友”作为key,用户作为value输出即可。
public static class CommonFriendStep1Mapper extends Mapper<LongWritable, Text, Text, Text>{
// 输入数据形式如: A:B,C,D,F,E,O
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
// 将maptask所传入的一行数据按照冒号切分
String[] split = value.toString().split(":");
// 得到数据中的用户
String user = split[0];
// 得到数据中的"好友们"
String[] friends = split[1].split(",");
// 将每一个"好友"作为key,用户作为value,返回给maptask
for (String f : friends) {
context.write(new Text(f), new Text(user));
}
}
}
job1的Reducer类
reduce函数中的核心处理逻辑是对拥有某个共同好友的所有用户两两拼对,然后输出各种两两对组合及他们所拥有的共同好友。
public static class CommonFriendStep1Reducer extends Reducer<Text, Text, Text, Text>{
// f:某个"好友"
// users:拥有这个f好友的一堆用户
@Override
protected void reduce(Text f, Iterable<Text> users, Context context) throws IOException, InterruptedException {
ArrayList<Text> userList = new ArrayList<>();
// 将这一组拥有共同好友f的user们从迭代器中取出,放入一个arraylist暂存
for (Text u : users) {
userList.add(new Text(u));
}
// 对users排个序,以免拼俩俩对时出现A-F 又有F-A的现象
Collections.sort(userList);
// 把这一对user进行两两组合,并将:
//1.组合作为key
//2.共同的好友f作为value
//返回给reduce task作为本job的最终结果
for(int i=0;i<userList.size()-1;i++) {
for(int j=i+1;j<userList.size();j++) {
// 输出 "用户-用户" 两两对,及他俩的共同好友
context.write(new Text(userList.get(i)+"-"+userList.get(j)), f);
}
}
}
}
job1的客户端
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(CommonFriendStep1.class);
// 设置job的mapper类
job.setMapperClass(CommonFriendStep1Mapper.class);
// 设置job的reducer类
job.setReducerClass(CommonFriendStep1Reducer.class);
// 设置map阶段输出的key:value数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 设置reduce阶段输出的key:value数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 判断结果输出路径是否已存在,如果已经存在,则删除。以免在测试阶段需要反复手动删除输出目录
FileSystem fs = FileSystem.get(conf);
Path out = new Path(args[0]);
if(fs.exists(out)) {
fs.delete(out, true);
}
// 设置数据输入输出路径
FileInputFormat.setInputPaths(job, new Path(args[1]));
FileOutputFormat.setOutputPath(job,out);
// 提交job给yarn或者local runner来运行
job.waitForCompletion(true);
}
job2的Mapper类
map()方法逻辑相对简单,只需要对上一个步骤所产生的数据切分,然后以“两两对”作为key,他们共同的好友作为value输出即可。
public static class CommonFriendStep2Mapper extends Mapper<LongWritable, Text, Text, Text>{
// 上一个job所产生的数据是本次job读取的数据: B-C A
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 将数据按制表符切分
String[] split = value.toString().split("\t");
// 将切出来的B-C用户对作为key,共同好友A作为value
// 返回给map task
context.write(new Text(split[0]), new Text(split[1]));
}
}
job2的Reducer类
reduce()方法会获得“两两对”用户组合所拥有的所有好友value,从而只需要迭代每一组value进行字符串拼接,即可得到最终结果
public static class CommonFriendStep2Reducer extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text pair, Iterable<Text> friends, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
// 构造一个StringBuilder用于拼接字符串
StringBuilder sb = new StringBuilder();
// 将这个用户对的所有共同好友拼接在一起
for (Text f : friends) {
sb.append(f).append(",");
}
// 将用户对作为key,拼接好的所有共同好友作为value,返回给reduce task
context.write(pair, new Text(sb.substring(0, sb.length()-1)));
}
}
job2的客户端
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(CommonFriendStep2.class);
// 设置job的mapper类和reducer类
job.setMapperClass(CommonFriendStep2Mapper.class);
job.setReducerClass(CommonFriendStep2Reducer.class);
// 设置map阶段输出key:value数据的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 设置reudce阶段输出key:value数据的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 检测输出目录是否已存在,如果已存在则删除,以免在测试阶段需要反复手动删除输出目录
FileSystem fs = FileSystem.get(conf);
Path out = new Path("F:\\mrdata\\friends\\out-2");
if(fs.exists(out)) {
fs.delete(out, true);
}
// 设置数据输入输出目录
FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\friends\\out-1"));
FileOutputFormat.setOutputPath(job,out);
// 提交job到yarn或者local runner执行
job.waitForCompletion(true);
}
本篇完结,Spark比MR少了超级多代码,很是舒服,一定敲敲敲着去理解,下篇继续更新大数据其他内容,记得关注点赞支持哈,谢谢观看
单是说不行,要紧的是做。——鲁迅
上一篇: 晒被子不能拍打,六步骤科学晾褥
下一篇: aspnet core运行后台任务