欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

如何使用Spark计算共同好友?

程序员文章站 2022-05-01 13:16:54
...

写在前面

你们好我是啊晨 ,一个大数据分享者兼一个努力成为大垃圾的小垃圾
本章介绍,使用spark计算共同好友,相信看这篇文章之前都有了解做过MapReduce的共同好友,文章后会有MapReduce的方法,大家自行比较一下哈。
如有其它需要请阅读我的其它大数据文章,谢谢
中间有什么问题请留言,请珍惜现在的时间:

如何使用Spark计算共同好友?

描述

如网站有如下关系数据
friends.txt

A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

数据说明:
A:B,C,D,F,E,O
每行数据以冒号为分隔符:
1.冒号左边是网站的一个用户A;
2.冒号右边是网站A的好友列表(各好友间以逗号为分隔符);

现在需要对网站的用户进行分析,找出那些用户两两之间有共同好友,以及他俩的共同好友都有那些人。
如:A、B两个用户拥有共同好友C和E;

(F-H,D,O,A,C,E)
(A-F,B,D,O,C,E)
(B-H,A,C,E)
(F-I,O,A)
(G-O,A)
(B-C,A)
(D-F,A,E)
(B-M,E)
(C-G,F,D,A)
....

计算

完整代码如下:

package spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}



//共同好友聚合
object CommonFriend {
  def main(args: Array[String]): Unit = {
    //conf
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName(this.getClass.getName)
    //sc
    val sc = new SparkContext(conf)

    //读取数据
    val source: RDD[String] = sc.textFile("file:///E:\\work\\friends.txt")
    //处理数据
    val friendAndUser: RDD[(String, List[String])] = source.flatMap(line => {
      //按照:进行切分
      val infos: Array[String] = line.split(":", -1)
      //获取用户
      val user = infos(0)
      //获取好友列表
      val friends = infos(1).split(",", -1)
      //好友为k,用户作为v
      friends.map(friend => (friend, List(user)))
    })
    // 好友,用户,聚合,如第一次mapreduce完成
    val friendAndUsers = friendAndUser.reduceByKey((list1, list2) => list1 ++ list2)

    val userAndUserAndFriend: RDD[(String, String)] = friendAndUsers.flatMap(tp => {
      //为用户正序排序,避免重复获取
      val list: List[String] = tp._2.sortBy(x => x)
      //创建容器存放用户--用户,好友
      var listFriend = List[(String, String)]()
      //遍历用户
      for (i <- 0 until list.size - 1) {
        for (j <- i + 1 until list.size) {
          listFriend :+= (list(i) + "-" + list(j), tp._1)
        }
      }
      listFriend
    })
    userAndUserAndFriend.reduceByKey((x, y) => x.concat(",").concat(y)).foreach(println)
    //关闭资源
    sc.stop()
  }
}

MapReduce计算共同好友

参考文章https://zhuanlan.zhihu.com/p/50236955

job1的mapper类

map函数中的逻辑相对简单,只需要对原始数据按分隔符切分,然后将“好友”作为key,用户作为value输出即可。

public static class CommonFriendStep1Mapper extends Mapper<LongWritable, Text, Text, Text>{
 
		// 输入数据形式如:      A:B,C,D,F,E,O
		@Override
 	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
		// 将maptask所传入的一行数据按照冒号切分
		String[] split = value.toString().split(":");
		// 得到数据中的用户
		String user = split[0];
		// 得到数据中的"好友们"
		String[] friends = split[1].split(",");
 
		// 将每一个"好友"作为key,用户作为value,返回给maptask
 		for (String f : friends) {
			context.write(new Text(f), new Text(user));
		}
 
	}
}

job1的Reducer类

reduce函数中的核心处理逻辑是对拥有某个共同好友的所有用户两两拼对,然后输出各种两两对组合及他们所拥有的共同好友。

public static class CommonFriendStep1Reducer extends Reducer<Text, Text, Text, Text>{
 
