您好,登錄后才能下訂單哦!
這篇文章主要介紹了怎么在Apache Flink中使用Python API,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
Apache Flink 是流批統一的開源大數據計算引擎,在 Flink 1.9.0 版本開啟了新的 ML 接口和全新的Python API架構。那么為什么 Flink 要增加對 Python 的支持,下文將進行詳細分析。
最流行的開發語言
Python 本身是非常優秀的開發語言,據 RedMonk 數據統計,除 Java 和 JavaScript 之外,受歡迎度排名第三。
RedMonk 是著名的以開發人員為中心的行業分析公司,其更詳細的分析信息,大家在拿到我的PPT之后,可以點擊鏈接進行詳細查閱。好了,那么Python的火熱,與我們今天向大家分享的流批統一的大數據計算引擎,Apache Flink有什么關系呢?帶著這個問題,我們大家想想目前與大數據相關的著名的開源組件有哪些呢?比如說最早期的批處理框架Hadoop?流計算平臺Storm,最近異常火熱的Spark?異或其他領域數倉的Hive,KV存儲的HBase?這些都是非常著名的開源項目,那么這些項目都無一例外的進行了Python API的支持。
眾多開源項目支持
Python 的生態已相對完善,基于此,Apache Flink 在 1.9 版本中也投入了大量的精力,去推出了一個全新的 Pyflink。除大數據外,人工智能與Python也有十分密切的關系。
ML青睞的語言
從上圖統計數據可以發現,Python API 本身已經占機器學習崗位需求語言的 0.129%。相對于 R 語言,Python 語言似乎更受青睞。Python 作為解釋型語言,語法的設計哲學是”用一種方法并且只有一種方法來做一件事”。其簡潔和易用性使其成為了世界上最受歡迎的語言,在大數據計算領域都有著很好的生態建設,同時Python在機器學習 在機器學習方面也有很好的前景,所以我們在近期發布的Apache Flink 1.9 以全新的架構推出新的 Python API
Flink 是一款流批統一的計算引擎,社區非常重視和關注 Flink 用戶,除 Java 語言或者 Scala 語言,社區希望提供多種入口,多種途徑,讓更多的用戶更方便的使用 Flink,并收獲 Flink 在大數據算力上帶來的價值。因此 Flink 1.9 開始,Flink 社區以一個全新的技術體系來推出 Python API,并且已經支持了大部分常用的一些算子,比如如 JOIN,AGG,WINDOW 等。
在 Flink 1.9 中雖然 Python 可以使用 Java 的 User-defined Function,但是還缺乏 Python native 的 User-defined function 的定義,所以我們計劃在 Flink 1.10 中進行支持 Python User-defined function 的支持。并技術增加對數據分析工具類庫 Pandas 的支持,在 Flink 1.11 增加對 DataStream API 和 ML API 的支持。
新的 Python API 架構分為用戶 API 部分,PythonVM 和 Java VM 的通訊部分,和最終將作業提交到 Flink 集群進行運行的部分。那么 PythonVM 和 JavaVM 是怎樣通訊的呢?我們在Python 端會會有一個 Python 的 Gateway 用于保持和 Java 通訊的鏈接,在 Java 部分有一個 GateWayServer 用于接收 Python 部分的調用請求。
關于 Python API 的架構部分,在 1.9 之前,Flink 的 DataSet 和 DataStream 已經有了對 Python API 的支持,但是擁有 DataSet API 和 DataStream API 兩套不同的 API。對于 Flink 這樣一個流批統一的流式計算引擎來講,統一的架構至關重要。并且對于已有的 Python DataSet API 和 DataStream API 而言,采用了JPython 的技術體系架構,而 JPython 本身對目前 Python 的 3.X 系列無法很好的支持,所以 Flink 1.9 發布后,決定將原有的 Python API 體系架構廢棄,以全新的技術架構出現。這套全新的 Python API 基于 Table API 之上。
Table API 和 Python API 之間的通訊采用了一種簡單的辦法,利用 Python VM 和 Java VM 進行通信。在 Python API 的書寫或者調用過程中,以某種方式來與 Java API 進行通訊。操作 Python API 就像操作 Java 的 Table API一樣。新架構中可以確保以下內容:
不需要另外創建一套新的算子,可以輕松與 Java 的 Table API 的功能保持一致;
得益于現有的 Java Table API 優化模型,Python 寫出來的API,可以利用 Java API 優化模型進行優化,可以確保 Python 的 API 寫出來的 Job 也能夠具備極致性能。
如圖,當 Python 發起對Java的對象請求時候,在 Java 段創建對象并保存在一個存儲結構中,并分配一個 ID 給 Python 端,Python 端在拿到 Java 對象的 ID 后就可以對這個對象進行操作,也就是說 Python 端可以操作任何 Java 端的對象,這也就是為什么新的架構可以保證Python Table API 和 Java Table API功能一致,并且能過服用現有的優化模型。
在新的架構和通訊模型下,Python API 調用 Java API 只需要在持有 Java 對象的 ID,將調用方法的名字和參數傳遞給 Java VM,就能完成對 Java Table API 的調用,所以在這樣的架構中開發 Python Table API 與開發 Java Table API 的方式完全一致,接下來我為大家詳細介紹如何開發一個簡單的 Python API 作業。
通常來講一個 Python Table Job 一般會分成四個部分,首先要根據目前的現狀,要決定這個Job 是以批的方式運行,還是流的方式運行。當然后續版本用戶可以不考慮,但當前 1.9 版本還是需要考慮。
在決定第一步以怎樣的方式執行 Job 后,我們需要了解數據從哪里來,如何定義 Source、結構數據類型等信息。然后需要寫計算邏輯,然后就是對數據進行計算操作,但最終計算的結果需要持久化到某個系統。最后定義 Sink,與 Source 類似,我們需要定義 Sink Schema,以及每一個字段類型。
下面將詳細分享如何用 Python API 寫每一步?首先,我們創建一個執行環境,對于執行環境本身來講,首先需要一個 ExecutionEnvironment,根本上我們需要一個 TableEnvironment。那么在 TableEnvironment 中,有一個參數 Table Config,Table Config 中會有一些在執行過程中的配置參數,可以傳遞到 RunTime 層。除此之外,還提供了一些個性化的配置項,可以在實際業務開發中進行使用。
在拿到 Environment 后,需要對數據源表進行定義,以 CSV 格式文件為例,用"逗號"分隔,用 Field 來表明這個文件中有哪些字段。那么會看到,目前里面用逗號分隔,并且只有一個字段叫 word,類型是 String。
在定義并描述完數據源數據結構轉換成 Table 數據結構后,也就是說轉換到 Table API 層面之后是怎樣的數據結構和數據類型?下面將通過 with_schema 添加字段及字段類型。這里只有一個字段,數據類型也是 String,最終注冊成一個表,注冊到 catlog 中,就可以供后面的查詢計算使用了。
創建結果表,當計算完成后需要將這些結果存儲到持久化系統中,以 WordCount 為例,首先存儲表會有一個 word 以及它的計數兩個字段,一個是 String 類型的 word,另一個是 Bigint 的計數,然后把它注冊成 Sink。
編寫注冊完 Table Sink 后,再來看如何編寫邏輯。其實用 Python API 寫 WordCount 和 Table API 一樣非常簡單。因為相對于 DataSream 而言 Python API 寫一個 WordCount 只需要一行。比如 group by,先掃描Source表,然后 group by 一個 Word,再進行 Select word 并加上聚合統計Count ,最終將最數據結果插入到結果表里面中。
那么WordCount 怎樣才能真正的運行起來?首先需要搭建開發環境,不同的機器上可能安裝的軟件版本不一樣,這里列出來了一些版本的需求和要求,其中括號中是示例機器上的版本。
第二步,構建一個 Java 的二進制發布包,以從源代碼進行構建,那么這一頁面就是從原代碼獲取我們的主干代碼,并且拉取 1.9 的分支。當然大家可以用 Mater,但是 Master 不夠穩定,還是建議大家在自己學習的過程中,最好是用 1.9 的分支去做。接下來進行實戰演練環節,首先驗證 PPT 的正確性。首先編譯代碼,示例如下:
//下載源代碼git clone https://github.com/apache/flink.git// 拉取1.9分支cd flink; git fetch origin release-1.9git checkout -b release-1.9 origin/release-1.9//構建二進制發布包mvn clean install -DskipTests -Dfast
編譯完成后,需要在相應目錄下找到發布包:
cd flink-dist/target/flink-1.9.0-bin/flink-1.9.0tar -zcvf flink-1.9.0.tar.gz flink-1.9.0
在構建完 Java 的 API 之后進行檢驗,我們要構建一個 Python 的發布包。
因為大多數 Python 的用戶我們都知道我們需要 pip install 方式,將需要的依賴庫進行與本地的 Python 環境進行集成或者安裝。
那么 Flink 也是一樣,PyFlink 也需要打包一個 Pypip 能夠識別的資源進行安裝,在實際的使用中,也可以按這種命令去拷貝,在自己的環境中嘗試。
cd flink-Python;Python setup.py sdist
這個過程只是將 Java 包囊括進來,再把自己 PyFlink 本身模塊的一些 Java 的包和 Python 包打包成一起,它會在 dist 目錄下,有一個 apache-flink-1.9.dev0.tar.gz。
cd dist/
在 dist 目錄的 apache-flink-1.9.dev0.tar.gz 就是我們可以用于 pip install 的 PyFlink 包。在1.9版本,除了 Flink Table,還有 Flink Table Blink。Flink 同時會支持兩個 plan,如果大家可以嘗試,我們可以自由的切換是 Flink 原有的 Planner,還是 Blink 的 Planner,大家可以去嘗試。完成打包后,就可以嘗試把包安裝到我們的實際環境當中。
接下來是一個非常簡單的命令,首先檢查命令的正確性,在執行之前,我們用 pip 檢查一下 list,我們要看在已有的包里有沒有,現在嘗試把剛才打包的包再安裝。在實際的使用過程中,如果升級版,也要有這個過程,要把新的包要進行安裝。
pip install dist/*.tar.gz pip list|grep flink
安裝完成后,就可以用剛才寫的 WordCount 例子來驗證環境是否正確。驗證一下剛才的正確性,怎么驗證?為了大家方便,可以直接克隆 enjoyment.code 倉庫。
git clone https://github.com/sunjincheng121/enjoyment.code.gitcd enjoyment.code; Python word_count.py
接下來體驗并嘗試。在這個目錄下,我們剛才開發的 WordCount 例子。直接用 Python 或檢驗環境是否 OK。這個時候 Flink Python API 會啟動一個 Mini 的 Cluster,會將剛才 WordCount Job 進行執行,提交到一個 Mini Cluster 進行執行。現在 Run 的過程中其實已經在集群上進行執行了。其實在這個代碼里面是讀了一個 Source 文件,把結果寫到 CSV 文件,在當前目錄,是有一個 Sink CSV 的。具體的操作步驟可以查看Flink中文社區視頻Apache Flink Python API 現狀及規劃
IDE 的配置在正常的開發過程中,其實我們大部分還是在本地進行開發的,這里推薦大家還是用 Pychram 來開發 Python 相關的邏輯或者 Job。
同時由于有很大量的截圖存在,也把這些內容整理到了博客當中,大家可以掃描二維碼去關注和查看那么一些詳細的注意事項,博客詳細地址: https://enjoyment.cool。這里有一個很關鍵的地方,大家要注意,就是可能你的環境中有多種 Python 的環境,這時候選擇的環境一定是剛才 pip install 環境。具體操作詳見Apache Flink Python API 現狀及規劃。
還有哪些方式來提交 Job 呢?這是一個 CLI 的方式,也就是說真正的提交到一個現有的集群。首先啟動一個集群。構建的目錄一般在 target 目錄下,如果要啟動一個集群,直接啟動就可以。這里要說一點的是,其中一個集群外部有個 Web Port,它的端口的地址都是在 flink-conf.yaml 配置的。按照 PPT 中命令,可以去查看日志,看是否啟動成功,然后從外部的網站訪問。如果集群正常啟動,接下來看如何提交 Job 。
Flink 通過 run 提交作業,示例代碼如下:
./bin/flink run -py ~/training/0806/enjoyment.code/myPyFlink/enjoyment/word_count_cli.py
用命令行方式去執行,除了用 PY 參數,還可以指定 Python 的 module,以及其他一些依賴的資源文件、JAR等。
在 1.9 版本中還為大家提供一種更便利的方式,就是以 Python Shell 交互式的方式來寫 Python API 拿到結果。有兩種方式可執行,第一種方式是 Local,第二種方式 Remote,其實這兩種沒有本質的差異。首先來看 Local ,命令如下:
bin/pyflink-shell.sh local
啟動一個mini Cluster ,當輸出后,會出來一個 Python 的 Flink CLI 同時會有一些示例程序,供大家來體驗,按照上面的案例就能夠達到正確的輸出和提交,既可以寫 Streaming,也可以寫 Batch。詳細步驟大家參考視頻操作即可。
到目前為止,大家應該已經對 Flink 1.9 上 Python API 架構有了大概了解,同時也了解到如何搭建 Python API 環境。并且以一個簡單的 WordCount 示例,體驗如何在 IDE 里面去執行程序,如何以 Flink run 和交互式的方式去提交 Job。同時也體驗了現有一些交互上的一種方式來使用 Flink Python API。那么介紹完了整個 Flink 的一些環境搭建和一個簡單的示例后。接下來詳細介紹一下在1.9里面所有的核心算子。
上面分享創建一個 Job 的過程,第一要選擇執行的方式是Streaming還是Batch;第二個要定義使用的表,Source、Schema、數據類型;第三是開發邏輯,同時在寫 WordCount 時,使用 Count 的函數。最后,在 Python API 里面內置了很多聚合函數,可以使用count,sum, max,min等等。
所以在目前 Flink 1.9 版本中,已經能夠滿足大多數常規需求。除了剛才講到的 count。Flink Table API 算子 1.9 中也已經支持。關于 Flink Table API 算子,不論是 Python Table API 還是 Java 的Table API,都有以下幾種類型的操作。第一單流上的操作,比如說做一些SELECT、Filter,同時還可以在流上做一些聚合,包括開窗函數的 windows 窗口聚合以及列的一些操作,比如最下面的 add_columns 和 drop_columns。
除了單流,還有雙流的操作,比如說雙流 JOIN、雙流 minus、union ,這些算子在Python Table API 里面都提供了很好的支持。Python Table API 在 Flink 1.9 中,從功能的角度看幾乎完全等同于Java Table API,下面以實際代碼來看上述算子是怎么編寫的以及怎么去開發Python算子。
2.Python Table API 算子-Watermark定義
細心的同學可能會注意到,我們尚未提到流的一個特質性 -> 時序。流的特性是來的順序是可能亂序,而這種亂序又是流上客觀存在的一種狀態。在 Flink 中一般采用 Watermark 機制來解決這種亂序的問題。
在 Python API 中如何定義 Watermark?假設有一個 JSON 數據,a 字段 String,time 字段 datetime。這個時候定義 Watermark 就要在增加 Schema 時增加 rowtime 列。rowtime 必須是 timestamps 類型。
Watermark 有多種定義方式,上圖中 watermarks_periodic_bounded 即會周期性的去發 Watermark,6萬單位是毫秒。如果數據是亂序的,能夠處理一分鐘之內的亂序,所以這個值調的越大,數據亂序接受程度越高,但是有一點數據的延遲也會越高。關于 Watermark 原理大家可以查看我的blog: http://1t.click/7dM。
最后,跟大家分享一下 Java UDF在 Flink 1.9 版本中的應用, 雖然在1.9中不支持 Python 的 UDF ,但 Flink 為大家提供了可以在 Python 中使用 Java UDF。在 Flink 1.9 中,對 Table 模塊進行了優化和重構,目前開發 Java UDF 只需要引入 Flink common 依賴就可以進行 Python API 開發。
接下來以一個具體的示例給大家介紹利用 Java UDF 開發 Python API UDF,假設我們開發一個求字符串長度的 UDF,在 Python 中需要用 Java 中的 register_java_function,function 的名字是包全路徑。然后在使用時,就可以用注冊的名字完成UDF的調用,詳細可以查閱我的Blog: http://1t.click/HQF。
那怎樣來執行?可以用 Flink run 命令去執行,同時需要將UDF的JAR包攜帶上去。
Java UDF 只支持 Scalar Function?其實不然,在 Java UDF中既支持 Scalar Function,也支持 Table Function和Aggregate Function。如下所示:
感謝你能夠認真閱讀完這篇文章,希望小編分享的“怎么在Apache Flink中使用Python API”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。