中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

基于環狀隊列和迭代器如何實現分布式任務RR分配策略

發布時間:2021-11-23 23:11:32 來源:億速云 閱讀:164 作者:柒染 欄目:云計算

這期內容當中小編將會給大家帶來有關基于環狀隊列和迭代器如何實現分布式任務RR分配策略,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

# 背景

## 分布式任務分配

在很多運維場景下,我們都會執行一些長時間的任務,比如裝機、部署環境、打包鏡像等長時間任務, 而通常我們的任務節點數量通常是有限的(排除基于k8s的hpa、或者knative等自動伸縮場景)。

那么當我們有一個任務如何根據當前的worker和corrdinator和任務來進行合理的分配,分配其實也比較復雜,往復雜里面做,可以根據當前系統的負載、每個任務的執行資源消耗、當前集群的任務數量等, 這里我們就搞一個最簡單的,基于任務和當前worker的RR算法

## 系統架構

在worker和任務隊列之間,添加一層協調調度層Coordinator, 由它來根據當前集群任務的狀態來進行任務的分配,同時感知當前集群worker和task的狀態,協調整個集群任務的執行、終止等操作

# 單機實現

## 整體設計

members: 表示當前集群中所有的worker

tasks: 就是當前的任務

Coordinator: 就是我們的協調者, 負責根據members和tasks進行任務的分配

result: 就是分配的結果

## CircularIterator

CircularIterator就是我們的環狀對立迭代器, 擁有兩個方法, 一個是add添加member, 一個Next返回基于rr的下一個member

```go

// CircularIterator 環狀迭代器

type CircularIterator struct {

list []interface{}    // 保存所有的成員變量

next int

}

// Next 返回下一個元素

func (c *CircularIterator) Next() interface{} {

item := c.list[c.next]

c.next = (c.next + 1) % len(c.list)

return item

}

// Add 添加任務

func (c *CircularIterator) Add(v interface{}) bool {

for _, item := range c.list {

if v == item {

return false

}

}

c.list = append(c.list, v)

return true

}

```

## Member&Task

Member就是負責執行任務的worker, 有一個AddTask方法和Execute方法負責任務的執行和添加任務

Task標識一個任務

```go

// Member 任務組成員

type Member struct {

id    int

tasks []*Task

}

// ID 返回當前memberID

func (m *Member) ID() int {

return m.id

}

// AddTask 為member添加任務

func (m *Member) AddTask(t *Task) bool {

for _, task := range m.tasks {

if task == t {

return false

}

}

m.tasks = append(m.tasks, t)

return true

}

// Execute 執行任務

func (m *Member) Execute() {

for _, task := range m.tasks {

fmt.Printf("Member %d run task %s\n", m.ID(), task.Execute())

}

}

// Task 任務

type Task struct {

name string

}

// Execute 執行task返回結果

func (t *Task) Execute() string {

return "Task " + t.name + " run success"

}

```

## Coordinator

Coordinator是協調器,負責根據 Member和task進行集群任務的協調調度

```go

// Task 任務

type Task struct {

name string

}

// Execute 執行task返回結果

func (t *Task) Execute() string {

return "Task " + t.name + " run success"

}

// Coordinator 協調者

type Coordinator struct {

members []*Member

tasks   []*Task

}

// TaskAssignments 為member分配任務

func (c *Coordinator) TaskAssignments() map[int]*Member {

taskAssignments := make(map[int]*Member)

// 構建迭代器

memberIt := c.getMemberIterator()

for _, task := range c.tasks {

member := memberIt.Next().(*Member)

_, err := taskAssignments[member.ID()]

if err == false {

taskAssignments[member.ID()] = member

}

member.AddTask(task)

}

return taskAssignments

}

func (c *Coordinator) getMemberIterator() *CircularIterator {

// 通過當前成員, 構造成員隊列

members := make([]interface{}, len(c.members))

for index, member := range c.members {

members[index] = member

}

return NewCircularIterftor(members)

}

// AddMember 添加member組成員

func (c *Coordinator) AddMember(m *Member) bool {

for _, member := range c.members {

if member == m {

return false

}

}

c.members = append(c.members, m)

return true

}

// AddTask 添加任務

func (c *Coordinator) AddTask(t *Task) bool {

for _, task := range c.tasks {

if task == t {

return false

}

}

c.tasks = append(c.tasks, t)

return true

}

```

## 測試

我們首先創建一堆member和task, 然后調用coordinator進行任務分配,執行任務結果

```go

coordinator := NewCoordinator()

for i := 0; i < 10; i++ {

m := &Member{id: i}

coordinator.AddMember(m)

}

for i := 0; i < 30; i++ {

t := &Task{name: fmt.Sprintf("task %d", i)}

coordinator.AddTask(t)

}

result := coordinator.TaskAssignments()

for _, member := range result {

member.Execute()

}

```

## 結果

可以看到每個worker均勻的得到任務分配

