中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Netty分布式NioEventLoop任務隊列執行的方法

發布時間:2022-03-25 16:14:22 來源:億速云 閱讀:111 作者:iii 欄目:開發技術

這篇文章主要介紹“Netty分布式NioEventLoop任務隊列執行的方法”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“Netty分布式NioEventLoop任務隊列執行的方法”文章能幫助大家解決問題。

執行任務隊列

繼續回到NioEventLoop的run()方法:

protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    //輪詢io事件(1)
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            //默認是50
            final int ioRatio = this.ioRatio; 
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    runAllTasks();
                }
            } else {
                //記錄下開始時間
                final long ioStartTime = System.nanoTime();
                try {
                    //處理輪詢到的key(2)
                    processSelectedKeys();
                } finally {
                    //計算耗時
                    final long ioTime = System.nanoTime() - ioStartTime;
                    //執行task(3)
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        //代碼省略
    }
}

我們看到處理完輪詢到的key之后, 首先記錄下耗時, 然后通過runAllTasks(ioTime * (100 - ioRatio) / ioRatio)執行taskQueue中的任務

我們知道ioRatio默認是50, 所以執行完ioTime * (100 - ioRatio) / ioRatio后, 方法傳入的值為ioTime, 也就是processSelectedKeys()的執行時間:

跟進runAllTasks方法:

