
Map Join and Reduce Join Examples

程序员文章站 2023-11-16 11:47:22

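I. Map Join Example

  1. Requirement: there are two files, an order table and a product table.

  The order table has three fields: order time, product id, and order id (this is the large table).

  The product table has two fields: product id and product name (this is the small table, which is loaded into memory).

  The result file should contain every row of the order table with the product name for that row's product id appended at the end.

  2. Approach:

  Load the product table into memory, then in the map method append the product name matching each order row's product id to the end of that row. No reducer is needed. In the driver class, register the product file with addCacheFile and set the number of reduce tasks to 0 with setNumReduceTasks.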
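The approach above can be tried out without a Hadoop installation. The following is a minimal plain-Java sketch, not the job itself: the class name MapJoinSketch, the "UNKNOWN" placeholder for a missing product id, and the sample rows in main are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of a map-side join; no Hadoop classes are used.
final class MapJoinSketch {

    // Build the in-memory lookup table from "productId\tproductName" lines
    // (the role that setup() plays in a mapper).
    static Map<String, String> loadProducts(List<String> productLines) {
        Map<String, String> products = new HashMap<>();
        for (String line : productLines) {
            String[] fields = line.split("\t");
            products.put(fields[0], fields[1]);
        }
        return products;
    }

    // Append the product name to one "time\tproductId\torderId" line
    // (the role that map() plays). "UNKNOWN" is a placeholder chosen
    // for this sketch only.
    static String joinLine(String orderLine, Map<String, String> products) {
        String productId = orderLine.split("\t")[1];
        return orderLine + "\t" + products.getOrDefault(productId, "UNKNOWN");
    }

    public static void main(String[] args) {
        // Made-up sample rows, not data from the article.
        Map<String, String> products = loadProducts(List.of("01\tapple", "02\tpear"));
        List<String> orders = List.of("20190330\t01\t1001", "20190330\t02\t1002");
        for (String order : orders) {
            System.out.println(joinLine(order, products));
        }
    }
}
```

Because every order row is resolved with one hash lookup, no shuffle or reduce phase is needed, which is why this style of join only suits a product table small enough to fit in memory.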

  3. The code is as follows:

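import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

	// product id -> product name
	private final HashMap<String, String> pdMap = new HashMap<>();

	// 1. Load the product table into memory
	@Override
	protected void setup(Context context) throws IOException {

		// Open the file registered with job.addCacheFile in the driver,
		// instead of relying on a pd.txt in the working directory
		URI cacheFile = context.getCacheFiles()[0];
		FileSystem fs = FileSystem.get(cacheFile, context.getConfiguration());

		try (BufferedReader br = new BufferedReader(
				new InputStreamReader(fs.open(new Path(cacheFile)), StandardCharsets.UTF_8))) {

			String line;

			// Stop at end of file or at the first empty line
			while ((line = br.readLine()) != null && !line.isEmpty()) {

				// Split into product id and product name
				String[] fields = line.split("\t");

				// Cache the pair
				pdMap.put(fields[0], fields[1]);
			}
		}
	}

	// 2. Map-side join
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// Get one order row
		String line = value.toString();

		// Split it into fields
		String[] fields = line.split("\t");

		// Product id of the order
		String pid = fields[1];

		// Look up the product name by product id
		String pname = pdMap.get(pid);

		// Append the product name to the row
		line = line + "\t" + pname;

		// Emit the joined row
		context.write(new Text(line), NullWritable.get());
	}
}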

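import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CacheDriver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
		// 1. Get the job instance
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		// 2. Locate the jar by the driver class
		job.setJarByClass(CacheDriver.class);

		// 3. Set the mapper class (this job has no reducer)
		job.setMapperClass(CacheMapper.class);

		// 4. Set the final output key/value types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		// 5. Set the input path and the output path
		FileInputFormat.setInputPaths(job, new Path("c://table1029//in"));
		FileOutputFormat.setOutputPath(job, new Path("c://table1029//out"));

		// 6. Register the product file as a cache file
		job.addCacheFile(new URI("file:///c:/inputcache/pd.txt"));

		// 7. A map-side join needs no reduce phase
		job.setNumReduceTasks(0);

		// 8. Submit the job and exit with 0 on success, 1 on failure
		boolean rs = job.waitForCompletion(true);
		System.exit(rs ? 0 : 1);
	}
}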

  

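II. Reduce Join Example

  1. Requirement: using the same two data files as above, replace the product id in each order row with the corresponding product name.

  2. Approach: define a TableBean class with the fields timestamp, product id, order id, product name, and flag (flag identifies which table a record came from).

    The mapper reads both tables. It obtains the input split from the context object and uses the file name of the split to decide which table the current line belongs to, wraps the line's data in a TableBean, and sends it to the reducer with the product id as the key and the TableBean as the value.

    The reducer uses flag to tell the two tables apart. Because all records in one reduce call share the same key (the product id), it copies the product row into a single TableBean and the order rows into an ArrayList of TableBean objects, sets the product name on each order bean, and then iterates over the list, writing each TableBean as the output key. TableBean's toString prints the product name in place of the product id.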
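The tag, group, and join steps described above can be sketched in plain Java, with a TreeMap standing in for the shuffle. This is only an illustration under stated assumptions: the names ReduceJoinSketch and Tagged and the sample rows are hypothetical, and no Hadoop classes are involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of a reduce-side join; no Hadoop classes are used.
final class ReduceJoinSketch {

    // One tagged record: flag "0" marks an order row, flag "1" a product row.
    static final class Tagged {
        final String flag;
        final String timestamp;
        final String orderId;
        final String productName;

        Tagged(String flag, String timestamp, String orderId, String productName) {
            this.flag = flag;
            this.timestamp = timestamp;
            this.orderId = orderId;
            this.productName = productName;
        }
    }

    static List<String> join(List<String> orderLines, List<String> productLines) {
        // "Map" phase plus shuffle: key every record by product id.
        Map<String, List<Tagged>> groups = new TreeMap<>();
        for (String line : orderLines) {
            String[] f = line.split("\t"); // time, product id, order id
            groups.computeIfAbsent(f[1], k -> new ArrayList<>())
                  .add(new Tagged("0", f[0], f[2], ""));
        }
        for (String line : productLines) {
            String[] f = line.split("\t"); // product id, product name
            groups.computeIfAbsent(f[0], k -> new ArrayList<>())
                  .add(new Tagged("1", "", "", f[1]));
        }

        // "Reduce" phase: within one key, find the product name and
        // attach it to every order row.
        List<String> out = new ArrayList<>();
        for (List<Tagged> group : groups.values()) {
            String name = "";
            List<Tagged> orders = new ArrayList<>();
            for (Tagged t : group) {
                if (t.flag.equals("1")) {
                    name = t.productName;
                } else {
                    orders.add(t);
                }
            }
            for (Tagged o : orders) {
                out.add(o.timestamp + "\t" + name + "\t" + o.orderId);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up sample rows, not data from the article.
        List<String> orders = List.of("20190330\t02\t1002", "20190330\t01\t1001");
        List<String> products = List.of("01\tapple", "02\tpear");
        join(orders, products).forEach(System.out::println);
    }
}
```

Unlike the map join, both tables travel through the grouping step here, so neither has to fit in memory as a whole, at the cost of moving every record to the reduce side.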

  3. The code is as follows:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * @author: princesshug
 * @date: 2019/3/30, 2:37
 * @blog: https://www.cnblogs.com/hellobigtable/
 */
public class TableBean implements Writable {
    private String timestamp;
    private String productId;
    private String orderId;
    private String productName;
    private String flag;

    // Hadoop needs a no-argument constructor to deserialize the bean
    public TableBean() {
    }

    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    // Fields must be written and read back in the same order
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(timestamp);
        out.writeUTF(productId);
        out.writeUTF(orderId);
        out.writeUTF(productName);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        timestamp = in.readUTF();
        productId = in.readUTF();
        orderId = in.readUTF();
        productName = in.readUTF();
        flag = in.readUTF();
    }

    // Output format: the product name takes the place of the product id
    @Override
    public String toString() {
        return timestamp + "\t" + productName + "\t" + orderId;
    }
}
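The write and readFields methods above rely on java.io's writeUTF and readUTF, which can be exercised on their own. The sketch below is an illustration only (the class name WritableRoundTripSketch and the sample values are assumptions). It shows the fields coming back in the order they were written, and that writeUTF rejects null, which is presumably why the mapper in this example fills unused fields with empty strings.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Plain-Java illustration of the serialization pattern used by a Writable bean.
final class WritableRoundTripSketch {

    // Serialize five string fields in a fixed order.
    static byte[] write(String timestamp, String productId, String orderId,
                        String productName, String flag) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(timestamp);
            out.writeUTF(productId);
            out.writeUTF(orderId);
            out.writeUTF(productName);
            out.writeUTF(flag);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Deserialize the fields in exactly the same order they were written.
    static String[] read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return new String[] {
                in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF()
            };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Made-up order row; the product name is empty, as in the mapper.
        byte[] data = write("20190330", "01", "1001", "", "0");
        System.out.println(Arrays.toString(read(data)));

        try {
            write(null, "01", "", "apple", "1");
        } catch (NullPointerException e) {
            System.out.println("writeUTF does not accept null fields");
        }
    }
}
```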


import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Get the name of the file this split belongs to
        FileSplit split = (FileSplit) context.getInputSplit();
        String name = split.getPath().getName();

        // Read one line and prepare the output key and value
        String line = value.toString();
        String[] fields = line.split("\t");
        TableBean tb = new TableBean();
        Text t = new Text();

        // Decide which table the line comes from
        if (name.contains("order.txt")) {
            // Order row: time, product id, order id
            tb.setTimestamp(fields[0]);
            tb.setProductId(fields[1]);
            tb.setOrderId(fields[2]);
            tb.setProductName("");
            tb.setFlag("0");
            t.set(fields[1]);
        } else {
            // Product row: product id, product name
            tb.setTimestamp("");
            tb.setProductId(fields[0]);
            tb.setOrderId("");
            tb.setProductName(fields[1]);
            tb.setFlag("1");
            t.set(fields[0]);
        }
        // Key: product id, value: tagged bean
        context.write(t, tb);
    }
}

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
        // Holders for the order rows and the single product row of this key
        ArrayList<TableBean> orderBeans = new ArrayList<>();
        TableBean productBean = new TableBean();

        // Walk the values and use flag to tell order rows from the product row.
        // Hadoop reuses the value object between iterations, so each record
        // is copied into its own bean before it is kept.
        for (TableBean v : values) {
            try {
                if (v.getFlag().equals("0")) {
                    TableBean orderBean = new TableBean();
                    BeanUtils.copyProperties(orderBean, v);
                    orderBeans.add(orderBean);
                } else {
                    BeanUtils.copyProperties(productBean, v);
                }
            } catch (IllegalAccessException | InvocationTargetException e) {
                // Fail the task instead of silently emitting a half-filled bean
                throw new IOException(e);
            }
        }

        // Join: set the product name on every order row and emit it
        for (TableBean ob : orderBeans) {
            ob.setProductName(productBean.getProductName());
            context.write(ob, NullWritable.get());
        }
    }
}
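The reducer copies each order row into a fresh TableBean before adding it to the list. The reason is that Hadoop refills one value object on every iteration rather than creating a new one. The plain-Java sketch below imitates that behavior with a hand-written iterator; the names ValueReuseSketch and Bean and the sample order ids are assumptions for illustration, and the iterator is a simulation, not Hadoop's implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java simulation of an iterator that reuses one mutable value object.
final class ValueReuseSketch {

    static final class Bean {
        String orderId;
    }

    // Returns the same Bean instance from every next() call, refilled each time.
    static Iterable<Bean> reusing(List<String> orderIds) {
        Bean shared = new Bean();
        return () -> new Iterator<Bean>() {
            private int i = 0;

            @Override
            public boolean hasNext() {
                return i < orderIds.size();
            }

            @Override
            public Bean next() {
                shared.orderId = orderIds.get(i++);
                return shared;
            }
        };
    }

    // Keeps references to the shared object: every entry ends up identical.
    static List<String> collectWithoutCopy(Iterable<Bean> values) {
        List<Bean> kept = new ArrayList<>();
        for (Bean v : values) {
            kept.add(v);
        }
        List<String> ids = new ArrayList<>();
        for (Bean b : kept) {
            ids.add(b.orderId);
        }
        return ids;
    }

    // Copies each value into a new Bean before keeping it.
    static List<String> collectWithCopy(Iterable<Bean> values) {
        List<Bean> kept = new ArrayList<>();
        for (Bean v : values) {
            Bean copy = new Bean();
            copy.orderId = v.orderId;
            kept.add(copy);
        }
        List<String> ids = new ArrayList<>();
        for (Bean b : kept) {
            ids.add(b.orderId);
        }
        return ids;
    }

    public static void main(String[] args) {
        List<String> ids = List.of("1001", "1002", "1003");
        System.out.println(collectWithoutCopy(reusing(ids))); // [1003, 1003, 1003]
        System.out.println(collectWithCopy(reusing(ids)));    // [1001, 1002, 1003]
    }
}
```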

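import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Locate the jar by the driver class
        job.setJarByClass(TableDriver.class);

        // Mapper and reducer
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        // Mapper output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        // Reducer output types
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path("g:\\mapreduce\\reducejoin\\in"));
        FileOutputFormat.setOutputPath(job, new Path("g:\\mapreduce\\reducejoin\\out"));

        // Submit the job and report the result
        if (job.waitForCompletion(true)) {
            System.out.println("Job completed!");
        } else {
            System.out.println("Job failed!");
        }
    }
}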