中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark-Sql的示例分析

發布時間:2021-12-03 13:35:11 來源:億速云 閱讀:247 作者:小新 欄目:開發技術

這篇文章主要介紹Spark-Sql的示例分析,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!

SparkSQL運行架構

Spark SQL對SQL語句的處理,首先會將SQL語句進行解析(Parse),然后形成一個Tree,在后續的如綁定、優化等處理過程都是對Tree的操作,而操作的方法是采用Rule,通過模式匹配,對不同類型的節點采用不同的操作。

spark-sql是用來處理結構化數據的模塊,是入門spark的首要模塊。

技術的學習無非就是去了解它的API,但是Spark有點難,因為它的例子和網上能搜到的基本都是Scala寫的。我們這里使用Java。

入門例子

數據處理的第一個例子通常都是word count,就是統計一個文件里每個單詞出現了幾次。我們也來試一下。

> 這個例子網上有很多,即使是通過spark實現的也不少;這里面大部分都是使用Scala寫的,我沒有試過;少部分是通過Java寫的;

Java里面的例子有一些是使用RDD實現的,只有極個別是通過DataSet來做的。但即使這一小撮例子,我也跑不通。

所以我自己來嘗試完成這個例子,看到別人用Scala寫三五行就完成了,而我嘗試了一整天幾無進展。在網上東拼西湊熟悉Spark的Java 

還是以我們前面的例子來改:

String logFile = "words";

SparkSession spark = SparkSession.builder().appName("Simple Application").master("local").getOrCreate();

Dataset<String> logData = spark.read().textFile(logFile).cache();

System.out.println("行數:" + logData.count());這里我不再使用之前的README文件,自己創建了一個words文件,內容隨意寫了一堆單詞。

執行程序,可以正常打印出來:

Spark-Sql的示例分析

接下來我們需要把句子分割成一個個單詞合在一起,然后統計每個單詞出現的次數。

> 可能有人會說,這個簡單,我用Java8的流一下就處理好了:

把行集合通過flatMap處理,每一行通過split(" ")分割成一個獨立的單詞集合,再把結果通過自身groupBy一下就拿到終止數據結構Map了。

最后把map的key和value的大小拿到就好了。

的確,使用Java就是這樣實現。但是Spark提供了一套和Java的流API名字和效果類似的工具,區別是Spark的是分布式API

我們通過Spark的flatMap先來處理一下:

Dataset<String> words = logData.flatMap((FlatMapFunction<String, String>) k -> Arrays.asList(k.split("\\s")).iterator(), Encoders.STRING());
System.out.println("單詞數:" + words.count());
words.foreach(k -> {
System.out.println("W:" + k);
});

不同于Java的流,spark這個flatMap的返回值是可以直接訪問結果的:

Spark-Sql的示例分析

> 可能有人留意到spark中函數式方法的參數定義和Java差距較大。他們的參數不太一樣,還多了個編碼器。目前來講我還不清楚為啥這樣定義,不過印象中編碼器也是spark3的重要優化內容。

再Java中使用Scala的方法總是有些怪異,Lambda表達式前面總是需要強制類型轉換,只是為了指明參數類型,否則需要new一個匿名類。

這個也花了我不少時間,后來找到一個網頁org.apache.spark.sql.Dataset.flatMap java code examples | Tabnine

Spark-Sql的示例分析

再往后我迷茫了:

KeyValueGroupedDataset<String, String> group = words.groupByKey((Function1<String, String>) k -> k, Encoders.STRING());

這樣我已經group好了,但是返回的不是DataSet,我也不知道這個返回有啥用,怎么拿到里面的內容呢?我費了好大勁沒搞定。

比如我發現count方法會返回一個DataSet:

Spark-Sql的示例分析

看起來正是我想要的,但是當我想把它輸出竟然執行報錯:

ount.foreach(t -> {
    System.out.println(t);
});

Spark-Sql的示例分析

別說foreach了,就算想看看里面的數量(就像一開始我們查看了文件有幾行那樣)都會報錯,錯誤內容一樣

count.count();

查了很多資料,大意是說spark的計算方法都是分布式的,各個任務之間需要通信,通信時需要序列化來傳遞信息。所以上面我們能看文件行數因為類型是String,有序列化標志;現在生成的是元組,不能序列化。我嘗試了各種方法,甚至自己創建新類模擬了計算過程還是不行

Spark-Sql的示例分析Spark-Sql的示例分析

查了好久資料,比如Job aborted due to stage failure: Task not serializable: | Databricks Spark Knowledge Base (gitbooks.io)依然沒有解決。偶然的機會找到一個令人激動的網站Spark Groupby Example with DataFrame — SparkByExamples終于解決了我的問題。

使用DataFrame

DataFrame雖然是spark提供的重要工具,但是再Java上并沒有對應的類,只是把DataSet的泛型對象改成Row而已。注意這個Row沒有泛型定義,所以里面有哪些列不知道

可以從一開始就把DataSet轉成DataFrame:

Spark-Sql的示例分析

但是可以看到要從Row里面拿數據比較麻煩。所以目前我只在需要序列化的地方轉:

Spark-Sql的示例分析

以上是“Spark-Sql的示例分析”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

林州市| 镇雄县| 昌吉市| 鄢陵县| 黎川县| 罗山县| 德化县| 五峰| 左贡县| 宜城市| 塔河县| 榕江县| 石台县| 云龙县| 江西省| 宣威市| 辽宁省| 筠连县| 巴林右旗| 通城县| 金塔县| 凤台县| 滁州市| 岑巩县| 临泽县| 黄平县| 长顺县| 玉林市| 饶平县| 华阴市| 广安市| 恩施市| 九龙县| 徐汇区| 忻州市| 星座| 武定县| 襄樊市| 大城县| 延庆县| 万年县|