手机自定义分区
程序员文章站
2022-07-13 15:53:16
...
PhionPartition
java
package phion;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
public class PhionPartition extends Partitioner<Text,NullWritable> {
public static final String[]YD={
"134","135","136",
"137","138","139",
"150","151","152",
"157","158","159",
"188","187","182",
"183","184","178",
"147","172","198"
};
//联通前三位
public static final String[]LT={
"130","131","132",
"145","155","156",
"166","171","175",
"176","185","186","166"
};
//电信前三位
public static final String[]DX={
"133","149","153",
"173","177","180",
"181","189","199"};
public int getPartition(Text text, NullWritable nullWritable, int i) {
String phone=text.toString();
String phone1=phone.substring(0,3);
if(Arrays.asList(YD).contains(phone1)){
return 0;
}else if(Arrays.asList(LT).contains(phone1)){
return 1;
}else{
return 2;
}
}
};
Mapper
java
package phion;
import org.apache.commons.lang.ObjectUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Splits each tab-separated input line into phone numbers and emits each
 * number as a key with a NullWritable value (the value carries no data;
 * the job only needs the keys, which the partitioner routes by carrier).
 */
class PartitionMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // Reused output key — standard Hadoop idiom to avoid allocating a new
    // Text object for every emitted record.
    private final Text outKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Raw input line
        String line = value.toString();
        // Split on tab
        String[] phones = line.split("\t");
        for (String phone : phones) {
            // Skip empty tokens produced by blank lines or consecutive tabs,
            // which would otherwise emit empty keys into the output.
            if (phone.isEmpty()) {
                continue;
            }
            outKey.set(phone);
            context.write(outKey, NullWritable.get());
        }
    }
}
Reduce
java
package phion;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Identity-style reducer: writes each distinct phone-number key exactly once,
 * paired with a NullWritable value. The driver also installs this class as the
 * combiner, so duplicate keys are collapsed map-side before the shuffle.
 */
public class PhionReduce extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Emit one record per key group; the grouped values are ignored.
        context.write(key, NullWritable.get());
    }
}
Driver
java
package phion;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Job driver: wires the mapper, combiner, reducer and the custom phone
 * partitioner together and runs the job against the local input/output paths.
 */
public class PhionDriver {
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        // Load the default Hadoop configuration
        Configuration configuration = new Configuration();
        // Create the job
        Job job = Job.getInstance(configuration);
        // BUG FIX: was Partitioner.class (a Hadoop framework class), which would
        // locate the wrong jar; this must name a class inside the job's own jar.
        job.setJarByClass(PhionDriver.class);
        // Mapper and its intermediate key/value types
        job.setMapperClass(PartitionMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Map-side pre-aggregation (deduplication) before the shuffle
        job.setCombinerClass(PhionReduce.class);
        // Reducer and the job's final output types
        job.setReducerClass(PhionReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // BUG FIX: the custom partitioner was never registered, so Hadoop's
        // default HashPartitioner ran and the per-carrier split never happened.
        job.setPartitionerClass(PhionPartition.class);
        // One reduce task per carrier bucket (0 = Mobile, 1 = Unicom, 2 = other)
        job.setNumReduceTasks(3);
        // Input path (tip: Ctrl+H shows the class hierarchy in the IDE)
        FileInputFormat.setInputPaths(job, new Path("G:\\a\\phone.txt"));
        // Output path — must not already exist, or the job fails at submit time
        FileOutputFormat.setOutputPath(job, new Path("G:\\a\\phone1.txt"));
        // Run the job and propagate success/failure through the exit code
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}