中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

(版本定制)第17課:Spark Streaming資源動態申請和動態控制消費速率原理剖析

發布時間:2020-06-12 17:22:36 來源:網絡 閱讀:872 作者:Spark_2016 欄目:大數據

本期內容:

    1、Spark Streaming資源動態分配

    2、Spark Streaming動態控制消費速率

為什么需要動態? 
a)Spark默認情況下粗粒度的,先分配好資源再計算。對于Spark Streaming而言有高峰值和低峰值,但是他們需要的資源是不一樣的,如果按照高峰值的角度的話,就會有大量的資源浪費。 

b) Spark Streaming不斷的運行,對資源消耗和管理也是我們要考慮的因素。 
Spark Streaming資源動態調整的時候會面臨挑戰: 
Spark Streaming是按照Batch Duration運行的,Batch Duration需要很多資源,下一次Batch Duration就不需要那么多資源了,調整資源的時候還沒調整完Batch Duration運行就已經過期了。這個時候調整時間間隔。


Spark Streaming資源動態申請 
1. 在SparkContext中默認是不開啟動態資源分配的,但是可以通過手動在SparkConf中配置。

// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
  logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
}

_executorAllocationManager =
if (dynamicAllocationEnabled) {
Some(new ExecutorAllocationManager(this, listenerBus, _conf))
  } else {
    None
  }
_executorAllocationManager.foreach(_.start())

設置spark.dynamicAllocation.enabled參數為true


這里會通過實例化ExecutorAllocationManager對象來動態分配資源,其內部是有定時器會不斷的去掃描Executor的情況,通過線程池的方式調用schedule()來完成資源動態分配。

/**
 * Register for scheduler callbacks to decide when to add and remove executors, and start
 * the scheduling task.
 */
def start(): Unit = {
  listenerBus.addListener(listener)

val scheduleTask = new Runnable() {
override def run(): Unit = {
try {
        schedule() //動態調整Executor分配數量
      } catch {
case ct: ControlThrowable =>
throw ct
case t: Throwable =>
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }
executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}


private def schedule(): Unit = synchronized {
val now = clock.getTimeMillis

  updateAndSyncNumExecutorsTarget(now) //更新Executor數量

removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
if (expired) {
initializing = false
removeExecutor(executorId)
    }
    !expired
  }
}
/**
 * Updates our target number of executors and syncs the result with the cluster manager.
 *
 * Check to see whether our existing allocation and the requests we've made previously exceed our
 * current needs. If so, truncate our target and let the cluster manager know so that it can
 * cancel pending requests that are unneeded.
 *
 * If not, and the add time has expired, see if we can request new executors and refresh the add
 * time.
 *
 * @return the delta in the target number of executors.
 */
private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
val maxNeeded = maxNumExecutorsNeeded

if (initializing) {
// Do not change our target while we are still initializing,
    // Otherwise the first job may have to ramp up unnecessarily
0
} else if (maxNeeded < numExecutorsTarget) {
// The target number exceeds the number we actually need, so stop adding new
    // executors and inform the cluster manager to cancel the extra pending requests
val oldNumExecutorsTarget = numExecutorsTarget
    numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
numExecutorsToAdd = 1

// If the new target has not changed, avoid sending a message to the cluster manager
if (numExecutorsTarget < oldNumExecutorsTarget) {
      client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
      logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
    }
numExecutorsTarget - oldNumExecutorsTarget
  } else if (addTime != NOT_SET && now >= addTime) {
val delta = addExecutors(maxNeeded)
    logDebug(s"Starting timer to add more executors (to " +
s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
addTime += sustainedSchedulerBacklogTimeoutS * 1000
delta
  } else {
0
}
}

動態控制消費速率: 
Spark Streaming提供了一種彈性機制,流進來的速度和處理速度的關系,是否來得及處理數據。如果不能來得及的話,他會自動動態控制數據流進來的速度,spark.streaming.backpressure.enabled參數設置。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

盐津县| 井陉县| 永清县| 普陀区| 龙里县| 北流市| 瑞安市| 汕头市| 宁德市| 天祝| 大厂| 亳州市| 灵寿县| 瑞安市| 朝阳县| 甘孜县| 邹城市| 彩票| 前郭尔| 泾源县| 华亭县| 大城县| 阳曲县| 肃北| 溧水县| 康乐县| 芒康县| 韶山市| 乐亭县| 华池县| 无锡市| 游戏| 华容县| 黄骅市| 施甸县| 大城县| 加查县| 陈巴尔虎旗| 孟州市| 奇台县| 西吉县|