欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Mapreduce 找博客共同好友案例

程序员文章站 2022-05-01 13:03:05
...

以下是博客的好友列表数据,冒号前是一个用户,冒号后是该用户的所有好友(数据中的好友关系是单向的)

求出哪些人两两之间有共同好友,及他俩的共同好友都有谁?

输入:

A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D1
L:D,E,F
M:E,F,G
O:A,H,I,J

 

输出:

A-B    E C 
A-D    E 
A-E    D C B 
A-F    C D B O E 
A-G    C 
A-H    D O C 
A-I    O 
A-J    O B 
A-K    C 
A-L    D 
B-D    E A 
B-F    C A 
B-G    C A 
B-H    C A 
B-K    C 
B-O    A 
C-A    D F 
C-B    A 
C-D    A 
C-E    D 
C-F    D A 
C-G    F A 
C-H    A D 
C-K    D 
C-L    D 
C-O    A 
D-A    F 
D-C    F 
D-E    L 
D-G    F 
E-B    C 
E-F    M C 
E-G    C 
E-H    C D 
E-K    C 
F-B    E 
F-D    E A 
F-E    B D 
F-G    C 
F-H    D A 
F-J    B 
F-K    C 
F-O    A 
G-A    E D F 
G-B    E 
G-C    D 
G-D    E A 
G-E    D 
G-F    D E A 
G-H    E A D 
G-K    D C 
G-L    D E 
G-M    E 
G-O    A 
H-A    E 
H-B    E 
H-D    A E 
H-F    C E O 
H-G    C 
H-I    O 
H-J    O 
H-K    C 
H-O    A 
I-B    A 
I-C    A 
I-D    A 
I-F    A O 
I-G    A 
I-H    A 
I-J    O 
I-K    A 
I-O    A 
J-E    B 
J-F    O 
K-A    D 
K-B    A 
K-C    A 
K-D    A 
K-E    D 
K-F    D A 
K-G    A 
K-H    D A 
K-L    D 
K-O    A 
L-A    F E 
L-B    E 
L-C    F 
L-D    E F 
L-E    D 
L-F    E D 
L-G    F 
L-H    E D 
L-M    F 
M-A    F E 
M-B    E 
M-C    F 
M-D    F E 
M-F    E 
M-G    F 
M-H    E 
M-L    E 
O-C    I 
O-D    A 
 


思路:

将其看做一副图。然后用矩阵法表示图。第2横行就是a的好友(a加了谁),也就是我们的输入数据。第2列就是。所有

加了 A的人。B->A 等于1.那么就是B加了A。那么第二列为1的他们的共同好友就是A,第三列为1的他们的共同好友就是B

以此类推下去。然后将他们按照列两两组合起来。BC  的共同好友是 A,BD的共同好友也是A。这样就的到了他们所有人的共同好友。然后在进行一个汇总。就可以了。

那么输入的数据是行。我们怎么的到每个列呢?

每一列的数据,有一个公共的  就是 每一个竖坐标 都是相同的。例如

BA 表示B的好友是A

CA

DA

FA

GA

HA

IA

KA

OA

这些的共同点就是竖坐标相同。那么我们就map端就以竖坐标做为key(也就是输入数据的冒号后面的数据) 横坐标做为value(也就是输入数据的冒号前面的数据)。

传入reduce端。这样reduce端就会自动给他们分组。相同key的被分为一组。然后我们进行一次汇总。就得到一竖行。

一竖行。就是这些人都加了A。那么这些人的共同好友就是A。这是第一次job。结果如下

A    I,K,C,B,G,F,H,O,D,
B    A,F,J,E,
C    A,E,B,H,F,G,K,
D    G,C,K,A,L,F,E,H,
E    G,M,L,H,A,F,B,D,
F    L,M,D,C,G,A,
G    M,
H    O,
I    O,C,
J    O,
K    B,
L    D,E,
M    E,F,
O    A,H,I,J,F,

第二个job:

map端:他们的公共好友已经出来了。只不过没有以两两的形式显示公共好友。所以我们对他切割 tab前面的数据为value。然后在对tab后面的数据进行切割。然后遍历他们。对他们两两组合。组合成最终结果的样子。作为key。然后输出。

