中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ事務消息怎么保證消息的可靠性和一致性

發布時間:2023-04-24 15:26:42 來源:億速云 閱讀:130 作者:iii 欄目:開發技術

今天小編給大家分享一下RocketMQ事務消息怎么保證消息的可靠性和一致性的相關知識點,內容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。

在發送事務消息的時候,會加一個標識,表示這個消息是事務消息。broker接收到消息后,在我們之前看的代碼里org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage會判斷是否是事務消息。

if (sendTransactionPrepareMessage) {
    asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
    asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}

sendTransactionPrepareMessage=true表示是事務消息,所以走了一個單獨的邏輯。

    public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
        return store.asyncPutMessage(parseHalfMessageInner(messageInner));
    }

這里parseHalfMessageInner這個方法里面開始了偷梁換柱,把topic和queueId都改了,把原本的信息先存在變量里面。所以實際上這個消息發到了半消息專有的topic里面,topic名字叫做RMQ_SYS_TRANS_HALF_TOPIC

    private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    }

然后其他代碼還是和普通的消息一樣,就是把事務消息做了轉發,存在了RMQ_SYS_TRANS_HALF_TOPIC里面。

到這里發送半消息就成功了,然后最后客戶端發送了半消息之后,會查一下本地事務的情況是否完成。這里有3種情況:commit、rollback、未知。完成和回滾都是確認的狀態,這個比較好處理,比較難的是未知。我們先看能得到確認結果的情況。

如果完成和回滾,會給客戶端發送結束事務的消息,這個消息叫END_TRANSACTION,包括消息里面包括了之前發送的半消息的id和offset。

broker處理的代碼在org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest中。就是根據offset拿到半消息,然后如果是commit,就是把原本的topic和queueId還原,發到原本的隊列里面,這樣就可以正常消費了。然后把這個半消息“刪除”。如果是rollBack,也是拿到這個半消息,然后直接“刪除”就可以了。接下來看一下怎么“刪除”。

為什么我刪除會打引號呢?因為半消息其實就是跟正常的消息一樣,存在commitLog文件里面,mq的設計,就沒有刪除這個功能。所以所謂的刪除其實就是把這個消息消費掉,不做任何處理,就是刪除了。

想象一下,這個半消息有commit/rollBack/未知,3種狀態,未知的肯定不能刪除,那他怎么知道哪些消息是可以刪除的呢?總不能所有的都再去客戶端查一下事務的結果吧?mq怎么做的呢?前面提到的刪除其實就是把這些commit和rollBack處理過后的半消息,再保存起來,后面消費半消息的數據的時候,只要從里面查一下是否需要刪除就可以了。

這里又有一個問題,怎么把需要刪除的半消息存起來呢?mq存儲數據就是commitLog,所以其實這些需要刪除的數據,就是又發到了一個特定的topic里面。這個topic名字是RMQ_SYS_TRANS_OP_HALF_TOPIC。主意區分,原本半消息的topic名字是half_topic,這個topic名字是op_half_topic,存儲的是處理過后,可以刪除的半消息。

所以說前面提到的帶引號的“刪除”,就是把消息發到op_half_topic就表示是刪除了,這個op_half_topic消息的內容就是half_topic的offset。那么現在需要有個地方,來消費half_topic,然后判斷是否存在于op_half_topic,如果是表示可以刪除了,如果不是,就接著保存起來。

