中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flink Aggregate怎么用

發布時間:2021-12-31 10:21:24 來源:億速云 閱讀:484 作者:iii 欄目:大數據

本篇內容主要講解“Flink  Aggregate怎么用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Flink  Aggregate怎么用”吧!

Aggregate算子:提供基于事件窗口進行增量計算的函數。(對輸入窗口每個數據流元素遞增聚合計算,并將窗口狀態與窗口內元素保持在累加器中)

示例環境

java.version: 1.8.x
flink.version: 1.11.1

Aggregate.java

import com.flink.examples.DataSource;
import org.apache.flink.api.common.accumulators.AverageAccumulator;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;

/**
 * @Description Aggregate算子:提供基于事件窗口進行增量計算的函數。(對輸入窗口每個數據流元素遞增聚合計算,并將窗口狀態與窗口內元素保持在累加器中)
 */
public class Aggregate {

    /**
     * 遍歷集合,分別打印不同性別的總人數與平均值
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Tuple3<姓名,性別(man男,girl女),年齡>
        List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
        DataStream<MyAverageAccumulator> dataStream = env.fromCollection(tuple3List)
                .keyBy((KeySelector<Tuple3<String, String, Integer>, String>) k -> k.f1)
                //按數量窗口滾動,每3個輸入窗口數據流,計算一次
                .countWindow(3)
                //只能基于Windowed窗口Stream進行調用
                .aggregate(new AggregateFunction<Tuple3<String, String, Integer>, MyAverageAccumulator, MyAverageAccumulator>() {
                    /**
                     * 創建新累加器,開始聚合計算
                     * @return
                     */
                    @Override
                    public MyAverageAccumulator createAccumulator() {
                        return new MyAverageAccumulator();
                    }

                    /**
                     * 將窗口輸入的數據流值添加到窗口累加器,并返回新的累加器值
                     * @param tuple3
                     * @param accumulator
                     * @return
                     */
                    @Override
                    public MyAverageAccumulator add(Tuple3<String, String, Integer> tuple3, MyAverageAccumulator accumulator) {
                        System.out.println("tuple3:" + tuple3.toString());
                        accumulator.setGender(tuple3.f1);
                        //此accumulator保含個數統計和值累計兩個屬性,add方法內會計算窗口內總數與求和
                        accumulator.add(tuple3.f2);
                        return accumulator;
                    }

                    /**
                     * 獲取累加器聚合結果
                     * @param accumulator
                     * @return
                     */
                    @Override
                    public MyAverageAccumulator getResult(MyAverageAccumulator accumulator) {
                        return accumulator;
                    }

                    /**
                     * 合并兩個累加器,返回合并后的累加器的狀態
                     * @param a
                     * @param b
                     * @return
                     */
                    @Override
                    public MyAverageAccumulator merge(MyAverageAccumulator a, MyAverageAccumulator b) {
                        a.merge(b);
                        return a;
                    }
                });
        dataStream.print();
        env.execute("flink Filter job");
    }

    /**
     * 添加性別屬性(此類用于顯示不同性別的平均值)
     */
    public static class MyAverageAccumulator extends AverageAccumulator{
        private String gender;
        public String getGender() {
            return gender;
        }
        public void setGender(String gender) {
            this.gender = gender;
        }
        @Override
        public String toString() {
            //繼承父類的this.getLocalValue()方法用于計算并返回平均值
            return super.toString() + ", gender to " + gender;
        }
    }

}

打印結果

tuple3:(張三,man,20)
tuple3:(李四,girl,24)
tuple3:(劉六,girl,32)
tuple3:(王五,man,29)
tuple3:(伍七,girl,18)
tuple3:(吳八,man,30)
4> AverageAccumulator 24.666666666666668 for 3 elements, gender to girl
2> AverageAccumulator 26.333333333333332 for 3 elements, gender to man

到此,相信大家對“Flink  Aggregate怎么用”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

探索| 惠东县| 高唐县| 太白县| 富顺县| 繁昌县| 莫力| 永胜县| 开平市| 黄浦区| 宣汉县| 绥芬河市| 广安市| 运城市| 阳谷县| 尼木县| 安达市| 开江县| 永年县| 寿宁县| 广东省| 南溪县| 绥德县| 台东县| 曲周县| 灵丘县| 芦溪县| 郁南县| 博爱县| 揭西县| 会泽县| 昌吉市| 新竹市| 司法| 工布江达县| 公主岭市| 突泉县| 图们市| 柳江县| 邯郸市| 天津市|