		// f:某个"好友"
		// users:拥有这个f好友的一堆用户
	@Override
 	protected void reduce(Text f, Iterable<Text> users, Context context) throws IOException, InterruptedException {
 
		ArrayList<Text> userList = new ArrayList<>();
		// 将这一组拥有共同好友f的user们从迭代器中取出,放入一个arraylist暂存
 		for (Text u : users) {
			userList.add(new Text(u));
		}
 
		// 对users排个序,以免拼俩俩对时出现A-F 又有F-A的现象
		Collections.sort(userList);
 
		// 把这一对user进行两两组合,并将:
		//1.组合作为key
		//2.共同的好友f作为value
		//返回给reduce task作为本job的最终结果
 		for(int i=0;i<userList.size()-1;i++) {
 			for(int j=i+1;j<userList.size();j++) {
				// 输出 "用户-用户" 两两对,及他俩的共同好友
				context.write(new Text(userList.get(i)+"-"+userList.get(j)), f);
			}
		}
	}
}

job1的客户端

public static void main(String[] args) throws Exception {
 
		Configuration conf = new Configuration();
 
		Job job = Job.getInstance(conf);
 
		job.setJarByClass(CommonFriendStep1.class);
		// 设置job的mapper类
		job.setMapperClass(CommonFriendStep1Mapper.class);
		// 设置job的reducer类
		job.setReducerClass(CommonFriendStep1Reducer.class);
 
		// 设置map阶段输出的key:value数据类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		// 设置reduce阶段输出的key:value数据类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
 
		// 判断结果输出路径是否已存在,如果已经存在,则删除。以免在测试阶段需要反复手动删除输出目录
		FileSystem fs = FileSystem.get(conf);
		Path out = new Path(args[0]);
 		if(fs.exists(out)) {
			fs.delete(out, true);
		}
 
		// 设置数据输入输出路径
		FileInputFormat.setInputPaths(job, new Path(args[1]));
		FileOutputFormat.setOutputPath(job,out);
 
		// 提交job给yarn或者local runner来运行
		job.waitForCompletion(true);
 
	}

job2的Mapper类

map()方法逻辑相对简单,只需要对上一个步骤所产生的数据切分,然后以“两两对”作为key,他们共同的好友作为value输出即可。

public static class CommonFriendStep2Mapper extends Mapper<LongWritable, Text, Text, Text>{
 
	// 上一个job所产生的数据是本次job读取的数据: B-C	A
	@Override
 	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// 将数据按制表符切分
		String[] split = value.toString().split("\t");
		// 将切出来的B-C用户对作为key,共同好友A作为value
		// 返回给map task
		context.write(new Text(split[0]), new Text(split[1]));
	}
}

job2的Reducer类

reduce()方法会获得“两两对”用户组合所拥有的所有好友value,从而只需要迭代每一组value进行字符串拼接,即可得到最终结果

public static class CommonFriendStep2Reducer extends Reducer<Text, Text, Text, Text>{
 
	@Override
 	protected void reduce(Text pair, Iterable<Text> friends, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
		// 构造一个StringBuilder用于拼接字符串
		StringBuilder sb = new StringBuilder();
 
		// 将这个用户对的所有共同好友拼接在一起
 		for (Text f : friends) {
			sb.append(f).append(",");
		}
 
		// 将用户对作为key,拼接好的所有共同好友作为value,返回给reduce task
		context.write(pair, new Text(sb.substring(0, sb.length()-1)));
	}
}

job2的客户端

public static void main(String[] args) throws Exception {
 
		Configuration conf = new Configuration();
 
		Job job = Job.getInstance(conf);
 
		job.setJarByClass(CommonFriendStep2.class);
 
		// 设置job的mapper类和reducer类
		job.setMapperClass(CommonFriendStep2Mapper.class);
		job.setReducerClass(CommonFriendStep2Reducer.class);
 
		// 设置map阶段输出key:value数据的类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
 
		// 设置reudce阶段输出key:value数据的类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
 
		// 检测输出目录是否已存在,如果已存在则删除,以免在测试阶段需要反复手动删除输出目录
		FileSystem fs = FileSystem.get(conf);
		Path out = new Path("F:\\mrdata\\friends\\out-2");
		if(fs.exists(out)) {
			fs.delete(out, true);
		}
		// 设置数据输入输出目录
		FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\friends\\out-1"));
		FileOutputFormat.setOutputPath(job,out);
		// 提交job到yarn或者local runner执行
		job.waitForCompletion(true);
 
}

如何使用Spark计算共同好友?

本篇完结,Spark比MR少了超级多代码,很是舒服,一定敲敲敲着去理解,下篇继续更新大数据其他内容,记得关注点赞支持哈,谢谢观看

单是说不行,要紧的是做。——鲁迅

相关标签: 大数据