中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

SpringBoot整合rockerMQ消息隊列的方法

發布時間:2022-07-27 09:56:16 來源:億速云 閱讀:117 作者:iii 欄目:開發技術

這篇文章主要介紹“SpringBoot整合rockerMQ消息隊列的方法”,在日常操作中,相信很多人在SpringBoot整合rockerMQ消息隊列的方法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”SpringBoot整合rockerMQ消息隊列的方法”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

    Springboot整合RockerMQ

    1、maven依賴

    <dependencies>
        <!-- springboot-web組件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.3</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

    2、yml配置文件

    rocketmq:
      ###連接地址nameServer
      name-server: www.kaicostudy.com:9876;
      producer:
        group: kaico_producer
    server:
      port: 8088

    3、生產者

    @RequestMapping("/sendMsg")
        public String sendMsg() {
            OrderEntity orderEntity = new OrderEntity("123456","騰訊視頻會員");
            SendResult kaicoTopic = rocketMQTemplate.syncSend("kaicoTopic"+":"+"tag1", orderEntity);
            System.out.println("返回發送消息狀態:" + kaicoTopic);
            return "success";
        }

    4、消費者

    @Service
    @RocketMQMessageListener(topic = "kaicoTopic", selectorExpression ="tag1", consumerGroup = "kaico_consumer", messageModel = MessageModel.CLUSTERING)
    public class OrdeConsumer2 implements RocketMQListener<OrderEntity> {
        @Override
        public void onMessage(OrderEntity o) {
            System.out.println("kaico_consumer2消費者接收對象:" + o.toString());
        }
    }

    使用總結

    消費模式

    集群消費
    當 consumer 使用集群消費時,每條消息只會被 consumer 集群內的任意一個 consumer 實例消費一次。
    同時記住一點,使用集群消費的時候,consumer 的消費進度是存儲在 broker 上,consumer 自身是不存儲消費進度的。消息進度存儲在 broker 上的好處在于,當你 consumer 集群是擴大或者縮小時,由于消費進度統一在broker上,消息重復的概率會被大大降低了。
    注意: 在集群消費模式下,并不能保證每一次消息失敗重投都投遞到同一個 consumer 實例。

    注解配置:messageModel = MessageModel.CLUSTERING

    廣播消費
    當 consumer 使用廣播消費時,每條消息都會被 consumer 集群內所有的 consumer 實例消費一次,也就是說每條消息至少被每一個 consumer 實例消費一次。
    與集群消費不同的是,consumer 的消費進度是存儲在各個 consumer 實例上,這就容易造成消息重復。還有很重要的一點,對于廣播消費來說,是不會進行消費失敗重投的,所以在 consumer 端消費邏輯處理時,需要額外關注消費失敗的情況。
    雖然廣播消費能保證集群內每個 consumer 實例都能消費消息,但是消費進度的維護、不具備消息重投的機制大大影響了實際的使用。因此,在實際使用中,更推薦使用集群消費,因為集群消費不僅擁有消費進度存儲的可靠性,還具有消息重投的機制。而且,我們通過集群消費也可以達到廣播消費的效果。

    注解配置:messageModel = MessageModel.BROADCASTING

    生產者組和消費者組

    生產者組
    一個生產者組,代表著一群topic相同的Producer。即一個生產者組是同一類Producer的組合。

    如果Producer是TransactionMQProducer,則發送的是事務消息。如果節點1發送完消息后,消息存儲到broker的Half Message Queue中,還未存儲到目標topic的queue中時,此時節點1崩潰,則可以通過同一Group下的節點2進行二階段提交,或回溯。

    使用時,一個節點下,一個topic會對應一個producer

    消費者組
    一個消費者組,代表著一群topic相同,tag相同(即邏輯相同)的Consumer。通過一個消費者組,則可容易的進行負載均衡以及容錯

    使用時,一個節點下,一個topic加一個tag可以對應一個consumer。一個消費者組就是橫向上多個節點的相同consumer為一個消費組。

    首先分析一下producer。習慣上我們不會創建多個訂閱了相同topic的Producer實例,因為一個Producer實例發送消息時是通過ExecutorService線程池去異步執行的,不會阻塞完全夠用,如果創建了多個相同topic的Producer則會影響性能。而Consumer則不同。消息會在一topic下會細分多個tag,需要針對tag需要針對不同的tag創建多個消費者實例。

    注意:多個不同的消費者組訂閱同一個topic、tag,如果設定的是集群消費模式,每一個消費者組中都會有一個消費者來消費。也就是說不同的消費者組訂閱同一個topic相互之間是沒有影響的。

    生產者投遞消息的三種方式

    同步: 發送消息后需等待結果,消息的可靠性高發送速度慢;

     SendResult kaicoTopic = rocketMQTemplate.syncSend("kaicoTopic"+":"+"tag1", orderEntity);

    異步: 消息發送后,回調通知結果,消息發送速度快,消息可靠性低;

     //異步發送
            rocketMQTemplate.asyncSend("kaicoTopic" + ":" + "tag1", orderEntity, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("異步發送消息成功");
                }
                @Override
                public void onException(Throwable throwable) {
                    System.out.println("異步發送消息失敗");
                }
            });

    單向(oneway):消息發送后,不關心結果,發送速度最快,消息可靠性最差,適用于在大量日志數據和用戶行為數據等場景發送數據。

    //單向(oneway)發送
            rocketMQTemplate.sendOneWay("kaicoTopic"+":"+"tag1", orderEntity);

    如何保證消息不丟失

    主要三個步驟
    1、生產者保證消息發送成功
    采用同步發送消息的方式,發送消息后有返回結果,保證消息發送成功。(代碼見上面)
    返回四個狀態

    • SEND_OK:消息發送成功。需要注意的是,消息發送到 broker 后,還有兩個操作:消息刷盤和消息同步到 slave 節點,默認這兩個操作都是異步的,只有把這兩個操作都改為同步,SEND_OK 這個狀態才能真正表示發送成功。

    • FLUSH_DISK_TIMEOUT:消息發送成功但是消息刷盤超時。

    • FLUSH_SLAVE_TIMEOUT:消息發送成功但是消息同步到 slave 節點時超時。

    • SLAVE_NOT_AVAILABLE:消息發送成功但是 broker 的 slave 節點不可用。

    2、rocketMQ將消息持久化,保證宕機后消息不會丟失。持久化策略(刷盤策略)

    • 異步刷盤:默認。消息寫入 CommitLog 時,并不會直接寫入磁盤,而是先寫入 PageCache 緩存后返回成功,然后用后臺線程異步把消息刷入磁盤。異步刷盤提高了消息吞吐量,但是可能會有消息丟失的情況,比如斷點導致機器停機,PageCache 中沒來得及刷盤的消息就會丟失。

    • 同步刷盤:消息寫入內存后,立刻請求刷盤線程進行刷盤,如果消息未在約定的時間內(默認 5 s)刷盤成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到這個響應后,可以進行重試。同步刷盤策略保證了消息的可靠性,同時降低了吞吐量,增加了延遲。要開啟同步刷盤,需要增加下面配置:

    flushDiskType=SYNC_FLUSH

    3、Broker 多副本和高可用
    Broker 為了保證高可用,采用一主多從的方式部署。
    消息發送到 master 節點后,slave 節點會從 master 拉取消息保持跟 master 的一致。這個過程默認是異步的,即 master 收到消息后,不等 slave 節點復制消息就直接給 Producer 返回成功。

    這樣會有一個問題,如果 slave 節點還沒有完成消息復制,這時 master 宕機了,進行主備切換后就會有消息丟失。為了避免這個問題,可以采用 slave 節點同步復制消息,即等 slave 節點復制消息成功后再給 Producer 返回發送成功。只需要增加下面的配置:
    brokerRole=SYNC_MASTER

    改為同步復制后,消息復制流程如下:

    • slave 初始化后,跟 master 建立連接并向 master 發送自己的 offset;

    • master 收到 slave 發送的 offset 后,將 offset 后面的消息批量發送給 slave;

    • slave 把收到的消息寫入 commitLog 文件,并給 master 發送新的 offset;

    • master 收到新的 offset 后,如果 offset >= producer 發送消息后的 offset,給 Producer 返回 SEND_OK。

    4、消費者保證消息消費成功
    消費者消費消息后,如果 Consumer 消費成功,返回 CONSUME_SUCCESS,提交 offset 并從 Broker 拉取下一批消息。

    @Service
    public class NoSpringBootOrderConsumer {
        private DefaultMQPushConsumer defaultMQPushConsumer;
        @Value("${rocketmq.name-server}")
        private String namesrvAddr;
        protected String consumerGroup;
        protected String topic;
        protected String topicTag;
        public void setNamesrvAddr(String namesrvAddr) {
            this.namesrvAddr = namesrvAddr;
        }
        public void setConsumerGroup(String consumerGroup) {
            this.consumerGroup = consumerGroup;
        }
        public void setTopic(String topic) {
            this.topic = topic;
        }
        public void setTopicTag(String topicTag) {
            this.topicTag = topicTag;
        }
        public static String encoding = System.getProperty("file.encoding");
        /*
         * @Author ex_fengkai
         * @Description //TODO 初始化數據(消費者組名稱、topic、topic的tag、nameServer的信息)
         * @Date 2020/11/9 14:36
         * @Param []
         * @return void
         **/
        private void initParam() {
            this.consumerGroup = "kaico_consumer3";
            this.topic = "kaicoTopic";
            this.topicTag = "tag1";
            this.setNamesrvAddr(namesrvAddr);
        }
        @PostConstruct
        private void init() throws InterruptedException, MQClientException {
            initParam();
            // ConsumerGroupName需要由應用來保證唯一,用于把多個Consumer組織到一起,提高并發處理能力
            defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup);
            defaultMQPushConsumer.setNamesrvAddr(namesrvAddr); //設置nameServer服務器
            defaultMQPushConsumer.setInstanceName(String.valueOf(System.currentTimeMillis()));
            defaultMQPushConsumer.setVipChannelEnabled(false);
            // 設置Consumer第一次啟動是從隊列頭部開始消費
            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // 訂閱指定Topic下的topicTag
            System.out.println("consumerGroup:" + consumerGroup + " topic:" + topic + " ,topicTag:" + topicTag);
            defaultMQPushConsumer.subscribe(topic, topicTag);
            // 設置為集群消費
            defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
            // 通過匿名消息監聽處理消息消費
            defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
                // 默認msgs里只有一條消息,可以通過設置consumeMessageBatchMaxSize參數來批量接收消息
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    MessageExt msg = msgs.get(0);
                    if (msg.getTopic().equals(topic) && msg.getTags() != null && msg.getTags().equals(topicTag)) {
                        // 執行topic下對應tag的消費邏輯
                        try {
                            onMessage(new String(msg.getBody(),"utf-8"));
    
                        } catch (UnsupportedEncodingException e) {
                            System.out.println("系統不支持消息編碼格式:" + encoding);
                            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    
                        } catch (Exception e) {
                            System.out.println("消息處理異常");
                            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                        }
                        System.out.println("consumerGroup:" + consumerGroup + " MsgId:" + msg.getMsgId() + " was done!");
                    }
                    // 如果沒有return success ,consumer會重新消費該消息,直到return success
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // Consumer對象在使用之前必須要調用start初始化,初始化一次即可
            defaultMQPushConsumer.start();
            System.out.println("consumerGroup:" + consumerGroup + " namesrvAddr:" + namesrvAddr + "  start success!");
        }
        @PreDestroy
        public void destroy() {
            defaultMQPushConsumer.shutdown();
        }
        private void onMessage(String s) {
            System.out.println(consumerGroup + "用spring的方式的消費者消費:" + s);
        }
    }

    Consumer 重試
    Consumer 消費失敗,這里有 3 種情況:

    • 返回 RECONSUME_LATER

    • 返回 null

    • 拋出異常

    Broker 收到這個響應后,會把這條消息放入重試隊列,重新發送給 Consumer。

    注意:Broker 默認最多重試 16 次,如果重試 16 次都失敗,就把這條消息放入死信隊列,Consumer 可以訂閱死信隊列進行消費。重試只有在集群模式(MessageModel.CLUSTERING)下生效,在廣播模式(MessageModel.BROADCASTING)下是不生效的。Consumer 端一定要做好冪等處理。

    順序消息

    • 生產者投遞消息根據key投遞到同一個隊列中存放

    • 消費者應該訂閱到同一個隊列實現消費

    • 最終應該使用同一個線程去消費消息(不能夠實現多線程消費。)

    生產者代碼

    //發送順序消息
        @RequestMapping("/sendMsg1")
        public String sendMsg1() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
            Long orderId = System.currentTimeMillis();
            String insertSql = getSqlMsg("insert", orderId);
            String updateSql = getSqlMsg("update", orderId);
            String deleteSql = getSqlMsg("delete", orderId);
            Message insertMsg = new Message("kaicoTopic", "tag6", insertSql.getBytes());
            Message updateMsg = new Message("kaicoTopic", "tag6", updateSql.getBytes());
            Message deleteMsg = new Message("kaicoTopic", "tag6", deleteSql.getBytes());
            DefaultMQProducer producer = rocketMQTemplate.getProducer();
            rocketMQTemplate.getProducer().send(insertMsg
                    , new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg,
                                                   Object arg) {
                            // 該消息存放到隊列0中
                            return  mqs.get(0);
                        }
                    }, orderId);
            rocketMQTemplate.getProducer().send(updateMsg
                    , new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg,
                                                   Object arg) {
                            // 該消息存放到隊列0中
                            return mqs.get(0);
                        }
                    }, orderId);
            rocketMQTemplate.getProducer().send(deleteMsg
                    , new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg,
                                                   Object arg) {
                            // 該消息存放到隊列0中
                            return  mqs.get(0);
                        }
                    }, orderId);
            return orderId + "";
        }

    消費者代碼

    @Service
    @RocketMQMessageListener(topic = "kaicoTopic", selectorExpression ="tag6", consumerGroup = "kaico_consumer1",
            messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.ORDERLY, consumeThreadMax = 1)
    public class OrdeConsumer implements RocketMQListener<MessageExt> {
        @Override
        public void onMessage(MessageExt msg) {
            System.out.println(Thread.currentThread().getName() + "-kaico_consumer1消費者接收對象:隊列" + msg.getQueueId()
                    + "=消息:" +  new String(msg.getBody()));
        }
    }

    分布式事務

    SpringBoot整合rockerMQ消息隊列的方法

    實現思路

    • 生產者(發送方)投遞事務消息到Broker中,設置該消息為半消息 不可以被消費;

    • 開始執行我們的本地事務,將本地事務執行的結果(回滾或者提交)發送給Broker

    • Broker獲取回滾或者提交,如果是回滾的情況則刪除該消息、如果是提交的話,該消息就可以被消費者消費;

    • Broker如果沒有及時的獲取發送方本地事務結果的話,會主動查詢本地事務結果。

    1、生產者發送事務消息sendMessageInTransaction

     public String saveOrder() {
         // 提前生成我們的訂單id
         String orderId = System.currentTimeMillis() + "";
         /**
          * 1.提前生成我們的半消息
          * 2.半消息發送成功之后,在執行我們的本地事務
          */
         OrderEntity orderEntity = createOrder(orderId);
         String msg = JSONObject.toJSONString(orderEntity);
         MessageBuilder<String> stringMessageBuilder = MessageBuilder.withPayload(msg);
         stringMessageBuilder.setHeader("msg", msg);
         Message message = stringMessageBuilder.build();
         // 該消息不允許被消費者消費,生產者的事務邏輯代碼在生產者事務監聽類中executeLocalTransaction方法中執行。
         rocketMQTemplate.sendMessageInTransaction("kaicoProducer",
                 "orderTopic", message, null);
         return orderId;
    
    }

    2、事務監聽類

    @Slf4j
    @Component
    @RocketMQTransactionListener(txProducerGroup = "kaicoProducer") //這個mayiktProducer生產者的事務管理
    public class SyncProducerListener implements RocketMQLocalTransactionListener {
        @Autowired
        private OrderMapper orderMapper;
    
        @Autowired
        private TransationalUtils transationalUtils;
        /**
         * 執行我們訂單的事務
         * @param msg
         * @param arg
         * @return
         */
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            MessageHeaders headers = msg.getHeaders();
            //拿到消息
            Object object = headers.get("msg");
            if (object == null) {
                return null;
            }
            String orderMsg = (String) object;
            OrderEntity orderEntity = JSONObject.parseObject(orderMsg, OrderEntity.class);
            TransactionStatus begin = null;
            try {
                begin = transationalUtils.begin();
                int result = orderMapper.addOrder(orderEntity);
                transationalUtils.commit(begin);
                if (result <= 0) {
                    return RocketMQLocalTransactionState.ROLLBACK;
                }
                // 告訴我們的Broke可以消費者該消息
                return RocketMQLocalTransactionState.COMMIT;
            } catch (Exception e) {
                if (begin != null) {
                    transationalUtils.rollback(begin);
                    return RocketMQLocalTransactionState.ROLLBACK;
                }
            }
            //add.Order
            return null;
        }
        /**
         * 提供給我們的Broker定時檢查
         * @param msg
         * @return
         */
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            MessageHeaders headers = msg.getHeaders();
            Object object = headers.get("msg");
            if (object == null) {
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            String orderMsg = (String) object;
            OrderEntity orderEntity = JSONObject.parseObject(orderMsg, OrderEntity.class);
            String orderId = orderEntity.getOrderId();
            // 直接查詢我們的數據庫
            OrderEntity orderDbEntity = orderMapper.findOrderId(orderId);
            if (orderDbEntity == null) {
                //不確認,繼續重試
                return RocketMQLocalTransactionState.UNKNOWN;
            }
            //提交事務
            return RocketMQLocalTransactionState.COMMIT;
        }
    }

    3、消費者消費消息

    @Service
    @RocketMQMessageListener(topic = "orderTopic", consumerGroup = "kaicoTopic")
    public class OrdeConsumer implements RocketMQListener<String> {
        @Autowired
        private DispatchMapper dispatchMapper;
        @Override
        public void onMessage(String msg) {
            OrderEntity orderEntity = JSONObject.parseObject(msg, OrderEntity.class);
            String orderId = orderEntity.getOrderId();
            // 模擬userid為=123456
            DispatchEntity dispatchEntity = new DispatchEntity(orderId, 123456L);
            dispatchMapper.insertDistribute(dispatchEntity);
        }
    }

    到此,關于“SpringBoot整合rockerMQ消息隊列的方法”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    独山县| 遂宁市| 普安县| 卢龙县| 新沂市| 山西省| 平谷区| 香格里拉县| 梨树县| 新营市| 越西县| 龙海市| 宁安市| 合江县| 张北县| 冕宁县| 东乌珠穆沁旗| 柏乡县| 兴海县| 汝南县| 东光县| 咸丰县| 白银市| 东港市| 泰和县| 临高县| 苗栗市| 禄劝| 若羌县| 巴马| 巴林右旗| 永寿县| 洪洞县| 阜宁县| 三门县| 深州市| 理塘县| 宁国市| 手游| 康定县| 灯塔市|