中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何使用spark-core實現廣度優先搜索

發布時間:2021-12-17 10:13:11 來源:億速云 閱讀:158 作者:柒染 欄目:大數據

如何使用spark-core實現廣度優先搜索,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

需求描述

數據源是一批網絡日志數據,每條數據都有兩個字段srcip和dstip,字段之間以逗號分隔,問題的需求是給定一個srcip和dstip,在給定的搜索深度下檢索這兩個ip之間所有的通聯路徑。這個問題是網絡日志處理中的一個實際需求,之前在單機的程序中實現過,但是需要將所有的ip對加載到內存中。考慮到如果數據量太大的情況,可能單節點的內存無法支撐這樣的操作,但是如果不將ip對全加載內存中,使用深度優先遍歷的方法,搜索過程又會很慢。最近在學習spark框架,剛接觸RDD,就是這用RDD來解決這個問題。以下是scala代碼

package com.pxu.spark.core

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

/**
 *  pxu
 *  2021-01-29 16:57
 */
object FindIpRel {


  def main(args: Array[String]): Unit = {

    val srcIp = args(0) // 源ip
    val dstIp = args(1) // 目標ip
    val depth = args(2).toInt //搜索深度
    val resPath = args(3) //搜索結果的輸出位置

    val conf = new SparkConf().setAppName("findIpRel")
    val sc = new SparkContext(conf)


    /**
     * 從數據源中構建原始rdd,每一行的數據形式為a,b
     */

    val ori = sc.textFile("hdfs://master:9000/submitTest/input/ipconn/srcdst.csv")

    /**
     * 對原始Rdd進行元組形式轉化,現在每一行的數據形式為(a,b)
     * 除此之外還對數據進行了去重處理,并顯示使用hash分區器對RDD中的數據進行分區
     * 為后面的join操作,做一些優化
     */
    val base = ori.map(a => {
      val tmpArr = a.split(",")
      (tmpArr(0), tmpArr(1))
    }).distinct().partitionBy(new HashPartitioner(10))


    /**
     * 這是一個用于保存結果的RDD,其中每一行的形式為(dstIp,List(ip on path))
     * 在查找過程中,發現了搜索結果后,就會將其并入到res中
     */
    var res = sc.makeRDD[(String,List[String])](List())

    /**
     * 這是一個用于迭代的RDD,其初始化的內容是,首先從baseRdd中過濾出元組第一個元素a是參數SrcIp的,
     * 然后將其轉化成(b,List(a))的格式,其中b總是代表當前搜索路徑上的尾ip,list中的其他內容代表搜索
     * 路徑上其他的ip
     */
    var iteration = base.filter(_._1.equals(srcIp)).map(a => (a._2,List(a._1)))

    for(i <- 2 to depth){

      /**
       * 1.首先iteration和base按照key進行join,這個操作的意義就是更深一層的搜索,結果RDD的格式是(b,(List(ip on path),c))
       * 2.對數據進行一次過濾,過去掉那些路徑已經形成環的元素,成環的判據就是List(ip on path)中的數據已經包含c了
       * 3.進行map操作,b并入到List(ip on path),將c作為新的key,因此此時更深一層的搜索,導致c成為了當前搜索路徑中的尾節點,
       *   此時RDD中的每一個元素的格式應該是(c,(List(ip on path))
       */
      val tmp = iteration.join(base).filter(a => !a._2._1.contains(a._2._2)).map(a => (a._2._2,a._2._1:+a._1))

      /**
       * 將tmp中已經成功搜索的路徑篩選出來,成功搜索的判據是(c,(List(ip on path)),c與dstIp相等
       */
      val success = tmp.filter(a => a._1.equals(dstIp))

      /**
       * 將成功搜索的數據合并到res中
       */
      res = res.union(success)
      
      /**
       * 更新iteration
       */
      iteration = tmp.subtract(success)

    }
    
    /**
     * 將成功搜索的路徑并入到res中
     */
    res.union(iteration.filter(a => a._1.equals(dstIp)))

    /**
     * 執行一次轉換操作,將res中的元素從(c,(List(ip on path))格式轉換成List(all ip on path)
     */
    val finalResult = res.map(a => a._2 :+ a._1)

    finalResult.saveAsTextFile(resPath)

  }

}

關于如何使用spark-core實現廣度優先搜索問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

铜鼓县| 视频| 荆门市| 大新县| 大同市| 社会| 榆树市| 龙里县| 获嘉县| 华阴市| 来凤县| 克拉玛依市| 松阳县| 和平县| 伊金霍洛旗| 芒康县| 文安县| 堆龙德庆县| 海林市| 临朐县| 得荣县| 新昌县| 慈溪市| 普兰县| 玛曲县| 卓尼县| 个旧市| 正安县| 德格县| 望江县| 和林格尔县| 巴彦淖尔市| 阿尔山市| 肇源县| 治多县| 错那县| 大同市| 什邡市| 德庆县| 鄂托克前旗| 米泉市|