中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ消費中Broker端處理邏輯的示例分析

發布時間:2021-11-18 09:37:00 來源:億速云 閱讀:211 作者:小新 欄目:大數據

這篇文章主要介紹RocketMQ消費中Broker端處理邏輯的示例分析,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!

一、問題思考

1.Broker是如何處理消費流程的?
2.消費進度是如何流轉的?
說明:本文分析均為PUSH消費模式

二、Broker處理消費流程

本部分將消費的切分成三塊梳理:Broker消費處理流程概覽、查找消息流程、以及消息查詢結果處理流程。

1.Broker消費處理流程概覽

RocketMQ消費中Broker端處理邏輯的示例分析

小結:在拉取消息時會進行Broker和主題讀權限的判斷,實戰中若有必要可以封鎖Broker的拉取權限從而禁止從該broker進行消費;或者封鎖某主題的讀權限禁止消費組從該主題消費消息。

2.查找消息流程

RocketMQ消費中Broker端處理邏輯的示例分析

小結:如果需要從磁盤拉取消息則一次默認最多拉取8條,一次消息的消息大小最大為64K。如果從緩存中拉取默認最多32條,一次拉取的消息大小最大256K。使用tagcode會在查找消息前進行過濾,使用SQL92過濾再消息查找出來后進行過濾。

3.消息查詢結果處理流程

RocketMQ消費中Broker端處理邏輯的示例分析

小結:建議開啟slaveReadEnable=true,當拉取的消息超過Broker內存40%時會從Slave節點消費,Master不必從磁盤重新讀取數據;transferMsgByHeap默認為true即消息先拉取到堆空間再返回到客戶端;如果設置為false則使用Netty#FileRegion,可用零字節拷貝不必再拷貝到堆內存提高性能。

三、消費進度流轉1.客戶端上報消費進度

//@1 順序消費/并發消費流程相同
//ConsumeMessageOrderlyService#processConsumeResult
//ConsumeMessageConcurrentlyService#processConsumeResult
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
//更新消費進度偏移量
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
@2 RemoteBrokerOffsetStore#updateOffset
AtomicLong offsetOld = this.offsetTable.get(mq);
MixAll.compareAndIncreaseOnly(offsetOld, offset);
@3 offsetTable存儲結構:key為MessageQueue value為消費的偏移量進度
ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>()
@4 定時同步消費進度
//持久化消息消費進度,默認5秒保存一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
@5 RemoteBrokerOffsetStore#persistAll
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet())
this.updateConsumeOffsetToBroker(mq, offset.get());

小結:PUSH消費中消費進度存儲在offsetTable中,定時任務每5秒鐘上報Broker一次。

2.Broker端處理消費進度處理客戶端定時上報消費進度

//@1 ConsumerManageProcessor#processRequest#updateConsumerOffset
this.brokerController.getConsumerOffsetManager().commitOffset
//@2 ConsumerOffsetManager#commitOffset
String key = topic + TOPIC_GROUP_SEPARATOR + group;
this.commitOffset(clientHost, key, queueId, offset);
Long storeOffset = map.put(queueId, offset);
//@3 消費進度緩存結構
//key=topic@group
//value=ConcurrentMap<Integer/* queueId*/, Long/*offset*/>>
offsetTable = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
//@4 5秒鐘一次存儲消費進度
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
//@5 consumerOffset.json文件格式
"zeus-package-mismatch-topic@autosort-packagelog":{0:9055300,1:9055157,2:9055304,3:9055232}

小結:Broker接到客戶端消費進度上報后更新緩存offsetTable,每隔5秒中定時任務將offsetTable消費進度存儲在磁盤文件consumerOffset.json中。

消息拉取后實時更新消費進度

//@1 PullMessageProcessor#processRequest
if (storeOffsetEnable) {
//更新消費進度
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}

小結:PUSH消費客戶端拉取消息后會實時更新消費的進度。

3.消費進度流轉示意圖

RocketMQ消費中Broker端處理邏輯的示例分析

以上是“RocketMQ消費中Broker端處理邏輯的示例分析”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

海淀区| 高台县| 江城| 独山县| 苗栗县| 扎兰屯市| 阳城县| 高陵县| 喜德县| 莆田市| 长治县| 得荣县| 鄂尔多斯市| 庐江县| 田林县| 通道| 柳河县| 缙云县| 泾源县| 平乡县| 东乌珠穆沁旗| 榆社县| 疏勒县| 文化| 辽阳县| 扎兰屯市| 营口市| 望江县| 石景山区| 手游| 龙里县| 建湖县| 新平| 湘乡市| 炎陵县| 本溪市| 荔浦县| 武义县| 吉隆县| 化德县| 西峡县|