MR案例 之 重复数据处理
程序员文章站
2022-04-13 23:18:07
...
1 数据:
2 需求:
使用 MapReduce 将相同手机号的数据整理到同一个文本文件中,文件名称为该手机号。
3 代码实现 :
package Test02;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
 * Groups tab-separated records by phone number and writes each group to its
 * own output file, named after the phone number, via {@link MultipleOutputs}.
 */
public class MultipleOutputTest {

    /**
     * Mapper: keys each input line by its phone-number field so that all
     * records for the same phone number arrive at the same reduce call.
     *
     * Input:  (byte offset, full tab-separated line)
     * Output: (phone number, full line)
     */
    static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        // Reused output key — avoids allocating a new Text per record.
        private final Text phone = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Field 1 is assumed to be the phone number — matches the
            // original job's input layout; TODO confirm against the data file.
            String[] fields = value.toString().split("\t");
            if (fields.length < 2) {
                // Skip malformed lines instead of failing the whole task
                // with ArrayIndexOutOfBoundsException; count them for audit.
                context.getCounter("MultipleOutputTest", "MALFORMED_LINES").increment(1);
                return;
            }
            phone.set(fields[1]);
            context.write(phone, value);
        }
    }

    /**
     * Reducer: writes every record of a phone number into a separate output
     * file whose base name is the phone number itself.
     */
    static class MyReducer extends Reducer<Text, Text, NullWritable, Text> {

        private MultipleOutputs<NullWritable, Text> multipleOutputs;

        @Override
        protected void setup(Context context) {
            multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Third write() argument is the base output path: one file per key.
            for (Text value : values) {
                multipleOutputs.write(NullWritable.get(), value, key.toString());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Must close MultipleOutputs, or buffered per-phone data is lost.
            if (multipleOutputs != null) {
                multipleOutputs.close();
            }
        }
    }

    /**
     * Job driver. Input/output paths may be supplied on the command line
     * (args[0] = input, args[1] = output); otherwise the original hard-coded
     * local paths are used, keeping the old invocation working unchanged.
     *
     * @throws IOException            on job-submission I/O failure
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException   if the wait for completion is interrupted
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        String input = args.length > 0 ? args[0] : "D:\\HDFS001\\src\\data_flow3.1.txt";
        String output = args.length > 1 ? args[1] : "G:\\phone";

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MultipleOutputTest.class);
        job.setJobName("MultipleOutputTest");

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        // Suppress the empty default part-r-* files: only the per-phone
        // files written through MultipleOutputs are materialized.
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

        // Propagate job success/failure as the process exit code
        // (the original ignored the boolean and always exited 0).
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}