中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Apache Flink CEP的示例分析

發布時間:2021-12-16 14:46:17 來源:億速云 閱讀:243 作者:小新 欄目:云計算

這篇文章主要為大家展示了“Apache Flink CEP的示例分析”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“Apache Flink CEP的示例分析”這篇文章吧。

1、Flink CEP 概念以及使用場景

1.什么是 CEP

CEP 的意思是復雜事件處理,例如:起床-->洗漱-->吃飯-->上班等一系列串聯起來的事件流形成的模式稱為 CEP。如果發現某一次起床后沒有刷牙洗臉亦或是吃飯就直接上班,就可以把這種非正常的事件流匹配出來進行分析,看看今天是不是起晚了。

下圖中列出了幾個例子:

Apache Flink CEP的示例分析

  • 第一個是異常行為檢測的例子:假設車輛維修的場景中,當一輛車出現故障時,這輛車會被送往維修點維修,然后被重新投放到市場運行。如果這輛車被投放到市場之后還未被使用就又被報障了,那么就有可能之前的維修是無效的。

  • 第二個是策略營銷的例子:假設打車的場景中,用戶在 APP 上規劃了一個行程訂單,如果這個行程在下單之后超過一定的時間還沒有被司機接單的話,那么就需要將這個訂單輸出到下游做相關的策略調整。 

  • 第三個是運維監控的例子:通常運維會監控服務器的 CPU、網絡 IO 等指標超過閾值時產生相應的告警。但是在實際使用中,后臺服務的重啟、網絡抖動等情況都會造成瞬間的流量毛刺,對非關鍵鏈路可以忽略這些毛刺而只對頻繁發生的異常進行告警以減少誤報。

2.Flink CEP 應用場景

  • 風險控制:對用戶異常行為模式進行實時檢測,當一個用戶發生了不該發生的行為,判定這個用戶是不是有違規操作的嫌疑。

  • 策略營銷:用預先定義好的規則對用戶的行為軌跡進行實時跟蹤,對行為軌跡匹配預定義規則的用戶實時發送相應策略的推廣。

  • 運維監控:靈活配置多指標、多依賴來實現更復雜的監控模式。

3.Flink CEP 原理

Apache Flink CEP的示例分析

Flink CEP 內部是用 NFA(非確定有限自動機)來實現的,由點和邊組成的一個狀態圖,以一個初始狀態作為起點,經過一系列的中間狀態,達到終態。點分為起始狀態、中間狀態、最終狀態三種,邊分為 take、ignore、proceed 三種。 

  • take:必須存在一個條件判斷,當到來的消息滿足 take 邊條件判斷時,把這個消息放入結果集,將狀態轉移到下一狀態。

  • ignore:當消息到來時,可以忽略這個消息,將狀態自旋在當前不變,是一個自己到自己的狀態轉移。 

  • proceed:又叫做狀態的空轉移,當前狀態可以不依賴于消息到來而直接轉移到下一狀態。舉個例子,當用戶購買商品時,如果購買前有一個咨詢客服的行為,需要把咨詢客服行為和購買行為兩個消息一起放到結果集中向下游輸出;如果購買前沒有咨詢客服的行為,只需把購買行為放到結果集中向下游輸出就可以了。 也就是說,如果有咨詢客服的行為,就存在咨詢客服狀態的上的消息保存,如果沒有咨詢客服的行為,就不存在咨詢客服狀態的上的消息保存,咨詢客服狀態是由一條 proceed 邊和下游的購買狀態相連。

下面以一個打車的例子來展示狀態是如何流轉的,規則見下圖所示。

Apache Flink CEP的示例分析

以乘客制定行程作為開始,匹配乘客的下單事件,如果這個訂單超時還沒有被司機接單的話,就把行程事件和下單事件作為結果集往下游輸出。

假如消息到來順序為:行程-->其他-->下單-->其他。

狀態流轉如下:

(1)開始時狀態處于行程狀態,即等待用戶制定行程。 

Apache Flink CEP的示例分析

(2)當收到行程事件時,匹配行程狀態的條件,把行程事件放到結果集中,通過 take 邊將狀態往下轉移到下單狀態。 

Apache Flink CEP的示例分析

