中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

HashedWheelTimer的作用是什么

發布時間:2021-06-24 09:58:14 來源:億速云 閱讀:195 作者:chen 欄目:大數據

這篇文章主要講解了“HashedWheelTimer的作用是什么”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“HashedWheelTimer的作用是什么”吧!

和同事討論一個定時審核的需求,運營設定審核通過的時間,到了這個時間之后,相關內容自動審核通過,本是個小的需求,但是考慮到如果需要定時審核的東西很多,這樣大量的定時任務帶來的一系列問題,然后逐步的討論到了netty的HashedWheelTimer這個的實現。

方案一 單定時器方案

描述:

把所有需要定時審核的資源放到redis中,例如sorted set中,需要審核通過的時間作為score值。后臺啟動一個定時器,定時輪詢sortedSet,當score值小于當前時間,則運行任務審核通過。

問題

這個方案在小批量數據的情況下沒有問題,但是在大批量任務的情況下就會出現問題了,因為每次都要輪詢全量的數據,逐個判斷是否需要執行,一旦輪詢任務執行比較長,就會出現任務無法按照定時的時間執行的問題。

方案二 多定時器方案

描述

每個需要定時完成的任務都啟動一個定時任務,然后等待完成之后銷毀

問題

這個方案帶來的問題很明顯,定時任務比較多的情況下,會啟動很多的線程,這樣服務器會承受不了之后崩潰。基本上不會采取這個方案。

方案三 借用redis的過期通知功能

描述

和方案一類似,針對每一個需要定時審核的任務,設定過期時間,過期時間也就是審核通過的時間,訂閱redis的過期事件,當這個事件發生時,執行相應的審核通過任務。

問題

這個方案來說是借用了redis這種中間件來實現我們的功能,這中實際上屬于redis的發布訂閱功能中的一部分,針對redis發布訂閱功能是不推薦我們在生產環境中做業務操作的,通常redis內部(例如redis集群節點上下線,選舉等等來使用),我們業務系統使用它的這個事件會產生如下兩個問題 1、redis發布訂閱的不穩定問題 2、redid發布訂閱的可靠性問題 具體可以參考 https://my.oschina.net/u/2457218/blog/3065021 (redis的發布訂閱缺陷)

方案四 Hash分層記時輪算法

也許你和我一樣都是第一次聽說這個東西,這個東西就是專為大批量定時任務管理而生。具體論文詳見參考文獻[2]

算法概要

簡要的說這個是一個輪,里面有指針,指針會根據設定的時間單位旋轉,任務根據一些算法會落在相應的槽位上。如下圖 HashedWheelTimer的作用是什么

首先會有一個輪,這個輪在這里分成了8個槽位,任務任務添加的時候會根據相應的算法對槽位個數取模,得到任務會存儲在具體哪個槽位,每個槽位是一個鏈表結構,任務存儲了任務的過期時間(任務執行時間),任務需要執行需要指針轉的輪數,指針(tick) 每間隔一個單位的時間會往下走一個槽位,然后會查詢這個槽位上的存儲的任務,并且任務的存儲的剩余輪數會減一當剩余輪數小于等于零時,就會開始執行這個任務,執行之后會把任務從這個槽位上給刪除掉。

例如上圖: 槽位為8個槽位 Bucket 指針每個時間間隔(100ms)會往下走一個槽位,這個時間間隔叫做tickDuration 那相當于每隔8*100ms=800ms,會輪詢一圈。

HashedWheelTimer

算法理解起來比較簡單,并且也有成熟的實現,那就是在netty中有一個HashedWheelTimer這個類,把這個算法實現了出來。接下來分析分析一下它的這個代碼。

初始化

