中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flink怎么實時計算topN

發布時間:2021-10-18 09:59:07 來源:億速云 閱讀:172 作者:柒染 欄目:開發技術

這篇文章將為大家詳細講解有關Flink怎么實時計算topN,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

1. 用到的知識點

  • Flink創建kafka數據源;

  • 基于 EventTime 處理,如何指定 Watermark;

  • Flink中的Window,滾動(tumbling)窗口與滑動(sliding)窗口;

  • State狀態的使用;

  • ProcessFunction 實現 TopN 功能;

2. 案例介紹

通過用戶訪問日志,計算最近一段時間平臺最活躍的幾位用戶topN。

  • 創建kafka生產者,發送測試數據到kafka;

  • 消費kafka數據,使用滑動(sliding)窗口,每隔一段時間更新一次排名;

3. 數據源

這里使用kafka api發送測試數據到kafka,代碼如下:

@Data @NoArgsConstructor @AllArgsConstructor @ToString public class User {      private long id;     private String username;     private String password;     private long timestamp; }  Map<String, String> config = Configuration.initConfig("commons.xml");  @Test public void sendData() throws InterruptedException {     int cnt = 0;      while (cnt < 200){         User user = new User();         user.setId(cnt);         user.setUsername("username" + new Random().nextInt((cnt % 5) + 2));         user.setPassword("password" + cnt);         user.setTimestamp(System.currentTimeMillis());         Future<RecordMetadata> future = KafkaUtil.sendDataToKafka(config.get("kafka-topic"), String.valueOf(cnt), JSON.toJSONString(user));         while (!future.isDone()){             Thread.sleep(100);         }         try {             RecordMetadata recordMetadata = future.get();             System.out.println(recordMetadata.offset());         } catch (InterruptedException e) {             e.printStackTrace();         } catch (ExecutionException e) {             e.printStackTrace();         }         System.out.println("發送消息:" + cnt + "******" + user.toString());         cnt = cnt + 1;     } }

這里通過隨機數來擾亂username,便于使用戶名大小不一,讓結果更加明顯。KafkaUtil是自己寫的一個kafka工具類,代碼很簡單,主要是平時做測試方便。

4. 主要程序

創建一個main程序,開始編寫代碼。

創建flink環境,關聯kafka數據源。

Map<String, String> config = Configuration.initConfig("commons.xml");  Properties kafkaProps = new Properties(); kafkaProps.setProperty("zookeeper.connect", config.get("kafka-zookeeper")); kafkaProps.setProperty("bootstrap.servers", config.get("kafka-ipport")); kafkaProps.setProperty("group.id", config.get("kafka-groupid"));  StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

EventTime 與 Watermark

senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

設置屬性senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime),表示按照數據時間字段來處理,默認是TimeCharacteristic.ProcessingTime

/** The time characteristic that is used if none other is set. */ private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;

這個屬性必須設置,否則后面,可能窗口結束無法觸發,導致結果無法輸出。取值有三種:

  • ProcessingTime:事件被處理的時間。也就是由flink集群機器的系統時間來決定。

  • EventTime:事件發生的時間。一般就是數據本身攜帶的時間。

  • IngestionTime:攝入時間,數據進入flink流的時間,跟ProcessingTime還是有區別的;

指定好使用數據的實際時間來處理,接下來需要指定flink程序如何get到數據的時間字段,這里使用調用DataStream的assignTimestampsAndWatermarks方法,抽取時間和設置watermark。

senv.addSource(         new FlinkKafkaConsumer010<>(                 config.get("kafka-topic"),                 new SimpleStringSchema(),                 kafkaProps         ) ).map(x ->{     return JSON.parseObject(x, User.class); }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<User>(Time.milliseconds(1000)) {     @Override     public long extractTimestamp(User element) {         return element.getTimestamp();     } })

前面給出的代碼中可以看出,由于發送到kafka的時候,將User對象轉換為json字符串了,這里使用的是fastjson,接收過來可以轉化為JsonObject來處理,我這里還是將其轉化為User對象JSON.parseObject(x,  User.class),便于處理。

這里考慮到數據可能亂序,使用了可以處理亂序的抽象類BoundedOutOfOrdernessTimestampExtractor,并且實現了唯一的一個沒有實現的方法extractTimestamp,亂序數據,會導致數據延遲,在構造方法中傳入了一個Time.milliseconds(1000),表明數據可以延遲一秒鐘。比如說,如果窗口長度是10s,0~10s的數據會在11s的時候計算,此時watermark是10,才會觸發計算,也就是說引入watermark處理亂序數據,最多可以容忍0~t這個窗口的數據,最晚在t+1時刻到來。

Flink怎么實時計算topN

具體關于watermark的講解可以參考這篇文章

https://blog.csdn.net/qq_39657909/article/details/106081543

窗口統計

業務需求上,通常可能是一個小時,或者過去15分鐘的數據,5分鐘更新一次排名,這里為了演示效果,窗口長度取10s,每次滑動(slide)5s,即5秒鐘更新一次過去10s的排名數據。

.keyBy("username") .timeWindow(Time.seconds(10), Time.seconds(5)) .aggregate(new CountAgg(), new WindowResultFunction())

我們使用.keyBy("username")對用戶進行分組,使用.timeWindow(Time size, Time  slide)對每個用戶做滑動窗口(10s窗口,5s滑動一次)。然后我們使用 .aggregate(AggregateFunction af,  WindowFunction wf) 做增量的聚合操作,它能使用AggregateFunction提前聚合掉數據,減少 state  的存儲壓力。較之.apply(WindowFunction  wf)會將窗口中的數據都存儲下來,最后一起計算要高效地多。aggregate()方法的第一個參數用于

