中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flink開發如何批處理應用程序

發布時間:2021-10-20 16:23:45 來源:億速云 閱讀:156 作者:柒染 欄目:大數據

Flink開發如何批處理應用程序,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。

需求

詞頻統計,即給一個文件,統計文件中每個單詞出現的次數,分隔符是\t。這個文件內容如下:

hello    world    welcome
hello    welcome

統計結果直接打印在控制臺。生產環境下一般Sink到目的地。

使用Flink + java實現需求

環境

JDK:1.8

Maven:3.6.1(最低Maven 3.0.4

創建項目

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.8.1 -DarchetypeCatalog=local

groupId: com.vincent artifactId: springboot-flink-train version:1.0 這樣就創建了一個項目,使用Idea導入這個項目,項目結構如下:

Flink開發如何批處理應用程序

里面有兩個自動為我們準備好的java類。

開發步驟

第一步:創建批處理上下文環境

// set up the batch execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

第二步:讀取數據

env.readTextFile(textPath);

第三步:transform operations,例如 filter()  flatMap()  join()  coGroup(),這是開發的核心所在,一般就是業務邏輯

第四步:execute program

具體操作

第一步:讀取數據

hello	welcome

第二步:每一行的數據按照指定的分隔符拆分

hello
welcome

第三步:為每一個單詞賦上次數為1

(hello,1)
(welcome,1)

第四步:合并操作

代碼實現

/**
 * 使用Java API來開發Flink的批處理應用程序
 */
public class BatchWCJavaApp {
    public static void main(String[] args) throws Exception {
        String input = "E:/test/input/test.txt";
        // step1: 獲取運行環境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // step2: 讀取數據
        DataSource<String> text = env.readTextFile(input);
        // step3: transform
        // FlatMapFunction<String, Tuple2<String, Integer>表示進來一個String, 轉換成一個<String, Integer>類型
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            /**
             *
             * @param value 就是一行一行的字符串
             * @param out 轉換成(單詞,次數)
             * @throws Exception
             */
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split("\t");
                for(String token: tokens) {
                    if(token.length() > 0) {
                        out.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }).groupBy(0).sum(1).print();
    }
}

運行結果

(world,1)
(hello,2)
(welcome,2)

使用Flink + scala實現需求

環境

JDK:1.8

Maven:3.6.1(最低Maven 3.0.4

創建項目,跟使用java方式是一樣的

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=1.8.1 -DarchetypeCatalog=local

groupId: com.vincent artifactId: springboot-flink-train-scala version:1.0 這樣就創建了一個項目,使用Idea導入這個項目:

Flink開發如何批處理應用程序

接下來的開發步驟與使用java實現的開發步驟是一樣的:這里給出

代碼實現

import org.apache.flink.api.scala.ExecutionEnvironment


/**
  * 使用Scala開發Flink的批處理應用程序
  */
object BatchWCScalaApp {
  def main(args: Array[String]): Unit = {
    val input = "E:/test/input/test.txt"
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.readTextFile(input)
    // 引入隱式轉換
    import org.apache.flink.api.scala._

    text.flatMap(_.toLowerCase.split("\t"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .groupBy(0)
      .sum(1)
      .print()
  }
}

Java與Scala實現方式對比

算子與簡潔性

也就是transform部分雖然原理是一樣的,但是實現的方式不一樣,scala更加簡潔

看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

酉阳| 白河县| 延寿县| 上林县| 阿合奇县| 柏乡县| 静海县| 延安市| 察隅县| 扶沟县| 马鞍山市| 清河县| 天门市| 托克托县| 文昌市| 辽中县| 宾阳县| 纳雍县| 巴林右旗| 海宁市| 黔江区| 富宁县| 宣武区| 收藏| 清徐县| 灌云县| 海城市| 肇源县| 锡林郭勒盟| 平度市| 土默特右旗| 乌拉特后旗| 乌拉特中旗| 昔阳县| 榆林市| 新蔡县| 葵青区| 扶绥县| 南投市| 独山县| 敖汉旗|