中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

spring?boot中如何操作ActiveMQ

發布時間:2021-11-25 13:18:10 來源:億速云 閱讀:133 作者:柒染 欄目:開發技術

本篇文章為大家展示了spring boot中如何操作ActiveMQ,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

前言

消息隊列中間件是分布式系統中重要的組件,主要解決應用耦合、異步消息、流量削鋒等問題,實現高性能、高可用、可伸縮和最終一致性架構,是大型分布式系統不可缺少的中間件。

目前在生產環境中使用較多的消息隊列有 ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ 等。

特性

  • 異步性:將耗時的同步操作通過以發送消息的方式進行了異步化處理,減少了同步等待的時間。

  • 松耦合:消息隊列減少了服務之間的耦合性,不同的服務可以通過消息隊列進行通信,而不用關心彼此的實現細節,只要定義好消息的格式就行。

  • 分布式:通過對消費者的橫向擴展,降低了消息隊列阻塞的風險,以及單個消費者產生單點故障的可能性(當然消息隊列本身也可以做成分布式集群)。

  • 可靠性:消息隊列一般會把接收到的消息存儲到本地硬盤上(當消息被處理完之后,存儲信息根據不同的消息隊列實現,有可能將其刪除),這樣即使應用掛掉或者消息隊列本身掛掉,消息也能夠重新加載。

JMS 規范

JMS 即 Java 消息服務(Java Message Service)應用程序接口,是一個 Java 平臺中關于面向消息中間件(MOM)的 API,用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。Java 消息服務是一個與具體平臺無關的 API,絕大多數 MOM 提供商都對 JMS 提供支持。

JMS 的消息機制有 2 種模型,一種是 Point to Point,表現為隊列的形式,發送的消息,只能被一個接收者取走;另一種是 Topic,可以被多個訂閱者訂閱,類似于群發。

ActiveMQ 是 JMS 的一個實現。

ActiveMQ 介紹

ActiveMQ 是 Apache 軟件基金下的一個開源軟件,它遵循 JMS1.1 規范(Java Message Service),是消息驅動中間件軟件(MOM)。它為企業消息傳遞提供高可用、出色性能、可擴展、穩定和安全保障。ActiveMQ 使用 Apache 許可協議,因此,任何人都可以使用和修改它而不必反饋任何改變。

ActiveMQ 的目標是在盡可能多的平臺和語言上提供一個標準的,消息驅動的應用集成。ActiveMQ 實現 JMS 規范并在此之上提供大量額外的特性。ActiveMQ 支持隊列和訂閱兩種模式的消息發送。

Spring Boot 提供了 ActiveMQ 組件 spring-boot-starter-activemq,用來支持 ActiveMQ 在 Spring Boot 體系內使用,下面我們來詳細了解如何使用。

添加依賴

主要添加組件:spring-boot-starter-activemq。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

配置文件

在 application.properties 中添加配置。

# 基于內存的 ActiveMQ
spring.activemq.in-memory=true
# 不適應連接池
spring.activemq.pool.enabled=false
 
# 獨立安裝的 ActiveMQ
#spring.activemq.broker-url=tcp://192.168.0.1:61616
#spring.activemq.user=admin
#spring.activemq.password=admin

在使用 ActiveMQ 時有兩種使用方式,一種是使用獨立安裝的 ActiveMQ,在生產環境推薦使用這種;另一種是使用基于內存 ActiveMQ ,在調試階段建議使用這種方式。

隊列(Queue)

隊列發送的消息,只能被一個消費者接收。

創建隊列

@Configuration
public class MqConfig {
    @Bean
    public Queue queue() {
        return new ActiveMQQueue("neo.queue");
    }
}

使用 @Configuration 注解在項目啟動時,定義了一個隊列 queue 命名為:neo.queue。

消息生產者

創建一個消息的生產者:

@Component
public class Producer{
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    @Autowired
    private Queue queue;
    public void sendQueue(String msg) {
        System.out.println("send queue msg :"+msg);
        this.jmsMessagingTemplate.convertAndSend(this.queue, msg);
    }
}

JmsMessagingTemplate 是 Spring 提供發送消息的工具類,使用 JmsMessagingTemplate 和創建好的 queue 對消息進行發送。

消息消費者

@Component
public class Consumer {
 
    @JmsListener(destination = "neo.queue")
    public void receiveQueue(String text) {
        System.out.println("Consumer queue msg : "+text);
    }
}

使用注解 @JmsListener(destination = "neo.queue"),表示此方法監控了名為 neo.queue 的隊列。當隊列 neo.queue 中有消息發送時會觸發此方法的執行,text 為消息內容。

測試

創建 SampleActiveMqTests 測試類,注入創建好的消息生產者。

@RunWith(SpringRunner.class)
@SpringBootTest
public class SampleActiveMqTests {
    @Autowired
    private Producer producer;
    @Rule
    public OutputCapture outputCapture = new OutputCapture();
}

OutputCapture 是 Spring Boot 提供的一個測試類,它能捕獲 System.out 和 System.err 的輸出,我們可以利用這個特性來判斷程序中的輸出是否執行。

@Test
public void sendSimpleQueueMessage() throws InterruptedException {
    this.producer.sendQueue("Test queue message");
    Thread.sleep(1000L);
    assertThat(this.outputCapture.toString().contains("Test queue")).isTrue();
}

創建測試方式,使用 producer 發送消息,為了保證容器可以接收到消息,讓測試方法等待 1 秒,最后使用 outputCapture 判斷是否執行成功。

測試多消費者

