中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

spring boot集成rabbitmq的示例分析

發布時間:2021-07-08 13:44:23 來源:億速云 閱讀:281 作者:小新 欄目:編程語言

這篇文章主要為大家展示了“spring boot集成rabbitmq的示例分析”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“spring boot集成rabbitmq的示例分析”這篇文章吧。

一、RabbitMQ的介紹  

RabbitMQ是消息中間件的一種,消息中間件即分布式系統中完成消息的發送和接收的基礎軟件.這些軟件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,現已經轉讓給apache).

消息中間件的工作過程可以用生產者消費者模型來表示.即,生產者不斷的向消息隊列發送信息,而消費者從消息隊列中消費信息.具體過程如下:

spring boot集成rabbitmq的示例分析

從上圖可看出,對于消息隊列來說,生產者,消息隊列,消費者是最重要的三個概念,生產者發消息到消息隊列中去,消費者監聽指定的消息隊列,并且當消息隊列收到消息之后,接收消息隊列傳來的消息,并且給予相應的處理.消息隊列常用于分布式系統之間互相信息的傳遞.

對于RabbitMQ來說,除了這三個基本模塊以外,還添加了一個模塊,即交換機(Exchange).它使得生產者和消息隊列之間產生了隔離,生產者將消息發送給交換機,而交換機則根據調度策略把相應的消息轉發給對應的消息隊列.那么RabitMQ的工作流程如下所示:

spring boot集成rabbitmq的示例分析

緊接著說一下交換機.交換機的主要作用是接收相應的消息并且綁定到指定的隊列.交換機有四種類型,分別為Direct,topic,headers,Fanout.

Direct是RabbitMQ默認的交換機模式,也是最簡單的模式.即創建消息隊列的時候,指定一個BindingKey.當發送者發送消息的時候,指定對應的Key.當Key和消息隊列的BindingKey一致的時候,消息將會被發送到該消息隊列中.

topic轉發信息主要是依據通配符,隊列和交換機的綁定主要是依據一種模式(通配符+字符串),而當發送消息的時候,只有指定的Key和該模式相匹配的時候,消息才會被發送到該消息隊列中.

headers也是根據一個規則進行匹配,在消息隊列和交換機綁定的時候會指定一組鍵值對規則,而發送消息的時候也會指定一組鍵值對規則,當兩組鍵值對規則相匹配的時候,消息會被發送到匹配的消息隊列中.

Fanout是路由廣播的形式,將會把消息發給綁定它的全部隊列,即便設置了key,也會被忽略. 

概念:

  • 生產者 消息的產生方,負責將消息推送到消息隊列

  • 消費者 消息的最終接受方,負責監聽隊列中的對應消息,消費消息

  • 隊列 消息的寄存器,負責存放生產者發送的消息

  • 交換機 負責根據一定規則分發生產者產生的消息

  • 綁定 完成交換機和隊列之間的綁定

模式:

1、direct

直連模式,用于實例間的任務分發

2、topic

話題模式,通過可配置的規則分發給綁定在該exchange上的隊列

3、headers

適用規則復雜的分發,用headers里的參數表達規則

4、fanout

分發給所有綁定到該exchange上的隊列,忽略routing key

安裝

單機版安裝很簡單,大概步驟如下:

# 安裝erlang包
 yum install erlang
# 安裝socat
 yum install socat
# 安裝rabbit 
 rpm -ivh rabbitmq-server-3.6.6-1.el6.noarch.rpm 
# 啟動服務
 rabbitmq-server start
# 增加管理控制功能
 rabbitmq-plugins enable rabbitmq_management
# 增加用戶:
 sudo rabbitmqctl add_user root password
 rabbitmqctl set_user_tags root administrator 
 rabbitmqctl set_permissions -p / root '.*' '.*' '.*'

集群安裝,可參考這篇文章:

     rabbitmq集群安裝

