中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何整合RocketMQ事務消息

發布時間:2021-06-18 15:05:02 來源:億速云 閱讀:193 作者:Leah 欄目:大數據

今天就跟大家聊聊有關如何整合RocketMQ事務消息,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

一、 選擇RocketMQ原因

ActiveMQ、RabbitMQ、ZeroMQ、Kafka、RocketMQ選型

二、 整合思路

RocketMQ提供了事務消息回查,查看官方Demo

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {
    private static final String TX_PGROUP_NAME = "myTxProducerGroup";
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    @Value("${demo.rocketmq.transTopic}")
    private String springTransTopic;
    
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Send transactional messages
        testTransaction();
    }


    private void testTransaction() throws MessagingException {
        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {

                Message msg = MessageBuilder
                                        .withPayload("Hello RocketMQ " + i)
                                        .setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i)
                                        .build();
                SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME,
                                                                        springTransTopic + ":" + tags[i % tags.length],
                                                                        msg,
                                                                        null);
                System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n",
                                    msg.getPayload(),
                                    sendResult.getSendStatus());

                Thread.sleep(10);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @RocketMQTransactionListener(txProducerGroup = TX_PGROUP_NAME)
    class TransactionListenerImpl implements RocketMQLocalTransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);

        private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
            System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n", transId);
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(transId, status);
            if (status == 0) {
                // Return local transaction with success(commit), in this case,
                // this message will not be checked in checkLocalTransaction()
                System.out.printf("    # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());
                return RocketMQLocalTransactionState.COMMIT;
            }

            if (status == 1) {
                // Return local transaction with failure(rollback) , in this case,
                // this message will not be checked in checkLocalTransaction()
                System.out.printf("    # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());
                return RocketMQLocalTransactionState.ROLLBACK;
            }

            System.out.printf("    # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n");
            return RocketMQLocalTransactionState.UNKNOWN;
        }

        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
            RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
            Integer status = localTrans.get(transId);
            if (null != status) {
                switch (status) {
                    case 0:
                        retState = RocketMQLocalTransactionState.UNKNOWN;
                        break;
                    case 1:
                        retState = RocketMQLocalTransactionState.COMMIT;
                        break;
                    case 2:
                        retState = RocketMQLocalTransactionState.ROLLBACK;
                        break;
                }
            }
            System.out.printf("------ !!! checkLocalTransaction is executed once," +
                    " msgTransactionId=%s, TransactionState=%s status=%s %n",
                transId, retState, status);
            return retState;
        }
    }

}

需要在testTransaction()中發送消息,然后在TransactionListenerImpl類中實現executeLocalTransaction()方法才能執行整個本地事務,然后在checkLocalTransaction()中實現事務消息回查。

查看源代碼可以知道testTransaction()方法和executeLocalTransaction()是在同一個線程當中,只不過包裝RocketMQTemplate中。

三、問題和解決方法

3.1事務消息面臨的幾個問題:

  1. 消息發送的事務消息回調查詢和本地事務沒嚴格的先后順序,怎么保證,回查時,事務操作肯定已經完成。

  2. 事務消息回調使用transaction_id查詢,那么transaction_id存放在哪里,同時保證transaction_id關聯的業務操作執行成功。

  3. 怎么把事務回調查詢操作隔離出業務,保證不侵入代碼中。

  4. 下游消費者怎么保證接口冪等性。

  5. 下游消費者怎么提高冪等性查詢性能。

  6. 怎么把冪等性操作隔離出業務,保證不侵入代碼中。

3.2 解決方法

  1. 因為數據庫或者其他業務操作可能會存在延時,那么不能保證回查時業務操作已完成,那么可以多次回查,并設置最大回查次數,同時不能丟棄MQ消息持久化,方便手動恢復。

  2. 可以使用本地消息表落地的發送消息,同時可以采用切面、繼承等等方式將落地消息隔離出業務代碼之外,保證本地消息落庫不侵入,注意必須要保證本地消息落庫和本地業務落庫在同一個事務之內!

  3. 事務消息回查可以使用第2點的本地消息表,根據transaction_id查詢,判斷本地事務的執行結果,也和第2點一樣,可以使用一些方式將事務消息回查代碼隔離出業務代碼,保證不侵入。

  4. 冪等性的方法:

    • 數據庫唯一約束

    • 狀態機CAS單向流轉

    • 消息去重表

  5. ,在執行本地業務前,先對redis判斷是業務id是否存在,存在則直接返回消費成功,在執行本地業務之后,可以將消費信息異步落地到redis當中。注意:需要保證本地業務和消息冪等性操作在同一個事務當中,同時redis落地操作在事務之外。

  6. 比較好的方案應該是數據庫唯一約束 + 消息去重表,在消息去重表中對業務id設置唯一約束,同時將消息落地操作隔離出本地業務之外,保證不侵入。

  7. 定時清理歷史的本地消息表(消息去重表)。

看完上述內容,你們對如何整合RocketMQ事務消息有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

平顺县| 灵川县| 呼图壁县| 达日县| 甘泉县| 茌平县| 无锡市| 精河县| 涟源市| 长岛县| 蛟河市| 博罗县| 冷水江市| 鸡泽县| 新蔡县| 昌都县| 同心县| 杨浦区| 平和县| 广汉市| 大关县| 西畴县| 温宿县| 高陵县| 夹江县| 鲁山县| 柳河县| 罗城| 波密县| 宾阳县| 民和| 项城市| 乐安县| 辽阳县| 吉隆县| 阳城县| 中方县| 勐海县| 柏乡县| 南城县| 察雅县|