Java code examples for basic Hadoop operations
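All of the snippets below are assumed to live in one driver class (the job setup later refers to it as WordCount) and rely on the standard Hadoop and JDK classes; a minimal sketch of the import list they need:

import java.io.IOException;
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;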
Upload a local file to HDFS
public static void uploadInputFile(String localFile) throws IOException {
    Configuration conf = new Configuration();
    String hdfsPath = "hdfs://localhost:9000/";
    String hdfsInput = "hdfs://localhost:9000/user/hadoop/input";
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    // Copy the local file into the HDFS input directory
    fs.copyFromLocalFile(new Path(localFile), new Path(hdfsInput));
    fs.close();
    System.out.println("The file has been uploaded to the input directory");
}
Download the output file to the local machine
public static void getOutput(String outputFile) throws IOException {
    String remoteFile = "hdfs://localhost:9000/user/hadoop/output/part-r-00000";
    Path path = new Path(remoteFile);
    Configuration conf = new Configuration();
    String hdfsPath = "hdfs://localhost:9000/";
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    // Copy the reducer output file from HDFS to the local path
    fs.copyToLocalFile(path, new Path(outputFile));
    System.out.println("The output file has been saved locally");
    fs.close();
}
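One detail worth knowing: copyToLocalFile goes through the checksummed local file system, so it typically also writes a hidden .crc checksum file next to the downloaded copy. If that is unwanted, the four-argument overload can bypass it; a one-line sketch (behavior otherwise the same as above):

fs.copyToLocalFile(false, path, new Path(outputFile), true); // useRawLocalFileSystem = true, no .crc file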
Delete a file from HDFS
public static void deleteOutput() throws IOException {
    Configuration conf = new Configuration();
    String hdfsOutput = "hdfs://localhost:9000/user/hadoop/output";
    String hdfsPath = "hdfs://localhost:9000/";
    Path path = new Path(hdfsOutput);
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    // deleteOnExit marks the path for removal when the FileSystem is closed
    fs.deleteOnExit(path);
    fs.close();
    System.out.println("The output directory has been deleted");
}
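Note that deleteOnExit only schedules the path for removal when the FileSystem is closed, which here happens immediately because of fs.close(). To delete the output directory right away and recursively, the explicit call is:

fs.delete(path, true); // true = recursive delete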
Run a MapReduce program
Create the Mapper and Reducer classes
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        line = line.replace("\\", "");
        // Extract the gender field from the crawled HTML fragment
        String regex = "性别:</span><span class=\"pt_detail\">(.*?)</span>";
        Pattern pattern = Pattern.compile(regex);
        Matcher matcher = pattern.matcher(line);
        while (matcher.find()) {
            String term = matcher.group(1);
            word.set(term);
            context.write(word, one);
        }
    }
}

public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
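The Mapper assumes each input line is a crawled HTML fragment containing a gender ("性别") field, and it counts the extracted values just like the classic word count. A quick local check of the regex, independent of Hadoop (the sample line is an illustrative assumption, not data from the article):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexCheck {
    public static void main(String[] args) {
        // Hypothetical sample of one crawled line
        String line = "性别:</span><span class=\"pt_detail\">男</span>";
        Pattern pattern = Pattern.compile("性别:</span><span class=\"pt_detail\">(.*?)</span>");
        Matcher matcher = pattern.matcher(line);
        while (matcher.find()) {
            System.out.println(matcher.group(1)); // prints: 男
        }
    }
}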
Run the MapReduce job
public static void runMapReduce(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: wordcount <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    // Wait for the job to finish before reporting completion and exiting
    boolean success = job.waitForCompletion(true);
    System.out.println("MapReduce finished!");
    System.exit(success ? 0 : 1);
}
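Putting the pieces together, a minimal driver might look like the sketch below. The local path and the argument order are assumptions; also note that because runMapReduce calls System.exit, downloading the result has to happen in a separate run (or runMapReduce must be changed to return instead of exiting):

public static void main(String[] args) throws Exception {
    // 1. Remove any previous output directory so the job can create a fresh one
    deleteOutput();
    // 2. Upload the crawled data file from local disk (path is hypothetical)
    uploadInputFile("/home/hadoop/data/profiles.txt");
    // 3. Run the job on the HDFS input and output paths
    runMapReduce(new String[]{
        "hdfs://localhost:9000/user/hadoop/input",
        "hdfs://localhost:9000/user/hadoop/output"
    });
    // getOutput("/home/hadoop/result") would be called in a later run,
    // since runMapReduce exits the JVM after the job completes.
}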
Thanks for reading; I hope this helps, and thank you for supporting this site!