中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Message Queue Selector如何實現順序消費

發布時間:2021-10-19 20:31:31 來源:億速云 閱讀:250 作者:柒染 欄目:大數據

Message Queue Selector如何實現順序消費,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。

順序消息的定義:

順序消息是指消息的消費順序和生產順序相同,在某些場景下,必須保證順序消息。比如訂單的生成、付款、發貨.順序消息又分為全局順序消息和部分順序消息,全局順序消息指某一個topic下的所有消息都要保證順序;部分順序消息只要保證某一組消息被順序消費。對于訂單消息來說,只要保證同一個訂單ID的生成、付款、發貨消息按照順序消費即可。

部分順序消費實現原理:

1. 發送端:保證相同訂單ID的各種消息發往同一個MessageQueue(同一個Topic下的某一個queue)

2.消費端:保證同一個MessageQueue里面的消息不被并發處理 (同一個Topic的不同MessageQueue是可以同時消費的)

        DefaultMQProducer producer = new DefaultMQProducer("local-test-producer");
		producer.setNamesrvAddr("10.76.0.38:9876");
		producer.start();
		for (int i = 0; i < 1000; i++) {
			Order order  = new Order();
			order.orderId = i;
			order.status = "生成";

			Message msg1 = new Message("local-test-producer",
					"TagA",
					JsonUtils.toJson(order).getBytes()
			);
			SendResult sendResult1 = producer.send(msg1, new MessageQueueSelector() {
				@Override
				public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
					return null;
				}
			}, order.orderId);
			log.info("sendResult1={}",sendResult1);
			Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);



			order.status="付款";

			Message msg2 = new Message("local-test-producer",
					"TagA",
					JsonUtils.toJson(order).getBytes()
			);
			SendResult sendResult2 = producer.send(msg2, new MessageQueueSelector() {
				@Override
				public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
					return null;
				}
			}, order.orderId);
			log.info("sendResult2={}",sendResult2);
			Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);



			order.status="發貨";
			Message msg3 = new Message("local-test-producer",
					"TagA",
					JsonUtils.toJson(order).getBytes()
			);
			producer.send(msg2, new MessageQueueSelector() {
				@Override
				public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
					return null;
				}
			}, order.orderId);
			Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);


			SendResult sendResult3 = producer.send(msg3, new MessageQueueSelector() {
				@Override
				public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
					Integer id = (Integer) arg;
					int index = id % mqs.size();
					return mqs.get(index);
				}
				//MessageQueueSelector保證同一個orderId的消息都存儲在同一個MessageQueue。
			}, order.orderId);
			log.info("sendResult3={}",sendResult1);
		}

消費端主要邏輯如下,主要MessageListenerOrderly回調實現同一個MessageQueue里面的消息不會被并發消費:

       //同一個MessageQueue里面的消息要順序消費,不能并發消費。
		//但是同一個Topic的不同MessageQueue是可以同時消費的
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("local-test-consumer2");
		consumer.setNamesrvAddr("10.76.0.38:9876");
		consumer.subscribe("test", "");
		consumer.setPullBatchSize(1);
		consumer.setConsumeThreadMin(1);
		consumer.setConsumeThreadMax(1);
	//	consumer.registerMessageListener(new MessageListenerConcurrently() {
		consumer.registerMessageListener(new MessageListenerOrderly() {
			@Override
			public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
				List<String> messages = new ArrayList<>();

				for (MessageExt msg : msgs) {
					messages.add(new String(msg.getBody()) +"\tbroker:"+msg.getStoreHost());
				}
				System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), messages);
				return ConsumeOrderlyStatus.SUCCESS;
			}
		});
		consumer.start();
		Thread.currentThread().join();

源碼分析:

我們知道在RocketMQ中是可以給一個消費者實例設置多個線程并發消費的. consumer.setConsumeThreadMin 和 setConsumeThreadMax,

那MessageListenerOrderly是如何保證某一個時刻,只有一個消費者的某一個線程在消費某一個MessageQueue的呢?

就在Client模塊的 ConsumeMessageOrderlyService里面,消費者端并不是簡單的禁止并發處理,而是給每一個Consumer Queue加鎖,

private final MessageQueueLock messageQueueLock = new MessageQueueLock();

在消費每個消息之前,需要先獲取這個消息對應的Consumer Queue所對應的鎖,保證同一個Consumer Queue的消息不會被并發消費,但是不同的Consumer Queue的消息是可以并發處理的。

看完上述內容,你們掌握Message Queue Selector如何實現順序消費的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

赣州市| 儋州市| 灵山县| 嘉义市| 广东省| 穆棱市| 剑川县| 齐齐哈尔市| 桓台县| 兴仁县| 鄂托克旗| 宝兴县| 丰宁| 新建县| 全南县| 万盛区| 黄石市| 从化市| 黑龙江省| 公主岭市| 南皮县| 新平| 广南县| 灵台县| 武鸣县| 峡江县| 黄浦区| 广河县| 仁化县| 玉环县| 理塘县| 保德县| 屯昌县| 宝丰县| 时尚| 确山县| 新疆| 新野县| 墨竹工卡县| 宜宾市| 台中县|