中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ普通消息同步發送怎么實現

發布時間:2022-08-23 11:17:37 來源:億速云 閱讀:125 作者:iii 欄目:開發技術

本篇內容介紹了“RocketMQ普通消息同步發送怎么實現”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

普通消息同步發送

同步消息是指發送出消息后,同步等待,直到接收到Broker發送成功的響應才會繼續發送下一個消息。這個方式可以確保消息發送到Broker成功,一些重要的消息可以使用此方式,比如重要的通知。

public static void main(String[] args) throws Exception {
    //實例化消息生產者對象
    DefaultMQProducer producer = new DefaultMQProducer("group_luke");
    //設置NameSever地址
    producer.setNamesrvAddr("127.0.0.1:9876");
    //啟動Producer實例
    producer.start();
    for (int i = 0; i < 10; i++) {
        Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8));
        //同步發送方式
        SendResult send = producer.send(msg);
        //確認返回
        System.out.println(send);
    }
    //關閉producer
    producer.shutdown();
}

普通消息異步發送

異步消息發送方在發送了一條消息后,不等接收方發回響應,接著進行第二條消息發送。發送方通過回調接口的方式接收服務器響應,并對響應結果進行處理。

public static void main(String[] args) throws Exception {
    //實例化消息生產者對象
    DefaultMQProducer producer = new DefaultMQProducer("group_luke");
    //設置NameSever地址
    producer.setNamesrvAddr("127.0.0.1:9876");
    //啟動Producer實例
    producer.start();
    for (int i = 0; i < 10; i++) {
        Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8));
        //SendCallback會接收異步返回結果的回調
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }
            @Override
            public void onException(Throwable throwable) {
                throwable.printStackTrace();
            }
        });
    }
    //若是過早關閉producer,會拋出The producer service state not OK, SHUTDOWN_ALREADY的錯
    Thread.sleep(10000);
    //關閉producer
    producer.shutdown();
}

普通消息單向發送

單項發送不關心發送的結果,只發送請求不等待應答。發送消息耗時極短。

public static void main(String[] args) throws Exception {
    //實例化消息生產者對象
    DefaultMQProducer producer = new DefaultMQProducer("group_luke");
    //設置NameSever地址
    producer.setNamesrvAddr("127.0.0.1:9876");
    //啟動Producer實例
    producer.start();
    for (int i = 0; i < 10; i++) {
        Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8));
        //同步發送方式
        producer.sendOneway(msg);
    }
    //關閉producer
    producer.shutdown();
}

集群消費模式

消費者采用負載均衡的方式消費消息,同一個Group下的多個Consumer共同消費Queue里的Message,每個Consumer處理的消息不同。

一個Consumer Group中的各個Consumer實例分共同消費消息,即一條消息只會投遞到一個Group下面的一個實例,并且只消費一遍。

例如某個Topic有3個隊列,其中一個Consumer Group 有 3 個實例,那么每個實例只消費其中的1個隊列。集群消費模式是消費者默認的消費方式。

public static void main(String[] args) throws Exception {
    //實例化消息消費者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
    //指定nameserver地址
    consumer.setNamesrvAddr("127.0.0.1:9876");
    //訂閱topic,"*"表示所有tag
    consumer.subscribe("topic_luke","*");
    consumer.setMessageModel(MessageModel.CLUSTERING);
    // 注冊回調實現類來處理從broker拉取回來的消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @SneakyThrows
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println(new String(msg.getBody()));
            }
            // 標記該消息已經被成功消費
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // 啟動消費者實例
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

廣播消費模式

廣播消費模式中把消息對一個Group下的各個Consumer實例都投遞一遍。也就是說消息也會被 Group 中的每個Consumer都消費一次。

實際上,是一個消費組下的每個消費者實例都獲取到了topic下面的每個Message Queue去拉取消費。所以消息會投遞到每個消費者實例。

public static void main(String[] args) throws Exception {
    //實例化消息消費者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
    //指定nameserver地址
    consumer.setNamesrvAddr("127.0.0.1:9876");
    //訂閱topic,"*"表示所有tag
    consumer.subscribe("topic_luke","*");
    consumer.setMessageModel(MessageModel.BROADCASTING);
    // 注冊回調實現類來處理從broker拉取回來的消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @SneakyThrows
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println(new String(msg.getBody()));
            }
            // 標記該消息已經被成功消費
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // 啟動消費者實例
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

“RocketMQ普通消息同步發送怎么實現”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

灵璧县| 甘洛县| 调兵山市| 林甸县| 玉环县| 万安县| 砀山县| 丰宁| 万年县| 东丰县| 博野县| 大同县| 财经| 家居| 唐山市| 肃宁县| 东光县| 云霄县| 奉化市| 塔城市| 宝丰县| 大兴区| 元江| 宣化县| 天镇县| 同江市| 太和县| 临泉县| 五家渠市| 资讯| 宕昌县| 博兴县| 松原市| 新闻| 天全县| 平原县| 广灵县| 兴仁县| 东山县| 郴州市| 沙河市|