這里的CountAgg實現了AggregateFunction接口,功能是統計窗口中的條數,即遇到一條數據就加一。

public class CountAgg implements AggregateFunction<User, Long, Long>{     @Override     public Long createAccumulator() {         return 0L;     }      @Override     public Long add(User value, Long accumulator) {         return accumulator + 1;     }      @Override     public Long getResult(Long accumulator) {         return accumulator;     }      @Override     public Long merge(Long a, Long b) {         return a + b;     } }

.aggregate(AggregateFunction af, WindowFunction wf) 的第二個參數WindowFunction將每個  key每個窗口聚合后的結果帶上其他信息進行輸出。我們這里實現的WindowResultFunction將用戶名,窗口,訪問量封裝成了UserViewCount進行輸出。

private static class WindowResultFunction implements WindowFunction<Long, UserViewCount, Tuple, TimeWindow> {       @Override     public void apply(Tuple key, TimeWindow window, Iterable<Long> input, Collector<UserViewCount> out) throws Exception {         Long count = input.iterator().next();         out.collect(new UserViewCount(((Tuple1<String>)key).f0, window.getEnd(), count));     } }  @Data @NoArgsConstructor @AllArgsConstructor @ToString public static class UserViewCount {     private String userName;     private long windowEnd;     private long viewCount;  }

TopN計算最活躍用戶

為了統計每個窗口下活躍的用戶,我們需要再次按窗口進行分組,這里根據UserViewCount中的windowEnd進行keyBy()操作。然后使用  ProcessFunction 實現一個自定義的 TopN 函數 TopNHotItems  來計算點擊量排名前3名的用戶,并將排名結果格式化成字符串,便于后續輸出。

.keyBy("windowEnd") .process(new TopNHotUsers(3)) .print();

ProcessFunction 是 Flink 提供的一個 low-level API,用于實現更高級的功能。它主要提供了定時器 timer  的功能(支持EventTime或ProcessingTime)。本案例中我們將利用 timer 來判斷何時收齊了某個 window 下所有用戶的訪問數據。由于  Watermark 的進度是全局的,在 processElement 方法中,每當收到一條數據(ItemViewCount),我們就注冊一個  windowEnd+1 的定時器(Flink 框架會自動忽略同一時間的重復注冊)。windowEnd+1 的定時器被觸發時,意味著收到了windowEnd+1的  Watermark,即收齊了該windowEnd下的所有用戶窗口統計值。我們在 onTimer() 中處理將收集的所有商品及點擊量進行排序,選出  TopN,并將排名信息格式化成字符串后進行輸出。

這里我們還使用了 ListState來存儲收到的每條 UserViewCount  消息,保證在發生故障時,狀態數據的不丟失和一致性。ListState 是 Flink 提供的類似 Java List 接口的 State API,它集成了框架的  checkpoint 機制,自動做到了 exactly-once 的語義保證。

private static class TopNHotUsers extends KeyedProcessFunction<Tuple, UserViewCount, String> {      private int topSize;     private ListState<UserViewCount> userViewCountListState;      public TopNHotUsers(int topSize) {         this.topSize = topSize;     }      @Override     public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {         super.onTimer(timestamp, ctx, out);         List<UserViewCount> userViewCounts = new ArrayList<>();         for(UserViewCount userViewCount : userViewCountListState.get()) {             userViewCounts.add(userViewCount);         }          userViewCountListState.clear();          userViewCounts.sort(new Comparator<UserViewCount>() {             @Override             public int compare(UserViewCount o1, UserViewCount o2) {                 return (int)(o2.viewCount - o1.viewCount);             }         });          // 將排名信息格式化成 String, 便于打印         StringBuilder result = new StringBuilder();         result.append("====================================\n");         result.append("時間: ").append(new Timestamp(timestamp-1)).append("\n");         for (int i = 0; i < topSize; i++) {             UserViewCount currentItem = userViewCounts.get(i);             // No1:  商品ID=12224  瀏覽量=2413             result.append("No").append(i).append(":")                     .append("  用戶名=").append(currentItem.userName)                     .append("  瀏覽量=").append(currentItem.viewCount)                     .append("\n");         }         result.append("====================================\n\n");          Thread.sleep(1000);          out.collect(result.toString());      }      @Override     public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {         super.open(parameters);         ListStateDescriptor<UserViewCount> userViewCountListStateDescriptor = new ListStateDescriptor<>(                 "user-state",                 UserViewCount.class         );         userViewCountListState = getRuntimeContext().getListState(userViewCountListStateDescriptor);      }      @Override     public void processElement(UserViewCount value, Context ctx, Collector<String> out) throws Exception {         userViewCountListState.add(value);         ctx.timerService().registerEventTimeTimer(value.windowEnd + 1000);     } }

結果輸出

可以看到,每隔5秒鐘更新輸出一次數據。

Flink怎么實時計算topN

關于Flink怎么實時計算topN就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

扎囊县| 厦门市| 金塔县| 仁化县| 获嘉县| 卢龙县| 博罗县| 那曲县| 龙岩市| 宣汉县| 芒康县| 玛多县| 永安市| 奈曼旗| 亳州市| 金华市| 楚雄市| 梧州市| 疏勒县| 西充县| 临海市| 福安市| 鄂伦春自治旗| 临沂市| 秦安县| 舒城县| 塘沽区| 久治县| 镇康县| 莱阳市| 神池县| 永泰县| 潼关县| 柳州市| 久治县| 承德市| 昭苏县| 舞钢市| 东光县| 扎兰屯市| 井陉县|