中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

SpringBoot中使用RabbitMQ的RPC功能案例分析

發布時間:2021-11-16 09:05:18 來源:億速云 閱讀:177 作者:iii 欄目:開發技術

這篇文章主要講解了“SpringBoot中使用RabbitMQ的RPC功能案例分析”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“SpringBoot中使用RabbitMQ的RPC功能案例分析”吧!

一、RabbitMQ的RPC簡介

實際業務中,有的時候我們還需要等待消費者返回結果給我們,或者是說我們需要消費者上的一個功能、一個方法或是一個接口返回給我們相應的值,而往往大型的系統軟件,生產者跟消費者之間都是相互獨立的兩個系統,部署在兩個不同的電腦上,不能通過直接對象.方法的形式獲取想要的結果,這時候我們就需要用到RPC(Remote Procedure Call)遠程過程調用方式。
RabbitMQ實現RPC的方式很簡單,生產者發送一條帶有標簽(消息ID(correlation_id)+回調隊列名稱)的消息到發送隊列,消費者(也稱RPC服務端)從發送隊列獲取消息并處理業務,解析標簽的信息將業務結果發送到指定的回調隊列,生產者從回調隊列中根據標簽的信息獲取發送消息的返回結果。

二、SpringBoot中使用RabbitMQ的RPC功能

注意:springboot中使用的時候,correlation_id為系統自動生成的,reply_to在加載AmqpTemplate實例的時候設置的。

實例:
說明:隊列1為發送隊列,隊列2為返回隊列

1.先配置rabbitmq

package com.ws.common;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/*
 * rabbitMQ配置類
 */
@Configuration
public class RabbitMQConfig {
	public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";
    public static final String TOPIC_EXCHANGE = "topic.exchange";
    
    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private int port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    
    @Autowired
    ConnectionFactory connectionFactory;
    
    @Bean(name = "connectionFactory")
    public ConnectionFactory connectionFactory() {
    	CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    	connectionFactory.setHost(host);
    	connectionFactory.setPort(port);
    	connectionFactory.setUsername(username);
    	connectionFactory.setPassword(password);
    	connectionFactory.setVirtualHost("/");
    	return connectionFactory;
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate() {
    	RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    	//設置reply_to(返回隊列,只能在這設置)
    	rabbitTemplate.setReplyAddress(TOPIC_QUEUE2);
    	rabbitTemplate.setReplyTimeout(60000);
    	return rabbitTemplate;
    }
    //返回隊列監聽器(必須有)
    @Bean(name="replyMessageListenerContainer")
    public SimpleMessageListenerContainer createReplyListenerContainer() {
         SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
         listenerContainer.setConnectionFactory(connectionFactory);
         listenerContainer.setQueueNames(TOPIC_QUEUE2);
         listenerContainer.setMessageListener(rabbitTemplate());
         return listenerContainer;
    }
    

    
    //創建隊列
    @Bean
    public Queue topicQueue1() {
        return new Queue(TOPIC_QUEUE1);
    }
    @Bean
    public Queue topicQueue2() {
        return new Queue(TOPIC_QUEUE2);
    }
    
    //創建交換機
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }
    
    //交換機與隊列進行綁定
    @Bean
    public Binding topicBinding1() {
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(TOPIC_QUEUE1);
    }
    @Bean
    public Binding topicBinding2() {
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(TOPIC_QUEUE2);
    }
}

2.發送消息并同步等待返回值

@Autowired
private RabbitTemplate rabbitTemplate;


//報文body
String sss = "報文的內容";
//封裝Message
Message msg = this.con(sss);
log.info("客戶端--------------------"+msg.toString());
//使用sendAndReceive方法完成rpc調用
Message message=rabbitTemplate.sendAndReceive(RabbitMQConfig.TOPIC_EXCHANGE, RabbitMQConfig.TOPIC_QUEUE1, msg);
//提取rpc回應內容body
String response = new String(message.getBody());
log.info("回應:" + response);
log.info("rpc完成---------------------------------------------");


public Message con(String s) {
	MessageProperties mp = new MessageProperties();
	byte[] src = s.getBytes(Charset.forName("UTF-8"));
	//mp.setReplyTo("adsdas");   加載AmqpTemplate時設置,這里設置沒用
	//mp.setCorrelationId("2222");   系統生成,這里設置沒用
	mp.setContentType("application/json");
	mp.setContentEncoding("UTF-8");
	mp.setContentLength((long)s.length());
	return new Message(src, mp);
}

3.寫消費者

package com.ws.listener.mq;

import java.nio.charset.Charset;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.ws.common.RabbitMQConfig;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class Receiver {
	@Autowired
	private RabbitTemplate rabbitTemplate;
	
	@RabbitListener(queues=RabbitMQConfig.TOPIC_QUEUE1)
	public void receiveTopic1(Message msg) {
		log.info("隊列1:"+msg.toString());
		String msgBody = new String(msg.getBody());
		//數據處理,返回的Message
		Message repMsg = con(msgBody+"返回了", msg.getMessageProperties().getCorrelationId());
		
		rabbitTemplate.send(RabbitMQConfig.TOPIC_EXCHANGE, RabbitMQConfig.TOPIC_QUEUE2, repMsg);
		
    }
	@RabbitListener(queues=RabbitMQConfig.TOPIC_QUEUE2)
	public void receiveTopic2(Message msg) {
		log.info("隊列2:"+msg.toString());
		
    }
	
	public Message con(String s, String id) {
		MessageProperties mp = new MessageProperties();
		byte[] src = s.getBytes(Charset.forName("UTF-8"));
		mp.setContentType("application/json");
		mp.setContentEncoding("UTF-8");
		mp.setCorrelationId(id);
		
		return new Message(src, mp);
	} 
}

日志打印:

2019-06-26 17:11:16.607 [http-nio-8080-exec-4] INFO com.ws.controller.UserController - 客戶端--------------------(Body:‘報文的內容' MessageProperties [headers={}, contentType=application/json, contentEncoding=UTF-8, contentLength=5, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])

2019-06-26 17:11:16.618 [SimpleAsyncTaskExecutor-1] INFO com.ws.listener.mq.Receiver - 隊列1:(Body:‘報文的內容' MessageProperties [headers={}, correlationId=1, replyTo=topic.queue2, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=topic.queue1, deliveryTag=1, consumerTag=amq.ctag-8IzlhblYmTebqUYd-uferw, consumerQueue=topic.queue1])

2019-06-26 17:11:16.623 [http-nio-8080-exec-4] INFO com.ws.controller.UserController - 回應:報文的內容返回了

2019-06-26 17:11:16.623 [http-nio-8080-exec-4] INFO com.ws.controller.UserController - rpc完成---------------------------------------------

感謝各位的閱讀,以上就是“SpringBoot中使用RabbitMQ的RPC功能案例分析”的內容了,經過本文的學習后,相信大家對SpringBoot中使用RabbitMQ的RPC功能案例分析這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

宣威市| 井研县| 正阳县| 武义县| 黑水县| 昭通市| 菏泽市| 邵阳县| 和顺县| 贵阳市| 金川县| 新昌县| 南汇区| 台州市| 渭源县| 济源市| 锦屏县| 时尚| 伊川县| 商都县| 淮阳县| 夹江县| 伊宁县| 安国市| 浑源县| 故城县| 深州市| 连南| 沙坪坝区| 仲巴县| 祁阳县| 通化市| 朝阳区| 陇川县| 安福县| 巴林右旗| 山阳县| 枣庄市| 辽中县| 蕲春县| 台南县|