中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何進行Canal binlog日志的Dump流程分析

發布時間:2021-11-17 15:33:41 來源:億速云 閱讀:124 作者:柒染 欄目:大數據

這篇文章將為大家詳細講解有關如何進行Canal binlog日志的Dump流程分析,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

Canal 的 dump 支持串行和并行模式兩種模式,本篇重點梳理 dump 的核心流程,以便對 dump 過程有一個充分的了解,更好的理解 Canal 的實現原理與細節,下一篇中將重點關注Canal是如何引入并行模式來提高dump的性能,即并行編程相關的技巧。

從前面的文章我們得知 Canal binlog 日志解析的基本流程如下圖所示:

如何進行Canal binlog日志的Dump流程分析  
在這里插入圖片描述

解析來重點梳理一下 dump 命令的發送邏輯,特別是日志的處理流程,一些基本的日志格式。    

1、 dump 流程分析


在 Canal 中 dump 方法聲明如下:

如何進行Canal binlog日志的Dump流程分析  
帶有參數 MultiStageCoprocessor 為并行處理模式,底層使用了disruptor 高性能并發框架,下文將重點關注學習。 我們今天重點來看一下串行dump的實現,其方法聲明如下:
 
如何進行Canal binlog日志的Dump流程分析  
在這里插入圖片描述
其方法參數說明如下:
  • String binlogfilename
      binlog 文件名稱,例如  mysql-bin.000038。

  • Long binlogPosition
    在文件中的偏移量。

  • SinkFunction func
    每解析出一條binlog日志的處理函數。

接下來我們直奔主題,一起來看一下 MysqlConnection 關于 dump 的實現流程。

如何進行Canal binlog日志的Dump流程分析  
MysqlConnection#dump

Step1:在發送dump之前先設置相關的參數。
  • set wait_timeout=9999999
    連接空閑超時時間,默認為8消息,用于 Canal Slave 的等待超時時間遠大于默認值。

  • set net_write_timeout=1800
    網絡寫請求超時時間,針對正在進行數據讀寫的連接,該值默認為 60s。

  • set net_read_timeout=1800
    網絡讀請求超時時間,針對正在進行數據讀寫的連接,該值默認為 30s。

  • set names 'binary'
    設置服務端返回結果時不做編碼轉化,直接按照數據庫的二進制編碼進行發送,由客戶端自己根據需求進行編碼轉化。

  • set @master_binlog_checksum= @@global.binlog_checksum
    設置master_binlog_checksum,因為在mysql5.6之后為binlog引入了checksum機制,例如crc32,canal作為mysql slave,需要與服務端相關參數保持一致。

  • set @slave_uuid=uuid()
    canal相對與mysql數據庫服務而言就是一個從服務器,這個指令用于設置server_id,使用uuid,避免server_id重復。

  • SET @master_heartbeat_period=15
    設置客戶端與服務端心跳發送間隔,默認為15s。

    如何進行Canal binlog日志的Dump流程分析    
    MysqlConnection#dump


    Step2:從主庫查詢binlog checksum,具體向主庫發送 select @@global.binlog_checksum 語句。

    如何進行Canal binlog日志的Dump流程分析    
    MysqlConnection#dump


    Step3:向MySQL Master 注冊從節點,告知客戶端的host、port、用戶名與密碼、serverId,具體實現是發送命令CODE為 0x15。

    如何進行Canal binlog日志的Dump流程分析    
    MysqlConnection#dump


    Step4:向 MySQL Master 發送 dump 請求,MySQL是基于請求與應答模式,發送請求命令后,就會向網絡通道中寫入響應請求。(在這里大家不妨先大概思考一下如何讀取 dump 命令的返回值,這部分雖然涉及到網絡相關的知識,我在這邊會稍微簡單提一下)。

    如何進行Canal binlog日志的Dump流程分析    
    MysqlConnection#dump


    Step5:構建 DirectLogFetcher對象,實現基于 socket 的日志拉取服務,并構建 LogDecoder 對象,用于解析 binlog 日志。

    如何進行Canal binlog日志的Dump流程分析    
    MysqlConnection#dump


    Step6:使用 while 循環反復拉取消息,通過通過 LogDecoder 對二進制流進行解析,提取一條完整的binlog事件,交給 SinkFunction 去處理,并且如果開啟了半同步機制,則需要向master發送ACK。既然是while循環,該方法的退出條件還是值得我們關注的:

  • fetch.fetch()方法返回 false

  • SinkFunction 的 sink 方法 false,SinkFunction的詳細處理流程將在下文介紹,這里先告知返回false的情況是 binlog 日志解析線程已停止運行。

上面粗略的介紹了 dump 命令的幾個核心關鍵步驟,要想詳細掌握其實現細節,我們必須繼續深入探討如下幾個問題:

  • DirectLogFetcher 內部工作機制

  • LogDecoder binlog 日志解析

  • 發送Dump底層網實現思路

 

