中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

JMS與ActiveMQ的示例分析

發布時間:2021-08-23 12:28:55 來源:億速云 閱讀:129 作者:小新 欄目:編程語言

這篇文章給大家分享的是有關JMS與ActiveMQ的示例分析的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。

一、異步通信

之前接觸到的RMI,Hessian等技術都是同步通信機制。當客戶端調用遠程方法時,客戶端必須等到遠程方法完成后,才能繼續執行。這段時間客戶端一直會被阻塞(這樣造成的用戶體驗很不好)。

JMS與ActiveMQ的示例分析

(同步通信)

同步通信有并不是程序之間交互的唯一方式,異步通信機制中,客戶端不需要等待服務處理消息,可以繼續執行,并且最終能夠收到并處理消息。

JMS與ActiveMQ的示例分析

(異步通信)

異步通信的優勢

無需等待。客戶端只需要將消息發送給消息代理,不需要等待就可以繼續執行別的任務,且確信消息會被投遞給相應的目的地。
面向消息和解耦。 客戶端不需要擔心遠程服務的接口規范,只需要把消息放入消息隊列然后獲取結果即可。

二、JMS

1. 簡介

在JMS出現之前,每個消息代理都是有不同的實現,這就使得不同代理之間的消息代碼很難通用。JMS(Java Message Service,Java消息服務)是一個標準,定義了使用消息代理的通用API。即所有遵從規范的實現都使用通用的接口,類似于JDBC為數據庫操作提供通用接口。

JMS幾個重要的要素:

Destination:消息從發送端發出后要走的通道。
ConnectionFactory:連接工廠,用于創建連接的對象。
Connection:連接接口,用于創建session。
Session:會話接口,用于創建消息的發送者,接受者以及消息對象本身。
MessageConsumer:消息的消費者。
MessageProducer:消息的生產者。
XXXMessage:各種類型的消息對象,包括ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage 5種。

2. JMS消息模型

不同的消息系統有不同的消息模型。JMS提供了兩種模型:Queue(點對點)和Topic(發布/訂閱)。

JMS Queue(點對點)模型

在點對點模型中,消息生產者生產消息發送到queue中,然后消息消費者從queue中取出并且消費消息,但不可重復消費。

如圖:

JMS與ActiveMQ的示例分析

發送者1,發送者2,發送者3各發送一條消息到服務器;

消息1,2,3就會按照順序形成一個隊列,隊列中的消息不知道自己會被哪個接收者消費;

接收者1,2,3分別從隊列中取出一條消息進行消費,每取出一條消息,隊列就會將該消息刪除,這樣即保證了消息不會被重復消費。

JMS Queue模型也成為P2P(Point to Point)模型。

JMS Topic(發布/訂閱)模型

JMS Topic模型與JMS Queue模型的最大差別在于消息接收的部分。Topic模型類似于微信公眾號,訂閱了該公眾號的接收者都可以接收到公眾號推送的消息。

如圖:

JMS與ActiveMQ的示例分析

發布者1,2,3分別發布3個主題1,2,3;
這樣訂閱了主題1的用戶群:訂閱者1,2,3即能接收到主題1消息;同理訂閱者4,5,6即能接收到主題2消息,訂閱者7,8,9即能接收到主題3消息。

JMS Topic模型也成為Pus/Sub模型。

兩種模式下各要素的對比:

JMS與ActiveMQ的示例分析

3. 傳統JMS編程模型

Producer:

(1)創建連接工廠ConnectionFactory;
(2) 使用連接工廠創建連接;
(3)啟動連接;
(4)創建會話;
(5) 創建消息發送的目的地;
(6)創建生產者;
(7)創建消息類型和消息內容;
(8)發送消息;

Consumer:

(1)創建連接工廠ConnectionFactory;
(2) 使用連接工廠創建連接;
(3)啟動連接;
(4)創建會話;
(5) 創建消息發送的目的地;
(6)創建消費者
(7)創建消息類型;
(8)接收消息;

三、 ActiveMQ簡介

ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現,盡管JMS規范出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。

ActiveMQ 主要特性:

多種語言和協議編寫客戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP。應用協議:
OpenWire,Stomp REST,WS Notification,XMPP,AMQP
完全支持JMS1.1和J2EE 1.4規范 (持久化,XA消息,事務)
對spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統里面去,而且也支持Spring2.0的特性
通過了常見J2EE服務器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業服務器上
支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
支持通過JDBC和journal提供高速的消息持久化
從設計上保證了高性能的集群,客戶端-服務器,點對點
支持Ajax
支持與Axis的整合
可以很容易得調用內嵌JMS provider,進行測試

四、 ActiveMQ實戰