在這個類上定義的有幾個比較重要的屬性

  /**
     *這個work是一個內部類,實現了Runable接口,是比較核心的一個類,包裝了具體任務的運行,把任務放到具體如何放到某個槽位上,指針往下走的具體方法,任務取消等。 
     */
    private final Worker worker = new Worker();
    /**
     *工作線程,這個就是整個HashedWheelTimer啟動的起點 
     */
    private final Thread workerThread;

    /**
     *當前任務的狀態,1代表任務已經開始執行,0任務初始化,2任務已關閉 
     */
    public static final int WORKER_STATE_INIT = 0;
    public static final int WORKER_STATE_STARTED = 1;
    public static final int WORKER_STATE_SHUTDOWN = 2;

    /**
     *這個很核心的一個概念,就是指針往下走的單位,在HashedWheelTimer這個類中,默認是100ms指針往下走一個單位 
     */
    private final long tickDuration;
    /**
     * 這個就是指的時間輪,有多少個槽位,wheel的大小就是多大,HashedWheelTimer中默認槽位有512個
     */
    private final HashedWheelBucket[] wheel;
    /**
     * 主要輔助計算任務會存儲在哪個槽位上,mask =wheel.length-1 
     */
    private final int mask;

    /**
     *所有要執行的任務的任務隊列 
     */
    private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
    /**
     *所有要取消的任務的任務隊列 
     */
    private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
    /**
     *HashedWheelTimer實例開始運行的時間,是納秒數,開始時間是System.nanotime() 
     */
    private volatile long startTime;

這些屬性的定義和概念映射到上面時間輪算法上就是下圖的樣子了。 HashedWheelTimer的作用是什么

HashedWheelTimer初始化主要是在它的構造函數中,提供了多種重載方式,只需要看最全的構造函數即可。

/**
     * Creates a new timer.
     * @param threadFactory        執行任務的工廠
     * @param tickDuration         指針往下走一步的時間間隔
     * @param unit                 指針往下走一步的時間單位,秒,毫秒。納秒等
     * @param ticksPerWheel        時間輪的大小,也就是槽位的個數
     */
    public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
            long maxPendingTimeouts) {

        /**
         * 先校驗參數的合法性,對threadFactory,時間單位,時間間隔,時間輪大小做了限制
         */
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
        }
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }

        // 創建槽位,實際上就是初始化HashedWheelBucket數組,直接new出來的
        wheel = createWheel(ticksPerWheel);
        //用來計算槽位的輔助變量,一會兒會在Worker中尋找槽位時使用到
        mask = wheel.length - 1;
        ...
        //初始化線程,是用threadFactory創建出來的一個worker線程
        workerThread = threadFactory.newThread(worker);

      ...

    }

任務添加和運行

當需要添加一個定時任務的時候,是通過newTimeout方法添加的,添加的任務必須實現TimerTask接口的run方法。任務添加之后,無需顯式的開啟任務,添加之后任務會自動開啟,等到了執行的時間會被自動執行。客戶端使用的方式如下:

@Test
    public void testRun() throws Exception{
        final CountDownLatch latch = new CountDownLatch(1);

        HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
        hashedWheelTimer.newTimeout(timeout -> {
            System.out.println("hello world");
            latch.countDown();
        }, 5, TimeUnit.SECONDS);

        latch.await();
        System.out.println("執行結束");

    }

5秒鐘之后會被輸出"hello world",然后任務執行完畢。既然任務的添加和執行入口都是通過newTimeout這個方法搞定的,那就看一下這個方法里面有哪些秘密吧。

 @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        ...
        start();
        ...
        /**
         * 可以看到任務存活的時間計算,當前時間的毫秒數加上我們設定的時間,然后減去程序開始執行的時間。這是一個時間段
         */
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }

進去看了之后,這個方法很簡單,有兩個關鍵的方法調用 1、start(),這個方法主要是看當前HashedWheelTimer的狀態是否已經啟動,如果沒有啟動則會調用workThread線程的啟動方法。2、計算超時時間和任務添加。我們傳進來的任務會被包裝成一個HashWheelTimeout這個類,包裝之后會把這個包裝類放到timeouts這個阻塞隊列中去,實際上這時候任務并沒有放到某個具體槽位中,只是先放到阻塞隊列中,等待work從這個隊列中取值然后放到具體的槽位上,HashedWheelTimer是一個雙向鏈表,上面圖中已經有這個類的類圖結構,再貼一次: HashedWheelTimer的作用是什么

