中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Springboot2.3.x整合Canal的方法

發布時間:2022-02-19 17:03:04 來源:億速云 閱讀:179 作者:iii 欄目:開發技術

這篇文章主要介紹“Springboot2.3.x整合Canal的方法”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“Springboot2.3.x整合Canal的方法”文章能幫助大家解決問題。

    一、故事背景

    最近工作中遇到了一個數據同步的問題

    我們這邊系統的一個子業務需要依賴另一個系統的數據,當另一個系統數據變更時,我們這邊的數據庫要對數據進行同步…

    那么我自己想到的同步方式呢就兩種:

    1、MQ訂閱,另一個系統數據變更后將變更數據方式到MQ 我們這邊訂閱接受

    2、數據庫的觸發器

    但是呢,兩者都被組長paas了!

    1、MQ呢,會造成代碼侵入,但是另一個系統暫時不會做任何代碼更改…

    2、數據庫的觸發器會直接跟生產數據庫強關聯,會搶占資源,甚至有可能造成生產數據庫的不穩定…

    對此很是苦惱…

    于是啊,只能借由強大的google、百度,看看能不能解決我這個問題!一番搜索,有學習了一個很有趣的東西

    二、什么是Canal

    canal:阿里開源mysql binlog 數據組件

    早期,阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的數據庫同步業務,主要是基于trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基于數據庫的日志解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,從此開啟了一段新紀元。ps. 目前內部使用的同步,已經支持mysql5.x和oracle部分版本的日志解析

    Springboot2.3.x整合Canal的方法

    canal [k?’næl],譯意為水道/管道/溝渠,主要用途是基于 MySQL 數據庫增量日志解析,提供增量數據訂閱和消費

    工作原理

    • canal 模擬 MySQL slave 的交互協議,偽裝自己為 MySQL slave ,向 MySQL master 發送 dump 協議

    • MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )

    • canal 解析 binary log 對象(原始為 byte 流)

    canal呢,實際是就是運用了Mysql的主從復制原理…

    MySQL主從復制實現

    Springboot2.3.x整合Canal的方法

    復制遵循三步過程:

    • 服務器將更改記錄到binlog中(這些記錄稱為binlog事件,可以通過來查看show binary events

    • 從服務器將主服務器的二進制日志事件復制到其中繼日志。

    • 中繼日志中的從服務器重做事件隨后將更新其舊數據。

    如何運作

    Springboot2.3.x整合Canal的方法

    原理很簡單:

    • Canal模擬MySQL從站的交互協議,偽裝成MySQL從站,然后將轉儲協議發送到MySQL主服務器。

    • MySQL Master接收到轉儲請求,并開始將二進制日志推送到slave(即運河)。

    • 運河將二進制日志對象解析為其自己的數據類型(最初為字節流)

    通過官網的介紹,讓我們了解到,canal實際上就是偽裝為了一個從庫,我們只需要訂閱到數據變更的主庫,那么canal就會以從庫的身份讀取到其主庫的binlog日志!我們拿到canal解析好的binlog日志信息,就等于拿到了變更的數據啦!

    三、Canal安裝

    (1)事前準備

    (1)數據庫開啟binlog

    使用canal呢,有一個前提條件,即被訂閱的數據庫需要開啟binlog

    如何查看是否開啟binlog呢?

    登錄服務器上數據庫或在可視化工具中 執行查詢語句: 如果出現 log_bin ON 表示已開啟Binlog

    show variables like 'log_bin';

    Springboot2.3.x整合Canal的方法

    如果服務器上的數據庫為自己安裝的,則找到配置文件my.conf 添加以下內容,如果買的云實例,則詢問廠商開啟即可

    Springboot2.3.x整合Canal的方法

    在my.conf文件中的 [mysqld] 下添加以下三行內容

    log-bin=mysql-bin # 開啟 binlog
    binlog-format=ROW # 選擇 ROW 模式 讀行
    server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復
    (2)數據庫新建賬號,開啟MySQL slav權限

    canaltest:作為slave 角色的賬戶 Canal123…:為密碼

    CREATE USER canaltest IDENTIFIED BY 'Canal123..';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canaltest'@'%';
    GRANT ALL PRIVILEGES ON *.* TO 'canaltest'@'%' ;
    FLUSH PRIVILEGES;

    Springboot2.3.x整合Canal的方法

    連接測試

    Springboot2.3.x整合Canal的方法

    那么到這里,準備工作就好了!

    可能呢,有的小伙伴有點懵,你這是在干啥?那么咱們就來理那么一理! 敲黑板了哈!

    Springboot2.3.x整合Canal的方法

    1、事前準備,是針對于訂閱數據庫的(即主庫)

    2、實際步驟也就兩步 1:更改配置,開啟binlog 2:設置新賬號,賦予slave權限,供canal讀取Binlog橋梁使用

    3、以上操作與canal本身沒啥關系,僅僅是使用canal的前提條件罷遼…

    (2)Canal Admin 安裝

    canal admin 是 一個可視化的 canal web管理運維工程,脫離以往服務器運維,面向web…

    canal-admin設計上是為canal提供整體配置管理、節點運維等面向運維的功能,提供相對友好的WebUI操作界面,方便更多用戶快速和安全的操作

    canal-admin的限定依賴:

    • MySQL,用于存儲配置和節點等相關數據

    • canal版本,要求>=1.1.4 (需要依賴canal-server提供面向admin的動態運維管理接口)

    • 需要JRE 環境 (安裝JDK)

    下載

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz

    解壓

    mkdir /usr/local/canal-admin
    tar zxvf canal.admin-1.1.4.tar.gz  -C /usr/local/canal-admin

    進入canal-admin目錄下查看

    cd /usr/local/canal-admin

    修改配置

    vim conf/application.yml

    里邊的配置 按照自己的實際情況更改…

    server:
      port: 8089
    spring:
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
    #這里是配置canal-admin 所依賴的數據庫,,,存放web管理中設置的配置等,,,
    spring.datasource:
      address: 127.0.0.1:3306
      database: canal_manager
      username: root	
      password: 123456
      driver-class-name: com.mysql.jdbc.Driver
      url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
      hikari:
        maximum-pool-size: 30
        minimum-idle: 1
    # 連接所用的賬戶密碼 
    canal:
      adminUser: admin
      adminPasswd: leitest

    導入canaladmin 所需要的數據庫文件

    這里需要注意了,要和 application.yml中的數據庫名對應,你可以選擇命令導入,也可以Navicat 可視化拖sql文件導入…一切…看你喜歡.

    我這個玩canal的服務器呢,是新安裝的,mysql直接用docker安裝即可,具體可查看我的博客:

    Docker在CentOS7下不能下載鏡像timeout的解決辦法(圖解)

    CentOS 7安裝Docker

    需要注意的是,使用docker 安裝的mysql 是無法直接使用 mysql -uroot -p 命令的哦,需要先將腳本復制到容器中,docker不熟練或覺得麻煩的同鞋,請直接使用Navicat可視化工具…

    導入canal-admin服務所必需的sql文件

    如果是服務器軟件軟件安裝的mysql 則直接執行以下命令即可

    mysql -uroot -p
    #.........
    # 導入初始化SQL
    > source conf/canal_manager.sql

    Springboot2.3.x整合Canal的方法

    啟動

    直接執行啟動腳本即可

    cd bin
    ./startup.sh

    Springboot2.3.x整合Canal的方法

    默認賬戶密碼:

    admin:123456

    Springboot2.3.x整合Canal的方法

    (3)Canal Server 安裝

    canal-server 才是canal的核心我們前邊所講的canal的功能,實際上講述的就是canal-server的功能…admin 僅僅只是一個web管理而已,不要搞混主次關系…

    下載

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

    解壓

    mkdir /usr/local/canal-server
    tar zxvf canal.deployer-1.1.4.tar.gz  -C /usr/local/canal-server

    啟動,并連接到canal-admin web端

    首先,我們需要修改配置文件

    cd /usr/local/canal-server
    
    vim /conf/canal_local.properties

    Springboot2.3.x整合Canal的方法

    Springboot2.3.x整合Canal的方法

    注意了,密碼如何加密!!!

    要記得,前邊 canal-admin 的 aplication.yml 中設置了賬戶密碼為 admin:leitest

    # 連接所用的賬戶密碼 
    canal:
      adminUser: admin
      adminPasswd: leitest

    所以,我們這里需要對明文 leitest 加密并替換即可

    使用數據庫函數 PASSWORD 加密即可

    SELECT PASSWORD(‘要加密的明文’),然后去掉前邊的* 號就行

    Springboot2.3.x整合Canal的方法

    啟動并連接到admin

    sh bin/startup.sh local

    查看端口看是否有 11110 、11111、11112

    netstat -untlp 看了一下,發現沒有,說明server 沒有啟動成功

    Springboot2.3.x整合Canal的方法

    看下日志

    vim logs/canal/canal.log

    Springboot2.3.x整合Canal的方法

    解決辦法:

    1、canal-admin 先停止后從起

    2、canal server 先以之前的形式運行,不輸入后邊 local 命令

    3、關閉canal server

    4、再以canal server 連接 admin 形式啟動

    Springboot2.3.x整合Canal的方法

    admin頁面上新建server

    Springboot2.3.x整合Canal的方法

    修改配置,注釋 (instance連接信息,我們還是以前邊設置的 admin:leitest 為準,所有這里需要注釋掉,如果不注釋,那么我們代碼中連接則需要使用此賬號以及密碼)

    Springboot2.3.x整合Canal的方法

    接下來咱們創建instance

    如何理解server 和instance 呢,我認為,可以把它當做 java 中的 class 和 bean 即 類和對象

    server 為類 instance 為其具體的實例對象 ,可創建多個不同的實例…

    而我們這邊監聽到主庫變化的呢,則是根據業務,對不同的實例即(instance )做不同配置即可…

    Springboot2.3.x整合Canal的方法

    Springboot2.3.x整合Canal的方法

    Springboot2.3.x整合Canal的方法

    Springboot2.3.x整合Canal的方法

    根據自己情況進行過濾數據

    canal.instance.filter.regexmysql 數據解析關注的表,Perl正則表達式.多個正則之間以逗號(,)分隔,轉義符需要雙斜杠(\) 常見例子:1. 所有表:.* or .\… 2. canal schema下所有表: canal\…* 3. canal下的以canal打頭的表:canal\.canal.* 4. canal schema下的一張表:canal\.test15. 多個規則組合使用:canal\…*,mysql.test1,mysql.test2 (逗號分隔)  
    canal.instance.filter.druid.ddl是否使用druid處理所有的ddl解析來獲取庫和表名true 
    canal.instance.filter.query.dcl是否忽略dcl語句false 
    canal.instance.filter.query.dml是否忽略dml語句 (mysql5.6之后,在row模式下每條DML語句也會記錄SQL到binlog中,可參考MySQL文檔)false 
    canal.instance.filter.query.ddl是否忽略ddl語句false 

    更多設置請見官網:https://github.com/alibaba/canal/wiki/AdminGuide

    如此一來,一個簡單的canal環境就搭建好了,接下來,咱們開始測試吧!

    (4)springboot demo示例

    引入canal所需依賴

    <dependency>
                <groupId>com.alibaba.otter</groupId>
                <artifactId>canal.client</artifactId>
                <version>1.1.4</version>
            </dependency>

    配置

    canal:
      # instance 實例所在ip
      host: 192.168.96.129
      # tcp通信端口
      port: 11111
      # 賬號  canal-admin application.yml 設置的
      username: admin
      # 密碼
      password: leitest
      #實例名稱
      instance: test

    代碼

    package com.leilei;
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.ApplicationArguments;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.stereotype.Component;
    import java.net.InetSocketAddress;
    import java.util.List;
    /**
     * @author lei
     * @version 1.0
     * @date 2020/9/27 22:23
     * @desc 讀取binlog日志
     */
    @Component
    public class ReadBinLogService implements ApplicationRunner {
        @Value("${canal.host}")
        private String host;
        @Value("${canal.port}")
        private int port;
        @Value("${canal.username}")
        private String username;
        @Value("${canal.password}")
        private String password;
        @Value("${canal.instance}")
        private String instance;
        @Override
        public void run(ApplicationArguments args) throws Exception {
            CanalConnector conn = getConn();
            while (true) {
                conn.connect();
                //訂閱實例中所有的數據庫和表
                conn.subscribe(".*\\..*");
                // 回滾到未進行ack的地方
                conn.rollback();
                // 獲取數據 每次獲取一百條改變數據
                Message message = conn.getWithoutAck(100);
                long id = message.getId();
                int size = message.getEntries().size();
                if (id != -1 && size > 0) {
                    // 數據解析
                    analysis(message.getEntries());
                }else {
                    Thread.sleep(1000);
                }
                // 確認消息
                conn.ack(message.getId());
                // 關閉連接
                conn.disconnect();
            }
        }
        /**
         * 數據解析
         */
        private void analysis(List<CanalEntry.Entry> entries) {
            for (CanalEntry.Entry entry : entries) {
                // 只解析mysql事務的操作,其他的不解析
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN) {
                    continue;
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                // 解析binlog
                CanalEntry.RowChange rowChange = null;
                try {
                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("解析出現異常 data:" + entry.toString(), e);
                if (rowChange != null) {
                    // 獲取操作類型
                    CanalEntry.EventType eventType = rowChange.getEventType();
                    // 獲取當前操作所屬的數據庫
                    String dbName = entry.getHeader().getSchemaName();
                    // 獲取當前操作所屬的表
                    String tableName = entry.getHeader().getTableName();
                    // 事務提交時間
                    long timestamp = entry.getHeader().getExecuteTime();
                    for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                        dataDetails(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), dbName, tableName, eventType, timestamp);
                        System.out.println("-------------------------------------------------------------");
                    }
         * 解析具體一條Binlog消息的數據
         *
         * @param dbName    當前操作所屬數據庫名稱
         * @param tableName 當前操作所屬表名稱
         * @param eventType 當前操作類型(新增、修改、刪除)
        private static void dataDetails(List<CanalEntry.Column> beforeColumns,
                                        List<CanalEntry.Column> afterColumns,
                                        String dbName,
                                        String tableName,
                                        CanalEntry.EventType eventType,
                                        long timestamp) {
            System.out.println("數據庫:" + dbName);
            System.out.println("表名:" + tableName);
            System.out.println("操作類型:" + eventType);
            if (CanalEntry.EventType.INSERT.equals(eventType)) {
                System.out.println("新增數據:");
                printColumn(afterColumns);
            } else if (CanalEntry.EventType.DELETE.equals(eventType)) {
                System.out.println("刪除數據:");
                printColumn(beforeColumns);
            } else {
                System.out.println("更新數據:更新前數據--");
                System.out.println("更新數據:更新后數據--");
            System.out.println("操作時間:" + timestamp);
        private static void printColumn(List<CanalEntry.Column> columns) {
            for (CanalEntry.Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
         * 獲取連接
        public CanalConnector getConn() {
            return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password);
    }

    測試查看

    數據庫修改數據庫時

    Springboot2.3.x整合Canal的方法

    數據新增數據時

    Springboot2.3.x整合Canal的方法

    刪除數據(把我們才添加的小明刪掉)

    Springboot2.3.x整合Canal的方法

    當我們操作監控的數據庫DM L操作的時候呢,會被canal監聽到&hellip;我們呢,通過canal監聽,拿到修改的庫,修改的表,修改的字段,便可以根據自己業務進行數據處理了!

    哎,這個時候啊,可能有小伙伴就要問了,那么,我能不能直接獲取其操作的sql語句呢?

    目前,我是自己解析其列來手動拼接的sql語句實現了

    話不多說,先上效果:

    canal 監聽到主庫sql變化----> update students set  id = '2', age = '999', name = '小三', city = '11', date = '2020-09-27 17:41:44', birth = '2020-09-27 18:00:48' where id=2
    canal 監聽到主庫sql變化----> delete from students where id=6
    canal 監聽到主庫sql變化----> insert into students (id,age,name,city,date,birth) VALUES ('89','98','測試新增','深圳','2020-09-27 22:46:53','')
    canal 監聽到主庫sql變化----> update students set  id = '89', age = '98', name = '測試新增', city = '深圳', date = '2020-09-27 22:46:53', birth = '2020-09-27 22:46:56' where id=89

    Springboot2.3.x整合Canal的方法

    實際上呢,我們也就是拿到其執行前列數據變化 執行后列數據變化,自己拼接了一個sql罷了&hellip;附上代碼

    package com.leilei;
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry.*;
    import com.alibaba.otter.canal.protocol.Message;
    import com.alibaba.otter.canal.protocol.exception.CanalClientException;
    import com.google.protobuf.InvalidProtocolBufferException;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.ApplicationArguments;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.stereotype.Component;
    import java.net.InetSocketAddress;
    import java.util.List;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    /**
     * @author lei
     * @version 1.0
     * @date 2020/9/27 22:33
     * @desc 讀取binlog日志
     */
    @Component
    public class ReadBinLogToSql implements ApplicationRunner {
        //讀取的binlog sql 隊列緩存 一邊Push 一邊poll
        private Queue<String> canalQueue = new ConcurrentLinkedQueue<>();
        @Value("${canal.host}")
        private String host;
        @Value("${canal.port}")
        private int port;
        @Value("${canal.username}")
        private String username;
        @Value("${canal.password}")
        private String password;
        @Value("${canal.instance}")
        private String instance;
        @Override
        public void run(ApplicationArguments args) throws Exception {
            CanalConnector conn = getConn();
            while (true) {
                try {
                    conn.connect();
                    //訂閱實例中所有的數據庫和表
                    conn.subscribe(".*\\..*");
                    // 回滾到未進行ack的地方
                    conn.rollback();
                    // 獲取數據 每次獲取一百條改變數據
                    Message message = conn.getWithoutAck(100);
                    long id = message.getId();
                    int size = message.getEntries().size();
                    if (id != -1 && size > 0) {
                        // 數據解析
                        analysis(message.getEntries());
                    } else {
                        Thread.sleep(1000);
                    }
                    // 確認消息
                    conn.ack(message.getId());
                } catch (CanalClientException | InvalidProtocolBufferException | InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // 關閉連接
                    conn.disconnect();
                }
            }
        }
        private void analysis(List<Entry> entries) throws InvalidProtocolBufferException {
            for (Entry entry : entries) {
                if (EntryType.ROWDATA == entry.getEntryType()) {
                    RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                    EventType eventType = rowChange.getEventType();
                    if (eventType == EventType.DELETE) {
                        saveDeleteSql(entry);
                    } else if (eventType == EventType.UPDATE) {
                        saveUpdateSql(entry);
                    } else if (eventType == EventType.INSERT) {
                        saveInsertSql(entry);
                    }
                }
            }
        }
        /**
         * 保存更新語句
         *
         * @param entry
         */
        private void saveUpdateSql(Entry entry) {
            try {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                List<RowData> dataList = rowChange.getRowDatasList();
                for (RowData rowData : dataList) {
                    List<Column> afterColumnsList = rowData.getAfterColumnsList();
                    StringBuffer sql = new StringBuffer("update " +
                            entry.getHeader().getTableName() + " set ");
                    for (int i = 0; i < afterColumnsList.size(); i++) {
                        sql.append(" ")
                                .append(afterColumnsList.get(i).getName())
                                .append(" = '").append(afterColumnsList.get(i).getValue())
                                .append("'");
                        if (i != afterColumnsList.size() - 1) {
                            sql.append(",");
                        }
                    }
                    sql.append(" where ");
                    List<Column> oldColumnList = rowData.getBeforeColumnsList();
                    for (Column column : oldColumnList) {
                        if (column.getIsKey()) {
                            sql.append(column.getName()).append("=").append(column.getValue());
                            break;
                        }
                    }
                    canalQueue.add(sql.toString());
                }
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        }
        /**
         * 保存刪除語句
         *
         * @param entry
         */
        private void saveDeleteSql(Entry entry) {
            try {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                List<RowData> rowDatasList = rowChange.getRowDatasList();
                for (RowData rowData : rowDatasList) {
                    List<Column> columnList = rowData.getBeforeColumnsList();
                    StringBuffer sql = new StringBuffer("delete from " +
                            entry.getHeader().getTableName() + " where ");
                    for (Column column : columnList) {
                        if (column.getIsKey()) {
                            sql.append(column.getName()).append("=").append(column.getValue());
                            break;
                        }
                    }
                    canalQueue.add(sql.toString());
                }
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        }
        /**
         * 保存插入語句
         *
         * @param entry
         */
        private void saveInsertSql(Entry entry) {
            try {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                List<RowData> datasList = rowChange.getRowDatasList();
                for (RowData rowData : datasList) {
                    List<Column> columnList = rowData.getAfterColumnsList();
                    StringBuffer sql = new StringBuffer("insert into " +
                            entry.getHeader().getTableName() + " (");
                    for (int i = 0; i < columnList.size(); i++) {
                        sql.append(columnList.get(i).getName());
                        if (i != columnList.size() - 1) {
                            sql.append(",");
                        }
                    }
                    sql.append(") VALUES (");
                    for (int i = 0; i < columnList.size(); i++) {
                        sql.append("'" + columnList.get(i).getValue() + "'");
                        if (i != columnList.size() - 1) {
                            sql.append(",");
                        }
                    }
                    sql.append(")");
                    canalQueue.add(sql.toString());
                }
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        }
        /**
         * 獲取連接
         */
        public CanalConnector getConn() {
            return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password);
        }
        /**
         * 模擬消費canal轉換的sql語句
         */
        public void executeQueueSql() {
            int size = canalQueue.size();
            for (int i = 0; i < size; i++) {
                String sql = canalQueue.poll();
                System.out.println("canal 監聽到主庫sql變化----> " + sql);
            }
        }
    }

    關于“Springboot2.3.x整合Canal的方法”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識,可以關注億速云行業資訊頻道,小編每天都會為大家更新不同的知識點。

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    平陆县| 松原市| 民乐县| 泰顺县| 图们市| 勃利县| 玛多县| 崇仁县| 通道| 如东县| 同仁县| 淮南市| 武定县| 阳信县| 清河县| 东阿县| 五莲县| 麟游县| 大姚县| 西吉县| 临洮县| 扬中市| 巴林左旗| 新建县| 金寨县| 福建省| 尤溪县| 隆尧县| 醴陵市| 汪清县| 蓬溪县| 剑阁县| 英超| 上林县| 马边| 成都市| 台安县| 四会市| 屏东县| 中方县| 比如县|