中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ如何快速入門

發布時間:2021-10-20 17:58:44 來源:億速云 閱讀:95 作者:柒染 欄目:大數據

RocketMQ如何快速入門,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。

本章簡單講講RocketMQ的入門操作,消息發送和消息接收。

引入 rocketmq-client

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.2.0</version>
</dependency>

編寫Producer

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_test");
        producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
        producer.start();

        for (int i = 0; i < 100; i++) {
            try {
            	//構建消息
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("測試RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                );
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}

查看結果

RocketMQ如何快速入門

編寫Consumer

public static void main(String[] args){
		try {
			DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
			consumer.setConsumerGroup("consumer_test_push");
			consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
			consumer.subscribe("TopicTest", "*");
			consumer.registerMessageListener(new MessageListenerConcurrently(){

				@Override
				public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
						ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {
					try {
					    for(MessageExt msg : paramList){
					    	String msgbody = new String(msg.getBody(), "utf-8");
					    	System.out.println("  MessageBody: "+ msgbody);//輸出消息內容
					    }
					} catch (Exception e) {
					    e.printStackTrace();
					    return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試
					}
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功
				}
			});
			consumer.start();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

查看結果

RocketMQ如何快速入門

看到消費的結果大家可能有疑問,我們生產消息的時候是按照順序生產的消息,消費時候為什么不是順序消費下來的。

MQ消息的無序性,每個主題對應多個隊列,生產消息時是根據算法放置不同的隊列中,消費則就是無序了(有序消息后面討論)

也有可能出現一條消息被消費了多次,RocketMQ的目標就是不丟數據,<u>每條消息至少發送一次</u>,內部通過ACK的確認機制實現的后面會重點討論

消息管控臺

為了方便的查看消息的詳情我們可以通過消息的管控臺更好的管理和查看消息詳情,當然我們也可以通過后臺的提供的命令來為運維提供更多的管理。

RocketMQ-Console地址: https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

可以直接下載到本地之后通過mavne進行編譯獲取jar,該項目是SpringBoot項目

mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-1.0.0.jar

丟到linux服務器上啟動

(1)啟動時設置具體的RocketMQ的參數

java -jar rocketmq-console-ng-1.0.0.jar --server.port=12581 --rocketmq.config.namesrvAddr=10.10.12.203:9876;10.10.12.204:9876

(2)直接修改rocketmq-console-ng-1.0.0.jar中的配置文件,找到rocketmq-console-ng-1.0.0.jar\BOOT-INF\classes\application.properties文件,根據自己的NamesrvAddr進行修改rocketmq.config.namesrvAddr的值,默認端口12581

瀏覽器登錄查看控制臺信息

RocketMQ如何快速入門

查看RocketMQ集群的節點信息

RocketMQ如何快速入門

根據主題時間段查詢消息

RocketMQ如何快速入門

查看某條消息的具體信息

RocketMQ如何快速入門

管控臺提供了很多運維功能能極大的提高我們的運維效率,里面的功能包括創建主題、修改主題、發送消息、對消費者的信息進行查看等功能我們不一一介紹,可以簡單的了解使用。

看完上述內容,你們掌握RocketMQ如何快速入門的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

西宁市| 阳东县| 陵水| 赤城县| 海晏县| 两当县| 三穗县| 抚松县| 水富县| 溧水县| 祁连县| 永修县| 且末县| 炎陵县| 建德市| 榆林市| 浮山县| 韶山市| 茶陵县| 商水县| 独山县| 昌乐县| 镇雄县| 泾川县| 同心县| 察哈| 克拉玛依市| 仲巴县| 凯里市| 双桥区| 诸城市| 北宁市| 石屏县| 龙口市| 大洼县| 瓮安县| 彰武县| 宜城市| 隆尧县| 江津市| 四川省|