中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

flink1.2版本時間、水位線的介紹和用法

發布時間:2021-06-29 09:34:21 來源:億速云 閱讀:341 作者:chen 欄目:大數據

本篇內容主要講解“flink1.2版本時間、水位線的介紹和用法”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“flink1.2版本時間、水位線的介紹和用法”吧!

水位線

  • 水位線是flink的一種處理延時數據的機制,主要對設定時間內延時數據的自動容錯,水位線的本質是時間戳,計算公式為:當前事件最大時間值 - 數據延時時間。(看了幾遍有點懵)

  • 個人理解:

    • 水位線是收到數據邏輯時間便簽,是處理延時數據的基礎,通過與數據自帶的生成時間Timestamps,實現延遲數據矯正。

種類

順序事件中的Watermarks

  • 理想狀態下的水位線,即數據元素的事件事件是有序的,Watermark時間戳會隨著數據元素的事件時間安裝順序生成,此時,水位線時間和時間時間保持一致。

亂序事件中的Watermarks

  • 現實情況數據元素往往并不按照其生產順序接入Flink,而頻繁處理亂序或遲到情況,這時候需要watermark來處理,當事件8和事件11同時進入系統,flink系統將根據設定延時值分別計算它們的watermark,兩個事件到達一個operator中后,匹配事件時間的虛擬時間與watermark匹配,觸發響應的計算。

并行數據流中的Watermarks

  • Watermark在Source Operator中生成,且在每個Operator的子Task中獨立生成。

  • 如果一個watermark同時更新一個算子Task的當前事件時間,Flink會選擇最小的水位線進行更新。當一個Window算子Task中水位線大于Window結束時間,立即觸發窗口計算。

時間概念

  • 流式處理中最大的特點是數據上具有時間的屬性特征,Flink根據時間產生的位置不同,將時間分為三種概念:事件生成時間(Event Time)、事件接入時間(Ingestion TIme)、事件處理時間(Processing Time)。

    • 事件生成時間:數據從終端或系統中產生的過程消耗的時間。

    • 數據接入時間:數據接入DataSource時的時間。

    • 事件處理時間:處理過程中獲取的主機時間。

Event Time

  • Timestamps和Watermark成對對存在,使用時,都要指定

watermark
  • watermark設定Flink中Watermark默認200ms生成一次,也可以手動指定,代碼如下:

// 1、創建flink運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3); // 設置并行度
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);  //處理模式設定:流或批

// 生成 watermark 的時間間隔(每 n 毫秒),設置周期性的產生水位線的時間間隔。當數據流很大的時候,如果每個事件都產生水位線,會影響性能。
//env.getConfig().setAutoWatermarkInterval(1000); // 自動水印時間間隔 12版本不用設置,有默認

指定Timestamps

  • 此處以滾動窗口為例,窗口知識下次分享,首先對數據進行機構化,數據結構:"yyyy-MM-dd HH:mm:ss|type|num",處理代碼如下:

SingleOutputStreamOperator<Tuple3<String,String, Integer>> formatData =text.map(new MapFunction<String, Tuple3<String, String, Integer>>() { 
  // 數據格式轉換
  private static final long serialVersionUID = 1L;
  @Override
  public Tuple3<String, String, Integer> map(String value) throws Exception {
    Tuple3<String, String, Integer> data = new Tuple3<String, String, Integer>();
    String[] dataTmp = value.split("\\|");

    data.f0 = dataTmp[0];
    data.f1 = dataTmp[1];
    data.f2 = Integer.parseInt(dataTmp[2]);
    return data;
  }
});
  • 設置Timestamps和最大時延

SingleOutputStreamOperator<Tuple3<String,String, Integer>> orderDSWithWatemark=formatData
    .assignTimestampsAndWatermarks( // 設置watermark  watemark = 最大事件時間 - 最大延遲或亂序時間
    WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)) //指定maxOutOfOrderness最大無序度時間即最大延遲時間/亂序時間
    .withTimestampAssigner((data,timestamp) -> Long.parseLong(DateUtil.dateToUTC(data.f0))*1000)  //時間為毫秒級

);
  • 設定窗口大小和處理邏輯

SingleOutputStreamOperator<Tuple3<String,String, Integer>> result=orderDSWithWatemark.keyBy(one -> one.f1)
      .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 設定窗口大小
//		.allowedLateness(Time.seconds(1))  //延時處理時間
//		.sideOutputLateData(lateOutputTag)   //側輸出
.reduce(new ReduceFunction<Tuple3<String, String, Integer>>() { // 處理邏輯
    private static final long serialVersionUID = -6695049408336015245L;

    @Override
    public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> value1,
        Tuple3<String, String, Integer> value2) throws Exception {
      Tuple3<String, String, Integer> data = new Tuple3<String, String, Integer>();
      data.f0 = value2.f0;
      data.f1 = value1.f1;
      data.f2 = value1.f2 + value2.f2;
      System.out.println(data);
      return data;
    }
  });
result.print("滾動事件時間");
env.execute();

總結

  • 時間和水位線是flink中比較難理解且重要的概念,我也是一知半解,在使用的過程中再慢慢深化,基本邏輯是針對數據建立自己的時間標簽,并通過時間范圍(窗口)和數據延遲完成事件內數據的匯集、計算和輸出,以此,完成更精確的實時事件數據計算。

  • 技術是需求的一種呈現,基礎本質相互交疊,編程語言、技術框架都是,最重要的細微處的優化和整體的使用的簡便,功能的穩定和強大。

到此,相信大家對“flink1.2版本時間、水位線的介紹和用法”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

贵州省| 定结县| 高平市| 巴中市| 白朗县| 扶风县| 始兴县| 诸城市| 清新县| 朝阳县| 子洲县| 安乡县| 青铜峡市| 波密县| 南平市| 望都县| 宁明县| 房产| 巩义市| 瓮安县| 馆陶县| 星子县| 定边县| 麦盖提县| 武平县| 孟州市| 垫江县| 万盛区| 田东县| 浦县| 青阳县| 南城县| 肇东市| 鄂托克前旗| 涟源市| 梁山县| 凌源市| 酉阳| 黔江区| 溧水县| 香格里拉县|