中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ中如何實現push consumer順序消費

發布時間:2021-12-17 14:21:25 來源:億速云 閱讀:314 作者:小新 欄目:大數據

這篇文章主要介紹了RocketMQ中如何實現push consumer順序消費,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

順序消費的邏輯實現在類ConsumeMessageOrderlyService中,為了實現消費的有序性需要對queue進行加鎖,包括:

  • 在broker對message queue加鎖,保證當前client占有該隊列

  • consumer端對MessageQueue加鎖,保證當前線程占有該隊列

  • consumer端對ProcessQueue加鎖,保證當前線程占有該隊列

對broker上message queue加鎖是在ConsumeMessageOrderlyService中周期性調度執行的:

    // ConsumeMessageOrderlySerivce    
    public void start() {
        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    ConsumeMessageOrderlyService.this.lockMQPeriodically();
                }
            }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
        }
    }

    public synchronized void lockMQPeriodically() {
        if (!this.stopped) {
            // 通過LOCK_BATCH_MQ請求在broker批量鎖定mq
            this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
        }
    }

ConsumeMessageOrderlyService中的消費請求提交:

    // ConsumeMessageOrderlySerivce  
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume) {
        if (dispathToConsume) { 
            // 提交ConsumeRequest,丟棄了入參的msgs,每次都從ProcessQueue中順序獲取
            ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
            this.consumeExecutor.submit(consumeRequest);
        }
    }

順序處理了邏輯:

    // ConsumeMessageOrderlyService.ConsumeRequest
    public void run() {
            if (this.processQueue.isDropped()) {
                log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }

            // 1.獲取MessageQueue上的鎖
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {
                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                    final long beginTime = System.currentTimeMillis();
                    for (boolean continueConsume = true; continueConsume; ) { // 循環處理
                        // ...
                        // 單個ConsumeRequest最長處理時間默認60s
                        long interval = System.currentTimeMillis() - beginTime;
                        if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                            ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                            break;
                        }

                        final int consumeBatchSize =
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

                        // 2. 從ProcessQueue順序獲取batchSize個消息
                        List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                        defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
                        if (!msgs.isEmpty()) {
                            final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                            ConsumeOrderlyStatus status = null;

                            ConsumeMessageContext consumeMessageContext = null;
                            // ....

                            long beginTimestamp = System.currentTimeMillis();
                            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                            boolean hasException = false;
                            try {
                                // 3. 獲取ProcessQueue上的鎖
                                this.processQueue.getLockConsume().lock();
                                if (this.processQueue.isDropped()) {
                                    log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                        this.messageQueue);
                                    break;
                                }

                                // 4. 推給業務處理邏輯
                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                            } catch (Throwable e) {
                                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                                    RemotingHelper.exceptionSimpleDesc(e),
                                    ConsumeMessageOrderlyService.this.consumerGroup,
                                    msgs,
                                    messageQueue);
                                hasException = true;
                            } finally {
                                this.processQueue.getLockConsume().unlock(); // 解鎖
                            }

                            // ...
                            // 5. 處理消費結果
                            continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                        } else {
                            continueConsume = false; // ProcessQueue為空,停止本次推送
                        }
                    }
                } else {
                    if (this.processQueue.isDropped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        return;
                    }

                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
                }
            }
        }

在processConsumeResult中主要會執行2步操作:

  • 在ProcessQueue上執行commit(),將前一次takeMessages返回的msgs從緩存中刪除

  • 更新OffsetStore

感謝你能夠認真閱讀完這篇文章,希望小編分享的“RocketMQ中如何實現push consumer順序消費”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

邯郸市| 府谷县| 五峰| 泗洪县| 柳州市| 安宁市| 英德市| 江陵县| 金门县| 通化市| 米林县| 田东县| 准格尔旗| 兴文县| 巴彦淖尔市| 南昌县| 凤翔县| 通河县| 武穴市| 平罗县| 涞水县| 潢川县| 攀枝花市| 永嘉县| 蓬安县| 壶关县| 郁南县| 伊宁县| 江口县| 肇庆市| 苏州市| 安新县| 县级市| 图木舒克市| 怀来县| 若尔盖县| 洛宁县| 沁水县| 鹤峰县| 阳曲县| 长宁县|