
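How to Implement Maximal Clique Mining on Graphs with Spark GraphX

Published: 2021-12-17 11:03:05 Source: 億速云 Views: 147 Author: 柒染 Category: Big Data

This article looks at how to implement maximal clique mining on a graph with Spark GraphX. The topic is unfamiliar to many people, so the main points are summarized below.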

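Spark GraphX does not provide a maximal clique mining algorithm.

The maximal clique algorithms in common use are serial, and they are based on the Bron–Kerbosch algorithm.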
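To make the serial building block concrete, here is a minimal sketch of the basic Bron–Kerbosch recursion without pivoting. The class and method names (`BasicBronKerbosch`, `maximalCliques`, `adjacency`) are illustrative assumptions and are not part of the article's code, which uses a pivoting variant with a degeneracy ordering.

```java
import java.util.*;

/** Basic Bron–Kerbosch without pivoting. All names here are illustrative. */
class BasicBronKerbosch {

    /** Returns every maximal clique of the undirected graph given as a symmetric adjacency map. */
    static Set<Set<Integer>> maximalCliques(Map<Integer, Set<Integer>> adj) {
        Set<Set<Integer>> out = new HashSet<>();
        expand(new TreeSet<>(), new TreeSet<>(adj.keySet()), new TreeSet<>(), adj, out);
        return out;
    }

    /** r = current clique, p = vertices that can still extend r, x = vertices already tried. */
    private static void expand(Set<Integer> r, Set<Integer> p, Set<Integer> x,
                               Map<Integer, Set<Integer>> adj, Set<Set<Integer>> out) {
        if (p.isEmpty() && x.isEmpty()) {
            out.add(new TreeSet<>(r)); // nothing can extend r, so r is maximal
            return;
        }
        for (Integer v : new ArrayList<>(p)) { // iterate over a copy because p is modified below
            Set<Integer> nv = adj.get(v);
            Set<Integer> r2 = new TreeSet<>(r);
            r2.add(v);
            Set<Integer> p2 = new TreeSet<>(p);
            p2.retainAll(nv);
            Set<Integer> x2 = new TreeSet<>(x);
            x2.retainAll(nv);
            expand(r2, p2, x2, adj, out);
            p.remove(v); // v has been fully explored
            x.add(v);
        }
    }

    /** Builds a symmetric adjacency map from an undirected edge list. */
    static Map<Integer, Set<Integer>> adjacency(int[][] edges) {
        Map<Integer, Set<Integer>> adj = new TreeMap<>();
        for (int[] e : edges) {
            adj.computeIfAbsent(e[0], k -> new TreeSet<>()).add(e[1]);
            adj.computeIfAbsent(e[1], k -> new TreeSet<>()).add(e[0]);
        }
        return adj;
    }

    public static void main(String[] args) {
        // A triangle 0-1-2 with a pendant edge 2-3.
        int[][] edges = {{0, 1}, {0, 2}, {1, 2}, {2, 3}};
        System.out.println(maximalCliques(adjacency(edges)));
    }
}
```

For the small graph in `main`, the maximal cliques are {0, 1, 2} and {2, 3}. The recursion is inherently sequential, which is why the approach below only parallelizes across independent pieces of the graph.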

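####Approach:####

Spark GraphX provides a connected components algorithm. Connected components and maximal cliques are both concepts on undirected graphs, and every maximal clique lies entirely inside one connected component.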

Use Spark GraphX to find the connected components, then run a serial maximal clique algorithm inside each component (pseudo-parallelization).
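The grouping step can be sketched on a single machine as follows. This is only a stand-in for the distributed `connectedComponents()` call, assuming a simple union-find; `ComponentGrouping` and `edgesByComponent` are hypothetical names. Each returned group is what would be handed to one serial clique search.

```java
import java.util.*;

/** Single-machine stand-in for the connected components step. All names are illustrative. */
class ComponentGrouping {

    /** Union-find lookup with path compression; unseen vertices start as their own root. */
    private static int find(Map<Integer, Integer> parent, int v) {
        parent.putIfAbsent(v, v);
        int root = v;
        while (parent.get(root) != root) {
            root = parent.get(root);
        }
        int cur = v;
        while (cur != root) { // point every vertex on the path directly at the root
            int next = parent.get(cur);
            parent.put(cur, root);
            cur = next;
        }
        return root;
    }

    /** Groups edges by component; each component is labelled with its smallest vertex id. */
    static Map<Integer, List<int[]>> edgesByComponent(int[][] edges) {
        Map<Integer, Integer> parent = new HashMap<>();
        for (int[] e : edges) {
            int a = find(parent, e[0]);
            int b = find(parent, e[1]);
            if (a != b) {
                parent.put(Math.max(a, b), Math.min(a, b)); // the smaller id stays root
            }
        }
        Map<Integer, List<int[]>> groups = new TreeMap<>();
        for (int[] e : edges) {
            groups.computeIfAbsent(find(parent, e[0]), k -> new ArrayList<>()).add(e);
        }
        return groups;
    }

    public static void main(String[] args) {
        int[][] edges = {{0, 1}, {1, 2}, {5, 6}, {6, 7}, {5, 7}};
        for (Map.Entry<Integer, List<int[]>> g : edgesByComponent(edges).entrySet()) {
            System.out.println("component " + g.getKey() + ": " + g.getValue().size() + " edges");
        }
    }
}
```

Labelling a component by its smallest vertex id matches how GraphX labels connected components. Because no clique can span two components, the groups can be processed independently, which is where the parallelism comes from.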

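For a densely connected graph the connected components are very large, so the serial maximal clique algorithm still takes a long time. Pruning is used here to reduce the amount of data: a vertex that belongs to a clique of at least `count` vertices must have at least `count - 1` neighbors, so vertices with a lower degree can be dropped repeatedly. For large graphs, however, the room for optimization is limited.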
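A minimal single-machine sketch of this pruning idea follows. It assumes the edge list is already deduplicated and undirected, and it runs until nothing more can be removed, whereas the Spark job stops after a configured number of passes or once a pass removes too few vertices. `DegreePruning` and `prune` are hypothetical names.

```java
import java.util.*;

/** Degree-based pruning before clique search. All names are illustrative. */
class DegreePruning {

    /**
     * Repeatedly drops every edge that touches a vertex with fewer than
     * minCliqueSize - 1 neighbours, until a pass removes nothing.
     * Assumes each undirected edge appears exactly once.
     */
    static List<int[]> prune(List<int[]> edges, int minCliqueSize) {
        List<int[]> current = new ArrayList<>(edges);
        while (true) {
            Map<Integer, Integer> degree = new HashMap<>();
            for (int[] e : current) {
                degree.merge(e[0], 1, Integer::sum);
                degree.merge(e[1], 1, Integer::sum);
            }
            List<int[]> next = new ArrayList<>();
            for (int[] e : current) {
                if (degree.get(e[0]) >= minCliqueSize - 1 && degree.get(e[1]) >= minCliqueSize - 1) {
                    next.add(e);
                }
            }
            if (next.size() == current.size()) {
                return next; // fixed point reached
            }
            current = next;
        }
    }

    public static void main(String[] args) {
        // The article's sample graph as 12 undirected edges.
        List<int[]> sample = Arrays.asList(
            new int[]{0, 1}, new int[]{0, 2}, new int[]{0, 3}, new int[]{1, 2},
            new int[]{2, 3}, new int[]{2, 6}, new int[]{3, 4}, new int[]{3, 5},
            new int[]{4, 5}, new int[]{4, 6}, new int[]{5, 6}, new int[]{6, 7});
        System.out.println(prune(sample, 3).size() + " edges left");
    }
}
```

With a minimum clique size of 3, only the edge 6-7 is removed from the sample graph, because vertex 7 has a single neighbor. With a minimum of 4 the whole sample graph is pruned away, which is consistent with it containing no clique of four vertices.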

A truly parallel maximal clique algorithm is still something to look forward to.

####Configuration file:####

    graph_data_path=hdfs://localhost/graph_data
    out_path=hdfs://localhost/clique
    ck_path=hdfs://localhost/checkpoint
    # number of pruning iterations
    numIter=50
    # minimum number of vertices in a maximal clique
    count=3
    # maximal clique algorithm: 1 = hand-written implementation, 2 = jgrapht
    algorithm=2
    # stop pruning once a pass still leaves at least this percentage of the
    # previous vertex count (at 90% the pruning is no longer efficient)
    percent=90
    spark.master=local
    spark.app.name=graph
    spark.serializer=org.apache.spark.serializer.KryoSerializer
    spark.yarn.executor.memoryOverhead=20480
    spark.yarn.driver.memoryOverhead=20480
    spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC
    spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC
    spark.driver.maxResultSize=10g
    spark.default.parallelism=60

####Sample data:####

    {"src":"0","dst":"1"}
    {"src":"0","dst":"2"}
    {"src":"0","dst":"3"}
    {"src":"1","dst":"0"}
    {"src":"2","dst":"1"}
    {"src":"3","dst":"5"}
    {"src":"4","dst":"6"}
    {"src":"5","dst":"4"}
    {"src":"6","dst":"5"}
    {"src":"3","dst":"2"}
    {"src":"2","dst":"3"}
    {"src":"6","dst":"4"}
    {"src":"3","dst":"4"}
    {"src":"4","dst":"3"}
    {"src":"2","dst":"6"}
    {"src":"6","dst":"2"}
    {"src":"6","dst":"7"}
    {"src":"7","dst":"6"}
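The sample lists several edges in both directions, while cliques are defined on undirected graphs. The sketch below shows the normalization the job applies before anything else: order each pair as (smaller id, larger id) and keep distinct pairs. `EdgeNormalizer` and `normalize` are hypothetical names, and the input is given as plain number pairs instead of parsed JSON.

```java
import java.util.*;

/** Collapses directed pairs into unique undirected edges. All names are illustrative. */
class EdgeNormalizer {

    /** Orders each pair as (smaller id, larger id) and removes duplicates, keeping first-seen order. */
    static Set<List<Long>> normalize(long[][] directed) {
        Set<List<Long>> edges = new LinkedHashSet<>();
        for (long[] e : directed) {
            long lo = Math.min(e[0], e[1]);
            long hi = Math.max(e[0], e[1]);
            edges.add(Arrays.asList(lo, hi));
        }
        return edges;
    }

    public static void main(String[] args) {
        // The 18 (src, dst) pairs of the sample data.
        long[][] sample = {
            {0, 1}, {0, 2}, {0, 3}, {1, 0}, {2, 1}, {3, 5}, {4, 6}, {5, 4}, {6, 5},
            {3, 2}, {2, 3}, {6, 4}, {3, 4}, {4, 3}, {2, 6}, {6, 2}, {6, 7}, {7, 6}};
        System.out.println(normalize(sample).size() + " undirected edges");
    }
}
```

The 18 sample records collapse to 12 undirected edges, which are the edges the clique search actually works on.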

####Sample graph:####

[Figure: the sample graph formed by the edges listed above]

####Output:####

    0,1,2
    0,2,3
    3,4,5
    4,5,6

####Code:####

    import java.util
    import java.util.Properties

    import org.apache.spark.graphx.{Edge, Graph}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.{SparkConf, SparkContext}
    import org.jgrapht.alg.BronKerboschCliqueFinder
    import org.jgrapht.graph.{DefaultEdge, SimpleGraph}

    import scala.collection.JavaConverters._
    import scala.collection.mutable

    object ApplicationTitan {
      def main(args: Array[String]): Unit = {
        val prop = new Properties()
        prop.load(getClass.getResourceAsStream("/config.properties"))

        val graph_data_path = prop.getProperty("graph_data_path")
        val out_path = prop.getProperty("out_path")
        val ck_path = prop.getProperty("ck_path")
        val count = Integer.parseInt(prop.getProperty("count"))
        val numIter = Integer.parseInt(prop.getProperty("numIter"))
        val algorithm = Integer.parseInt(prop.getProperty("algorithm"))
        val percent = Integer.parseInt(prop.getProperty("percent"))
        val conf = new SparkConf()
        try {
          // Remove the previous output and wait until the command has finished.
          Runtime.getRuntime.exec("hdfs dfs -rm -r " + out_path).waitFor()
          // Runtime.getRuntime.exec("cmd.exe /C rd /s /q " + out_path)
        } catch {
          case ex: Exception =>
            ex.printStackTrace(System.out)
        }

        // Pass every spark.* property through to the Spark configuration.
        prop.stringPropertyNames().asScala.foreach(s => {
          if (s.startsWith("spark")) {
            conf.set(s, prop.getProperty(s))
          }
        })
        conf.registerKryoClasses(Array(getClass))
        val sc = new SparkContext(conf)
        sc.setLogLevel("ERROR")
        sc.setCheckpointDir(ck_path)
        val sqlc = new SQLContext(sc)
        try {
          val e_df = sqlc.read
            // .json(graph_data_path)
            .parquet(graph_data_path)

          // Normalize every edge to (smaller id, larger id) and deduplicate.
          // The edges are undirected, so the column order of the Row does not matter.
          var e_rdd = e_df.rdd
            .mapPartitions(it => {
              it.map({
                case Row(dst: String, src: String) =>
                  val src_long = src.toLong
                  val dst_long = dst.toLong
                  if (src_long < dst_long) (src_long, dst_long) else (dst_long, src_long)
              })
            }).distinct()
          e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

          // Pruning: keep only edges whose two endpoints have degree >= count - 1.
          var iter = 0
          var bc_size = 0
          var converged = false
          while (!converged && iter <= numIter) {
            val bc_value = e_rdd
              .flatMap(x => List((x._1, 1), (x._2, 1)))
              .reduceByKey(_ + _)
              .filter(_._2 >= count - 1)
              .map(_._1)
              .collect()
              .toSet
            val bc = sc.broadcast(bc_value)
            e_rdd = e_rdd.filter(x => bc.value.contains(x._1) && bc.value.contains(x._2))
            e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
            iter += 1
            // Stop once a pass keeps at least `percent`% of the previous vertex count.
            if (bc_size != 0 && bc_value.size >= bc_size * percent / 100) {
              println("total iter : " + iter)
              converged = true
            }
            bc_size = bc_value.size
          }

          // Build the graph.
          val edge: RDD[Edge[Long]] = e_rdd.map(x => Edge(x._1, x._2, 0L))
          val graph = Graph.fromEdges(edge, 0, StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER)

          // Connected components: (vertex id, component id).
          val cc = graph.connectedComponents().vertices
          cc.persist(StorageLevel.MEMORY_AND_DISK_SER)

          cc.join(e_rdd)
            // Prefix the component id with a random digit so that large components
            // are first aggregated in up to 10 partial lists.
            .map(x => ((math.random * 10).toInt.toString.concat(x._2._1.toString), (x._1, x._2._2)))
            .aggregateByKey(List[(Long, Long)]())((list, v) => list :+ v, (list1, list2) => list1 ::: list2)
            // Strip the random digit and merge the partial lists per component.
            .map(x => (x._1.substring(1), x._2))
            .aggregateByKey(List[(Long, Long)]())((list1, list2) => list1 ::: list2, (list3, list4) => list3 ::: list4)
            .filter(x => x._2.size >= count - 1)
            .flatMap(x => {
              if (algorithm == 1)
                find(x, count)
              else
                find2(x, count)
            })
            .map(set => set.asScala.mkString(","))
            // .coalesce(1)
            .saveAsTextFile(out_path)
        } catch {
          case ex: Exception =>
            ex.printStackTrace(System.out)
        }
        sc.stop()
      }

      // Hand-written maximal clique algorithm
      def find(x: (String, List[(Long, Long)]), count: Int): mutable.Set[util.Set[String]] = {
        println(x._1 + "|s|" + x._2.size)
        println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
        val neighbors = new util.HashMap[String, util.Set[String]]
        val finder = new CliqueFinder(neighbors, count)
        // Fill the adjacency map in both directions.
        x._2.foreach(r => {
          val v1 = r._1.toString
          val v2 = r._2.toString
          if (neighbors.containsKey(v1)) {
            neighbors.get(v1).add(v2)
          } else {
            val temp = new util.HashSet[String]()
            temp.add(v2)
            neighbors.put(v1, temp)
          }
          if (neighbors.containsKey(v2)) {
            neighbors.get(v2).add(v1)
          } else {
            val temp = new util.HashSet[String]()
            temp.add(v1)
            neighbors.put(v2, temp)
          }
        })
        println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
        finder.findMaxCliques().asScala
      }

      // Maximal clique algorithm from jgrapht
      def find2(x: (String, List[(Long, Long)]), count: Int): Set[util.Set[String]] = {
        println(x._1 + "|s|" + x._2.size)
        println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
        val to_clique = new SimpleGraph[String, DefaultEdge](classOf[DefaultEdge])
        x._2.foreach(r => {
          val v1 = r._1.toString
          val v2 = r._2.toString
          to_clique.addVertex(v1)
          to_clique.addVertex(v2)
          to_clique.addEdge(v1, v2)
        })
        val finder = new BronKerboschCliqueFinder(to_clique)
        val list = finder.getAllMaximalCliques.asScala
        // Keep only cliques with at least `count` vertices.
        var result = Set[util.Set[String]]()
        list.foreach(x => {
          if (x.size() >= count)
            result = result + x
        })
        println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
        result
      }
    }

// Hand-written maximal clique algorithm

    import java.util.*;

    /**
     * @author mopspecial@gmail.com
     * @date 2017/7/31
     */
    public class CliqueFinder {
        private Map<String, Set<String>> neighbors;
        private Set<String> nodes;
        private Set<Set<String>> maxCliques = new HashSet<>();
        private Integer minSize;

        public CliqueFinder(Map<String, Set<String>> neighbors, Integer minSize) {
            this.neighbors = neighbors;
            // keySet() is a live view, so vertices added to the map later are still seen.
            this.nodes = neighbors.keySet();
            this.minSize = minSize;
        }

        // Outer level: visit vertices in degeneracy order, then recurse with pivoting (bk2).
        private void bk3(Set<String> clique, List<String> candidates, List<String> excluded) {
            if (candidates.isEmpty() && excluded.isEmpty()) {
                if (!clique.isEmpty() && clique.size() >= minSize) {
                    maxCliques.add(clique);
                }
                return;
            }

            for (String s : degeneracy_order(candidates)) {
                List<String> new_candidates = new ArrayList<>(candidates);
                new_candidates.retainAll(neighbors.get(s));

                List<String> new_excluded = new ArrayList<>(excluded);
                new_excluded.retainAll(neighbors.get(s));
                Set<String> nextClique = new HashSet<>(clique);
                nextClique.add(s);
                bk2(nextClique, new_candidates, new_excluded);
                candidates.remove(s);
                excluded.add(s);
            }
        }

        // Bron–Kerbosch with pivoting: only non-neighbors of the pivot are expanded.
        private void bk2(Set<String> clique, List<String> candidates, List<String> excluded) {
            if (candidates.isEmpty() && excluded.isEmpty()) {
                if (!clique.isEmpty() && clique.size() >= minSize) {
                    maxCliques.add(clique);
                }
                return;
            }
            String pivot = pick_random(candidates);
            if (pivot == null) {
                pivot = pick_random(excluded);
            }
            List<String> tempc = new ArrayList<>(candidates);
            tempc.removeAll(neighbors.get(pivot));

            for (String s : tempc) {
                List<String> new_candidates = new ArrayList<>(candidates);
                new_candidates.retainAll(neighbors.get(s));

                List<String> new_excluded = new ArrayList<>(excluded);
                new_excluded.retainAll(neighbors.get(s));
                Set<String> nextClique = new HashSet<>(clique);
                nextClique.add(s);
                bk2(nextClique, new_candidates, new_excluded);
                candidates.remove(s);
                excluded.add(s);
            }
        }

        // Repeatedly takes the vertex of minimum remaining degree.
        private List<String> degeneracy_order(List<String> innerNodes) {
            List<String> result = new ArrayList<>();
            Map<String, Integer> deg = new HashMap<>();
            for (String node : innerNodes) {
                deg.put(node, neighbors.get(node).size());
            }
            while (!deg.isEmpty()) {
                Integer min = Collections.min(deg.values());
                String minKey = null;
                for (String key : deg.keySet()) {
                    if (deg.get(key).equals(min)) {
                        minKey = key;
                        break;
                    }
                }
                result.add(minKey);
                deg.remove(minKey);
                for (String k : neighbors.get(minKey)) {
                    if (deg.containsKey(k)) {
                        deg.put(k, deg.get(k) - 1);
                    }
                }
            }
            return result;
        }

        // Despite the name, this returns the first element, or null for an empty list.
        private String pick_random(List<String> random) {
            if (random != null && !random.isEmpty()) {
                return random.get(0);
            } else {
                return null;
            }
        }

        public Set<Set<String>> findMaxCliques() {
            this.bk3(new HashSet<>(), new ArrayList<>(nodes), new ArrayList<>());
            return maxCliques;
        }

        public static void main(String[] args) {
            // Adjacency of the sample graph; it must be symmetric.
            Map<String, Set<String>> neighbors = new HashMap<>();
            neighbors.put("0", new HashSet<>(Arrays.asList("1", "2", "3")));
            neighbors.put("1", new HashSet<>(Arrays.asList("0", "2")));
            neighbors.put("2", new HashSet<>(Arrays.asList("0", "1", "3", "6")));
            neighbors.put("3", new HashSet<>(Arrays.asList("0", "2", "4", "5")));
            neighbors.put("4", new HashSet<>(Arrays.asList("3", "5", "6")));
            neighbors.put("5", new HashSet<>(Arrays.asList("3", "4", "6")));
            neighbors.put("6", new HashSet<>(Arrays.asList("2", "4", "5", "7")));
            neighbors.put("7", new HashSet<>(Arrays.asList("6")));
            CliqueFinder finder = new CliqueFinder(neighbors, 3);
            System.out.println(finder.findMaxCliques());
        }
    }
