您好,登錄后才能下訂單哦!
這篇“Golang分布式應用定時任務如何實現”文章的知識點大部分人都不太理解,所以小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Golang分布式應用定時任務如何實現”文章吧。
最小堆是一種特殊的完全二叉樹,任意非葉子節點的值不大于其子節點,如圖
通過最小堆,根據任務最近執行時間鍵堆,每次取堆頂元素即最近需要執行的任務,設置timer定時器,到期后觸發任務執行。由于堆的特性每次調整的時間復雜度為O(lgN),相較于普通隊列性能更快。
在container/heap
中已經實現操作堆的相關函數,我們只需要實現定期任務核心邏輯即可。
// 運行 func (c *Cron) Run() error { // 設置cron已啟動,atomic.Bool來保證并發安全 c.started.Store(true) // 主循環 for { // 如果停止則退出 if !c.started.Load() { break } c.runTask() } return nil } // 核心邏輯 func (c *Cron) runTask() { now := time.Now() duration := infTime // 獲取堆頂元素 task, ok := c.tasks.Peek() if ok { // 如果已刪除則彈出 if !c.set.Has(task.Name()) { c.tasks.Pop() return } // 計算于當前時間查找,設置定時器 if task.next.After(now) { duration = task.next.Sub(now) } else { duration = 0 } } timer := time.NewTimer(duration) defer timer.Stop() // 當有新元素插入直接返回,防止新元素執行時間小于當前堆頂元素 select { case <-c.new: return case <-timer.C: } // 彈出任務,執行 go task.Exec() // 計算下次執行時間,如果為0說明任務已結束,否則重新入堆 task.next = task.Next(time.Now()) if task.next.IsZero() { c.set.Delete(task.Name()) } else { c.tasks.Push(task) } }
主要邏輯可總結為:
將任務按照下次執行時間建最小堆
每次取堆頂任務,設置定時器
如果中間有新加入任務,轉入步驟2
定時器到期后執行任務
再次取下個任務,轉入步驟2,依次執行
另一種實現Cron的方式是時間輪,時間輪通過一個環形隊列,每個插槽放入需要到期執行的任務,按照固定間隔轉動時間輪,取插槽中任務列表執行,如圖所示:
時間輪可看作一個表盤,如圖中時間間隔為1秒,總共60個格子,如果任務在3秒后執行則放為插槽3,每秒轉動次取插槽上所有任務執行。
如果執行時間超過最大插槽,比如有個任務需要63秒后執行(超過了最大格子刻度),一般可以通過多層時間輪,或者設置一個額外變量圈數,只執行圈數為0的任務。
時間輪插入的時間復雜度為O(1),獲取任務列表復雜度為O(1),執行列表最差為O(n)。對比最小堆,時間輪插入刪除元素更快。
核心代碼如下:
// 定義 type TimeWheel struct { interval time.Duration // 觸發間隔 slots int // 總插槽數 currentSlot int // 當前插槽數 tasks []*list.List // 環形列表,每個元素為對應插槽的任務列表 set containerx.Set[string] // 記錄所有任務key值,用來檢查任務是否被刪除 tricker *time.Ticker // 定時觸發器 logger logr.Logger } func (tw *TimeWheel) Run() error { tw.tricker = time.NewTicker(tw.interval) for { // 通過定時器模擬時間輪轉動 now, ok := <-tw.tricker.C if !ok { break } // 轉動一次,執行任務列表 tw.RunTask(now, tw.currentSlot) tw.currentSlot = (tw.currentSlot + 1) % tw.slots } return nil } func (tw *TimeWheel) RunTask(now time.Time, slot int) { // 一次執行任務列表 for item := taskList.Front(); item != nil; { task, ok := item.Value.(*TimeWheelTask) // 任務圈數大于0,不需要執行,將圈數減一 if task.circle > 0 { task.circle-- item = item.Next() continue } // 運行任務 go task.Exec() // 計算任務下次運行時間 next := item.Next() taskList.Remove(item) item = next task.next = task.Next(now) if !task.next.IsZero() { tw.add(now, task) } else { tw.Remove(task.Name()) } } } // 添加任務,計算下一次任務執行的插槽與圈數 func (tw *TimeWheel) add(now time.Time, task *TimeWheelTask) { if !task.initialized { task.next = task.Next(now) task.initialized = true } duration := task.next.Sub(now) if duration <= 0 { task.slot = tw.currentSlot + 1 task.circle = 0 } else { mult := int(duration / tw.interval) task.slot = (tw.currentSlot + mult) % tw.slots task.circle = mult / tw.slots } tw.tasks[task.slot].PushBack(task) tw.set.Insert(task.Name()) }
時間輪的主要邏輯如下:
將任務存在對應插槽的時間
通過定時間模擬時間輪轉動
每次到期后遍歷當前插槽的任務列表,若任務圈數為0則執行
如果任務未結束,計算下次執行的插槽與圈數
轉入步驟2,依次執行
以上就是關于“Golang分布式應用定時任務如何實現”這篇文章的內容,相信大家都有了一定的了解,希望小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。