中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ中事務消息狀態回查的示例分析

發布時間:2021-12-18 11:20:57 來源:億速云 閱讀:246 作者:小新 欄目:大數據

這篇文章主要介紹RocketMQ中事務消息狀態回查的示例分析,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!

學習事務狀態消息回查,我們知道,第一次提交到消息服務器時消息的主題被替換為RMQ_SYS_TRANS_HALF_TOPIC,本地事務執行完后如果返回本地事務狀態為UN_KNOW時,第二次提交到服務器時將不會做任何操作,也就是說此時消息還存在與RMQ_SYS_TRANS_HALF_TOPIC主題中,并不能被消息消費者消費,那這些消息最終如何被提交或回滾呢?

原來RocketMQ使用TransactionalMessageCheckService線程定時去檢測
RMQ_SYS_TRANS_HALF_TOPIC主題中的消息,回查消息的事務狀態。TransactionalMessageCheckService的檢測頻率默認1分鐘,可通過在broker.conf文件中設置transactionCheckInterval的值來改變默認值,單位為毫秒。

接下來將深入分析該線程的實現原理,從而解開事務消息回查機制。

TransactionalMessageCheckService#onWaitEndprotected void onWaitEnd() {        long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();         // @1
        int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();    // @2
        long begin = System.currentTimeMillis();
        log.info("Begin to check prepare message, begin time:{}", begin);        this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());       // @3
        log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
    }

代碼@1:從broker配置文件中獲取transactionTimeOut參數值。
代碼@2:從broker配置文件中獲取transactionCheckMax參數值,表示事務的最大檢測次數,如果超過檢測次數,消息會默認為丟棄,即回滾消息。

接下來重點分析TransactionalMessageService#check的實現邏輯:

org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl
TransactionalMessageServiceImpl#check
String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);if (msgQueues == null || msgQueues.size() == 0) {
      log.warn("The queue of topic is empty :" + topic);      return;
}

step1:根據主題名稱,獲取該主題下所有的消息隊列。

TransactionalMessageServiceImpl#checkfor (MessageQueue messageQueue : msgQueues) {    // ...}

Step2:循環遍歷消息隊列,從單個消息消費隊列去獲取消息。

TransactionalMessageServiceImpl#checklong startTime = System.currentTimeMillis();
MessageQueue opQueue = getOpQueue(messageQueue);long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);if (halfOffset < 0 || opOffset < 0) {
     log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue, halfOffset, opOffset);     continue;
}

Step3:獲取對應的操作隊列,其主題為:RMQ_SYS_TRANS_OP_HALF_TOPIC,然后獲取操作隊列的消費進度、待操作的消費隊列的消費進度,如果任意一小于0,忽略該消息隊列,繼續處理下一個隊列。

TransactionalMessageServiceImpl#check
List<Long> doneOpOffset = new ArrayList<>();
HashMap<Long, Long> removeMap = new HashMap<>();
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);if (null == pullResult) {
      log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
      messageQueue, halfOffset, opOffset);      continue;
}

Step4:調用fillOpRemoveMap主題填充removeMap、doneOpOffset數據結構,這里主要的目的是避免重復調用事務回查接口,這里說一下RMQ_SYS_TRANS_HALF_TOPIC、RMQ_SYS_TRANS_OP_HALF_TOPIC這兩個主題的作用。
RMQ_SYS_TRANS_HALF_TOPIC:prepare消息的主題,事務消息首先先進入到該主題。
RMQ_SYS_TRANS_OP_HALF_TOPIC:當消息服務器收到事務消息的提交或回滾請求后,會將消息存儲在該主題下。

