
A Detailed Example of Multi-File Join Operations on HDFS


Recently, while working on HDFS file processing, I ran into multi-file join operations, including an all join as well as the more commonly used left join.

Below is a simple example that performs a left join between two tables. The data is structured as follows:

File a:

a|1|b|2|c

File b:

a|b|1|2|c

That is, columns 1 and 2 of file a correspond to columns 1 and 3 of file b, much like a primary/foreign key pair between database tables.
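
To make the key correspondence concrete, here is a minimal standalone sketch (the KeySketch class name and the sample rows are purely illustrative; this is not part of the job code below) showing how the composite key lines up between one row of each file:

// Minimal sketch: the composite join key built from one sample row of each file.
public class KeySketch {
    public static void main(String[] args) {
        String[] aCols = "a|1|b|2|c".split("\\|");   // row from file a
        String aKey = aCols[0] + "|" + aCols[1];     // columns 1 and 2 -> "a|1"

        String[] bCols = "a|b|1|2|c".split("\\|");   // row from file b
        String bKey = bCols[0] + "|" + bCols[2];     // columns 1 and 3 -> "a|1"

        System.out.println(aKey.equals(bKey));       // true: the two records join
    }
}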

The code is as follows:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import cn.eshore.traffic.hadoop.util.CommUtil;
import cn.eshore.traffic.hadoop.util.StringUtil;

/**
 * @ClassName: InstallJoin
 * @Description: HDFS join operation
 * @author hadoop
 * @date 2012-12-18 5:51:32 PM
 */
public class InstallJoin extends Configured implements Tool {

    private static String enSplitCode = "\\|";
    private static String splitCode = "|";


    // Custom reducer
    public static class ReduceClass extends DataJoinReducerBase {

        @Override
        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
            String joinedStr = "";
            // This check enforces the left join restriction (tags holds the file paths;
            // "install" is the file name prefix). Remove it to get an all join instead.
            if (tags.length == 1 && tags[0].toString().contains("install")) {
                return null;
            }

            Map<String, String> map = new HashMap<String, String>();
            for (int i = 0; i < values.length; i++) {
                TaggedWritable tw = (TaggedWritable) values[i];
                String line = ((Text) tw.getData()).toString();

                String[] tokens = line.split(enSplitCode, 8);
                String groupValue = tokens[6];
                String type = tokens[7];

                map.put(type, groupValue);
            }

            joinedStr += StringUtil.getCount(map.get("7")) + "|" + StringUtil.getCount(map.get("30"));
            TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
            retv.setTag((Text) tags[0]);
            return retv;
        }
    }


    // Custom mapper
    public static class MapClass extends DataJoinMapperBase {

        // Custom key (similar to a primary/foreign key in a database)
        @Override
        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            String line = ((Text) aRecord.getData()).toString();
            String[] tokens = line.split(CommUtil.enSplitCode);

            String key = "";
            String type = tokens[7];
            // The key columns may differ between files, so the key is built dynamically.
            // type identifies which file a record came from, e.g. the last column of
            // file a marks the record as file-a data.
            if ("7".equals(type)) {
                key = tokens[0] + "|" + tokens[1];
            } else if ("30".equals(type)) {
                key = tokens[0] + "|" + tokens[2];
            }
            return new Text(key);
        }

        @Override
        protected Text generateInputTag(String inputFile) {
            return new Text(inputFile);
        }

        @Override
        protected TaggedMapOutput generateTaggedMapOutput(Object value) {
            TaggedWritable retv = new TaggedWritable((Text) value);
            retv.setTag(this.inputTag);
            return retv;
        }
    }


    public static class TaggedWritable extends TaggedMapOutput {

        private Writable data;

        // Constructors
        public TaggedWritable() {
            this.tag = new Text("");
        }

        public TaggedWritable(Writable data) {
            this.tag = new Text("");
            this.data = data;
        }

        @Override
        public Writable getData() {
            return data;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            out.writeUTF(this.data.getClass().getName());
            this.data.write(out);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.tag.readFields(in);
            String dataClz = in.readUTF();
            if (this.data == null
                    || !this.data.getClass().getName().equals(dataClz)) {
                try {
                    this.data = (Writable) ReflectionUtils.newInstance(
                            Class.forName(dataClz), null);
                } catch (ClassNotFoundException e) {
                    e.printStackTrace();
                }
            }
            this.data.readFields(in);
        }
    }


    /**
     * Run the job.
     */
    @Override
    public int run(String[] paths) throws Exception {
        int no = 0;
        try {
            Configuration conf = getConf();
            JobConf job = new JobConf(conf, InstallJoin.class);
            FileInputFormat.setInputPaths(job, new Path(paths[0]));
            FileOutputFormat.setOutputPath(job, new Path(paths[1]));
            job.setJobName("join_data_test");
            job.setMapperClass(MapClass.class);
            job.setReducerClass(ReduceClass.class);
            job.setInputFormat(TextInputFormat.class);
            job.setOutputFormat(TextOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(TaggedWritable.class);
            job.set("mapred.textoutputformat.separator", CommUtil.splitCode);
            JobClient.runJob(job);
            no = 1;
        } catch (Exception e) {
            throw new Exception(e);
        }
        return no;
    }


    // Test
    public static void main(String[] args) {
        String[] paths = {
                "hdfs://master...:9000/home/hadoop/traffic/join/newtype",
                "hdfs://master...:9000/home/hadoop/traffic/join/newtype/output" };

        int res = 0;
        try {
            res = ToolRunner.run(new Configuration(), new InstallJoin(), paths);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(res);
    }
}
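
Assuming the class and its dependencies (the Hadoop datajoin contrib jar plus the CommUtil and StringUtil helper classes) are packaged into a job jar, the job can be launched with something like the following; the jar name here is only a placeholder:

hadoop jar traffic-join.jar InstallJoin

Note that main() ignores its command-line arguments and always runs with the hard-coded paths array, so the input and output locations have to be edited in the source before packaging.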
