欢迎您访问程序员文章站！本站旨在为大家提供、分享程序员计算机编程知识。
您现在的位置是: 首页

彷徨 | MapReduce实例三 | 求共同好友

程序员文章站 2022-05-01 13:32:18
...

原始数据（每行格式为“用户:好友列表”，好友之间用英文逗号分隔）：

A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

代码实现  :

CommonFriendsOne（第一步：以好友为 key，求出每个人出现在哪些用户的好友列表中）

import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



/**
 * Step one of the "common friends" example.
 *
 * <p>Input lines have the form {@code user:friend1,friend2,...}. The job inverts that relation:
 * for every friend {@code f} it outputs the users whose friend list contains {@code f}, i.e.
 * lines of the form {@code f<TAB>user1,user2,...}. CommonFriendsTwo consumes this output.
 */
public class CommonFriendsOne {

	/** Emits (friend, user) for every friend listed on a {@code user:friends} line. */
	public static class MapTask extends Mapper<LongWritable, Text, Text, Text>{
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString().trim();
			if (line.isEmpty()) {
				return; // blank lines carry no data
			}
			int colon = line.indexOf(':');
			String user = colon < 0 ? "" : line.substring(0, colon).trim();
			if (user.isEmpty()) {
				// Missing ':' or missing user name: count the line and skip it instead of
				// failing the whole task with an index error.
				context.getCounter("CommonFriendsOne", "MALFORMED_LINES").increment(1);
				return;
			}
			Text userText = new Text(user);
			for (String f : line.substring(colon + 1).split(",")) {
				String friend = f.trim();
				if (!friend.isEmpty()) { // tolerate ",," and stray spaces
					context.write(new Text(friend), userText);
				}
			}
		}
	}

	/** Joins, with commas, every user whose friend list contains {@code f}. */
	public static class ReduceTask extends Reducer<Text, Text, Text, Text>{
		@Override
		protected void reduce(Text f, Iterable<Text> users, Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			StringBuilder sb = new StringBuilder();
			for (Text user : users) {
				// Separator before every element except the first, so no trailing comma.
				if (sb.length() > 0) {
					sb.append(',');
				}
				sb.append(user.toString());
			}
			context.write(f, new Text(sb.toString()));
		}
	}

	/**
	 * Configures and runs step one.
	 *
	 * @param args optional {@code [inputPath [outputPath]]}; the example's original local paths
	 *             are used for any argument that is omitted
	 * @throws Exception if job setup, output cleanup or execution fails
	 */
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		// Mapper, reducer, and the jar to ship (located via this class).
		job.setMapperClass(MapTask.class);
		job.setReducerClass(ReduceTask.class);
		job.setJarByClass(CommonFriendsOne.class);

		// Map output and final output are both (Text, Text).
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		// Input and output locations.
		Path input = new Path(args.length > 0 ? args[0] : "E:/data/friend.txt");
		Path output = new Path(args.length > 1 ? args[1] : "E:/data/out/friendOne");
		FileInputFormat.addInputPath(job, input);
		FileOutputFormat.setOutputPath(job, output);

		// The job refuses to start if the output directory already exists, so remove it.
		// Deleting through the Path's FileSystem (not java.io.File) also works on HDFS.
		output.getFileSystem(conf).delete(output, true);

		// Submit and wait; a non-zero exit code lets scripts chaining both steps detect failure.
		boolean completion = job.waitForCompletion(true);
		System.out.println(completion?"优秀":"失败");
		System.exit(completion ? 0 : 1);
	}
}

CommonFriendsTwo（第二步：读取第一步的输出，将拥有同一好友的用户两两配对，汇总每对用户的共同好友）

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import hadoop_day06.friends.ConmmonFriendsTwo;

/**
 * Step two of the "common friends" example.
 *
 * <p>Reads step one's output ({@code friend<TAB>user1,user2,...}). Every pair of users sharing a
 * friend is emitted under the key {@code userA---userB}, with names in ascending order so each
 * pair has a single canonical key. The reducer then lists all friends that pair has in common.
 */
public class CommonFriendsTwo {

