Mapjoin和Reducejoin案例
一、mapjoin案例
1.需求:有两个文件,分别是订单表和商品表。
订单表有三个字段,分别为订单时间、商品id、订单id(数据量较大的表);
商品表有两个字段,分别为商品id、商品名称(数据量较小的表,用于加载到内存);
要求在结果文件中,为订单表的每一行末尾追加该行商品id对应的商品名称。
2.解决思路:
在mapper的setup方法中把商品表(通过分布式缓存分发到各个map任务)加载到内存中的HashMap,然后在map方法中根据订单表每一行的商品id查出对应的商品名称,并追加到该行末尾。整个过程不需要reducer,因此在driver类中需要调用addCacheFile添加缓存文件,并调用setNumReduceTasks(0)将reduce任务数设为0。
3.代码如下:
/**
 * map-side join mapper: loads the small product table into memory in setup() and appends the
 * matching product name to every order line in map().
 *
 * <p>no reducer is involved; the driver registers the product file with addcachefile and sets
 * the number of reduce tasks to 0.
 */
public class cachemapper extends mapper<longwritable, text, text, nullwritable>{
    // product id -> product name, filled once per map task in setup()
    private final hashmap<string, string> pdmap = new hashmap<>();
    // output key reused across records instead of allocating a new text per line
    private final text outkey = new text();

    /**
     * loads the product table into pdmap.
     *
     * <p>opens "pd.txt" relative to the task's working directory (assumes the distributed cache
     * makes the file available under that name — TODO confirm for cluster runs). each line is
     * expected to be "productid\tproductname"; blank or single-column lines are skipped rather
     * than ending the load early.
     *
     * @param context the task context (unused here)
     * @throws ioexception if the file cannot be opened or read
     */
    @override
    protected void setup(context context) throws ioexception {
        // try-with-resources closes the reader even when a read fails
        try (bufferedreader br = new bufferedreader(
                new inputstreamreader(new fileinputstream("pd.txt"), "utf-8"))) {
            string line;
            while ((line = br.readline()) != null) {
                if (line.isempty()) {
                    continue;
                }
                string[] fields = line.split("\t");
                if (fields.length < 2) {
                    continue;
                }
                pdmap.put(fields[0], fields[1]);
            }
        }
    }

    /**
     * appends the product name for the order's product id (second tab-separated column) to the
     * end of the order line and emits the result as the output key.
     *
     * <p>order lines with fewer than two columns are counted and skipped; an unknown product id
     * produces an empty name instead of the literal string "null".
     *
     * @param key byte offset of the line (unused)
     * @param value one order line: timestamp\tproductid\torderid
     * @param context used to emit the joined line
     */
    @override
    protected void map(longwritable key, text value, mapper<longwritable, text, text, nullwritable>.context context)
            throws ioexception, interruptedexception {
        string line = value.tostring();
        string[] fields = line.split("\t");
        if (fields.length < 2) {
            context.getcounter("cachemapper", "malformed order lines").increment(1);
            return;
        }
        string pname = pdmap.getordefault(fields[1], "");
        outkey.set(line + "\t" + pname);
        context.write(outkey, nullwritable.get());
    }
}
/**
 * driver for the map-side join job.
 *
 * <p>optional arguments: [input dir] [output dir] [product cache file uri]. any argument that is
 * omitted falls back to the original hard-coded local path. the mapper opens the cached file as
 * "pd.txt", so a custom cache uri should refer to a file with that name. the process exits with
 * status 0 when the job succeeds and 1 otherwise.
 */
