中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Javalambda表達式如何實現FlinkWordCount

發布時間:2021-09-28 10:52:59 來源:億速云 閱讀:79 作者:小新 欄目:編程語言

小編給大家分享一下Javalambda表達式如何實現FlinkWordCount,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

環境準備

導入Flink 1.9 pom依賴

<dependencies>    <dependency>      <groupId>org.apache.flink</groupId>      <artifactId>flink-java</artifactId>      <version>1.9.0</version>    </dependency>    <dependency>      <groupId>org.apache.flink</groupId>      <artifactId>flink-streaming-java_2.11</artifactId>      <version>1.9.0</version>    </dependency>    <dependency>      <groupId>org.apache.commons</groupId>      <artifactId>commons-lang3</artifactId>      <version>3.7</version>    </dependency>  </dependencies>

構建Flink流處理環境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

自定義source

每秒生成一行文本

DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {      private boolean isCanal = false;      private String[] words = {          "important oracle jdk license update",          "the oracle jdk license has changed for releases starting april 16 2019",          "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",          "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",          "downloading and using this product an faq is available here ",          "commercial license and support is available with a low cost java se subscription",          "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"      };      @Override      public void run(SourceContext<String> ctx) throws Exception {        // 每秒發送一行文本        while (!isCanal) {          int randomIndex = RandomUtils.nextInt(0, words.length);          ctx.collect(words[randomIndex]);          Thread.sleep(1000);        }      }      @Override      public void cancel() {        isCanal = true;      }    });

單詞計算

// 3. 單詞統計    // 3.1 將文本行切分成一個個的單詞    SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {      // 切分單詞      Arrays.stream(line.split(" ")).forEach(word -> {        ctx.collect(word);      });    }).returns(Types.STRING);    //3.2 將單詞轉換為一個個的元組    SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS        .map(word -> Tuple2.of(word, 1))        .returns(Types.TUPLE(Types.STRING, Types.INT));    // 3.3 按照單詞進行分組    KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);    // 3.4 對每組單詞數量進行累加    SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS        .timeWindow(Time.seconds(3))        .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));    resultDS.print();

參考代碼

public class WordCount {  public static void main(String[] args) throws Exception {    // 1. 構建Flink流式初始化環境    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();    // 2. 自定義source - 每秒發送一行文本    DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {      private boolean isCanal = false;      private String[] words = {          "important oracle jdk license update",          "the oracle jdk license has changed for releases starting april 16 2019",          "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",          "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",          "downloading and using this product an faq is available here ",          "commercial license and support is available with a low cost java se subscription",          "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"      };      @Override      public void run(SourceContext<String> ctx) throws Exception {        // 每秒發送一行文本        while (!isCanal) {          int randomIndex = RandomUtils.nextInt(0, words.length);          ctx.collect(words[randomIndex]);          Thread.sleep(1000);        }      }      @Override      public void cancel() {        isCanal = true;      }    });    // 3. 單詞統計    // 3.1 將文本行切分成一個個的單詞    SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {      // 切分單詞      Arrays.stream(line.split(" ")).forEach(word -> {        ctx.collect(word);      });    }).returns(Types.STRING);    //3.2 將單詞轉換為一個個的元組    SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS        .map(word -> Tuple2.of(word, 1))        .returns(Types.TUPLE(Types.STRING, Types.INT));    // 3.3 按照單詞進行分組    KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);    // 3.4 對每組單詞數量進行累加    SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS        .timeWindow(Time.seconds(3))        .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));    resultDS.print();    env.execute("app");  }}

Flink對Java Lambda表達式支持情況

Flink支持Java API所有操作符使用Lambda表達式。但是,但Lambda表達式使用Java泛型時,就需要聲明類型信息。

我們來看下上述的這段代碼:

SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {      // 切分單詞      Arrays.stream(line.split(" ")).forEach(word -> {        ctx.collect(word);      });    }).returns(Types.STRING);

之所以這里將所有的類型信息,因為Flink無法正確自動推斷出來Collector中帶的泛型。我們來看一下FlatMapFuntion的源代碼

@Public@FunctionalInterfacepublic interface FlatMapFunction<T, O> extends Function, Serializable {  /**  * The core method of the FlatMapFunction. Takes an element from the input data set and transforms  * it into zero, one, or more elements.  *  * @param value The input value.  * @param out The collector for returning result values.  *  * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation  *          to fail and may trigger recovery.  */  void flatMap(T value, Collector<O> out) throws Exception;}

我們發現 flatMap的第二個參數是Collector<O>,是一個帶參數的泛型。Java編譯器編譯該代碼時會進行參數類型擦除,所以Java編譯器會變成成:

void flatMap(T value, Collector out)

這種情況,Flink將無法自動推斷類型信息。如果我們沒有顯示地提供類型信息,將會出現以下錯誤:

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.  In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.  An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.  Otherwise the type has to be specified explicitly using type information.

這種情況下,必須要顯示指定類型信息,否則輸出將返回值視為Object類型,這將導致Flink無法正確序列化。

所以,我們需要顯示地指定Lambda表達式的參數類型信息,并通過returns方法顯示指定輸出的類型信息

我們再看一段代碼:

SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS        .map(word -> Tuple2.of(word, 1))        .returns(Types.TUPLE(Types.STRING, Types.INT));

為什么map后面也需要指定類型呢?

因為此處map返回的是Tuple2類型,Tuple2是帶有泛型參數,在編譯的時候同樣會被查出泛型參數信息,導致Flink無法正確推斷。

以上是“Javalambda表達式如何實現FlinkWordCount”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

海原县| 临城县| 绵阳市| 同德县| 哈巴河县| 高要市| 海南省| 金山区| 壶关县| 贵南县| 广饶县| 依安县| 湄潭县| 六安市| 嵩明县| 安远县| 柞水县| 陈巴尔虎旗| 攀枝花市| 米易县| 宣武区| 集贤县| 林州市| 绿春县| 聂拉木县| 星座| 任丘市| 科技| 台中市| 秀山| 宜宾市| 揭东县| 靖安县| 陵川县| 简阳市| 平邑县| 尚义县| 长沙县| 涿州市| 常宁市| 林芝县|