A Detailed Example of Multi-File Join Operations on HDFS
Recently, while doing HDFS file processing, I ran into multi-file join operations, including an all join as well as the more commonly used left join.
Below is a simple example that performs a left join between two tables, with data laid out as follows:
File a:
a|1|b|2|c
File b:
a|b|1|2|c
That is, the first and second columns of file a correspond to the first and third columns of file b, much like a primary key/foreign key pair between database tables.
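To make the key correspondence concrete, here is a minimal, Hadoop-free sketch (illustrative only; the sample records and column positions are taken from the description above) showing how a composite join key can be built from one record of each file:

public class JoinKeyDemo {
    public static void main(String[] args) {
        String aLine = "a|1|b|2|c";   // record from file a
        String bLine = "a|b|1|2|c";   // record from file b

        // "|" is a regex metacharacter, so it must be escaped when splitting.
        String[] aCols = aLine.split("\\|");
        String[] bCols = bLine.split("\\|");

        // File a keys on columns 1 and 2, file b on columns 1 and 3.
        String aKey = aCols[0] + "|" + aCols[1];
        String bKey = bCols[0] + "|" + bCols[2];

        System.out.println(aKey.equals(bKey));  // true -> the two records join
    }
}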
The complete MapReduce code is as follows:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import cn.eshore.traffic.hadoop.util.CommUtil;
import cn.eshore.traffic.hadoop.util.StringUtil;

/**
 * @ClassName: InstallJoin
 * @Description: HDFS join operation
 * @author hadoop
 * @date 2012-12-18 17:51:32
 */
public class InstallJoin extends Configured implements Tool {

    private static String enSplitCode = "\\|";
    private static String splitCode = "|";

    // Custom reducer
    public static class ReduceClass extends DataJoinReducerBase {

        @Override
        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
            String joinedStr = "";
            // This check produces the left-join restriction: tags holds the input
            // file paths, and "install" is the file-name prefix of the right table.
            // Remove this check to get an all join instead.
            if (tags.length == 1 && tags[0].toString().contains("install")) {
                return null;
            }
            Map<String, String> map = new HashMap<String, String>();
            for (int i = 0; i < values.length; i++) {
                TaggedWritable tw = (TaggedWritable) values[i];
                String line = ((Text) tw.getData()).toString();
                String[] tokens = line.split(enSplitCode, 8);
                String groupValue = tokens[6];
                String type = tokens[7];
                map.put(type, groupValue);
            }
            joinedStr += StringUtil.getCount(map.get("7")) + "|"
                    + StringUtil.getCount(map.get("30"));
            TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
            retv.setTag((Text) tags[0]);
            return retv;
        }
    }

    // Custom mapper
    public static class MapClass extends DataJoinMapperBase {

        // Build the group key (analogous to a database primary/foreign key)
        @Override
        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            String line = ((Text) aRecord.getData()).toString();
            String[] tokens = line.split(CommUtil.enSplitCode);
            String key = "";
            String type = tokens[7];
            // The key columns can differ between files, so the key is built per
            // record type; "type" identifies which file a record came from, e.g.
            // the last column of file a marks the record as a-file data.
            if ("7".equals(type)) {
                key = tokens[0] + "|" + tokens[1];
            } else if ("30".equals(type)) {
                key = tokens[0] + "|" + tokens[2];
            }
            return new Text(key);
        }

        @Override
        protected Text generateInputTag(String inputFile) {
            return new Text(inputFile);
        }

        @Override
        protected TaggedMapOutput generateTaggedMapOutput(Object value) {
            TaggedWritable retv = new TaggedWritable((Text) value);
            retv.setTag(this.inputTag);
            return retv;
        }
    }

    public static class TaggedWritable extends TaggedMapOutput {

        private Writable data;

        // No-arg constructor, required for deserialization
        public TaggedWritable() {
            this.tag = new Text("");
        }

        public TaggedWritable(Writable data) {
            this.tag = new Text("");
            this.data = data;
        }

        @Override
        public Writable getData() {
            return data;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            out.writeUTF(this.data.getClass().getName());
            this.data.write(out);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.tag.readFields(in);
            String dataClz = in.readUTF();
            if (this.data == null
                    || !this.data.getClass().getName().equals(dataClz)) {
                try {
                    this.data = (Writable) ReflectionUtils.newInstance(
                            Class.forName(dataClz), null);
                } catch (ClassNotFoundException e) {
                    e.printStackTrace();
                }
            }
            this.data.readFields(in);
        }
    }

    /**
     * Configure and run the job.
     */
    @Override
    public int run(String[] paths) throws Exception {
        int no = 0;
        try {
            Configuration conf = getConf();
            JobConf job = new JobConf(conf, InstallJoin.class);
            FileInputFormat.setInputPaths(job, new Path(paths[0]));
            FileOutputFormat.setOutputPath(job, new Path(paths[1]));
            job.setJobName("join_data_test");
            job.setMapperClass(MapClass.class);
            job.setReducerClass(ReduceClass.class);
            job.setInputFormat(TextInputFormat.class);
            job.setOutputFormat(TextOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(TaggedWritable.class);
            job.set("mapred.textoutputformat.separator", CommUtil.splitCode);
            JobClient.runJob(job);
            no = 1;
        } catch (Exception e) {
            throw new Exception();
        }
        return no;
    }

    // Test driver
    public static void main(String[] args) {
        String[] paths = {
                "hdfs://master...:9000/home/hadoop/traffic/join/newtype",
                "hdfs://master...:9000/home/hadoop/traffic/join/newtype/output" };
        int res = 0;
        try {
            res = ToolRunner.run(new Configuration(), new InstallJoin(), paths);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(res);
    }
}
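Two closing remarks. First, on how the left join is achieved (based on how the datajoin contrib package behaves, as far as I understand it): the reducer's combine() is invoked for each combination of records sharing a group key, and the tags array lists which input files contributed, so tags.length == 1 means the key appears in only one file; returning null only when that single file is the "install" file drops install-only keys while keeping keys from the other file, which gives left-join semantics. Second, the class name written out in TaggedWritable.write() is what allows readFields() to re-create the wrapped Writable on the reduce side. A minimal round-trip check of that serialization (hypothetical test code, not from the original article; it assumes the Hadoop core and datajoin jars are on the classpath) might look like this:

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;

public class TaggedWritableRoundTrip {
    public static void main(String[] args) throws Exception {
        // Serialize a tagged record the same way the mapper would emit it.
        InstallJoin.TaggedWritable original =
                new InstallJoin.TaggedWritable(new Text("a|1|b|2|c"));
        original.setTag(new Text("a-file"));

        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);

        // Deserialize into a fresh instance; readFields() uses the stored
        // class name to re-create the wrapped Text before reading its bytes.
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        InstallJoin.TaggedWritable copy = new InstallJoin.TaggedWritable();
        copy.readFields(in);

        System.out.println(copy.getTag() + " -> " + copy.getData());
    }
}

Printing "a-file -> a|1|b|2|c" confirms that both the tag and the payload survive the round trip.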