中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

flink中新的水印策略是什么

發布時間:2021-12-31 10:32:13 來源:億速云 閱讀:134 作者:iii 欄目:大數據

這篇文章主要介紹“flink中新的水印策略是什么”,在日常操作中,相信很多人在flink中新的水印策略是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”flink中新的水印策略是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

背景

在flink 1.11之前的版本中,提供了兩種生成水印(Watermark)的策略,分別是AssignerWithPunctuatedWatermarks和AssignerWithPeriodicWatermarks,這兩個接口都繼承自TimestampAssigner接口。

用戶想使用不同的水印生成方式,則需要實現不同的接口,但是這樣引發了一個問題,對于想給水印添加一些通用的、公共的功能則變得復雜,因為我們需要給這兩個接口都同時添加新的功能,這樣還造成了代碼的重復。

所以為了避免代碼的重復,在flink 1.11 中對flink的水印生成接口進行了重構,

新的水印生成接口

當我們構建了一個DataStream之后,使用assignTimestampsAndWatermarks方法來構造水印,新的接口需要傳入一個WatermarkStrategy對象。

DataStream#assignTimestampsAndWatermarks(WatermarkStrategy<T>)
 

WatermarkStrategy 這個接口是做什么的呢?這里面提供了很多靜態的方法和帶有缺省實現的方法,只有一個方法是非default和沒有缺省實現的,就是下面的這個方法。

 /**
  * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
  */
 @Override
 WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
 

所以默認情況下,我們只需要實現這個方法就行了,這個方法主要是返回一個 WatermarkGenerator,我們在進入這里邊看看。

@Public
public interface WatermarkGenerator<T> {

 /**
  * Called for every event, allows the watermark generator to examine and remember the
  * event timestamps, or to emit a watermark based on the event itself.
  */
 void onEvent(T event, long eventTimestamp, WatermarkOutput output);

 /**
  * Called periodically, and might emit a new watermark, or not.
  *
  * <p>The interval in which this method is called and Watermarks are generated
  * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
  */
 void onPeriodicEmit(WatermarkOutput output);
}
 

這個方法簡單明了,主要是有兩個方法:

  • onEvent :每個元素都會調用這個方法,如果我們想依賴每個元素生成一個水印,然后發射到下游(可選,就是看是否用output來收集水印),我們可以實現這個方法.
  • onPeriodicEmit : 如果數據量比較大的時候,我們每條數據都生成一個水印的話,會影響性能,所以這里還有一個周期性生成水印的方法。這個水印的生成周期可以這樣設置:env.getConfig().setAutoWatermarkInterval(5000L);

我們自己實現一個簡單的周期性的發射水印的例子:

在這個onEvent方法里,我們從每個元素里抽取了一個時間字段,但是我們并沒有生成水印發射給下游,而是自己保存了在一個變量里,在onPeriodicEmit方法里,使用最大的日志時間減去我們想要的延遲時間作為水印發射給下游。

  DataStream<Tuple2<String,Long>> withTimestampsAndWatermarks = dataStream.assignTimestampsAndWatermarks(
    new WatermarkStrategy<Tuple2<String,Long>>(){
     @Override
     public WatermarkGenerator<Tuple2<String,Long>> createWatermarkGenerator(
       WatermarkGeneratorSupplier.Context context){
      return new WatermarkGenerator<Tuple2<String,Long>>(){
       private long maxTimestamp;
       private long delay = 3000;
       @Override
       public void onEvent(
         Tuple2<String,Long> event,
         long eventTimestamp,
         WatermarkOutput output){
        maxTimestamp = Math.max(maxTimestamp, event.f1);
       }
       @Override
       public void onPeriodicEmit(WatermarkOutput output){
        output.emitWatermark(new Watermark(maxTimestamp - delay));
       }
      };
     }
    });
     

內置水印生成策略

為了方便開發,flink提供了一些內置的水印生成方法供我們使用。 

固定延遲生成水印

通過靜態方法forBoundedOutOfOrderness提供,入參接收一個Duration類型的時間間隔,也就是我們可以接受的最大的延遲時間.使用這種延遲策略的時候需要我們對數據的延遲時間有一個大概的預估判斷。

WatermarkStrategy#forBoundedOutOfOrderness(Duration maxOutOfOrderness)

 

我們實現一個延遲3秒的固定延遲水印,可以這樣做:

DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));

 

他的底層使用的WatermarkGenerator接口的一個實現類BoundedOutOfOrdernessWatermarks。我們看下源碼中的這兩個方法,是不是和我們上面自己寫的很像.

 @Override
 public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
  maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
 }

 @Override
 public void onPeriodicEmit(WatermarkOutput output) {
  output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
 }
   

單調遞增生成水印

通過靜態方法forMonotonousTimestamps來提供.

WatermarkStrategy.forMonotonousTimestamps()
 

這個也就是相當于上述的延遲策略去掉了延遲時間,以event中的時間戳充當了水印。

在程序中可以這樣使用:

DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

 

它的底層實現是AscendingTimestampsWatermarks,其實它就是BoundedOutOfOrdernessWatermarks類的一個子類,沒有了延遲時間,我們來看看具體源碼的實現.

@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {

 /**
  * Creates a new watermark generator with for ascending timestamps.
  */
 public AscendingTimestampsWatermarks() {
  super(Duration.ofMillis(0));
 }
}
   

event時間的獲取

上述我們講了flink自帶的兩種水印生成策略,但是對于我們使用eventtime語義的時候,我們想從我們的自己的數據中抽取eventtime,這個就需要TimestampAssigner了.

@Public
@FunctionalInterface
public interface TimestampAssigner<T> {

    ............
    
 long extractTimestamp(T element, long recordTimestamp);
}

 

使用的時候我們主要就是從我們自己的元素element中提取我們想要的eventtime。

使用flink自帶的水印策略和eventtime抽取類,可以這樣用:

DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(
    WatermarkStrategy
      .<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
      .withTimestampAssigner((event, timestamp)->event.f1));
   

處理空閑數據源

在某些情況下,由于數據產生的比較少,導致一段時間內沒有數據產生,進而就沒有水印的生成,導致下游依賴水印的一些操作就會出現問題,比如某一個算子的上游有多個算子,這種情況下,水印是取其上游兩個算子的較小值,如果上游某一個算子因為缺少數據遲遲沒有生成水印,就會出現eventtime傾斜問題,導致下游沒法觸發計算。

所以filnk通過WatermarkStrategy.withIdleness()方法允許用戶在配置的時間內(即超時時間內)沒有記錄到達時將一個流標記為空閑。這樣就意味著下游的數據不需要等待水印的到來。

當下次有水印生成并發射到下游的時候,這個數據流重新變成活躍狀態。

通過下面的代碼來實現對于空閑數據流的處理

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));

到此,關于“flink中新的水印策略是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

伊宁县| 阜康市| 玉树县| 黄龙县| 山丹县| 方城县| 无极县| 吉木乃县| 嵊州市| 青海省| 平罗县| 玉溪市| 彩票| 富平县| 大宁县| 泰安市| 桑日县| 横山县| 历史| 乡宁县| 大连市| 霍林郭勒市| 长葛市| 涟水县| 海盐县| 彩票| 读书| 治多县| 建瓯市| 闻喜县| 虹口区| 额济纳旗| 新田县| 义乌市| 白水县| 大足县| 辽宁省| 万山特区| 上杭县| 云阳县| 罗平县|