以上就是rabbitmq的介紹,下面開始本文的正文:spring boot 集成rabbitmq ,本人在學習rabbitmq時發現網上很少有系統性介紹springboot和rabbitmq如何集成的,其他人總結的都片段化,所以結合個人調研過程,整理此篇文章。

二、springboot配置

廢話少說直接上代碼:

配置參數

application.yml:

spring:
 rabbitmq:
 addresses: 192.168.1.1:5672
 username: username
 password: password
 publisher-confirms: true
 virtual-host: /

java config讀取參數

/**
 * RabbitMq配置文件讀取類
 *
 * @author chenhf
 * @create 2017-10-23 上午9:31
 **/
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {

 @Value("${spring.rabbitmq.addresses}")
 private String addresses;
 @Value("${spring.rabbitmq.username}")
 private String username;
 @Value("${spring.rabbitmq.password}")
 private String password;
 @Value("${spring.rabbitmq.publisher-confirms}")
 private Boolean publisherConfirms;
 @Value("${spring.rabbitmq.virtual-host}")
 private String virtualHost;

 // 構建mq實例工廠
 @Bean
 public ConnectionFactory connectionFactory(){
 CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
 connectionFactory.setAddresses(addresses);
 connectionFactory.setUsername(username);
 connectionFactory.setPassword(password);
 connectionFactory.setPublisherConfirms(publisherConfirms);
 connectionFactory.setVirtualHost(virtualHost);
 return connectionFactory;
 }

 @Bean
 public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
 return new RabbitAdmin(connectionFactory);
 }

 @Bean
 @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
 public RabbitTemplate rabbitTemplate(){
 RabbitTemplate template = new RabbitTemplate(connectionFactory());
 return template;
 }
}

三、rabbitmq生產者配置

主要配置了直連和話題模式,其中話題模式設置兩個隊列(queueTopicTest1、queueTopicTest2),此兩個隊列在和交換機綁定時分別設置不同的routingkey(.TEST.以及lazy.#)來驗證匹配模式。

/**
 * 用于配置交換機和隊列對應關系
 * 新增消息隊列應該按照如下步驟
 * 1、增加queue bean,參見queueXXXX方法
 * 2、增加queue和exchange的binding
 * @author chenhf
 * @create 2017-10-23 上午10:33
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class RabbitMqExchangeConfig {
 /** logger */
 private static final Logger logger = LoggerFactory.getLogger(RabbitMqExchangeConfig.class);

 /**
 * @Author:chenhf
 * @Description: 主題型交換機
 * @Date:下午5:49 2017/10/23
 * @param
 * @return
 */
 @Bean
 TopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin){
 TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode());
 rabbitAdmin.declareExchange(contractTopicExchange);
 logger.debug("完成主題型交換機bean實例化");
 return contractTopicExchange;
 }
 /**
 * 直連型交換機
 */
 @Bean
 DirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) {
 DirectExchange contractDirectExchange = new DirectExchange(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode());
 rabbitAdmin.declareExchange(contractDirectExchange);
 logger.debug("完成直連型交換機bean實例化");
 return contractDirectExchange;
 }

 //在此可以定義隊列

 @Bean
 Queue queueTest(RabbitAdmin rabbitAdmin){
 Queue queue = new Queue(RabbitMqEnum.QueueName.TESTQUEUE.getCode());
 rabbitAdmin.declareQueue(queue);
 logger.debug("測試隊列實例化完成");
 return queue;
 }

 //topic 1
 @Bean
 Queue queueTopicTest1(RabbitAdmin rabbitAdmin){
 Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST1.getCode());
 rabbitAdmin.declareQueue(queue);
 logger.debug("話題測試隊列1實例化完成");
 return queue;
 }
 //topic 2
 @Bean
 Queue queueTopicTest2(RabbitAdmin rabbitAdmin){
 Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST2.getCode());
 rabbitAdmin.declareQueue(queue);
 logger.debug("話題測試隊列2實例化完成");
 return queue;
 }


 //在此處完成隊列和交換機綁定
 @Bean
 Binding bindingQueueTest(Queue queueTest,DirectExchange exchange,RabbitAdmin rabbitAdmin){
 Binding binding = BindingBuilder.bind(queueTest).to(exchange).with(RabbitMqEnum.QueueEnum.TESTQUEUE.getCode());
 rabbitAdmin.declareBinding(binding);
 logger.debug("測試隊列與直連型交換機綁定完成");
 return binding;
 }
 //topic binding1
 @Bean
 Binding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){
 Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE1.getCode());
 rabbitAdmin.declareBinding(binding);
 logger.debug("測試隊列與話題交換機1綁定完成");
 return binding;
 }

 //topic binding2
 @Bean
 Binding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){
 Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE2.getCode());
 rabbitAdmin.declareBinding(binding);
 logger.debug("測試隊列與話題交換機2綁定完成");
 return binding;
 }

}

