中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

MySQL特定表全量、增量數據同步到消息隊列怎么實現

發布時間:2021-11-29 09:19:18 來源:億速云 閱讀:218 作者:iii 欄目:開發技術

本篇內容主要講解“MySQL特定表全量、增量數據同步到消息隊列怎么實現”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“MySQL特定表全量、增量數據同步到消息隊列怎么實現”吧!

    1、原始需求

    既要同步原始全量數據,也要實時同步MySQL特定庫的特定表增量數據,同時對應的修改、刪除也要對應。

    數據同步不能有侵入性:不能更改業務程序,并且不能對業務側有太大性能壓力。

    應用場景:數據ETL同步、降低業務服務器壓力。

    2、解決方案

    MySQL特定表全量、增量數據同步到消息隊列怎么實現

    3、canal介紹、安裝

    canal是阿里巴巴旗下的一款開源項目,純Java開發。基于數據庫增量日志解析,提供增量數據訂閱&消費,目前主要支持了MySQL(也支持mariaDB)。

    工作原理:mysql主備復制實現

    MySQL特定表全量、增量數據同步到消息隊列怎么實現

    從上層來看,復制分成三步:

    1. master將改變記錄到二進制日志(binary log)中(這些記錄叫做二進制日志事件,binary log events,可以通過show binlog events進行查看);

    2. slave將master的binary log events拷貝到它的中繼日志(relay log);

    3. slave重做中繼日志中的事件,將改變反映它自己的數據。

    canal的工作原理

    MySQL特定表全量、增量數據同步到消息隊列怎么實現

    原理相對比較簡單:

    1. canal模擬mysql slave的交互協議,偽裝自己為mysql slave,向mysql master發送dump協議

    2. mysql master收到dump請求,開始推送binary log給slave(也就是canal)

    3. canal解析binary log對象(原始為byte流)

    架構

    MySQL特定表全量、增量數據同步到消息隊列怎么實現

    說明:

    • server代表一個canal運行實例,對應于一個jvm

    • instance對應于一個數據隊列 (1個server對應1..n個instance)

    instance模塊:

    • eventParser (數據源接入,模擬slave協議和master進行交互,協議解析)

    • eventSink (Parser和Store鏈接器,進行數據過濾,加工,分發的工作)

    • eventStore (數據存儲)

    • metaManager (增量訂閱&消費信息管理器)

    安裝

    1、mysql、kafka環境準備

    2、canal下載:wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

    3、解壓:tar -zxvf canal.deployer-1.1.3.tar.gz

    4、對目錄conf里文件參數配置

    對canal.properties配置:

    MySQL特定表全量、增量數據同步到消息隊列怎么實現

    MySQL特定表全量、增量數據同步到消息隊列怎么實現

    進入conf/example里,對instance.properties配置:

    MySQL特定表全量、增量數據同步到消息隊列怎么實現

    MySQL特定表全量、增量數據同步到消息隊列怎么實現

    5、啟動:bin/startup.sh

    6、日志查看:

    MySQL特定表全量、增量數據同步到消息隊列怎么實現

    4、驗證

    1、開發對應的kafka消費者

    package org.kafka;
    
    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    
    /**
     *
     * Title: KafkaConsumerTest
     * Description:
     *  kafka消費者 demo
     * Version:1.0.0
     * @author pancm
     * @date 2018年1月26日
     */
    public class KafkaConsumerTest implements Runnable {
    
        private final KafkaConsumer<String, String> consumer;
        private ConsumerRecords<String, String> msgList;
        private final String topic;
        private static final String GROUPID = "groupA";
    
        public KafkaConsumerTest(String topicName) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.7.193:9092");
            props.put("group.id", GROUPID);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("auto.offset.reset", "latest");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            this.consumer = new KafkaConsumer<String, String>(props);
            this.topic = topicName;
            this.consumer.subscribe(Arrays.asList(topic));
        }
    
        @Override
        public void run() {
            int messageNo = 1;
            System.out.println("---------開始消費---------");
            try {
                for (; ; ) {
                    msgList = consumer.poll(1000);
                    if (null != msgList && msgList.count() > 0) {
                        for (ConsumerRecord<String, String> record : msgList) {
                            //消費100條就打印 ,但打印的數據不一定是這個規律的
    
                                System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());
    
    
    //                            String v = decodeUnicode(record.value());
    
    //                            System.out.println(v);
    
                            //當消費了1000條就退出
                            if (messageNo % 1000 == 0) {
                                break;
                            }
                            messageNo++;
                        }
                    } else {
                        Thread.sleep(11);
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        }
    
        public static void main(String args[]) {
            KafkaConsumerTest test1 = new KafkaConsumerTest("sample-data");
            Thread thread1 = new Thread(test1);
            thread1.start();
        }
    
    
        /*
         * 中文轉unicode編碼
         */
        public static String gbEncoding(final String gbString) {
            char[] utfBytes = gbString.toCharArray();
            String unicodeBytes = "";
            for (int i = 0; i < utfBytes.length; i++) {
                String hexB = Integer.toHexString(utfBytes[i]);
                if (hexB.length() <= 2) {
                    hexB = "00" + hexB;
                }
                unicodeBytes = unicodeBytes + "\\u" + hexB;
            }
            return unicodeBytes;
        }
    
        /*
         * unicode編碼轉中文
         */
        public static String decodeUnicode(final String dataStr) {
            int start = 0;
            int end = 0;
            final StringBuffer buffer = new StringBuffer();
            while (start > -1) {
                end = dataStr.indexOf("\\u", start + 2);
                String charStr = "";
                if (end == -1) {
                    charStr = dataStr.substring(start + 2, dataStr.length());
                } else {
                    charStr = dataStr.substring(start + 2, end);
                }
                char letter = (char) Integer.parseInt(charStr, 16); // 16進制parse整形字符串。
                buffer.append(new Character(letter).toString());
                start = end;
            }
            return buffer.toString();
    
        }
    }

    2、對表bak1進行增加數據

    CREATE TABLE `bak1` (
      `vin` varchar(20) NOT NULL,
      `p1` double DEFAULT NULL,
      `p2` double DEFAULT NULL,
      `p3` double DEFAULT NULL,
      `p4` double DEFAULT NULL,
      `p5` double DEFAULT NULL,
      `p6` double DEFAULT NULL,
      `p7` double DEFAULT NULL,
      `p8` double DEFAULT NULL,
      `p9` double DEFAULT NULL,
      `p0` double DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
    
    show create table bak1;
    
    insert into bak1 select '李雷abcv',
      `p1` ,
      `p2` ,
      `p3` ,
      `p4` ,
      `p5` ,
      `p6` ,
      `p7` ,
      `p8` ,
      `p9` ,
      `p0`  from moci limit 10

    3、查看輸出結果:

    MySQL特定表全量、增量數據同步到消息隊列怎么實現

    到此,相信大家對“MySQL特定表全量、增量數據同步到消息隊列怎么實現”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    通辽市| 团风县| 浠水县| 崇州市| 上栗县| 六盘水市| 宁远县| 万荣县| 岳阳县| 台南县| 芦溪县| 阳春市| 岗巴县| 黔东| 苏尼特右旗| 高州市| 理塘县| 元阳县| 墨江| 托克逊县| 通化市| 竹山县| 胶州市| 武鸣县| 庆安县| 宁河县| 怀柔区| 镇雄县| 仪陇县| 胶南市| 宁晋县| 麻阳| 开平市| 盘锦市| 长白| 广丰县| 满洲里市| 石屏县| 巴林右旗| 德阳市| 景宁|