您好,登錄后才能下訂單哦!
如何使用spark-core實現廣度優先搜索,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
數據源是一批網絡日志數據,每條數據都有兩個字段srcip和dstip,字段之間以逗號分隔,問題的需求是給定一個srcip和dstip,在給定的搜索深度下檢索這兩個ip之間所有的通聯路徑。這個問題是網絡日志處理中的一個實際需求,之前在單機的程序中實現過,但是需要將所有的ip對加載到內存中。考慮到如果數據量太大的情況,可能單節點的內存無法支撐這樣的操作,但是如果不將ip對全加載內存中,使用深度優先遍歷的方法,搜索過程又會很慢。最近在學習spark框架,剛接觸RDD,就是這用RDD來解決這個問題。以下是scala代碼
package com.pxu.spark.core import org.apache.spark.{HashPartitioner, SparkConf, SparkContext} /** * pxu * 2021-01-29 16:57 */ object FindIpRel { def main(args: Array[String]): Unit = { val srcIp = args(0) // 源ip val dstIp = args(1) // 目標ip val depth = args(2).toInt //搜索深度 val resPath = args(3) //搜索結果的輸出位置 val conf = new SparkConf().setAppName("findIpRel") val sc = new SparkContext(conf) /** * 從數據源中構建原始rdd,每一行的數據形式為a,b */ val ori = sc.textFile("hdfs://master:9000/submitTest/input/ipconn/srcdst.csv") /** * 對原始Rdd進行元組形式轉化,現在每一行的數據形式為(a,b) * 除此之外還對數據進行了去重處理,并顯示使用hash分區器對RDD中的數據進行分區 * 為后面的join操作,做一些優化 */ val base = ori.map(a => { val tmpArr = a.split(",") (tmpArr(0), tmpArr(1)) }).distinct().partitionBy(new HashPartitioner(10)) /** * 這是一個用于保存結果的RDD,其中每一行的形式為(dstIp,List(ip on path)) * 在查找過程中,發現了搜索結果后,就會將其并入到res中 */ var res = sc.makeRDD[(String,List[String])](List()) /** * 這是一個用于迭代的RDD,其初始化的內容是,首先從baseRdd中過濾出元組第一個元素a是參數SrcIp的, * 然后將其轉化成(b,List(a))的格式,其中b總是代表當前搜索路徑上的尾ip,list中的其他內容代表搜索 * 路徑上其他的ip */ var iteration = base.filter(_._1.equals(srcIp)).map(a => (a._2,List(a._1))) for(i <- 2 to depth){ /** * 1.首先iteration和base按照key進行join,這個操作的意義就是更深一層的搜索,結果RDD的格式是(b,(List(ip on path),c)) * 2.對數據進行一次過濾,過去掉那些路徑已經形成環的元素,成環的判據就是List(ip on path)中的數據已經包含c了 * 3.進行map操作,b并入到List(ip on path),將c作為新的key,因此此時更深一層的搜索,導致c成為了當前搜索路徑中的尾節點, * 此時RDD中的每一個元素的格式應該是(c,(List(ip on path)) */ val tmp = iteration.join(base).filter(a => !a._2._1.contains(a._2._2)).map(a => (a._2._2,a._2._1:+a._1)) /** * 將tmp中已經成功搜索的路徑篩選出來,成功搜索的判據是(c,(List(ip on path)),c與dstIp相等 */ val success = tmp.filter(a => a._1.equals(dstIp)) /** * 將成功搜索的數據合并到res中 */ res = res.union(success) /** * 更新iteration */ iteration = tmp.subtract(success) } /** * 將成功搜索的路徑并入到res中 */ res.union(iteration.filter(a => a._1.equals(dstIp))) /** * 執行一次轉換操作,將res中的元素從(c,(List(ip on path))格式轉換成List(all ip on path) */ val finalResult = res.map(a => a._2 :+ a._1) finalResult.saveAsTextFile(resPath) } }
關于如何使用spark-core實現廣度優先搜索問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。