上面的案例只是一個生產者一個消費者,我們在模擬一個生產者和多個消費者隊列的執行情況。我們復制上面的消費者 Consumer 重新命名為 Consumer2,并且將輸出內容加上 2 的關鍵字,如下:

@Component
public class Consumer2 {
    @JmsListener(destination = "neo.queue")
    public void receiveQueue(String text) {
        System.out.println("Consumer2 queue msg : "+text);
    }
}

在剛才的測試類中添加一個 send100QueueMessage() 方法,模式發送 100 條消息時,兩個消費者是如何消費消息的。

@Test
public void send100QueueMessage() throws InterruptedException {
    for (int i=0;i<100;i++){
        this.producer.sendQueue("Test queue message"+i);
    }
    Thread.sleep(1000L);
}

控制臺輸出結果:

Consumer queue msg : Test queue message0

Consumer2 queue msg : Test queue message1

Consumer queue msg : Test queue message2

Consumer2 queue msg : Test queue message3

...

根據控制臺輸出的消息可以看出,當有多個消費者監聽一個隊列時,消費者會自動均衡負載的接收消息,并且每個消息只能有一個消費者所接收。

注意:控制臺輸出 javax.jms.JMSException: peer (vm://localhost#1) stopped. 報錯信息可以忽略,這是 Info 級別的錯誤,是 ActiveMQ 的一個 bug。

廣播(Topic)

廣播發送的消息,可以被多個消費者接收。

創建 Topic

@Configuration
public class MqConfig {
    @Bean
    public Topic topic() {
        return new ActiveMQTopic("neo.topic");
    }
}

使用 @Configuration 注解在項目啟動時,定義了一個廣播 Topic 命名為:neo.topic。

消息生產者

創建一個消息的生產者:

@Component
public class Producer{
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    @Autowired
    private Topic topic;
    public void sendTopic(String msg) {
        System.out.println("send topic msg :"+msg);
        this.jmsMessagingTemplate.convertAndSend(this.topic, msg);
    }
}

和上面的生產者對比只是 convertAndSend() 方法傳入的第一個參數變成了 Topic。

消息消費者

@Component
public class Consumer {
 
    @JmsListener(destination = "neo.topic")
    public void receiveTopic(String text) {
        System.out.println("Consumer topic msg : "+text);
    }
}

消費者也沒有變化,只是監聽的名改為上面的 neo.topic,因為模擬多個消費者,復制一份 Consumer 命名為 Consumer2,代碼相同在輸出中標明來自 Consumer2。

測試

創建 SampleActiveMqTests 測試類,注入創建好的消息生產者。

@Test
public void sendSimpleTopicMessage() throws InterruptedException {
    this.producer.sendTopic("Test Topic message");
    Thread.sleep(1000L);
}

測試方法執行成功后,會看到控制臺輸出信息,如下:

send topic msg :Test Topic message

Consumer topic msg : Test Topic message

Consumer2 topic msg : Test Topic message

可以看出兩個消費者都收到了發送的消息,從而驗證廣播(Topic)是一個發送者多個消費者的模式。

同時支持隊列(Queue)和廣播(Topic)

Spring Boot 集成 ActiveMQ 的項目默認只支持隊列或者廣播中的一種,通過配置項 spring.jms.pub-sub-domain 的值來控制,true 為廣播模式,false 為隊列模式,默認情況下支持隊列模式。

如果需要在同一項目中既支持隊列模式也支持廣播模式,可以通過 DefaultJmsListenerContainerFactory 創建自定義的 JmsListenerContainerFactory 實例,之后在 @JmsListener 注解中通過 containerFactory 屬性引用它。

分別創建兩個自定義的 JmsListenerContainerFactory 實例,通過 pubSubDomain 來控制是支持隊列模式還是廣播模式。

@Configuration
@EnableJms
public class ActiveMQConfig {
 
    @Bean("queueListenerFactory")
    public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(false);
        return factory;
    }
 
    @Bean("topicListenerFactory")
    public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(true);
        return factory;
    }
}

然后在消費者接收的方法中,指明使用 containerFactory 接收消息。

@Component
public class Consumer {
 
    @JmsListener(destination = "neo.queue", containerFactory = "queueListenerFactory")
    public void receiveQueue(String text) {
        System.out.println("Consumer queue msg : "+text);
    }
 
    @JmsListener(destination = "neo.topic", containerFactory = "topicListenerFactory")
    public void receiveTopic(String text) {
        System.out.println("Consumer topic msg : "+text);
    }
}

改造完成之后,再次執行隊列和廣播的測試方法,就會發現項目同時支持了兩種類型的消息收發。

消息中間件廣泛應用在大型互聯網架構中,利用消息中間件隊列和廣播各自的特性可以支持很多業務,比如群發發送短信、給單個用戶發送郵件等。ActiveMQ 是一款非常流行的消息中間件,它的特點是部署簡單、使用方便,比較適合中小型團隊。Spring Boot 提供了集成 ActiveMQ 對應的組件,在 Spring Boot 中使用 ActiveMQ 只需要添加相關注解即可。

上述內容就是spring boot中如何操作ActiveMQ,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

定安县| 屯门区| 遵义县| 娄烦县| 灵武市| 阿克苏市| 湖南省| 万荣县| 昌都县| 锦州市| 曲松县| 扎赉特旗| 岗巴县| 桃园市| 木兰县| 旬阳县| 双城市| 连城县| 安庆市| 五大连池市| 荃湾区| 册亨县| 仁怀市| 保山市| 措美县| 石门县| 缙云县| 信丰县| 凉城县| 隆子县| 内黄县| 宽城| 新田县| 敖汉旗| 乐都县| 浠水县| 高平市| 株洲县| 萨嘎县| 游戏| 利辛县|