中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

flink中Look up維表怎么使用

發布時間:2021-12-31 10:41:45 來源:億速云 閱讀:210 作者:iii 欄目:大數據

本篇內容主要講解“flink中Look up維表怎么使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“flink中Look up維表怎么使用”吧!

背景

在流式計算中,維表是一個很常見的概念,一般用于sql的join中,對流式數據進行數據補全,比如我們的source stream是來自日志的訂單數據,但是日志中我們只是記錄了訂單商品的id,并沒有其他的信息,但是我們把數據存入數倉進行數據分析的時候,卻需要商品名稱、價格等等其他的信息,這種問題我們可以在進行流處理的時候通過查詢維表的方式對數據進行數據補全。

維表一般存儲在外部存儲中,比如mysql、hbase、redis等等,今天我們以mysql為例,講講flink中維表的使用。

LookupableTableSource

在flink中提供了一個LookupableTableSource,可以用于實現維表,也就是我們可以通過某幾個key列去查詢外部存儲來獲取相關的信息用于補全stream的數據。

public interface LookupableTableSource<T> extends TableSource<T> {

TableFunction<T> getLookupFunction(String[] lookupKeys);

AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);

boolean isAsyncEnabled();
}

我們看到,LookupableTableSource有三個方法

  • getLookupFunction:用于同步查詢維表的數據,返回一個TableFunction,所以本質上還是通過用戶自定義 UDTF來實現的。

  • getAsyncLookupFunction:用于異步查詢維表的數據,該方法返回一個對象

  • isAsyncEnabled:默認情況下是同步查詢,如果要開啟異步查詢,這個方法需要返回true

在flink里,我們看到實現了這個接口的主要有四個類,JdbcTableSource,HBaseTableSource,CsvTableSource,HiveTableSource,今天我們主要以jdbc為例講講如何進行維表查詢。

實例講解

接下來我們講一個小例子,首先定義一下stream source,我們使用flink 1.11提供的datagen來生成數據。

我們來模擬生成用戶的數據,這里只生成的用戶的id,范圍在1-100之間。

CREATE TABLE datagen (
userid int,
proctime as PROCTIME()
) WITH (
'connector' = 'datagen',
'rows-per-second'='100',
'fields.userid.kind'='random',
'fields.userid.min'='1',
'fields.userid.max'='100'
)

datagen具體的使用方法可以參考:

聊聊flink 1.11 中的隨機數據生成器-DataGen connector

然后再創建一個mysql維表信息:

CREATE TABLE dim_mysql (
 id int,
 name STRING,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/test',
  'table-name' = 'userinfo',
  'username' = 'root',
  'password' = 'root'
)

我們這個mysql表中樣例數據如下:

flink中Look up維表怎么使用

最后執行sql查詢,流表關聯維表:

SELECT * FROM datagen LEFT JOIN dim_mysql FOR SYSTEM_TIME AS OF datagen.proctime  ON datagen.userid = dim_mysql.id

結果示例如下:

3> 53,2020-09-03T07:19:34.565,null,null
3> 73,2020-09-03T07:19:34.566,null,null
1> 14,2020-09-03T07:19:34.566,14,aaddda
2> 11,2020-09-03T07:19:34.566,null,null
4> 8,2020-09-03T07:19:34.566,8,name8
1> 61,2020-09-03T07:19:34.567,null,null
3> 12,2020-09-03T07:19:34.567,12,aaa
2> 99,2020-09-03T07:19:34.567,null,null
4> 37,2020-09-03T07:19:34.568,null,null
2> 13,2020-09-03T07:19:34.569,13,aaddda
3> 6,2020-09-03T07:19:34.568,6,name6

我們看到對于維表中存在的數據,已經關聯出來了,對于維表中沒有的數據,顯示為null

源碼解析

JdbcTableSource

以jdbc為例,我們來看看flink底層是怎么做的。

JdbcTableSource#isAsyncEnabled方法返回的是false,也就是不支持異步的查詢,所以進入JdbcTableSource#getLookupFunction方法。

	@Override
public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
final RowTypeInfo rowTypeInfo = (RowTypeInfo) fromDataTypeToLegacyInfo(producedDataType);
return JdbcLookupFunction.builder()
.setOptions(options)
.setLookupOptions(lookupOptions)
.setFieldTypes(rowTypeInfo.getFieldTypes())
.setFieldNames(rowTypeInfo.getFieldNames())
.setKeyNames(lookupKeys)
.build();
}

最終是構造了一個JdbcLookupFunction對象,

  • options是連接jdbc的一些參數,比如user、pass、url等。

  • lookupOptions是一些有關維表的參數,主要是緩存的大小、超時時間等。

  • lookupKeys也就是要去關聯查詢維表的字段。

JdbcLookupFunction

所以我們來看看JdbcLookupFunction類,這個JdbcLookupFunction是一個TableFunction的子類,具體的TableFunction的使用可以參考這個文章:

Flink實戰教程-自定義函數之TableFunction

一個TableFunction最核心的就是eval方法,在這個方法里,做的主要的工作就是通過傳進來的多個keys拼接成sql去來查詢數據,首先查詢的是緩存,緩存有數據就直接返回,緩存沒有的話再去查詢數據庫,然后再將查詢的結果返回并放入緩存,下次查詢的時候直接查詢緩存。

為什么要加一個緩存呢?默認情況下是不開啟緩存的,每來一個查詢,都會給維表發送一個請求去查詢,如果數據量比較大的話,勢必會給存儲維表的系統造成一定的壓力,所以flink提供了一個LRU緩存,查詢維表的時候,先查詢緩存,緩存沒有再去查詢外部系統,但是如果有一個數據查詢頻率比較高,一直被命中,就無法獲取新數據了。所以緩存還要加一個超時時間,過了這個時間,把這個數據強制刪除,去外部系統查詢新的數據。

具體的怎么開啟緩存呢?我們看下JdbcLookupFunction#open方法

	@Override
public void open(FunctionContext context) throws Exception {
try {
establishConnectionAndStatement();
this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 ? null : CacheBuilder.newBuilder()
.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
.maximumSize(cacheMaxSize)
.build();
} catch (SQLException sqe) {
throw new IllegalArgumentException("open() failed.", sqe);
} catch (ClassNotFoundException cnfe) {
throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
}
}

也就是說cacheMaxSize和cacheExpireMs需要同時設置,就會構造一個緩存對象cache來緩存數據.這兩個參數對應的DDL的屬性就是lookup.cache.max-rows和lookup.cache.ttl

對于具體的緩存的大小和超時時間的設置,用戶需要根據自身的情況來自己定義,在數據的準確性和系統的吞吐量之間做一個權衡。

到此,相信大家對“flink中Look up維表怎么使用”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

忻城县| 通化市| 隆德县| 楚雄市| 道孚县| 新晃| 宝丰县| 西畴县| 原阳县| 华阴市| 杨浦区| 台湾省| 乌鲁木齐市| 亳州市| 宁陕县| 青河县| 河曲县| 唐海县| 博爱县| 姚安县| 金平| 湘潭县| 潼南县| 出国| 普陀区| 临泉县| 凤阳县| 波密县| 巴里| 盐亭县| 仙桃市| 遂平县| 衡南县| 灯塔市| 泾源县| 资阳市| 石城县| 颍上县| 荔浦县| 潞城市| 万盛区|