您好,登錄后才能下訂單哦!
setup 函數原碼:(摘自《hadoop實戰》)
*Called once at the start of the task.
protected void setup(Context context) throws IOException,InterruptedException{}
從注釋可得知,setup函數在Task啟動時就調用。
在MapReduce中作業會被組織成MapTask和ReduceTask。
每個Task都以Map類或Reduce類為處理方法主體,
輸入分片為處理方法的輸入,自己的分片處理完后Task就銷毀了。
從這里看出,setup函數在task啟動后數據處理前就調用一次
而覆蓋的Map函數和Reduce函數會針對輸入分片的每個Key調用一次,
所以setup函數可以看作Task上一個全局處理。
利用setup函數的特性,可以將Map或Reduce函數中的的重復處理放到setup函數中。
如老師給的Exercise_2中的"name"
但需要注意的是,調用setup函數只是對應的Task上全局操作,而不是整個作業的全局操作。
可以先用api把本地的文件傳到hdfs中的 /user/hadoop/test 里去
//本地文件上傳到HDFS上
public static void upload(String src,String dst) throws FileNotFoundException,IOException{
InputStream in = new BufferedInputStream(new FileInputStream(src));
//得到配置對象
Configuration conf = new Configuration();
//文件系統
FileSystem fs = FileSystem.get(URI.create(dst), conf);
//輸出流
OutputStream out = fs.create(new Path(dst), new Progressable() {
public void progress() {
System.out.println("上傳完一個設定緩存區大小容量的文件!");
}
});
//連接兩個流,形成通道,使輸入流向輸出流傳輸數據
IOUtils.copyBytes(in, out, 4096,true);
}
上傳的時候調用這個函數就可以了
例如
upload("/home/jack/test/test.txt","/user/hadoop/test/test");
前面的是本地目錄中的文件,后面是hdfs中的文件
注意 必須兩者都必須是“路徑+文件名” 不能沒有文件名
Configuration conf = new Configuration();
conf.setStrings("job_parms", "aaabbc"); //關鍵就是這一句
Job job = new Job(conf, "load analysis");
job.setJarByClass(LoadAnalysis.class);
job.setMapperClass(LoadMapper.class);
job.setReducerClass(LoadIntoHbaseReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
try {
//從全局配置獲取配置參數
Configuration conf = context.getConfiguration();
String parmStr = conf.get("job_parms"); //這樣就拿到了
......
} catch (SQLException e) {
e.printStackTrace();
}
}
全局文件:hadoop有distributed cache來保存全局文件,保證所有node都可以訪問,使用類名為DistributedCache
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。