您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關JMS消息activemq的使用分析,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
對于JMS,百度百科,是這樣介紹的:JMS即Java消息服務(Java Message Service)應用程序接口是一個Java平臺中關于面向消息中間件(MOM)的API,用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。
簡短來說,JMS是一種與廠商無關的 API,用來訪問消息收發系統消息。它類似于JDBC(Java Database Connectivity),提供了應用程序之間異步通信的功能。
JMS1.0是jsr 194里規定的規范(關于jsr規范,請點擊)。目前最新的規范是JSR 343,JMS2.0。
好了,說了這么多,其實只是在說,JMS只是sun公司為了統一廠商的接口規范,而定義出的一組api接口。
描述如下:
JMS提供者(JMS的實現者,比如activemq jbossmq等)
JMS客戶(使用提供者發送消息的程序或對象,例如在12306中,負責發送一條購票消息到處理隊列中,用來解決購票高峰問題,那么,發送消息到隊列的程序和從隊列獲取消息的程序都叫做客戶)
JMS生產者,JMS消費者(生產者及負責創建并發送消息的客戶,消費者是負責接收并處理消息的客戶)
JMS消息(在JMS客戶之間傳遞數據的對象)
JMS隊列(一個容納那些被發送的等待閱讀的消息的區域)
JMS主題(一種支持發送消息給多個訂閱者的機制)
連接工廠(connectionfactory)客戶端使用JNDI查找連接工廠,然后利用連接工廠創建一個JMS連接。
JMS連接 表示JMS客戶端和服務器端之間的一個活動的連接,是由客戶端通過調用連接工廠的方法建立的。
JMS會話 session 標識JMS客戶端和服務端的會話狀態。會話建立在JMS連接上,標識客戶與服務器之間的一個會話進程。
JMS目的 Destinatio 又稱為消息隊列,是實際的消息源
生產者和消費者
消息類型,分為隊列類型(優先先進先出)以及訂閱類型
從官網下載安裝包, http://activemq.apache.org/download.html 賦予運行權限 chmod +x,windows可以忽略此步 運行 ./active start | stop 啟動后,activeMQ會占用兩個端口,一個是負責接收發送消息的tcp端口:61616,一個是基于web負責用戶界面化管理的端口:8161。這兩個端口可以在conf下面的xml中找到。http服務器使用了jettry。這里有個問題是啟動mq后,很長時間管理界面才可以顯示出來。
==》啟動MQ服務器:
根據操作系統不同,進入相應win64/win32位目錄,雙擊activemq.bat啟動MQ。
ActiveMQ默認啟動時,啟動了內置的jetty服務器,提供一個用于監控ActiveMQ的admin應用 進入ActionMQ服務監控地址:瀏覽器輸入http://127.0.0.1:8161/admin
Spring最厲害的地方就是它的Bean了,還有它特有的IOC(控制反轉)和AOP(面向切面編程)技術。有了這些,我們就可以不用new關鍵字構造對象,同時,可以方便地使用注入往類中的屬性進行初始化。如果你編寫過ActiveMQ之類的JMS應用程序,無論對于消息的生產者還是消費者,最重要的接口有以下兩個: 1.ConnectionFactory 2.Destination
ConnectionFactory是一切的基礎,有了它才有了Connection,然后才有Session,只有通過Session對象,我們才能創建消息隊列、構建生產者/消費者,繼而發送/接收消息。 Destination是一切的歸宿,它就像總線一樣,生產者發出消息要發到它上面,消費者取消息也要從這上面取。
試想,如果這一切都能借助Spring強大的Bean管理的話,我們在編寫程序的時候會更加的方便簡潔。幸運的是,ActiveMQ官方提供了完美的Spring框架支持,一切只需要在xml文件中配置即可~
Spring官方提供了一個叫JmsTemplate的類,這個類就專門用來處理JMS的,在該類的Bean配置標簽中有兩個屬性connectionFactory-ref和defaultDestination-ref正好對應JMS中的ConnectionFactory和Destination,如果你有興趣查看源碼的話,就可以發現JmsTemplate幫我們做了大量創建的工作,我們只需要用它來進行收發信息就ok了,而ActiveMQ官方也提供了對應的實現包。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation=" http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-2.5.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd"> <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616"></bean> <bean id="cachedConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory" p:targetConnectionFactory-ref="jmsConnectionFactory" p:sessionCacheSize="10"></bean> <bean id="queue_test" class="org.apache.activemq.command.ActiveMQQueue"> <!--消息隊列名稱 --> <constructor-arg value="queue_test" /> </bean> <!-- Spring JMS Template --> <!-- <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" p:connectionFactory-ref="cachedConnectionFactory" p:defaultDestination-ref="queue_test"></bean> --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="jmsConnectionFactory" /> <!-- pub/sub模型(發布/訂閱) --> <property name="pubSubDomain" value="true" /> <property name="messageConverter"> <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" /> </property> <!-- Session.AUTO_ACKNOWLEDGE 1消息自動簽收 Session.CLIENT_ACKNOWLEDGE 2客戶端調用acknowledge方法手動簽收 Session.DUPS_OK_ACKNOWLEDGE 3不必必須簽收,消息可能會重復發送 --> <property name="sessionAcknowledgeMode" value="2" /> </bean> <jms:listener-container container-type="default" connection-factory="jmsConnectionFactory" acknowledge="auto"> <jms:listener destination="queue_test" ref="simpleMsgListener" method="onMessage"></jms:listener> </jms:listener-container> </beans>
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:util="http://www.springframework.org/schema/util" xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.14.5.xsd "> <context:annotation-config /> <context:component-scan base-package="com.troy" /> <!-- 讀取配置文件 --> <bean id="propertyPlaceholderConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <array> <value>classpath:conf/spring-config.properties</value> </array> </property> </bean> <!-- 連接 activemq --> <amq:connectionFactory id="amqConnectionFactory" brokerURL="${activemq_url}" userName="${activemq_username}" password="${activemq_password}" /> <!-- 這里可以采用連接池的方式連接PooledConnectionFactoryBean --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 配置連接 --> <property name="targetConnectionFactory" ref="amqConnectionFactory" /> <!-- 會話的最大連接數 --> <property name="sessionCacheSize" value="100" /> </bean> <!-- 定義消息隊列topic類型,queue的方式差不多 --> <bean id="topic" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 定義名稱 --> <constructor-arg index="0" value="topic" /> </bean> <!-- 配置JMS模板(topic),Spring提供的JMS工具類,它發送、接收消息。 --> <!-- 為了測試發送消息,保留jmsTemplate的配置,實際不存在發送,只需要配置監聽即可 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="topic" /> <!-- 非pub/sub模型(發布/訂閱),true為topic,false為queue --> <property name="pubSubDomain" value="true" /> </bean> <!-- 監聽方式,這種方式更實用,可以一直監聽消息 --> <bean id="topicMessageListen" class="com.troy.activemq.TopicMessageListen" /> <bean id="defaultMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <!-- 注冊activemq名稱 --> <property name="destination" ref="topic" /> <property name="messageListener" ref="topicMessageListen" /> </bean> </beans>
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.springframework.stereotype.Component; @Component(value = "simpleMsgListener") public class SimpleMsgListener implements MessageListener { //收到信息時的動作 @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("收到的信息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
spring boot jsm官網
springboot與ActiveMQ整合
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency>
# ActiveMQ-------------------------------------------------------------------------------------------------------------- # Specify if the default broker URL should be in memory. Ignored if an explicit broker has been specified. #spring.activemq.in-memory=false # URL of the ActiveMQ broker. Auto-generated by default. For instance `tcp://localhost:61616` spring.activemq.broker-url=tcp://127.0.0.1:61616 # Login user of the broker. spring.activemq.user=admin # Login password of the broker. spring.activemq.password=admin # Trust all packages. #spring.activemq.packages.trust-all=false # Comma-separated list of specific packages to trust (when not trusting all packages). #spring.activemq.packages.trusted= # See PooledConnectionFactory. #spring.activemq.pool.configuration.*= # Whether a PooledConnectionFactory should be created instead of a regular ConnectionFactory. spring.activemq.pool.enabled=true # Maximum number of pooled connections. spring.activemq.pool.max-connections=50 # Connection expiration timeout in milliseconds. spring.activemq.pool.expiry-timeout=10000 # Connection idle timeout in milliseconds. spring.activemq.pool.idle-timeout=30000 spring.jms.pub-sub-domain=false
import javax.jms.Queue; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; @Configuration public class ActiveMQConfig { @Value("${queueName}") private String queueName; @Value("${topicName}") private String topicName; @Value("${spring.activemq.user}") private String usrName; @Value("${spring.activemq.password}") private String password; @Value("${spring.activemq.broker-url}") private String brokerUrl; /** * <p>@describe: 配置隊列 </p> * <p>@author: whh </p> * <p>@date: 2019年8月9日 下午5:56:22</p> * @return */ @Bean public Queue queue() { return new ActiveMQQueue(queueName); } /** * <p>@describe:配置主題 </p> * <p>@author: whh </p> * <p>@date: 2019年8月9日 下午5:56:40</p> * @return */ @Bean public Topic topic() { return new ActiveMQTopic(topicName); } /** * <p>@describe:連接工廠 </p> * <p>@author: whh </p> * <p>@date: 2019年8月9日 下午5:57:14</p> * @return */ @Bean public ActiveMQConnectionFactory connectionFactory() { return new ActiveMQConnectionFactory(usrName, password, brokerUrl); } /** * <p>@describe: 監聽隊列bean </p> * <p>@author: whh </p> * <p>@date: 2019年8月9日 下午5:57:43</p> * @param connectionFactory * @return */ @Bean public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setConnectionFactory(connectionFactory); return bean; } /** * <p>@describe:監聽主題 bean </p> * <p>@author: whh </p> * <p>@date: 2019年8月9日 下午5:58:17</p> * @param connectionFactory * @return */ @Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); //設置為發布訂閱方式, 默認情況下使用的生產消費者方式 bean.setPubSubDomain(true); bean.setConnectionFactory(connectionFactory); return bean; } }
import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service; @Service public class JmsSendDemo { private static final Logger LOGGER = LoggerFactory.getLogger(JmsSendDemo.class); @Autowired private JmsTemplate jmsTemplate; public void execute(){ LOGGER.debug("執行定時任務-----------------直接違規數據定時:下發至醫院端首頁進行展示-----------------"); Destination destination = (Destination) AppContext.getBean("queue_test"); jmsTemplate.convertAndSend("mailbox", new Email("info@example.com", "Hello")); //啟動另一個方法 boolean flag=true; LOGGER.debug("執行定時任務結束------------------------------"); } public void sendMessage(Destination destination,final String message) { LOGGER.debug("發送消息到AMQ,消息內容為:"+message); jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } }
import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; @Component public class Receiver { @JmsListener(destination = "mailbox", containerFactory = "myFactory") public void receiveMessage(Email email) { System.out.println("Received <" + email + ">"); } }
關于JMS消息activemq的使用分析就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。