中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ主從同步的實例分析以及HA機制原理

發布時間:2021-09-04 11:24:09 來源:億速云 閱讀:132 作者:chen 欄目:大數據

這篇文章主要介紹“RocketMQ主從同步的實例分析以及HA機制原理”,在日常操作中,相信很多人在RocketMQ主從同步的實例分析以及HA機制原理問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”RocketMQ主從同步的實例分析以及HA機制原理”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!


HA 核心類

HA 的實現邏輯放在了 store 存儲模塊的ha目錄中,其核心實現類如下:

  1. HAService:主從同步的核心實現類

  2. HAService$AcceptSocketService:主服務器監聽從服務器連接實現類

  3. HAService$GroupTransferService:主從同步通知類,實現同步復制和異步復制的功能

  4. HAService$HAClient:從服務器連接主服務實現類

  5. HAConnection:主服務端 HA 連接對象的封裝,當主服務器接收到從服務器發過來的消息后,會封裝成一個 HAConnection 對象,其中里面又封裝了讀 Socket 連接實現與 寫 Socket 連接實現:

  • HAConnection$ReadSocketService:主服務器讀實現類

  • HAConnection$WriteSocketService:主服務器寫實現類

RocketMQ 主從同步的整體工作機制大致是:

  1. 從服務器主動建立 TCP 連接主服務器,然后每隔 5s 向主服務器發送 commitLog 文件最大偏移量拉取還未同步的消息;

  2. 主服務器開啟監聽端口,監聽從服務器發送過來的信息,主服務器收到從服務器發過來的偏移量進行解析,并返回查找出未同步的消息給從服務器;

  3. 客戶端收到主服務器的消息后,將這批消息寫入 commitLog 文件中,然后更新 commitLog 拉取偏移量,接著繼續向主服務拉取未同步的消息。

Slave -> Master 過程

從 HA 實現邏輯可看出,可大致分為兩個過程,分別是從服務器上報偏移量,以及主服務器發送未同步消息到從服務器。

從上面的實現類可知,從服務器向主服務器上報偏移量的邏輯在 HAClient 類中,HAClient 類是一個繼承了 ServiceThread 類,即它是一個線程服務類,在 Broker 啟動后,Broker 啟動開一條線程定時執行從服務器上報偏移量到主服務器的任務。

org.apache.rocketmq.store.ha.HAService.HAClient#run:

public void run() {
  log.info(this.getServiceName() + " service started");

  while (!this.isStopped()) {
    try {
      // 主動連接主服務器,獲取socketChannel對象
      if (this.connectMaster()) {
        if (this.isTimeToReportOffset()) {
          // 執行上報偏移量到主服務器
          boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
          if (!result) {
            this.closeMaster();
          }
        }
				// 每隔一秒鐘輪詢一遍
        this.selector.select(1000);

        // 處理主服務器發送過來的消息
        boolean ok = this.processReadEvent();
        if (!ok) {
          this.closeMaster();
        }
        
        // ......
        
      } else {
        this.waitForRunning(1000 * 5);
      }
    } catch (Exception e) {
      log.warn(this.getServiceName() + " service has exception. ", e);
      this.waitForRunning(1000 * 5);
    }
  }

  log.info(this.getServiceName() + " service end");
}

以上是 HAClient 線程 run 方法邏輯,主要是做了主動連接主服務器,并上報偏移量到主服務器,以及處理主服務器發送過來的消息,并不斷循環執行以上邏輯。

org.apache.rocketmq.store.ha.HAService.HAClient#connectMaster:

private boolean connectMaster() throws ClosedChannelException {
  if (null == socketChannel) {
    String addr = this.masterAddress.get();
    if (addr != null) {
      SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
      if (socketAddress != null) {
        this.socketChannel = RemotingUtil.connect(socketAddress);
        if (this.socketChannel != null) {
          this.socketChannel.register(this.selector, SelectionKey.OP_READ);
        }
      }
    }
    this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
    this.lastWriteTimestamp = System.currentTimeMillis();
  }
  return this.socketChannel != null;
}

