
Implementing Join Logic in MapReduce (Debugging with Local Hadoop and on Linux)


I. Creating the Input Files
1. Create two files, order.txt and pd.txt, with the following contents.
order.txt:
1001,20150710,P0001,2
1002,20150710,P0001,3
1002,20150710,P0002,3
1003,20150710,P0003,3
pd.txt:
P0001,小米5,1000,2
P0002,锤子T1,1000,3
P0003,小辣椒,1001,3
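For reference, joining these files on the product id should produce one line per order record, formatted by Infobean's toString() shown below (line ordering may vary):

order_id=1001, dateString=20150710, p_id=P0001, amount=2, name=小米5, category_id=1000, price=2.0, flag=0
order_id=1002, dateString=20150710, p_id=P0001, amount=3, name=小米5, category_id=1000, price=2.0, flag=0
order_id=1002, dateString=20150710, p_id=P0002, amount=3, name=锤子T1, category_id=1000, price=3.0, flag=0
order_id=1003, dateString=20150710, p_id=P0003, amount=3, name=小辣椒, category_id=1001, price=3.0, flag=0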
II. Program Source
RJoin source:

import java.io.IOException;
import java.util.ArrayList;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RJoin {
    static class RJoinMapper extends Mapper<LongWritable, Text, Text, Infobean> {
        // Reuse one bean and one key object across map() calls to avoid object churn
        Infobean bean = new Infobean();
        Text k = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();

            // The split's file name tells us which table this record came from
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String name = inputSplit.getPath().getName();

            String pid = "";
            if (name.startsWith("order")) {
                // order record: id, date, pid, amount
                String[] fields = line.split(",");
                pid = fields[2];
                bean.set(Integer.parseInt(fields[0]), fields[1], pid,
                        Integer.parseInt(fields[3]), "", 0, 0, "0");
            } else {
                // product record: pid, pname, category_id, price
                String[] fields = line.split(",");
                pid = fields[0];
                bean.set(0, "", pid, 0, fields[1], Integer.parseInt(fields[2]),
                        Float.parseFloat(fields[3]), "1");
            }
            // Key by product id so matching order and product records meet in one reduce call
            k.set(pid);
            context.write(k, bean);
        }
    }
    static class RJoinReducer extends Reducer<Text, Infobean, Infobean, NullWritable> {
        @Override
        protected void reduce(Text pid, Iterable<Infobean> beans, Context context)
                throws IOException, InterruptedException {
            Infobean pdBean = new Infobean();
            ArrayList<Infobean> orderBeans = new ArrayList<Infobean>();

            // Separate the single product record from the order records.
            // Hadoop reuses the bean it passes in, so every record must be copied out.
            for (Infobean bean : beans) {
                if ("1".equals(bean.getFlag())) {
                    try {
                        BeanUtils.copyProperties(pdBean, bean);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } else {
                    Infobean odBean = new Infobean();
                    try {
                        BeanUtils.copyProperties(odBean, bean);
                        orderBeans.add(odBean);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }

            // Join: copy the product's fields into each order record and emit it
            for (Infobean bean : orderBeans) {
                bean.setName(pdBean.getName());
                bean.setCategory_id(pdBean.getCategory_id());
                bean.setPrice(pdBean.getPrice());
                context.write(bean, NullWritable.get());
            }
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Locate the jar by the driver class
        job.setJarByClass(RJoin.class);
        //job.setJar("D:/Java/RJoin.jar");

        // Mapper and reducer for this job
        job.setMapperClass(RJoinMapper.class);
        job.setReducerClass(RJoinReducer.class);

        // Key/value types emitted by the mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Infobean.class);

        // Key/value types of the final output
        job.setOutputKeyClass(Infobean.class);
        job.setOutputValueClass(NullWritable.class);

        // Input directory of the raw files and the output directory
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
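Two details of this implementation are worth spelling out. First, Hadoop reuses the single Infobean instance it feeds through the reducer's values iterable, which is why each bean is copied with BeanUtils.copyProperties before being kept. A minimal sketch of the pitfall the copy avoids:

// WRONG: every list entry aliases the same reused object, which the
// framework overwrites on the next iteration of the values iterable
for (Infobean bean : beans) {
    orderBeans.add(bean);
}

Second, as an alternative to checking the file name inside the mapper, Hadoop's MultipleInputs.addInputPath can bind a separate Mapper class to each input path; the file-name check is simply the more compact option for a small demo.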

Infobean source:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class Infobean implements Writable {
    // Fields from both tables; one bean type carries either kind of record
    private int order_id;
    private String dateString;
    private String p_id;
    private int amount;
    private String name;
    private int category_id;
    private float price;
    // flag = "0": this object wraps an order-table record
    // flag = "1": this object wraps a product record
    private String flag;

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    // No-arg constructor, required so the framework can deserialize the bean
    public Infobean() { }

    public void set(int order_id, String dateString, String p_id, int amount,
            String name, int category_id, float price,String flag) {
        this.order_id = order_id;
        this.dateString = dateString;
        this.p_id = p_id;
        this.amount = amount;
        this.name = name;
        this.category_id = category_id;
        this.price = price;
        this.flag=flag;
    }

    public int getOrder_id() {
        return order_id;
    }

    public void setOrder_id(int order_id) {
        this.order_id = order_id;
    }

    public String getDateString() {
        return dateString;
    }

    public void setDateString(String dateString) {
        this.dateString = dateString;
    }

    public String getP_id() {
        return p_id;
    }

    public void setP_id(String p_id) {
        this.p_id = p_id;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getCategory_id() {
        return category_id;
    }

    public void setCategory_id(int category_id) {
        this.category_id = category_id;
    }

    public float getPrice() {
        return price;
    }

    public void setPrice(float price) {
        this.price = price;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Serialize in a fixed field order; readFields must read it back the same way
        out.writeInt(order_id);
        out.writeUTF(dateString);
        out.writeUTF(p_id);
        out.writeInt(amount);
        out.writeUTF(name);
        out.writeInt(category_id);
        out.writeFloat(price);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Deserialize in the same order the fields were written
        this.order_id=in.readInt();
        this.dateString=in.readUTF();
        this.p_id=in.readUTF();
        this.amount=in.readInt();
        this.name=in.readUTF();
        this.category_id=in.readInt();
        this.price=in.readFloat();
        this.flag=in.readUTF();
    }

    @Override
    public String toString() {
        return "order_id=" + order_id + ", dateString=" + dateString
                + ", p_id=" + p_id + ", amount=" + amount + ", name=" + name
                + ", category_id=" + category_id + ", price=" + price
                + ", flag=" + flag  ;
    }

}
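One design note: write() serializes every String with writeUTF, which throws a NullPointerException on null values; that is why set() fills the unused half of the bean with "" and 0 instead of leaving fields null.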

Source reference: http://blog.csdn.net/yangzheng0515/article/details/78017941
III. Running the Program
1. Running with local Hadoop
First connect to Hadoop from the local machine through the hadoop-plugin (the Eclipse plugin); the plugin setup itself is not covered in detail here.
Configure the run via Run As -> Run Configurations, supplying the input and output paths as program arguments.

Then click Run As -> Run on Hadoop.

Console output (excerpt):
2018-03-09 15:58:18,966 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1362)) - map 100% reduce 100%
2018-03-09 15:58:19,966 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1373)) - Job job_local126275517_0001 completed successfully
2018-03-09 15:58:20,102 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1380)) - Counters: 38
The job ran successfully!
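If the Eclipse plugin is not set up, an alternative is to force local execution straight from the driver. A minimal sketch using standard Hadoop configuration properties (placed at the top of main()):

Configuration conf = new Configuration();
// Run the MapReduce engine in-process instead of submitting to a cluster
conf.set("mapreduce.framework.name", "local");
// Resolve input/output paths against the local file system
conf.set("fs.defaultFS", "file:///");
Job job = Job.getInstance(conf);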
2. Running on Linux
Package the program into a jar, upload it to the Linux machine with the rz command, and run: hadoop jar RJoin.jar /rjoin/input /rjoin/output5
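Once the job completes, the joined output can be inspected with something like the following (the part file name assumes the default single reducer):

hadoop fs -cat /rjoin/output5/part-r-00000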
Done!
