中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java操作RabbitMQ添加隊列、消費隊列和三個交換機

發布時間:2020-08-07 19:12:26 來源:網絡 閱讀:425 作者:Java_老男孩 欄目:編程語言

一、發送消息到隊列(生產者)

新建一個maven項目,在pom.xml文件加入以下依賴

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>3.6.5</version>
    </dependency>
</dependencies><br>

新建一個P1類

package com.rabbitMQ.test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author mowen
 * @create 2019/11/20-11:23
 */
public class P1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //消息隊列名字
        String queueName="queue";
        //實例連接工廠
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //設置地址
        connectionFactory.setHost("192.168.128.233");
        //設置端口
        connectionFactory.setPort(5672);
        //設置用戶名
        connectionFactory.setUsername("mowen");
        //設置密碼
        connectionFactory.setPassword("123456");
        //獲取連接(跟jdbc很像)
        Connection connection = connectionFactory.newConnection();
        //創建通道
        Channel channel = connection.createChannel();
        //聲明隊列。
        //參數1:隊列名
        //參數2:持久化 (true表示是,隊列將在服務器重啟時依舊存在)
        //參數3:獨占隊列(創建者可以使用的私有隊列,斷開后自動刪除)
        //參數4:當所有消費者客戶端連接斷開時是否自動刪除隊列
        //參數5:隊列的其他參數
        channel.queueDeclare(queueName,true,false,false,null);

        for (int i = 0; i < 10; i++) {
            String msg="msg"+i;
            // 基本發布消息
            // 第一個參數為交換機名稱、
            // 第二個參數為隊列映射的路由key、
            // 第三個參數為消息的其他屬性、
            // 第四個參數為發送信息的主體
            channel.basicPublish("",queueName,null,msg.getBytes());
        }

        channel.close();
        connection.close();
    }
}

運行后再瀏覽器進入RabbitMQ的控制臺,切換到queue看到

Java操作RabbitMQ添加隊列、消費隊列和三個交換機

二、獲取隊列消息(消費者)

新建一個C1類

package com.rabbitMQ.test;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author mowen
 * @create 2019/11/20-13:12
 */
public class C1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //消息隊列名字
        String queueName="queue";
        //實例連接工廠
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //設置地址
        connectionFactory.setHost("192.168.128.233");
        //設置端口
        connectionFactory.setPort(5672);
        //設置用戶名
        connectionFactory.setUsername("mowen");
        //設置密碼
        connectionFactory.setPassword("123456");
        //獲取連接(跟jdbc很像)
        Connection connection = connectionFactory.newConnection();
        //創建通道
        Channel channel = connection.createChannel();

        // 創建一個消費者
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消費收到消息的時候調用的回調
                System.out.println("C3接收到:" + new String(body));
            }
        };

        //把消費著綁定到指定隊列
        //第一個是隊列名
        //第二個是 是否自動確認
        //第三個是消費者
        channel.basicConsume(queueName,true,consumer);

    }
}

運行后輸出為

Java操作RabbitMQ添加隊列、消費隊列和三個交換機

消費者一般都不會關閉,會一直等待隊列消息,可以手動關閉程序。

channel.basicConsume(queueName,true,consumer);中的true為收到消息后自動確認,改為false取消自動確認。

在handleDelivery方法最后面用

channel.basicAck(envelope.getDeliveryTag(),false);

來收到手動確認消息。消費者可以有多個并且可以同時消費一個隊列;

當有多個消費者同時消費同一個隊列時,收到的消息是平均分配的(消費者沒收到之前已經確認每個消費者受到的消息),

但當其中一個消費者性能差的話,會影響其他的消費者,因為還要等它收完消息,這樣會拖累其他消費者。

可以設置channel 的basicQos方法

//設置最多接受消息數量
// 設置了這個參數之后要吧自動確認關掉
channel.basicQos(1);

三、扇形(fanout)交換機

扇形交換機是基本的交換機類型,會把收到的消息以廣播的形式發送到綁定的隊列里,因為不需要經過條件篩選,所以它的速度最快。

在生產者項目新建一個fanout類

