中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ如何實現消息過濾

發布時間:2021-07-08 17:19:35 來源:億速云 閱讀:242 作者:chen 欄目:大數據

這篇文章主要講解了“RocketMQ如何實現消息過濾”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“RocketMQ如何實現消息過濾”吧!

消息過濾包括基于表達式過濾與基于類模式兩種過濾模式。其中表達式過濾又分為TAG和SQL92模式,分別介紹各自的過濾機制,及代碼示例內容,深入探消息過濾的原理。

1、TAG模式過濾

發送消息時我們會為每一條消息設置TAG標簽,同一大類中的消息放在一個主題TOPIC下,但是如果進行分類我們則可以根據TAG進行分類,每一類消費者可能不是關系某個主題下的所有消息,我們就可以通過TAG進行過濾,訂閱關注的某一類數據。

1.1、producer

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_test");
        producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
        producer.start();
        String[] tags = {"TagA","TagB","TagC","TagD"};
        for (int i = 0; i < 40; i++) {
            try {
            	String tag = tags[i % tags.length];
            	//構建消息
                Message msg = new Message("GumxTest" /* Topic */,
                		tag /* Tag */,
                    ("RocketMQ消息測試,消息的TAG="+tag+" == " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                );
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

主題GumxTest,標簽分別是"TagA","TagB","TagC","TagD"每個分別發送10條消息

1.2、consumer

public class Consumer {	
	public static void main(String[] args){
		try {
			DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
			consumer.setConsumerGroup("consumer_tags");
			consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
			consumer.subscribe("GumxTest", "TagA || TagC");
			consumer.registerMessageListener(new MessageListenerConcurrently(){

				@Override
				public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
						ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {
					try {
					    for(MessageExt msg : paramList){
					    	String msgbody = new String(msg.getBody(), "utf-8");
					    	SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");
					    	Date date = new Date(msg.getStoreTimestamp());
					    	System.out.println("Consumer1===  存入時間 :  "+ sd.format(date) +" == MessageBody: "+ msgbody);//輸出消息內容
					    }
					} catch (Exception e) {
					    e.printStackTrace();
					    return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試
					}
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功
				}
			});
			consumer.start();
			System.out.println("Consumer===啟動成功!");
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

查看結果:

RocketMQ如何實現消息過濾

消費者組訂閱相同的主題不同的Tag時,如果訂閱是多個Tag則通過“||” 分割

同一個消費者組訂閱的主題,Tag必須相同

2、SQL表達式過濾

SQL92表達式消息過濾,是通過消息的屬性運行SQL過濾表達式進行條件匹配,消息發送時需要設置用戶的屬性putUserProperty方法設置屬性。

支持的語法

  1. 數值比較, 如>, >=, <, <=, BETWEEN, =;

  2. 字符比較, 如=, <>, IN;

  3. IS NULL or IS NOT NULL;

  4. 邏輯連接符AND, OR, NOT;

支持的類型

  1. 數值型, 如123, 3.1415;

  2. 字符型, 如 ‘abc’, 必須用單引號;

  3. NULL, 特殊常數;

  4. 布爾值, TRUE or FALSE;

2.1、producer

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_test");
        producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
        producer.start();
        String[] tags = {"TagA","TagB","TagC","TagD"};
        for (int i = 0; i < 40; i++) {
            try {
            	String tag = tags[i % tags.length];
            	//構建消息
                Message msg = new Message("GumxTest" /* Topic */,
                		tag /* Tag */,
                    ("RocketMQ消息測試,消息的TAG="+tag+  ", 屬性 age = " + i + ", == " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                );
                msg.putUserProperty("age", i+"");
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

2.2、consumer

public class Consumer {	
	public static void main(String[] args){
		try {
			DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
			consumer.setConsumerGroup("consumer_sql");
			consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
			consumer.subscribe("GumxTest", MessageSelector.bySql("age between 0 and 8"));
			consumer.registerMessageListener(new MessageListenerConcurrently(){

				@Override
				public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
						ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {
					try {
					    for(MessageExt msg : paramList){
					    	String msgbody = new String(msg.getBody(), "utf-8");
					    	SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");
					    	Date date = new Date(msg.getStoreTimestamp());
					    	System.out.println("Consumer===  存入時間 :  "+ sd.format(date) +" == MessageBody: "+ msgbody);//輸出消息內容
					    }
					} catch (Exception e) {
					    e.printStackTrace();
					    return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試
					}
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功
				}
			});
			consumer.start();
			System.out.println("Consumer===啟動成功!");
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

啟動時:

RocketMQ如何實現消息過濾

啟動消費者時發現啟動失敗,提示不支持SQL92過濾,網上也查了一些資料多說都是說版本太低,但是我使用的是RocketMQ4.2.0版本,已經是很高的版本了。

分析源碼發現BrokerConfig的配置類中有一個屬性 private boolean enablePropertyFilter = false;默認屬性過濾沒有開啟,然而SQL92就是通過屬性來過濾的。問題找到了,我們需要配置broker的屬性在broker配置文件中添加enablePropertyFilter =true,需要依次關閉集群中的Broker、NameSrv服務,配置好后依次啟動NameSrv、Broker服務

再次啟動,啟動成功,查看其結果:

RocketMQ如何實現消息過濾

3、類過濾模式(基于4.2.0版本)

RocketMQ通過定義消息過濾類的接口實現消息過濾

3.1、producer

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
        producer.start();
 
        try {
            for (int i = 0; i < 6000000; i++) {
                Message msg = new Message("TopicFilter",// topic
                    "TagA",// tag
                    ("Hello MetaQ age = " + i ).getBytes());// body
                msg.putUserProperty("age", String.valueOf(i));
 
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
                Thread.sleep(1000);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        producer.shutdown();
    }
}

3.2、consumer

public class Consumer {	 
    public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupFilter");
        consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
        // 使用Java代碼,在服務器做消息過濾
        String filterCode = MixAll.file2String("D:\\WorkSoft\\workspace\\rocketmq-example\\src\\main\\java\\cn\\gumx\\rocketmq\\filter\\MessageFilterImpl.java");
        consumer.subscribe("TopicFilter1", "cn.gumx.rocketmq.filter.MessageFilterImpl",filterCode);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
 
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
                    ConsumeConcurrentlyContext context) {
            	try {
	            	for(MessageExt msg : paramList){
				    	String msgbody = new String(msg.getBody(), "utf-8");
				    	System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgbody);
				    }
            	} catch (UnsupportedEncodingException e) {
					e.printStackTrace();
				}
            	return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

自定義消息的過濾類

public class MessageFilterImpl implements MessageFilter {
	 
	public boolean match(MessageExt msg, FilterContext arg1) {
		String property = msg.getUserProperty("age");
        if (property != null) {
            int age = Integer.parseInt(property);
            if ((age % 3) == 0 && (age > 10)) {
                return true;
            }
        }
        return false;
	}
}

consumer啟動后并沒有出現預期的結果,查了資料也沒有相關介紹,只是和我們代碼一樣的處理邏輯,查看源碼發現,需要啟動filter組件mqfiltersrv服務。

./mqfiltersrv -n 10.10.12.203:9876;10.10.12.204:9876 &

我們部署的是雙主,mqfiltersrv服務都需要開啟

RocketMQ如何實現消息過濾

RocketMQ如何實現消息過濾

查看服務端結果:

RocketMQ如何實現消息過濾

使用類消息過濾模式,需要額外需要啟動filter組件mqfiltersrv服務,否則消費不了,每個broker都需要啟動一個,相當于加了一層過濾層。

filtersrv 出現了。減少了 Broker 的負擔,又減少了 Consumer 接收無用的消息。當然缺點也是有的,多了一層 filtersrv 網絡開銷

MessageFilterImpl消息過濾實現類中的代碼最好不要帶有中文防止錯誤

注意:RocketMQ4.3.1開始刪除與 mqfilter 服務器相關的腳本,4.3.2刪除客戶端關于mqfilter 客戶端代碼,后面版本不支持該功能。

感謝各位的閱讀,以上就是“RocketMQ如何實現消息過濾”的內容了,經過本文的學習后,相信大家對RocketMQ如何實現消息過濾這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

龙门县| 汉寿县| 北安市| 耒阳市| 福鼎市| 景洪市| 永新县| 浙江省| 雅安市| 获嘉县| 德兴市| 武功县| 平和县| 宁安市| 布尔津县| 阜城县| 高安市| 北票市| 杨浦区| 平和县| 安达市| 阿巴嘎旗| 宣化县| 邯郸市| 平罗县| 大方县| 石阡县| 张家港市| 林西县| 福贡县| 菏泽市| 班玛县| 郯城县| 阿瓦提县| 卢氏县| 中卫市| 内丘县| 上饶市| 蒙山县| 彰化市| 沂南县|