處理邏輯就在TransactionalMessageCheckService這個定時任務中。具體是在TransactionalMessageServiceImpl#check方法里面

    @Override
    public void check(long transactionTimeout, int transactionCheckMax,
        AbstractTransactionalMessageCheckListener listener) {
        try {
            String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
            // 先拿到半消息
            Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
            if (msgQueues == null || msgQueues.size() == 0) {
                log.warn("The queue of topic is empty :" + topic);
                return;
            }
            log.debug("Check topic={}, queues={}", topic, msgQueues);
            for (MessageQueue messageQueue : msgQueues) {
                long startTime = System.currentTimeMillis();
                MessageQueue opQueue = getOpQueue(messageQueue);
                // 拿到半消息的最小偏移量
                long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                // 拿到op_half的最小偏移量
                long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
                log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
                if (halfOffset < 0 || opOffset < 0) {
                    log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                        halfOffset, opOffset);
                    continue;
                }
                List<Long> doneOpOffset = new ArrayList<>();
                HashMap<Long, Long> removeMap = new HashMap<>();
                // 拉取op的消息(32條),op消息內容是half的offset,跟half_topic的最小offset比較,如果op的小于最小的,就說明已經處理過了,放在doneOpOffset,反之,則說明還沒處理過,就先放在removeMap里面
                PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
                if (null == pullResult) {
                    log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                        messageQueue, halfOffset, opOffset);
                    continue;
                }
                // single thread
                int getMessageNullCount = 1;
                long newOffset = halfOffset;
                long i = halfOffset;
                // 然后對half_topic進行處理
                while (true) {
                    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                        log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                        break;
                    }
                    // 如果這個offset已經處理過了,就接著處理下一個
                    if (removeMap.containsKey(i)) {
                        log.debug("Half offset {} has been committed/rolled back", i);
                        Long removedOpOffset = removeMap.remove(i);
                        doneOpOffset.add(removedOpOffset);
                    } else {
                        // 如果沒有處理過,就要把數據撈出來重新投遞
                        GetResult getResult = getHalfMsg(messageQueue, i);
                        MessageExt msgExt = getResult.getMsg();
                        if (msgExt == null) {
                            if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                                break;
                            }
                            if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                                log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                                    messageQueue, getMessageNullCount, getResult.getPullResult());
                                break;
                            } else {
                                log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                                    i, messageQueue, getMessageNullCount, getResult.getPullResult());
                                i = getResult.getPullResult().getNextBeginOffset();
                                newOffset = i;
                                continue;
                            }
                        }
                        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                            listener.resolveDiscardMsg(msgExt);
                            newOffset = i + 1;
                            i++;
                            continue;
                        }
                        if (msgExt.getStoreTimestamp() >= startTime) {
                            log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
                                new Date(msgExt.getStoreTimestamp()));
                            break;
                        }
                        long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                        long checkImmunityTime = transactionTimeout;
                        String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                        if (null != checkImmunityTimeStr) {
                            checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                            if (valueOfCurrentMinusBorn < checkImmunityTime) {
                                if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                    newOffset = i + 1;
                                    i++;
                                    continue;
                                }
                            }
                        } else {
                            if (0 <= valueOfCurrentMinusBorn && valueOfCurrentMinusBorn < checkImmunityTime) {
                                log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                                    checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                                break;
                            }
                        }
                        List<MessageExt> opMsg = pullResult.getMsgFoundList();
                        boolean isNeedCheck = opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime
                            || opMsg != null && opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout
                            || valueOfCurrentMinusBorn <= -1;
                        if (isNeedCheck) {
                            // 重新投遞
                            if (!putBackHalfMsgQueue(msgExt, i)) {
                                continue;
                            }
                            // 再重新確認事務
                            listener.resolveHalfMsg(msgExt);
                        } else {
                            pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                            log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                                messageQueue, pullResult);
                            continue;
                        }
                    }
                    newOffset = i + 1;
                    i++;
                }
                // 更新offset
                if (newOffset != halfOffset) {
                    transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
                }
                long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
                if (newOpOffset != opOffset) {
                    transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
                }
            }
        } catch (Throwable e) {
            log.error("Check error", e);
        }
    }

我講解一下這個代碼做了啥。我們先明確這個代碼是要實現什么功能。就是消費half_topic,然后去根據op_half_topic的數據來判斷half_topc的消息是否被處理過,處理過了就直接忽略、丟棄,如果沒有處理過,就“保留”這個消息,等待后面事務確認了再處理。

這里“保留”我也是加了引號,因為mq消費是一條一條按順序消費,如果中間有一個數據卡住了,后面數據就沒法消費了。所以這里“保留”,其實也是消費了,只是他消費到了不確定結果的消息,他是重新投遞到了half_topic,來實現“保留”的目的。

好了,明確了這個代碼實現的功能,我們來一步步看一下細節。

首先是拿到half_topic和op_half_topic的offset,知道現在是消費到了哪里。然后去拉取op_half_topic,每次32條,op_half消息內容存的是half_topic的offset,只要判斷這條op_half里面的offset小于half_topic的offset,就表示已經消費過了,放在doneOpOffset的list里面,如果op_half保存的offset大于half_topic的offset,就表示還沒消費,放入removeMap,就表示這個半消息可以放心刪除了。

這一步,通過消費op_half,跟half_topic的minOffset做比較,構建了doneOpOffset,和removeMap。

然后就是消費half_topic的消息,只要判斷每條消息的offset是否在removeMap中,就表示可以刪除,放入doneOpOffset中,直接消費下一條數據,所以這里其實也不用真的拉取half_topic的消息,只要用offset來判斷就行,消費過了,offset+1,就可以去判斷下一條消息。

如果half_topic的offset沒有在removeMap中,就表示暫時還不知道結果,這時候就重新發送到half_topic,重新投遞之后,然后給客戶端發送一個檢查事務的請求,客戶端檢測過后,還是用之前的END_TRANSACTION命令,再發給broker,broker就會放到op_half里面,等于就是重新發了一個半消息的流程,實現了閉環。

最后就是更新兩個topic的offset了。之前的doneOpOffset保存下來,就是為了更新op_half的offset,只有都處理過了,才會更新,如果中間有一個沒有處理,就會阻塞在那條消息。

以上就是“RocketMQ事務消息怎么保證消息的可靠性和一致性”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

婺源县| 抚松县| 台山市| 额尔古纳市| 南安市| 临武县| 茂名市| 长兴县| 酉阳| 江华| 临猗县| 高邑县| 邻水| 泰兴市| 昌图县| 山西省| 榆林市| 黑水县| 大邑县| 桦川县| 邵阳县| 迁西县| 满城县| 新余市| 开江县| 华阴市| 奉化市| 铜山县| 宝丰县| 龙里县| 都江堰市| 晴隆县| 岑巩县| 宁明县| 宜都市| 合山市| 孝感市| 河源市| 柳州市| 临西县| 巴青县|