package com.rabbitMQ.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author mowen
 * @date  2019/11/20-11:23
 */
public class fanout {
    public static void main(String[] args) throws IOException, TimeoutException {
        //交換機名字
        String exchangeName="fanout";
        //交換機名字類型
        String exchangeType="fanout";
        //消息隊列名字
        String queueName1="fanout.queue1";
        String queueName2="fanout.queue2";
        String queueName3="fanout.queue3";
        //實例連接工廠
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //設置地址
        connectionFactory.setHost("192.168.128.233");
        //設置端口
        connectionFactory.setPort(5672);
        //設置用戶名
        connectionFactory.setUsername("mowen");
        //設置密碼
        connectionFactory.setPassword("123456");
        //獲取連接(跟jdbc很像)
        Connection connection = connectionFactory.newConnection();
        //創建通道
        Channel channel = connection.createChannel();
        //聲明隊列。
        //參數1:隊列名
        //參數2:持久化 (true表示是,隊列將在服務器重啟時依舊存在)
        //參數3:獨占隊列(創建者可以使用的私有隊列,斷開后自動刪除)
        //參數4:當所有消費者客戶端連接斷開時是否自動刪除隊列
        //參數5:隊列的其他參數
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        channel.queueDeclare(queueName3,true,false,false,null);

        //聲明交換機
        channel.exchangeDeclare(exchangeName,exchangeType);

        //隊列綁定到交換機
        channel.queueBind(queueName1,exchangeName,"");
        channel.queueBind(queueName2,exchangeName,"");
        channel.queueBind(queueName3,exchangeName,"");

        for (int i = 0; i < 10; i++) {
            String msg="msg"+i;
            // 基本發布消息
            // 第一個參數為交換機名稱、
            // 第二個參數為隊列映射的路由key、
            // 第三個參數為消息的其他屬性、
            // 第四個參數為發送信息的主體
            channel.basicPublish(exchangeName,"",null,msg.getBytes());
        }

        channel.close();
        connection.close();
    }
}

運行后在RabbitMQ網頁管理后臺的queue會看到

Java操作RabbitMQ添加隊列、消費隊列和三個交換機

切換到Exchanges會看到一個

Java操作RabbitMQ添加隊列、消費隊列和三個交換機

就是我們聲明的交換機,點擊會看到我們綁定的隊列

Java操作RabbitMQ添加隊列、消費隊列和三個交換機

四、直連(direct)交換機

直連交換機會帶路由功能,隊列通過routing_key與直連交換機綁定,發送消息需要指定routing_key,交換機收到消息時,交換機會根據routing_key發送到指定隊列里,同樣的routing_key可以支持多個隊列。

在生產者項目新建direct類

package com.rabbitMQ.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author mowen
 * @date  2019/11/20-11:23
 */
public class direct {
    public static void main(String[] args) throws IOException, TimeoutException {
        String exchangeName="direct";
        String exchangeType="direct";
        //消息隊列名字
        String queueName1="direct.queue1";
        String queueName2="direct.queue2";
        String queueName3="direct.queue3";
        //實例連接工廠
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //設置地址
        connectionFactory.setHost("192.168.128.233");
        //設置端口
        connectionFactory.setPort(5672);
        //設置用戶名
        connectionFactory.setUsername("mowen");
        //設置密碼
        connectionFactory.setPassword("123456");
        //獲取連接(跟jdbc很像)
        Connection connection = connectionFactory.newConnection();
        //創建通道
        Channel channel = connection.createChannel();
        //聲明隊列。
        //參數1:隊列名
        //參數2:持久化 (true表示是,隊列將在服務器重啟時依舊存在)
        //參數3:獨占隊列(創建者可以使用的私有隊列,斷開后自動刪除)
        //參數4:當所有消費者客戶端連接斷開時是否自動刪除隊列
        //參數5:隊列的其他參數
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        channel.queueDeclare(queueName3,true,false,false,null);

        //聲明交換機
        channel.exchangeDeclare(exchangeName,exchangeType);

        //隊列綁定到交換機并指定rouing_key
        channel.queueBind(queueName1,exchangeName,"key1");
        channel.queueBind(queueName2,exchangeName,"key2");
        channel.queueBind(queueName3,exchangeName,"key1");

        for (int i = 0; i < 10; i++) {
            String msg="msg"+i;
            // 基本發布消息
            // 第一個參數為交換機名稱、
            // 第二個參數為隊列映射的路由key、
            // 第三個參數為消息的其他屬性、
            // 第四個參數為發送信息的主體
            channel.basicPublish(exchangeName,"key1",null,msg.getBytes());
        }

        channel.close();
        connection.close();
    }
}