該方法是從服務器連接主服務器的邏輯,拿到主服務器地址并且連接上以后,會獲取一個 socketChannel 對象,接著還會記錄當前時間戳為上次寫入的時間戳,lastWriteTimestamp 的作用時用來計算主從同步時間間隔,這里需要注意一點,如果沒有配置主服務器地址,該方法會返回 false,即不會執行主從復制。

該方法還會調用 DefaultMessageStore 的 getMaxPhyOffset() 方法獲取 commitLog 文件最大偏移量,作為本次上報的偏移量。

org.apache.rocketmq.store.ha.HAService.HAClient#reportSlaveMaxOffset:

private boolean reportSlaveMaxOffset(final long maxOffset) {
  this.reportOffset.position(0);
  this.reportOffset.limit(8);
  this.reportOffset.putLong(maxOffset);
  this.reportOffset.position(0);
  this.reportOffset.limit(8);

  for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
    try {
      this.socketChannel.write(this.reportOffset);
    } catch (IOException e) {
      log.error(this.getServiceName()
                + "reportSlaveMaxOffset this.socketChannel.write exception", e);
      return false;
    }
  }
  return !this.reportOffset.hasRemaining();
}

該方法向主服務器上報已拉取偏移量,具體做法是將 ByteBuffer 讀取位置 position 值為 0,其實跳用 flip() 方法也可以,然后調用 putLong() 方法將 maxOffset 寫入 ByteBuffer,將 limit 設置為 8,跟寫入 ByteBuffer 中的 maxOffset(long 型)大小一樣,最后采取 for 循環將 maxOffset 寫入網絡通道中,并調用 hasRemaining() 方法,該方法的邏輯為判斷 position 是否小于 limit,即判斷 ByteBuffer 中的字節流是否全部寫入到通道中。

Master -> Slave 過程

org.apache.rocketmq.store.ha.HAService.AcceptSocketService#run:

public void run() {
  log.info(this.getServiceName() + " service started");

  while (!this.isStopped()) {
    try {
      this.selector.select(1000);
      Set<SelectionKey> selected = this.selector.selectedKeys();

      if (selected != null) {
        for (SelectionKey k : selected) {
          if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
            SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

            if (sc != null) {
              HAService.log.info("HAService receive new connection, "
                                 + sc.socket().getRemoteSocketAddress());

              try {
                HAConnection conn = new HAConnection(HAService.this, sc);
                conn.start();
                HAService.this.addConnection(conn);
              } catch (Exception e) {
                log.error("new HAConnection exception", e);
                sc.close();
              }
            }
          } else {
            log.warn("Unexpected ops in select " + k.readyOps());
          }
        }

        selected.clear();
      }
    } catch (Exception e) {
      log.error(this.getServiceName() + " service has exception.", e);
    }
  }

  log.info(this.getServiceName() + " service end");
}

主服務器收到從服務器的拉取偏移量后,會封裝成一個 HAConnection 對象,前面也說過 HAConnection 封裝主服務端 HA 連接對象的封裝,其中有讀實現類和寫實現類,start() 方法即開啟了讀寫線程:

org.apache.rocketmq.store.ha.HAConnection#start:

public void start() {
  this.readSocketService.start();
  this.writeSocketService.start();
}

org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#processReadEvent:

