中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ基本概念及原理是什么

發布時間:2021-12-03 17:35:47 來源:億速云 閱讀:133 作者:柒染 欄目:云計算

這篇文章將為大家詳細講解有關RocketMQ基本概念及原理是什么,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

RocketMQ使用
基本概念
ProducerGroup
    通常具有同樣屬性(處理的消息種類-topic、以及消息處理邏輯流程—分布式多個客戶端)的一些producer可以歸為同一個group。在事務消息機制中,如果某條發送某條消息的producer-A宕機,使得事務消息一直處于PREPARED狀態并超時,則broker會回查 同一個group的其他producer,確認這條消息應該commit還是rollback。
ConsumerGroup
    具有同樣邏輯消費同樣消息的consumer,可以歸并為一個group。同一個group內的消費者,可以共同消費(CLUSTERING)對應topic的消息,達到分布式并行處理的功能。
Topoic
    消息的邏輯管理單位。
Queue
    消息的物理管理單位。一個Topic下可以有多個Queue,Queue的引入使得消息存儲可以分布式集群化,具有了水平擴展的能力。
消費進度管理
    RocketMQ的broker端,不負責推送消息,無論消費者是否消費消息,都將消息存儲起來。誰要消費消息,就向broker發請求獲取消息,消費記錄由consumer來維護。RocketMQ提供了兩種存儲方式來保留消費記錄:一種是保留在consumer所在的服務器上;另一種是保存在broker服務器上。用戶還可以自己實現相應的消費進度存儲接口。
    默認情況下,采用集群消費(CLUSTERING),會將記錄保存在broker端;而采用廣播消費(BROADCASTING)則會將消費記錄保存在本地。
順序消息
    用戶實現MessageQueueSelector為某一批消息(通常是有同樣的唯一的標示ID),選擇同一個Queue,則這一批消息的消費將是順序消費(并由同一個consumer完成消費)。
事務消息
    這樣的消息有多個狀態,并且其發送是兩階段的。第一個階段發送PREPARED狀態的消息,此時consumer是看不見這種狀態的消息的,發送完畢后回調用戶的TransactionExecutor接口,執行相應的事務操作(如數據庫),當事務操作成功時,則對此條消息返回commit,讓broker對該消息執行commit操作,成為commit狀態的消息對consumer是可見的。

基本原理
    總覽
RocketMQ以Topic來管理不同應用的消息。對于生產者而言,發送消息是,需要指定消息的Topic,對于消費者而言,在啟動后,需要訂閱相應的Topic,然后可以消費相應的消息。Topic是邏輯上的概念,在物理實現上,一個Topic由多個Queue組成,采用多個Queue的好處是可以將Broker存儲分布式化,提高系統性能。
    RocketMQ中,producer將消息發送給Broker時,需要制定發送到哪一個隊列中,默認情況下,producer會輪詢的將消息發送到每個隊列中(所有broker下的Queue合并成一個List去輪詢)。
    對于consumer而言,會為每個consumer分配固定的隊列(如果隊列總數沒有發生變化),consumer從固定的隊列中去拉取沒有消費的消息進行處理。
 
Producer
Producer端(屬于client)的邏輯概述:
 
producer端的邏輯都比較簡單,將消息發送到某個Queue中即可,具體發送到那個Queue可以由用戶控制(MessageQueueSelector接口),默認情況下,將輪詢方式選擇Queue。在producer端,會從NameServer將所有Broker的Topic及對應的Queue信息(即:TopicRoute信息)拉取到本地,然后根據<brokerName, queueId>組建成一個List。因此在MessageQueueSelector,可以看到所有的Queue信息。
    RocketMQ將topic的消息以多個Queue來管理,使得其較為容易的就可以進行水平擴展,提供系統吞吐力。這樣分布帶來的問題,就是從全局上不能做到順序性(很多時候也并不需要全局上的順序性)。
RocketMQ提到支持順序消息,實際上是指基于Queue級別的順序。用戶將某些需要滿足順序的一批消息(比如電商某個訂單號的一系列后續操作、比如數據庫的某個主鍵的insert、delete、update等操作)發送到固定的某個Queue中,則從這個Queue消費消息的consumer,針對這一批消息是順序消費。
問題1:針對順序消息的隊列,是否可以做到不停服務下的集群動態擴展?
Consumer
    consumer邏輯稍微復雜一點。初步思考,consumer端至少需要處理:
