您好,登錄后才能下訂單哦!
本篇文章為大家展示了什么是Flink windows和Time操作,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
在Flink中常用的Time類型:
處理時間
攝取時間
事件時間
是上圖中,最后一步的處理時間,表示服務器中執行相關操作的處理時間。例如一些算子操作時間,在服務器上面的時間。
如果你以處理時間作為流處理的時間處理方式,那么所有的基于時間的操作都會使用服務器的時間,來運行相關的操作。例如:一個小時的處理時間窗口,將會包含一個小時內的到達服務器內的所有數據。例如應用程序9:15am開始執行,第一個小時的時間處理窗口會包含所有的9:15到10:15內的事件數據,下一個時間窗口是10:15到11:15內的所有數據。
處理時間是最簡單的事件處理方式,并不需要流和機器的時間協調。因此提供了高性能和低延遲。然而在分布式環境中或者異步環境中處理時間并不能夠提供準確性(也就是說在處理數據時,由于網絡的抖動在一個處理時間窗口中例如9:15到10:15,很大可能包括9:00的事件數據)。
事件時間是每一個設備上每一個單獨事件發生的時間例如手機登錄APP的日志時間。這個時間就是這條數據記錄的時間。每一條數據都有一個時間戳表示這條數據的事件發生時間。這個時間取決于每條數據,而并不會依賴于機器的時間。事件時間處理時必須指定如何獲得Event Time watermarks(用來描述Event Time如何處理)。
按照事件時間處理數據,處理結果應該是完全一致,也就是說無論處理多少次結果都是一樣的,這就是所謂的大數據處理的冪等性。 不管事件到達時間和事件是不是有序到達(在生產環境中,數據往往進入到服務器中的時間和順序是不一定的,有可能先產生的數據后到達服務器,這取決于很多網絡因素)
攝取時間表示某個事件數據進入到Flink的時間。在source操作中,每條記錄都會得到source的當前時間戳,也就是接收到的數據自動會有一個攝取時間,也就是例如時間窗都是基于這個時間來處理的。
攝取時間是處于事件時間和處理時間之間。如上圖所示。攝取時間是有成本的,但是卻是結果可預測的。因為攝取時間使用了穩定的時間戳(在source端只會分配一次),每一條數據的時間戳都是固定的。并且同一攝取時間的數據有可能被分配到不同的處理時間窗口中。
Windows使我們處理無限數據流(源源不斷的進來)的核心部件。Windows把我們的數據流拆成一個個的buckets。我們需要把算子作用到buckets上面去。
第一件事情就是需要指定我們的流數據是不是有key,有key和沒有key對應的算子是完全不一樣的。
帶keyby,會結合windows一起使用。輸入的數據內容中的任意屬性都可以作為一個key。在這個流上可以允許窗口多任務并行計算,每一個邏輯key都可以被獨立計算,相同的key的數據會被發送到相同的并行任務中去處理。
通過使用windowAll來指定。原始的數據流不會被拆分成多個邏輯任務,所有窗口邏輯都是一個窗口任務來執行,所以并行度是1。
簡而言之,當第一個元素到達對應的窗口時,一個windows就會被開始創建。當時間(不管是event時間還是processing時間)達到時間戳范圍,就會移除窗口。另外,每一個窗口都有一個Trigger和window Functions,當數據到達窗口后,執行的函數就是window Functions,這個函數包含了對這個窗口內容的所有計算,當Trigger達到一定條件之后,就會觸發。
在指定流數據是否帶key之后,下一步就是定義窗口的分配器(windows assigner),windows assigner的職責是定義每一個傳入的元素如何分配到窗口內。對于keyby使用window()方法,對于non-keyby使用windowAll()方法。
A
WindowAssigner
is responsible for assigning each incoming element to one or more windows.
每個傳入的數據分配給一個或多個窗口。
Flink內置的window assigner對于大多數場景來講基本上是夠用的(tumbling windows滾動窗口, sliding windows滑動窗口, session windows會話窗口 and global windows全局窗口)。也可以通過繼承WindowAssigner來自定義一個window assigner。所有的內置window assigner(除了全局窗口)都是基于時間(處理時間或事件時間)來分配數據的。
基于時間的窗口有一個開始的timestamp(inclusive)和結束timestamp(exclusive)表示窗口的大小。
Flink中對于窗口的劃分有兩大類,第一大類是基于time(用的最多),第二大類是基于count。
滾動窗口分配器將分配每一個元素到一個指定大小的窗口,這種類型的窗口有一個固定的大小而且不會有重疊的。上面這張圖就是隨著時間流按照指定的時間間隔拆開。
簡單實例代碼:
object WindosApp { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val text = env.socketTextStream("192.168.227.128", 9999) text.flatMap(_.split(",")).map((_,1)).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1) env.execute("WindosApp") } }
上面的代碼表示監聽socket數據流,每隔5秒獲取一次數據。timeWindow表示根據時間來劃分窗口,(此外還有countWindow根據數量來劃分窗口)。默認時間是processTime處理時間。
public class JavaWindowApp { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> text = env.socketTextStream("192.168.227.128", 9999); text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] tokens = value.toLowerCase().split(","); for(String token: tokens) { if(token.length()>0){ out.collect(new Tuple2<String, Integer>(token, 1)); } } } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1); env.execute("JavaWindowApp"); } }
滑動窗口分配器分配每一個元素到一個固定大小的窗口,類似于滾動窗口,窗口大小可以通過配置進行修改,但是滑動窗口還有另外一個附加滑動參數控制滑動窗口什么時候啟動,所以這個窗口是有可能重疊的。
上面圖的意思是window1的窗口大小是10分鐘,滑動大小是5分鐘,也就是每隔5分鐘產生一個窗口,這個窗口的大小是10分鐘,這個窗口就是window2,然后window2又過5分鐘產生一個窗口,窗口的大小是10分鐘 window3,以此類推。所以滑動窗口處理的數據可能會有重疊。一個數據元素可能會在多個窗口中進行處理。
使用場景:每個半個小時統計前一個小時的TopN。
object WindosApp { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val text = env.socketTextStream("192.168.227.128", 9999) text.flatMap(_.split(",")).map((_,1)).keyBy(0) //.timeWindow(Time.seconds(5)) # 滾動窗口 .timeWindow(Time.seconds(10),Time.seconds(5)) .sum(1).print().setParallelism(1) env.execute("WindosApp") } }
每隔5秒統計近10秒的數據。所以當服務器端輸入:
a,a,a,b,b,b a,a,a,b,b,b a,b,a,b,a,a
時,控制臺會打印兩遍結果:
(a,10) (b,8) (b,8) (a,10)
在定義窗口分配器之后,就需要指定基于每一個窗口的計算方法了(在上面的例子中我們做了一個keyby sum操作)。window function會處理窗口中的每一個元素。window function包括如下幾個:
ReduceFunction
AggregationFunction
FoldFunction
ProcessWindowFunction
ReduceFunction和AggregationFunction的執行效率更高,因為Flink會在數據到達每一個窗口時首先做一個增量聚合操作。ProcessWindowFunction拿到的是包含在窗口中的所有的元素以及附加信息一個Iterable,是一個全量聚合。因此ProcessWindowFunction的執行效率不高,因為Flink會緩存窗口中的所有數據。
input中的兩個元素進行結合產生一個同樣類型的輸出。這里我們舉例,通過傳入的數據類型是數值類型來演示增量效果。
object WindowReduceApp { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val text = env.socketTextStream("192.168.227.128", 9999) text.flatMap(_.split(",")) .map(x=>(1,x.toInt)) // 1,2,3,4,5 => (1,1) (1,2) (1,3) (1,4) (1,5) .keyBy(0) //因為key都是1, 所以所有的元素都到一個task去執行 .timeWindow(Time.seconds(5)) // 滾動窗口 .reduce((v1, v2) => { //// reduce函數作用在窗口之上,就可以完成窗口中的增量操作,不用等所有的數據到達之后進行一次性處理,而是數據兩兩處理 println(v1 + "....." + v2) (v1._1, v1._2 + v2._2) }) .print().setParallelism(1) env.execute("WindowReduceApp") } }
服務器端輸入:
1,2,3,4,5
控制臺中輸出如下:
(1,1).....(1,2) (1,3).....(1,3) (1,6).....(1,4) (1,10).....(1,5) (1,15)
reduce函數作用在窗口之上,就可以完成窗口中的增量操作,不用等所有的數據到達之后進行一次性處理,而是數據兩兩處理。
public class JavaWindowReduceApp { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> text = env.socketTextStream("192.168.227.128", 9999); text.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception { String[] tokens = value.toLowerCase().split(","); for(String token: tokens) { if(token.length()>0){ out.collect(new Tuple2<Integer, Integer>(1, Integer.parseInt(token))); } } } }).keyBy(0).timeWindow(Time.seconds(5)) .reduce(new ReduceFunction<Tuple2<Integer, Integer>>() { @Override public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception { System.out.println("value1 = [" + value1 + "], value2 = [" + value2 + "]"); return new Tuple2<>(value1.f0,value1.f1 + value2.f1); } }) .print().setParallelism(1); env.execute("JavaWindowApp"); } }
輸出結果如下:
value1 = [(1,1)], value2 = [(1,2)] value1 = [(1,3)], value2 = [(1,3)] value1 = [(1,6)], value2 = [(1,4)] value1 = [(1,10)], value2 = [(1,5)] (1,15)
ProcessWindowFunction可以拿到一個Iterable,可以拿到窗口中的所有元素,并且有一個上下文對象可以訪問時間和狀態信息,比reducefunction可以提供更多的功能。但這樣卻可以帶來資源和性能的開銷,因為元素并不能通過增量的方式去聚合,相反,它需要把所有的數據都放在一個buffer中。
public class JavaWindowProcessApp { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> text = env.socketTextStream("192.168.227.128", 9999); text.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception { String[] tokens = value.toLowerCase().split(","); for(String token: tokens) { if(token.length()>0){ out.collect(new Tuple2<Integer, Integer>(1, Integer.parseInt(token))); } } } }).keyBy(0).timeWindow(Time.seconds(5)) .process(new ProcessWindowFunction<Tuple2<Integer, Integer>, Object, Tuple, TimeWindow>() { @Override public void process(Tuple tuple, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<Object> out) throws Exception { System.out.println("tuple = [" + tuple + "], context = [" + context + "], elements = [" + elements + "], out = [" + out + "]"); long count = 0; for(Tuple2<Integer, Integer> in:elements) { count++; } out.collect("window:" + context.window() + "count:" + count); } }) .print().setParallelism(1); env.execute("JavaWindowApp"); } }
服務器輸入:
1,2,3,4,5
控制臺輸出:
tuple = [(1)], context = [org.apache.flink.streaming.runtime.operators.windowing.functions.InternalProcessWindowContext@40e09d6c], elements = [[(1,1), (1,2), (1,3), (1,4), (1,5)]], out = [org.apache.flink.streaming.api.operators.TimestampedCollector@4e277b00] window:TimeWindow{start=1568542160000, end=1568542165000}count:5
只輸出一次,說明是等待所有數據都拿到之后才進行處理。
使用場景:窗口內的數據進行排序。在Reduce中是無法進行排序的。
上述內容就是什么是Flink windows和Time操作,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。