在這里用到枚舉類:RabbitMqEnum

/**
 * 定義rabbitMq需要的常量
 *
 * @author chenhf
 * @create 2017-10-23 下午4:07
 **/
public class RabbitMqEnum {

 /**
 * @param
 * @Author:chenhf
 * @Description:定義數據交換方式
 * @Date:下午4:08 2017/10/23
 * @return
 */
 public enum Exchange {
 CONTRACT_FANOUT("CONTRACT_FANOUT", "消息分發"),
 CONTRACT_TOPIC("CONTRACT_TOPIC", "消息訂閱"),
 CONTRACT_DIRECT("CONTRACT_DIRECT", "點對點");

 private String code;
 private String name;

 Exchange(String code, String name) {
 this.code = code;
 this.name = name;
 }

 public String getCode() {
 return code;
 }

 public String getName() {
 return name;
 }
 }

 /**
 * describe: 定義隊列名稱
 * creat_user: chenhf
 * creat_date: 2017/10/31
 **/
 public enum QueueName {
 TESTQUEUE("TESTQUEUE", "測試隊列"),
 TOPICTEST1("TOPICTEST1", "topic測試隊列"),
 TOPICTEST2("TOPICTEST2", "topic測試隊列");

 private String code;
 private String name;

 QueueName(String code, String name) {
 this.code = code;
 this.name = name;
 }

 public String getCode() {
 return code;
 }

 public String getName() {
 return name;
 }

 }

 /**
 * describe: 定義routing_key
 * creat_user: chenhf
 * creat_date: 2017/10/31
 **/
 public enum QueueEnum {
 TESTQUEUE("TESTQUEUE1", "測試隊列key"),
 TESTTOPICQUEUE1("*.TEST.*", "topic測試隊列key"),
 TESTTOPICQUEUE2("lazy.#", "topic測試隊列key");


 private String code;
 private String name;

 QueueEnum(String code, String name) {
 this.code = code;
 this.name = name;
 }

 public String getCode() {
 return code;
 }

 public String getName() {
 return name;
 }
 }

}

以上完成消息生產者的定義,下面封裝調用接口

測試時直接調用此工具類,testUser類需自己實現

rabbitMqSender.sendRabbitmqDirect("TESTQUEUE1",testUser);
rabbitMqSender.sendRabbitmqTopic("lazy.1.2",testUser);
rabbitMqSender.sendRabbitmqTopic("lazy.TEST.2",testUser);
/**
 * rabbitmq發送消息工具類
 *
 * @author chenhf
 * @create 2017-10-26 上午11:10
 **/

@Component
public class RabbitMqSender implements RabbitTemplate.ConfirmCallback{
 /** logger */
 private static final Logger logger = LoggerFactory.getLogger(RabbitMqSender.class);

 private RabbitTemplate rabbitTemplate;

 @Autowired
 public RabbitMqSender(RabbitTemplate rabbitTemplate) {
 this.rabbitTemplate = rabbitTemplate;
 this.rabbitTemplate.setConfirmCallback(this);
 }