```bash

Member 6 run task Task task 6 run success

Member 6 run task Task task 16 run success

Member 6 run task Task task 26 run success

Member 8 run task Task task 8 run success

Member 8 run task Task task 18 run success

Member 8 run task Task task 28 run success

Member 0 run task Task task 0 run success

Member 0 run task Task task 10 run success

Member 0 run task Task task 20 run success

Member 3 run task Task task 3 run success

Member 3 run task Task task 13 run success

Member 3 run task Task task 23 run success

Member 4 run task Task task 4 run success

Member 4 run task Task task 14 run success

Member 4 run task Task task 24 run success

Member 7 run task Task task 7 run success

Member 7 run task Task task 17 run success

Member 7 run task Task task 27 run success

Member 9 run task Task task 9 run success

Member 9 run task Task task 19 run success

Member 9 run task Task task 29 run success

Member 1 run task Task task 1 run success

Member 1 run task Task task 11 run success

Member 1 run task Task task 21 run success

Member 2 run task Task task 2 run success

Member 2 run task Task task 12 run success

Member 2 run task Task task 22 run success

Member 5 run task Task task 5 run success

Member 5 run task Task task 15 run success

Member 5 run task Task task 25 run success

```

## 完整代碼

```go

package main

import "fmt"

// CircularIterator 環狀迭代器

type CircularIterator struct {

list []interface{}

next int

}

// Next 返回下一個元素

func (c *CircularIterator) Next() interface{} {

item := c.list[c.next]

c.next = (c.next + 1) % len(c.list)

return item

}

// Add 添加任務

func (c *CircularIterator) Add(v interface{}) bool {

for _, item := range c.list {

if v == item {

return false

}

}

c.list = append(c.list, v)

return true

}

// Member 任務組成員

type Member struct {

id    int

tasks []*Task

}

// ID 返回當前memberID

func (m *Member) ID() int {

return m.id

}

// AddTask 為member添加任務

func (m *Member) AddTask(t *Task) bool {

for _, task := range m.tasks {

if task == t {

return false

}

}

m.tasks = append(m.tasks, t)

return true

}

// Execute 執行任務

func (m *Member) Execute() {

for _, task := range m.tasks {

fmt.Printf("Member %d run task %s\n", m.ID(), task.Execute())

}

}

// Task 任務

type Task struct {

name string

}

// Execute 執行task返回結果

func (t *Task) Execute() string {

return "Task " + t.name + " run success"

}

// Coordinator 協調者

type Coordinator struct {

members []*Member

tasks   []*Task

}

// TaskAssignments 為member分配任務

func (c *Coordinator) TaskAssignments() map[int]*Member {

taskAssignments := make(map[int]*Member)

// 構建迭代器

memberIt := c.getMemberIterator()

for _, task := range c.tasks {

member := memberIt.Next().(*Member)

_, err := taskAssignments[member.ID()]

if err == false {

taskAssignments[member.ID()] = member

}

member.AddTask(task)

}

return taskAssignments

}

func (c *Coordinator) getMemberIterator() *CircularIterator {

// 通過當前成員, 構造成員隊列

members := make([]interface{}, len(c.members))

for index, member := range c.members {

members[index] = member

}

return NewCircularIterftor(members)

}

// AddMember 添加member組成員

func (c *Coordinator) AddMember(m *Member) bool {

for _, member := range c.members {

if member == m {

return false

}

}

c.members = append(c.members, m)

return true

}

// AddTask 添加任務

func (c *Coordinator) AddTask(t *Task) bool {

for _, task := range c.tasks {

if task == t {

return false

}

}

c.tasks = append(c.tasks, t)

return true

}

// NewCircularIterftor 返回迭代器

func NewCircularIterftor(list []interface{}) *CircularIterator {

iterator := CircularIterator{}

for _, item := range list {

iterator.Add(item)

}

return &iterator

}

// NewCoordinator 返回協調器

func NewCoordinator() *Coordinator {

c := Coordinator{}

return &c

}

func main() {

coordinator := NewCoordinator()

for i := 0; i < 10; i++ {

m := &Member{id: i}

coordinator.AddMember(m)

}

for i := 0; i < 30; i++ {

t := &Task{name: fmt.Sprintf("task %d", i)}

coordinator.AddTask(t)

}

result := coordinator.TaskAssignments()

for _, member := range result {

member.Execute()

}

}

```

任務協調是一個非常復雜的事情, 內部的任務平臺,雖然實現了基于任務的組合和app化,但是任務調度分配著一塊,仍然沒有去做,只是簡單的根據樹形任務去簡單的做一些分支任務的執行,未來有時間再做吧,要繼續研究下一個模塊了。

上述就是小編為大家分享的基于環狀隊列和迭代器如何實現分布式任務RR分配策略了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

罗源县| 阳高县| 凌源市| 五指山市| 扎赉特旗| 阿拉善右旗| 石河子市| 屏东县| 五华县| 湛江市| 莲花县| 铁岭市| 黔东| 钟山县| 景东| 禹州市| 英吉沙县| 界首市| 喀什市| 晋江市| 南郑县| 西城区| 巢湖市| 宜都市| 澄迈县| 阿拉善右旗| 响水县| 浦北县| 兴义市| 崇左市| 株洲市| 金溪县| 义乌市| 闽侯县| 离岛区| 陇南市| 阳东县| 泰和县| 原平市| 新和县| 东台市|