2、DirectLogFetcher 內部工作機制


 
2.1 DirectLogFetcher 類圖
如何進行Canal binlog日志的Dump流程分析  
DirectLogFetcher的類繼承體系如上圖所示,我們來看一下其關鍵點:  
  • LogBuffer
    日志buffer,主要定義如下屬性:

  • byte[] buffer
    緩存區中數據容器。

  • int origin
    當前buffer中的讀指針

  • int limit
    當前buffer的最大可讀可寫指針

  • int position
    當前buffer的寫指針。

  • int semival
    是否需要發送ACK(用于半同步)。
    LogBuffer封裝了字節相關的操作,不僅定義了上面的屬性,也定義了字節讀取相關眾多API,其截圖如下:

    如何進行Canal binlog日志的Dump流程分析    
    在這里插入圖片描述
  • LogFetcher binlog日志抓取抽象類,定義了如下關鍵屬性與抽象方法。

    • int DEFAULT_INITIAL_CAPACITY
      LogBuffer中的初始容量,默認為8K。

    • float DEFAULT_GROWTH_FACTOR
      容量增長因子,默認為 2.0。

    • int   BIN_LOG_HEADER_SIZE
      binlog日志條目 header 的長度,固定為4字節。

    • float  factor
      增長因子。

    • public abstract boolean fetch()
      抓取binlog日志。

    • public abstract void close()
           關閉 Fetch。

  • DirectLogFetcher Canal LogFetcher模式實現類,其核心屬性如下:

    • SocketChannel channel
      網絡通道,用于發送dump請求的網絡通道。

    • boolean issemi = false
      是否開啟半同步。

 
2.2 fetch流程詳解

接下來我們重點剖析 DirectLogFetcher 的 fetch 方法,來探究其實現原理。
在研究DirectLogFetcher的fetch方法之前,我們先重點跟蹤一下其內部網絡讀寫方法fetch0方法,該方法是具體與網絡讀寫相關的實現。

如何進行Canal binlog日志的Dump流程分析  
DirectLogFetcher#fetch0

在詳細介紹該方法之前先來介紹一下其參數的含義:
  • int off
    從通道中讀取到的內容放入到buffer中的起始位置

  • int len
    期望從通道中讀取的字節長度。

該方法的實現關鍵點如下:

  • 首先先確保接收緩存區有足夠的剩余空間,如果空間不足,則進行擴容。

  • 然后從通道中讀取指定長度的字節。

接下來我們來重點看一下DirectLogFetcher的fetch的實現流程。

如何進行Canal binlog日志的Dump流程分析  
DirectLogFetcher#fetch

Step1:嘗試從網絡通道中讀取4個字節(即讀取協議的頭部),如果通道中還沒有可讀取內容,返回false,造成的效果是一次 dump 請求結束。  
 
如何進行Canal binlog日志的Dump流程分析  
DirectLogFetcher#fetch

Step2:從上文讀到的4個字節分別讀出該網絡包的總長度以及當前包的序號,從這里可以看成MySQL協議頭為4字節,前3個字節為網絡包的總長度,第4個字節為包的序列號。再取出數據包的長度后,繼續向通道中讀取netlen個字節,即讀取一個完整的數據包到buffer中。  
 
如何進行Canal binlog日志的Dump流程分析  
DirectLogFetcher#fetch

Step3:繼續從數據包中讀取一個字節,判斷該包的狀態碼,是否是一個成功的響應,如果是錯誤的響應,會向外拋出一次,Canal 會記錄dump命令執行錯誤的次數。  
 
如何進行Canal binlog日志的Dump流程分析  
DirectLogFetcher#fetch

Step4:如果一個包的長度為允許的最大包長度,則繼續讀取,這個主要是根據MySQL協議做的處理,即讀取到一個數據包,然后返回true,表示拉取到一條日志,然后通過LogDecoder解碼,然后傳入到sink方法中,進行日志的后續處理。  
 
如何進行Canal binlog日志的Dump流程分析  
DirectLogFetcher#fetch

Step5:這一步的目的,就是將buffer中的當前指針指向數據的開始位置。這樣一次 fetch就結束了。

從上面的流程來看,DirectLogFetcher#fetch 方法結束后,就將進入到LogDecoder中。經過一次DirectLogFetcher#fetch方法后,即取回一條binlog日志,即二進制流,接下來就根據binlog協議對其解析。本文暫不深入該方法,如果大家想深入數據庫中間件方面,可以作為一個很好的示例,面向MySQL通信協議進行編程。

 

3、SinkFunction


通過 LogDecoder從中解析一個事件后,會調用SinkFunction的sink方法,如果該方法返回 false,一次dump請求將介紹,接下來我們看一下其sink方法。

如何進行Canal binlog日志的Dump流程分析  
AbstractEventParser#start

該方法的實現比較簡單,這里不打算繼續深入,我們重點來看一下 Canal.Entry 的結構:  
 
如何進行Canal binlog日志的Dump流程分析  
在這里插入圖片描述

這個結構是基于Canal做架構設計,解決順序消費、數據不丟失一個重要參考依據,沒解析一條事務,最終放入到環形緩存區,環形緩存區盡量以一個事務提交到Sink組件,其代碼如下:  
 
如何進行Canal binlog日志的Dump流程分析  
在這里插入圖片描述

這里主要有如下幾個關鍵點:
  • 首先需要調用EventSink組件將解析出來的數據傳入EventSink。

  • EventSink組件處理成功后,會提交解析位點。

關于如何進行Canal binlog日志的Dump流程分析就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

新平| 青铜峡市| 房山区| 大埔区| 青浦区| 琼结县| 巫溪县| 双江| 大庆市| 建始县| 灵武市| 汉中市| 奉化市| 平湖市| 鄂伦春自治旗| 道孚县| 鹿邑县| 柯坪县| 丹东市| 平邑县| 伊宁市| 芮城县| 佛山市| 天峨县| 聂拉木县| 镇巴县| 西畴县| 韩城市| 观塘区| 鲜城| 靖远县| 大余县| 宁晋县| 壶关县| 铜梁县| 应城市| 达尔| 莱芜市| 礼泉县| 商城县| 安泽县|