中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

spark 流式去重的示例分析

發布時間:2021-12-16 21:17:54 來源:億速云 閱讀:131 作者:柒染 欄目:大數據

本篇文章為大家展示了spark 流式去重的示例分析,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

大數據去重本身很蛋疼,針對個別數據去重更是不可理喻但是spark的Structured Streaming就很容易能實現這個功能。

數據從采集到最終處理結束是會存在一條數據在某一個點被重復接收處理的情況。如 kafka支持的是至少一次寫語義,也即是當寫數據到kafka的時候,有些記錄可能重復,例如如果消息已經被broker接收并寫入文件但是并沒有應答,這時生產者向kafka重發一個消息,就可能重復。由于kafka的至少一次的寫語義,structured streaming不能避免這種類型數據重復。所以一旦寫入成功,可以假設structured Streaming的查詢輸出是以至少一次語義寫入kafka的。一個可行去除重復記錄的解決方案是數據中引入一個primary(unique)key,這樣就可以在讀取數據的時候實行去重。

structured streaming是可以使用事件中的唯一標識符對數據流中的記錄進行重復數據刪除。這與使用唯一標識符列的靜態重復數據刪除完全相同。該查詢將存儲來自先前記錄的一定量的數據,以便可以過濾重復的記錄。與聚合類似,可以使用帶有或不帶有watermark 的重復數據刪除功能。

A),帶watermark:如果重復記錄可能到達的時間有上限,則可以在事件時間列上定義watermark,并使用guid和事件時間列進行重復數據刪除。

B),不帶watermark:由于重復記錄可能到達時間沒有界限,所以查詢將來自所有過去記錄的數據存儲為狀態。

源代碼,已測試通過~

package bigdata.spark.StructuredStreaming.KafkaSourceOperator

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.get_json_object
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

object KafkaDropDuplicate {
 def main(args: Array[String]): Unit = {
   val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
     .set("yarn.resourcemanager.hostname", "mt-mdh.local")
     .set("spark.executor.instances","2")
     .set("spark.default.parallelism","4")
     .set("spark.sql.shuffle.partitions","4")
     .setJars(List("/opt/sparkjar/bigdata.jar"
       ,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
       ,"/opt/jars/kafka-clients-0.10.2.2.jar"
       ,"/opt/jars/kafka_2.11-0.10.2.2.jar"
       ,"/opt/jars/spark-sql-kafka-0-10_2.11-2.0.2.jar"))

   val spark = SparkSession
     .builder
     .appName("StructuredKafkaWordCount")
     .config(sparkConf)
     .getOrCreate()
   import spark.implicits._

   val df = spark
     .readStream
     .format("kafka")
     .option("kafka.bootstrap.servers","mt-mdh.local:9093")
     .option("subscribe", "jsontest")
     .load()
   val words = df.selectExpr("CAST(value AS STRING)")

   val fruit = words.select(
     get_json_object($"value", "$.time").alias("timestamp").cast("long")
     , get_json_object($"value", "$.fruit").alias("fruit"))

   val fruitCast = fruit
     .select(fruit("timestamp")
       .cast("timestamp"),fruit("fruit"))
     .withWatermark("timestamp", "10 Seconds")
     .dropDuplicates("fruit")
     .groupBy("fruit").count()

   fruitCast.writeStream
     .outputMode(OutputMode.Complete())
     .format("console")
     .trigger(Trigger.ProcessingTime(5000))
     .option("truncate","false")
     .start()
     .awaitTermination()
 }
}

上述內容就是spark 流式去重的示例分析,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

普兰店市| 出国| 南靖县| 安庆市| 新余市| 双柏县| 崇信县| 临城县| 塔城市| 伊吾县| 察雅县| 吉首市| 彭泽县| 栾城县| 日土县| 宿州市| 陵水| 浦城县| 阜新市| 腾冲县| 慈溪市| 横山县| 罗山县| 江口县| 四子王旗| 钟山县| 依兰县| 金沙县| 元阳县| 绿春县| 大理市| 桂平市| 广元市| 上林县| 赫章县| 孝昌县| 济宁市| 凤山县| 普兰店市| 木兰县| 梓潼县|