(3)由于下單狀態上有一條 ignore 邊,所以可以忽略收到的其他事件,直到收到下單事件時將其匹配,放入結果集中,并且將當前狀態往下轉移到超時未接單狀態。這時候結果集當中有兩個事件:制定行程事件和下單事件。  

Apache Flink CEP的示例分析Apache Flink CEP的示例分析

(4)超時未接單狀態時,如果來了一些其他事件,同樣可以被 ignore 邊忽略,直到超時事件的觸發,將狀態往下轉移到最終狀態,這時候整個模式匹配成功,最終將結果集中的制定行程事件和下單事件輸出到下游。

Apache Flink CEP的示例分析

上面是一個匹配成功的例子,如果是不成功的例子會怎么樣?

假如當狀態處于超時未接單狀態時,收到了一個接單事件,那么就不符合超時未被接單的觸發條件,此時整個模式匹配失敗,之前放入結果集中的行程事件和下單事件會被清理。

Apache Flink CEP的示例分析

當收到行程事件時,匹配行程狀態的條件,把行程事件放到結果集中,通過 take 邊將狀態往下轉移到下單狀態。 

2、Flink CEP 程序開發

本節將詳細介紹 Flink CEP 的程序結構以及 API。

1.Flink CEP 程序結構

主要分為兩部分:定義事件模式和匹配結果處理。

官方示例如下:

DataStream<Event> input = ...
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getId() == 42;
            }
        }
    ).next("middle").subtype(SubEvent.class).where(
        new SimpleCondition<SubEvent>() {
            @Override
            public boolean filter(SubEvent subEvent) {
                return subEvent.getVolume() >= 10.0;
            }
        }
    ).followedBy("end").where(
         new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getName().equals("end");
            }
         }
    );

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Alert> result = patternStream.select(
    new PatternProcessFunction<Event, Alert>() {
        @Override
        public void select(
                Map<String, List<Event>> pattern,
                Context ctx,
                Collector<Alert> out) throws Exception {
            out.collect(createAlertFrom(pattern));
        }
    });

程序結構分為三部分:首先需要定義一個模式(Pattern),即第 2 行代碼所示,接著把定義好的模式綁定在 DataStream 上(第 25 行),最后就可以在具有 CEP 功能的 DataStream 上將匹配的結果進行處理(第 27 行)。下面對關鍵部分做詳細講解:

  • 定義模式:上面示例中,分為了三步,首先匹配一個 ID 為 42 的事件,接著匹配一個體積大于等于 10 的事件,最后等待收到一個 name 等于 end 的事件。 

  • 匹配結果輸出:此部分,需要重點注意 select 函數(第 30 行,注:本文基于 Flink 1.7 版本)里邊的 Map 類型的 pattern 參數,Key 是一個 pattern 的 name,它的取值是模式定義中的 Begin 節點 start,或者是接下來 next 里面的 middle,或者是第三個步驟的 end。后面的 map 中的 value 是每一步發生的匹配事件。因在每一步中是可以使用循環屬性的,可以匹配發生多次,所以 map 中的 value 是匹配發生多次的所有事件的一個集合。

2.Flink CEP 構成

Apache Flink CEP的示例分析

上圖中,藍色方框代表的是一個個單獨的模式;淺黃色的橢圓代表的是這個模式上可以添加的屬性,包括模式可以發生的循環次數,或者這個模式是貪婪的還是可選的;橘色的橢圓代表的是模式間的關系,定義了多個模式之間是怎么樣串聯起來的。通過定義模式,添加相應的屬性,將多個模式串聯起來三步,就可以構成了一個完整的 Flink CEP 程序。

2.1 定義模式

下面是示例代碼:

pattern.next("start").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getId() == 42;
            }
        }
)

定義模式主要有如下 5 個部分組成:

  • pattern:前一個模式

  • next/followedBy/...:開始一個新的模式

  • start:模式名稱

  • where:模式的內容

  • filter:核心處理邏輯

2.2 模式的屬性

