中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何進行instance模塊的源碼解析

發布時間:2021-12-09 16:22:00 來源:億速云 閱讀:140 作者:柒染 欄目:大數據

如何進行instance模塊的源碼解析,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。

1.基本結構

如何進行instance模塊的源碼解析

instance模塊下面也分為三個子模塊,core、manager、spring。

其中,core是instance的核心邏輯 。

而manager和spring只是兩種不同的instance配置讀取方式,manager通過http請求讀取admin的配置,spring通過配置文件的方式讀取。

主要控制邏輯我們在deployer模塊源碼分析中提到過,就是在CanalController類 的instanceGenerator,配置參數是canal.instance.global.mode

  • 根據destination創建config
  • 如果canal.instance.global.mode = manager,就使用PlainCanalInstanceGenerator
  • 如果canal.instance.global.mode = spring,就使用SpringCanalInstanceGenerator

源碼如下

如何進行instance模塊的源碼解析

2.core子模塊

如何進行instance模塊的源碼解析

代碼不多,就兩個接口,兩個類。

2.1 CanalInstanceGenerator接口

這個接口只有一個方法

如何進行instance模塊的源碼解析

具體實現就是開頭提到的兩種,PlainCanalInstanceGenerator和SpringCanalInstanceGenerator,分別在manager子模塊和spring子模塊中實現。

具體選擇就是開頭的那個canalController里面根據canal.instance.global.mode來選擇。

2.2 CanalInstance接口

先看一張官方文檔的圖,這個前面的文章已經分析過了。

如何進行instance模塊的源碼解析

server代表一個canal-server運行實例,對應于一個jvm。server內部可以有多個instance。

Instance內部有4個主要組件:

  • eventParser :數據源接入,模擬slave協議和master進行交互,協議解析
  • eventSink :Parser和Store連接器,進行數據過濾,加工,分發的工作
  • eventStore :數據存儲
  • metaManager:增量訂閱&消費信息管理器

在這個接口中,就定義了獲取4個組件的方法,以及新版本增加的mqProducer的配置信息(mqProducer在server模塊解析中介紹過了,可以回頭去看看)

如何進行instance模塊的源碼解析

我們簡單看下4個組件接口的各個實現類。

CanalEventParser接口實現類(paser模塊):

  • MysqlEventParser:偽裝成單個mysql實例的slave解析binglog日志
  • GroupEventParser:偽裝成多個mysql實例的slave解析binglog日志。內部維護了多個CanalEventParser,組合多個EventParser進行合并處理,group只是作為一個delegate處理。主要應用場景是分庫分表:比如一個大表拆分了4個庫,位于不同的mysql實例上,正常情況下,我們需要配置四個CanalInstance。對應的,業務上要消費數據時,需要啟動4個客戶端,分別鏈接4個instance實例。為了方便業務使用,此時我們可以讓CanalInstance引用一個GroupEventParser,由GroupEventParser內部維護4個MysqlEventParser去4個不同的mysql實例去拉取binlog,最終合并到一起。此時業務只需要啟動1個客戶端,鏈接這個CanalInstance即可
  • LocalBinlogEventParser:解析本地的mysql binlog。例如將mysql的binlog文件拷貝到canal的機器上進行解析。
  • RdsLocalBinlogEventParser:基于阿里云rds的binlog備份文件復制,下載到本地后進行本地的binlog解析。

CanalEventSink接口實現類(sink模塊):

  • EntryEventSink:普通的單個parser的sink操作,進行數據過濾,加工,分發

  • GroupEventSink:用于分庫分表的場景,對應GroupEventParser的數據解析,然后實現基于歸并排序的sink處理

CanalEventStore接口實現類(store模塊):

  • MemoryEventStoreWithBuffer:基于內存實現存儲store

CanalMetaManager(meta模塊):

  • ZooKeeperMetaManager:將元數據存存儲到zk中

  • MemoryMetaManager:將元數據存儲到內存中

  • MixedMetaManager:組合memory + zookeeper的使用模式

  • PeriodMixedMetaManager:基于定時刷新的策略的mixed實現

  • FileMixedMetaManager:先寫內存,然后定時刷新數據到File

