您好,登錄后才能下訂單哦!
本篇內容主要講解“如何構建MapReduce程序的基礎模板”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“如何構建MapReduce程序的基礎模板”吧!
什么是開發數據集?
一個流行的開發策略是為生產環境中的大數據集建立一個較小的、抽樣的數據子集,稱為開發數據集。這個開發數據集可能只有幾百兆字節。當你以單機或者偽分布式模式編寫程序來處理它們時,你會發現開發周期很短,在自己的機器上運行程序也很方便,而且還可以在獨立的環境中進行調試。
為什么選擇專利引用數據做測試?
1、因為它們與你將來會遇到的大多數數據類型相似
2、專利引用數據所構成的關系圖與網頁鏈接以及社會網絡圖可謂大同小異
3、專利發布以時間為序,有些特性類似于時間序列
4、每個專利關聯到一個人 (發明人) 和一個位置 (發明人的國家),你可以將之視為個人信息或地理數據
5、你可以將這些數據視為具有明確模式的普通數據庫關系,而格式上簡單地以逗號分開
數據集采用標準
數據集采用標準的逗號分隔取值 (comma-separated values, CSV) 格式。
構建MapReduce程序的基礎模板
大多數MapReduce程序的編寫都可以簡單地依賴于一個模板及其變種,當撰寫一個新得MapReduce程序時,我們通常會采用一個現有的MapReduce程序,并將其修改成我們所希望的樣子。
典型的Hadoop程序的模板
public class MyJob extends Configured implements Tool {
public static class MapClass extends MapReduceBase
implements Mapper<Text, Text, Text, Text> {
public void map (Text key, Text value,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
output.collect(value, key);
}
}
public static class Reduce extends MapReduceBase
implements Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
String csv = "";
while (values.hasNext()) {
if (csv.length() > 0) csv += ",";
csv += values.next().toString();
}
output.collect(key, new Text(csv));
}
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
JobConf job = new JobConf(conf, MyJob.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("MyJob");
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setInputFormat(KeyValueTextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.set("key.value.separator.in.input.line", ",");
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MyJob(), args);
System.exit(res);
}
}
1、我們習慣用單個類來完整地定義每個MapReduce作業,這里成為MyJob類
2、Hadoop要求Mapper和Reducer必須是它們自身的靜態類,這些類非常小,模板將它們包含在MyJob類中作為內部類,這樣做的好處是可以把所有的東西放在一個文件內,簡化代碼管理
3、但是需要記住這些內部類是獨立的,通常不與MyJob類進行交互
4、在作業執行期間,采用不同JVM的各類節點復制并運行Mapper和Reducer,而其他的作業類僅在客戶機上執行
解釋下run()方法
1、框架的核心在run()方法中,也稱為driver
2、它實例化、配置并傳遞一個JobConf對象命名的作業給JobClient.runJob()以啟動MapReduce作業(反過來,JobClient類與JobTracker通信讓該作業在集群上啟動)
3、JobConf對象將保持作業運行所需的全部配置參數
4、Driver需要在作業中為每個作業定制基本參數,包括輸入路徑、輸出路徑、Mapper類和Reducer類
5、每個作業可以重置默認的作業屬性,例如,InputFormat、OutputFormat等,也可以調用JobConf對象中的set()方法填充任意的配置參數
6、一旦傳遞JobConf對象到JobClient.runJob(),他就被視為決定這個作業如何運行的藍本
關于driver的配置的一些說明
1、JobConf對象有許多參數,但我們并不希望全部的參數都通過編寫driver來設置,可以把Hadoop安裝時的配置文件作為一個很好的起點
2、用戶可能希望在命令行啟動一個作業時傳遞額外的參數來改變作業配置
3、Driver可以通過自定義一組命令并自行處理用戶參數,來支持用戶修改其中的一些配置
4、因為經常需要做這樣的任務,Hadoop框架便提供了ToolRunner、Tool和Configured來簡化其實現。
5、當它們在上面的MyJob框架中被同時使用時,這些類使得作業可以理解用戶提供的被GenericOptionParser支持的選項
比如下面的命令:
bin/hadoop jar playgroup/MyJob.jar MyJob input/cite75-99.txt output
如果我們運行作業僅僅是想看到mapper的輸出 (處于調試的目的), 可以用選項 -D mapred.reduce.tasks=0將reducer的數目設置為0
bin/hadoop jar playgroup/MyJob.jar MyJob -D mapred.reduce.tasks=0 input/cite75-99.txt output
通過使用ToolRunner、MyJob可以自動支持一下選項
GenericOptionsParser支持的選項
選項 | 描述 |
-conf <configuration file> | 指定一個配置文件 |
-D <property=value> | 給JobConf屬性賦值 |
-fs <local | namenode:port> | 指定一個NameNode,可以是 "local" |
-jt <local | jobtracker:port> | 指定一個JobTracker |
-files <list of files> | 指定一個以逗號分隔的文件列表,用于MapReduce作業。這些文件自動地分布到所有節點,使之可從本地獲取 |
-libjars <list of jars> | 指定一個以逗號分隔的jar文件,使之包含在所有任務JVM的classpath中 |
-archives <list of archives> | 指定一個以逗號分隔的存檔文件列表,使之可以在所有任務節點上打開 |
模板代碼Mappper與Reducer
模板中習慣將Mapper類稱為MapClass,而將Reducer類稱為Reduce
Mapper和Reducer都是MapReduceBase的擴展
MapReduceBase是個小類,包含configure()和close(),我們使用上兩個方法來建立和清除map(reduce)任務,除非是更高級的作業,通常我們并不需要覆蓋它們
Mapper類和Reducer類模板說明
public static class MapClass extends MapReduceBase
implements Mapper<K1, V1, K2, V2> {
public void map (K1 key, V1 value,
OutputCollector<K2, V2> output,
Reporter reporter) throws IOException { }
}
public static class Reduce extends MapReduceBase
implements Reducer<K1, V2, K3, V3> {
public void reduce(K2 key, Iterator<V2> values,
OutputCollector<K3, V3> output,
Reporter reporter) throws IOException { }
}
Mapper類的核心操作為map()方法,Reduce類為reduce()方法。每一個map()方法的調用分別被賦予一個類型為K1和V1的鍵/值對。這個鍵/值對由mapper生成,并通過OutputCollector對象的collect()方法來輸出。你需要在map()方法中的合適位置調用:
output.collect((K2) k, (V2) v);
在Reducer中reduce()方法的每次調用均被賦予K2類型的鍵,以及V2類型的一組值。注意它必須與Mapper中使用的K2和V2類型相同。Reduce()方法可能會循環遍歷V2類型的所有值。
while (values.hasNext()) {
V2 v = values.next();
}
Reduce()方法還使用OutputCollector來搜集其鍵/值的輸出,它們的類型為K3/V3。在reudce()方法中可以調用
output.collect((K3) k, (V3) v);
除了在Mapper和Reducer之間保持K2與V3的類型一致,還需要確保在Mapper和Reducer中使用的鍵值類型與在driver中設置的輸入格式、輸出鍵的類,以及輸出值的類保持一致
使用KeyValueTextInputFormat意味著K1和V1必須均為Text類型
Driver則必須調用setOutputKeyClass()和setOutputValueClass()分別指定K2和V2的類
最終:
1、所有的鍵與值的類型必須是Writable的子類型,來確保Hadoop的序列化接口可以把數據在分布式集群上發送
2、鍵的類型實現了WritableComparable,它是Writable的子接口,鍵的類型還需額外支持compareTo()方法,因為在MapReduce框架中鍵會被用來進行排序
到此,相信大家對“如何構建MapReduce程序的基礎模板”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。