MapReduce程序寻找两个用户之间共同好友
需求分析
从数据集中找出两个国家之间共同样式,这个其实跟我们两个用户之间寻找共同好友没有区别。测试数据集
拿到这个数据进行分析:例如Australia:Cup Pack
对于Australia国家拥有的两个Cup Pack样式我要如何从其他国家找到这种样式呢?问题来了,想想我们MapReduce的核心就是分而治之,先分组,在聚合,既然这样我们用样式进行分组(与共同好友同理,判断一个用户是不是两个用户之间的共同好友,查看这个用户所有的好友列表,循环遍历,利用组合的方式,好友之间不同组合(这两好友不就拥有共同好友==这个用户),实现将样式做为Key,国家做为Value
1.第一个MapReduce程序:
- Map程序:
package com.Same.groupby.Like;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class SameMapper extends Mapper<Text, Text, Text, Text> {
Text v = new Text();
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
// Australia:Cup Pack
// Bangladesh:Pack
// Brazil:Cup Pack
//1.获取一行
String line = value.toString();
//2.切割
String[] fields = line.split("\t");
//3 .此value当作key,来写(相同的key同时进入reduce方法),那么key当作value写入
for (String var : fields) {
v.set(var);
context.write(v, key);
}
}
}
这里我使用keyvalueinputformat(我们可以不以偏移量和行文本内容来作为数据源到MapTask的输入格式,而使用键值对的形式,使用KeyValueInputFormat就可以完成这种需求。) 进行Map端数据输入
2)Reduce程序:
package com.Same.groupby.Like;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
public class SameReduce extends Reducer<Text, Text, Text, Text> {
Text key = new Text();
@Override
protected void reduce(Text Style, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//给出一个ArrayList集合用来存储地区(Country)
ArrayList<String> arrayList = new ArrayList();
for (Text value : values) {
String var = value.toString();
arrayList.add(var);
}
//接下来就是对arrayList集合进行遍历(排列组合,基本思想是我先拿出集合中第一个元素分别与集合中第二,
// 第三....第n个元素进行组合,第二个元素又分别与集合中第三,第四....第n个元素,依次如此)
for (int i = 0; i < arrayList.size(); i++) {
for (int j = i + 1; j < arrayList.size(); j++) {
key.set(arrayList.get(i) + "@" + arrayList.get(j));
context.write(key,Style);
}
}
}
}
3)主程序:
package com.Same.groupby.Like;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class SameDrive {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"F:\\scala\\Workerhdfs\\OutputSame\\Same.txt", "F:\\scala\\Workerhdfs\\OutputSame2"};
//1.获取job对象
Configuration conf = new Configuration();
//设置切割符,如不设置默认是\t
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, ":");
Job job = Job.getInstance(conf);
//2.设置jar存储位置
job.setJarByClass(SameDrive.class);
//3关联Map类与Reduce类
job.setMapperClass(SameMapper.class);
job.setReducerClass(SameReduce.class);
//4.设置Mapper阶段key与value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//5.设置最终输出的key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
//6.设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//7.提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
效果如下:
这样我们只能拿到两个国家之间的一个共同样式,并且key的不同(样式不同),不同key进入的reduce task不同,在同一个reduce task中不会出现国家aaa@qq.com国家B 国家aaa@qq.com国家A,但在不同reduce task会出现这种情况,出现这种情况的原因无非就是,假如,样式1:国家A,国家B,国家C…
样式2:国家B,国家A,国家C…Map阶段进入Reduce阶段默认会进行key(样式排序)但value可不会进行一个排序,那在Reduce程序:
for (int i = 0; i < arrayList.size(); i++) {
for (int j = i + 1; j < arrayList.size(); j++) {
key.set(arrayList.get(i) + "@" + arrayList.get(j));
context.write(key,Style);
}
}
就很容易出现国家aaa@qq.com国家B:样式1 国家aaa@qq.com国家A:样式2,进入第二个MapReduce程序处理这个文本
2.第二个MapReduce程序
- Map程序:
**package com.Same.groupby.Style;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class StyleMapper extends Mapper<Text, Text, Text, Text> {
Text k = new Text();
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
// aaa@qq.com A
// aaa@qq.com A
// aaa@qq.com A
// aaa@qq.com A
String str = key.toString();
String[] country = str.split("@");
int result = country[0].compareTo(country[1]);
String tmpstr;
if (result > 0) {
tmpstr = country[0];
country[0] = country[1];
country[1] = tmpstr;
}
k.set(country[0] + "@" + country[1]);
context.write(k, value);
}
}**
这块要对上一个MapReduce文本处理的结果aaa@qq.com类似数据进行一个分组,样式做聚合,有可能出现aaa@qq.com情况,使用比较字符串的方式使用compareTo方法,ASCll码小放在@字符的前面,ASCll码大则字符串放在@后面,就避免出现这个情况,视为相同的key进入同一个reduce task
2)Reduce程序:
**package com.Same.groupby.Style;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class StyleReduce extends Reducer<Text, Text, Text, Text> {
Text v = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuilder str = new StringBuilder();
for (Text value : values) {
str.append(value.toString() + "\t");
}
v.set(str.toString());
context.write(key, v);
}
}**
3)主程序:
package com.Same.groupby.Style;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class StyleDrive {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"F:\\scala\\Workerhdfs\\OutputSame2\\part-r-00000", "F:\\scala\\Workerhdfs\\OutputSame3"};
//1.获取job对象
Configuration conf = new Configuration();
//设置切割符,如不设置默认是\t
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");
Job job = Job.getInstance(conf);
//2.设置jar存储位置
job.setJarByClass(StyleDrive.class);
//3关联Map类与Reduce类
job.setMapperClass(StyleMapper.class);
job.setReducerClass(StyleReduce.class);
//4.设置Mapper阶段key与value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//5.设置最终输出的key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
//6.设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//7.提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
效果如下:
解释下:其实第二个MapReduce程序中Map处理aaa@qq.com,aaa@qq.com
这一类情况可以在第一个MapReduce程序中Reduce处理,但不建议这样做,
在reduce task中处理数据比map task更耗费资源,建议能在map task中完成的任务就放在map task中,否则考虑reduce task.
上一篇: Linux网络笔记二(UDP Socket 编程)
下一篇: ps把相片制作成视频动画效果