中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flink的bulkIteration迭代操作怎么實現

發布時間:2021-12-31 13:38:02 來源:億速云 閱讀:125 作者:iii 欄目:大數據

本篇內容介紹了“Flink的bulkIteration迭代操作怎么實現”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

迭代算法在很多數據分析領域會用到,比如機器學習或者圖計算。為了從大數據中抽取有用信息,這個時候往往會需要在處理的過程中用到迭代計算。大數據處理框架很多,比如spark,mr。實際上這些實現迭代計算都是很困難的。

Flink神奇之處就是它直接支持迭代計算。Flink實現迭代的思路也是很簡單,就是實現一個step函數,然后將其嵌入到迭代算子中去。有兩種迭代操作算子:Iterate和Delta Iterate。兩個操作算子都是在未收到終止迭代信號之前一直調用step函數。

本小節是主要是講解理論。

迭代操作算子包括了簡單的迭代形式:每次迭代,step函數會消費全量數據(本次輸入和上次迭代的結果),然后計算得到下輪迭代的輸出(例如,map,reduce,join等)

1.迭代輸入(Iteration Input)

第一次迭代的初始輸入,可能來源于數據源或者先前的操作算子。

2. Step函數

每次迭代都會執行step函數。其是由map,reduce,join等算子組成的數據流,根據業務定制的。

3. 下次迭代的部分結果(Next Partial Solution):

每次迭代,step函數的輸出結果會有部分返回參與繼續迭代。

4. 最大迭代次數

如果沒有其他終止條件,就會在聚合次數達到該值的情況下終止。

5. 自定義聚合器收斂:

迭代允許指定自定義聚合器和收斂標準,如sum會聚合要發出的記錄數(聚合器),如果此數字為零則終止(收斂標準)。

案例:累加計數

這個例子主要是給定數據輸入,每次增加一,輸出結果。

Flink的bulkIteration迭代操作怎么實現

  1. 迭代輸入:輸入是1-5的數字。

  2. step函數:給數字加一操作。

  3. 部分結果:實際上就是一個map函數。

  4. 迭代結果:最大迭代次數是十次,所以最終輸出是11-15.

Flink的bulkIteration迭代操作怎么實現

代碼操作

編程的時候,本文說的這種迭代方式叫做bulk Iteration,需要調用iterate(int),該函數返回的是一個IterativeDataSet,當然我們可以對他進行一些操作,比如map等。Iterate函數唯一的參數是代表最大迭代次數。

迭代是一個環有前面的圖可以看到,我們需要進行閉環操作,那么這時候就要用到closeWith(Dataset)操作了,參數就是需要循環迭代的dataset。也可以可選的指定一個終止標準,操作closeWith(DataSet, DataSet),可以通過判斷第二個dataset是否為空,來終止迭代。如果不指定終止迭代條件,迭代就會在迭代了最大迭代次數后終止。

下面就是通過迭代計算pi的例子。

package Streaming.iteration;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;


public class IteratePi {

  public static voidmain(String[] args) throws Exception{
     final ExecutionEnvironmentenv = ExecutionEnvironment.getExecutionEnvironment();
     // Create initialIterativeDataSet
     IterativeDataSet<Integer> initial= env.fromElements(0).iterate(100);

     DataSet<Integer> iteration= initial.map(new MapFunction<Integer, Integer>(){
        @Override
        public Integermap(Integer i) throws Exception{
           double x = Math.random();
           double y = Math.random();

           return i + ((x * x + y * y < 1) ? 1 : 0);
        }
     });


     // Iterativelytransform the IterativeDataSet
     DataSet<Integer> count = initial.closeWith(iteration);

     count.map(new MapFunction<Integer, Double>(){
        @Override
        public Double map(Integercount) throws Exception {
           return count /(double) 10000 * 4;
        }
     }).print();

     // execute theprogram
     env.execute("IterativePi Example");
  }

}

“Flink的bulkIteration迭代操作怎么實現”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

金山区| 搜索| 千阳县| 龙泉市| 丹江口市| 蒲城县| 邮箱| 本溪市| 如东县| 开鲁县| 环江| 中超| 鹤峰县| 台南市| 宁乡县| 海伦市| 全州县| 稷山县| 呈贡县| 锦州市| 南宁市| 迁西县| 崇阳县| 吉隆县| 宣化县| 商丘市| 聂荣县| 祥云县| 措美县| 游戏| 中宁县| 庐江县| 建宁县| 宿州市| 铁岭县| 江华| 双江| 奇台县| 都匀市| 新津县| 喜德县|