private boolean processReadEvent() {
  int readSizeZeroTimes = 0;

  if (!this.byteBufferRead.hasRemaining()) {
    this.byteBufferRead.flip();
    this.processPostion = 0;
  }

  while (this.byteBufferRead.hasRemaining()) {
    try {
      int readSize = this.socketChannel.read(this.byteBufferRead);
      if (readSize > 0) {
        readSizeZeroTimes = 0;
        this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
        if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
          int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
          // 從網絡通道中讀取從服務器上報的偏移量
          long readOffset = this.byteBufferRead.getLong(pos - 8);
          this.processPostion = pos;

          // 同步從服務器偏移量
          HAConnection.this.slaveAckOffset = readOffset;
          if (HAConnection.this.slaveRequestOffset < 0) {
            HAConnection.this.slaveRequestOffset = readOffset;
            log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
          }

          // 這里主要是同步后需要喚醒相關消息發送線程,實現主從同步是異步還是同步的功能
          HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
        }
      } else if (readSize == 0) {
        if (++readSizeZeroTimes >= 3) {
          break;
        }
      } else {
        log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
        return false;
      }
    } catch (IOException e) {
      log.error("processReadEvent exception", e);
      return false;
    }
  }

  return true;
}

從以上源碼可看出,主服務器接收到從服務器上報的偏移量后,主要作了兩件事:

  1. 獲取從服務器上報的偏移量;

  2. 喚醒主從同步消費者發送消息同步返回的線程,該方法實現了主從同步-同步復制的功能。

org.apache.rocketmq.store.ha.HAConnection.WriteSocketService#run:

public void run() {
  HAConnection.log.info(this.getServiceName() + " service started");

  while (!this.isStopped()) {
    try {
      this.selector.select(1000);

      // 如果slaveRequestOffset=-1,說明讀線程還沒有獲取從服務器的偏移量,繼續循環等待
      if (-1 == HAConnection.this.slaveRequestOffset) {
        Thread.sleep(10);
        continue;
      }

      // 如果nextTransferFromWhere=-1,說明線程剛開始執行數據傳輸
      if (-1 == this.nextTransferFromWhere) {
        // 如果slaveRequestOffset=0,說明從服務器是第一次上報偏移量
        if (0 == HAConnection.this.slaveRequestOffset) {
          // 獲取最后一個 commitLog 文件且還未讀取消費的偏移量
          long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
          // 求出最后一個commitLog偏移量的初始偏移量
          masterOffset =
            masterOffset
            - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
               .getMapedFileSizeCommitLog());

          if (masterOffset < 0) {
            masterOffset = 0;
          }

          // 更新 nextTransferFromWhere
          this.nextTransferFromWhere = masterOffset;
        } else {
          // 如果slaveRequestOffset!=0,則將該值賦值給nextTransferFromWhere
          this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
        }

        log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                 + "], and slave request " + HAConnection.this.slaveRequestOffset);
      }

      // 判斷上次寫事件是否已全部寫完成
      if (this.lastWriteOver) {

        // 計算是否已到發送心跳包時間
        long interval =
          HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
        // 發送心跳包,以保持長連接
        if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
            .getHaSendHeartbeatInterval()) {
          // Build Header
          this.byteBufferHeader.position(0);
          this.byteBufferHeader.limit(headerSize);
          this.byteBufferHeader.putLong(this.nextTransferFromWhere);
          this.byteBufferHeader.putInt(0);
          this.byteBufferHeader.flip();
          this.lastWriteOver = this.transferData();
          if (!this.lastWriteOver)
            continue;
        }
      } else {
        this.lastWriteOver = this.transferData();
        if (!this.lastWriteOver)
          continue;
      }

      // 獲取同步消息數據
      SelectMappedBufferResult selectResult =      HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
      if (selectResult != null) {
        int size = selectResult.getSize();
        if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
          size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
        }

        long thisOffset = this.nextTransferFromWhere;
        this.nextTransferFromWhere += size;

        selectResult.getByteBuffer().limit(size);
        this.selectMappedBufferResult = selectResult;

        // Build Header
        this.byteBufferHeader.position(0);
        this.byteBufferHeader.limit(headerSize);
        this.byteBufferHeader.putLong(thisOffset);
        this.byteBufferHeader.putInt(size);
        this.byteBufferHeader.flip();

        // 傳輸消息到從服務器
        this.lastWriteOver = this.transferData();
      } else {

        HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
      }
    } catch (Exception e) {

      HAConnection.log.error(this.getServiceName() + " service has exception.", e);
      break;
    }
  }

  if (this.selectMappedBufferResult != null) {
    this.selectMappedBufferResult.release();
  }

  this.makeStop();

  readSocketService.makeStop();

  haService.removeConnection(HAConnection.this);

  SelectionKey sk = this.socketChannel.keyFor(this.selector);
  if (sk != null) {
    sk.cancel();
  }

  try {
    this.selector.close();
    this.socketChannel.close();
  } catch (IOException e) {
    HAConnection.log.error("", e);
  }

  HAConnection.log.info(this.getServiceName() + " service end");
}