public class cachedriver {
    public static void main(string[] args) throws ioexception, classnotfoundexception, interruptedexception, urisyntaxexception {
        string input = args.length > 0 ? args[0] : "c://table1029//in";
        string output = args.length > 1 ? args[1] : "c://table1029//out";
        string cachefile = args.length > 2 ? args[2] : "file:///c:/inputcache/pd.txt";

        // 1. create the job
        configuration conf = new configuration();
        job job = job.getinstance(conf);
        // 2. locate the jar through the driver class
        job.setjarbyclass(cachedriver.class);
        // 3. map-only job: only a mapper is configured
        job.setmapperclass(cachemapper.class);
        // 4. final output types (with no reducers the map output is the final output)
        job.setoutputkeyclass(text.class);
        job.setoutputvalueclass(nullwritable.class);
        // 5. input and output paths
        fileinputformat.setinputpaths(job, new path(input));
        fileoutputformat.setoutputpath(job, new path(output));
        // 6. ship the product table to the map tasks through the distributed cache
        job.addcachefile(new uri(cachefile));
        // 7. no reduce phase
        job.setnumreducetasks(0);
        // 8. submit and wait; report the outcome through the process exit status
        boolean rs = job.waitforcompletion(true);
        system.exit(rs ? 0 : 1);
    }
}
二、reducejoin案例
1.需求:使用与上例相同的两个数据文件,要求将订单表中的商品id替换为对应的商品名称。
2.解决思路:封装tablebean类,包含以下属性:时间、商品id、订单id、商品名称和flag(flag用来标记数据来自哪张表)。
mapper同时读取两张表:通过context对象获取当前的输入切片(FileSplit),根据切片所属文件的名称判断数据来自哪张表,再将每行数据封装为tablebean对象,最后以商品id为key、tablebean对象为value发送到reducer端;
由于同一次reduce调用中所有数据的key(商品id)都相同,reducer通过flag区分两张表的数据:将商品表记录的商品名称读入一个tablebean对象,将订单表中的每条记录复制到一个元素类型为tablebean的ArrayList中;然后把商品名称填入列表中的每个tablebean,最后遍历该列表,以tablebean为key输出,输出结果中商品名称取代了商品id。
3.代码如下:
/**
 * record type shared by both tables in the reduce-side join.
 *
 * <p>order records carry timestamp, productid and orderid. product records carry productid and
 * productname. flag tells them apart: tablemapper sets "0" for orders and "1" for products.
 * fields are serialized in a fixed order; a null field is written as an empty string, because
 * dataoutput.writeutf rejects null.
 *
 * @author: princesshug
 * @date: 2019/3/30, 2:37
 * @blog: https://www.cnblogs.com/hellobigtable/
 */
public class tablebean implements writable {
    private string timestamp;
    private string productid;
    private string orderid;
    private string productname;
    // "0" = order table record, "1" = product table record
    private string flag;

    /** no-arg constructor; all fields start out null. */
    public tablebean() {
    }

    public string gettimestamp() {
        return timestamp;
    }

    public void settimestamp(string timestamp) {
        this.timestamp = timestamp;
    }

    public string getproductid() {
        return productid;
    }

    public void setproductid(string productid) {
        this.productid = productid;
    }

    public string getorderid() {
        return orderid;
    }

    public void setorderid(string orderid) {
        this.orderid = orderid;
    }

    public string getproductname() {
        return productname;
    }

    public void setproductname(string productname) {
        this.productname = productname;
    }

    public string getflag() {
        return flag;
    }

    public void setflag(string flag) {
        this.flag = flag;
    }

    /** writes all five fields in declaration order; null fields are written as "". */
    @override
    public void write(dataoutput out) throws ioexception {
        out.writeutf(nulltoempty(timestamp));
        out.writeutf(nulltoempty(productid));
        out.writeutf(nulltoempty(orderid));
        out.writeutf(nulltoempty(productname));
        out.writeutf(nulltoempty(flag));
    }

    /** reads the five fields in the same order write() emits them. */
    @override
    public void readfields(datainput in) throws ioexception {
        timestamp = in.readutf();
        productid = in.readutf();
        orderid = in.readutf();
        productname = in.readutf();
        flag = in.readutf();
    }

    /** joined output line: timestamp, product name and order id, separated by tabs. */
    @override
    public string tostring() {
        return timestamp + "\t" + productname + "\t" + orderid;
    }

    // writeutf throws a nullpointerexception for null, so substitute an empty string
    private static string nulltoempty(string s) {
        return s == null ? "" : s;
    }
}
/**
 * reduce-side join mapper: tags every input line with the table it came from and emits it keyed
 * by product id.
 *
 * <p>files whose name contains "order.txt" are parsed as order lines
 * (timestamp\tproductid\torderid, flag "0"). every other file is parsed as product lines
 * (productid\tproductname, flag "1"). lines with too few columns are counted and skipped.
 */
public class tablemapper extends mapper<longwritable, text,text,tablebean> {
    // which table this task's split belongs to; resolved once in setup()
    private boolean ordertable;
    // output objects reused across records; each context.write serializes them
    private final tablebean tb = new tablebean();
    private final text t = new text();

    /** determines from the input split's file name whether this task reads the order table. */
    @override
    protected void setup(context context) throws ioexception, interruptedexception {
        filesplit split = (filesplit) context.getinputsplit();
        ordertable = split.getpath().getname().contains("order.txt");
    }

