您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關flink 1.11中的CDC是什么意思,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
CDC,Change Data Capture,變更數據獲取的簡稱,使用CDC我們可以從數據庫中獲取已提交的更改并將這些更改發送到下游,供下游使用。這些變更可以包括INSERT,DELETE,UPDATE等,
用戶可以在以下的場景下使用CDC:
使用flink sql進行數據同步,可以將數據從一個數據同步到其他的地方,比如mysql、elasticsearch等。
可以在源數據庫上實時的物化一個聚合視圖
因為只是增量同步,所以可以實時的低延遲的同步數據
使用EventTime join 一個temporal表以便可以獲取準確的結果
flink 1.11 將這些changelog提取并轉化為table apa和sql,目前支持兩種格式:Debezium和Canal,這就意味著源表不僅僅是append操作,而且還有upsert、delete操作。
接下來我們使用canal為例簡單介紹下CDC的使用
canal 格式:
{ "data": [ { "id": "13", "username": "13", "password": "6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9", "name": "Canal Manager V2" } ], "old": [ { "id": "13", "username": "13", "password": "6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9", "name": "Canal Manager" } ], "database": "canal_manager", "es": 1568972368000, "id": 11, "isDdl": false, "mysqlType": {...}, "pkNames": [ "id" ], "sql": "", "sqlType": {...}, "table": "canal_user", "ts": 1568972369005, "type": "UPDATE" }
簡單講下幾個核心的字段:
type : 描述操作的類型,包括‘UPDATE’, 'INSERT', 'DELETE'。
data : 代表操作的數據。如果為'INSERT',則表示行的內容;如果為'UPDATE',則表示行的更新后的狀態;如果為'DELETE',則表示刪除前的狀態。
old :可選字段,如果存在,則表示更新之前的內容,如果不是update操作,則為 null。
完整的語義如下;
private String destination; // 對應canal的實例或者MQ的topic private String groupId; // 對應mq的group id private String database; // 數據庫或schema private String table; // 表名 private List<String> pkNames; private Boolean isDdl; private String type; // 類型: INSERT UPDATE DELETE // binlog executeTime private Long es; // 執行耗時 // dml build timeStamp private Long ts; // 同步時間 private String sql; // 執行的sql, dml sql為空 private List<Map<String, Object>> data; // 數據列表 private List<Map<String, Object>> old; // 舊數據列表, 用于update, size和data的size一一對應
-- 定義的字段和data 里面的數據想匹配 CREATE TABLE my_table ( id BIGINT, name STRING, description STRING, weight DECIMAL(10, 2) ) WITH ( 'connector' = 'kafka', 'topic' = 'products_binlog', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'canal-json.ignore-parse-errors'='true' -- 忽略解析錯誤,缺省值false );
canal 格式也是作為一種flink的格式,而且是source,所以也就是涉及到讀取數據的時候進行反序列化,我們接下來就簡單看看CanalJson的反序列化的實現。具體的實現類是CanalJsonDeserializationSchema。
我們看下這個最核心的反序列化方法:
@Override public void deserialize(byte[] message, Collector<RowData> out) throws IOException { try { //使用json反序列化器將message反序列化成RowData RowData row = jsonDeserializer.deserialize(message); //獲取type字段,用于下面的判斷 String type = row.getString(2).toString(); if (OP_INSERT.equals(type)) { // 如果操作類型是insert,則data數組表示的是要插入的數據,則循環遍歷data,然后添加一個標識INSERT,構造RowData對象,發送下游。 ArrayData data = row.getArray(0); for (int i = 0; i < data.size(); i++) { RowData insert = data.getRow(i, fieldCount); insert.setRowKind(RowKind.INSERT); out.collect(insert); } } else if (OP_UPDATE.equals(type)) { // 如果是update操作,從data字段里獲取更新后的數據、 ArrayData data = row.getArray(0); // old字段獲取更新之前的數據 ArrayData old = row.getArray(1); for (int i = 0; i < data.size(); i++) { // the underlying JSON deserialization schema always produce GenericRowData. GenericRowData after = (GenericRowData) data.getRow(i, fieldCount); GenericRowData before = (GenericRowData) old.getRow(i, fieldCount); for (int f = 0; f < fieldCount; f++) { if (before.isNullAt(f)) { //如果old字段非空,則說明進行了數據的更新,如果old字段是null,則說明更新前后數據一樣,這個時候把before的數據也設置成after的,也就是發送給下游的before和after數據一樣。 before.setField(f, after.getField(f)); } } before.setRowKind(RowKind.UPDATE_BEFORE); after.setRowKind(RowKind.UPDATE_AFTER); //把更新前后的數據都發送下游 out.collect(before); out.collect(after); } } else if (OP_DELETE.equals(type)) { // 如果是刪除操作,data字段里包含將要被刪除的數據,把這些數據組織起來發送給下游 ArrayData data = row.getArray(0); for (int i = 0; i < data.size(); i++) { RowData insert = data.getRow(i, fieldCount); insert.setRowKind(RowKind.DELETE); out.collect(insert); } } else { if (!ignoreParseErrors) { throw new IOException(format( "Unknown \"type\" value \"%s\". The Canal JSON message is '%s'", type, new String(message))); } } } catch (Throwable t) { // a big try catch to protect the processing. if (!ignoreParseErrors) { throw new IOException(format( "Corrupt Canal JSON message '%s'.", new String(message)), t); } } }
以上就是flink 1.11中的CDC是什么意思,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。