您好,登錄后才能下訂單哦!
本文檔主要是rocketmq實際代碼使用,常見詞語介紹等查看其他文檔
http://rocketmq.apache.org/release_notes/release-notes-4.3.2/ 二進制文件下載地址,下載后可以直接解壓運行
https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.2/rocketmq-all-4.3.2-source-release.zip 源碼方式下載地址, 下載后需要自己打包
進入rocketmq的bin目錄
nohup sh mqnamesrv &
進入bin目錄
nohup sh mqbroker -n localhost:9876? autoCreateTopicEnable=true &
集群方式參考集群配置文件RocketMQ集群
默認情況下,我們的服務器都是單獨的獨立服務器,不會出現這種情況,但是我們在測試過程中使用的是虛擬機, 配置不夠,會導致無法啟動
修改runbroker.sh 和 runserver.sh
分別找到下圖中的指示位置
修改內存大小即可,大小請自己按照自己虛擬機的配置適當調整,比如我修改為了以下值
此處非必須,實際開發中使用較少
下載rocketmq-console源碼:https://github.com/apache/rocketmq-externals
進入子目錄rocketmq-console下
執行mvn命令打包
mvn clean package -DskipTests
進入target目錄
rocketmq-console-ng-1.0.0.jar即為springBoot項目
在該目錄下CMD執行命令:
java -jar rocketmq-console-ng-1.0.0.jar --server.port=12581 --rocketmq.config.namesrvAddr=10.89.0.65:9876?
其中
--server.port為運行的這個web應用的端口,如果不設置的話默認為8080;--rocketmq.config.namesrvAddr為RocketMQ命名服務地址,如果不設置的話默認為“”
OK了,訪問下http://localhost:12581試試吧。
或者打包成 war 包扔到 tomcat 中運行
此案例中使用的是一個消費者,所以消費者代碼只有一個
??? <dependencies>
? ??????<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
??????? <dependency>
??????????? <groupId>org.apache.rocketmq</groupId>
??????????? <artifactId>rocketmq-client</artifactId>
??????????? <version>4.3.2</version>
??????? </dependency>
??? </dependencies>
原理:同步發送是指消息發送方發出數據后,會在收到接收方發回響應之后才發下一個數據包的通訊方式。
應用場景:此種方式應用場景非常廣泛,例如重要通知郵件、報名短信通知、營銷短信系統等。
4.2.1 生產者
/**
?* Created by jackiechan on 18-8-19/下午8:37.
?* 原理:同步發送是指消息發送方發出數據后,會在收到接收方發回響應之后才發下一個數據包的通訊方式。
?*
?* 應用場景:此種方式應用場景非常廣泛,例如重要通知郵件、報名短信通知、營銷短信系統等
?*/
public class SyncProducer01 {
??? public static void main(String[] args) throws Exception {
??????? //Instantiate with a producer group name.
??????? DefaultMQProducer producer = new
??????????????? DefaultMQProducer("group1");//groupname 同一個group代表是集群
??????? //Launch the instance.
??????? producer.setNamesrvAddr("192.168.3.8:9876");//設置nameserver地址
??????? //設置實例名字
??????? producer.setInstanceName("producer");//默認不需要設置,會以ip@pid作為名字, ip是機器ip,pid是jvmpid
??????? producer.start();
??????? for (int i = 0; i < 100; i++) {
??????????? //Create a message instance, specifying topic, tag and message body.
??????????? //topic和tags在消費者那邊獲取到消息后都可以獲取, 可以通過tag區分消息
??????????? Message msg = new Message("TopicTest" /* Topic 消息所屬的topic */,
??????????????????? "TagA" /* Tag */,
??????????????????? ("Hello RocketMQ " +
??????????????????????????? i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
??????????? );
??????????? //Call send message to deliver message to one of brokers.
??????????? SendResult sendResult = producer.send(msg);
??????????? System.out.printf("%s%n", sendResult);
??????? }
??????? //Shut down once the producer instance is not longer in use.
??????? producer.shutdown();
??? }
}
原理:異步發送是指發送方發出數據后,不等接收方發回響應,接著發送下個數據包的通訊方式。MQ 的異步發送,需要用戶實現異步發送回調接口(SendCallback),在執行消息的異步發送時,應用不需要等待服務器響應即可直接返回,通過回調接口接收務器響應,并對服務器的響應結果進行處理。
應用場景:異步發送一般用于鏈路耗時較長,對 RT 響應時間較為敏感的業務場景,例如用戶視頻上傳后通知啟動轉碼服務,轉碼完成后通知推送轉碼結果等。
4.3.1 生產者
/**
?* Created by jackiechan on 18-8-19/下午10:05
?*
?* @author jackiechan
?* 原理:異步發送是指發送方發出數據后,不等接收方發回響應,接著發送下個數據包的通訊方式。MQ 的異步發送,需要用戶實現異步發送回調接口(SendCallback),在執行消息的異步發送時,應用不需要等待服務器響應即可直接返回,通過回調接口接收務器響應,并對服務器的響應結果進行處理。
?*
?* 應用場景:異步發送一般用于鏈路耗時較長,對 RT 響應時間較為敏感的業務場景,例如用戶視頻上傳后通知啟動轉碼服務,轉碼完成后通知推送轉碼結果等。
?*/
public class AsyncProducer02 {
??? public static void main(String[] args) throws Exception {
??????? //Instantiate with a producer group name.
??????? DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
??????? //Launch the instance.
??????? //Launch the instance.
??????? producer.setNamesrvAddr("192.168.3.8:9876");//設置nameserver地址
??????? producer.start();
??????? producer.setRetryTimesWhenSendAsyncFailed(0);
??????? for (int i = 0; i < 100; i++) {
??????????? final int index = i;
??????????? //Create a message instance, specifying topic, tag and message body.
??????????? //消息的keys可以作為標記或者傳遞其他消息內容,可以在消費者獲取到消息后獲取keys進行區分
??????????? Message msg = new Message("TopicTest",
??????????????????? "TagA",
? ??????????????????"OrderID188",
??????????????????? "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
??????????? //發送異步消息, 通過設置回調來接受服務器給我們返回的消息
??????????? producer.send(msg, new SendCallback() {
??????????????? //當發送成功的時候執行的方法
??????????????? @Override
??????????????? public void onSuccess(SendResult sendResult) {
??????????????????? System.out.printf("%-10d OK %s %n", index,
??????????????????????????? sendResult.getMsgId());
??????????????? }
??????????????? //當發送失敗的時候執行
??????????????? @Override
??????????????? public void onException(Throwable e) {
??????????????????? System.out.printf("%-10d Exception %s %n", index, e);
??????????????????? e.printStackTrace();
??????????????? }
??????????? });
??????? }
??????? //Shut down once the producer instance is not longer in use.
??????? //當發送異步消息的時候,producer 不要shutdown,因為回調是異步的,可能在收到回調的時候producer關閉了會出錯
????? //? producer.shutdown();
??? }
}
原理:單向(Oneway)發送特點為只負責發送消息,不等待服務器回應且沒有回調函數觸發,即只發送請求不等待應答。此方式發送消息的過程耗時非常短,一般在微秒級別。
應用場景:適用于某些耗時非常短,但對可靠性要求并不高的場景,例如日志收集。
4.4.1 生產者
/**
?* Created by jackiechan on 18-8-19/下午10:25
?*
?* @author jackiechan
?* 原理:單向(Oneway)發送特點為只負責發送消息,不等待服務器回應且沒有回調函數觸發,即只發送請求不等待應答。此方式發送消息的過程耗時非常短,一般在微秒級別。
?*
?* 應用場景:適用于某些耗時非常短,但對可靠性要求并不高的場景,例如日志收集。
?*/
public class OnewayProducer03 {
??? public static void main(String[] args) throws Exception{
??????? //Instantiate with a producer group name.
??????? DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
??????? //Launch the instance.
??????? producer.setNamesrvAddr("192.168.3.8:9876");//設置nameserver地址
??????? producer.start();
??????? for (int i = 0; i < 100; i++) {
??????????? //Create a message instance, specifying topic, tag and message body.
??????????? Message msg = new Message("TopicTest" /* Topic */,
??????????????????? "TagA" /* Tag */,
??????????????????? ("Hello RocketMQ " +
??????????????????????????? i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
??????????? );
??????????? //Call send message to deliver message to one of brokers.
??????????? producer.sendOneway(msg);
??????? }
??????? //Shut down once the producer instance is not longer in use.
??????? producer.shutdown();
??? }
}
此消費者可以接收上面三種不同的消息
/**
?* Created by jackiechan on 18-8-19/下午9:50
?*
?* @authoe jackiechan
?*/
public class MqConsumer {
??? public static void main(String[] args) {
?????? ?//同一個group代表是集群
??????? DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer_yll");
??????? consumer.setNamesrvAddr("192.168.3.8:9876");
??????? try {
??????????? consumer.subscribe("TopicTest", "TagA||TagB");//可訂閱多個tag,但是一個消息只能有一個tag
??????????? consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
??????????? consumer.registerMessageListener(new MessageListenerConcurrently() {
??????????????? @Override
??????????????? public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
??????????????????? Message msg = list.get(0);
??????????????????? //輸出消息內容
??????????????????? System.out.println("收到消息了:"+new String(msg.getBody()));
??????????????????? //此處可以根據消息的tag或者keys來區分消息
??????????????????? if (msg.getTags() != null&&msg.getTags().equals("TagA")) {
??????????????????????? //執行TagA的邏輯
??????????????????????? System.out.println("收到的是taga的消息");
??????????????????? }
??????????????????? return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
??????????????? }
??????????? });
??????????? consumer.start();
??????? } catch (MQClientException e) {
??????????? System.out.println("出錯了");
??????? }
??? }
}
消息順序
消息順序是只可以按照消息發送的順序進行消費。一個訂單產生3條消息,訂單創建、付款、訂單完成。消費時只有按照順序消費才有意義,不可能先消費付款消息再消費訂單創建消息,這樣就亂了。另外,多筆訂單又可以并行消費。如何保證呢?
一個訂單產生的消息只能發送給同一個MQ服務器中的同一個分區,并且按順序發送,這樣才能在理論上保證消費者消費時是按照順序消費的,因為一個分區就是一個邏輯隊列。生產者雖然按順序發送,但是第一條消息到達MQ的耗時比第二條多,那么第二條則會被先消費,這樣就又導致消費時不是順序的。那么如何解決呢?可以采取只有第一條被消費者消費成功后再發送第二條。看下圖:
但是如果第一條被發送到消費者后,消費者沒有響應(消費者發送響應但是因為網絡問題丟失或者消費者就沒有收到消息),那么在這種情況下你是繼續發送第二條還是重發第一條呢?如果是嚴格消息順序,那肯定是重發第一條,但是如果是消費者消費后的響應丟失了,那么重發第一條就會造成重復消費。
從另外一方面看,如果不考慮網絡異常,那么要實現嚴格消息,就必須采取一種一對一關系,生產者A的消息對應到MQ服務器1的X隊列,消費者A消費X隊列。這樣串行結構就會造成系統吞吐量太低;更多異常需要處理比如消費端出現問題,那么整個消息隊列就會出現阻塞。RocketMQ通過輪詢所有隊列來確定消息發送到哪一個隊列(負載均衡),比如相同訂單號的消息會被先后發送到統一隊列中。所以RocketMQ
消息重復
造成消費重復的根本原因是網絡不可達,只要有網絡,這種網絡的不穩定因素就存在你無法規避。所以解決這個問題的最好辦法就是繞過它。這就變成了,消費端收到兩個一樣的消息后如何處理,而不是從發送端解決不發送2個一樣的消息。對于消費端的要求就是:
消費端處理業務消息要保持冪等性,也就是同一個東西執行多次會得到相同結果
保證每條消息都有唯一編號切保證消息處理成功與去重表的日志同時出現
第一條好理解,第二條就是利用一張日志表來記錄已經處理成功的消息ID,如果新到的消息ID已經存在表中那么就不再處理這個消息。第一條是在消費端實現的,不屬于消息系統的功能;第二條可以是消息系統實現也可以是業務端實現,處于對消息系統的吞吐量和高可用考慮最好還是由消費端去處理。所以這也就是RocketMQ不解決消息重復的原因。
/**
?* Created by jackiechan on 18-8-20/上午12:08
?*
?* @author jackiechan
?*/
public class OrderedProducer {
??? public static void main(String[] args) throws Exception {
??????? //Instantiate with a producer group name.
??????? MQProducer producer = new DefaultMQProducer("example_group_name");
??????? ((DefaultMQProducer) producer).setNamesrvAddr(ServerUtil.SERVERADD);//設置服務器地址,請替換為自己的服務器地址
??? ????//Launch the instance.
??????? producer.start();
??????? String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
??????? for (int i = 0; i < 100; i++) {
??????????? int orderId = i % 10;
??????????? int a=i;
??????????? //Create a message instance, specifying topic, tag and message body.
??????????? Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
??????????????????? ("Hello RocketMQ==> " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
???????? ???SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
??????????????? @Override
??????????????? public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
??????????????????? // arg的值其實就是orderId
??????????????????? Integer id = (Integer) arg;
??????????????????? // mqs是隊列集合,也就是topic所對應的所有隊列
??????????????????? int index = id % mqs.size();
??????????????????? // 這里根據前面的id對隊列集合大小求余來返回所對應的隊列
??????????????????? System.out.println(index+"====>"+a);
??????????????????? return mqs.get(index);
??????????????? }
??????????? }, orderId);
?????????? // System.out.printf("%s%n", sendResult);
??????? }
??????? //server shutdown
??????? producer.shutdown();
??? }
}
消費者有多個,代碼一致
/**
?* Created by jackiechan on 18-8-20/上午12:08
?*
?* @author jackiechan
?* 順序消費的場景,一個業務需要從頭到尾按照固定順序執行, 比如訂單的順序是 創建訂單-支付-發貨,必須按照這個順序執行, 就可以通過順序消費來解決這個問題
?*/
public class OrderedConsumer {
??? public static void main(String[] args) throws Exception {
??????? DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
??????? consumer.setNamesrvAddr(ServerUtil.SERVERADD);//設置服務器地址,實際開發替換為自己的地址
??????? /**
???????? * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費
???????? * 如果非第一次啟動,那么按照上次消費的位置繼續消費
???????? * 這里設置的是一個consumer的消費策略
???????? *? CONSUME_FROM_LAST_OFFSET 默認策略,從該隊列最尾開始消費,即跳過歷史消息
???????? *? CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍
???????? *? CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時以前
???????? *
???????? */
??????? consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
??????? consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD");
??????? //設置一個Listener,主要進行消息的邏輯處理
??????? //注意這里使用的是MessageListenerOrderly這個接口
??????? consumer.registerMessageListener(new MessageListenerOrderly() {
??????????? AtomicLong consumeTimes = new AtomicLong(0);
??????????? @Override
??????????? public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
?????? ????????????????????????????????????????????????ConsumeOrderlyContext context) {
??????????????? //返回消費狀態
??????????????? //SUCCESS 消費成功
??????????????? //SUSPEND_CURRENT_QUEUE_A_MOMENT 消費失敗,暫停當前隊列的消費
??????????????? context.setAutoCommit(false);//手動提交
??????????????? System.out.printf(Thread.currentThread().getName()+"消費者1===>" + msgs.get(0).getQueueId() +? "%n"+new String(msgs.get(0).getBody())+ "%n");
??????????????? this.consumeTimes.incrementAndGet();
??????????????? //以下內容模擬收消息失敗,或者回滾等操作
// ???????????????if ((this.consumeTimes.get() % 2) == 0) {
//??????????????????? return ConsumeOrderlyStatus.SUCCESS;
//??????????????? } else if ((this.consumeTimes.get() % 3) == 0) {
//??????????????????? return ConsumeOrderlyStatus.ROLLBACK;
//?????????? ?????} else if ((this.consumeTimes.get() % 4) == 0) {
//??????????????????? return ConsumeOrderlyStatus.COMMIT;
//??????????????? } else if ((this.consumeTimes.get() % 5) == 0) {
//??????????????????? context.setSuspendCurrentQueueTimeMillis(3000);
//???? ???????????????return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
//??????????????? }
??????????????? return ConsumeOrderlyStatus.SUCCESS;
??????????? }
??????? });
??????? consumer.start();
??????? System.out.printf("Consumer Started.%n");
??? }
}
經過測試發現,不同隊列的消息收取是無序的,但是同一隊列中消息的收取順序是按照發送順序收取的
/**
?* Created by jackiechan on 2018/8/20/上午10:22
?*/
public class BroadcastProducer {
??? public static void main(String[] args) throws Exception {
??????? DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
??????? producer.setNamesrvAddr(ServerUtil.SERVERADD);//設置服務器地址
??????? producer.start();
??????? for (int i = 0; i < 100; i++){
??????????? //發送消息
??????????? Message msg = new Message("TopicTest",
??????????????????? "TagA",
??????????????????? "OrderID188",
??????????????????? ("Hello world==>"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
??????????? SendResult sendResult = producer.send(msg);
?? ?????????System.out.printf("%s%n", sendResult);
??????? }
??????? producer.shutdown();
??? }
}
消費者有多個,代碼一致
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。
/**
?* Created by jackiechan on 2018/8/20/上午10:23
?* 廣播模式的應用場景, 一個業務執行完成后需要多個不同的后續業務都執行,那么他們都需要知道前置業務完成,所以大家監聽相同消息,同時獲取消息
?* 比如 電商中商品更新完成后, 可能會需要同時更新 redis 緩存與 solr 搜索引擎
?*/
public class BroadcastConsumer1 {
??? public static void main(String[] args) throws Exception {
??????? DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
??????? consumer.setConsumeMessageBatchMaxSize(10);//每次拉取十條
??????? consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
??????? consumer.setNamesrvAddr(ServerUtil.SERVERADD);
??????? //set to broadcast mode,設置消費模式為廣播
??????? consumer.setMessageModel(MessageModel.BROADCASTING);
??????? consumer.subscribe("TopicTest", "TagA || TagC || TagD");
??????? consumer.registerMessageListener(new MessageListenerConcurrently() {
??????????? @Override
??????????? public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
??????????????????????????????????????????????????????????? ConsumeConcurrentlyContext context) {
??????????????? System.out.printf(Thread.currentThread().getName() + " 消費者1收到消息 : " + new String(msgs.get(0).getBody()) + "%n");
???????????????