您好,登錄后才能下訂單哦!
這篇文章主要介紹“Netty分布式NioEventLoop任務隊列執行的方法”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“Netty分布式NioEventLoop任務隊列執行的方法”文章能幫助大家解決問題。
繼續回到NioEventLoop的run()方法:
protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: //輪詢io事件(1) select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; //默認是50 final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { //記錄下開始時間 final long ioStartTime = System.nanoTime(); try { //處理輪詢到的key(2) processSelectedKeys(); } finally { //計算耗時 final long ioTime = System.nanoTime() - ioStartTime; //執行task(3) runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } //代碼省略 } }
我們看到處理完輪詢到的key之后, 首先記錄下耗時, 然后通過runAllTasks(ioTime * (100 - ioRatio) / ioRatio)執行taskQueue中的任務
我們知道ioRatio默認是50, 所以執行完ioTime * (100 - ioRatio) / ioRatio后, 方法傳入的值為ioTime, 也就是processSelectedKeys()的執行時間:
protected boolean runAllTasks(long timeoutNanos) { //定時任務隊列中聚合任務 fetchFromScheduledTaskQueue(); //從普通taskQ里面拿一個任務 Runnable task = pollTask(); //task為空, 則直接返回 if (task == null) { //跑完所有的任務執行收尾的操作 afterRunningAllTasks(); return false; } //如果隊列不為空 //首先算一個截止時間(+50毫秒, 因為執行任務, 不要超過這個時間) final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //執行每一個任務 for (;;) { safeExecute(task); //標記當前跑完的任務 runTasks ++; //當跑完64個任務的時候, 會計算一下當前時間 if ((runTasks & 0x3F) == 0) { //定時任務初始化到當前的時間 lastExecutionTime = ScheduledFutureTask.nanoTime(); //如果超過截止時間則不執行(nanoTime()是耗時的) if (lastExecutionTime >= deadline) { break; } } //如果沒有超過這個時間, 則繼續從普通任務隊列拿任務 task = pollTask(); //直到沒有任務執行 if (task == null) { //記錄下最后執行時間 lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //收尾工作 afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
首先會執行fetchFromScheduledTaskQueue()這個方法, 這個方法的意思是從定時任務隊列中聚合任務, 也就是將定時任務中找到可以執行的任務添加到taskQueue中
private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); //從定時任務隊列中抓取第一個定時任務 //尋找截止時間為nanoTime的任務 Runnable scheduledTask = pollScheduledTask(nanoTime); //如果該定時任務隊列不為空, 則塞到普通任務隊列里面 while (scheduledTask != null) { //如果添加到普通任務隊列過程中失敗 if (!taskQueue.offer(scheduledTask)) { //則重新添加到定時任務隊列中 scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } //繼續從定時任務隊列中拉取任務 //方法執行完成之后, 所有符合運行條件的定時任務隊列, 都添加到了普通任務隊列中 scheduledTask = pollScheduledTask(nanoTime); } return true; }
long nanoTime = AbstractScheduledEventExecutor.nanoTime() 代表從定時任務初始化到現在過去了多長時間
Runnable scheduledTask= pollScheduledTask(nanoTime) 代表從定時任務隊列中拿到小于nanoTime時間的任務, 因為小于初始化到現在的時間, 說明該任務需要執行了
跟到其父類AbstractScheduledEventExecutor的pollScheduledTask(nanoTime)方法中:
protected final Runnable pollScheduledTask(long nanoTime) { assert inEventLoop(); //拿到定時任務隊列 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; //peek()方法拿到第一個任務 ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return null; } if (scheduledTask.deadlineNanos() <= nanoTime) { //從隊列中刪除 scheduledTaskQueue.remove(); //返回該任務 return scheduledTask; } return null; }
我們看到首先獲得當前類綁定的定時任務隊列的成員變量
如果不為空, 則通過scheduledTaskQueue.peek()彈出第一個任務
如果當前任務小于傳來的時間, 說明該任務需要執行, 則從定時任務隊列中刪除
我們繼續回到fetchFromScheduledTaskQueue()方法中:
private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); //從定時任務隊列中抓取第一個定時任務 //尋找截止時間為nanoTime的任務 Runnable scheduledTask = pollScheduledTask(nanoTime); //如果該定時任務隊列不為空, 則塞到普通任務隊列里面 while (scheduledTask != null) { //如果添加到普通任務隊列過程中失敗 if (!taskQueue.offer(scheduledTask)) { //則重新添加到定時任務隊列中 scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } //繼續從定時任務隊列中拉取任務 //方法執行完成之后, 所有符合運行條件的定時任務隊列, 都添加到了普通任務隊列中 scheduledTask = pollScheduledTask(nanoTime); } return true; }
彈出需要執行的定時任務之后, 我們通過taskQueue.offer(scheduledTask)添加到taskQueue中, 如果添加失敗, 則通過scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask)重新添加到定時任務隊列中
如果添加成功, 則通過pollScheduledTask(nanoTime)方法繼續添加, 直到沒有需要執行的任務
這樣就將定時任務隊列需要執行的任務添加到了taskQueue中
protected boolean runAllTasks(long timeoutNanos) { //定時任務隊列中聚合任務 fetchFromScheduledTaskQueue(); //從普通taskQ里面拿一個任務 Runnable task = pollTask(); //task為空, 則直接返回 if (task == null) { //跑完所有的任務執行收尾的操作 afterRunningAllTasks(); return false; } //如果隊列不為空 //首先算一個截止時間(+50毫秒, 因為執行任務, 不要超過這個時間) final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //執行每一個任務 for (;;) { safeExecute(task); //標記當前跑完的任務 runTasks ++; //當跑完64個任務的時候, 會計算一下當前時間 if ((runTasks & 0x3F) == 0) { //定時任務初始化到當前的時間 lastExecutionTime = ScheduledFutureTask.nanoTime(); //如果超過截止時間則不執行(nanoTime()是耗時的) if (lastExecutionTime >= deadline) { break; } } //如果沒有超過這個時間, 則繼續從普通任務隊列拿任務 task = pollTask(); //直到沒有任務執行 if (task == null) { //記錄下最后執行時間 lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //收尾工作 afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
首先通過 Runnable task = pollTask() 從taskQueue中拿一個任務
任務不為空, 則通過 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos 計算一個截止時間, 任務的執行時間不能超過這個時間
然后在for循環中通過safeExecute(task)執行task
我們跟到safeExecute(task)中:
protected static void safeExecute(Runnable task) { try { //直接調用run()方法執行 task.run(); } catch (Throwable t) { //發生異常不終止 logger.warn("A task raised an exception. Task: {}", task, t); } }
這里直接調用task的run()方法進行執行, 其中發生異常, 只打印一條日志, 代表發生異常不終止, 繼續往下執行
回到runAllTasks(long timeoutNanos)方法:
protected boolean runAllTasks(long timeoutNanos) { //定時任務隊列中聚合任務 fetchFromScheduledTaskQueue(); //從普通taskQ里面拿一個任務 Runnable task = pollTask(); //task為空, 則直接返回 if (task == null) { //跑完所有的任務執行收尾的操作 afterRunningAllTasks(); return false; } //如果隊列不為空 //首先算一個截止時間(+50毫秒, 因為執行任務, 不要超過這個時間) final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //執行每一個任務 for (;;) { safeExecute(task); //標記當前跑完的任務 runTasks ++; //當跑完64個任務的時候, 會計算一下當前時間 if ((runTasks & 0x3F) == 0) { //定時任務初始化到當前的時間 lastExecutionTime = ScheduledFutureTask.nanoTime(); //如果超過截止時間則不執行(nanoTime()是耗時的) if (lastExecutionTime >= deadline) { break; } } //如果沒有超過這個時間, 則繼續從普通任務隊列拿任務 task = pollTask(); //直到沒有任務執行 if (task == null) { //記錄下最后執行時間 lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //收尾工作 afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
每次執行完task, runTasks自增
這里 if ((runTasks & 0x3F) == 0) 代表是否執行了64個任務, 如果執行了64個任務, 則會通過 lastExecutionTime = ScheduledFutureTask.nanoTime() 記錄定時任務初始化到現在的時間, 如果這個時間超過了截止時間, 則退出循環
如果沒有超過截止時間, 則通過 task = pollTask() 繼續彈出任務執行
這里執行64個任務統計一次時間, 而不是每次執行任務都統計, 主要原因是因為獲取系統時間是個比較耗時的操作, 這里是netty的一種優化方式
如果沒有task需要執行, 則通過afterRunningAllTasks()做收尾工作, 最后記錄下最后的執行時間
關于“Netty分布式NioEventLoop任務隊列執行的方法”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識,可以關注億速云行業資訊頻道,小編每天都會為大家更新不同的知識點。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。