	/** Emits ({@code a---b}, friend) for every unordered pair of users that share the friend. */
	public static class MapTask extends Mapper<LongWritable, Text, Text, Text>{
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			int tab = line.indexOf('\t');
			String f = tab < 0 ? "" : line.substring(0, tab).trim();
			if (f.isEmpty()) {
				// No key/value separator or empty friend: count and skip instead of failing
				// the task with an index error.
				context.getCounter("CommonFriendsTwo", "MALFORMED_LINES").increment(1);
				return;
			}
			// A sorted set yields the ascending order needed for canonical pair keys, drops
			// blank entries, and removes duplicates so a user is never paired with itself.
			SortedSet<String> userSet = new TreeSet<String>();
			for (String u : line.substring(tab + 1).split(",")) {
				String user = u.trim();
				if (!user.isEmpty()) {
					userSet.add(user);
				}
			}
			String[] users = userSet.toArray(new String[0]);
			Text friend = new Text(f);
			for (int i = 0; i < users.length - 1; i++) {
				for (int j = i + 1; j < users.length; j++) {
					context.write(new Text(users[i] + "---" + users[j]), friend);
				}
			}
		}
	}

	/** Joins, with commas, every friend shared by the user pair {@code userPair}. */
	public static class ReduceTask extends Reducer<Text, Text, Text, Text>{
		@Override
		protected void reduce(Text userPair, Iterable<Text> friends, Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			StringBuilder sb = new StringBuilder();
			for (Text f : friends) {
				// Separator before every element except the first, so no trailing comma.
				if (sb.length() > 0) {
					sb.append(',');
				}
				sb.append(f.toString());
			}
			context.write(userPair, new Text(sb.toString()));
		}
	}

	/**
	 * Configures and runs step two.
	 *
	 * @param args optional {@code [inputPath [outputPath]]}; the example's original local paths
	 *             are used for any argument that is omitted (input defaults to step one's output)
	 * @throws Exception if job setup, output cleanup or execution fails
	 */
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		// Mapper, reducer, and the jar to ship. The jar must be located via THIS class;
		// the previous code referenced a misspelled class from another package.
		job.setMapperClass(MapTask.class);
		job.setReducerClass(ReduceTask.class);
		job.setJarByClass(CommonFriendsTwo.class);

		// Map output and final output are both (Text, Text).
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		// Input (step one's output) and output locations.
		Path input = new Path(args.length > 0 ? args[0] : "E:\\data\\out\\friendOne");
		Path output = new Path(args.length > 1 ? args[1] : "E:\\data\\out\\friendTwo");
		FileInputFormat.addInputPath(job, input);
		FileOutputFormat.setOutputPath(job, output);

		// The job refuses to start if the output directory already exists, so remove it.
		// Deleting through the Path's FileSystem (not java.io.File) also works on HDFS.
		output.getFileSystem(conf).delete(output, true);

		// Submit and wait; a non-zero exit code lets calling scripts detect failure.
		boolean completion = job.waitForCompletion(true);
		System.out.println(completion?"优秀!!!":"滚去调bug!!");
		System.exit(completion ? 0 : 1);
	}
}

运行结果:

CommonFriendsOne运行结果:

[图片：CommonFriendsOne 运行结果截图 1]
[图片：CommonFriendsOne 运行结果截图 2]

CommonFriendsTwo运行结果:

[图片：CommonFriendsTwo 运行结果截图 1]

[图片：CommonFriendsTwo 运行结果截图 2]

最终详细运行结果（每行格式：“用户A---用户B”，制表符，两人的共同好友列表）：

A---B	E,C
A---C	D,F
A---D	E,F
A---E	D,B,C
A---F	O,B,C,D,E
A---G	F,E,C,D
A---H	E,C,D,O
A---I	O
A---J	O,B
A---K	D,C
A---L	F,E,D
A---M	E,F
B---C	A
B---D	A,E
B---E	C
B---F	E,A,C
B---G	C,E,A
B---H	A,E,C
B---I	A
B---K	C,A
B---L	E
B---M	E
B---O	A
C---D	A,F
C---E	D
C---F	D,A
C---G	D,F,A
C---H	D,A
C---I	A
C---K	A,D
C---L	D,F
C---M	F
C---O	I,A
D---E	L
D---F	A,E
D---G	E,A,F
D---H	A,E
D---I	A
D---K	A
D---L	E,F
D---M	F,E
D---O	A
E---F	D,M,C,B
E---G	C,D
E---H	C,D
E---J	B
E---K	C,D
E---L	D
F---G	D,C,A,E
F---H	A,D,O,E,C
F---I	O,A
F---J	B,O
F---K	D,C,A
F---L	E,D
F---M	E
F---O	A
G---H	D,C,E,A
G---I	A
G---K	D,A,C
G---L	D,F,E
G---M	E,F
G---O	A
H---I	O,A
H---J	O
H---K	A,C,D
H---L	D,E
H---M	E
H---O	A
I---J	O
I---K	A
I---O	A
K---L	D
K---O	A
L---M	E,F