中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

入門教程 | 5分鐘從零構建第一個 Flink 應用

發布時間:2020-07-13 16:21:37 來源:網絡 閱讀:964 作者:Ververica 欄目:大數據

本文轉載自 Jark’s Blog ,作者伍翀(云邪),Apache Flink Committer,阿里巴巴高級開發工程師。
本文將從開發環境準備、創建 Maven 項目,編寫 Flink 程序、運行程序等方面講述如何迅速搭建第一個 Flink 應用。
在本文中,我們將從零開始,教您如何構建第一個 Flink 應用程序。

開發環境準備

Flink 可以運行在 Linux, Max OS X, 或者是 Windows 上。為了開發 Flink 應用程序,在本地機器上需要有 Java 8.xmaven 環境。

如果有 Java 8 環境,運行下面的命令會輸出如下版本信息:

$ java -version
java version "1.8.0_65"
Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)
如果有 maven 環境,運行下面的命令會輸出如下版本信息:

$ mvn -version
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /Users/wuchong/dev/maven
Java version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"
另外我們推薦使用 ItelliJ IDEA (社區免費版已夠用)作為 Flink 應用程序的開發 IDE。Eclipse 雖然也可以,但是 Eclipse 在 Scala 和 Java 混合型項目下會有些已知問題,所以不太推薦 Eclipse。下一章節,我們會介紹如何創建一個 Flink 工程并將其導入 ItelliJ IDEA。

創建 Maven 項目

我們將使用 Flink Maven Archetype 來創建我們的項目結構和一些初始的默認依賴。在你的工作目錄下,運行如下命令來創建項目:

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.6.1 \
    -DgroupId=my-flink-project \
    -DartifactId=my-flink-project \
    -Dversion=0.1 \
    -Dpackage=myflink \
    -DinteractiveMode=false

你可以編輯上面的 groupId, artifactId, package 成你喜歡的路徑。使用上面的參數,Maven 將自動為你創建如下所示的項目結構:

$ tree my-flink-project
my-flink-project
├── pom.xml
└── src
    └── main
        ├── java
        │   └── myflink
        │       ├── BatchJob.java
        │       └── StreamingJob.java
        └── resources
            └── log4j.properties

我們的 pom.xml 文件已經包含了所需的 Flink 依賴,并且在 src/main/java 下有幾個示例程序框架。接下來我們將開始編寫第一個 Flink 程序。

編寫 Flink 程序

啟動 IntelliJ IDEA,選擇 "Import Project"(導入項目),選擇 my-flink-project 根目錄下的 pom.xml。根據引導,完成項目導入。

在 src/main/java/myflink 下創建 SocketWindowWordCount.java 文件:


package myflink;

public class SocketWindowWordCount {

  public static void main(String[] args) throws Exception {

  }
}

現在這程序還很基礎,我們會一步步往里面填代碼。注意下文中我們不會將 import 語句也寫出來,因為 IDE會自動將他們添加上去。在本節末尾,我會將完整的代碼展示出來,如果你想跳過下面的步驟,可以直接將最后的完整代碼粘到編輯器中。

Flink 程序的第一步是創建一個 StreamExecutionEnvironment 。這是一個入口類,可以用來設置參數和創建數據源以及提交任務。所以讓我們把它添加到 main 函數中:

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

下一步我們將創建一個從本地端口號 9000 的 socket 中讀取數據的數據源:

DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");

這創建了一個字符串類型的 DataStreamDataStream 是 Flink 中做流處理的核心 API,上面定義了非常多常見的操作(如,過濾、轉換、聚合、窗口、關聯等)。在本示例中,我們感興趣的是每個單詞在特定時間窗口中出現的次數,比如說5秒窗口。為此,我們首先要將字符串數據解析成單詞和次數(使用Tuple2&lt;String, Integer&gt;表示),第一個字段是單詞,第二個字段是次數,次數初始值都設置成了1。我們實現了一個flatmap,因為一行數據中可能有多個單詞。

DataStream<Tuple2<String, Integer>> wordCounts = text
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
          @Override
          public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            for (String word : value.split("\\s")) {
              out.collect(Tuple2.of(word, 1));
            }
          }
        });

接著我們將數據流按照單詞字段(即0號索引字段)做分組,這里可以簡單地使用 keyBy(int index)方法,得到一個以單詞為 key 的Tuple2&lt;String, Integer&gt;數據流。然后我們可以在流上指定想要的窗口,并根據窗口中的數據計算結果。在我們的例子中,我們想要每5秒聚合一次單詞數,每個窗口都是從零開始統計的。

DataStream<Tuple2<String, Integer>> windowCounts = wordCounts
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

第二個調用的 .timeWindow()指定我們想要5秒的翻滾窗口(Tumble)。第三個調用為每個key每個窗口指定了sum聚合函數,在我們的例子中是按照次數字段(即1號索引字段)相加。得到的結果數據流,將每5秒輸出一次這5秒內每個單詞出現的次數。

最后一件事就是將數據流打印到控制臺,并開始執行:

windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");

最后的 env.execute調用是啟動實際Flink作業所必需的。所有算子操作(例如創建源、聚合、打印)只是構建了內部算子操作的圖形。只有在execute()被調用時才會在提交到集群上或本地計算機上執行。

下面是完整的代碼,部分代碼經過簡化(代碼在 GitHub 上也能訪問到):

package myflink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

  public static void main(String[] args) throws Exception {

    // 創建 execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 通過連接 socket 獲取輸入數據,這里連接到本地9000端口,如果9000端口已被占用,請換一個端口
    DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");

    // 解析數據,按 word 分組,開窗,聚合
    DataStream<Tuple2<String, Integer>> windowCounts = text
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
          @Override
          public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            for (String word : value.split("\\s")) {
              out.collect(Tuple2.of(word, 1));
            }
          }
        })
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

    // 將結果打印到控制臺,注意這里使用的是單線程打印,而非多線程
    windowCounts.print().setParallelism(1);

    env.execute("Socket Window WordCount");
  }
}

運行程序

要運行示例程序,首先我們在終端啟動 netcat 獲得輸入流:

nc -lk 9000

如果是 Windows 平臺,可以通過 https://nmap.org/ncat/ 安裝 ncat 然后運行:

ncat -lk 9000

然后直接運行SocketWindowWordCount的 main 方法。

只需要在 netcat 控制臺輸入單詞,就能在 SocketWindowWordCount 的輸出控制臺看到每個單詞的詞頻統計。如果想看到大于1的計數,請在5秒內反復鍵入相同的單詞。

Cheers !

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

五寨县| 白水县| 沈丘县| 大城县| 吉木萨尔县| 疏勒县| 新建县| 富宁县| 大邑县| 临沭县| 漳浦县| 合山市| 海宁市| 大竹县| 大石桥市| 新营市| 武汉市| 佛教| 内江市| 新安县| 日喀则市| 上饶县| 云林县| 句容市| 西林县| 子长县| 时尚| 隆林| 灵丘县| 富锦市| 望奎县| 山东省| 乐安县| 仁化县| 莱西市| 繁昌县| 舒兰市| 承德县| 舟曲县| 揭西县| 多伦县|