您好,登錄后才能下訂單哦!
這篇文章給大家分享的是有關Nutch如何實現HDFS文件輸出的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。
以1.7為例,之前Nutch的輸出可以自定義其它存儲系統中,具體原理不贅述。
項目有個需求,就是文件仍然保存在HDFS中,而不是索引到其它存儲系統中。
也就是說,不用寫
public class XXX implements IndexWriter
這樣的插件了,
那么,問題來了,怎么修改Nutch的源碼,使得結果順利存在HDFS中呢?
----------那就讓我們從源頭一步一步來修改,碰到問題就解決問題。
首先Crawl.java中,原先在索引階段有這么一些代碼。
if (i > 0) { linkDbTool.invert(linkDb, segments, true, true, false); // invert links if (solrUrl != null) { // index, dedup & merge FileStatus[] fstats = fs.listStatus(segments, HadoopFSUtil.getPassDirectoriesFilter(fs)); IndexingJob indexer = new IndexingJob(getConf()); indexer.index(crawlDb, linkDb, Arrays.asList(HadoopFSUtil.getPaths(fstats))); SolrDeleteDuplicates dedup = new SolrDeleteDuplicates(); dedup.setConf(getConf()); dedup.dedup(solrUrl); }
最關鍵的是
IndexingJob indexer = new IndexingJob(getConf()); indexer.index(crawlDb, linkDb, Arrays.asList(HadoopFSUtil.getPaths(fstats)));
也就是說,這里是索引的入口處。
這里把這些代碼屏蔽掉,我個人的方法是 if (solrUrl != null) {------》if (false) {
這樣還能保持原先的代碼存在,這樣如果后面的代碼有問題還可以恢復此代碼。
---------------接下來呢?添加我們自己的索引任務代碼如下:
if (true) { // add my index job // index, dedup & merge FileStatus[] fstats = fs.listStatus(segments, HadoopFSUtil.getPassDirectoriesFilter(fs)); IndexingJob indexer = new IndexingJob(getConf()); indexer.index(crawlDb, linkDb,Arrays.asList(HadoopFSUtil.getPaths(fstats)), true,false, null); }
這樣,就完成了索引任務外圍的改造,這里只是改了個外觀,還沒傷筋動骨。
下面我們開始對內部進行改造!
-------------------------------------------------------------------------------
首先,我們得找到MR的方法吧,入口在哪呢?
IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job);
這句話,跟進去,就能看到具體的MR類,如下:
job.setMapperClass(IndexerMapReduce.class); job.setReducerClass(IndexerMapReduce.class);
也就是說,MR類都是IndexerMapReduce.class.
那么我們就開始分析這個類的map和reduce函數。
備注: 我的URL文件的格式是 url \t sender=xxx \t receiver=xxx \t oldname=xxx \t newname=xxx \n
---------------
改動的幾個地方如下:
1 對于reduce的函數聲明
從
public void reduce(Text key, Iterator<NutchWritable> values, OutputCollector<Text, NutchIndexAction> output, Reporter reporter)
修改為
public void reduce(Text key, Iterator<NutchWritable> values, OutputCollector<Text, Text> output, Reporter reporter)
這會導致出現3個錯誤,把這3個地方屏蔽掉即可。
2 看reduce的最后兩行
NutchIndexAction action = new NutchIndexAction(doc, NutchIndexAction.ADD); output.collect(key, action);
這里需要做一個改動如下:
// NutchIndexAction action = new NutchIndexAction(doc, // NutchIndexAction.ADD); // output.collect(key, action); Object senderObject = doc.getFieldValue("sender"); Object receiverObject = doc.getFieldValue("receiver"); Object singerObject = doc.getFieldValue("singer"); if (null != senderObject && null != receiverObject && null != singerObject) { String sender = senderObject.toString(); String receiver = receiverObject.toString(); String singer = singerObject.toString(); // output it output.collect(new Text(sender), new Text(singer)); output.collect(new Text(receiver), new Text(singer)); }
如果此時進行ant編譯,自然會報錯,如下:
[javac] /usr/local/music_Name_to_Singer/nutch-1.7/src/java/org/apache/nutch/indexer/IndexerMapReduce.java:53: error: IndexerMapReduce is not abstract and does not override abstract method reduce(Text,Iterator<NutchWritable>,OutputCollector<Text,NutchIndexAction>,Reporter) in Reducer [javac] public class IndexerMapReduce extends Configured implements [javac] ^ [javac] 1 error [javac] 1 warning
那是因為我們需要修改一個地方:
IndexerMapReduce.java中的
原先的代碼:
job.setOutputFormat(IndexerOutputFormat.class); job.setOutputKeyClass(Text.class); job.setMapOutputValueClass(NutchWritable.class); job.setOutputValueClass(NutchWritable.class);
現在要修改為:
job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setMapOutputValueClass(NutchWritable.class); job.setOutputValueClass(Text.class);
以及
public class IndexerMapReduce extends Configured implements
Mapper<Text, Writable, Text, NutchWritable>,
Reducer<Text, NutchWritable, Text, NutchIndexAction> {
修改為
public class IndexerMapReduce extends Configured implements
Mapper<Text, Writable, Text, NutchWritable>,
Reducer<Text, NutchWritable, Text, Text> {
然后ant
就可以看到
BUILD SUCCESSFUL
Total time: 15 seconds
表明編譯成功!
別急著運行,還有一個地方需要修改!
---------------------- 在InexingJob中有如下一些代碼:
final Path tmp = new Path("tmp_" + System.currentTimeMillis() + "-" + new Random().nextInt()); FileOutputFormat.setOutputPath(job, tmp); try { JobClient.runJob(job); // do the commits once and for all the reducers in one go if (!noCommit) { writers.open(job,"commit"); writers.commit(); } long end = System.currentTimeMillis(); LOG.info("Indexer: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end)); } finally { FileSystem.get(job).delete(tmp, true); }
表明,Nutch2.7默認是把輸出導向到其它輸出的,而不是本地HDFS.
所以FileSystem.get(job).delete(tmp, true);是用來刪除此文件的,此時我們需要修改這個地方來保留文件。
不然咱辛辛苦苦寫的文件,全被一句話任性的刪掉了。
------------------------代碼如下:
注意:我這里的需求是輸出為當天的目錄。所以代碼為:
//final Path tmp = new Path("tmp_" + System.currentTimeMillis() + "-" // + new Random().nextInt()); Calendar cal = Calendar.getInstance(); int year = cal.get(Calendar.YEAR); int month = cal.get(Calendar.MONTH) + 1; int day = cal.get(Calendar.DAY_OF_MONTH); final Path tmp = new Path(getConf().get("pathPrefix"),"year="+year+"/month="+month+"/day="+day); FileOutputFormat.setOutputPath(job, tmp); try { JobClient.runJob(job); // do the commits once and for all the reducers in one go if (!noCommit) { writers.open(job,"commit"); writers.commit(); } long end = System.currentTimeMillis(); LOG.info("Indexer: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end)); } finally { //FileSystem.get(job).delete(tmp, true); }
此時編譯是可以通過的。
好,暫時就是這樣,效果圖:
感謝各位的閱讀!關于“Nutch如何實現HDFS文件輸出”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。