欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

MapReduce对输入多文件的处理(MultipleInputs类、指定map)

程序员文章站 2022-05-28 11:42:05
...

使用MultipleInputs类为不同的输入文件路径分别指定输入文件格式和Mapper(本例有两个文件,指定由哪个map处理哪个文件)
 现有两份数据
 phone
 123,good number
 124,common number
 125,bad number

 user
 zhangsan,123
 lisi,124
 wangwu,125

 现在需要把user和phone按照phone number连接起来。得到下面的结果
 zhangsan,123,good number
 lisi,124,common number
 wangwu,125,bad number

 

分析思路

本质上仍是两张表的一对一join操作。join时将value封装为一个Bean(该JavaBean实现WritableComparable接口),key为外键值(phone number)。

join的优化详见http://blog.csdn.net/u010366796/article/details/44649933:设置KeyBean(包含外键和标识flag两个属性),对key进行排序。

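本例中打算通过value进行排序,即在value的JavaBean中实现compareTo()方法完成排序,使得phone表的记录位于首位。注意:MapReduce在shuffle阶段只对key排序,不会按value的compareTo()对同一key下的values排序,因此reduce端收到的values并不保证phone记录在前。要保证顺序需使用二次排序(组合key + 分组比较器),或者在reduce端按flag区分两张表的记录。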

 

1.为value实现JavaBean(实现WritableComparable接口)

  1. package test.mr.multiinputs;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import org.apache.hadoop.io.WritableComparable;
  6. /*
  7. * 自定义的JavaBean
  8. */
  9. public class FlagString implements WritableComparable<FlagString> {
  10. private String value;
  11. private int flag; // 标记 0:表示phone表 1:表示user表
  12. public FlagString() {
  13. super();
  14. // TODO Auto-generated constructor stub
  15. }
  16. public FlagString(String value, int flag) {
  17. super();
  18. this.value = value;
  19. this.flag = flag;
  20. }
  21. public String getValue() {
  22. return value;
  23. }
  24. public void setValue(String value) {
  25. this.value = value;
  26. }
  27. public int getFlag() {
  28. return flag;
  29. }
  30. public void setFlag(int flag) {
  31. this.flag = flag;
  32. }
  33. @Override
  34. public void write(DataOutput out) throws IOException {
  35. out.writeInt(flag);
  36. out.writeUTF(value);
  37. }
  38. @Override
  39. public void readFields(DataInput in) throws IOException {
  40. this.flag = in.readInt();
  41. this.value = in.readUTF();
  42. }
  43. @Override
  44. public int compareTo(FlagString o) {
  45. if (this.flag >= o.getFlag()) {
  46. if (this.flag > o.getFlag()) {
  47. return 1;
  48. }
  49. } else {
  50. return -1;
  51. }
  52. return this.value.compareTo(o.getValue());
  53. }
  54. }


2.多map类,map1(实现对phone表文件操作)

  1. package test.mr.multiinputs;
  2. import java.io.IOException;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. public class MultiMap1 extends Mapper<LongWritable, Text, Text, FlagString> {
  7. private String delimiter; // 定义分隔符,由job端设置
  8. @Override
  9. protected void setup(
  10. Mapper<LongWritable, Text, Text, FlagString>.Context context)
  11. throws IOException, InterruptedException {
  12. delimiter = context.getConfiguration().get("delimiter", ",");
  13. }
  14. @Override
  15. protected void map(LongWritable key, Text value,
  16. Mapper<LongWritable, Text, Text, FlagString>.Context context)
  17. throws IOException, InterruptedException {
  18. String line = value.toString().trim();
  19. if (line.length() > 0) {
  20. String[] str = line.split(delimiter);
  21. if (str.length == 2) {
  22. context.write(new Text(str[0].trim()),
  23. new FlagString(str[1].trim(), 0)); // flag=0,表示phone表
  24. }
  25. }
  26. }
  27. }


2.map2(实现对user表文件操作)

  1. package test.mr.multiinputs;
  2. import java.io.IOException;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. public class MultiMap2 extends Mapper<LongWritable, Text, Text, FlagString> {
  7. private String delimiter; // 设置分隔符
  8. @Override
  9. protected void setup(
  10. Mapper<LongWritable, Text, Text, FlagString>.Context context)
  11. throws IOException, InterruptedException {
  12. delimiter = context.getConfiguration().get("delimiter", ",");
  13. }
  14. @Override
  15. protected void map(LongWritable key, Text value,
  16. Mapper<LongWritable, Text, Text, FlagString>.Context context)
  17. throws IOException, InterruptedException {
  18. String line = value.toString().trim();
  19. if (line.length() > 0) {
  20. String[] str = line.split(delimiter);
  21. if (str.length == 2) {
  22. context.write(new Text(str[1].trim()),
  23. new FlagString(str[0].trim(), 1)); // flag=1为user表
  24. }
  25. }
  26. }
  27. }


3.reduce类

  1. package test.mr.multiinputs;
  2. import java.io.IOException;
  3. import org.apache.hadoop.io.NullWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Reducer;
  6. public class MultiRedu extends Reducer<Text, FlagString, NullWritable, Text> {
  7. private String delimiter; // 设置分隔符
  8. @Override
  9. protected void setup(
  10. Reducer<Text, FlagString, NullWritable, Text>.Context context)
  11. throws IOException, InterruptedException {
  12. delimiter = context.getConfiguration().get("delimiter", ",");
  13. }
  14. @Override
  15. protected void reduce(Text key, Iterable<FlagString> values,
  16. Reducer<Text, FlagString, NullWritable, Text>.Context context)
  17. throws IOException, InterruptedException {
  18. // 最后输出的格式为: uservalue,key,phonevalue
  19. String phoneValue = "";
  20. String userValue = "";
  21. int num = 0;
  22. for (FlagString value : values) {
  23. // 第一个即为phone表
  24. if (num == 0) {
  25. phoneValue = value.getValue();
  26. num++;
  27. } else {
  28. userValue = value.getValue();
  29. context.write(NullWritable.get(),
  30. new Text(userValue + key.toString() + phoneValue));
  31. }
  32. }
  33. }
  34. }


4.job类(关键!!实现多文件的输入格式等)

  1. package test.mr.multiinputs;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.NullWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
  8. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  10. import org.apache.hadoop.util.Tool;
  11. import org.apache.hadoop.util.ToolRunner;
  12. /*
  13. * MultipleInputs类指定不同的输入文件路径以及输入文化格式
  14. 现有两份数据
  15. phone
  16. 123,good number
  17. 124,common number
  18. 123,bad number
  19. user
  20. zhangsan,123
  21. lisi,124
  22. wangwu,125
  23. 现在需要把user和phone按照phone number连接起来。得到下面的结果
  24. zhangsan,123,good number
  25. lisi,123,common number
  26. wangwu,125,bad number
  27. */
  28. public class MultiMapMain extends Configuration implements Tool {
  29. private String input1 = null; // 定义的多个输入文件
  30. private String input2 = null;
  31. private String output = null;
  32. private String delimiter = null;
  33. @Override
  34. public void setConf(Configuration conf) {
  35. }
  36. @Override
  37. public Configuration getConf() {
  38. return new Configuration();
  39. }
  40. @Override
  41. public int run(String[] args) throws Exception {
  42. setArgs(args);
  43. checkParam();// 对参数进行检测
  44. Configuration conf = new Configuration();
  45. Job job = new Job(conf);
  46. job.setJarByClass(MultiMapMain.class);
  47. job.setMapOutputKeyClass(Text.class);
  48. job.setMapOutputValueClass(FlagString.class);
  49. job.setReducerClass(MultiRedu.class);
  50. job.setOutputKeyClass(NullWritable.class);
  51. job.setOutputValueClass(Text.class);
  52. // MultipleInputs类添加文件路径
  53. MultipleInputs.addInputPath(job, new Path(input1),
  54. TextInputFormat.class, MultiMap1.class);
  55. MultipleInputs.addInputPath(job, new Path(input2),
  56. TextInputFormat.class, MultiMap2.class);
  57. FileOutputFormat.setOutputPath(job, new Path(output));
  58. job.waitForCompletion(true);
  59. return 0;
  60. }
  61. private void checkParam() {
  62. if (input1 == null || "".equals(input1.trim())) {
  63. System.out.println("no input phone-data path");
  64. userMaunel();
  65. System.exit(-1);
  66. }
  67. if (input2 == null || "".equals(input2.trim())) {
  68. System.out.println("no input user-data path");
  69. userMaunel();
  70. System.exit(-1);
  71. }
  72. if (output == null || "".equals(output.trim())) {
  73. System.out.println("no output path");
  74. userMaunel();
  75. System.exit(-1);
  76. }
  77. if (delimiter == null || "".equals(delimiter.trim())) {
  78. System.out.println("no delimiter");
  79. userMaunel();
  80. System.exit(-1);
  81. }
  82. }
  83. // 用户手册
  84. private void userMaunel() {
  85. System.err.println("Usage:");
  86. System.err.println("-i1 input \t phone data path.");
  87. System.err.println("-i2 input \t user data path.");
  88. System.err.println("-o output \t output data path.");
  89. System.err.println("-delimiter data delimiter \t default comma.");
  90. }
  91. // 对属性进行赋值
  92. // 设置输入的格式:-i1 xxx(输入目录) -i2 xxx(输入目录) -o xxx(输出目录) -delimiter x(分隔符)
  93. private void setArgs(String[] args) {
  94. for (int i = 0; i < args.length; i++) {
  95. if ("-i1".equals(args[i])) {
  96. input1 = args[++i]; // 将input1赋值为第一个文件的输入路径
  97. } else if ("-i2".equals(args[i])) {
  98. input2 = args[++i];
  99. } else if ("-o".equals(args[i])) {
  100. output = args[++i];
  101. } else if ("-delimiter".equals(args[i])) {
  102. delimiter = args[++i];
  103. }
  104. }
  105. }
  106. public static void main(String[] args) throws Exception {
  107. Configuration conf = new Configuration();
  108. ToolRunner.run(conf, new MultiMapMain(), args); // 调用run方法
  109. }
  110. }