Implementing Join Logic in MapReduce (Debugging with Local Hadoop and Linux)
I. Creating the Input Files
Create two files, order.txt (order id, date, product id, amount) and pd.txt (product id, product name, category id, price):
order:
1001,20150710,P0001,2
1002,20150710,P0001,3
1002,20150710,P0002,3
1003,20150710,P0003,3
pd:
P0001,小米5,1000,2
P0002,锤子T1,1000,3
P0003,小辣椒,1001,3
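For the Linux run in section III the job reads its input from HDFS, so the two files have to be uploaded there first. A minimal sketch, assuming the /rjoin/input directory used in section III:
hadoop fs -mkdir -p /rjoin/input
hadoop fs -put order.txt pd.txt /rjoin/input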
II. Program Source Code
RJoin source:
import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class RJoin {
    static class RJoinMapper extends Mapper<LongWritable, Text, Text, Infobean> {
        Infobean bean = new Infobean();
        Text k = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String name = inputSplit.getPath().getName();
            // tell the two kinds of records apart by the name of the file the split comes from
            String pid = "";
            if (name.startsWith("order")) {
                // order record: id, date, pid, amount
                String[] fields = line.split(",");
                pid = fields[2];
                bean.set(Integer.parseInt(fields[0]), fields[1], pid, Integer.parseInt(fields[3]), "", 0, 0, "0");
            } else {
                // product record: pid, pname, category_id, price
                String[] fields = line.split(",");
                pid = fields[0];
                bean.set(0, "", pid, 0, fields[1], Integer.parseInt(fields[2]), Float.parseFloat(fields[3]), "1");
            }
            k.set(pid);
            context.write(k, bean);
        }
    }
    static class RJoinReducer extends Reducer<Text, Infobean, Infobean, NullWritable> {
        @Override
        protected void reduce(Text pid, Iterable<Infobean> beans, Context context)
                throws IOException, InterruptedException {
            Infobean pdBean = new Infobean();
            ArrayList<Infobean> orderBeans = new ArrayList<Infobean>();
            // Hadoop reuses the value object while iterating, so each bean must be
            // copied into a fresh object before it is kept
            for (Infobean bean : beans) {
                if ("1".equals(bean.getFlag())) {
                    // product record: remember it so its fields can be joined onto the orders
                    try {
                        BeanUtils.copyProperties(pdBean, bean);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } else {
                    // order record: collect a copy of it
                    Infobean odBean = new Infobean();
                    try {
                        BeanUtils.copyProperties(odBean, bean);
                        orderBeans.add(odBean);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
            // join: copy the product fields into every order record for this pid
            for (Infobean bean : orderBeans) {
                bean.setName(pdBean.getName());
                bean.setCategory_id(pdBean.getCategory_id());
                bean.setPrice(pdBean.getPrice());
                context.write(bean, NullWritable.get());
            }
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // locate the jar containing this job's classes
        job.setJarByClass(RJoin.class);
        // job.setJar("D:/Java/RJoin.jar");
        // mapper and reducer for this job
        job.setMapperClass(RJoinMapper.class);
        job.setReducerClass(RJoinReducer.class);
        // key/value types emitted by the mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Infobean.class);
        // key/value types of the final output
        job.setOutputKeyClass(Infobean.class);
        job.setOutputValueClass(NullWritable.class);
        // input directory of the job
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // output directory of the job
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
Infobean source:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class Infobean implements Writable {
    // fields
    private int order_id;
    private String dateString;
    private String p_id;
    private int amount;
    private String name;
    private int category_id;
    private float price;
    // flag = "0" means this object wraps an order record
    // flag = "1" means this object wraps a product record
    private String flag;
    public String getFlag() {
        return flag;
    }
    public void setFlag(String flag) {
        this.flag = flag;
    }
    // no-arg constructor, required so the framework can instantiate the bean during deserialization
    public Infobean() { }

    public void set(int order_id, String dateString, String p_id, int amount,
            String name, int category_id, float price, String flag) {
        this.order_id = order_id;
        this.dateString = dateString;
        this.p_id = p_id;
        this.amount = amount;
        this.name = name;
        this.category_id = category_id;
        this.price = price;
        this.flag = flag;
    }
    public int getOrder_id() {
        return order_id;
    }
    public void setOrder_id(int order_id) {
        this.order_id = order_id;
    }
    public String getDateString() {
        return dateString;
    }
    public void setDateString(String dateString) {
        this.dateString = dateString;
    }
    public String getP_id() {
        return p_id;
    }
    public void setP_id(String p_id) {
        this.p_id = p_id;
    }
    public int getAmount() {
        return amount;
    }
    public void setAmount(int amount) {
        this.amount = amount;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getCategory_id() {
        return category_id;
    }
    public void setCategory_id(int category_id) {
        this.category_id = category_id;
    }
    public float getPrice() {
        return price;
    }
    public void setPrice(float price) {
        this.price = price;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        // serialize the fields; readFields must read them back in exactly this order
        out.writeInt(order_id);
        out.writeUTF(dateString);
        out.writeUTF(p_id);
        out.writeInt(amount);
        out.writeUTF(name);
        out.writeInt(category_id);
        out.writeFloat(price);
        out.writeUTF(flag);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        // deserialize the fields in the same order they were written
        this.order_id = in.readInt();
        this.dateString = in.readUTF();
        this.p_id = in.readUTF();
        this.amount = in.readInt();
        this.name = in.readUTF();
        this.category_id = in.readInt();
        this.price = in.readFloat();
        this.flag = in.readUTF();
    }
    @Override
    public String toString() {
        return "order_id=" + order_id + ", dateString=" + dateString
                + ", p_id=" + p_id + ", amount=" + amount + ", name=" + name
                + ", category_id=" + category_id + ", price=" + price
                + ", flag=" + flag;
    }
}
Source reference: http://blog.csdn.net/yangzheng0515/article/details/78017941
III. Running the Program
1. Running with local Hadoop
First connect to Hadoop from the local IDE through the hadoop-plugin; the setup is not covered in detail here.
Open Run As -> Run Configurations and set up the run configuration (the input and output paths go in as the two program arguments, as shown below).
Then click Run As -> Run on Hadoop.
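The job takes its input and output paths from args[0] and args[1], so the run configuration needs two program arguments. A hypothetical example (these paths are assumptions, not from the original post):
/rjoin/input /rjoin/output
Note that the output directory must not already exist, otherwise FileOutputFormat rejects the job with a FileAlreadyExistsException.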
Check the console output (excerpt):
2018-03-09 15:58:18,966 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1362)) - map 100% reduce 100%
2018-03-09 15:58:19,966 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1373)) - Job job_local126275517_0001 completed successfully
2018-03-09 15:58:20,102 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1380)) - Counters: 38
The job ran successfully!
2. Running on Linux
Package the program into a jar, upload the jar to the Linux machine with the rz command, and run: hadoop jar RJoin.jar /rjoin/input /rjoin/output5
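To check the joined result, read the output back from HDFS. The lines below are roughly what the sample data above should produce, derived from Infobean.toString(); the exact ordering within a key group and the float formatting may differ:
hadoop fs -cat /rjoin/output5/part-r-00000
order_id=1001, dateString=20150710, p_id=P0001, amount=2, name=小米5, category_id=1000, price=2.0, flag=0
order_id=1002, dateString=20150710, p_id=P0001, amount=3, name=小米5, category_id=1000, price=2.0, flag=0
order_id=1002, dateString=20150710, p_id=P0002, amount=3, name=锤子T1, category_id=1000, price=3.0, flag=0
order_id=1003, dateString=20150710, p_id=P0003, amount=3, name=小辣椒, category_id=1001, price=3.0, flag=0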
ok!