我們傳進來的任務就是它的task屬性,然后會根據當前時間、過期時間和任務開始時間計算出它的deadline,同事計算出它剩余的輪數(remainingRounds)。
任務執行實際上是調用的它的expire方法。當expire的時候會調用具體的業務任務的run方法。

HashedWheelTimer的expire方法是什么時候被執行的呢。上面也也說到在HashedWheelTimer中有一個workThread,這里面會運行work。能讀到這個地方來的人應該很少了吧,不過能到這個地方你是幸運的,因為work這個類也就是實現這個算法中最核心的一個類了,先來概覽一下這個類 HashedWheelTimer的作用是什么

這個類實現了Runable接口,也就說是一個線程類,然后它會被workTread調用執行啟動。

  • transferTimeoutsToBuckets 把新加入的定時任務從阻塞隊列中取出然后放到相應的bucket中

  • processCancelledTasks 把取消的定時任務從阻塞隊列中取出,然后從相應的bucket中remove掉

  • waitForNextTick 指針往下走的方法,經過一個時間單位,指針會往下走,指向下一個bucket

run方法會一直循環從阻塞隊列中取值,然后放到bucket中,指針循環往下走,對remainderRounds對于0的任務進行執行,不是0的減一

do {
                /**
                 * 里面是一個Thread.sleep操作,模擬指針一步一步往下走的操作。
                 */
                final long deadline = waitForNextTick();
                if (deadline > 0) {
                    /**
                     * 計算任務將要落到槽位,這本應該是個取模運算,不過這里用了一個小技巧,就是把取模運算換為了“按位與”,因為“按位與”要比取模運算快的多,
                     * 這個技巧就是當mast的值為2的n次方-1時,能達到取模的效果。這里要感謝一下王洪濤的分享
                     */
                    int idx = (int) (tick & mask);
                    processCancelledTasks();
                    //取到具體bucket,然后把任務放從阻塞隊列中拿到,放到bucket中
                    HashedWheelBucket bucket =
                            wheel[idx];

                    transferTimeoutsToBuckets();
                    //這里面會調用所有HashedWheelTimeout的方法,就是看他的剩余的輪數是不是大于0,如果是的話則會被執行,不是的話剩余輪數減1
                    bucket.expireTimeouts(deadline);
                    tick++;
                }
            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

后繼

當然HashedWheelTimer這個類屬于全內存任務計算,通常在我們真正的業務中,是不會把這些任務直接放到jvm內存中的,要不然重啟之后任務不都會消失了么,這樣我們需要重寫HashedWheelTimer,只需要對它任務的添加和獲取進行重寫到相應的持久化中間件中即可(例如數據庫或者es等等)

參考和引用

[1][redis的發布訂閱缺陷]

[[2]][Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facil] [Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facil]: http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf "Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facil"

[[3]][Hashed and Hierarchical Timing Wheels] [Hashed and Hierarchical Timing Wheels]: http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt "Hashed and Hierarchical Timing Wheels"

感謝各位的閱讀,以上就是“HashedWheelTimer的作用是什么”的內容了,經過本文的學習后,相信大家對HashedWheelTimer的作用是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

日喀则市| 金昌市| 图木舒克市| 鲁山县| 昂仁县| 翼城县| 东辽县| 南平市| 天津市| 玛曲县| 神木县| 吉林省| 阳朔县| 泊头市| 扶绥县| 清徐县| 郯城县| 贵阳市| 西乡县| 乐都县| 达拉特旗| 桦甸市| 重庆市| 寿阳县| 泰顺县| 敦化市| 南岸区| 遵义县| 雷州市| 吉林市| 保亭| 祁东县| 逊克县| 汉沽区| 永昌县| 赤水市| 阳西县| 深水埗区| 德庆县| 杭锦后旗| 汕尾市|