TransactionalMessageServiceImpl#check// single threadint getMessageNullCount = 1;long newOffset = halfOffset;long i = halfOffset;                         // @1 while (true) {                                   
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {                                        // @2
          log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);        break;
      }      if (removeMap.containsKey(i)) {         // @3
            log.info("Half offset {} has been committed/rolled back", i);
            removeMap.remove(i);
      } else {
            GetResult getResult = getHalfMsg(messageQueue, i);      // @4
            MessageExt msgExt = getResult.getMsg();   
            if (msgExt == null) {       // @5
                if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {      
                    break;
                }                if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                       log.info("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                       messageQueue, getMessageNullCount, getResult.getPullResult());                       break;
               } else {
                       log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                                    i, messageQueue, getMessageNullCount, getResult.getPullResult());
                       i = getResult.getPullResult().getNextBeginOffset();
                       newOffset = i;                       continue;
               }
       }       if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {    // @6
                listener.resolveDiscardMsg(msgExt);
                newOffset = i + 1;
                i++;                continue;
       }       if (msgExt.getStoreTimestamp() >= startTime) {
               log.info("Fresh stored. the miss offset={}, check it later, store={}", i,                                new Date(msgExt.getStoreTimestamp()));               break;
       }       long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();     // @7
       long checkImmunityTime = transactionTimeout;                                                                           
       String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);       if (null != checkImmunityTimeStr) {  // @8
             checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);             if (valueOfCurrentMinusBorn < checkImmunityTime) {                   if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt, checkImmunityTime)) {
                          newOffset = i + 1;
                          i++;                         continue;
                    }
              }
        } else {   // @9
              if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
                    log.info("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                    checkImmunityTime, new Date(msgExt.getBornTimestamp()));                    break;
               }
       }
       List<MessageExt> opMsg = pullResult.getMsgFoundList();       boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
                 || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
                 || (valueOfCurrentMinusBorn <= -1);     // @10
        if (isNeedCheck) {                if (!putBackHalfMsgQueue(msgExt, i)) {    // @11
                       continue;
                }
                listener.resolveHalfMsg(msgExt);
        } else {
                pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);   // @12
                log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                                messageQueue, pullResult);                continue;
        }
   }
  newOffset = i + 1;
  i++;
}if (newOffset != halfOffset) {    // @13
     transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);if (newOpOffset != opOffset) {  // @14                       
     transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}

本段代碼比較長,卻是事務狀態回查的重點實現。
代碼@1:先解釋幾個局部變量的含義。

  • getMessageNullCount :獲取空消息的次數

  • newOffset :當前處理RMQ_SYS_TRANS_HALF_TOPIC#queueId的最新進度

  • i:當前處理消息的隊列偏移量,其主題依然為RMQ_SYS_TRANS_HALF_TOPIC。

代碼@2:這段代碼應該不陌生,這是RocketMQ處理任務的一個通用處理邏輯,就是一個任務處理,可以限制每次最多處理的時間,RocketMQ為待檢測主題RMQ_SYS_TRANS_HALF_TOPIC的每個隊列,做事務狀態回查,一次最多不超過60S,目前該值不可配置。

代碼@3:如果removeMap中包含當前處理的消息,則繼續下一條,removeMap中的值是通過Step3中填充的,具體實現邏輯是從RMQ_SYS_TRANS_OP_HALF_TOPIC主題中拉取32條,如果拉取的消息隊列偏移量大于等于RMQ_SYS_TRANS_HALF_TOPIC#queueId當前的處理進度時,會添加到removeMap中,表示已處理過。
代碼@4:根據消息隊列偏移量i從消費隊列中獲取消息。
代碼@5:如果消息為空,則根據允許重復次數進行操作,默認重試一次,目前不可配置。其具體實現為:

  1. 如果超過重試次數,直接跳出,結束該消息隊列的事務狀態回查。

  2. 如果是由于沒有新的消息而返回為空(拉取狀態為:PullStatus.NO_NEW_MSG),則結束該消息隊列的事務狀態回查。
    1.其他原因,則將偏移量i設置為: getResult.getPullResult().getNextBeginOffset(),重新拉取。

代碼@6:判斷該消息是否需要discard(吞沒,丟棄,不處理)、或skip(跳過),其依據如下:

  1. needDiscard 依據:如果該消息回查的次數超過允許的最大回查次數,則該消息將被丟棄,即事務消息提交失敗,不能被消費者消費,其做法,主要是每回查一次,在消息屬性TRANSACTION_CHECK_TIMES中增1,默認最大回查次數為5次。

  2. needSkip依據:如果事務消息超過文件的過期時間,默認72小時(具體請查看RocketMQ過期文件相關內容),則跳過該消息。

代碼@7:處理事務超時相關概念,先解釋幾個局部變量:、

  • valueOfCurrentMinusBorn :該消息已存儲的時間,等于系統當前時間減去消息存儲的時間戳。

  • checkImmunityTime :立即檢測事務消息的時間。

  • transactionTimeout:事務消息的超時時間,其設計的意義是,應用程序在發送事務消息后,事務不會馬上提交,該時間就是假設事務消息發送成功后,應用程序事務提交的時間,在這段時間內,RocketMQ任務事務未提交,故不應該在這個時間段向應用程序發送回查請求。

    代碼@8:如果消息指定了事務消息過期時間屬性(PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS),如果當前時間已超過該值。

