中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java中CountDownLatch異步轉同步工具類的示例分析

發布時間:2021-06-30 14:00:09 來源:億速云 閱讀:353 作者:小新 欄目:開發技術

小編給大家分享一下Java中CountDownLatch異步轉同步工具類的示例分析,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!

使用場景

由于公司業務需求,需要對接socket、MQTT等消息隊列。
眾所周知 socket 是雙向通信,socket的回復是人為定義的,客戶端推送消息給服務端,服務端的回復是兩條線。無法像http請求有回復。
下發指令給硬件時,需要校驗此次數據下發是否成功。
用戶體驗而言,點擊按鈕就要知道此次的下發成功或失敗。

Java中CountDownLatch異步轉同步工具類的示例分析

如上圖模型,

第一種方案使用Tread.sleep
優點:占用資源小,放棄當前cpu資源
缺點: 回復速度快,休眠時間過長,仍然需要等待休眠結束才能返回,響應速度是固定的,無法及時響應第二種方案使用CountDownLatch

package com.lzy.demo.delay;

import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CountDownLatchPool {

    //countDonw池
    private final static Map<Integer, CountDownLatch> countDownLatchMap = new ConcurrentHashMap<>();
    //延遲隊列
    private final static DelayQueue<MessageDelayQueueUtil> delayQueue = new DelayQueue<>();

    private volatile static boolean flag =false;
    //單線程池
    private final static ExecutorService t = new ThreadPoolExecutor(1, 1,
        0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(1));

    public static void addCountDownLatch(Integer messageId) {
        CountDownLatch countDownLatch = countDownLatchMap.putIfAbsent(messageId,new CountDownLatch(1) );
        if(countDownLatch == null){
            countDownLatch = countDownLatchMap.get(messageId);
        }
        try {
            addDelayQueue(messageId);
            countDownLatch.await(3L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("阻塞等待結束~~~~~~");
    }

    public static void removeCountDownLatch(Integer messageId){
        CountDownLatch countDownLatch = countDownLatchMap.get(messageId);
        if(countDownLatch == null)
            return;
        countDownLatch.countDown();
        countDownLatchMap.remove(messageId);
        System.out.println("清除Map數據"+countDownLatchMap);
    }

    private static void addDelayQueue(Integer messageId){
        delayQueue.add(new MessageDelayQueueUtil(messageId));
        clearMessageId();
    }

    private static void clearMessageId(){
        synchronized (CountDownLatchPool.class){
            if(flag){
                return;
            }
            flag = true;
        }
        t.execute(()->{
            while (delayQueue.size() > 0){
                System.out.println("進入線程并開始執行");
                try {
                    MessageDelayQueueUtil take = delayQueue.take();
                    Integer messageId1 = take.getMessageId();
                    removeCountDownLatch(messageId1);
                    System.out.println("清除隊列數據"+messageId1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            flag = false;
            System.out.println("結束end----");
        });
    }

    public static void main(String[] args) throws InterruptedException {
        /*
        測試超時清空map
        new Thread(()->addCountDownLatch(1)).start();
        new Thread(()->addCountDownLatch(2)).start();
        new Thread(()->addCountDownLatch(3)).start();
        */
        //提前創建線程,清空countdown
        new Thread(()->{
            try {
                Thread.sleep(500L);
                removeCountDownLatch(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        //開始阻塞
        addCountDownLatch(1);
    	//通過調整上面的sleep我們發現阻塞市場取決于countDownLatch.countDown()執行時間
    	System.out.println("阻塞結束----");
    }
}
class MessageDelayQueueUtil implements Delayed {

    private Integer messageId;
    private long avaibleTime;

    public Integer getMessageId() {
        return messageId;
    }

    public void setMessageId(Integer messageId) {
        this.messageId = messageId;
    }

    public long getAvaibleTime() {
        return avaibleTime;
    }

    public void setAvaibleTime(long avaibleTime) {
        this.avaibleTime = avaibleTime;
    }

    public MessageDelayQueueUtil(Integer messageId){
        this.messageId = messageId;
        //avaibleTime = 當前時間+ delayTime
        //重試3次,每次3秒+1秒的延遲
        this.avaibleTime=3000*3+1000 + System.currentTimeMillis();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long diffTime= avaibleTime- System.currentTimeMillis();
        return unit.convert(diffTime,TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        //compareTo用在DelayedUser的排序
        return (int)(this.avaibleTime - ((MessageDelayQueueUtil) o).getAvaibleTime());
    }
}

由于socket并不確定每次都會有數據返回,所以map的數據會越來越大,最終導致內存溢出
需定時清除map內的無效數據。
可以使用DelayedQuene延遲隊列來處理,相當于給對象添加一個過期時間

使用方法 addCountDownLatch 等待消息,異步回調消息清空removeCountDownLatch

看完了這篇文章,相信你對“Java中CountDownLatch異步轉同步工具類的示例分析”有了一定的了解,如果想了解更多相關知識,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

云南省| 依安县| 大港区| 札达县| 渭源县| 罗江县| 淮滨县| 阜宁县| 米易县| 永吉县| 黎平县| 宁波市| 思南县| 伊宁市| 成安县| 祁门县| 汨罗市| 赫章县| 宝兴县| 同德县| 马龙县| 吉木乃县| 龙胜| 上思县| 县级市| 枞阳县| 辉南县| 玉林市| 伊川县| 胶州市| 宁夏| 林州市| 公主岭市| 万州区| 大连市| 蒲城县| 四平市| 西乌珠穆沁旗| 永寿县| 涞水县| 元氏县|