中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark 的Core深入(二)

發布時間:2020-07-25 09:40:23 來源:網絡 閱讀:1886 作者:flyfish225 欄目:大數據

Spark 的 Core 深入(二)

標簽(空格分隔): Spark的部分


  • 一: 日志清洗的優化
  • 二:Spark RDD
  • 三:SparkContext三大功能
  • 四:Spark on YARN
  • 五: spark RDD 的 依賴

一、日志清洗的優化:

1.1 日志清洗有臟數據問題

hdfs dfs -mkdir /apachelog/
hdfs dfs -put access_log /apachelogs
hdfs dfs -ls /apachelogs

Spark 的Core深入(二)

 執行結果報錯。

Spark 的Core深入(二)

 LogAnalyzer.scala
package com.ibeifeng.bigdata.spark.app.core
import org.apache.spark.{SparkContext, SparkConf}
/**
 * Created by zhangyy on 2016/7/16.
 */
object LogAnalyzer {
  def main(args: Array[String]) {
    // step 0: SparkContext
    val sparkConf = new SparkConf()
      .setAppName("LogAnalyzer Applicaiton") // name
      .setMaster("local[2]") // --master local[2] | spark://xx:7077 | yarn
    // Create SparkContext
    val sc = new SparkContext(sparkConf)
    /** ================================================================== */
    val logFile = "/apachelogs/access_log"
    // step 1: input data
    val accessLogs = sc.textFile(logFile)
       // filer logs data
       .filter(ApacheAccessLog.isValidateLogLine) // closures
        /**
         * parse log
         */
        .map(line => ApacheAccessLog.parseLogLine(line))
    /**
     * The average, min, and max content size of responses returned from the server.
     */
    val contentSizes = accessLogs.map(log => log.contentSize)
    // compute
    val avgContentSize = contentSizes.reduce(_ + _) / contentSizes.count()
    val minContentSize = contentSizes.min()
    val maxContentSize = contentSizes.max()
    // println
    printf("Content Size Avg: %s , Min : %s , Max: %s".format(
      avgContentSize, minContentSize, maxContentSize
    ))
    /**
     * A count of response code's returned
     */
    val responseCodeToCount = accessLogs
      .map(log => (log.responseCode, 1))
      .reduceByKey(_ + _)
      .take(3)
    println(
      s"""Response Code Count: ${responseCodeToCount.mkString(", ")}"""
    )
    /**
     * All IPAddresses that have accessed this server more than N times
     */
    val ipAddresses = accessLogs
        .map(log => (log.ipAddress, 1))
        .reduceByKey( _ + _)
    //    .filter( x => (x._2 > 10))
        .take(5)
    println(
      s"""IP Address : ${ipAddresses.mkString("< ", ", " ," >")}"""
    )
    /**
     * The top endpoints requested by count
     */
    val topEndpoints = accessLogs
      .map(log => (log.endPoint, 1))
      .reduceByKey(_ + _)

      .top(3)(OrderingUtils.SecondValueOrdering)

     // .map(tuple => (tuple._2, tuple._1))

     // .sortByKey(false)
      //.take(3)
      //.map(tuple => (tuple._2, tuple._1))
    println(
      s"""Top Endpoints : ${topEndpoints.mkString("[", ", ", " ]")}"""
    )
    /** ================================================================== */
    // Stop SparkContext
    sc.stop()
  }
}
ApacheAccessLog.scala
package com.ibeifeng.bigdata.spark.app.core
/**
 * Created by zhangyy on 2016/7/16.
 *
 * 1.1.1.1 - - [21/Jul/2014:10:00:00 -0800]
 * "GET /chapter1/java/src/main/java/com/databricks/apps/logs/LogAnalyzer.java HTTP/1.1"
 * 200 1234
 */
case class ApacheAccessLog (
                             ipAddress: String,
                             clientIndentd: String,
                             userId: String,
                             dateTime:String,
                             method: String,
                             endPoint: String,
                             protocol: String,
                             responseCode: Int,
                             contentSize: Long)
