中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ事務消息怎樣實現

發布時間:2021-12-18 11:18:19 來源:億速云 閱讀:134 作者:小新 欄目:大數據

這篇文章主要介紹RocketMQ事務消息怎樣實現,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!

RocketMQ4.3.0版本開始支持事務消息,后續分享將開始將剖析事務消息的實現原理。首先從官方給出的Demo實例入手,以此通往RocketMQ事務消息的世界中。

官方版本未發布之前,從apache rocketmq第一個版本上線后,代碼中存在與事務消息相關的代碼,例如COMMIT、ROLLBACK、PREPARED,在事務消息未開源之前網上對于事務消息的“聲音”基本上是使用類似二階段提交,主要是根據消息系統標志MessageSysFlag中定義來推測的:

  • TRANSACTION_PREPARED_TYPE

  • TRANSACTION_COMMIT_TYPE

  • TRANSACTION_ROLLBACK_TYPE

消息發送者首先發送TRANSACTION_PREPARED_TYPE類型的消息,然后根據事務狀態來決定是提交或回滾事務發送commit請求或rollback請求,如果commit/rollback請求丟失后,rocketmq會在指定超時時間后回查事務狀態來決定提交或回滾事務。

讓我們各自帶著自己的理解和猜測,從閱讀RocketMQ官方提供的Demo程序入手,試圖窺探一些大體的信息。

Demo示例程序位于:/rocketmq-example/src/main/java/org/apache/rocketmq/example/transaction包中。該包中未放置消息消費者,為了驗證事務的消息消費情況,我們可以從其他包copy一個消費者,從而先運行生產者,然后運行消費者,判斷事務消息的預發放、提交、回滾等效果,二話不說,先運行一下,看下效果再說:
消息發送端運行結果:

SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5767EC0000, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D57680F0001, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D57681E0002, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=3], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D57682B0003, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=0], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768380004, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=1], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768490005, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=2], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768560006, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=3], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768640007, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=0], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768730008, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=1], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768800009, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=2], queueOffset=9]

消息消費端效果:

Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1532745715812, bornHost=/192.168.1.5:55482, storeTimestamp=1532745749010, storeHost=/192.168.1.5:10911, msgId=C0A8010500002A9F0000000000001DE8, commitLogOffset=7656, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=5477, toString()=Message{topic='transaction_topic_test', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=transaction_topic_test, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=1, KEYS=KEY7, TRAN_MSG=true, CONSUME_START_TIME=1532746024360, UNIQ_KEY=C0A8010518DC6D06D69C8D5768640007, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagC, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55], transactionId='C0A8010518DC6D06D69C8D5768640007'}]] 
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=1, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1532745715768, bornHost=/192.168.1.5:55482, storeTimestamp=1532745749008, storeHost=/192.168.1.5:10911, msgId=C0A8010500002A9F0000000000001B91, commitLogOffset=7057, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=4496, toString()=Message{topic='transaction_topic_test', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=transaction_topic_test, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=1, KEYS=KEY4, TRAN_MSG=true, CONSUME_START_TIME=1532746024361, UNIQ_KEY=C0A8010518DC6D06D69C8D5768380004, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagE, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='C0A8010518DC6D06D69C8D5768380004'}]] 
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=2, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1532745715727, bornHost=/192.168.1.5:55482, storeTimestamp=1532745748834, storeHost=/192.168.1.5:10911, msgId=C0A8010500002A9F000000000000193A, commitLogOffset=6458, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=3515, toString()=Message{topic='transaction_topic_test', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=transaction_topic_test, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=1, KEYS=KEY1, TRAN_MSG=true, CONSUME_START_TIME=1532746024368, UNIQ_KEY=C0A8010518DC6D06D69C8D57680F0001, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagB, REAL_QID=2}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='C0A8010518DC6D06D69C8D57680F0001'}]]

綜上所述,服務端發送了10條消息,而消費端只收到3條消息,應該是由于事務回滾,造成只提交了3條消息,為了更加嚴謹,可以安裝一個rocketmq-consonse,更加直觀的觀察shangshagn's上述結果:
RocketMQ事務消息怎樣實現cdn.com/97db9b9f7bc18478739f472e134cf48eb3af8cad.png">

接下來對示例代碼進行解讀:

1、生產者端代碼解讀:

public class TransactionProducer {    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();        // @1
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");                return thread;
            }
        });      // @2
        producer.setExecutorService(executorService);                                // @3
        producer.setTransactionListener(transactionListener);                      // @4
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};        for (int i = 0; i < 10; i++) {                                                                    // @5
            try {
                Message msg =                    new Message("transaction_topic_test", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }        for (int i = 0; i < 100000; i++) {     //這里只是阻止生產者過早退出,導致事務消息的相關機制無法運行
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

代碼@1:創建TransactionListener 實例,字面理解為事務消息事件監聽器,下文詳細對其進行展開。
代碼@2:ExecutorService executorService,創建一個線程池,其線程的名稱前綴”client-transaction-msg-check-thread“,從字面理解為客戶端事務消息狀態檢測線程,我們可以大膽的猜測一下是不是這個線程池調用TransactionListener方法,完成對事務消息的檢測呢?【這里只是作者的猜測,大家不能當真,在作者后續文章發布后,如果該觀點錯誤,會加以修復,這里寫出來,主要是想分享一下我讀源碼的方法】。
代碼@3:為事務消息發送者設置線程池。
代碼@4:為事務消息發送者設置事務監聽器。
代碼@5:發送10條消息。

2、TransactionListener代碼解讀

public class TransactionListenerImpl implements TransactionListener {    private AtomicInteger transactionIndex = new AtomicInteger(0);    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {        int value = transactionIndex.getAndIncrement();        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);        return LocalTransactionState.UNKNOW;
    }    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());        if (null != status) {            switch (status) {                case 0:                    return LocalTransactionState.UNKNOW;                case 1:                    return LocalTransactionState.COMMIT_MESSAGE;                case 2:                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
  1. executeLocalTransaction方法:記錄本地事務的事務狀態,這里其實現就是循環設置事務消息的狀態為0,1,2,demo中是把消息的狀態數據存放在一個Map中。實際應用時通常會持久化消息的事務狀態,例如數據庫或緩存。

  2. checkLocalTransaction方法,事務回查業務實現,查本地事務表,判斷事務的狀態如為0:UNKNOW,1:COMMIT_MESSAGE;ROLLBACK_MESSAGE。這里就能解釋,生產者連續發10條消息,因為只有3條消息的事務狀態為COMMIT_MESSAGE,故消息消費者只能消費3條。

以上是“RocketMQ事務消息怎樣實現”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

永和县| 滦平县| 三台县| 周口市| 江门市| 普宁市| 桂平市| 肥城市| 寻乌县| 古交市| 洱源县| 城步| 太白县| 潼关县| 庐江县| 崇州市| 章丘市| 灌阳县| 凤台县| 陆河县| 宜良县| 灵台县| 凌源市| 肃北| 巧家县| 囊谦县| 城固县| 永昌县| 容城县| 永胜县| 霸州市| 庆城县| 都江堰市| 习水县| 西华县| 昌黎县| 周口市| 德阳市| 元氏县| 咸宁市| 旺苍县|