Today let's talk about how to understand the TopK algorithm and its implementation. Many people are not very familiar with it, so this article summarizes the key points below; hopefully you will get something out of it.
1. Problem Description
When working with large-scale data, a common task is to find the K items that occur most frequently (or have the largest values). This class of problems is known as the "TopK" problem. Examples include finding the 10 most popular songs, or the 5 websites with the highest traffic.
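For reference before the MapReduce version, a minimal single-machine sketch of TopK (not part of the original article; the class and method names below are purely illustrative) keeps a size-K min-heap of counts, so the smallest of the current top K is always cheap to evict:

import java.util.*;

public class TopKSketch {

    // Return the k items with the highest counts, using a size-k min-heap.
    static List<String> topK(Map<String, Long> counts, int k) {
        // Min-heap ordered by count: the smallest of the current top-k sits at the head.
        PriorityQueue<Map.Entry<String, Long>> heap =
                new PriorityQueue<>((a, b) -> Long.compare(a.getValue(), b.getValue()));
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            heap.offer(e);
            if (heap.size() > k) {
                heap.poll(); // evict the current minimum, keeping only k entries
            }
        }
        List<String> result = new ArrayList<>();
        while (!heap.isEmpty()) {
            result.add(heap.poll().getKey());
        }
        Collections.reverse(result); // largest first
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new HashMap<>();
        counts.put("a.com", 100L);
        counts.put("b.com", 300L);
        counts.put("c.com", 200L);
        System.out.println(topK(counts, 2)); // [b.com, c.com]
    }
}

The MapReduce solution below follows the same idea, except that the counting is distributed and the ordering is handled by a TreeMap in the Reducer.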
2. Example: Finding the Top 5 Websites by Traffic
Data file test.data:
Data format: domain name, upstream traffic, downstream traffic (tab-separated).
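The original data file is not reproduced here; hypothetical lines in this format (purely illustrative, not the actual contents of test.data) would look like:

www.example.com	2481	24681
bbs.example.com	1116	954
video.example.com	3008	102030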
Approach:
1. The Mapper parses each line and splits it on "\t" to get the individual fields.
2、因為URL有很多重復記錄,所以將URL放到key(通過分析MapReduce原理),流量放在value
3. The Reducer sums the total traffic per URL and caches the results in a TreeMap, writing everything out at the end (note that to emit the output in one go you must use the Reducer's cleanup method).
The code is as follows:
Mapper class:
package com.itheima.hadoop.mapreduce.mapper;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import com.itheima.hadoop.mapreduce.bean.FlowBean;

public class TopKURLMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    /**
     * @param key
     *            : byte offset of the current line
     * @param value
     *            : content of the current line
     * @param context
     *            : job context
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        /**
         * This counter is org.apache.hadoop.mapreduce.Counter
         */
        Counter counter = context.getCounter("ExistProblem", "ExistProblemLine"); // custom counter for malformed lines
        String line = value.toString(); // read one line
        String[] fields = line.split("\t"); // split into fields on \t
        try {
            String url = fields[0]; // URL field
            long upFlow = Long.parseLong(fields[1]); // upstream traffic (upFlow)
            long downFlow = Long.parseLong(fields[2]); // downstream traffic (downFlow)
            FlowBean bean = new FlowBean(upFlow, downFlow); // wrap both flows in one bean
            Text tUrl = new Text(url); // convert the Java type to the Hadoop type
            context.write(tUrl, bean); // several values are passed, so they are bundled into a bean (note: the bean must handle its own serialization)
        } catch (Exception e) {
            e.printStackTrace();
            counter.increment(1); // count the malformed line
        }
    }
}
Reducer class:
package com.itheima.hadoop.mapreduce.reducer;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.TreeMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.itheima.hadoop.mapreduce.bean.FlowBean;

public class TopKURLReducer extends Reducer<Text, FlowBean, FlowBean, Text> {

    private TreeMap<FlowBean, Text> treeMap = new TreeMap<FlowBean, Text>();

    /**
     * @param key
     *            : one URL (all records with the same URL are grouped here)
     * @param values
     *            : the flow beans for that URL
     */
    @Override
    public void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long countUpFlow = 0;
        long countDownFlow = 0;
        /*
         * 1. Read the flows of each bean
         * 2. Sum them into the totals for this URL
         * 3. Cache the result in the TreeMap
         */
        for (FlowBean bean : values) {
            countUpFlow += bean.getUpFlow(); // accumulate upstream traffic
            countDownFlow += bean.getDownFlow(); // accumulate downstream traffic
        }
        // wrap the accumulated traffic
        FlowBean bean = new FlowBean(countUpFlow, countDownFlow);
        treeMap.put(bean, new Text(key)); // cache in the TreeMap, sorted by total flow
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        // iterate over the cache and emit the results in descending order of total flow
        for (Entry<FlowBean, Text> entry : treeMap.entrySet()) {
            context.write(entry.getKey(), entry.getValue());
        }
        super.cleanup(context); // keep the parent class's cleanup behaviour
    }
}
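As written, cleanup emits every URL in descending order of total flow rather than only the top 5, and two URLs whose total flow compares as equal would collide on the same TreeMap key. A minimal sketch of a cleanup variant that stops after K entries (not part of the original code; the constant K is introduced here only for illustration) could look like this inside TopKURLReducer:

    private static final int K = 5; // how many entries to emit (illustrative constant)

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        int emitted = 0;
        // entrySet() iterates in key order; FlowBean.compareTo sorts descending by total flow,
        // so the first K entries are the K largest.
        for (Entry<FlowBean, Text> entry : treeMap.entrySet()) {
            if (emitted++ >= K) {
                break;
            }
            context.write(entry.getKey(), entry.getValue());
        }
        super.cleanup(context);
    }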
FlowBean class:
package com.itheima.hadoop.mapreduce.bean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable, Comparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long maxFlow;

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + maxFlow;
    }

    /**
     * 1. Serialization requires a no-argument constructor (the object is created via reflection).
     * 2. readFields() and write() must read and write the fields in the same order.
     */
    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.maxFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getMaxFlow() {
        return maxFlow;
    }

    public void setMaxFlow(long maxFlow) {
        this.maxFlow = maxFlow;
    }

    @Override
    public void readFields(DataInput dataIn) throws IOException {
        upFlow = dataIn.readLong();
        downFlow = dataIn.readLong();
        maxFlow = dataIn.readLong();
    }

    @Override
    public void write(DataOutput dataOut) throws IOException {
        dataOut.writeLong(upFlow);
        dataOut.writeLong(downFlow);
        dataOut.writeLong(maxFlow);
    }

    @Override
    public int compareTo(FlowBean o) {
        // sort in descending order of total flow (maxFlow = upFlow + downFlow)
        return this.maxFlow > o.maxFlow ? -1 : this.maxFlow < o.maxFlow ? 1 : 0;
    }
}
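A quick way to check the write()/readFields() ordering contract locally is a round trip through a byte stream; this small sketch is not from the original article and is only an illustration:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import com.itheima.hadoop.mapreduce.bean.FlowBean;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        FlowBean original = new FlowBean(100, 200); // maxFlow = 300

        // serialize the bean into a byte buffer
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // deserialize it back, reading the fields in the same order they were written
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(original); // 100	200	300
        System.out.println(copy);     // should print the same line
    }
}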
Driver class:
package com.itheima.hadoop.drivers;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

import com.itheima.hadoop.mapreduce.bean.FlowBean;
import com.itheima.hadoop.mapreduce.mapper.TopKURLMapper;
import com.itheima.hadoop.mapreduce.reducer.TopKURLReducer;

public class TopKURLDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        /**
         * 1. Create the job
         * 2. Set the class used to locate the job jar
         * 3. Set the Mapper class and the Reducer class
         * 4. Set the output key/value types of the Mapper and the Reducer
         * 5. Set the input path and the output path
         * 6. Submit the job
         */
        Configuration conf = getConf(); // use the Configuration prepared by ToolRunner
        Job job = Job.getInstance(conf);

        job.setJarByClass(TopKURLDriver.class); // the class that locates the job jar
        job.setMapperClass(TopKURLMapper.class);
        job.setReducerClass(TopKURLReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // "true" makes the job print its progress
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
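Because TopKURLDriver implements Tool and is launched through ToolRunner, using getConf() inside run() lets Hadoop's generic command-line options (for example -D property overrides) take effect, rather than starting from a fresh Configuration.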
Runner class:
package com.itheima.hadoop.runner;

import org.apache.hadoop.util.ToolRunner;

import com.itheima.hadoop.drivers.TopKURLDriver;

public class TopKURLRunner {

    public static void main(String[] args) throws Exception {
        // TopKURLDriver implements Tool, so it is what ToolRunner actually runs
        int res = ToolRunner.run(new TopKURLDriver(), args);
        System.exit(res);
    }
}
Run command: hadoop jar topkurl.jar com.itheima.hadoop.runner.TopKURLRunner /test/inputData /test/outputData
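To inspect the result after the job finishes, something like the following can be used (part-r-00000 is the usual default output file of a single reducer; adjust the paths as needed):

hadoop fs -cat /test/outputData/part-r-00000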
Run result:
After reading the above, do you have a better understanding of the TopK algorithm and its implementation? If you would like to learn more, follow the 億速雲 industry news channel. Thank you for your support.