reduce端:因为是以 两两之间可能有很多个共同。好友。然后他们的key都是相同的,共同好友有几个。所以。他们会进入同一个组。然后一起进入reduce。接着对他们进行一个组合就可以了。

 

Mapreduce 找博客共同好友案例

代码:第一个job


map类

package com.lw.mapreduce.friends;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.codahale.metrics.EWMA;

public class FriendsOneMapper extends Mapper<LongWritable, Text, Text, Text>{

	Text k = new Text();
	Text v = new Text();
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		
		//1.获取一行
		//A:B,C,D,F,E,O
		String line = value.toString();
		//2.先以冒号切割
		String[] oneSplit = line.split(":");
		//3.将第一次切割的第二部分以逗号切割
		String[] twoSplit = oneSplit[1].split(",");
		//4.将第一次切割的第一部分。和第二次切割的每一部分写出去
		//第二次切割的每一部分为key  第一次切割的部分为value
		for (String string : twoSplit) {
			k.set(string);
			v.set(oneSplit[0]);
			context.write(k, v);
		}
	}
}

 

reduce类

package com.lw.mapreduce.friends;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FriendsOneReducer  extends Reducer<Text, Text, Text, Text>{
	
	@Override
	protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		StringBuffer kstr = new StringBuffer();
		
		for (Text text : values) {
			
			kstr.append(text.toString()+",");
		}
		
		context.write(key, new Text(kstr.toString()));
	}
}

driver类

package com.lw.mapreduce.friends;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FriendsOneDirver  {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		
		
		//1.获取job
		
		Job job = Job.getInstance(new Configuration());
		//2.设置jar file
		job.setJarByClass(FriendsOneDirver.class);
		//3.设置map class
		job.setMapperClass(FriendsOneMapper.class);
		//4.设置reducer class
		job.setReducerClass(FriendsOneReducer.class);
		
		//5.设置map的输出
		job.setMapOutputKeyClass(Text.class);
		//6设置reduce 的输出
		job.setMapOutputValueClass(Text.class);
		
		//7.设置输入和输出路径
		FileInputFormat.setInputPaths(job, new Path("e:/friendInput/*"));
		FileOutputFormat.setOutputPath(job, new Path("e:/friendsOutput/"));
		
		//8.提交任务
		job.waitForCompletion(true);
		
		
	}
}

第二个job


map类

package com.lw.mapreduce.friends;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FriendsTwoMapper extends Mapper<LongWritable, Text, Text, Text> {
	Text k = new Text();
	Text v = new Text();

	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {

		// 1.获取一行
		String line = value.toString();
		// 2.切割
		String[] oneSplit = line.split("\t");

		String[] twoSplit = oneSplit[1].split(",");

		// 3.输出
		for (int i = 0; i < twoSplit.length; i++) {
			for (int j = i + 1; j < twoSplit.length; j++) {
				k.set(twoSplit[i] + "-" + twoSplit[j]);
				v.set(oneSplit[0]);
				context.write(k, v);
			}
		}

	}
}

reduce类

package com.lw.mapreduce.friends;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FriendsTwoReducer extends Reducer<Text, Text, Text, Text> {
	
	@Override
	protected void reduce(Text key	, Iterable<Text> values,
			Reducer<Text, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		
		StringBuffer valuesBuffer = new StringBuffer();
		
		for (Text text : values) {
			
			valuesBuffer.append(text.toString()+" ");
		}
		Text v = new Text();
		
		v.set(valuesBuffer.toString());
		
		context.write(key, v);
	}
}

driver类

package com.lw.mapreduce.friends;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FriendsTwoDirver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

		//1.获取job
		
		Job job = Job.getInstance(new Configuration());
		//2.设置jar file
		job.setJarByClass(FriendsTwoDirver.class);
		//3.设置map class
		job.setMapperClass(FriendsTwoMapper.class);
		//4.设置reducer class
		job.setReducerClass(FriendsTwoReducer.class);
		
		//5.设置map的输出
		job.setMapOutputKeyClass(Text.class);
		//6设置reduce 的输出
		job.setMapOutputValueClass(Text.class);
		
		//7.设置输入和输出路径
		FileInputFormat.setInputPaths(job, new Path("E:/friends2Input/*"));
		FileOutputFormat.setOutputPath(job, new Path("e:/friends2Output/"));
		
		//8.提交任务
		job.waitForCompletion(true);
		
	}
}