欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

MapReduce程序寻找两个用户之间共同好友

程序员文章站 2022-06-30 09:17:06
...

需求分析

从数据集中找出两个国家之间共同样式,这个其实跟我们两个用户之间寻找共同好友没有区别。测试数据集
MapReduce程序寻找两个用户之间共同好友

拿到这个数据进行分析:例如Australia:Cup Pack
对于Australia国家拥有的两个Cup Pack样式我要如何从其他国家找到这种样式呢?问题来了,想想我们MapReduce的核心就是分而治之,先分组,在聚合,既然这样我们用样式进行分组(与共同好友同理,判断一个用户是不是两个用户之间的共同好友,查看这个用户所有的好友列表,循环遍历,利用组合的方式,好友之间不同组合(这两好友不就拥有共同好友==这个用户),实现将样式做为Key,国家做为Value

1.第一个MapReduce程序:

  1. Map程序:
package com.Same.groupby.Like;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


public class SameMapper extends Mapper<Text, Text, Text, Text> {
    Text v = new Text();

    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
//        Australia:Cup	Pack
//        Bangladesh:Pack
//        Brazil:Cup	Pack


        //1.获取一行
        String line = value.toString();

        //2.切割
        String[] fields = line.split("\t");

        //3 .此value当作key,来写(相同的key同时进入reduce方法),那么key当作value写入
        for (String var : fields) {
            v.set(var);
            context.write(v, key);
        }
    }
}

这里我使用keyvalueinputformat(我们可以不以偏移量和行文本内容来作为数据源到MapTask的输入格式,而使用键值对的形式,使用KeyValueInputFormat就可以完成这种需求。) 进行Map端数据输入

2)Reduce程序:

package com.Same.groupby.Like;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;

public class SameReduce extends Reducer<Text, Text, Text, Text> {
    Text key = new Text();

    @Override
    protected void reduce(Text Style, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        //给出一个ArrayList集合用来存储地区(Country)
        ArrayList<String> arrayList = new ArrayList();


        for (Text value : values) {
            String var = value.toString();
            arrayList.add(var);
        }
        //接下来就是对arrayList集合进行遍历(排列组合,基本思想是我先拿出集合中第一个元素分别与集合中第二,
        // 第三....第n个元素进行组合,第二个元素又分别与集合中第三,第四....第n个元素,依次如此)
        for (int i = 0; i < arrayList.size(); i++) {
            for (int j = i + 1; j < arrayList.size(); j++) {
                key.set(arrayList.get(i) + "@" + arrayList.get(j));
                context.write(key,Style);
            }
        }
    }
}

3)主程序:

package com.Same.groupby.Like;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SameDrive {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        args = new String[]{"F:\\scala\\Workerhdfs\\OutputSame\\Same.txt", "F:\\scala\\Workerhdfs\\OutputSame2"};
        //1.获取job对象
        Configuration conf = new Configuration();
        //设置切割符,如不设置默认是\t
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, ":");
        Job job = Job.getInstance(conf);

        //2.设置jar存储位置
        job.setJarByClass(SameDrive.class);


        //3关联Map类与Reduce类
        job.setMapperClass(SameMapper.class);
        job.setReducerClass(SameReduce.class);


        //4.设置Mapper阶段key与value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        //5.设置最终输出的key和value类型
        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);


        job.setInputFormatClass(KeyValueTextInputFormat.class);


        //6.设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //7.提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);


    }
}

效果如下:
MapReduce程序寻找两个用户之间共同好友

这样我们只能拿到两个国家之间的一个共同样式,并且key的不同(样式不同),不同key进入的reduce task不同,在同一个reduce task中不会出现国家aaa@qq.com国家B 国家aaa@qq.com国家A,但在不同reduce task会出现这种情况,出现这种情况的原因无非就是,假如,样式1:国家A,国家B,国家C…
样式2:国家B,国家A,国家C…Map阶段进入Reduce阶段默认会进行key(样式排序)但value可不会进行一个排序,那在Reduce程序:

for (int i = 0; i < arrayList.size(); i++) {
            for (int j = i + 1; j < arrayList.size(); j++) {
                key.set(arrayList.get(i) + "@" + arrayList.get(j));
                context.write(key,Style);
            }
        }

就很容易出现国家aaa@qq.com国家B:样式1 国家aaa@qq.com国家A:样式2,进入第二个MapReduce程序处理这个文本

2.第二个MapReduce程序

  1. Map程序:
**package com.Same.groupby.Style;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class StyleMapper extends Mapper<Text, Text, Text, Text> {
    Text k = new Text();
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
//        aaa@qq.com	A
//        aaa@qq.com	A
//        aaa@qq.com	A
//        aaa@qq.com	A
        String str = key.toString();
        String[] country = str.split("@");
        int result = country[0].compareTo(country[1]);
        String tmpstr;
        if (result > 0) {
            tmpstr = country[0];
            country[0] = country[1];
            country[1] = tmpstr;
        }
        k.set(country[0] + "@" + country[1]);
        context.write(k, value);
    }
}**

这块要对上一个MapReduce文本处理的结果aaa@qq.com类似数据进行一个分组,样式做聚合,有可能出现aaa@qq.com情况,使用比较字符串的方式使用compareTo方法,ASCll码小放在@字符的前面,ASCll码大则字符串放在@后面,就避免出现这个情况,视为相同的key进入同一个reduce task
2)Reduce程序:

**package com.Same.groupby.Style;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class StyleReduce extends Reducer<Text, Text, Text, Text> {
    Text v = new Text();
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuilder str = new StringBuilder();
        for (Text value : values) {
            str.append(value.toString() + "\t");
        }
        v.set(str.toString());
        context.write(key, v);
    }
}**

3)主程序:

package com.Same.groupby.Style;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class StyleDrive {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        args = new String[]{"F:\\scala\\Workerhdfs\\OutputSame2\\part-r-00000", "F:\\scala\\Workerhdfs\\OutputSame3"};
        //1.获取job对象
        Configuration conf = new Configuration();
        //设置切割符,如不设置默认是\t
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");
        Job job = Job.getInstance(conf);

        //2.设置jar存储位置
        job.setJarByClass(StyleDrive.class);


        //3关联Map类与Reduce类
        job.setMapperClass(StyleMapper.class);
        job.setReducerClass(StyleReduce.class);


        //4.设置Mapper阶段key与value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        //5.设置最终输出的key和value类型
        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);


        job.setInputFormatClass(KeyValueTextInputFormat.class);


        //6.设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //7.提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }
}

效果如下:
MapReduce程序寻找两个用户之间共同好友

解释下:其实第二个MapReduce程序中Map处理aaa@qq.com,aaa@qq.com
这一类情况可以在第一个MapReduce程序中Reduce处理,但不建议这样做,
在reduce task中处理数据比map task更耗费资源,建议能在map task中完成的任务就放在map task中,否则考虑reduce task.

相关标签: Mapreduce hadoop