代碼@9:如果當前時間還未過(應用程序事務結束時間),則跳出本次回查處理的,等下一次再試。

代碼@10:判斷是否需要發送事務回查消息,具體邏輯:

  1. 如果從操作隊列(RMQ_SYS_TRANS_OP_HALF_TOPIC)中沒有已處理消息并且已經超過(應用程序事務結束時間),參數transactionTimeOut值。

  2. 如果操作隊列不為空,并且最后一天條消息的存儲時間已經超過transactionTimeOut值。

代碼@11:如果需要發送事務狀態回查消息,則先將消息再次發送到RMQ_SYS_TRANS_HALF_TOPIC主題中,發送成功則返回true,否則返回false,這里還有一個實現關鍵點:

if (putMessageResult != null
            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
            msgExt.setQueueOffset(
                putMessageResult.getAppendMessageResult().getLogicsOffset());
            msgExt.setCommitLogOffset(
                putMessageResult.getAppendMessageResult().getWroteOffset());
            msgExt.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
}

如果發送成功,會將該消息的queueOffset、commitLogOffset設置為重新存入的偏移量,為什么需要這樣呢,答案在listener.resolveHalfMsg(msgExt)中。

AbstractTransactionalMessageCheckListener#resolveHalfMsgpublic void resolveHalfMsg(final MessageExt msgExt) {
        executorService.execute(new Runnable() {            @Override
            public void run() {                try {
                    sendCheckMessage(msgExt);
                } catch (Exception e) {
                    LOGGER.error("Send check message error!", e);
                }
            }
        });
    }

發送具體的事務回查機制,這里用一個線程池來異步發送回查消息,為了回查進度保存的簡化,這里只要發送了回查消息,當前回查進度會向前推動,如果回查失敗,上一步驟新增的消息將可以再次發送回查消息,那如果回查消息發送成功,那會不會下一次又重復發送回查消息呢?這個可以根據OP隊列中的消息來判斷是否重復,如果回查消息發送成功并且消息服務器完成提交或回滾操作,這條消息會發送到OP隊列中,然后fillOpRemoveMap根據處理進度獲取一批已處理的消息,來與消息判斷是否重復,由于fillopRemoveMap一次只拉32條消息,那又如何保證一定能拉取到與當前消息的處理記錄呢?其實就是通過代碼@10來實現的,如果此批消息最后一條未超過事務延遲消息,則繼續拉取更多消息進行判斷(@12)和(@14),op隊列也會隨著回查進度的推進而推進。

代碼@12:如果無法判斷是否發送回查消息,則加載更多的已處理消息進行刷選。

代碼@13:保存(Prepare)消息隊列的回查進度。

代碼@14:保存處理隊列(op)的進度。

上述講解了TransactionalMessageCheckService回查定時線程的發送回查消息的整體流程與實現細節,接下來重點分析一下上述步驟@11,通過異步方式發送消息回查的實現過程。

AbstractTransactionalMessageCheckListener#sendCheckMessagepublic void sendCheckMessage(MessageExt msgExt) throws Exception {
        CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();   
        checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
        checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
        checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
        checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
        checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());    // @1
        msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
        msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
        msgExt.setStoreSize(0);      // @2
        String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);     // @3
        Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);        if (channel != null) {
            brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);     // @4
        } else {
            LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
        }
    }

代碼@1:首先構建回查事務狀態請求消息,請求核心參數包括:消息offsetId、消息ID(索引)、消息事務ID、事務消息隊列中的偏移量(RMQ_SYS_TRANS_HALF_TOPIC)。
代碼@2:恢復原消息的主題、隊列,并設置storeSize為0。
代碼@3:獲取生產者組名稱。
代碼@4:根據生產者組獲取任意一個生產者,通過與其連接發送事務回查消息,回查消息的請求者為【Broker服務器】,接收者為(client,具體為消息生產者)。
其處理類為:org.apache.rocketmq.client.impl.ClientRemotingProcessor#processRequest,其詳細邏輯實現方法為:

ClientRemotingProcessor#checkTransactionStatepublic RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {        final CheckTransactionStateRequestHeader requestHeader =
            (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);        final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());        final MessageExt messageExt = MessageDecoder.decode(byteBuffer);        if (messageExt != null) {
            String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);            if (null != transactionId && !"".equals(transactionId)) {
                messageExt.setTransactionId(transactionId);
            }            final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);            if (group != null) {
                MQProducerInner producer = this.mqClientFactory.selectProducer(group);                if (producer != null) {                    final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    producer.checkTransactionState(addr, messageExt, requestHeader);     // @1
                } else {
                    log.debug("checkTransactionState, pick producer by group[{}] failed", group);
                }
            } else {
                log.warn("checkTransactionState, pick producer group failed");
            }
        } else {
            log.warn("checkTransactionState, decode message failed");
        }        return null;
    }

代碼@1:最終調用生產者的checkTransactionState方法。

DefaultMQProducerImpl#checkTransactionStatepublic void checkTransactionState(final String addr, final MessageExt msg,        final CheckTransactionStateRequestHeader header) {
        Runnable request = new Runnable() {        // @1
            private final String brokerAddr = addr;            private final MessageExt message = msg;            private final CheckTransactionStateRequestHeader checkRequestHeader = header;            private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();            @Override
            public void run() {
                TransactionListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();      // @1
                if (transactionCheckListener != null) {
                    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
                    Throwable exception = null;                    try {
                        localTransactionState = transactionCheckListener.checkLocalTransaction(message);            // @2
                    } catch (Throwable e) {
                        log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
                        exception = e;
                    }                    this.processTransactionState(                                                                                                       // @3
                        localTransactionState,
                        group,
                        exception);
                } else {
                    log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);
                }
            }            private void processTransactionState(                final LocalTransactionState localTransactionState,                final String producerGroup,                final Throwable exception) {                final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
                thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
                thisHeader.setProducerGroup(producerGroup);
                thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
                thisHeader.setFromTransactionCheck(true);
                String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);                if (uniqueKey == null) {
                    uniqueKey = message.getMsgId();
                }
                thisHeader.setMsgId(uniqueKey);
                thisHeader.setTransactionId(checkRequestHeader.getTransactionId());                switch (localTransactionState) {                    case COMMIT_MESSAGE:
                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);                        break;                    case ROLLBACK_MESSAGE:
                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                        log.warn("when broker check, client rollback this transaction, {}", thisHeader);                        break;                    case UNKNOW:
                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                        log.warn("when broker check, client does not know this transaction state, {}", thisHeader);                        break;                    default:                        break;
                }
                String remark = null;                if (exception != null) {
                    remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
                }                try {
                    DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,                        3000);
                } catch (Exception e) {
                    log.error("endTransactionOneway exception", e);
                }
            }
        };        this.checkExecutor.submit(request);    
    }

上述代碼雖多,其實實現思路非常清晰,先使用一個匿名類( Runnable )構建一個運行任務,然后提交到checkExecutor線程池中執行,這與我第一篇文章的猜測是吻合的,那重點分析一下該任務的允許邏輯,對應在run方法中。
代碼@1:獲取消息發送者的TransactionListener。
代碼@2:執行TransactionListener#checkLocalTransaction,檢測本地事務狀態,也就是應用程序需要實現TransactionListener#checkLocalTransaction,告知RocketMQ該事務的事務狀態,然后返回COMMIT_MESSAGE、ROLLBACK_MESSAGE、UNKNOW中的一個,然后向Broker發送END_TRANSACTION命令即可,
代碼@3:發送END_TRANSACTION到Broker,其具體實現,已經在 https://blog.csdn.net/prestigeding/article/details/81263833 中詳細講解過,在此不重復分析。

到這里,事務消息狀態回查流程就講解完畢,接下來以一張流程圖結束本篇的講解。

RocketMQ中事務消息狀態回查的示例分析cdn.com/bef0a9f8968cd51c850915dde70d2260ee47745e.png">

以上是“RocketMQ中事務消息狀態回查的示例分析”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

广丰县| 乐山市| 绥化市| 惠安县| 娱乐| 扶绥县| 榕江县| 彭水| 定安县| 晋中市| 绍兴县| 宕昌县| 栖霞市| 呼和浩特市| 平顺县| 雅安市| 安丘市| 牙克石市| 兴义市| 万山特区| 昭苏县| 汉寿县| 昆山市| 龙南县| 宜城市| 阿合奇县| 临安市| 武冈市| 忻州市| 哈尔滨市| 巴彦县| 兴山县| 宾川县| 临泉县| 依安县| 康马县| 绍兴县| 鲜城| 阿图什市| 佛教| 松阳县|