下面看看如何ActiveMQ實現一個簡單的消息隊列。

傳統的JMS編程模型

1. JMS Queue模型代碼實現:

Producer:

package com.wgs.mq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Created by GenshenWang.nomico on 2017/10/19.
 */
public class ActiveMQProducer {
	private static final String URL = "tcp://localhost:61616";
	private static final String QUEUE_NAME = "queue-name";
	public static void main(String[] args) throws JMSException {
		//1 創建連接工廠ConnectionFactory
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
		//2 使用連接工廠創建連接
		Connection connection = connectionFactory.createConnection();
		//3 啟動連接
		connection.start();
		//4 創建會話
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5 創建消息發送的目的地
		Destination destination = session.createQueue(QUEUE_NAME);
		//6 創建生產者
		MessageProducer messageProducer = session.createProducer(destination);
		//7 創建消息
		TextMessage textMessage = session.createTextMessage();
		for (int i = 1; i <= 100; i++) {
			//8 創建消息內容
			textMessage.setText("發送者- 1 -發送消息:" + i);
			//9 發送消息
			messageProducer.send(textMessage);
		}
		System.out.println("消息發送成功");
		session.close();
		connection.close();
	}
}

Conusmer:

package com.wgs.mq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Created by GenshenWang.nomico on 2017/10/19.
 */