protected boolean runAllTasks(long timeoutNanos) {
    //定時任務隊列中聚合任務
    fetchFromScheduledTaskQueue();
    //從普通taskQ里面拿一個任務
    Runnable task = pollTask();
    //task為空, 則直接返回
    if (task == null) {
        //跑完所有的任務執行收尾的操作
        afterRunningAllTasks();
        return false;
    }
    //如果隊列不為空
    //首先算一個截止時間(+50毫秒, 因為執行任務, 不要超過這個時間)
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    //執行每一個任務
    for (;;) {
        safeExecute(task);
        //標記當前跑完的任務
        runTasks ++;
        //當跑完64個任務的時候, 會計算一下當前時間
        if ((runTasks & 0x3F) == 0) {
            //定時任務初始化到當前的時間
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            //如果超過截止時間則不執行(nanoTime()是耗時的)
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        //如果沒有超過這個時間, 則繼續從普通任務隊列拿任務
        task = pollTask();
        //直到沒有任務執行
        if (task == null) {
            //記錄下最后執行時間
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    //收尾工作
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

首先會執行fetchFromScheduledTaskQueue()這個方法, 這個方法的意思是從定時任務隊列中聚合任務, 也就是將定時任務中找到可以執行的任務添加到taskQueue中

我們跟進fetchFromScheduledTaskQueue()方法

private boolean fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    //從定時任務隊列中抓取第一個定時任務
    //尋找截止時間為nanoTime的任務
    Runnable scheduledTask  = pollScheduledTask(nanoTime);
    //如果該定時任務隊列不為空, 則塞到普通任務隊列里面
    while (scheduledTask != null) {
        //如果添加到普通任務隊列過程中失敗
        if (!taskQueue.offer(scheduledTask)) {
            //則重新添加到定時任務隊列中
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
        //繼續從定時任務隊列中拉取任務
        //方法執行完成之后, 所有符合運行條件的定時任務隊列, 都添加到了普通任務隊列中
        scheduledTask = pollScheduledTask(nanoTime);
    }
    return true;
}

 long nanoTime = AbstractScheduledEventExecutor.nanoTime() 代表從定時任務初始化到現在過去了多長時間

 Runnable scheduledTask= pollScheduledTask(nanoTime) 代表從定時任務隊列中拿到小于nanoTime時間的任務, 因為小于初始化到現在的時間, 說明該任務需要執行了

跟到其父類AbstractScheduledEventExecutor的pollScheduledTask(nanoTime)方法中:

protected final Runnable pollScheduledTask(long nanoTime) {
    assert inEventLoop();
    //拿到定時任務隊列
    Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
    //peek()方法拿到第一個任務
    ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
    if (scheduledTask == null) {
        return null;
    }
    if (scheduledTask.deadlineNanos() <= nanoTime) {
        //從隊列中刪除
        scheduledTaskQueue.remove();
        //返回該任務
        return scheduledTask;
    }
    return null;
}

我們看到首先獲得當前類綁定的定時任務隊列的成員變量

如果不為空, 則通過scheduledTaskQueue.peek()彈出第一個任務

如果當前任務小于傳來的時間, 說明該任務需要執行, 則從定時任務隊列中刪除

我們繼續回到fetchFromScheduledTaskQueue()方法中:

private boolean fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    //從定時任務隊列中抓取第一個定時任務
    //尋找截止時間為nanoTime的任務
    Runnable scheduledTask  = pollScheduledTask(nanoTime);
    //如果該定時任務隊列不為空, 則塞到普通任務隊列里面
    while (scheduledTask != null) {
        //如果添加到普通任務隊列過程中失敗
        if (!taskQueue.offer(scheduledTask)) {
            //則重新添加到定時任務隊列中
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
        //繼續從定時任務隊列中拉取任務
        //方法執行完成之后, 所有符合運行條件的定時任務隊列, 都添加到了普通任務隊列中
        scheduledTask = pollScheduledTask(nanoTime);
    }
    return true;
}

彈出需要執行的定時任務之后, 我們通過taskQueue.offer(scheduledTask)添加到taskQueue中, 如果添加失敗, 則通過scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask)重新添加到定時任務隊列中

如果添加成功, 則通過pollScheduledTask(nanoTime)方法繼續添加, 直到沒有需要執行的任務

這樣就將定時任務隊列需要執行的任務添加到了taskQueue中

回到runAllTasks(long timeoutNanos)方法中

protected boolean runAllTasks(long timeoutNanos) {
    //定時任務隊列中聚合任務
    fetchFromScheduledTaskQueue();
    //從普通taskQ里面拿一個任務
    Runnable task = pollTask();
    //task為空, 則直接返回
    if (task == null) {
        //跑完所有的任務執行收尾的操作
        afterRunningAllTasks();
        return false;
    }
    //如果隊列不為空
    //首先算一個截止時間(+50毫秒, 因為執行任務, 不要超過這個時間)
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    //執行每一個任務
    for (;;) {
        safeExecute(task);
        //標記當前跑完的任務
        runTasks ++;
        //當跑完64個任務的時候, 會計算一下當前時間
        if ((runTasks & 0x3F) == 0) {
            //定時任務初始化到當前的時間
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            //如果超過截止時間則不執行(nanoTime()是耗時的)
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        //如果沒有超過這個時間, 則繼續從普通任務隊列拿任務
        task = pollTask();
        //直到沒有任務執行
        if (task == null) {
            //記錄下最后執行時間
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    //收尾工作
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

首先通過 Runnable task = pollTask() 從taskQueue中拿一個任務

任務不為空, 則通過 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos 計算一個截止時間, 任務的執行時間不能超過這個時間

然后在for循環中通過safeExecute(task)執行task

我們跟到safeExecute(task)中:

protected static void safeExecute(Runnable task) {
    try {
        //直接調用run()方法執行
        task.run();
    } catch (Throwable t) {
        //發生異常不終止
        logger.warn("A task raised an exception. Task: {}", task, t);
    }
}

這里直接調用task的run()方法進行執行, 其中發生異常, 只打印一條日志, 代表發生異常不終止, 繼續往下執行

回到runAllTasks(long timeoutNanos)方法:

protected boolean runAllTasks(long timeoutNanos) {
    //定時任務隊列中聚合任務
    fetchFromScheduledTaskQueue();
    //從普通taskQ里面拿一個任務
    Runnable task = pollTask();
    //task為空, 則直接返回
    if (task == null) {
        //跑完所有的任務執行收尾的操作
        afterRunningAllTasks();
        return false;
    }
    //如果隊列不為空
    //首先算一個截止時間(+50毫秒, 因為執行任務, 不要超過這個時間)
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    //執行每一個任務
    for (;;) {
        safeExecute(task);
        //標記當前跑完的任務
        runTasks ++;
        //當跑完64個任務的時候, 會計算一下當前時間
        if ((runTasks & 0x3F) == 0) {
            //定時任務初始化到當前的時間
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            //如果超過截止時間則不執行(nanoTime()是耗時的)
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        //如果沒有超過這個時間, 則繼續從普通任務隊列拿任務
        task = pollTask();
        //直到沒有任務執行
        if (task == null) {
            //記錄下最后執行時間
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    //收尾工作
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

每次執行完task, runTasks自增

這里 if ((runTasks & 0x3F) == 0) 代表是否執行了64個任務, 如果執行了64個任務, 則會通過 lastExecutionTime = ScheduledFutureTask.nanoTime() 記錄定時任務初始化到現在的時間, 如果這個時間超過了截止時間, 則退出循環

如果沒有超過截止時間, 則通過 task = pollTask() 繼續彈出任務執行

這里執行64個任務統計一次時間, 而不是每次執行任務都統計, 主要原因是因為獲取系統時間是個比較耗時的操作, 這里是netty的一種優化方式

如果沒有task需要執行, 則通過afterRunningAllTasks()做收尾工作, 最后記錄下最后的執行時間

關于“Netty分布式NioEventLoop任務隊列執行的方法”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識,可以關注億速云行業資訊頻道,小編每天都會為大家更新不同的知識點。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

乌鲁木齐市| 曲周县| 友谊县| 昌黎县| 登封市| 托克逊县| 文昌市| 富顺县| 溆浦县| 长兴县| 太谷县| 曲水县| 永州市| 武城县| 于都县| 广宗县| 宁南县| 抚顺市| 内江市| 涿州市| 五河县| 田东县| 循化| 罗江县| 铁岭市| 丹棱县| SHOW| 平陆县| 达拉特旗| 松原市| 秀山| 普洱| 松潘县| 南雄市| 东莞市| 红安县| 内黄县| 临江市| 达孜县| 渑池县| 房山区|