中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java spark中的bug分析

發布時間:2021-11-24 14:38:04 來源:億速云 閱讀:196 作者:iii 欄目:大數據

這篇文章主要介紹“Java spark中的bug分析”,在日常操作中,相信很多人在Java spark中的bug分析問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Java spark中的bug分析”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

在spark 中存在一個bug,該bug的詳細信息如下:

None.get
java.util.NoSuchElementException: None.get
scala.None$.get(Option.scala:529)
scala.None$.get(Option.scala:527)
org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion$lzycompute(DataSourceScanExec.scala:178)
org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion(DataSourceScanExec.scala:176)
org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:463)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:133)
org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:47)
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:96)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:3200)
org.apache.spark.sql.Dataset.rdd(Dataset.scala:3198)

根據源碼定位FileSourceScanExec,定位到如下位置:

 SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled

SparkSession.getActiveSession.get的內容如下:

/**
   * Returns the active SparkSession for the current thread, returned by the builder.
   *
   * @note Return None, when calling this function on executors
   *
   * @since 2.2.0
   */
  def getActiveSession: Option[SparkSession] = {
    if (TaskContext.get != null) {
      // Return None when running on executors.
      None
    } else {
      Option(activeThreadSession.get)
    }
  }

正如注釋所寫的一樣,當在executors端獲取SparkSession的時候,直接返回None。 為什么直接返回none,可以參考spark-pr-21436
當然這個問題,已經有人發現了并且提交了pr-29667,所以拿到commitID(37a660866342f2d64ad2990a5596e67cfdf044c0)直接cherry-pick就ok了,

分析一下原因: 其實該原因就是同一個jvm中,兩個不同的線程同步調用,就如unit test所示:

test("SPARK-32813: Table scan should work in different thread") {
    val executor1 = Executors.newSingleThreadExecutor()
    val executor2 = Executors.newSingleThreadExecutor()
    var session: SparkSession = null
    SparkSession.cleanupAnyExistingSession()

    withTempDir { tempDir =>
      try {
        val tablePath = tempDir.toString + "/table"
        val df = ThreadUtils.awaitResult(Future {
          session = SparkSession.builder().appName("test").master("local[*]").getOrCreate()

          session.createDataFrame(
            session.sparkContext.parallelize(Row(Array(1, 2, 3)) :: Nil),
            StructType(Seq(
              StructField("a", ArrayType(IntegerType, containsNull = false), nullable = false))))
            .write.parquet(tablePath)

          session.read.parquet(tablePath)
        }(ExecutionContext.fromExecutorService(executor1)), 1.minute)

        ThreadUtils.awaitResult(Future {
          assert(df.rdd.collect()(0) === Row(Seq(1, 2, 3)))
        }(ExecutionContext.fromExecutorService(executor2)), 1.minute)
      } finally {
        executor1.shutdown()
        executor2.shutdown()
        session.stop()
      }
    }
  }

到此,關于“Java spark中的bug分析”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

张掖市| 曲靖市| 沧州市| 将乐县| 甘孜县| 洛川县| 义乌市| 江孜县| 施甸县| 靖州| 桐城市| 阿巴嘎旗| 竹溪县| 新营市| 景洪市| 迭部县| 泽库县| 中宁县| 称多县| 通江县| 巴青县| 合山市| 凤阳县| 北安市| 玉树县| 三台县| 阿拉善盟| 历史| 文登市| 龙南县| 德昌县| 石城县| 泸西县| 长子县| 沂源县| 沙田区| 姜堰市| 息烽县| 白朗县| 安塞县| 赞皇县|