中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ中broker消息存儲之如何實現拉取消息

發布時間:2021-12-17 14:22:02 來源:億速云 閱讀:259 作者:小新 欄目:大數據

這篇文章給大家分享的是有關RocketMQ中broker消息存儲之如何實現拉取消息的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。

在consumer拉取消息時,broker首先會根據待拉取的topic+queueId得到對應的ConsumeQueue,再根據消費offset從ConsumeQueue相應的偏移位置中獲取該消息在commitlog里真實的offset/msgsize/tagscode信息,最后再從commitlog查出消息體。

消息拉取在broker存儲層的調用入口為DefaultMessageStore.getMessage方法。核心邏輯如下:

    // DefaultMessageStore.java
    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
        final int maxMsgNums,
        final MessageFilter messageFilter) {
        // ...

        // 1. 定位ConsumeQueue
        ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
        if (consumeQueue != null) {
            minOffset = consumeQueue.getMinOffsetInQueue();
            maxOffset = consumeQueue.getMaxOffsetInQueue();

            if (maxOffset == 0) {
                status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                nextBeginOffset = nextOffsetCorrection(offset, 0);
            } else if (offset < minOffset) {
                status = GetMessageStatus.OFFSET_TOO_SMALL;
                nextBeginOffset = nextOffsetCorrection(offset, minOffset);
            } else if (offset == maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                nextBeginOffset = nextOffsetCorrection(offset, offset);
            } else if (offset > maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                if (0 == minOffset) {
                    nextBeginOffset = nextOffsetCorrection(offset, minOffset);
                } else {
                    nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
                }
            } else {
                // 2. 從ConsumeQueue中讀取消費偏移offset處的內容
                SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
                if (bufferConsumeQueue != null) {
                    try {
                        status = GetMessageStatus.NO_MATCHED_MESSAGE;

                        long nextPhyFileStartOffset = Long.MIN_VALUE;
                        long maxPhyOffsetPulling = 0;

                        int i = 0;
                        final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE); // 單個請求最大拉取數據量
                        final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();  // commitlog offset 8bytes
                            int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); // msg size 4bytes
                            long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); // tags hashcode 8bytes

                            // ...

                            // 3. 通過tagscode快速過濾
                            if (messageFilter != null
                                && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }

                                continue;
                            }

                            // 4. 從commitlog獲取消息體
                            SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                            if (null == selectResult) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                }

                                nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                                continue;
                            }

                            // 5. 通過消息體過濾
                            if (messageFilter != null
                                && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }
                                // release...
                                selectResult.release();
                                continue;
                            }

                            this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
                            // 6.添加到返回結果
                            getResult.addMessage(selectResult);
                            status = GetMessageStatus.FOUND;
                            nextPhyFileStartOffset = Long.MIN_VALUE;
                        }

                        // ...
                    } finally {

                        bufferConsumeQueue.release();
                    }
                } else {
                    // ...
                }
            }
        } else {
            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
            nextBeginOffset = nextOffsetCorrection(offset, 0);
        }

        // ...
        return getResult;
    }

ConsumeQueue中存儲的是固定長度(每個消息20字節)的內容,因此訪問比較簡單:

    // ConsumeQueue.java    
    public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
        int mappedFileSize = this.mappedFileSize;
        long offset = startIndex * CQ_STORE_UNIT_SIZE; // 消費者offset * 固定20字節長度
        if (offset >= this.getMinLogicOffset()) {
            // 定位到所屬的MappedFile
            MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
            if (mappedFile != null) {
                SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize)); // 從MappedFile中讀取實際的數據
                return result;
            }
        }
        return null;
    }

通過ConsumeQueue獲取消息在commitlog中的偏移量以及消息大小之后,獲取消息體的方法如下

    // CommitLog.java
    public SelectMappedBufferResult getMessage(final long offset, final int size) {
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
        // 定位消息所在的MappedFile
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
        if (mappedFile != null) {
            int pos = (int) (offset % mappedFileSize);
            return mappedFile.selectMappedBuffer(pos, size); // 從MappedFile中獲取消息體
        }
        return null;
    }

消息拉取整體流程如下

RocketMQ中broker消息存儲之如何實現拉取消息

感謝各位的閱讀!關于“RocketMQ中broker消息存儲之如何實現拉取消息”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

沅陵县| 泽普县| 大竹县| 平安县| 正定县| 微山县| 东乡县| 阿克| 蒙自县| 台东市| 靖远县| 民和| 肃南| 江华| 浪卡子县| 浑源县| 阜新| 新乡市| 阿图什市| 黄平县| 武清区| 新丰县| 石泉县| 贺州市| 阿合奇县| 岑溪市| 洛南县| 广平县| 揭阳市| 融水| 禹州市| 晋州市| 昭苏县| 玉龙| 石嘴山市| 会宁县| 永年县| 黄龙县| 绵阳市| 江永县| 吴桥县|