1、    消息的獲取
2、    offset(消費進度)的管理與存儲
3、    集群消費模式下,Queue的分配問題(rebalance)
RocketMQ對外提供了兩種不同形式的Consumer:PushConsumer和PullConsumer。顧名思義,對于PullConsumer而言,用戶需要主動調用相應的接口去拉取未消費的消息。對于PushConsumer而言,用戶提供消息處理的CallBack,有未曾消費的消息時,會主動回調這個CallBack來處理消息。雖從用戶角度而言,Consumer存在主動(pull)和被動(push),但RocketMQ本身的broker端僅僅保存所有的消息,并不負責push消息,因此PushConsumer的底層實現也是有一個長連接主動去broker上拉取未消費的消息,然后回調用戶的callback邏輯。

    PushConsumer端邏輯概述:
        自己使用PushConsumer的代碼非常簡單:
        
1
2
3
4
5
6
7
8
9
10
11    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“groupName”);
consumer.subscribe(“TopicName”, “*”); // a | b | c
consumer.registerMessaeListener( new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, 
                                  ConsumeConcurrentlyContext context) {
  System.out.println(“Consume Message Num: ” + msgs.size());
   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
// add shutdown hook to execute consumer.shutdown();
第2行的subscribe即定于某個topic下,符合某些標簽(tag)的消息。這個過濾會在服務端過濾(其實在consumer端也有過濾邏輯)。tag之間用”|”分割開。這些tag會被解析成SubscriptionData來保存信息,主要存儲tag的字符串集合,以及這些tag對應的hashcode集合(在broker端的存儲和過濾其實都是tag值對應的hashcode,可能是為了加速過濾以及節約存儲空間)。
    主要的邏輯在第10行調用start函數之后開始。Consumer的主要實現是DefaultMQPushConsumerImpl,其包含的對象關系簡單如下圖:
 
DefaultMQPushConsumerImpl中各個對象的主要功能如下:
RebalancePushImpl:    主要負責決定,當前的consumer應該從哪些Queue中消費消息;
PullAPIWrapper:        長連接,負責從broker處拉取消息,然后利用ConsumeMessageService回調用戶的Listener執行消息消費邏輯;
ConsumeMessageService:    實現所謂的“Push-被動”消費機制;從Broker拉取的消息后,封裝成ConsumeRequest提交給ConsumeMessageSerivce,此service負責回調用戶的Listener消費消息;
OffsetStore:            維護當前consumer的消費記錄(offset);有兩種實現,Local和Rmote,Local存儲在本地磁盤上,適用于BROADCASTING廣播消費模式;而Remote則將消費進度存儲在Broker上,適用于CLUSTERING集群消費模式;
MQClientFactory:    大雜燴,負責管理client(consumer、producer),并提供多中功能接口供各個Service(Rebalance、PullMessage等)調用;大部分邏輯均在這個類中完成;


使用
Producer返回值
發送消息時,只有拋出異常,才是發送失敗,其他情況下,根據如下返回值,應用層做相應取舍處理邏輯:
SendStatus
返回值    解釋
SEND_OK    發送成功
FLUSH_DISK_TIMEOUT    發送成功,但broker刷盤失敗,此時如若服務器宕機,消息會丟失;
FLUSH_SLAVE_TIMEOUT    寫從失敗;如果主宕機,消息丟失;
SLAVE_NOT_AVAILABLE    從不可用;
注意:當配置多master無slave的集群時,若master的brokerRole為SYNC_MASTER,則發送消息會一直返回這個值;最新版本(3.1.14以上)事務消息將一直發送失敗(事務消息中處理了返回值不為SEND_OK,則直接進行ROLL_BACK);
當應用方明確指出,producer發送成功為SEND_OK狀態的消息對consumer才是可見的。可以采用事務消息來完成這個功能,RocketMQ 從3.0.14版本開始,對于事務消息,開始檢查SendStatus,如果不為SEND_OK,則直接執行事務消息的回滾。
Consumer返回值
當使用PushConsumer(使用callBack回調執行應用消費邏輯)
非順序消息(ConsumeConcurrentlyStatus)
返回值    解釋
CONSUME_SUCCESS    消費成功
RECONSUME_LATER    消費失敗,稍后重新消費這一批消息
RECONSUMER_LATER的解釋:
    這一批消息均會sendBack到broker上,稍后會重新消費這一批消息。可以通過設置參數,使得批量消費的“批量”為一條,這樣可以一定程度避免重復消費。但這樣設置后,可能效率較低。另外一種方法是在用戶指定的CallBack(MessageListenerConcurrently)中,通過對應的ConsumeConcurrentlyContext參數來控制本批消息從哪一條之后重復消費。
具體方法是控制context的ackIndex變量。這個變量的意思是對于這一批消息(List<MessageExt>),[0, ackIndex]內的消息是成功消費的,而(ackIndex, Lst.size)內的消息是消費失敗的,如果返回值為RECONSUME_LATER,則對于失敗范圍的消息將調用sendBack回發到broker上(從代碼看來,這個功能只對CLUSTERING消費模型的consumer生效,BROADCASTING的直接丟棄)。這里還有個小的tips,在調用SendBack失敗后,會在consumer本地去嘗試重復消費這些回發失敗的消息(構造相應的ConsumeRequest)。這個處理模式(先消費,消費失敗的消息嘗試回發給broker,回發給broker失敗的消息嘗試在consumer端重新消費)一直嘗試,直到消費成功或者回發到broker成功。
順序消息(ConsumerOrderlyStatus)
返回值    解釋
SUCCESS    消息處理成功
ROLLBACK    回滾消息—似乎用在了事務消息中
COMMIT    提交消息—似乎用在了事務消息中
SUSPEND_CURRENT_QUEUE_A_MOMENT    當前隊列掛起一段時間
問題:消費端的ROLLBACK、COMMIT如何理解???
普通消息
    
使用TIPS
集群搭建
1.    基本配置
使用./bin/mqbroker –p >conf/broker.conf,查看所有參數的默認取值;根據自己集群的需要修改對應的配置;
2.    集群選擇
RocketMQ集群支持如下一些模式的配置:
集群模式    特點    適用場景
單Master    一個Broker實例;
或者多個Broker實例,但Topic只在某一個Broker上配置了;    測試
多Master 無Slave    多個Broker實例構成集群,且brokerID均為0(即角色都為Master)    
master掛掉后,這個master上未被消費的消息,暫時不能被消費;
可以容忍消息丟失(未被consumer消費)的應用場景;如日志收集

多Master多Slave    每個master有一個backup的slave    HA高可用;當master-slave采用同步雙寫時(采用異步時,任可能存在部分未寫入slave的消息丟失),master掛掉,也可以從slave處消費消息;但當master掛掉后,目前不支持自動的Failover(因此不支持producer的寫);

疑問: 手動如何切換?是否需要修改Slave的配置為Master,然后重啟broker實例?代價有點高,支持發送命令切換否?

當使用多master無slave的集群搭建方式時,master的brokerRole配置必須為ASYNC_MASTER。如果配置為SYNC_MASTER,則producer發送消息時,返回值的SendStatus會一直是SLAVE_NOT_AVAILABLE。
3.    系統參數優化
參考bin/os.sh中的參數,重點注意物理內存預留參數(vm.min_free_kbytes)。
4.    broker的啟動
采用集群模式時,啟動broker時,需要制定nameserver 地址的list,這里一定要注意,要將所有的nameserver 地址都包含進去;因為rocketmq的nameserver之間并不會同步,均需要broker主動匯報;如果有3個nameserver: A B C,啟動時只制定了A,忘記制定了B C,那么客戶端如果剛好鏈接了B或C去獲取broker的信息,則會獲取失敗。
消息使用
    
問題整理
問題:
1、服務器磁盤配置大概是怎樣(重點想了解磁盤配置、磁盤總容量,例如: N * 4T SATA 7200 )?
2、服務器的磁盤做raid了嗎?做的哪一種raid?刷磁盤模式是SYNC還是ASYNC?
3、rocketmq采用的哪種方式搭建(多master無slave、多master多slave)?如果是master-slave方式,主從同步是SYNC還是ASYNC方式?

解答:
1、一般是3T磁盤,實際是12個600,12* 600G的sas 15000轉的磁盤做了raid10
2、刷盤模式通常是異步方式
3、大部分集群是不開啟slave的, 有少部分集群會開啟 sync方式的slave 

關于RocketMQ基本概念及原理是什么就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

双辽市| 新兴县| 萨嘎县| 东阳市| 辽中县| 顺平县| 平凉市| 夏河县| 青龙| 通城县| 武邑县| 麻栗坡县| 方正县| 遵义市| 高雄市| 启东市| 临邑县| 吉隆县| 称多县| 东城区| 清徐县| 克拉玛依市| 伽师县| 个旧市| 晴隆县| 萨迦县| 湘潭市| 梅州市| 龙口市| 台安县| 双江| 北碚区| 简阳市| 比如县| 广州市| 庆元县| 卢湾区| 阿拉尔市| 嘉禾县| 松江区| 连州市|