中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

flink中的聚合算子是什么

發布時間:2021-12-31 10:37:08 來源:億速云 閱讀:220 作者:iii 欄目:大數據

這篇文章主要講解了“flink中的聚合算子是什么”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“flink中的聚合算子是什么”吧!

前言

flink中的一個接口org.apache.flink.api.common.functions.AggregateFunction,這個類可以接在window流之后,做窗口內的統計計算。

注意:除了這個接口AggregateFunction,flink中還有一個抽象類AggregateFunction:org.apache.flink.table.functions.AggregateFunction,大家不要把這個弄混淆了,接口AggregateFunction我們可以理解為flink中的一個算子,和MapFunction、FlatMapFunction等是同級別的,而抽象類AggregateFunction是用于用戶自定義聚合函數的,和max、min之類的函數是同級的。

 

原理解析

比如我們想實現一個類似sql的功能:

select TUMBLE_START(proctime,INTERVAL '2' SECOND)  as starttime,user,count(*) from logs group by user,TUMBLE(proctime,INTERVAL '2' SECOND)
 

這個sql就是來統計一下每兩秒鐘的滑動窗口內每個人出現的次數,今天我們就以這個簡單的sql的功能為例講解一下flink的aggregate算子,其實就是我們用程序來實現這個sql的功能。

首先看一下聚合函數的接口:


@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
 ACC createAccumulator();
 ACC add(IN value, ACC accumulator);
 ACC merge(ACC a, ACC b);
 OUT getResult(ACC accumulator);
}

 

這個接口AggregateFunction里面有4個方法,我們分別來講解一下。

  1. AggregateFunction這個類是一個泛型類,這里面有三個參數,IN, ACC, OUT。IN就是聚合函數的輸入類型,ACC是存儲中間結果的類型,OUT是聚合函數的輸出類型。
  2. createAccumulator    
    這個方法首先要創建一個累加器,要進行一些初始化的工作,比如我們要進行count計數操作,就要給累加器一個初始值。
  3. add    
    add方法就是我們要做聚合的時候的核心邏輯,比如我們做count累加,其實就是來一個數,然后就加一。    
    類似上面的sql的邏輯,我們在寫業務邏輯的時候,可以這么想,進入這方法數的數據都是屬于某一個用戶的,系統在調用這個方法之前會先進行hash分組,然后不同的用戶會重復調用這個方法。所以這個函數的入參是IN類型,返回值是ACC類型
  4. merge    
    因為flink是一個分布式計算框架,可能計算是分布在很多節點上同時進行的,比如上述的add操作,可能同一個用戶在不同的節點上分別調用了add方法在本地節點對本地的數據進行了聚合操作,但是我們要的是整個結果,整個時候,我們就需要把每個用戶各個節點上的聚合結果merge一下,整個merge方法就是做這個工作的,所以它的入參和出參的類型都是中間結果類型ACC。
  5. getResult    
    這個方法就是將每個用戶最后聚合的結果經過處理之后,按照OUT的類型返回,返回的結果也就是聚合函數的輸出結果了。
 

實例講解 

自定義source

首先我們自定義source生成用戶的信息

 public static class MySource implements SourceFunction<Tuple2<String,Long>>{

  private volatile boolean isRunning = true;

  String userids[] = {
    "4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5",
    "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
    "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702",
    "3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c",
    "e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
  };

  @Override
  public void run(SourceContext<Tuple2<String,Long>> ctx) throws Exception{
   while (isRunning){
    Thread.sleep(10);
    String userid = userids[(int) (Math.random() * (userids.length - 1))];
    ctx.collect(Tuple2.of(userid, System.currentTimeMillis()));
   }
  }

  @Override
  public void cancel(){
   isRunning = false;
  }
 }
   

自定義聚合函數


 public static class CountAggregate
   implements AggregateFunction<Tuple2<String,Long>,Integer,Integer>{

  @Override
  public Integer createAccumulator(){
   return 0;
  }

  @Override
  public Integer add(Tuple2<String,Long> value, Integer accumulator){
   return ++accumulator;
  }

  @Override
  public Integer getResult(Integer accumulator){
   return accumulator;
  }

  @Override
  public Integer merge(Integer a, Integer b){
   return a + b;
  }
 }
   

自定義結果輸出函數


 /**
  * 這個是為了將聚合結果輸出
  */
 public static class WindowResult
   implements WindowFunction<Integer,Tuple3<String,Date,Integer>,Tuple,TimeWindow>{

  @Override
  public void apply(
    Tuple key,
    TimeWindow window,
    Iterable<Integer> input,
    Collector<Tuple3<String,Date,Integer>> out) throws Exception{

   String k = ((Tuple1<String>) key).f0;
   long windowStart = window.getStart();
   int result = input.iterator().next();
   out.collect(Tuple3.of(k, new Date(windowStart), result));

  }
 }
   

主流程


 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Tuple2<String,Long>> dataStream = env.addSource(new MySource());

  dataStream.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
            .aggregate(new CountAggregate(), new WindowResult()
            ).print();

  env.execute();

感謝各位的閱讀,以上就是“flink中的聚合算子是什么”的內容了,經過本文的學習后,相信大家對flink中的聚合算子是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

深州市| 长汀县| 平潭县| 绥江县| 凤山市| 岳阳市| 阳曲县| 合肥市| 剑川县| 长治市| 绥芬河市| 普安县| 民县| 当阳市| 清镇市| 庆阳市| 博乐市| 襄城县| 宜章县| 雷州市| 广东省| 余江县| 长治市| 五家渠市| 杂多县| 周至县| 兴宁市| 鱼台县| 鄯善县| 河南省| 四子王旗| 广南县| 瑞昌市| 陕西省| 桐庐县| 威信县| 凤阳县| 衡阳县| 葵青区| 锡林浩特市| 新宁县|