關于這些實現的具體細節,我們在相應模塊的源碼分析時,進行講解。目前只需要知道,一些組件有多種實現,因此組合工作方式有多種。

2.3 AbstractCanalInstance類

AbstractCanalInstance是canalInstance的抽象類,維護了相關組件的引用。

同時,也實現了canalInstance接口的subscribeChange方法。

如何進行instance模塊的源碼解析

這個抽象類有兩個實現,CanalInstanceWithManager 和 CanalInstanceWithSpring。

如何進行instance模塊的源碼解析

AbstractCanalInstance的初始化過程都是在實現類中完成的。

如果選擇admin控制模式,那就是在CanalInstanceWithManager中完成;如果是spring模式,就在CanalInstanceWithSpring中完成。 

這里有個小發現:

仔細看下實際代碼調用我們發現,CanalInstanceWithManager是給ManagerCanalInstanceGenerator使用的,而這個generator實際上沒有被使用到。如果使用admin模式,本文開頭我們就看到了,使用了PlainCanalInstanceGenerator。PlainCanalInstanceGenerator里面的generate方法實現,其實跟SpringCanalInstanceGenerator差不多。就是從遠端admin拉到配置,然后替換系統變量,然后再從spring的beanfactory中構建具體的實例。

2.3.1 subscribeChange() 方法

AbstractCanalInstance類實現了CanalInstance接口的subscribeChange方法。

我們看到,如果訂閱關系發生變化,就做一些操作,這里看的話,主要就是更新了一下filter。

filter規定了需要訂閱哪些庫,哪些表。

如何進行instance模塊的源碼解析

2.3.2 start() 方法

啟動沒什么特別的邏輯,就是按照順序依次啟動各個組件。

順序為 metaManager -> alarmHandler -> eventStore -> eventSink -> eventParser。

啟動順序主要跟依賴關系有關,元信息相關的管理跟所有都有關,所以metaManager最先啟動,其他的按照彼此之間的關系一一啟動。

如何進行instance模塊的源碼解析

這里我們發現,在啟動eventParser的時候做了特殊處理,分別是beforeStartEventParser 和 afterStartEventParser。我們在2.3.4專門講一下。

2.3.3 stop()方法

stop也沒什么特殊的,就是依次關閉各個組件。

關閉的順序就是start的逆過程。

這里就不貼代碼了。

2.3.4 eventParserr的特殊處理

在start和stop方法中的eventParser前后都有特殊的處理,start的beforeStartEventParser 和 afterStartEventParser,Stop的beforeStopEventParser 和 afterStopEventParser。

這個其實跟eventParser的設計有關。

EventParser 設計

如何進行instance模塊的源碼解析

  • 每個EventParser都會關聯兩個內部組件

  • CanalLogPositionManager : 記錄binlog 最后一次解析成功位置信息,主要是描述下一次canal啟動的位點 CanalHAController: 控制 EventParser 的鏈接主機管理,判斷當前該鏈接哪個mysql數據庫

所以這兩個beforexxx、afterxxxx方法做的主要是CanalLogPositionManager和CanalHAController的啟停工作。

2.3.5 AbstractCanalInstance類 總結

可以看到AbstractCanalInstance除了負責啟動和停止其內部組件,就沒有其他工作了。

eventParser在AbstractCanalInstance中啟動后,就會自行開啟多線程任務dump數據,通過eventSink投遞給eventStore。

而對eventStore的操作邏輯,實際上都是在CanalServerWithEmbedded中完成的,我們可以回顧一下CanalServerWithEmbedded中 getWithoutAck( ) 的相關邏輯。

