中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RabbitMq的5種開發方案是什么

發布時間:2021-10-20 10:39:22 來源:億速云 閱讀:136 作者:柒染 欄目:大數據

這篇文章將為大家詳細講解有關RabbitMq的5種開發方案是什么,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

1、安裝

    windows安裝rabbitmq

    mac安裝rabbitmq 需要注意的是,mac安裝rabbitmq,啟動的時候命令前,需要加 sudo,不然會報錯誤 

RabbitMq的5種開發方案是什么

2、rabbitmq 開發概念詞

RabbitMq的5種開發方案是什么

    2.1 Producer(生產者)

    2.2 Consumer(消費者)

    2.3 Exchange(交換機)

    2.4 Queue(隊列)

    2.5 rountingKey(交換機與隊列之間的關系)

3、RabbitMQ開發方案

官網的6中模式,可以點開這個網址,顯示6中模式,第6中模式RPC遠程調用我們不需要用該模式,所以我們只要關注前五種就可以了。

接下來我們就直接簡單教學rabbitmq的簡單使用

3.0 代碼講解
/**
 * 聲明隊列,五個參數列表,如果直接使用默認channel.queueDeclare("queue"),那么其他參數都會自動默認設置屬性,所以一般我們幾乎都默認它
 * String queue  隊列名稱
 * boolean durable 隊列是需要持久化,意思就是rabbitmq重啟的時候,如果不是持久化,那么該隊列就會消失,默認true
 * boolean exclusive 如果你想創建一個只有自己可見的隊列,不允許其它用戶訪問RabbitMQ允許你將一個Queue聲明成為排他性的,只對首次聲明它的連接(Connection)可見,會在其連接斷開的時候自動刪除。所以我們開發中一般不需要此操作,默認false
 * boolean autoDelete 消息是需要持久化,意思就是rabbitmq重啟的時候,如果為true,那么關閉期間接受的消息就會自動消失,默認false
 * Map<String, Object> arguments 這個參數我們只會在使用延遲隊列中才會用到,就是延遲隊列的相關配置等屬性
 * 方法channel.queueDelete("queue")等同于channel.queueDeclare("queue",true,false,false,null);
 */

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
 * 綁定隊列到交換機 
 * String queue 隊列名
 * String exchange 交換機名
 * String routingKey 綁定關系 如 大頭兒子,小頭爸爸 他們的rountingKey就是父子
 */
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
3.1 pom.xml

版本隨意,本博客任何版本高版本也行。

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>3.4.1</version>
</dependency>
<dependency>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-log4j12</artifactId>
   <version>1.7.7</version>
</dependency>
<dependency>
   <groupId>org.apache.commons</groupId>
   <artifactId>commons-lang3</artifactId>
   <version>3.3.2</version>
</dependency>
<dependency>
   <groupId>org.springframework.amqp</groupId>
   <artifactId>spring-rabbit</artifactId>
   <version>1.4.0.RELEASE</version>
</dependency>
3.2 工具類準備
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;

public class ConnectionUtil {
    public static Connection getConnection() throws Exception {
        //定義連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置服務地址
        factory.setHost("localhost");
        //端口
        factory.setPort(5672);
        //設置賬號信息,用戶名、密碼、vhost
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 通過工程獲取連接
        Connection connection = factory.newConnection();
        return connection;
    }
}
3.3 簡單模式(simple)

理解:該rabbit服務器只有一個單一的隊列

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String QUEUE_NAME = "test_queue";

    public static void main(String[] argv) throws Exception {
        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 從連接中創建通道
        Channel channel = connection.createChannel();
        // 聲明(創建)隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 消息內容
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        //關閉通道和連接
        channel.close();
        connection.close();
    }
}
public class Recv {

    private final static String QUEUE_NAME = "mujiutian_queue";

    public static void main(String[] argv) throws Exception {

        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        //創建channel
        Channel channel = connection.createChannel();
        //創建隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //監聽隊列,發送消息
        channel.basicConsume(QUEUE_NAME, true, consumer);

        // 獲取消息,該線程一直進行
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("獲取到消息"+message);
        }
    }
}

先打開rece的main函數,這樣就可以執行send發送消息。

3.4 工作模式(work)

理解:rabbit服務器有一個exchange(交換機),該交換機下有兩個隊列,總共發送了100條消息,A隊列效率高可以搶到80條消息給消費者,B隊列只能搶到20條消息給發送者,他們的總和是100條,為工作模式。

先執行,先執行消費者main函數,在看結果圖:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {
        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 100; i++) {
            // 消息內容
            String message = "" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }

        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(10);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("收到消息'" + message + "'");
            // 返回確認狀態
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成狀態
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

如圖:

RabbitMq的5種開發方案是什么

RabbitMq的5種開發方案是什么

// 同一時刻服務器只會發一條消息給消費者
channel.basicQos(1);

跟這個有關系,這也就是我們所說的工作模式

3.5 訂閱模式(Publish/Subribe)

理解:就是群發,該交換機下的所有隊列,都會接受相同的所有交換機發來的消息,類似于qq群一樣

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {
        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 消息內容
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "test_queue_work";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_work2";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
3.6 路由模式(Routing)

理解就是:你是人,如果性別是女,請進女廁,是男性,請進男廁,如同,一個交換機下面的多個隊列,根據rountingKey判斷該接受的消息

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {
        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 聲明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 消息內容
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "key2", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "test_queue_work";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {

        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_work2";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {

        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
3.7 通配符模式(Topics)

理解就是:路徑aas.# A隊列接受這種所有路徑,B隊列接受路徑下的所有隊列aab.#

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {
        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // 消息內容
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "test_queue_topic_work";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_topic_work2";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");

        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

關于RabbitMq的5種開發方案是什么就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

安庆市| 石河子市| 高州市| 柳林县| 文水县| 吕梁市| 商洛市| 泸定县| 蚌埠市| 通江县| 镇平县| 工布江达县| 河南省| 博乐市| 阿坝县| 西城区| 邹平县| 武宣县| 什邡市| 罗平县| 正蓝旗| 于都县| 社旗县| 郎溪县| 颍上县| 清水河县| 溧水县| 石狮市| 霍林郭勒市| 宁化县| 东宁县| 密云县| 兴业县| 葫芦岛市| 万盛区| 清流县| 平凉市| 泰和县| 丹凤县| 黄浦区| 灵宝市|