    /**
     * parses one line of either table into the shared tablebean and emits (productid, bean).
     *
     * @param key byte offset of the line (unused)
     * @param value one tab-separated line from the order or product table
     * @param context used to emit the tagged record
     */
    @override
    protected void map(longwritable key, text value, context context) throws ioexception, interruptedexception {
        string[] fields = value.tostring().split("\t");
        if (ordertable) {
            if (fields.length < 3) {
                context.getcounter("tablemapper", "malformed order lines").increment(1);
                return;
            }
            tb.settimestamp(fields[0]);
            tb.setproductid(fields[1]);
            tb.setorderid(fields[2]);
            tb.setproductname("");
            tb.setflag("0");
            t.set(fields[1]);
        } else {
            if (fields.length < 2) {
                context.getcounter("tablemapper", "malformed product lines").increment(1);
                return;
            }
            tb.settimestamp("");
            tb.setproductid(fields[0]);
            tb.setorderid("");
            tb.setproductname(fields[1]);
            tb.setflag("1");
            t.set(fields[0]);
        }
        context.write(t, tb);
    }
}
/**
 * reduce-side join reducer. for one product id, it combines the product name from the product
 * table with every order record for that id and emits the joined order beans.
 *
 * <p>if no product record arrives for the key, the orders are still emitted, with an empty
 * product name rather than the literal string "null".
 */
public class tablereducer extends reducer<text,tablebean,tablebean, nullwritable> {
    /**
     * separates order records (flag "0") from the product record and emits each order with the
     * product name filled in.
     *
     * @param key the product id shared by all values in this call
     * @param values order and product records for that product id
     * @param context used to emit the joined beans
     */
    @override
    protected void reduce(text key, iterable<tablebean> values, context context) throws ioexception, interruptedexception {
        // values may hand back the same tablebean instance on every iteration, so keep copies
        arraylist<tablebean> orderbeans = new arraylist<>();
        string productname = "";
        for (tablebean v : values) {
            if ("0".equals(v.getflag())) {
                orderbeans.add(copyof(v));
            } else {
                productname = v.getproductname();
            }
        }
        // join: fill in the product name and emit each order
        for (tablebean ob : orderbeans) {
            ob.setproductname(productname);
            context.write(ob, nullwritable.get());
        }
    }

    /**
     * field-by-field copy of a bean. this replaces the reflective beanutils.copyproperties call,
     * whose checked exceptions were only printed and could silently drop records.
     */
    private static tablebean copyof(tablebean src) {
        tablebean copy = new tablebean();
        copy.settimestamp(src.gettimestamp());
        copy.setproductid(src.getproductid());
        copy.setorderid(src.getorderid());
        copy.setproductname(src.getproductname());
        copy.setflag(src.getflag());
        return copy;
    }
}
/**
 * driver for the reduce-side join job.
 *
 * <p>optional arguments: [input dir] [output dir]. any argument that is omitted falls back to
 * the original hard-coded local path. a status message is printed, and the process exits with
 * status 0 on success and 1 on failure.
 */
public class tabledriver {
    public static void main(string[] args) throws ioexception, classnotfoundexception, interruptedexception {
        string input = args.length > 0 ? args[0] : "g:\\mapreduce\\reducejoin\\in";
        string output = args.length > 1 ? args[1] : "g:\\mapreduce\\reducejoin\\out";

        // job setup
        configuration conf = new configuration();
        job job = job.getinstance(conf);
        // locate the jar through the driver class
        job.setjarbyclass(tabledriver.class);
        // mapper and reducer
        job.setmapperclass(tablemapper.class);
        job.setreducerclass(tablereducer.class);
        // map output types
        job.setmapoutputkeyclass(text.class);
        job.setmapoutputvalueclass(tablebean.class);
        // final (reducer) output types
        job.setoutputkeyclass(tablebean.class);
        job.setoutputvalueclass(nullwritable.class);
        // input and output paths
        fileinputformat.setinputpaths(job, new path(input));
        fileoutputformat.setoutputpath(job, new path(output));
        // submit, report, and propagate the result as the exit status
        boolean ok = job.waitforcompletion(true);
        if (ok) {
            system.out.println("运行完成!");
        } else {
            system.out.println("运行失败!");
        }
        system.exit(ok ? 0 : 1);
    }
}
上一篇: 使用淘宝IP库获取用户ip地理位置
下一篇: Python中对列表排序实例