中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RabbitMQ如何實現RPC遠程調用消息隊列

發布時間:2021-11-23 10:24:11 來源:億速云 閱讀:487 作者:小新 欄目:開發技術

這篇文章主要介紹了RabbitMQ如何實現RPC遠程調用消息隊列,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

  客戶端接口

  我們創建一個客戶端類來說明如何使用RPC服務,暴露一個call方法來發送RPC請求和數據獲取結果。

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();  
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

  盡管RPC是編程中一種常見的模式,但其也常常飽受批評。因為程序員常常不知道調用的方法是本地方法還是一個RPC方法,這在調試中常常增加一些不必要的復雜性。我們應該簡化代碼,而不是濫用RPC導致代碼變的臃腫。

  回調隊列

  一般來說,通過RabbitMQ實現RPC非常簡單,客戶端發送一個請求消息,服務端響應消息就完成了。為了接收到響應內容,我們在請求中發送”callback“隊列地址,也可以使用默認的隊列。

callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties .Builder().replyTo(callbackQueueName) .build(); 
channel.basicPublish("", "rpc_queue", props, message.getBytes());

  AMQP協議中預定了14個消息屬性,除了下面幾個,其它的都很少使用:

  deliveryMode : 標識消息是持久化還是瞬態的。

  contentType : 描述 mime-type的編碼類型,如JSON編碼為”application/json“。

  replyTo : 通常在回調隊列中使用。

  correlationId : 在請求中關聯RPC響應時使用。

  關聯Id(Correlation Id)

  在前面的方法中,要求在每個RPC請求創建回調隊列,這可真是一件繁瑣的事情,但幸運的是我們有個好方法-在每個客戶端創建一個簡單的回調隊列。

  這樣問題又來了,隊列如何知道這些響應來自哪個請求呢?這時候correlationId就出場了。我們在每個請求中都設置一個唯一的值,這樣我們在回調隊列中接收消息的時候就能知道是哪個請求發送的。如果收到未知的correlationId,就廢棄該消息,因為它不是我們發出的請求。

  你可能會問,為什么拋棄未知消息而不是拋出錯誤呢?這是由服務器競爭資源所導致的。盡管這不太可能,試想一下,如果RPC服務器在發送完響應后而在發送應答消息前死掉了,重啟RPC服務器會重新發送請求。這就是我們在客戶機上優雅地處理重復的反應,RPC應該是等同的。

RabbitMQ如何實現RPC遠程調用消息隊列

  (1)客戶端啟動,創建一個匿名且唯一的回調隊列。

  (2)對每個RPC請求,客戶端發送一個包含replyTo和correlationId兩個屬性的消息。

  (3)請求發送到rpc_queue隊列。

  (4)RPC服務在隊列中等待請求,當請求出現時,根據replyTo字段使用隊列將結果發送到客戶端。

  (5)客戶端在回調隊列中等待數據。當消息出現時,它會檢查correlationId屬性,如果該值匹配的話,就會返回響應結果給應用。

  示例代碼

  RPCServer.java

package com.favccxx.favrabbit;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;

public class RPCServer {

	private static final String RPC_QUEUE_NAME = "rpc_queue";

	private static int fib(int n) {
		if (n == 0)
			return 0;
		if (n == 1)
			return 1;
		return fib(n - 1) + fib(n - 2);
	}

	public static void main(String[] argv) {
		Connection connection = null;
		Channel channel = null;
		try {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost("localhost");

			connection = factory.newConnection();
			channel = connection.createChannel();

			channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

			channel.basicQos(1);

			QueueingConsumer consumer = new QueueingConsumer(channel);
			channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

			System.out.println(" [x] Awaiting RPC requests");

			while (true) {
				String response = null;

				QueueingConsumer.Delivery delivery = consumer.nextDelivery();

				BasicProperties props = delivery.getProperties();
				BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId())
						.build();

				try {
					String message = new String(delivery.getBody(), "UTF-8");
					int n = Integer.parseInt(message);

					System.out.println(" [.] fib(" + message + ")");
					response = "" + fib(n);
				} catch (Exception e) {
					System.out.println(" [.] " + e.toString());
					response = "";
				} finally {
					channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));

					channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (Exception ignore) {
				}
			}
		}
	}
}

  RPCClient.java       

package com.favccxx.favrabbit;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
import java.util.UUID;

public class RPCClient {

	private Connection connection;
	private Channel channel;
	private String requestQueueName = "rpc_queue";
	private String replyQueueName;
	private QueueingConsumer consumer;

	public RPCClient() throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		connection = factory.newConnection();
		channel = connection.createChannel();

		replyQueueName = channel.queueDeclare().getQueue();
		consumer = new QueueingConsumer(channel);
		channel.basicConsume(replyQueueName, true, consumer);
	}

	public String call(String message) throws Exception {
		String response = null;
		String corrId = UUID.randomUUID().toString();

		BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();

		channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

		while (true) {
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			if (delivery.getProperties().getCorrelationId().equals(corrId)) {
				response = new String(delivery.getBody(), "UTF-8");
				break;
			}
		}

		return response;
	}

	public void close() throws Exception {
		connection.close();
	}

	public static void main(String[] argv) {
		RPCClient fibonacciRpc = null;
		String response = null;
		try {
			fibonacciRpc = new RPCClient();

			System.out.println(" [x] Requesting fib(30)");
			response = fibonacciRpc.call("30");
			System.out.println(" [.] Got '" + response + "'");
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (fibonacciRpc != null) {
				try {
					fibonacciRpc.close();
				} catch (Exception ignore) {
				}
			}
		}
	}
}

  先啟動RPCServer,然后運行RPCClient,控制臺輸出如下內容

RPCClient[x] Requesting fib(30)

RPCClient[.] Got '832040'


RPCServer[x] Awaiting RPC requests

RPCServer[.] fib(30)



感謝你能夠認真閱讀完這篇文章,希望小編分享的“RabbitMQ如何實現RPC遠程調用消息隊列”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

永川市| 津南区| 麻栗坡县| 安多县| 日土县| 汾阳市| 新巴尔虎左旗| 龙川县| 梁平县| 海南省| 新乡市| 秭归县| 安徽省| 阿尔山市| 桃园市| 呼图壁县| 泰安市| 峨边| 内黄县| 黄陵县| 成武县| 武山县| 库车县| 扶余县| 高台县| 宁武县| 邵阳市| 垫江县| 儋州市| 自贡市| 大冶市| 朝阳市| 仙游县| 黄浦区| 沽源县| 中超| 东莞市| 黎川县| 扬中市| 库伦旗| 和政县|