您好,登錄后才能下訂單哦!
這篇“SpringBoot怎么整合Canal方法”文章的知識點大部分人都不太理解,所以小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“SpringBoot怎么整合Canal方法”文章吧。
(1.1.5 改動很大,這兒客戶端用 1.1.4)
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.2.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>top.yueshushu</groupId> <artifactId>learn</artifactId> <version>1.0-SNAPSHOT</version> <name>Canal</name> <description>學習 Canal</description> <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!-- 導入配置文件處理器,配置文件進行綁定就會有提示,需要重啟 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <!--導入自動熱步署的依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional> </dependency> <!--引入MySql的驅動--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!--引入springboot與mybatis整合的依賴--> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.1.4</version> </dependency> <!-- 引入pagehelper分頁插件 --> <dependency> <groupId>com.github.pagehelper</groupId> <artifactId>pagehelper-spring-boot-starter</artifactId> <version>1.2.5</version> </dependency> <!--添加 druid-spring-boot-starter的依賴的依賴--> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.14</version> </dependency> <!--SpringBoot 的aop 模塊--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <!--添加canal的依賴. 重要. 使用 1.1.4--> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.4</version> </dependency> <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> <version>2.9.4</version> </dependency> </dependencies> <build> <!--將該目錄下的文件全部打包成類的路徑--> <resources> <resource> <directory>src/main/resources</directory> </resource> </resources> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
/** * 一個簡單的canal 的連接測試程序 */ @Test public void connectionTest() { //1. 創建連接 填充對應的地址信息 ,要監控的實例和相應的用戶名和密碼 CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress( "127.0.0.1", 11111 ), "example", "canal", "canal" ); //2. 進行連接 canalConnector.connect(); log.info(">>>連接成功:{}", canalConnector); }
17:26:32.179 [main] INFO top.yueshushu.learn.CanalDemoTest - >>>連接成功:com.alibaba.otter.canal.client.impl.SimpleCanalConnector@31ef45e3
/** * 獲取數據信息. 可以發現,未獲取到數據 . 這個應該是實時的. */ @Test public void getDataTest() { //1. 創建連接 CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), "example", "canal", "canal" ); // 進行連接 canalConnector.connect(); //3. 注冊,看使用哪個數據庫表 canalConnector.subscribe("springboot.user"); //4. 獲取 1條數據 Message message = canalConnector.get(1); log.info("獲取的數據:id:{},數據:{}", message.getId(), message); if (message.getId() == -1) { log.info(">>>未獲取到數據"); return; } //5. 獲取相應的數據集合 List<CanalEntry.Entry> entries = message.getEntries(); for (CanalEntry.Entry entry : entries) { log.info(">>>獲取數據 {}", entry); //獲取表名 CanalEntry.Header header = entry.getHeader(); log.info(">>>獲取表名:{}", header.getTableName()); CanalEntry.EntryType entryType = entry.getEntryType(); log.info(">>獲取類型 {}:,對應的信息:{}", entryType.getNumber(), entryType.name()); //獲取數據 ByteString storeValue = entry.getStoreValue(); log.info(">>>輸出存儲的值:{}", storeValue); } }
在主庫里面插入一條數據
insert into springboot.user(id,name,age,sex,description) values(1,'canal添加用戶',24,'男','學習canal');
再次執行:
/** * 獲取數據信息. 獲取現在的數據. 再次執行時,就沒有這個數據了. */ @Test public void getNowDataTest() { //1. 創建連接 CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), "example", "canal", "canal" ); // 進行連接 canalConnector.connect(); //3. 注冊,看使用哪個數據庫表 canalConnector.subscribe("springboot.user"); for (;;) { //4. 獲取 1條數據 Message message = canalConnector.get(1); log.info("獲取的數據:id:{},數據:{}", message.getId(), message); if (message.getId() == -1) { log.info(">>>未獲取到數據"); try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } continue; } //5. 獲取相應的數據集合 List<CanalEntry.Entry> entries = message.getEntries(); for (CanalEntry.Entry entry : entries) { log.info(">>>獲取數據 {}", entry); //獲取表名 CanalEntry.Header header = entry.getHeader(); log.info(">>>獲取表名:{}", header.getTableName()); CanalEntry.EntryType entryType = entry.getEntryType(); log.info(">>獲取類型 {}:,對應的信息:{}", entryType.getNumber(), entryType.name()); //獲取數據 ByteString storeValue = entry.getStoreValue(); log.info(">>>輸出存儲的值:{}", storeValue); } } }
可以隨時獲取相應的數據變更信息。
會發現, storeValue 的值是很難解讀的。 需要將這個數據解析出來。
/** * 將 storeValue 進行解析,解析成我們能看懂的語句. * 對數據庫 cud 進行處理操作觀看一下. * 發現,點是不好的,也有多余的記錄信息. * * @throws Exception 異常 */ @Test public void convertDataTest() throws Exception { //1. 創建連接 CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), "example", "canal", "canal" ); //2. 進行連接 canalConnector.connect(); canalConnector.subscribe("springboot.user"); for (;;) { //獲取信息 Message message = canalConnector.get(1); if (message.getId() == -1L) { // log.info("未獲取到數據"); try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } continue; } List<CanalEntry.Entry> entryList = message.getEntries(); //對獲取到的數據進行處理 log.info(">>獲取到{}條數據", entryList.size()); for (CanalEntry.Entry entry : entryList) { CanalEntry.Header header = entry.getHeader(); log.info(">>>獲取表名:{}", header.getTableName()); //獲取類型. CanalEntry.EntryType entryType = entry.getEntryType(); log.info(">>類型編號 {},類型名稱:{}", entryType.getNumber(), entryType.name()); //獲取存入日志的值 ByteString storeValue = entry.getStoreValue(); //將這個值進行解析 CanalEntry.RowChange rowChange = RowChange.parseFrom(storeValue); String sql = rowChange.getSql(); log.info(">>>獲取對應的sql:{}", sql); // 這個sql 可能是 批量的sql語句 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { log.info(">>>獲取信息:{}", rowData); //對數據進行處理 List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); beforeColumnsList.forEach( n -> log.info("哪個列{},原先是{},是否被更新{}", n.getName(), n.getValue(), n.getUpdated()) ); afterColumnsList.forEach( n -> log.info("哪個列{},后來是{},是否被更新{}", n.getName(), n.getValue(), n.getUpdated()) ); } } } }
再次執行sql
insert into springboot.user(id,name,age,sex,description) values(2,'canal添加用戶2',25,'男','學習canal2');
發現 其他類型的 如: TRANSACTIONBEGIN 也進行了處理
/** * 類型轉換數據 * * @throws Exception 異常 */ @Test public void dataTypeTest() throws Exception { CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress( "127.0.0.1", 11111 ), "example", "canal", "canal" ); canalConnector.connect(); canalConnector.subscribe("springboot.user"); for(;;){ Message message = canalConnector.get(1); if (message.getId() == -1) { TimeUnit.SECONDS.sleep(1); continue; } List<CanalEntry.Entry> entries = message.getEntries(); for (CanalEntry.Entry entry : entries) { CanalEntry.EntryType entryType = entry.getEntryType(); //只要 RowData 數據類型的 if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) { continue; } String tableName = entry.getHeader().getTableName(); log.info(">>>對表 {} 進行操作", tableName); ByteString storeValue = entry.getStoreValue(); RowChange rowChange = RowChange.parseFrom(storeValue); //行改變 CanalEntry.EventType eventType = rowChange.getEventType(); switch (eventType) { case INSERT: { insertHandler(rowChange); break; } case UPDATE: { updateHandler(rowChange); break; } case DELETE: { deleteHandler(rowChange); break; } default: { break; } } } } } private void deleteHandler(RowChange rowChange) { log.info(">>>>執行刪除的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { log.info(">>>>>字段 {} 刪除數據 {}", column.getName(), column.getValue()); } } } private void updateHandler(RowChange rowChange) { log.info(">>>執行更新的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); Map<String, String> beforeValueMap = beforeColumnsList.stream().collect( Collectors.toMap( CanalEntry.Column::getName, CanalEntry.Column::getValue ) ); Map<String, String> afterValueMap = afterColumnsList.stream().collect( Collectors.toMap( CanalEntry.Column::getName, CanalEntry.Column::getValue ) ); beforeValueMap.forEach((column, beforeValue) -> { String afterValue = afterValueMap.get(column); Boolean update = beforeValue.equals(afterValue); log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue, update); }); } } /** * 插入數據. 只有后的數據. * * @param rowChange 行改變 */ private void insertHandler(RowChange rowChange) { log.info(">>>執行添加 的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { if (!StringUtils.hasText(column.getValue())) { continue; } log.info("字段 {} 插入了數據 {}", column.getName(), column.getValue()); } } }
插入,更新,刪除,分別進行了處理.
先啟動測試程序:
不打印任何信息。
主表執行添加語句:
insert into springboot.user(id,name,age,sex,description) values(4,'canal添加用戶4',25,'男','學習canal4');
會打印信息:
這個可讀性就非常高了.
主表執行修改的操作.
update springboot.user set name='開開心心',age=26,description='岳澤霖' where id =4;
更新時,若每一個字段都跟原先一樣,不會產生日志消費。
主表執行刪除的操作:
delete from springboot.user where id =4;
上面的獲取,都是一條數據一條數據獲取的。效率比較低
/** * 一次性獲取多條數據。 * sql 執行多條。 */ @Test public void dataMoreTest() throws Exception { //1. 創建 canal連接對象 CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress( "127.0.0.1", 11111 ), "example", "canal", "canal" ); canalConnector.connect(); // 訂閱哪個對象 canalConnector.subscribe("springboot.user"); for (; ; ) { // Message message = canalConnector.get(3, 5L, TimeUnit.SECONDS); Message message = canalConnector.get(3); if (message.getId() == -1) { // 未獲取到數據 continue; } List<CanalEntry.Entry> entries = message.getEntries(); for (CanalEntry.Entry entry : entries) { CanalEntry.EntryType entryType = entry.getEntryType(); if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) { continue; } String tableName = entry.getHeader().getTableName(); log.info(">>>>對表{} 執行操作", tableName); CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); //對類型進行處理 CanalEntry.EventType eventType = rowChange.getEventType(); switch (eventType) { case INSERT: { insertHandler(rowChange); break; } case UPDATE: { updateHandler(rowChange); break; } case DELETE: { deleteHandler(rowChange); break; } default: { break; } } } } } private void deleteHandler(CanalEntry.RowChange rowChange) { log.info(">>>>執行刪除的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { log.info(">>>>>字段 {} 刪除數據 {}", column.getName(), column.getValue()); } } } private void updateHandler(CanalEntry.RowChange rowChange) { log.info(">>>執行更新的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); Map<String, String> beforeValueMap = beforeColumnsList.stream().collect( Collectors.toMap( CanalEntry.Column::getName, CanalEntry.Column::getValue ) ); Map<String, String> afterValueMap = afterColumnsList.stream().collect( Collectors.toMap( CanalEntry.Column::getName, CanalEntry.Column::getValue ) ); beforeValueMap.forEach((column, beforeValue) -> { String afterValue = afterValueMap.get(column); Boolean update = beforeValue.equals(afterValue); log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue, update); }); } } /** * 插入數據. 只有后的數據. * * @param rowChange 行改變 */ private void insertHandler(CanalEntry.RowChange rowChange) { log.info(">>>執行添加 的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { if (!StringUtils.hasText(column.getValue())) { continue; } log.info("字段 {} 插入了數據 {}", column.getName(), column.getValue()); } } }
修改點:
// Message message = canalConnector.get(3, 5L, TimeUnit.SECONDS); Message message = canalConnector.get(3);
.get(3) 表示 一次性獲取3條記錄.
canalConnector.get(3, 5L, TimeUnit.SECONDS); 表示5秒之內獲取3條記錄,
有兩個觸發條件,一個是獲取了3條,一個是到了5秒。
效果展示信息與之前是一致的,就不重新演示了。
/** * 一次性獲取多條數據。 * sql 執行多條。 */ @Test public void dataMoreTest() throws Exception { //1. 創建 canal連接對象 CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress( "127.0.0.1", 11111 ), "example", "canal", "canal" ); canalConnector.connect(); // 訂閱哪個對象 canalConnector.subscribe("springboot.user"); for (; ; ) { Message message = canalConnector.getWithoutAck(3, 2L, TimeUnit.SECONDS); if (message.getId() == -1) { // 未獲取到數據 TimeUnit.MILLISECONDS.sleep(500); continue; } log.info(">>>>獲取對應的 id: {}",message.getId()); List<CanalEntry.Entry> entries = message.getEntries(); for (CanalEntry.Entry entry : entries) { CanalEntry.EntryType entryType = entry.getEntryType(); if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) { continue; } String tableName = entry.getHeader().getTableName(); log.info(">>>>對表{} 執行操作", tableName); CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); //對類型進行處理 CanalEntry.EventType eventType = rowChange.getEventType(); switch (eventType) { case INSERT: { insertHandler(rowChange); break; } case UPDATE: { updateHandler(rowChange); break; } case DELETE: { deleteHandler(rowChange); break; } default: { break; } } } //進行回滾 // canalConnector.rollback(); //確認ack 配置 canalConnector.ack(message.getId()); } } private void deleteHandler(CanalEntry.RowChange rowChange) { log.info(">>>>執行刪除的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { log.info(">>>>>字段 {} 刪除數據 {}", column.getName(), column.getValue()); } } } private void updateHandler(CanalEntry.RowChange rowChange) { log.info(">>>執行更新的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); Map<String, String> beforeValueMap = beforeColumnsList.stream().collect( Collectors.toMap( CanalEntry.Column::getName, CanalEntry.Column::getValue ) ); Map<String, String> afterValueMap = afterColumnsList.stream().collect( Collectors.toMap( CanalEntry.Column::getName, CanalEntry.Column::getValue ) ); beforeValueMap.forEach((column, beforeValue) -> { String afterValue = afterValueMap.get(column); Boolean update = beforeValue.equals(afterValue); log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue, update); }); } } /** * 插入數據. 只有后的數據. * * @param rowChange 行改變 */ private void insertHandler(CanalEntry.RowChange rowChange) { log.info(">>>執行添加 的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { if (!StringUtils.hasText(column.getValue())) { continue; } log.info("字段 {} 插入了數據 {}", column.getName(), column.getValue()); } } }
主要信息:
Message message = canalConnector.getWithoutAck(3, 2L, TimeUnit.SECONDS);
//進行回滾 // canalConnector.rollback();
//確認ack 配置canalConnector.ack(message.getId());
手動確認消息消費了.
當消息 rollback() 回滾后,會再次消費這條消息.
canalConnector.rollback();
執行語句:
insert into springboot.user(id,name,age,sex,description) values(5,'canal添加用戶5',25,'男','學習canal5');
如果變成 手動確認,
canalConnector.ack(message.getId());
則只消費一次.
以上就是關于“SpringBoot怎么整合Canal方法”這篇文章的內容,相信大家都有了一定的了解,希望小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。