 @Override
 public void confirm(CorrelationData correlationData, boolean b, String s) {
 logger.info("confirm: " + correlationData.getId());
 }

 /**
 * 發送到 指定routekey的指定queue
 * @param routeKey
 * @param obj
 */
 public void sendRabbitmqDirect(String routeKey,Object obj) {
 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
 logger.info("send: " + correlationData.getId());
 this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode(), routeKey , obj, correlationData);
 }

 /**
 * 所有發送到Topic Exchange的消息被轉發到所有關心RouteKey中指定Topic的Queue上
 * @param routeKey
 * @param obj
 */
 public void sendRabbitmqTopic(String routeKey,Object obj) {
 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
 logger.info("send: " + correlationData.getId());
 this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode(), routeKey , obj, correlationData);
 }
}

四、rabbitmq消費者配置

springboot注解方式監聽隊列,無法手動指定回調,所以采用了實現ChannelAwareMessageListener接口,重寫onMessage來進行手動回調,詳見以下代碼,詳細介紹可以在spring的官網上找amqp相關章節閱讀

直連消費者

通過設置TestUser的name來測試回調,分別發兩條消息,一條UserName為1,一條為2,查看控制臺中隊列中消息是否被消費

/**
 * 消費者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class ExampleAmqpConfiguration {
 @Bean("testQueueContainer")
 public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
 container.setConnectionFactory(connectionFactory);
 container.setQueueNames("TESTQUEUE");
 container.setMessageListener(exampleListener());
 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 return container;
 }


 @Bean("testQueueListener")
 public ChannelAwareMessageListener exampleListener() {
 return new ChannelAwareMessageListener() {
 @Override
 public void onMessage(Message message, Channel channel) throws Exception {
 TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());
 //通過設置TestUser的name來測試回調,分別發兩條消息,一條UserName為1,一條為2,查看控制臺中隊列中消息是否被消費
 if ("2".equals(testUser.getUserName())){
  System.out.println(testUser.toString());
  channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
 }

 if ("1".equals(testUser.getUserName())){
  System.out.println(testUser.toString());
  channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
 }

 }
 };
 }

}

topic消費者1

/**
 * 消費者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class TopicAmqpConfiguration {
 @Bean("topicTest1Container")
 public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
 container.setConnectionFactory(connectionFactory);
 container.setQueueNames("TOPICTEST1");
 container.setMessageListener(exampleListener1());
 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 return container;
 }


 @Bean("topicTest1Listener")
 public ChannelAwareMessageListener exampleListener1(){
 return new ChannelAwareMessageListener() {
 @Override
 public void onMessage(Message message, Channel channel) throws Exception {
 TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());
 System.out.println("TOPICTEST1:"+testUser.toString());
 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

 }
 };
 }




}

topic消費者2

/**
 * 消費者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class TopicAmqpConfiguration2 {
 @Bean("topicTest2Container")
 public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
 container.setConnectionFactory(connectionFactory);
 container.setQueueNames("TOPICTEST2");
 container.setMessageListener(exampleListener());
 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 return container;
 }


 @Bean("topicTest2Listener")
 public ChannelAwareMessageListener exampleListener() {
 return new ChannelAwareMessageListener() {
 @Override
 public void

以上是“spring boot集成rabbitmq的示例分析”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

浦县| 伽师县| 乌什县| 富宁县| 玛纳斯县| 临汾市| 铜山县| 安阳市| 丁青县| 富川| 镇坪县| 黎平县| 恩施市| 龙门县| 杭州市| 南皮县| 潮安县| 商都县| 仁怀市| 余姚市| 绵竹市| 阿尔山市| 海兴县| 东台市| 元谋县| 尚志市| 高雄市| 临漳县| 政和县| 桂东县| 湘阴县| 新竹市| 平昌县| 宽城| 伊宁市| 新余市| 辽源市| 青州市| 塔河县| 客服| 新源县|