彷徨 | MapReduce实例二 | 去重复
程序员文章站
2022-04-15 14:10:03
...
原始数据
prefix phone province city isp post_code city_code area_code
130 1300000 山东 济南 联通 250000 0531 370100
130 1300001 江苏 常州 联通 213000 0519 320400
130 1300002 安徽 巢湖 联通 238000 0551 340181
130 1300003 四川 宜宾 联通 644000 0831 511500
130 1300004 四川 自贡 联通 643000 0813 510300
130 1300005 陕西 西安 联通 710000 029 610100
130 1300006 江苏 南京 联通 210000 025 320100
130 1300007 陕西 西安 联通 710000 029 610100
130 1300008 湖北 武汉 联通 430000 027 420100
130 1300009 陕西 西安 联通 710000 029 610100
130 1300010 北京 北京 联通 100000 010 110100
130 1300011 北京 北京 联通 100000 010 110100
130 1300012 天津 天津 联通 300000 022 120100
130 1300013 天津 天津 联通 300000 022 120100
去重后结果
130 联通
131 联通
132 联通
133 电信
134 移动
135 移动
136 移动
137 移动
138 移动
139 移动
145 联通
147 移动
代码实现
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DIS {
public static class MapTask extends Mapper<LongWritable, Text, Text, Text>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
//跳过偏移量等于0的时候
if(key.get() != 0) {
String[] split = value.toString().split("\t");
String prefix = split[0];
String isp = split[4];
context.write(new Text(prefix), new Text(isp));
}
}
}
public static class ReduceTask extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
for (Text text : value) {
context.write(key, text);
break;
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//设置map,reduce,以及提交的jar
job.setMapperClass(MapTask.class);
job.setReducerClass(ReduceTask.class);
job.setJarByClass(DIS.class);
//设置输入和输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//设置输入和输出目录.指定为文件夹
FileInputFormat.addInputPath(job, new Path("E:/data/phone.txt"));
FileOutputFormat.setOutputPath(job, new Path("E:/data/out/phone"));
//判断文件是否存在
File file = new File("E:/data/out/phone");
if(file.exists()) {
FileUtils.deleteDirectory(file);
}
//提交任务
boolean completion = job.waitForCompletion(true);
System.out.println(completion?"优秀!!":"调Bug!!");
}
}
运行结果:
上一篇: 什么是哈希hash 算法
下一篇: 每日LeetCode——数组去重