您好,登錄后才能下訂單哦!
這篇文章主要為大家展示了“Java/Web如何調用Hadoop進行MapReduce”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“Java/Web如何調用Hadoop進行MapReduce”這篇文章吧。
Hadoop環境搭建詳見此文章https://www.jb51.net/article/33649.htm。
我們已經知道Hadoop能夠通過Hadoop jar ***.jar input output的形式通過命令行來調用,那么如何將其封裝成一個服務,讓Java/Web來調用它?使得用戶可以用方便的方式上傳文件到Hadoop并進行處理,獲得結果。首先,***.jar是一個Hadoop任務類的封裝,我們可以在沒有jar的情況下運行該類的main方法,將必要的參數傳遞給它。input 和output則將用戶上傳的文件使用Hadoop的JavaAPI put到Hadoop的文件系統中。然后再通過Hadoop的JavaAPI 從文件系統中取得結果文件。
搭建JavaWeb工程。本文使用Spring、SpringMVC、MyBatis框架, 當然,這不是重點,就算沒有使用任何框架也能實現。
項目框架如下:
項目中使用到的jar包如下:
在Spring的配置文件中,加入
<bean id="multipartResolver" class="org.springframework.web.multipart.commons.CommonsMultipartResolver"> <property name="defaultEncoding" value="utf-8" /> <property name="maxUploadSize" value="10485760000" /> <property name="maxInMemorySize" value="40960" /> </bean>
使得項目支持文件上傳。
新建一個login.jsp 點擊登錄后進入user/login
user/login中處理登錄,登錄成功后,【在Hadoop文件系統中創建用戶文件夾】,然后跳轉到console.jsp
package com.chenjie.controller; import java.io.IOException; import javax.annotation.Resource; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import com.chenjie.pojo.JsonResult; import com.chenjie.pojo.User; import com.chenjie.service.UserService; import com.chenjie.util.AppConfig; import com.google.gson.Gson; /** * 用戶請求控制器 * * @author Chen * */ @Controller // 聲明當前類為控制器 @RequestMapping("/user") // 聲明當前類的路徑 public class UserController { @Resource(name = "userService") private UserService userService;// 由Spring容器注入一個UserService實例 /** * 登錄 * * @param user * 用戶 * @param request * @param response * @throws IOException */ @RequestMapping("/login") // 聲明當前方法的路徑 public String login(User user, HttpServletRequest request, HttpServletResponse response) throws IOException { response.setContentType("application/json");// 設置響應內容格式為json User result = userService.login(user);// 調用UserService的登錄方法 request.getSession().setAttribute("user", result); if (result != null) { createHadoopFSFolder(result); return "console"; } return "login"; } public void createHadoopFSFolder(User user) throws IOException { Configuration conf = new Configuration(); conf.addResource(new Path("/opt/hadoop-1.2.1/conf/core-site.xml")); conf.addResource(new Path("/opt/hadoop-1.2.1/conf/hdfs-site.xml")); FileSystem fileSystem = FileSystem.get(conf); System.out.println(fileSystem.getUri()); Path file = new Path("/user/" + user.getU_username()); if (fileSystem.exists(file)) { System.out.println("haddop hdfs user foler exists."); fileSystem.delete(file, true); System.out.println("haddop hdfs user foler delete success."); } fileSystem.mkdirs(file); System.out.println("haddop hdfs user foler creat success."); } }
console.jsp中進行文件上傳和任務提交、
文件上傳和任務提交:
package com.chenjie.controller; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.JobStatus; import org.apache.hadoop.mapred.RunningJob; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.multipart.MultipartFile; import org.springframework.web.multipart.MultipartHttpServletRequest; import org.springframework.web.multipart.commons.CommonsMultipartResolver; import com.chenjie.pojo.User; import com.chenjie.util.Utils; @Controller // 聲明當前類為控制器 @RequestMapping("/hadoop") // 聲明當前類的路徑 public class HadoopController { @RequestMapping("/upload") // 聲明當前方法的路徑 //文件上傳 public String upload(HttpServletRequest request, HttpServletResponse response) throws IOException { List<String> fileList = (List<String>) request.getSession() .getAttribute("fileList");//得到用戶已上傳文件列表 if (fileList == null) fileList = new ArrayList<String>();//如果文件列表為空,則新建 User user = (User) request.getSession().getAttribute("user"); if (user == null) return "login";//如果用戶未登錄,則跳轉登錄頁面 CommonsMultipartResolver multipartResolver = new CommonsMultipartResolver( request.getSession().getServletContext());//得到在Spring配置文件中注入的文件上傳組件 if (multipartResolver.isMultipart(request)) {//如果請求是文件請求 MultipartHttpServletRequest multiRequest = (MultipartHttpServletRequest) request; Iterator<String> iter = multiRequest.getFileNames();//得到文件名迭代器 while (iter.hasNext()) { MultipartFile file = multiRequest.getFile((String) iter.next()); if (file != null) { String fileName = file.getOriginalFilename(); File folder = new File("/home/chenjie/CJHadoopOnline/" + user.getU_username()); if (!folder.exists()) { folder.mkdir();//如果文件不目錄存在,則在服務器本地創建 } String path = "/home/chenjie/CJHadoopOnline/" + user.getU_username() + "/" + fileName; File localFile = new File(path); file.transferTo(localFile);//將上傳文件拷貝到服務器本地目錄 // fileList.add(path); } handleUploadFiles(user, fileList);//處理上傳文件 } } request.getSession().setAttribute("fileList", fileList);//將上傳文件列表保存在Session中 return "console";//返回console.jsp繼續上傳文件 } @RequestMapping("/wordcount") //調用Hadoop進行mapreduce public void wordcount(HttpServletRequest request, HttpServletResponse response) { System.out.println("進入controller wordcount "); User user = (User) request.getSession().getAttribute("user"); System.out.println(user); // if(user == null) // return "login"; WordCount c = new WordCount();//新建單詞統計任務 String username = user.getU_username(); String input = "hdfs://chenjie-virtual-machine:9000/user/" + username + "/wordcountinput";//指定Hadoop文件系統的輸入文件夾 String output = "hdfs://chenjie-virtual-machine:9000/user/" + username + "/wordcountoutput";//指定Hadoop文件系統的輸出文件夾 String reslt = output + "/part-r-00000";//默認輸出文件 try { Thread.sleep(3*1000); c.main(new String[] { input, output });//調用單詞統計任務 Configuration conf = new Configuration();//新建Hadoop配置 conf.addResource(new Path("/opt/hadoop-1.2.1/conf/core-site.xml"));//添加Hadoop配置,找到Hadoop部署信息 conf.addResource(new Path("/opt/hadoop-1.2.1/conf/hdfs-site.xml"));//Hadoop配置,找到文件系統 FileSystem fileSystem = FileSystem.get(conf);//得打文件系統 Path file = new Path(reslt);//找到輸出結果文件 FSDataInputStream inStream = fileSystem.open(file);//打開 URI uri = file.toUri();//得到輸出文件路徑 System.out.println(uri); String data = null; while ((data = inStream.readLine()) != null) { //System.out.println(data); response.getOutputStream().println(data);//講結果文件寫回用戶網頁 } // InputStream in = fileSystem.open(file); // OutputStream out = new FileOutputStream("result.txt"); // IOUtils.copyBytes(in, out, 4096, true); inStream.close(); } catch (Exception e) { System.err.println(e.getMessage()); } } @RequestMapping("/MapReduceStates") //得到MapReduce的狀態 public void mapreduce(HttpServletRequest request, HttpServletResponse response) { float[] progress=new float[2]; try { Configuration conf1=new Configuration(); conf1.set("mapred.job.tracker", Utils.JOBTRACKER); JobStatus jobStatus = Utils.getJobStatus(conf1); // while(!jobStatus.isJobComplete()){ // progress = Utils.getMapReduceProgess(jobStatus); // response.getOutputStream().println("map:" + progress[0] + "reduce:" + progress[1]); // Thread.sleep(1000); // } JobConf jc = new JobConf(conf1); JobClient jobClient = new JobClient(jc); JobStatus[] jobsStatus = jobClient.getAllJobs(); //這樣就得到了一個JobStatus數組,隨便取出一個元素取名叫jobStatus jobStatus = jobsStatus[0]; JobID jobID = jobStatus.getJobID(); //通過JobStatus獲取JobID RunningJob runningJob = jobClient.getJob(jobID); //通過JobID得到RunningJob對象 runningJob.getJobState();//可以獲取作業狀態,狀態有五種,為JobStatus.Failed 、JobStatus.KILLED、JobStatus.PREP、JobStatus.RUNNING、JobStatus.SUCCEEDED jobStatus.getUsername();//可以獲取運行作業的用戶名。 runningJob.getJobName();//可以獲取作業名。 jobStatus.getStartTime();//可以獲取作業的開始時間,為UTC毫秒數。 float map = runningJob.mapProgress();//可以獲取Map階段完成的比例,0~1, System.out.println("map=" + map); float reduce = runningJob.reduceProgress();//可以獲取Reduce階段完成的比例。 System.out.println("reduce="+reduce); runningJob.getFailureInfo();//可以獲取失敗信息。 runningJob.getCounters();//可以獲取作業相關的計數器,計數器的內容和作業監控頁面上看到的計數器的值一樣。 } catch (IOException e) { progress[0] = 0; progress[1] = 0; } request.getSession().setAttribute("map", progress[0]); request.getSession().setAttribute("reduce", progress[1]); } //處理文件上傳 public void handleUploadFiles(User user, List<String> fileList) { File folder = new File("/home/chenjie/CJHadoopOnline/" + user.getU_username()); if (!folder.exists()) return; if (folder.isDirectory()) { File[] files = folder.listFiles(); for (File file : files) { System.out.println(file.getName()); try { putFileToHadoopFSFolder(user, file, fileList);//將單個文件上傳到Hadoop文件系統 } catch (IOException e) { System.err.println(e.getMessage()); } } } } //將單個文件上傳到Hadoop文件系統 private void putFileToHadoopFSFolder(User user, File file, List<String> fileList) throws IOException { Configuration conf = new Configuration(); conf.addResource(new Path("/opt/hadoop-1.2.1/conf/core-site.xml")); conf.addResource(new Path("/opt/hadoop-1.2.1/conf/hdfs-site.xml")); FileSystem fileSystem = FileSystem.get(conf); System.out.println(fileSystem.getUri()); Path localFile = new Path(file.getAbsolutePath()); Path foler = new Path("/user/" + user.getU_username() + "/wordcountinput"); if (!fileSystem.exists(foler)) { fileSystem.mkdirs(foler); } Path hadoopFile = new Path("/user/" + user.getU_username() + "/wordcountinput/" + file.getName()); // if (fileSystem.exists(hadoopFile)) { // System.out.println("File exists."); // } else { // fileSystem.mkdirs(hadoopFile); // } fileSystem.copyFromLocalFile(true, true, localFile, hadoopFile); fileList.add(hadoopFile.toUri().toString()); } }
啟動Hadoop:
運行結果:
可以在任意平臺下,登錄該項目地址,上傳文件,得到結果。
運行成功。
以上是“Java/Web如何調用Hadoop進行MapReduce”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。