接下來介紹一下怎樣設置模式的屬性。模式的屬性主要分為循環屬性和可選屬性。

  • 循環屬性可以定義模式匹配發生固定次數(times),匹配發生一次以上(oneOrMore),匹配發生多次以上(timesOrMore)。

  • 可選屬性可以設置模式是貪婪的(greedy),即匹配最長的串,或設置為可選的(optional),有則匹配,無則忽略。

2.3 模式的有效期

由于模式的匹配事件存放在狀態中進行管理,所以需要設置一個全局的有效期(within)。若不指定有效期,匹配事件會一直保存在狀態中不會被清除。至于有效期能開多大,要依據具體使用場景和數據量來衡量,關鍵要看匹配的事件有多少,隨著匹配的事件增多,新到達的消息遍歷之前的匹配事件會增加 CPU、內存的消耗,并且隨著狀態變大,數據傾斜也會越來越嚴重。

2.4 模式間的聯系

主要分為三種:嚴格連續性(next/notNext),寬松連續性(followedBy/notFollowedBy),和非確定寬松連續性(followedByAny)。

三種模式匹配的差別見下表所示:

模式&數據流嚴格連續性寬松連續性非確定寬松連續性

Pattern(A B) Streaming('a','c','b1','b2')

不匹配

匹配 輸出:a,b1

匹配 輸出:a,b1 a,b2

總結如下:

  • 嚴格連續性:需要消息的順序到達與模式完全一致。

  • 寬松連續性:允許忽略不匹配的事件。

  • 非確定寬松連性:不僅可以忽略不匹配的事件,也可以忽略已經匹配的事件。

2.5 多模式組合

除了前面提到的模式定義和模式間的聯系,還可以把相連的多個模式組合在一起看成一個模式組,類似于視圖,可以在這個模式視圖上進行相關操作。

Apache Flink CEP的示例分析

上圖這個例子里面,首先匹配了一個登錄事件,然后接下來匹配瀏覽,下單,購買這三個事件反復發生三次的用戶。 

如果沒有模式組的話,代碼里面瀏覽,下單,購買要寫三次。有了模式組,只需把瀏覽,下單,購買這三個事件當做一個模式組,把相應的屬性加上 times(3) 就可以了。

2.6 處理結果

處理匹配的結果主要有四個接口:PatternFlatSelectFunction,PatternSelectFunction,PatternFlatTimeoutFunction 和 PatternTimeoutFunction。

從名字上可以看出,輸出可以分為兩類:select 和 flatSelect 指定輸出一條還是多條,timeoutFunction 和不帶 timeout 的 Function 指定可不可以對超時事件進行旁路輸出。 

下圖是輸出的綜合示例代碼:

Apache Flink CEP的示例分析

2.7 狀態存儲優化

當一個事件到來時,如果這個事件同時符合多個輸出的結果集,那么這個事件是如何保存的?

Flink CEP 通過 Dewey 計數法在多個結果集中共享同一個事件副本,以實現對事件副本進行資源共享。 

Apache Flink CEP的示例分析

3、Flink CEP 的擴展

本章主要介紹一些 Flink CEP 的擴展,講述如何做到超時機制的精確管理,以及規則的動態加載與更新。

1.超時觸發機制擴展

原生 Flink CEP 中超時觸發的功能可以通過 within+outputtag 結合來實現,但是在復雜的場景下處理存在問題,如下圖所示,在下單事件后還有一個預付款事件,想要得到下單并且預付款后超時未被接單的訂單,該如何表示呢? 

Apache Flink CEP的示例分析

參照下單后超時未被接單的做法,把下單并且預付款后超時未被接單規則表示為下單.followedBy(預付款).followedBy(接單).within(time),那么這樣實現會存在問題嗎?

這種做法的計算結果是會存在臟數據的,因為這個規則不僅匹配到了下單并且預付款后超時未被接單的訂單(想要的結果),同樣還匹配到了只有下單行為后超時未被接單的訂單(臟數據,沒有預付款)。原因是因為超時 within 是控制在整個規則上,而不是某一個狀態節點上,所以不論當前的狀態是處在哪個狀態節點,超時后都會被旁路輸出。