讀實現類實現邏輯比較長,但主要做了以下幾件事情:

  1. 計算需要拉取的偏移量,如果從服務器第一次拉取,則從最后一個 commitLog 文件的初始偏移量開始同步;

  2. 傳輸消息到從服務器;

  3. 發送心跳包到從服務器,保持長連接。

關于第一步,我還需要詳細講解一下,因為之前有想到一個問題:

把 brokerA 的從服務器去掉,再啟動一臺新的從服務器指向brokerA 主服務器,這時的主服務器的消息是否會全量同步到從服務?

org.apache.rocketmq.store.MappedFileQueue#getMaxOffset:

public long getMaxOffset() {
    MappedFile mappedFile = getLastMappedFile();
    if (mappedFile != null) {
        return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
    }
    return 0;
}

org.apache.rocketmq.store.ha.HAConnection.WriteSocketService#run:

// 求出最后一個commitLog偏移量的初始偏移量
masterOffset =
  masterOffset
            - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
               .getMapedFileSizeCommitLog());

從以上邏輯可找到答案,如果有新的從服務器同步主服務器消息,則從最后一個 commitLog 文件的初始偏移量開始同步。

回到最開始開啟 HAClient 線程上報偏移量的方法,我們發現里面還做了一件事:

// 處理主服務器發送過來的消息
boolean ok = this.processReadEvent();

org.apache.rocketmq.store.ha.HAService.HAClient#processReadEvent:

private boolean processReadEvent() {
  int readSizeZeroTimes = 0;
  while (this.byteBufferRead.hasRemaining()) {
    try {
      int readSize = this.socketChannel.read(this.byteBufferRead);
      if (readSize > 0) {
        lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
        readSizeZeroTimes = 0;
        // 讀取消息并寫入commitLog文件中
        boolean result = this.dispatchReadRequest();
        if (!result) {
          log.error("HAClient, dispatchReadRequest error");
          return false;
        }
      } else if (readSize == 0) {
        if (++readSizeZeroTimes >= 3) {
          break;
        }
      } else {
        // TODO ERROR
        log.info("HAClient, processReadEvent read socket < 0");
        return false;
      }
    } catch (IOException e) {
      log.info("HAClient, processReadEvent read socket exception", e);
      return false;
    }
  }

  return true;
}

該方法用于處理主服務器發送回來的消息數據,這里用了 while 循環的處理,不斷地從 byteBuffer 讀取數據到緩沖區中,最后調用 dispatchReadRequest 方法將消息數據寫入 commitLog 文件中,完成主從復制最后一個步驟。

到此,關于“RocketMQ主從同步的實例分析以及HA機制原理”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

伽师县| 宁阳县| 金山区| 汶上县| 长宁县| 利津县| 托克托县| 报价| 黄梅县| 星子县| 黎城县| 台江县| 彝良县| 昌乐县| 法库县| 湘潭县| 乌兰察布市| 南宁市| 大厂| 柳河县| 荆门市| 杭锦后旗| 浠水县| 增城市| 绵竹市| 乌鲁木齐县| 龙井市| 舒城县| 潍坊市| 广宗县| 巩留县| 息烽县| 井冈山市| 永春县| 哈尔滨市| 武宁县| 健康| 重庆市| 漳州市| 遵化市| 泌阳县|