MapReduce对输入多文件的处理(MultipleInputs类、指定map)
程序员文章站
2022-05-28 11:42:05
...
MultipleInputs类可以为不同的输入文件路径分别指定输入文件格式(InputFormat)和Mapper(本例有两个文件,分别指定由哪个map处理哪个文件)
现有两份数据
phone
123,good number
124,common number
125,bad number
user
zhangsan,123
lisi,124
wangwu,125
现在需要把user和phone按照phone number连接起来。得到下面的结果
zhangsan,123,good number
lisi,124,common number
wangwu,125,bad number
分析思路
还是相当于两张表的一对一join操作。join时将value封装为一个Bean(JavaBean实现WritableComparable接口),key为外键值(即phone number)
join的优化,详见http://blog.csdn.net/u010366796/article/details/44649933,设置KeyBean(包含外键和标识flag属性),进行排序
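本例中将通过value进行排序,即在value的JavaBean中通过实现compareTo()方法完成排序,使得phone表位于首位。注意:Hadoop在shuffle阶段只对key排序,不会用value的compareTo()对同一key下的value排序,因此仅靠这一点并不能保证phone表记录排在首位;要保证顺序需使用二次排序(把flag放入组合key并设置分组比较器),或者在reduce端根据flag区分两张表的记录。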
1.对value实现JavaBean(实现WritableComparable接口)
-
package test.mr.multiinputs;
-
import java.io.DataInput;
-
import java.io.DataOutput;
-
import java.io.IOException;
-
import org.apache.hadoop.io.WritableComparable;
-
/*
-
* 自定义的JavaBean
-
*/
-
public class FlagString implements WritableComparable<FlagString> {
-
private String value;
-
private int flag; // 标记 0:表示phone表 1:表示user表
-
public FlagString() {
-
super();
-
// TODO Auto-generated constructor stub
-
}
-
public FlagString(String value, int flag) {
-
super();
-
this.value = value;
-
this.flag = flag;
-
}
-
public String getValue() {
-
return value;
-
}
-
public void setValue(String value) {
-
this.value = value;
-
}
-
public int getFlag() {
-
return flag;
-
}
-
public void setFlag(int flag) {
-
this.flag = flag;
-
}
-
@Override
-
public void write(DataOutput out) throws IOException {
-
out.writeInt(flag);
-
out.writeUTF(value);
-
}
-
@Override
-
public void readFields(DataInput in) throws IOException {
-
this.flag = in.readInt();
-
this.value = in.readUTF();
-
}
-
@Override
-
public int compareTo(FlagString o) {
-
if (this.flag >= o.getFlag()) {
-
if (this.flag > o.getFlag()) {
-
return 1;
-
}
-
} else {
-
return -1;
-
}
-
return this.value.compareTo(o.getValue());
-
}
-
}
2.多map类,map1(实现对phone表文件操作)
-
package test.mr.multiinputs;
-
import java.io.IOException;
-
import org.apache.hadoop.io.LongWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Mapper;
-
public class MultiMap1 extends Mapper<LongWritable, Text, Text, FlagString> {
-
private String delimiter; // 定义分隔符,由job端设置
-
@Override
-
protected void setup(
-
Mapper<LongWritable, Text, Text, FlagString>.Context context)
-
throws IOException, InterruptedException {
-
delimiter = context.getConfiguration().get("delimiter", ",");
-
}
-
@Override
-
protected void map(LongWritable key, Text value,
-
Mapper<LongWritable, Text, Text, FlagString>.Context context)
-
throws IOException, InterruptedException {
-
String line = value.toString().trim();
-
if (line.length() > 0) {
-
String[] str = line.split(delimiter);
-
if (str.length == 2) {
-
context.write(new Text(str[0].trim()),
-
new FlagString(str[1].trim(), 0)); // flag=0,表示phone表
-
}
-
}
-
}
-
}
2.map2(实现对user表文件操作)
-
package test.mr.multiinputs;
-
import java.io.IOException;
-
import org.apache.hadoop.io.LongWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Mapper;
-
public class MultiMap2 extends Mapper<LongWritable, Text, Text, FlagString> {
-
private String delimiter; // 设置分隔符
-
@Override
-
protected void setup(
-
Mapper<LongWritable, Text, Text, FlagString>.Context context)
-
throws IOException, InterruptedException {
-
delimiter = context.getConfiguration().get("delimiter", ",");
-
}
-
@Override
-
protected void map(LongWritable key, Text value,
-
Mapper<LongWritable, Text, Text, FlagString>.Context context)
-
throws IOException, InterruptedException {
-
String line = value.toString().trim();
-
if (line.length() > 0) {
-
String[] str = line.split(delimiter);
-
if (str.length == 2) {
-
context.write(new Text(str[1].trim()),
-
new FlagString(str[0].trim(), 1)); // flag=1为user表
-
}
-
}
-
}
-
}
3.reduce类
-
package test.mr.multiinputs;
-
import java.io.IOException;
-
import org.apache.hadoop.io.NullWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Reducer;
-
public class MultiRedu extends Reducer<Text, FlagString, NullWritable, Text> {
-
private String delimiter; // 设置分隔符
-
@Override
-
protected void setup(
-
Reducer<Text, FlagString, NullWritable, Text>.Context context)
-
throws IOException, InterruptedException {
-
delimiter = context.getConfiguration().get("delimiter", ",");
-
}
-
@Override
-
protected void reduce(Text key, Iterable<FlagString> values,
-
Reducer<Text, FlagString, NullWritable, Text>.Context context)
-
throws IOException, InterruptedException {
-
// 最后输出的格式为: uservalue,key,phonevalue
-
String phoneValue = "";
-
String userValue = "";
-
int num = 0;
-
for (FlagString value : values) {
-
// 第一个即为phone表
-
if (num == 0) {
-
phoneValue = value.getValue();
-
num++;
-
} else {
-
userValue = value.getValue();
-
context.write(NullWritable.get(),
-
new Text(userValue + key.toString() + phoneValue));
-
}
-
}
-
}
-
}
4.job类(关键!!实现多文件的输入格式等)
-
package test.mr.multiinputs;
-
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.io.NullWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
-
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
import org.apache.hadoop.util.Tool;
-
import org.apache.hadoop.util.ToolRunner;
-
/*
-
* MultipleInputs类指定不同的输入文件路径以及输入文化格式
-
现有两份数据
-
phone
-
123,good number
-
124,common number
-
123,bad number
-
-
user
-
zhangsan,123
-
lisi,124
-
wangwu,125
-
-
现在需要把user和phone按照phone number连接起来。得到下面的结果
-
zhangsan,123,good number
-
lisi,123,common number
-
wangwu,125,bad number
-
*/
-
public class MultiMapMain extends Configuration implements Tool {
-
private String input1 = null; // 定义的多个输入文件
-
private String input2 = null;
-
private String output = null;
-
private String delimiter = null;
-
@Override
-
public void setConf(Configuration conf) {
-
}
-
@Override
-
public Configuration getConf() {
-
return new Configuration();
-
}
-
@Override
-
public int run(String[] args) throws Exception {
-
setArgs(args);
-
checkParam();// 对参数进行检测
-
Configuration conf = new Configuration();
-
Job job = new Job(conf);
-
job.setJarByClass(MultiMapMain.class);
-
job.setMapOutputKeyClass(Text.class);
-
job.setMapOutputValueClass(FlagString.class);
-
job.setReducerClass(MultiRedu.class);
-
job.setOutputKeyClass(NullWritable.class);
-
job.setOutputValueClass(Text.class);
-
// MultipleInputs类添加文件路径
-
MultipleInputs.addInputPath(job, new Path(input1),
-
TextInputFormat.class, MultiMap1.class);
-
MultipleInputs.addInputPath(job, new Path(input2),
-
TextInputFormat.class, MultiMap2.class);
-
FileOutputFormat.setOutputPath(job, new Path(output));
-
job.waitForCompletion(true);
-
return 0;
-
}
-
private void checkParam() {
-
if (input1 == null || "".equals(input1.trim())) {
-
System.out.println("no input phone-data path");
-
userMaunel();
-
System.exit(-1);
-
}
-
if (input2 == null || "".equals(input2.trim())) {
-
System.out.println("no input user-data path");
-
userMaunel();
-
System.exit(-1);
-
}
-
if (output == null || "".equals(output.trim())) {
-
System.out.println("no output path");
-
userMaunel();
-
System.exit(-1);
-
}
-
if (delimiter == null || "".equals(delimiter.trim())) {
-
System.out.println("no delimiter");
-
userMaunel();
-
System.exit(-1);
-
}
-
}
-
// 用户手册
-
private void userMaunel() {
-
System.err.println("Usage:");
-
System.err.println("-i1 input \t phone data path.");
-
System.err.println("-i2 input \t user data path.");
-
System.err.println("-o output \t output data path.");
-
System.err.println("-delimiter data delimiter \t default comma.");
-
}
-
// 对属性进行赋值
-
// 设置输入的格式:-i1 xxx(输入目录) -i2 xxx(输入目录) -o xxx(输出目录) -delimiter x(分隔符)
-
private void setArgs(String[] args) {
-
for (int i = 0; i < args.length; i++) {
-
if ("-i1".equals(args[i])) {
-
input1 = args[++i]; // 将input1赋值为第一个文件的输入路径
-
} else if ("-i2".equals(args[i])) {
-
input2 = args[++i];
-
} else if ("-o".equals(args[i])) {
-
output = args[++i];
-
} else if ("-delimiter".equals(args[i])) {
-
delimiter = args[++i];
-
}
-
}
-
}
-
public static void main(String[] args) throws Exception {
-
Configuration conf = new Configuration();
-
ToolRunner.run(conf, new MultiMapMain(), args); // 调用run方法
-
}
-
}