中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何理解RocketMQ消費位置

發布時間:2021-10-20 17:56:32 來源:億速云 閱讀:206 作者:柒染 欄目:大數據

這篇文章給大家介紹如何理解RocketMQ消費位置,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

RocketMQ創建消費者的時指定了Topic主題及Tag,我們發現新創建的消費者消費不了歷史的數據,只能消費掉創建以后消費者發送的數據。這是什么原因,我們能把所有的消息都消費嗎?,我們可以指定需要消費的消息的時間嗎?答案是肯定的,下面我們具體分析一下。

前提:我們討論是集群模式下的,廣播模式也是一樣的,只是示例代碼我們用集群模式來討論。

消息消費的位置目前提供了三種方式CONSUME_FROM_LAST_OFFSET(隊列尾部消費)、CONSUME_FROM_FIRST_OFFSET(隊列頭部消費)、CONSUME_FROM_TIMESTAMP(指定消費時間點)。

public enum ConsumeFromWhere {
    CONSUME_FROM_LAST_OFFSET,
    @Deprecated
    CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
    @Deprecated
    CONSUME_FROM_MIN_OFFSET,
    @Deprecated
    CONSUME_FROM_MAX_OFFSET,
    CONSUME_FROM_FIRST_OFFSET,
    CONSUME_FROM_TIMESTAMP,
}

分析源碼我們看到有6種方式,其他三種已經廢棄掉了,不做討論。

1、從隊列尾部消費(默認)

我們從新創建一個消費組來消費某個主題下的消息時,歷史消息沒有被消費,當生產者重新發送消息時則會接收到最新的,我們分析下其在哪設置的。

如何理解RocketMQ消費位置

當創建消費者的時候內置了一些參數,從隊列尾部消費。

從隊列尾部消費導致歷史消息消費不了,會丟失一部分數據,如果僅僅是狀態數據則可以這樣設置,如果是業務數據導致數據丟失。

對于設置這個參數僅對于消費組第一次創建時生效,后面再次設置不生效,因為該消費組在服務端已經記錄了消費的進度,已有進度位置。

查看消費進度文件的位置,我們根據上幾節的內容查看TopicTest主題下的這個consumer_test_clustering消費組的消息消費的進度。查看Broker-a服務器節點上的信息。

查看消費的消費進度先根據可視化界面查看

如何理解RocketMQ消費位置

查看服務器文件上的消費進度信息:/usr/local/rocketmq-all-4.2.0/store/config/consumerOffset.json

如何理解RocketMQ消費位置

2、從隊列頭部消費

編寫Consumer

public class Consumer1 {	
	public static void main(String[] args){
		try {
			DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
			consumer.setConsumerGroup("consumer_first_offset");
			consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
			consumer.subscribe("TopicTest", "*");
			consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
			consumer.registerMessageListener(new MessageListenerConcurrently(){

				@Override
				public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
						ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {
					try {
					    for(MessageExt msg : paramList){
					    	String msgbody = new String(msg.getBody(), "utf-8");
					    	SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");
					    	Date date = new Date(msg.getStoreTimestamp());
					    	System.out.println("Consumer1===  存入時間 :  "+ sd.format(date) +" == MessageBody: "+ msgbody);//輸出消息內容
					    }
					} catch (Exception e) {
					    e.printStackTrace();
					    return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試
					}
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功
				}
			});
			consumer.start();
			System.out.println("Consumer1===啟動成功!");
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

設置了消費組:consumer.setConsumerGroup("consumer_first_offset");

設置了消費位置:consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

查看其結果

如何理解RocketMQ消費位置

從頭開始消費是指目前還儲存在broker的上的消息全部消費一遍,因為RocketMQ會將消息持久化到磁盤文件中,時間長就會導致磁盤文件會很多,RocketMQ有一種機制,只是保留一段時間的消息,之前的消息會刪除,可以指定時間點刪除(無論消息是否被消費,到時間點文件都會被刪除)

3、從指定時間點消費

消費者代碼

public class Consumer1 {	
	public static void main(String[] args){
		try {
			DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
			consumer.setConsumerGroup("consumer_time_offset");
			consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
			consumer.subscribe("TopicTest", "*");
			//可以設置從什么時間開始消費,配合setConsumeTimestamp一起使用默認半小時之前的,格式yyyyMMddhhmmss
			consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
			consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - 1800000L)); 
			consumer.registerMessageListener(new MessageListenerConcurrently(){

				@Override
				public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
						ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {
					try {
					    for(MessageExt msg : paramList){
					    	String msgbody = new String(msg.getBody(), "utf-8");
					    	SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");
					    	Date date = new Date(msg.getStoreTimestamp());
					    	System.out.println("Consumer1===  存入時間 :  "+ sd.format(date) +" == MessageBody: "+ msgbody);//輸出消息內容
					    }
					} catch (Exception e) {
					    e.printStackTrace();
					    return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試
					}
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功
				}
			});
			consumer.start();
			System.out.println("Consumer1===啟動成功!");
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

設置消費位置:consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); 設置消費的時間點:consumer.setConsumeTimestamp("20181222171201");

如果從消息進度服務OffsetStore讀取到MessageQueue中的偏移量大于等于0,則使用讀取到的偏移量,只有讀取到的偏移量小于0時上述策略才會生效。

關于如何理解RocketMQ消費位置就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

名山县| 平利县| 乡城县| 甘德县| 天柱县| 彩票| 千阳县| 罗甸县| 蕲春县| 丰顺县| 肥西县| 万全县| 宜宾市| 双江| 长乐市| 南澳县| 平山县| 黄冈市| 邢台市| 如皋市| 大关县| 庄浪县| 巴彦县| 德清县| 济南市| 阿尔山市| 甘南县| 晋江市| 凉城县| 沧州市| 天津市| 威远县| 曲阜市| 吉安市| 赞皇县| 仪陇县| 抚宁县| 邵阳县| 新晃| 安陆市| 无极县|