您好,登錄后才能下訂單哦!
這篇文章主要講解了“RocketMQ如何實現消息過濾”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“RocketMQ如何實現消息過濾”吧!
消息過濾包括基于表達式過濾與基于類模式兩種過濾模式。其中表達式過濾又分為TAG和SQL92模式,分別介紹各自的過濾機制,及代碼示例內容,深入探消息過濾的原理。
發送消息時我們會為每一條消息設置TAG標簽,同一大類中的消息放在一個主題TOPIC下,但是如果進行分類我們則可以根據TAG進行分類,每一類消費者可能不是關系某個主題下的所有消息,我們就可以通過TAG進行過濾,訂閱關注的某一類數據。
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("producer_test"); producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); producer.start(); String[] tags = {"TagA","TagB","TagC","TagD"}; for (int i = 0; i < 40; i++) { try { String tag = tags[i % tags.length]; //構建消息 Message msg = new Message("GumxTest" /* Topic */, tag /* Tag */, ("RocketMQ消息測試,消息的TAG="+tag+" == " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }
主題GumxTest,標簽分別是"TagA","TagB","TagC","TagD"每個分別發送10條消息
public class Consumer { public static void main(String[] args){ try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setConsumerGroup("consumer_tags"); consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); consumer.subscribe("GumxTest", "TagA || TagC"); consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList, ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) { try { for(MessageExt msg : paramList){ String msgbody = new String(msg.getBody(), "utf-8"); SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss"); Date date = new Date(msg.getStoreTimestamp()); System.out.println("Consumer1=== 存入時間 : "+ sd.format(date) +" == MessageBody: "+ msgbody);//輸出消息內容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功 } }); consumer.start(); System.out.println("Consumer===啟動成功!"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
查看結果:
消費者組訂閱相同的主題不同的Tag時,如果訂閱是多個Tag則通過“||” 分割
同一個消費者組訂閱的主題,Tag必須相同
SQL92表達式消息過濾,是通過消息的屬性運行SQL過濾表達式進行條件匹配,消息發送時需要設置用戶的屬性putUserProperty方法設置屬性。
支持的語法:
數值比較, 如>
, >=
, <
, <=
, BETWEEN
, =
;
字符比較, 如=
, <>
, IN
;
IS NULL
or IS NOT NULL
;
邏輯連接符AND
, OR
, NOT
;
支持的類型:
數值型, 如123, 3.1415;
字符型, 如 ‘abc’, 必須用單引號;
NULL
, 特殊常數;
布爾值, TRUE
or FALSE
;
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("producer_test"); producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); producer.start(); String[] tags = {"TagA","TagB","TagC","TagD"}; for (int i = 0; i < 40; i++) { try { String tag = tags[i % tags.length]; //構建消息 Message msg = new Message("GumxTest" /* Topic */, tag /* Tag */, ("RocketMQ消息測試,消息的TAG="+tag+ ", 屬性 age = " + i + ", == " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); msg.putUserProperty("age", i+""); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }
public class Consumer { public static void main(String[] args){ try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setConsumerGroup("consumer_sql"); consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); consumer.subscribe("GumxTest", MessageSelector.bySql("age between 0 and 8")); consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList, ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) { try { for(MessageExt msg : paramList){ String msgbody = new String(msg.getBody(), "utf-8"); SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss"); Date date = new Date(msg.getStoreTimestamp()); System.out.println("Consumer=== 存入時間 : "+ sd.format(date) +" == MessageBody: "+ msgbody);//輸出消息內容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功 } }); consumer.start(); System.out.println("Consumer===啟動成功!"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
啟動時:
啟動消費者時發現啟動失敗,提示不支持SQL92過濾,網上也查了一些資料多說都是說版本太低,但是我使用的是RocketMQ4.2.0版本,已經是很高的版本了。
分析源碼發現BrokerConfig的配置類中有一個屬性 private boolean enablePropertyFilter = false;默認屬性過濾沒有開啟,然而SQL92就是通過屬性來過濾的。問題找到了,我們需要配置broker的屬性在broker配置文件中添加enablePropertyFilter =true,需要依次關閉集群中的Broker、NameSrv服務,配置好后依次啟動NameSrv、Broker服務
再次啟動,啟動成功,查看其結果:
RocketMQ通過定義消息過濾類的接口實現消息過濾
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); producer.start(); try { for (int i = 0; i < 6000000; i++) { Message msg = new Message("TopicFilter",// topic "TagA",// tag ("Hello MetaQ age = " + i ).getBytes());// body msg.putUserProperty("age", String.valueOf(i)); SendResult sendResult = producer.send(msg); System.out.println(sendResult); Thread.sleep(1000); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException, IOException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupFilter"); consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); // 使用Java代碼,在服務器做消息過濾 String filterCode = MixAll.file2String("D:\\WorkSoft\\workspace\\rocketmq-example\\src\\main\\java\\cn\\gumx\\rocketmq\\filter\\MessageFilterImpl.java"); consumer.subscribe("TopicFilter1", "cn.gumx.rocketmq.filter.MessageFilterImpl",filterCode); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList, ConsumeConcurrentlyContext context) { try { for(MessageExt msg : paramList){ String msgbody = new String(msg.getBody(), "utf-8"); System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgbody); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
自定義消息的過濾類
public class MessageFilterImpl implements MessageFilter { public boolean match(MessageExt msg, FilterContext arg1) { String property = msg.getUserProperty("age"); if (property != null) { int age = Integer.parseInt(property); if ((age % 3) == 0 && (age > 10)) { return true; } } return false; } }
consumer啟動后并沒有出現預期的結果,查了資料也沒有相關介紹,只是和我們代碼一樣的處理邏輯,查看源碼發現,需要啟動filter組件mqfiltersrv服務。
./mqfiltersrv -n 10.10.12.203:9876;10.10.12.204:9876 &
我們部署的是雙主,mqfiltersrv服務都需要開啟
查看服務端結果:
使用類消息過濾模式,需要額外需要啟動filter組件mqfiltersrv服務,否則消費不了,每個broker都需要啟動一個,相當于加了一層過濾層。
filtersrv 出現了。減少了 Broker 的負擔,又減少了 Consumer 接收無用的消息。當然缺點也是有的,多了一層 filtersrv 網絡開銷
MessageFilterImpl消息過濾實現類中的代碼最好不要帶有中文防止錯誤
注意:RocketMQ4.3.1開始刪除與 mqfilter 服務器相關的腳本,4.3.2刪除客戶端關于mqfilter 客戶端代碼,后面版本不支持該功能。
感謝各位的閱讀,以上就是“RocketMQ如何實現消息過濾”的內容了,經過本文的學習后,相信大家對RocketMQ如何實現消息過濾這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。