欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

彷徨 | MapReduce实例二 | 去重复

程序员文章站 2022-04-15 14:10:03
...

原始数据

prefix	phone	province	city	isp	post_code	city_code	area_code
130	1300000	山东	济南	联通	250000	0531	370100
130	1300001	江苏	常州	联通	213000	0519	320400
130	1300002	安徽	巢湖	联通	238000	0551	340181
130	1300003	四川	宜宾	联通	644000	0831	511500
130	1300004	四川	自贡	联通	643000	0813	510300
130	1300005	陕西	西安	联通	710000	029	610100
130	1300006	江苏	南京	联通	210000	025	320100
130	1300007	陕西	西安	联通	710000	029	610100
130	1300008	湖北	武汉	联通	430000	027	420100
130	1300009	陕西	西安	联通	710000	029	610100
130	1300010	北京	北京	联通	100000	010	110100
130	1300011	北京	北京	联通	100000	010	110100
130	1300012	天津	天津	联通	300000	022	120100
130	1300013	天津	天津	联通	300000	022	120100

去重后结果

130	联通
131	联通
132	联通
133	电信
134	移动
135	移动
136	移动
137	移动
138	移动
139	移动
145	联通
147	移动

代码实现

import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DIS {
	
	public static class MapTask extends Mapper<LongWritable, Text, Text, Text>{
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			//跳过偏移量等于0的时候
			if(key.get() != 0) {
				String[] split = value.toString().split("\t");
				String prefix = split[0];
				String isp = split[4];
				context.write(new Text(prefix), new Text(isp));
			}
		}
	}
	
	public static class ReduceTask extends Reducer<Text, Text, Text, Text>{
		
		@Override
		protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			for (Text text : value) {
				context.write(key, text);
				break;
			}
		}
	}
	
	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		//设置map,reduce,以及提交的jar
		job.setMapperClass(MapTask.class);
		job.setReducerClass(ReduceTask.class);
		job.setJarByClass(DIS.class);
		
		//设置输入和输出类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		//设置输入和输出目录.指定为文件夹
		FileInputFormat.addInputPath(job, new Path("E:/data/phone.txt"));
		FileOutputFormat.setOutputPath(job, new Path("E:/data/out/phone"));
		
		//判断文件是否存在
		File file = new File("E:/data/out/phone");
		if(file.exists()) {
			FileUtils.deleteDirectory(file);
		}
		
		//提交任务
		boolean completion = job.waitForCompletion(true);
		System.out.println(completion?"优秀!!":"调Bug!!");
		
	}
}

运行结果:

彷徨 | MapReduce实例二 | 去重复

彷徨 | MapReduce实例二 | 去重复

相关标签: MapReduce 去重