運行后到后臺的queue會看到

Java操作RabbitMQ添加隊列、消費隊列和三個交換機

切換到Exchanges會看到

Java操作RabbitMQ添加隊列、消費隊列和三個交換機

點擊進去

Java操作RabbitMQ添加隊列、消費隊列和三個交換機

五、主題(topic)交換機

主題交換機的routing_key可以有一定的規則,交換機和隊列的routing_key需要采用.#.…..的格式

每個部分用.分開

*代表一個單詞(不是字符)

#代表任意數量(0或n個)單詞

在生產者項目新進topic類

package com.rabbitMQ.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author mowen
 * @date  2019/11/20-11:23
 */
public class topic {
    public static void main(String[] args) throws IOException, TimeoutException {
        String exchangeName="topic";
        String exchangeType="topic";
        //消息隊列名字
        String queueName1="topic.queue1";
        String queueName2="topic.queue2";
        String queueName3="topic.queue3";
        //實例連接工廠
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //設置地址
        connectionFactory.setHost("192.168.128.233");
        //設置端口
        connectionFactory.setPort(5672);
        //設置用戶名
        connectionFactory.setUsername("mowen");
        //設置密碼
        connectionFactory.setPassword("123456");
        //獲取連接(跟jdbc很像)
        Connection connection = connectionFactory.newConnection();
        //創建通道
        Channel channel = connection.createChannel();
        //聲明隊列。
        //參數1:隊列名
        //參數2:持久化 (true表示是,隊列將在服務器重啟時依舊存在)
        //參數3:獨占隊列(創建者可以使用的私有隊列,斷開后自動刪除)
        //參數4:當所有消費者客戶端連接斷開時是否自動刪除隊列
        //參數5:隊列的其他參數
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        channel.queueDeclare(queueName3,true,false,false,null);

        //聲明交換機
        channel.exchangeDeclare(exchangeName,exchangeType);

        //隊列綁定到交換機并指定rouing_key
        channel.queueBind(queueName1,exchangeName,"com.aaa.*");
        channel.queueBind(queueName2,exchangeName,"com.*.topic");
        channel.queueBind(queueName3,exchangeName,"com.bbb.*");

        for (int i = 0; i < 10; i++) {
            String msg="msg"+i;
            // 基本發布消息
            // 第一個參數為交換機名稱、
            // 第二個參數為隊列映射的路由key、
            // 第三個參數為消息的其他屬性、
            // 第四個參數為發送信息的主體
            channel.basicPublish(exchangeName,"com.aaa.topic",null,msg.getBytes());
        }

        channel.close();
        connection.close();
    }
}

運行后,到后臺queue會看到

Java操作RabbitMQ添加隊列、消費隊列和三個交換機

切換到Exchanges會看到

Java操作RabbitMQ添加隊列、消費隊列和三個交換機

點擊進入會看到

Java操作RabbitMQ添加隊列、消費隊列和三個交換機

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

蓬莱市| 城步| 白河县| 鄄城县| 嘉义市| 安福县| 迭部县| 昔阳县| 津南区| 临夏市| 巴马| 郴州市| 萝北县| 六盘水市| 濉溪县| 顺义区| 中山市| 德昌县| 阳曲县| 萍乡市| 利津县| 祁阳县| 延津县| 稻城县| 凤庆县| 洛浦县| 兴文县| 青神县| 江西省| 朔州市| 玉树县| 吴旗县| 内黄县| 乌鲁木齐县| 通山县| 乌兰浩特市| 获嘉县| 岫岩| 济阳县| 翁牛特旗| 天全县|