那么就需要考慮能否通過時間來直接對狀態轉移做到精確的控制,而不是通過規則超時這種曲線救國的方式。于是乎,在通過消息觸發狀態的轉移之外,需要增加通過時間觸發狀態的轉移支持。要實現此功能,需要在原來的狀態以及狀態轉移中,增加時間屬性的概念。如下圖所示,通過 wait 算子來得到 waiting 狀態,然后在 waiting 狀態上設置一個十秒的時間屬性以定義一個十秒的時間窗口。

Apache Flink CEP的示例分析

wait 算子對應 NFA 中的 ignore 狀態,將在沒有到達時間窗口結束時間時自旋,在 ComputationState 中記錄 wait 的開始時間,在 NFA 的 doProcess 中,將到來的數據與waiting 狀態處理,如果到了 waiting 的結束時間,則進行狀態轉移。

Apache Flink CEP的示例分析

上圖中紅色方框中為 waiting 狀態設置了兩條 ignore 邊:

  1. waitingStatus.addIgnore(lastSink,waitingCondition),waitingCondition 中的邏輯是獲取當前的時間(支持事件時間),判斷有沒有超過設置的 waiting 閾值,如果超過就把狀態向后轉移。

  2. waitingStatus.addIgnore(waitingCondition),waitingCondition 中如果未達到設置的 waiting 閾值,就會自旋在當前的 waiting 狀態不變。

2.規則動態注入

線上運行的 CEP 中肯定經常遇到規則變更的情況,如果每次變更時都將任務重啟、重新發布是非常不優雅的。尤其在營銷或者風控這種對實時性要求比較高的場景,如果規則窗口過長(一兩個星期),狀態過大,就會導致重啟時間延長,期間就會造成一些想要處理的異常行為不能及時發現。

那么要怎么樣做到規則的動態更新和加載呢?

Apache Flink CEP的示例分析

梳理一下整體架構,Flink CEP 是運行在 Flink Job 里的,而規則庫是放在外部存儲中的。首先,需要在運行的 Job 中能及時發現外部存儲中規則的變化,即需要在 Job 中提供訪問外部庫的能力。其次,需要將規則庫中變更的規則動態加載到 CEP 中,即把外部規則的描述解析成 Flink CEP 所能識別的 pattern 結構體。最后,把生成的 pattern 轉化成 NFA,替換歷史 NFA,這樣對新到來的消息,就會使用新的規則進行匹配。

下圖就是一個支持將外部規則動態注入、更新的接口。 

Apache Flink CEP的示例分析

這個接口里面主要實現了四個方法:

  • initialize:初始化方法,進行外部庫連接的初始化。

  • inject:和外部數據庫交互的主要方法,監聽外部庫變化,獲取最新的規則并通過 Groovy 動態加載,返回 pattern。

  • getPeriod:設置輪巡周期,在一些比較簡單的實時性要求不高的場景,可以采用輪巡的方式,定期對外部數據庫進行檢測。

  • getNfaKeySelector:和動態更新無關,用來支持一個流對應多個規則組。

3.歷史匹配結果清理

新規則動態加載到 Flink CEP 的 Job 中,替換掉原來的 NFA 之后,還需要對歷史匹配的結果集進行清理。在 AbstractKeyedCEPPatternOperator 中實現刷新 NFA,注意,歷史狀態是否需要清理和業務相關:

  1. 修改的邏輯對規則中事件的匹配沒有影響,保留歷史結果集中的狀態。

  2. 修改的邏輯影響到了之前匹配的部分,需要將之前匹配的結果集中的狀態數據清除,防止錯誤的輸出。

Apache Flink CEP的示例分析

以上是“Apache Flink CEP的示例分析”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

吴江市| 黄石市| 唐海县| 武平县| 如皋市| 武强县| 霍林郭勒市| 凤庆县| 邵阳县| 海林市| 思南县| 额尔古纳市| 凤翔县| 朝阳区| 淮阳县| 宾川县| 抚顺县| 灌阳县| 太白县| 宜兰市| 南陵县| 东山县| 当阳市| 永新县| 泸溪县| 揭阳市| 韶关市| 博兴县| 招远市| 平遥县| 乌鲁木齐市| 威宁| 陆良县| 泾源县| 阿克陶县| 屏边| 宝兴县| 武陟县| 沁阳市| 故城县| 仪陇县|