您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關怎么應對Spark-Redis行海量數據插入、查詢作業時碰到的問題,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
由于redis是基于內存的數據庫,穩定性并不是很高,尤其是standalone模式下的redis。于是工作中在使用Spark-Redis時也會碰到很多問題,尤其是執行海量數據插入與查詢的場景中。
Redis是基于內存讀取的數據庫,相比其它的數據庫,Redis的讀取速度會更快。但是當我們要查詢上千萬條的海量數據時,即使是Redis也需要花費較長時間。這時候如果我們想要終止select作業的執行,我們希望的是所有的running task立即killed。
Spark是有作業調度機制的。SparkContext是Spark的入口,相當于應用程序的main函數。SparkContext中的cancelJobGroup函數可以取消正在運行的job。
/** * Cancel active jobs for the specified group. See `org.apache.spark.SparkContext.setJobGroup` * for more information. */ def cancelJobGroup(groupId: String) { assertNotStopped() dagScheduler.cancelJobGroup(groupId) }
按理說取消job之后,job下的所有task應該也終止。而且當我們取消select作業時,executor會throw TaskKilledException,而這個時候負責task作業的TaskContext在捕獲到該異常之后,會執行killTaskIfInterrupted。
// If this task has been killed before we deserialized it, let's quit now. Otherwise, // continue executing the task. val killReason = reasonIfKilled if (killReason.isDefined) { // Throw an exception rather than returning, because returning within a try{} block // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl // exception will be caught by the catch block, leading to an incorrect ExceptionFailure // for the task. throw new TaskKilledException(killReason.get) }
/** * If the task is interrupted, throws TaskKilledException with the reason for the interrupt. */ private[spark] def killTaskIfInterrupted(): Unit
但是Spark-Redis中還是會出現終止作業但是task仍然running。因為task的計算邏輯最終是在RedisRDD中實現的,RedisRDD的compute會從Jedis中取獲取keys。所以說要解決這個問題,應該在RedisRDD中取消正在running的task。這里有兩種方法:
def close() { if (closed) return try { if (null != rs) { rs.close() } } catch { case e: Exception => logWarning("Exception closing resultset", e) } try { if (null != stmt) { stmt.close() } } catch { case e: Exception => logWarning("Exception closing statement", e) } try { if (null != conn) { if (!conn.isClosed && !conn.getAutoCommit) { try { conn.commit() } catch { case NonFatal(e) => logWarning("Exception committing transaction", e) } } conn.close() } logInfo("closed connection") } catch { case e: Exception => logWarning("Exception closing connection", e) } closed = true } context.addTaskCompletionListener{ context => close() } CompletionIterator[InternalRow, Iterator[InternalRow]]( new InterruptibleIterator(context, rowsIterator), close())
try{ val thread = new Thread() { override def run(): Unit = { try { keys = doCall } catch { case e => logWarning(s"execute http require failed.") } isRequestFinished = true } } // control the http request for quite if user interrupt the job thread.start() while (!context.isInterrupted() && !isRequestFinished) { Thread.sleep(GetKeysWaitInterval) } if (context.isInterrupted() && !isRequestFinished) { logInfo(s"try to kill task ${context.getKillReason()}") context.killTaskIfInterrupted() } thread.join() CompletionIterator[T, Iterator[T]]( new InterruptibleIterator(context, keys), close)
我們可以異步線程來執行compute,然后在另外的線程中判斷是否task isInterrupted,如果是的話就執行TaskContext的killTaskIfInterrupted。防止killTaskIfInterrupted無法殺掉task,再結合InterruptibleIterator:一種迭代器,以提供任務終止功能。通過檢查[TaskContext]中的中斷標志來工作。
我們都已經redis的數據是保存在內存中的。當然Redis也支持持久化,可以將數據備份到硬盤中。當插入海量數據時,如果Redis的內存不夠的話,很顯然會丟失部分數據。這里讓使用者困惑的點在于: 當Redis已使用內存大于最大可用內存時,Redis會報錯:command not allowed when used memory > ‘maxmemory’。但是當insert job的數據大于Redis的可用內存時,部分數據丟失了,并且還沒有任何報錯。
因為不管是Jedis客戶端還是Redis服務器,當插入數據時內存不夠,不會插入成功,但也不會返回任何response。所以目前能想到的解決辦法就是當insert數據丟失時,擴大Redis內存。
Spark-Redis是一個應用還不是很廣泛的開源項目,不像Spark JDBC那樣已經商業化。所以Spark-Redis還是存在很多問題。相信隨著commiter的努力,Spark-Redis也會越來越強大。
看完上述內容,你們對怎么應對Spark-Redis行海量數據插入、查詢作業時碰到的問題有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。