中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

java高并發熱點數據更新問題怎么解決

發布時間:2023-04-26 16:33:27 來源:億速云 閱讀:111 作者:iii 欄目:開發技術

今天小編給大家分享一下java高并發熱點數據更新問題怎么解決的相關知識點,內容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。

    mysql update的時候到底是鎖行還是鎖表?

    InnoDb 鎖簡單分類

    按照數據操作的粒度

    1)行級鎖:鎖住記錄行

    2)表級鎖:鎖住整張表

    按照對數據操作的類型

    1)讀鎖(共享鎖):針對同一份數據,多個讀操作可以同時進行而不會互相影響。  

     2) 寫鎖(排它鎖):當前操作沒有完成之前,它會阻斷其他寫鎖和讀鎖。

    mysql 在update時會根據where 條件的類型決定鎖行還是鎖表 where的過濾條件列,如果用索引,鎖行,無法用索引,鎖表。按照索引規則,如果能使用索引,鎖行,不能使用索引,鎖表。 行鎖是排他鎖,當一條記錄已經被一條update語句鎖住時會阻斷其他的update操作,在高并發場景下,對于熱點數據來說會進行頻繁的更新操作造成其他update操作鎖等待超時請求失敗

    背景

    以旅游支付場景為例,伴隨著業務量的增加,系統的并發量會逐漸上升,例如“北京長城度假區”的賬戶流水會變得十分頻繁,每次支付或者退款操作都需要去更新一下賬戶余額,并發較低時并不會有什么問題,但當旅游高峰期到來時并發量上升,數據庫更新的時候需要獲得數據行鎖,在未釋放這個行鎖之前,其他事務只能是等待。

    解決方案

    1.支付時異步入賬,退款增加一個欠款墊資戶

    用戶支付入款需要給賬戶加錢時此時商戶對于資金的實時性要求不高,追求準確性,因此可以將賬戶加款放到異步線程池,達到錯峰的目的 然而當用戶發起退款時,我們必須及時并且準確的從賬戶扣款,因此退款采取同步進行,退款的訂單相對于支付來說量就會少很多,滿足要求。但是存在一個問題高并發狀態下某一個熱點賬戶余額時刻在變很有可能退款發起時賬戶余額充足但是實際扣除時由于上一筆支付未入賬,造成金額不足

    1.加分布式鎖

    對特定賬戶加鎖,保證某一刻只有一筆退款請求獲得該賬戶的操作權 弊端:多個用時同時退款時只有一筆成功,對用戶不友好,pass掉

    2.新增墊資商戶

    熱點賬戶增加一個指定透支額度的墊資戶,實際賬戶余額不足時從墊資戶借款,然后定期核對墊資戶透支額度從實際賬戶一次性扣款, 推薦

    2.合并請求

    合并多條需要更新余額的請求

    將一段時間內的請求,先進行阻塞,合并各個賬戶需要更新的金額,一次性處理,然后將結果拆分,喚醒被阻塞的請求

    java高并發熱點數據更新問題怎么解決

    demo實現

     * @Author: xiaokunkun
     * @CreateTime: 2023-04-23  14:37
     * @Description: 合并更新,可以不捕捉異常報錯后外層調用方直接捕獲異常事務回滾
     */
    @Service
    public class CommodityAmountService {
        class Request {
            AcctUpdateDto acctUpdateDto;
            //預留字段 可不使用
            String atomCode;
            //暫定返回結果為true或者false
            CompletableFuture<Boolean> future; // 接受結果
        }
     
        // 積攢請求(每隔N毫秒批量處理一次)
        LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue<>();
     
        // 定時任務的實現,N秒鐘處理一次數據
        @PostConstruct
        public void init() {
            ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
            scheduledExecutorService.scheduleAtFixedRate(() -> {
                // 1、取出queue的請求,生成一次合并更新
                int size = queue.size();
                if (size == 0) {
                    return;
                }
                ArrayList<Request> requests = new ArrayList<>();
                for (int i = 0; i < size; i++) {
                    //隊列出棧
                    Request request = queue.poll();
                    requests.add(request);
                }
                System.out.println("批量處理數據量:" + size);
                // 2、組裝一個合并更新 key為賬戶value為sum(amount)
                Map<String, Long> amountMap = new HashMap<>();
                ArrayList<String> commodityCodes = new ArrayList<>();
                for (Request request : requests) {
                    //todo 根據accountNo分組
                }
                for (String key : amountMap.keySet()) {
                    Long amount = amountMap.get(key);
                    //update mysql
                }
                // 3、將結果響應 分發給每一個單獨的用戶請求。由定時任務處理線程 --> n個用戶的請求線程
                for (Request request : requests) {
                    // 將結果返回到對應的請求線程,只要不報錯此批次全部返回true,否則false
                    request.future.complete(true);
                }}, 0, 1000, TimeUnit.MILLISECONDS);
        }
     
        @Autowired
        CommodityRemoteService commodityRemoteService;
     
        // 合并金額并更新,多個用戶請求
        public Boolean updateMergeAmount(String movieCode)
                throws ExecutionException, InterruptedException {
            // 并非立刻發起接口調用,請求收集起來,再進行
            Request request = new Request();
            request.atomCode = movieCode;
            // 異步編程:獲取異步處理的結果
            CompletableFuture<Boolean> future = new CompletableFuture<>();
            request.future = future;
            queue.add(request);
            return future.get(); // 此處get方法,會阻塞線程運行,直到future有返回
        }
    }

    測試類:

    //模擬500的并發量
    public void updateMerge() {
        AcctCmdDriver acctCmdDriver = new AcctCmdDriver();
        TradeAccntOrderDetail detail = new TradeAccntOrderDetail();
        AcctNoInfo acctNoInfo = new AcctNoInfo();
        OrderConsist consistForOrder = OrderConsist.newInstance("0200_202304", "trade_accnt_merchant_order");
        detail.setConsistForOrder(consistForOrder);
        acctCmdDriver.setDetail(detail);
        acctCmdDriver.setAcctNoInfo(acctNoInfo);
        detail.setAccountCategory(AccountCategoryEnum.MERCHANT);
        detail.setAccountNo("02020001010000262977202304");
        detail.setAmount(6l);
        System.out.println("start build thread" + acctCmdDriver);
        Random rand = new Random();
        for (int i = 1; i <= 500; i++) {
            final String index = "code_" + i;
            Thread thread = new Thread(() -> {
                try {
                    System.out.println("amount is:" + detail.getAmount());
                    countDownLatch.await();
                    Thread.sleep(rand.nextInt(150));
                    Boolean res = updateMergeAmountService.mergeUpdate(acctCmdDriver);
                    System.out.println("current i" + index + "res:" + res);
                } catch (InterruptedException e) {
                    System.out.println("thread error is:" + e);
                }
            });
            thread.start();
            // 啟動后,倒計時器倒計數減一,代表又有一個線程就緒了
            countDownLatch.countDown();
        }
    }

    以上就是“java高并發熱點數據更新問題怎么解決”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業資訊頻道。

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    体育| 呼和浩特市| 扎赉特旗| 吉安市| 东平县| 韶关市| 通江县| 贡嘎县| 新密市| 蒙城县| 甘谷县| 正安县| 牡丹江市| 望江县| 台湾省| 安乡县| 阿拉善左旗| 定日县| 常熟市| 兰考县| 陕西省| 延庆县| 紫金县| 四会市| 香格里拉县| 克什克腾旗| 开封县| 济阳县| 博爱县| 克山县| 通化市| 明星| 清新县| 安化县| 阿城市| 三河市| 剑川县| 封丘县| 牙克石市| 丹寨县| 芜湖市|