您好,登錄后才能下訂單哦!
這篇文章給大家介紹Nutch如何解析Html文檔,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
解析Html文檔 MapReduce任務描述
一、主程序調用
ParseSegment parseSegment = new ParseSegment(getConf());
if (!Fetcher.isParsing(job)) {
parseSegment.parse(segs[0]); // parse it, if needed
}
(1)、isParseing方法的實現
public static boolean isParsing(Configuration conf) {
return conf.getBoolean("fetcher.parse", true);
}
(2)、參數segs[0]
Path[] segs = generator.generate(
crawlDb,
segments,
-1,
topN,
System.currentTimeMillis());
generate方法中 generatedSegments的生成過程。
這里疑點比較多,先放一放
// read the subdirectories generated in the temp 讀取temp中生成的子文件夾
// output and turn them into segments 輸出并且把他們轉到segments中
//1、創建Path的集合對象
List<Path> generatedSegments = new ArrayList<Path>();
//2、
FileStatus[] status = fs.listStatus(tempDir);// 這里讀取上面生成的多個fetchlist的segment
try {
for (FileStatus stat : status) {
Path subfetchlist = stat.getPath();
if (!subfetchlist.getName().startsWith("fetchlist-"))
continue;// 過濾不是以fetchlist-開頭的文件
// start a new partition job for this segment
Path newSeg = partitionSegment(fs, segments, subfetchlist,
numLists);
// 對segment進行Partition操作,產生一個新的目錄
generatedSegments.add(newSeg);
}
} catch (Exception e) {
LOG.warn("Generator: exception while partitioning segments, exiting ...");
fs.delete(tempDir, true);
return null;
}
二、job任務配置
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(ParseSegment.class);
job.setReducerClass(ParseSegment.class);
FileOutputFormat.setOutputPath(job, segment);
job.setOutputFormat(ParseOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(ParseImpl.class);
JobClient.runJob(job);
三、Map、reduce任務的輸入和輸出
map任務輸入和輸出
輸入: WritableComparable/ Content
輸出: Text/ ParseImpl
public void map(WritableComparable<?> key, Content content,
OutputCollector<Text, ParseImpl> output, Reporter reporter)
reduce任務輸入和輸出
輸入: Text/Iterator<Writable>
輸出: Text/Writable
public void reduce(Text key, Iterator<Writable> values,
OutputCollector<Text, Writable> output, Reporter reporter)
四、job任務輸入類SequenceFileInputFormat
protected FileStatus[] listStatus(JobConf job) throws IOException {
FileStatus[] files = super.listStatus(job);
for (int i = 0; i < files.length; i++) {
FileStatus file = files[i];
if (file.isDir()) { // it's a MapFile
Path dataFile = new Path(file.getPath(), MapFile.DATA_FILE_NAME);
FileSystem fs = file.getPath().getFileSystem(job);
// use the data file
files[i] = fs.getFileStatus(dataFile);
}
}
return files;
}
public RecordReader<K, V> getRecordReader(InputSplit split,
JobConf job, Reporter reporter)
throws IOException {
reporter.setStatus(split.toString());
return new SequenceFileRecordReader<K, V>(job, (FileSplit) split);
}
五、map()方法和reduce()方法中的實現
(1)、map任務
org.apache.nutch.parse.ParseSegment
public void map(WritableComparable<?> key, Content content,
OutputCollector<Text, ParseImpl> output, Reporter reporter)
throws IOException {
// convert on the fly from old UTF8 keys
if (key instanceof Text) {
newKey.set(key.toString());
key = newKey;
}
//2、 獲取抓取狀態,
//Nutch.FETCH_STATUS_KEY)——> _fst_
int status =
Integer.parseInt(content.getMetadata().get(Nutch.FETCH_STATUS_KEY));
//3、如果成功抓取,如果沒有抓取成功,就跳過這條記錄
if (status != CrawlDatum.STATUS_FETCH_SUCCESS) {
// content not fetched successfully, skip document
LOG.debug("Skipping " + key + " as content is not fetched successfully");
return;
}
//4、判斷是否試過截斷,文檔中是否被截斷,如果要跳過截斷,且文檔是被截斷了,也跳過這條記錄
if (skipTruncated && isTruncated(content)) {
return;
}
ParseResult parseResult = null;
try {
//5、創建一個ParseUtil對象,調用解析方法parse,并返回一個解析結果ParseResult
parseResult = new ParseUtil(getConf()).parse(content);
} catch (Exception e) {
LOG.warn("Error parsing: " + key + ": " +StringUtils.stringifyException(e));
return;
}
//以上主要是解析,一下是對解析的處理
//————————————————————————————————————————————————————————————————————————————————
//6、遍歷上一步解析得到的解析結果,
for (Entry<Text, Parse> entry : parseResult) {
//7、獲取鍵和值
Text url = entry.getKey();
Parse parse = entry.getValue();
//8、獲取狀態
ParseStatus parseStatus = parse.getData().getStatus();
long start = System.currentTimeMillis();
//9、計數器+1
reporter.incrCounter("ParserStatus", ParseStatus.majorCodes[parseStatus.getMajorCode()], 1);
//10、如果解析不成功,將parse持有的所有對象都置為null。
if (!parseStatus.isSuccess()) {
LOG.warn("Error parsing: " + key + ": " + parseStatus);
parse = parseStatus.getEmptyParse(getConf());
}
// pass segment name to parse data
//11、將segment名稱賦值給parse data
parse.getData().getContentMeta().set(Nutch.SEGMENT_NAME_KEY,
getConf().get(Nutch.SEGMENT_NAME_KEY));
// compute the new signature
//12、計算新的分值
byte[] signature =
SignatureFactory.getSignature(getConf()).calculate(content, parse);
//13、設置digest的值
parse.getData().getContentMeta().set(Nutch.SIGNATURE_KEY,
StringUtil.toHexString(signature));
try {
scfilters.passScoreAfterParsing(url, content, parse);
} catch (ScoringFilterException e) {
if (LOG.isWarnEnabled()) {
LOG.warn("Error passing score: "+ url +": "+e.getMessage());
}
}
long end = System.currentTimeMillis();
LOG.info("Parsed (" + Long.toString(end - start) + "ms):" + url);
output.collect(url, new ParseImpl(new ParseText(parse.getText()),
parse.getData(), parse.isCanonical()));
}
}
ParseResult對象的特點
a、實現了Iterable接口 ,可迭代;迭代對象是Entry,entry的key是Text,值是Parse,如下
public class ParseResult implements Iterable<Map.Entry<Text, Parse>>
迭代方法如下,
public Iterator<Entry<Text, Parse>> iterator() {
return parseMap.entrySet().iterator();
}
可以看到是使用了Map結合的迭代器
b、持有一個HashMap用于存放解析結果,另持有當前的url,代碼如下
private Map<Text, Parse> parseMap;
private String originalUrl;
Parse和ParseImpl的說明
Parse是一個接口,其中的3個方法如下
/** The textual(正文) content of the page. This is indexed, searched, and used when generating snippets.*/
//網頁正文內容,將被索引,搜索,生成快照
String getText();
/** Other data extracted from the page. */
//沖網頁中提取的其他數據
ParseData getData();
/** Indicates if the parse is coming from a url or a sub-url */
//標識這個Parse是否來自于url或者一個子url
boolean isCanonical();//Canonical:正規的
ParseImpl實現了Parse和Writable接口
ParseImpl中有3個字段,如下所示,其中isCanonical是在構造的時候傳入的,默認是true
private ParseText text;
private ParseData data;
private boolean isCanonical;//規則
用于去重的digest的計算
//12、計算新的分值
byte[] signature =
SignatureFactory.getSignature(getConf()).calculate(content, parse);
//13、設置digest的值
parse.getData().getContentMeta().set(Nutch.SIGNATURE_KEY,
StringUtil.toHexString(signature));
Signature:n.簽名; 署名; 識別標志,鮮明特征; [醫]藥的用法說明;
SignatureFactory類中的getSignature方法
該方法看ObjectCache中有沒有Signature的實現,如果沒有就利用反射創建一個并返回。
/** Return the default Signature implementation. */
public static Signature getSignature(Configuration conf) {
String clazz = conf.get("db.signature.class", MD5Signature.class.getName());
ObjectCache objectCache = ObjectCache.get(conf);
Signature impl = (Signature)objectCache.getObject(clazz);
if (impl == null) {
try {
if (LOG.isInfoEnabled()) {
LOG.info("Using Signature impl: " + clazz);
}
Class<?> implClass = Class.forName(clazz);
impl = (Signature)implClass.newInstance();
impl.setConf(conf);
objectCache.setObject(clazz, impl);
} catch (Exception e) {
throw new RuntimeException("Couldn't create " + clazz, e);
}
}
return impl;
}
***** 重要
計算網頁特征,最后是調用了Signature的calculate方法,以下是Signature實現類MD5Signaure的類代碼
/**
* Default implementation of a page signature.
默認的Signature的實現類
* It calculates an MD5 hash of the raw binary content of a page.
它計算page內容的原始的二進制代碼的md5 hash值
* In case there is no content,
* it calculates a hash from the page's URL.
*
* @author Andrzej Bialecki <ab@getopt.org>
*/
public class MD5Signature extends Signature {
public byte[] calculate(Content content, Parse parse) {
byte[] data = content.getContent();
if (data == null) data = content.getUrl().getBytes();
return MD5Hash.digest(data).getDigest();
}
}
(2)、reduce任務
未完待續
多線程解析
//解析是多線程地
private ParseResult runParser(Parser p, Content content) {
ParseCallable pc = new ParseCallable(p, content);
Future<ParseResult> task = executorService.submit(pc);
ParseResult res = null;
try {
res = task.get(maxParseTime, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn("Error parsing " + content.getUrl() + " with " + p, e);
task.cancel(true);
} finally {
pc = null;
}
return res;
}
關于Nutch如何解析Html文檔就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。