您好,登錄后才能下訂單哦!
這篇文章主要介紹了RocketMQ中如何實現push consumer順序消費,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
順序消費的邏輯實現在類ConsumeMessageOrderlyService中,為了實現消費的有序性需要對queue進行加鎖,包括:
在broker對message queue加鎖,保證當前client占有該隊列
consumer端對MessageQueue加鎖,保證當前線程占有該隊列
consumer端對ProcessQueue加鎖,保證當前線程占有該隊列
對broker上message queue加鎖是在ConsumeMessageOrderlyService中周期性調度執行的:
// ConsumeMessageOrderlySerivce public void start() { if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { ConsumeMessageOrderlyService.this.lockMQPeriodically(); } }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); } } public synchronized void lockMQPeriodically() { if (!this.stopped) { // 通過LOCK_BATCH_MQ請求在broker批量鎖定mq this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll(); } }
ConsumeMessageOrderlyService中的消費請求提交:
// ConsumeMessageOrderlySerivce public void submitConsumeRequest( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispathToConsume) { if (dispathToConsume) { // 提交ConsumeRequest,丟棄了入參的msgs,每次都從ProcessQueue中順序獲取 ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue); this.consumeExecutor.submit(consumeRequest); } }
順序處理了邏輯:
// ConsumeMessageOrderlyService.ConsumeRequest public void run() { if (this.processQueue.isDropped()) { log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; } // 1.獲取MessageQueue上的鎖 final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); synchronized (objLock) { if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { final long beginTime = System.currentTimeMillis(); for (boolean continueConsume = true; continueConsume; ) { // 循環處理 // ... // 單個ConsumeRequest最長處理時間默認60s long interval = System.currentTimeMillis() - beginTime; if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) { ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10); break; } final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); // 2. 從ProcessQueue順序獲取batchSize個消息 List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize); defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup()); if (!msgs.isEmpty()) { final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue); ConsumeOrderlyStatus status = null; ConsumeMessageContext consumeMessageContext = null; // .... long beginTimestamp = System.currentTimeMillis(); ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; boolean hasException = false; try { // 3. 獲取ProcessQueue上的鎖 this.processQueue.getLockConsume().lock(); if (this.processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); break; } // 4. 推給業務處理邏輯 status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageOrderlyService.this.consumerGroup, msgs, messageQueue); hasException = true; } finally { this.processQueue.getLockConsume().unlock(); // 解鎖 } // ... // 5. 處理消費結果 continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this); } else { continueConsume = false; // ProcessQueue為空,停止本次推送 } } } else { if (this.processQueue.isDropped()) { log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; } ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100); } } }
在processConsumeResult中主要會執行2步操作:
在ProcessQueue上執行commit(),將前一次takeMessages返回的msgs從緩存中刪除
更新OffsetStore
感謝你能夠認真閱讀完這篇文章,希望小編分享的“RocketMQ中如何實現push consumer順序消費”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。