object ApacheAccessLog{
  // regex
  // 1.1.1.1 - - [21/Jul/2014:10:00:00 -0800] "GET /chapter1/java/src/main/java/com/databricks/apps/logs/LogAnalyzer.java HTTP/1.1" 200 1234
  val PARTTERN ="""^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r

  /**
   *
   * @param log
   * @return
   */

  def isValidateLogLine(log: String): Boolean = {
    // parse log
    val res = PARTTERN.findFirstMatchIn(log)
    // invalidate
    if (res.isEmpty) {
      false
    }else{
      true
    }

  }

  /**
   *
   * @param log
   * @return
   */
  def parseLogLine(log: String): ApacheAccessLog ={
    // parse log
    val res = PARTTERN.findFirstMatchIn(log)
    // invalidate
    if(res.isEmpty){
      throw new RuntimeException("Cannot parse log line: " + log)
    }
    // get value
    val m = res.get
    // return
    ApacheAccessLog( //
      m.group(1), //
      m.group(2),
      m.group(3),
      m.group(4),
      m.group(5),
      m.group(6),
      m.group(7),
      m.group(8).toInt,
      m.group(9).toLong)
  }
}
OrderingUtils.scala
package com.ibeifeng.bigdata.spark.app.core

import scala.math.Ordering

/**
 * Created by zhangyy on 2016/7/16.
 */
object OrderingUtils {

  object SecondValueOrdering extends Ordering[(String, Int)]{
    /**
     *
     * @param x
     * @param y
     * @return
     */
    override def compare(x: (String, Int), y: (String, Int)): Int = {
      x._2.compare(y._2)
      // x._2 compare y._2  // 1 to 10 | 1.to(10)
    }
  }
}

Spark 的Core深入(二)

Spark 的Core深入(二)

Spark 的Core深入(二)


二、Spark RDD

2.1:RDD的含義:

RDD,全稱為Resilient Distributed Datasets,是一個容錯的、并行的數據結構,可以讓用戶顯式地將數據存儲到磁盤和內存中,并能控制數據的分區。同時,RDD還提供了一組豐富的操作來操作這些數據。在這些操作中,諸如map、flatMap、filter等轉換操作實現了monad模式,很好地契合了Scala的集合操作。除此之外,RDD還提供了諸如join、groupBy、reduceByKey等更為方便的操作(注意,reduceByKey是action,而非transformation),以支持常見的數據運算

2.2、RDD 在 hdfs的結構

Spark 的Core深入(二)

Spark 的Core深入(二)

val rdd = sc.textFile("/spark/rdd")
rdd.partitions.length

rdd.cache
rdd.count 

一個分區默認一個task 分區去處理
默認是兩個分區去處理

Spark 的Core深入(二)

Spark 的Core深入(二)

2.3、RDD的五個特點對應方法

1. A list of partitions : (protected def getPartitions: Array[Partition])

   一系列的的分片,比如說64M一片,類似于hadoop中的split

2. A function ofr computing each split :( @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T])

 在每個分片上都有一個方式去迭代/執行/計算

 3. A list of dependencies on other RDD  :(protected def getDependencies: Seq[Dependency[_]] = deps)
 一系列的依賴:RDDa 轉換為RDDb,轉換為 RDDc, 那么RDDc 就依賴于RDDb , RDDb 又依賴于RDDa

 ---
 wordcount 程序:

 ## val rdd = sc.textFile("xxxx")

 val wordRdd = rdd.flatMap(_.split(""))

 val kvRdd = wordRdd.map((_,1))

 val WordCountRdd = kvRdd.reduceByKey(_ + _)

 # wrodcountRdd.saveAsTextFile("yy")

 kvRdd <- wordRdd <- rdd

 rdd.toDebugString

 ---

 4. Optionlly,a Partitioner for kev-values RDDs (e,g,to say that the RDDis hash-partitioned) :(/** Optionally overridden by subclasses to specify how they are partitioned. */
  @transient val partitioner: Option[Partitioner] = None)

 5. optionlly,a list of preferred location(s) to compute each split on (e,g,block location for an HDFS file)
 :(protected def getPreferredLocations(split: Partition): Seq[String] = Nil)
 要運行的計算/執行最好在哪(幾)個機器上運行,數據本地型

 為什么會有那幾個呢?

 比如: hadoop 默認有三個位置,或者spark cache 到內存是可能同過StroageLevel 設置了多個副本,所以一個partition 可能返回多個最佳位置。

Spark 的Core深入(二)

2.4、 如何創建RDD的兩種方式

方式一:
    并行化集合:
     并行化集合
    List\Seq\Array

    SparkContext:
    ----
        def parallelize[T: ClassTag](
          seq: Seq[T],
          numSlices: Int = defaultParallelism): RDD[T] 
---
list 創建:
val list = List("11","22","33")
val listRdd = sc.parallelize(list)
listRdd.count
listRdd.frist
listRdd.take(10)
seq 創建:
val seq = Sep("aa","bb","cc")
val seqRdd = sc.parallelize(seq)

seqRdd.count
seqRdd.frist 
seqRdd.take(10)
Array創建:
val array = Array(1,2,3,4,5)

val arryRdd = sc.parallelize(array)

arryRdd.first
arryRdd.count
arryRdd.take(10)
方式二:從外部存儲創建:

val disFile = sc.textFile("/input")

2.5、RDD的轉換過程

Spark 的Core深入(二)

Spark 的Core深入(二)

transformation 轉換
actions 執行出結果

persistence  基本都是cache過程
2.5.1: rdd transformation 應用

Spark 的Core深入(二)

union()合并應用

val rdd1 = sc.parallelize(Array(1,2,3,4,5))

val rdd2 = sc.parallelize(Array(6,7,8,9,10))

val rdd = rdd1.union(rdd2)

rdd.collect

Spark 的Core深入(二)

Spark 的Core深入(二)

Spark 的Core深入(二)

Spark 的Core深入(二)

對于分布式計算框架來說,性能瓶頸
    IO
        -1,磁盤IO
        -2,網絡IO

    rdd1 -> rdd2
        Shuffle

============================================
groupByKey() & reduceByKey()

在實際開發中,如果可以使用reduceByKey實現的功能,就不要使用groupBykey
    使用reduceByKey有聚合功能,類似MapReduce中啟用了Combiner
===============
join()
    -1,等值鏈接

    -2,左連接

數據去重
    結果數據
        res-pre.txt  - rdd1
    新數據進行處理
        web.tsv - 10GB    - rdd2
        解析里面的url,
        如果res-pre.txt中包含,就不放入,不包含就加入或者不包含url進行特殊處理

rdd2.leftJoin(rdd1)
join()應用
val list =List("aa","bb","cc","dd")

val rdd1 = sc.parallelize(list).map((_, 1))

rdd1.collect

val list2 = List("bb","cc","ee","hh")

val rdd2 = sc.parallelize(list2).map((_, 1))

rdd2.collect

val rdd = rdd2.leftOuterJoin(rdd1)

rdd.collect

rdd.filter(tuple => tuple._2._2.isEmpty).collect
repartition()應用:
val rdd = sc.textFile("/spark/rdd")

rdd.repartition(2)

rdd.count 

Spark 的Core深入(二)

2.5.2: RDD Actions 操作

Spark 的Core深入(二)

val list = List(("aa",1),("bb",4),("aa",56),("cc",0),("aa",89),("cc",34))
val rdd = sc.parallelize(list)
rdd.countByKey

Spark 的Core深入(二)


wordcount 轉變

val rdd = sc.textFile("\input")
rdd.flatMap(_.split(" ")).map((_, 1)).countByKey

Spark 的Core深入(二)

foreach() 應用
val list = List(1,2,3,4,5)
val rdd = sc.parallelize(list)
rdd.foreach(line => println(line))
分組topkey
aa 78
bb 98
aa 80
cc 98
aa 69
cc 87
bb 97
cc 86
aa 97
bb 78
bb 34
cc 85
bb 92
cc 72
bb 32
bb 23
val rdd = sc.textFile("/topkeytest")

val topRdd = rdd.map(line => line.split(" ")).map(arr => (arr(0), arr(1).toInt)).groupByKey().map(tuple => (tuple._1, tuple._2.toList.sorted.takeRight(3).reverse))

topRdd.collect

Spark 的Core深入(二)

三:SparkContext三大功能

3.1、沒有使用廣播變量

Spark 的Core深入(二)

SparkContext 的作用:

-1,向Master(主節點,集群管理的主節點)申請資源,運行所有Executor
    -2,創建RDD的入口
        sc.textFile("") // 從外部存儲系統創建
        sc.parxx() // 并行化,從Driver 中的集合創建
    -3,調度管理JOB運行
        DAGScheduler 、 TaskScheduler
        --3.1
            為每個Job構建DAG圖
        --3.2
            DAG圖劃分為Stage
                按照RDD之間是否存在Shuffle
                倒推(Stack)
        --3.3
            每個Stage中TaskSet
                每個階段中Task代碼相同,僅僅處理數據不同

3.2 使用廣播變量

Spark 的Core深入(二)

val list = List(".", "?", "!", "#", "$")
      val braodCastList = sc.broadcast(list)
      val wordRdd = sc.textFile("")
        wordRdd.filter(word => {
            braodCastList.value.contains(word)
        })

3.4 spark 的 cluster mode

Spark 的Core深入(二)

3.4.1 spark的部署模式:
1.spark的默認模式是local模式
  spark-submint Scala_Project.jar

Spark 的Core深入(二)

Spark 的Core深入(二)

2. spark job 運行在客戶端集群模式:

spark-submit --master spark://192.168.3.1:7077 --deploy-mode cluster Scala_Project.jar

Spark 的Core深入(二)

Spark 的Core深入(二)

Spark 的Core深入(二)

Spark 的Core深入(二)

Spark 的Core深入(二)

3.5 spark 增加外部依賴jar包的方法

方式一:
    --jars JARS                 
      Comma-separated list of local jars to include on the driver and executor classpaths.
      jar包的位置一定要寫決定路徑。

方式二:
    --driver-class-path
      Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath.

方式三:
    SPARK_CLASSPATH
      配置此環境變量
3.5.1 企業中Spark Application提交,shell 腳本
spark-app-submit.sh:

#!/bin/sh

## SPARK_HOME
SPARK_HOME=/opt/cdh6.3.6/spark-1.6.1-bin-2.5.0-cdh6.3.6

## SPARK CLASSPATH
SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/jars/sparkexternale/xx.jar:/opt/jars/sparkexternale/yy.jar

${SPARK_HOME}/bin/spark-submit --master spark://hadoop-senior01.ibeifeng.com:7077 --deploy-mode cluster /opt/tools/scalaProject.jar

四:Spark on YARN

4.1 啟動hadoop的YARN上面的服務

cd /soft/hadoop/sbin

啟動rescouremanager: 
./yarn-daemon.sh start resourcemanager

啟動nodemanger:
./yarn-daemon.sh start nodemanager

Spark 的Core深入(二)

4.2 yarn 的架構

Spark 的Core深入(二)

YARN
    -1,分布式資源管理
        主節點:ResouceManager
        從節點:NodeManager -> 負責管理每臺機器上的資源(內存和CPU Core)
    -2,資源調度
        --1,容器Container
            AM/Task
        --2,對于運行在YARN上的每個應用,一個應用的管理者ApplicaitonMaster   資源申請和任務調度

4.2 Spark Application

Spark Application
    -1,Driver Program
        資源申請和任務調度
    -2,Executors
        每一個Executor其實就是一個JVM,就是一個進程

以spark deploy mode : client
    AM
                        -- 全部都允許在Container中
    Executor s
        運行在Container中,類似于MapReduce任務中Map Task和Reduce Task一樣

Driver -> AM -> RM 

Spark 的Core深入(二)

Spark 的Core深入(二)

4.3 spark on yarn 的運行

spark-shell --master yarn

Spark 的Core深入(二)
Spark 的Core深入(二)
Spark 的Core深入(二)

4.4 spark job on yarn

cd jars/

spark-submit --master yarn --deploy-mode cluster Scala_Project.jar

Spark 的Core深入(二)

Spark 的Core深入(二)

Spark 的Core深入(二)

Spark 的Core深入(二)

Spark 的Core深入(二)

五: spark RDD 的 依賴

5.1 RDD Rependencies

Spark 的Core深入(二)

spark的wordcount

## 
val rdd = sc.textFile("/input")
##
val wordRdd = rdd.flatMap(_.split(" "))
val kvRdd = wordRdd.map((_, 1))
val wordcountRdd = kvRdd.reduceByKey(_ + _)
##
wordcountRdd.collect

-----------------

    input -> rdd  -> wordRdd -> kvRdd : Stage-01 -> ShuffleMapStage -> SMT

-> 

    wordcountRdd -> output            :Stage-02 -> ResultStage -> ResultTask
1. 窄依賴(narrow dependencies)
    1.1:子RDD的每個分區依賴于常數個父分區(即與數據規模無關)
    1.2: 輸入輸出一對一的算子,且結過RDD 的分區結構不變,主要是map,flatMap
    1.3:輸出一對一,單結果RDD 的分區結構發生變化,如:union,coalesce
    1.4: 從輸入中選擇部分元素的算子,如filer,distinct,subtract,sample

2. 寬依賴(wide dependencies)
   2.1: 子RDD的每個分區依賴于所有父RDD 分區
   2.2:對單個RDD 基于key進行重組和reduce,如groupByKey,reduceByKey

   2.3:對兩個RDD 基于key 進行join和重組,如:join
如何判斷RDD之間是窄依賴還是寬依賴:
    父RDD的每個分區數據 給 子RDD的每個分區數據

        1    ->     1

        1    ->     N    :  MapReduce 中 Shuffle

5.2 spark 的shuffle

5.2.1 spark shuffle 的內在原理
在MapReduce框架中,shuffle是連接Map和Reduce之間的橋梁,Map的輸出要用到Reduce中必須經過shuffle這個環節,shuffle的性能高低直接影響了整個程序的性能和吞吐量。Spark作為MapReduce框架的一種實現,自然也實現了shuffle的邏輯。
5.2.2 shuffle
Shuffle是MapReduce框架中的一個特定的phase,介于Map phase和Reduce phase之間,當Map的輸出結果要被Reduce使用時,輸出結果需要按key哈希,并且分發到每一個Reducer上去,這個過程就是shuffle。由于shuffle涉及到了磁盤的讀寫和網絡的傳輸,因此shuffle性能的高低直接影響到了整個程序的運行效率。

下面這幅圖清晰地描述了MapReduce算法的整個流程,其中shuffle phase是介于Map phase和Reduce phase之間。

Spark 的Core深入(二)

概念上shuffle就是一個溝通數據連接的橋梁,那么實際上shuffle(partition)這一部分是如何實現的的呢,下面我們就以Spark為例講一下shuffle在Spark中的實現。
5.2.3 spark的shuffle

Spark 的Core深入(二)

 1.首先每一個Mapper會根據Reducer的數量創建出相應的bucket,bucket的數量是M×RM×R,其中MM是Map的個數,RR是Reduce的個數。

2.其次Mapper產生的結果會根據設置的partition算法填充到每個bucket中去。這里的partition算法是可以自定義的,當然默認的算法是根據key哈希到不同的bucket中去。
當Reducer啟動時,它會根據自己task的id和所依賴的Mapper的id從遠端或是本地的block manager中取得相應的bucket作為Reducer的輸入進行處理。
這里的bucket是一個抽象概念,在實現中每個bucket可以對應一個文件,可以對應文件的一部分或是其他等。

3. Apache Spark 的 Shuffle 過程與 Apache Hadoop 的 Shuffle 過程有著諸多類似,一些概念可直接套用,例如,Shuffle 過程中,提供數據的一端,被稱作 Map 端,Map 端每個生成數據的任務稱為 Mapper,對應的,接收數據的一端,被稱作 Reduce 端,Reduce 端每個拉取數據的任務稱為 Reducer,Shuffle 過程本質上都是將 Map 端獲得的數據使用分區器進行劃分,并將數據發送給對應的 Reducer 的過程。
那些操作會引起shuffle

1. 具有重新調整分區操作,
eg: repartition,coalese

2. *ByKey   eg: groupByKey,reduceByKey

3. 關聯操作 eg:join,cogroup
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

金川县| 岳阳县| 永登县| 东乌| 乌鲁木齐县| 焦作市| 诸城市| 丰城市| 渭南市| 安阳县| 岳普湖县| 进贤县| 太白县| 恭城| 黄山市| 建始县| 毕节市| 红河县| 曲麻莱县| 高清| 新绛县| 河津市| 兰坪| 疏附县| 泰顺县| 庄河市| 星座| 镇巴县| 自治县| 新宁县| 东海县| 托克托县| 吉隆县| 肥城市| 大田县| 唐海县| 鲁甸县| 连江县| 得荣县| 蚌埠市| 万安县|