Example Code for Calling Hadoop MapReduce from Java/Web
For how to set up the Hadoop environment, see this article.
We already know that Hadoop can be invoked from the command line in the form hadoop jar ***.jar input output. So how do we wrap that up as a service that Java/web code can call, so that users can conveniently upload files to Hadoop, have them processed, and get the results back? First, ***.jar is just a packaged Hadoop job class: we can run that class's main method directly, without the jar, and pass it the necessary arguments. As for input and output, the files the user uploads are put into the Hadoop file system with the Hadoop Java API, and afterwards the result files are fetched from the file system with the same Java API.
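As a minimal sketch of the two HDFS operations this approach rests on (put the upload in, read the result out), assuming the same Hadoop 1.2.1 config paths used throughout this article; the local file name, user name, and HDFS paths here are hypothetical placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsRoundTrip {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumption: the Hadoop config files live where the article's code expects them
        conf.addResource(new Path("/opt/hadoop-1.2.1/conf/core-site.xml"));
        conf.addResource(new Path("/opt/hadoop-1.2.1/conf/hdfs-site.xml"));
        FileSystem fs = FileSystem.get(conf);

        // "input": put a locally saved upload into the user's HDFS input folder
        fs.copyFromLocalFile(new Path("/tmp/upload.txt"), // hypothetical local file
                new Path("/user/demo/wordcountinput/upload.txt"));

        // ... the MapReduce job would run here, e.g. WordCount.main(new String[] { input, output }) ...

        // "output": read the job's result file back out of HDFS
        FSDataInputStream in = fs.open(new Path("/user/demo/wordcountoutput/part-r-00000"));
        String line;
        while ((line = in.readLine()) != null) { // readLine is deprecated but available on Hadoop 1.x
            System.out.println(line);
        }
        in.close();
    }
}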
Set up a Java web project. This article uses the Spring, Spring MVC and MyBatis frameworks, but that is not the point: the same thing can be done without any framework at all.
The project structure is as follows:
The jar packages used in the project are as follows:
In the Spring configuration file, add
<bean id="multipartresolver" class="org.springframework.web.multipart.commons.commonsmultipartresolver"> <property name="defaultencoding" value="utf-8" /> <property name="maxuploadsize" value="10485760000" /> <property name="maxinmemorysize" value="40960" /> </bean>
so that the project supports file uploads.
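With this resolver registered, Spring MVC can also bind uploads to handler parameters directly. A minimal, hypothetical handler for illustration only (the project's own controller, shown further below, resolves the multipart request by hand instead):

import java.io.File;
import java.io.IOException;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.multipart.MultipartFile;

@Controller
@RequestMapping("/demo")
public class UploadDemoController {
    // Spring fills in the MultipartFile because multipartResolver is registered
    @RequestMapping("/upload")
    public String upload(@RequestParam("file") MultipartFile file) throws IOException {
        file.transferTo(new File("/tmp/" + file.getOriginalFilename())); // hypothetical target path
        return "console";
    }
}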
Create a login.jsp; clicking the login button submits to user/login.
user/login handles the login. On success it creates the user's folder in the Hadoop file system, then forwards to console.jsp:
package com.chenjie.controller;

import java.io.IOException;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import com.chenjie.pojo.JsonResult;
import com.chenjie.pojo.User;
import com.chenjie.service.UserService;
import com.chenjie.util.AppConfig;
import com.google.gson.Gson;

/**
 * User request controller
 *
 * @author chen
 */
@Controller // Declare this class as a controller
@RequestMapping("/user") // Declare this class's path
public class UserController {

    @Resource(name = "userService")
    private UserService userService; // A UserService instance injected by the Spring container

    /**
     * Login
     *
     * @param user the user
     * @param request
     * @param response
     * @throws IOException
     */
    @RequestMapping("/login") // Declare this method's path
    public String login(User user, HttpServletRequest request,
            HttpServletResponse response) throws IOException {
        response.setContentType("application/json"); // Set the response content type to JSON
        User result = userService.login(user); // Call UserService's login method
        request.getSession().setAttribute("user", result);
        if (result != null) {
            createHadoopFSFolder(result);
            return "console";
        }
        return "login";
    }

    public void createHadoopFSFolder(User user) throws IOException {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/opt/hadoop-1.2.1/conf/core-site.xml"));
        conf.addResource(new Path("/opt/hadoop-1.2.1/conf/hdfs-site.xml"));

        FileSystem fileSystem = FileSystem.get(conf);
        System.out.println(fileSystem.getUri());

        Path file = new Path("/user/" + user.getU_username());
        if (fileSystem.exists(file)) {
            System.out.println("Hadoop HDFS user folder exists.");
            fileSystem.delete(file, true);
            System.out.println("Hadoop HDFS user folder deleted successfully.");
        }
        fileSystem.mkdirs(file);
        System.out.println("Hadoop HDFS user folder created successfully.");
    }
}
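The User POJO and UserService referenced above are not listed in the article. A minimal sketch consistent with the calls the controller makes (userService.login(user), user.getU_username()); everything beyond the u_username field is an assumption:

// com/chenjie/pojo/User.java (sketch)
public class User {
    private String u_username;
    private String u_password; // assumed field; the article never shows the POJO

    public String getU_username() { return u_username; }
    public void setU_username(String u_username) { this.u_username = u_username; }
    public String getU_password() { return u_password; }
    public void setU_password(String u_password) { this.u_password = u_password; }
}

// com/chenjie/service/UserService.java (sketch)
public interface UserService {
    User login(User user); // assumed to return the matched User, or null on failure
}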
File upload and job submission are done from console.jsp.
The controller for file upload and job submission:
package com.chenjie.controller;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.multipart.MultipartHttpServletRequest;
import org.springframework.web.multipart.commons.CommonsMultipartResolver;

import com.chenjie.pojo.User;
import com.chenjie.util.Utils;

@Controller // Declare this class as a controller
@RequestMapping("/hadoop") // Declare this class's path
public class HadoopController {

    @RequestMapping("/upload") // Declare this method's path
    // File upload
    public String upload(HttpServletRequest request,
            HttpServletResponse response) throws IOException {
        List<String> fileList = (List<String>) request.getSession()
                .getAttribute("fileList"); // Get the list of files the user has already uploaded
        if (fileList == null)
            fileList = new ArrayList<String>(); // If the file list is null, create a new one
        User user = (User) request.getSession().getAttribute("user");
        if (user == null)
            return "login"; // If the user is not logged in, go to the login page
        CommonsMultipartResolver multipartResolver = new CommonsMultipartResolver(
                request.getSession().getServletContext()); // Get the upload component configured in the Spring configuration file
        if (multipartResolver.isMultipart(request)) { // If this is a multipart (file upload) request
            MultipartHttpServletRequest multiRequest = (MultipartHttpServletRequest) request;
            Iterator<String> iter = multiRequest.getFileNames(); // Get an iterator over the file names
            while (iter.hasNext()) {
                MultipartFile file = multiRequest.getFile((String) iter.next());
                if (file != null) {
                    String fileName = file.getOriginalFilename();
                    File folder = new File("/home/chenjie/cjhadooponline/"
                            + user.getU_username());
                    if (!folder.exists()) {
                        folder.mkdir(); // If the directory does not exist, create it on the local server
                    }
                    String path = "/home/chenjie/cjhadooponline/"
                            + user.getU_username() + "/" + fileName;
                    File localFile = new File(path);
                    file.transferTo(localFile); // Copy the uploaded file to a local directory on the server
                    // fileList.add(path);
                }
                handleUploadFiles(user, fileList); // Process the uploaded files
            }
        }
        request.getSession().setAttribute("fileList", fileList); // Save the uploaded file list in the session
        return "console"; // Return to console.jsp so more files can be uploaded
    }

    @RequestMapping("/wordcount")
    // Call Hadoop to run MapReduce
    public void wordCount(HttpServletRequest request,
            HttpServletResponse response) {
        System.out.println("Entering controller wordCount");
        User user = (User) request.getSession().getAttribute("user");
        System.out.println(user);
        // if (user == null)
        //     return "login";
        WordCount c = new WordCount(); // Create a new word count job
        String username = user.getU_username();
        String input = "hdfs://chenjie-virtual-machine:9000/user/" + username
                + "/wordcountinput"; // The input folder in the Hadoop file system
        String output = "hdfs://chenjie-virtual-machine:9000/user/" + username
                + "/wordcountoutput"; // The output folder in the Hadoop file system
        String result = output + "/part-r-00000"; // The default output file
        try {
            Thread.sleep(3 * 1000);
            c.main(new String[] { input, output }); // Run the word count job
            Configuration conf = new Configuration(); // Create a new Hadoop configuration
            conf.addResource(new Path("/opt/hadoop-1.2.1/conf/core-site.xml")); // Add the Hadoop configuration to find the Hadoop deployment
            conf.addResource(new Path("/opt/hadoop-1.2.1/conf/hdfs-site.xml")); // Add the Hadoop configuration to find the file system

            FileSystem fileSystem = FileSystem.get(conf); // Get the file system
            Path file = new Path(result); // Locate the output result file
            FSDataInputStream inStream = fileSystem.open(file); // Open it
            URI uri = file.toUri(); // Get the output file's path
            System.out.println(uri);
            String data = null;
            while ((data = inStream.readLine()) != null) {
                // System.out.println(data);
                response.getOutputStream().println(data); // Write the result file back to the user's web page
            }
            // InputStream in = fileSystem.open(file);
            // OutputStream out = new FileOutputStream("result.txt");
            // IOUtils.copyBytes(in, out, 4096, true);
            inStream.close();
        } catch (Exception e) {
            System.err.println(e.getMessage());
        }
    }

    @RequestMapping("/mapreducestates")
    // Get the MapReduce job status
    public void mapReduce(HttpServletRequest request,
            HttpServletResponse response) {
        float[] progress = new float[2];
        try {
            Configuration conf1 = new Configuration();
            conf1.set("mapred.job.tracker", Utils.jobTracker);
            JobStatus jobStatus = Utils.getJobStatus(conf1);
            // while (!jobStatus.isJobComplete()) {
            //     progress = Utils.getMapReduceProgess(jobStatus);
            //     response.getOutputStream().println("map:" + progress[0] + "reduce:" + progress[1]);
            //     Thread.sleep(1000);
            // }
            JobConf jc = new JobConf(conf1);

            JobClient jobClient = new JobClient(jc);
            JobStatus[] jobsStatus = jobClient.getAllJobs();
            // This yields a JobStatus array; take one element and call it jobStatus
            jobStatus = jobsStatus[0];
            JobID jobId = jobStatus.getJobID(); // Get the JobID from the JobStatus
            RunningJob runningJob = jobClient.getJob(jobId); // Get the RunningJob object from the JobID
            runningJob.getJobState(); // Get the job state; there are five: JobStatus.FAILED, JobStatus.KILLED, JobStatus.PREP, JobStatus.RUNNING, JobStatus.SUCCEEDED
            jobStatus.getUsername(); // Get the name of the user who ran the job
            runningJob.getJobName(); // Get the job name
            jobStatus.getStartTime(); // Get the job start time, in UTC milliseconds
            float map = runningJob.mapProgress(); // Get the fraction of the map phase completed, 0 to 1
            System.out.println("map=" + map);
            float reduce = runningJob.reduceProgress(); // Get the fraction of the reduce phase completed
            System.out.println("reduce=" + reduce);
            runningJob.getFailureInfo(); // Get failure information
            runningJob.getCounters(); // Get the job's counters; their values match the ones on the job monitoring page
        } catch (IOException e) {
            progress[0] = 0;
            progress[1] = 0;
        }
        request.getSession().setAttribute("map", progress[0]);
        request.getSession().setAttribute("reduce", progress[1]);
    }

    // Process uploaded files
    public void handleUploadFiles(User user, List<String> fileList) {
        File folder = new File("/home/chenjie/cjhadooponline/"
                + user.getU_username());
        if (!folder.exists())
            return;
        if (folder.isDirectory()) {
            File[] files = folder.listFiles();
            for (File file : files) {
                System.out.println(file.getName());
                try {
                    putFileToHadoopFSFolder(user, file, fileList); // Upload a single file to the Hadoop file system
                } catch (IOException e) {
                    System.err.println(e.getMessage());
                }
            }
        }
    }

    // Upload a single file to the Hadoop file system
    private void putFileToHadoopFSFolder(User user, File file,
            List<String> fileList) throws IOException {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/opt/hadoop-1.2.1/conf/core-site.xml"));
        conf.addResource(new Path("/opt/hadoop-1.2.1/conf/hdfs-site.xml"));

        FileSystem fileSystem = FileSystem.get(conf);
        System.out.println(fileSystem.getUri());

        Path localFile = new Path(file.getAbsolutePath());
        Path folder = new Path("/user/" + user.getU_username()
                + "/wordcountinput");
        if (!fileSystem.exists(folder)) {
            fileSystem.mkdirs(folder);
        }

        Path hadoopFile = new Path("/user/" + user.getU_username()
                + "/wordcountinput/" + file.getName());
        // if (fileSystem.exists(hadoopFile)) {
        //     System.out.println("File exists.");
        // } else {
        //     fileSystem.mkdirs(hadoopFile);
        // }
        fileSystem.copyFromLocalFile(true, true, localFile, hadoopFile);
        fileList.add(hadoopFile.toUri().toString());
    }
}
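The WordCount class that the /wordcount handler invokes is not listed in the article (nor is the Utils helper used by /mapreducestates). Since the handler reads part-r-00000, the job presumably uses the newer org.apache.hadoop.mapreduce API; a sketch along the lines of the canonical Hadoop word count, compatible with hadoop-1.2.x and taking input and output as its two arguments, would be:

package com.chenjie.controller;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one); // emit (word, 1) for every token
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result); // emit (word, total count)
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "word count"); // the Job(conf, name) constructor fits hadoop 1.x
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // e.g. .../wordcountinput
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // e.g. .../wordcountoutput
        job.waitForCompletion(true);
    }
}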
Start Hadoop (on Hadoop 1.2.1, typically with bin/start-all.sh):
The result of running it:
From any platform, you can log in to the project's address, upload a file, and get the result.
The run succeeds.
Source code: https://github.com/tudoupaisimalingshu/cjhadooponline
That is all for this article. I hope it helps with your study, and I hope you will continue to support us.