包括:

  • 根據clientIdentity的destination獲取對應的instance

  • 獲取到流式數據中的最后一批獲取的位置positionRanges(跟batchId有關聯,就是上面那個map里面的)

  • 從cananlEventStore里面獲取binlog,轉化為event。一般是從最后的一個batchId位置開始,如果之前沒有batchId,那么就從cursor記錄的消費位點開始;如果cursor為空,那只能從eventStore的第一條消息開始。(這里幾個位置關系再想一想,跟ack有關,畫個圖)

  • event轉化為entry,并生成新的batchId,組合成message返回給客戶端

所以,其實這里只是簡單的啟動和停止,組件的交互邏輯是在CanalServerWithEmbedded中get出instance的各個組件來進行實現的。

3.spring模塊

前面提到了,PlainCanalInstanceGenerator里面的generate方法實現,其實跟SpringCanalInstanceGenerator差不多。就是從遠端admin拉到配置,然后替換系統變量,然后再從spring的beanfactory中構建具體的實例。

所以我們重點關注spring子模塊的配置方式即可。

就下面四個類

如何進行instance模塊的源碼解析

3.1 CanalInstanceWithSpring類

基于spring容器啟動canal實例,方便獨立于manager啟動。

繼承了AbstractCanalInstance,其實就是一系列組件的setter方法,就不貼源碼了。

具體配置是基于spring的xml來做的.

當我們配置加載方式為spring時,創建的CanalInstance實例類型都是CanalInstanceWithSpring。canal將會尋找本地的spring配置文件來創建instance實例。canal默認提供了以下幾種spring配置文件:

  • spring/memory-instance.xml
  • spring/file-instance.xml
  • spring/default-instance.xml
  • spring/group-instance.xml

四個配置文件中,對CanalInstanceWithSpring都采用了同樣的配置方式:

如何進行instance模塊的源碼解析

當然,具體每個組件的ref在不同配置文件中有所不同。

最主要的就是metaManager 和eventParser 這兩個配置有所不同,可能在內存、文件或zk進行存儲。

eventStore 、和eventSink 定義都是相同的,eventStore目前的開源版本中eventStore只有一種基于內存的實現,eventSink其作用是eventParser和eventStore的連接器,進行數據過濾,加工,分發的工作。不涉及存儲,也就沒有必要針對內存、file、或者zk進行區分。

3.2 SpringCanalInstanceGenerator類

這個是具體創建instance的邏輯。

如何進行instance模塊的源碼解析

順便看下PlainCanalInstanceGenerator里面的實現,就是多了從遠端拉取配置,然后用PropertyPlaceholderConfigurer進行了變量替換,然后還是用beanFactory來獲取實例。

com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer繼承了org.springframework.beans.factory.config.PropertyPlaceholderConfigurer,設置動態properties,替換掉本地properties。

如何進行instance模塊的源碼解析

其實這個模塊的東西比較少,沒有什么特別復雜的邏輯。

我們來回顧下幾個問題

  • instance配置模式有哪幾種,如何根據配置創建instance?

主要有基于spring和基于遠端配置兩種方式,前者的實現在,后者的實現在PlainCanalInstanceGenerator

  • 遠端配置如何覆蓋本地配置的?

PlainCanalInstanceGenerator中使用了spring的PropertyPlaceholderConfigurer來覆蓋配置

  • instance實例內部有哪些組件?

包括了parser、sink、store、metamanager等組件,但是只負責了啟動和停止邏輯,具體交互邏輯還是在CanalServerWithEmbedded中實現的。

看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

曲水县| 湖口县| 广南县| 商河县| 邛崃市| 茌平县| 江门市| 余干县| 祁门县| 于田县| 增城市| 汾阳市| 台中县| 桃源县| 普兰店市| 文山县| 青岛市| 定西市| 鱼台县| 平谷区| 上高县| 大姚县| 温宿县| 桦川县| 廉江市| 沾益县| 龙里县| 多伦县| 沿河| 栾城县| 微山县| 桐庐县| 涞水县| 林西县| 朔州市| 山东| 平泉县| 吉木萨尔县| 彭水| 武威市| 来安县|