public class ActiveMQConsumer {
	private static final String URL = "tcp://localhost:61616";
	private static final String QUEUE_NAME = "queue-name";
	public static void main(String[] args) throws JMSException {
		//1 創建連接工廠ConnectionFactory
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
		//2 使用連接工廠創建連接
		Connection connection = connectionFactory.createConnection();
		//3 啟動連接
		connection.start();
		//4 創建會話
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5 創建消息發送的目的地
		Destination destination = session.createQueue(QUEUE_NAME);
		//6 創建消費者
		MessageConsumer messageConsumer = session.createConsumer(destination);
		messageConsumer.setMessageListener(new MessageListener() {
			public void onMessage(Message message) {
				//7 創建消息
				TextMessage textMessage = (TextMessage)message;
				try {
					//7 接收消息
					System.out.println("消費者- 1 -接收消息:【" + textMessage.getText() + "】");
				}
				catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
		);
	}
}

2. JMS Topic模型代碼實現:

Producer:

package com.wgs.mq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * 發布訂閱模式
 * Created by GenshenWang.nomico on 2017/10/19.
 */
public class ActiveMQProducer {
	private static final String URL = "tcp://localhost:61616";
	private static final String TOPIC_NAME = "topic-name";
	public static void main(String[] args) throws JMSException {
		//1 創建連接工廠ConnectionFactory
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
		//2 使用連接工廠創建連接
		Connection connection = connectionFactory.createConnection();
		//3 啟動連接
		connection.start();
		//4 創建會話
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5 創建帶有主題的消息發送的目的地
		Destination destination = session.createTopic(TOPIC_NAME);
		//6 創建生產者
		MessageProducer messageProducer = session.createProducer(destination);
		//7 創建消息
		TextMessage textMessage = session.createTextMessage();
		for (int i = 1; i <= 100; i++) {
			//8 創建消息內容
			textMessage.setText("發送者- 1 -發送消息:" + i);
			//9 發送消息
			messageProducer.send(textMessage);
		}
		System.out.println("消息發送成功");
		session.close();
		connection.close();
	}
}

Consumer:

package com.wgs.mq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * 發布訂閱模式
 * Created by GenshenWang.nomico on 2017/10/19.
 */
public class ActiveMQConsumer {
	private static final String URL = "tcp://localhost:61616";
	private static final String TOPIC_NAME = "topic-name";
	public static void main(String[] args) throws JMSException {
		//1 創建連接工廠ConnectionFactory
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
		//2 使用連接工廠創建連接
		Connection connection = connectionFactory.createConnection();
		//3 啟動連接
		connection.start();
		//4 創建會話
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5 創建消息發送的目的地
		Destination destination = session.createTopic(TOPIC_NAME);
		//6 創建消費者
		MessageConsumer messageConsumer = session.createConsumer(destination);
		messageConsumer.setMessageListener(new MessageListener() {
			public void onMessage(Message message) {
				//7 創建消息
				TextMessage textMessage = (TextMessage)message;
				try {
					//7 接收消息
					System.out.println("消費者- 1 -接收消息:【" + textMessage.getText() + "】");
				}
				catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
		);
	}
}

使用Spring的JMS模板

雖然JMS為所有的消息代理提供了統一的接口,但如同JDBC一樣,在處理連接,語句,結果集和異常時會顯得很繁雜。不過,Spring為我們提供了JmsTemplate來消除冗余和重復的JMS代碼。
下面看看如何使用JmsTemplate來實現消息隊列。

1. JMS Queue模型代碼實現:

配置文件:
producer.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

  <context:annotation-config/>

  <!-- ActiveMQ提供的ConnectionFactory-->
  <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
  </bean>

  <!-- 在Spring 中配置JMS連接工廠,連接到ActiveMQ提供的ConnectionFactory-->
  <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref = "targetConnectionFactory"/>
  </bean>

  <!-- 配置JmsTemplate,用于發送消息 -->
  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
  </bean>

  <!-- 配置隊列目的地的名稱-->
  <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="queue-spring-name"/>
  </bean>
  <!-- 配置隊列目的地的名稱-->
  <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="topic-spring-name"/>
  </bean>

  <bean id="producerServiceImpl" class="com.wgs.jms.producer.ActiveMQProducerServiceImpl"/>


</beans>

consumer.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

  <context:annotation-config/>

  <!-- ActiveMQ提供的ConnectionFactory-->
  <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
  </bean>

  <!-- 在Spring 中配置JMS連接工廠,連接到ActiveMQ提供的ConnectionFactory-->
  <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref = "targetConnectionFactory"/>
  </bean>

  <!-- 配置隊列目的地的名稱-->
  <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="queue-spring-name"/>
  </bean>
  <!-- 配置消息監聽器-->
  <bean id="consumerMessageListener" class="com.wgs.jms.consumer.ConsumerMessageListener"/>
  <!-- 配置隊列目的地的名稱-->
  <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="destination" ref="queueDestination"/>
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="messageListener" ref="consumerMessageListener"/>
  </bean>
  <!-- 配置隊列目的地的名稱-->
  <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="topic-spring-name"/>
  </bean>
</beans>

生產者Producer:

(1)先寫一個接口:

package com.wgs.jms.producer;
/**
 * Created by GenshenWang.nomico on 2017/10/20.
 */
public interface ActiveMQProducerService {
  void sendMessage(final String message);
}

(2)接口的實現:

package com.wgs.jms.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import javax.annotation.Resource;
import javax.jms.*;
/**
 * Created by GenshenWang.nomico on 2017/10/20.
 */
public class ActiveMQProducerServiceImpl implements ActiveMQProducerService {
	@Autowired
	  JmsTemplate jmsTemplate;
	@Resource(name = "queueDestination")
	  Destination destination;
	public void sendMessage(final String message) {
		jmsTemplate.send(destination, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				TextMessage textMessage = session.createTextMessage(message);
				return textMessage;
			}
		}
		);
		System.out.println("生產者- 1 -發送消息成功:" + message);
	}
}

(3)測試:

package com.wgs.jms.producer;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Created by GenshenWang.nomico on 2017/10/20.
 */
public class ActiveMQProducerMain {
	public static void main(String[] args) {
		ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
		ActiveMQProducerService service = context.getBean(ActiveMQProducerService.class);
		for (int i = 0; i < 100; i++) {
			service.sendMessage("test" + i);
		}
		context.close();
	}
}

消費者:

(1)創建消息監聽器:

package com.wgs.jms.consumer;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
 * Created by GenshenWang.nomico on 2017/10/20.
 */
public class ConsumerMessageListener implements MessageListener {
	public void onMessage(Message message) {
		try {
			TextMessage textMessage = (TextMessage) message;
			System.out.println("消費者- 1 -接收消息:" + textMessage.getText());
		}
		catch (JMSException e) {
			e.printStackTrace();
		}
	}
}

(2)測試:

package com.wgs.jms.consumer;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Created by GenshenWang.nomico on 2017/10/20.
 */
public class ActiveMQConsumerMain {
	public static void main(String[] args) {
		ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
	}
}

2. JMS Topic模型代碼實現:

將上述代碼中出現的queueDestination改為topicDestination即可。

感謝各位的閱讀!關于“JMS與ActiveMQ的示例分析”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

库尔勒市| 大厂| 绥德县| 襄汾县| 永年县| 安吉县| 和林格尔县| 荥经县| 临沂市| 阿拉善左旗| 尼勒克县| 衡阳市| 互助| 永川市| 叶城县| 郑州市| 湄潭县| 凤山市| 岢岚县| 富锦市| 冷水江市| 荔浦县| 石首市| 潼关县| 南开区| 友谊县| 微博| 偏关县| 武威市| 宁陕县| 大姚县| 韶山市| 铜山县| 万山特区| 思南县| 龙泉市| 杭锦后旗| 中超| 大冶市| 镇康县| 巴彦淖尔市|