中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Go語言之并發示例(Runner)

發布時間:2020-07-08 14:57:43 來源:網絡 閱讀:331 作者:baby神 欄目:編程語言

這篇通過一個例子,演示使用通道來監控程序的執行時間,生命周期,甚至終止程序等。我們這個程序叫runner,我們可以稱之為執行者,它可以在后臺執行任何任務,而且我們還可以控制這個執行者,比如強制終止它等。


現在開始吧,運用我們前面十幾篇連載的知識,來構建我們的Runner,使用一個結構體類型就可以。


//一個執行者,可以執行任何任務,但是這些任務是限制完成的,//該執行者可以通過發送終止信號終止它
type Runner struct {
    tasks []func(int) //要執行的任務
    complete chan error //用于通知任務全部完成
    timeout <-chan time.Time //這些任務在多久內完成
    interrupt chan os.Signal //可以控制強制終止的信號
}


示例中,我們定義了一個結構體類型Runner,這個Runner包含了要執行哪些任務tasks,然后使用complete通知任務是否全部完成,不過這個執行者是有時間限制的,這就是timeout,如果在限定的時間內沒有完成,就會接收到超時的通知,如果完成了就會接收到完成的通知。注意這里的timeout是單向通道,只能接收。


complete定義為error類型的通道,是為了當執行任務出現問題時返回錯誤的原因,如果沒有出現錯誤,返回的是nil


此外,我們還定義了一個中斷的信號,讓我們可以隨時的終止執行者。


有了結構體,我們接著再定義一個工廠函數New,用于返回我們需要的Runner


func New(tm time.Duration) *Runner {
    return &Runner{
        complete:make(chan error),
        timeout:time.After(tm),
        interrupt:make(chan os.Signal,1),
    }
}


這個New函數非常簡潔,可以幫我們很快的初始化一個Runnner,它只有一個參數,用來設置這個執行者的超時時間。這個超時時間被我們傳遞給了time.After函數,這個函數可以在tm時間后,會同伙一個time.Time類型的只能接收的單向通道,來告訴我們已經到時間了。


complete是一個無緩沖通道,也就是同步通道,因為我們要使用它來控制我們整個程序是否終止,所以它必須是同步通道,要讓main goroutine等待,一致要任務完成或者被強制終止。


interrupt是一個有緩沖的通道,這樣做是因為,我們可以至少接收到一個操作系統的中斷信息,這樣Go runtime在發送這個信號的時候不會被阻塞,如果是無緩沖的通道就會阻塞了。


系統信號是什么意思呢,比如我們在程序執行的時候按下Ctrl + C,這就是一個中斷的信號,告訴程序可以強制終止了。


我們這里初始化了結構體的三個字段,而執行的任務tasks沒有初始化,默認就是零nil,因為它是一個切片。但是我們的執行者Runner不能沒有任務啊,既然初始化Runner的時候沒有,那我們就定義一個方法,通過方法給執行者添加需要執行的任務。


//將需要執行的任務,添加到Runner里
func (r *Runner) Add(tasks ...func(int)){
    r.tasks = append(r.tasks,tasks...)
}


這個沒有太多可以說明的,r.tasks就是一個切片,來存儲需要執行的任務。通過內置的append函數就可以追加任務了。這里使用了可變參數,可以靈活的添加一個,甚至同時多個任務,比較方便。


到了這里我們需要的執行者Runner,如何添加任務,如何獲取一個執行者,都有了,下面就開始執行者如何運行任務?如何在運行的時候強制中斷任務?在這些處理之前,我們先來定義兩個我們的兩個錯誤變量,以便在接下來的代碼實例中使用。


var ErrTimeOut = errors.New("執行者執行超時")
var ErrInterrupt = errors.New("執行者被中斷")


兩種錯誤類型,一個表示因為超時錯誤,一個表示因為被中斷錯誤。下面我們就看看如何執行一個個任務。


//執行任務,執行的過程中接收到中斷信號時,返回中斷錯誤//如果任務全部執行完,還沒有接收到中斷信號,則返回nil
func (r *Runner) run() error {
    for id, task := range r.tasks {
        if r.isInterrupt() {
               return ErrInterrupt
        }
        task(id)
     }    
    return nil}//檢查是否接收到了中斷信號
func (r *Runner) isInterrupt() bool {
    select {
    case <-r.interrupt:
        signal.Stop(r.interrupt)        
        return true
    default:       
         return false
    }
}


新增的run方法也很簡單,會使用for循環,不停的運行任務,在運行的每個任務之前,都會檢測是否收到了中斷信號,如果沒有收到,則繼續執行,一直到執行完畢,返回nil;如果收到了中斷信號,則直接返回中斷錯誤類型,任務執行終止。


這里注意isInterrupt函數,它在實現的時候,使用了基于select的多路復用,selectswitch很像,只不過它的每個case都是一個通信操作。那么到底選擇哪個case塊執行呢?原則就是哪個case的通信操作可以執行就執行哪個,如果同時有多個可以執行的case,那么就隨機選擇一個執行。


針對我們方法中,如果r.interrupt中接受不到值,就會執行default語句塊,返回false,一旦r.interrupt中可以接收值,就會通知Go Runtime停止接收中斷信號,然后返回true


這里如果沒有default的話,select是會阻塞的,直到r.interrupt可以接收值為止,因為我們例子中的邏輯要求不能阻塞,所以我們使用了default


好了,基礎工作都做好了,現在開始執行我們所有的任務,并且時刻監視著任務的完成,執行事件的超時。


//開始執行所有任務,并且監視通道事件
func (r *Runner) Start() error {
    //希望接收哪些系統信號
    signal.Notify(r.interrupt, os.Interrupt)    
    go func() {
        r.complete <- r.run()
    }()    

    select {
        case err := <-r.complete:
                return err    
        case <-r.timeout:        
                return ErrTimeOut
    }
}


signal.Notify(r.interrupt, os.Interrupt),這個是表示,如果有系統中斷的信號,發給r.interrupt即可。


任務的執行,這里開啟了一個groutine,然后調用run方法,結果發送給通道r.complete。最后就是使用一個select多路復用,哪個通道可以操作,就返回哪個。


到了這時候,只有兩種情況了,要么任務完成;要么到時間了,任務執行超時。從我們前面的代碼看,任務完成又分兩種情況,一種是沒有執行完,但是收到了中斷信號,中斷了,這時返回中斷錯誤;一種是順利執行完成,這時返回nil


現在把這些代碼匯總一下,容易統一理解一下,所有代碼如下:


package commonimport (
    "errors"
    "os"
    "os/signal"
    "time")
var ErrTimeOut = errors.New("執行者執行超時")
var ErrInterrupt = errors.New("執行者被中斷")

//一個執行者,可以執行任何任務,但是這些任務是限制完成的,//該執行者可以通過發送終止信號終止它
type Runner struct {
    tasks     []func(int)      //要執行的任務
    complete  chan error       //用于通知任務全部完成
    timeout   <-chan time.Time //這些任務在多久內完成
    interrupt chan os.Signal   //可以控制強制終止的信號
}
func New(tm time.Duration) *Runner {
    return &Runner{
        complete:  make(chan error),
        timeout:   time.After(tm),
        interrupt: make(chan os.Signal, 1),
    }
}
//將需要執行的任務,添加到Runner里
func (r *Runner) Add(tasks ...func(int)) {
    r.tasks = append(r.tasks, tasks...)
}
//執行任務,執行的過程中接收到中斷信號時,返回中斷錯誤//如果任務全部執行完,還沒有接收到中斷信號,則返回nil
func (r *Runner) run() error {
    for id, task := range r.tasks {
            if r.isInterrupt() {
                        return ErrInterrupt
        }
        task(id)
     }
     return nil
}
//檢查是否接收到了中斷信號
func (r *Runner) isInterrupt() bool {
    select {
    case <-r.interrupt:
        signal.Stop(r.interrupt)        
        return true
    default:        
        return false
    }}//開始執行所有任務,并且監視通道事件func (r *Runner) Start() error {    
    //希望接收哪些系統信號
    signal.Notify(r.interrupt, os.Interrupt)
    go func() {
        r.complete <- r.run()
    }()    
    select {
        case err := <-r.complete: 
               return err   
        case <-r.timeout:       
               return ErrTimeOut
    }
}


這個common包里的Runner我們已經開發完了,現在我們寫個例子試試它。


package main
import (
    "flysnow.org/hello/common"
    "log"
    "time"
    "os")
func main() {
    log.Println("...開始執行任務...")

    timeout := 3 * time.Second
    r := common.New(timeout)

    r.Add(createTask(), createTask(), createTask())
    if err:=r.Start();err!=nil{
            switch err { 
            case common.ErrTimeOut:
                    log.Println(err)
                    os.Exit(1)        
            case common.ErrInterrupt:
                   log.Println(err)
                   os.Exit(2)
        }
    }
    log.Println("...任務執行結束...")
}
func createTask() func(int) {
    return func(id int) {
        log.Printf("正在執行任務%d", id)
        time.Sleep(time.Duration(id)* time.Second)
    }
}


例子非常簡單,定義任務超時時間為 3 秒,添加 3 個生成的任務,每個任務都是打印一個正在執行哪個任務,然后休眠一段時間。


調用r.Start()開始執行任務,如果一切都正常的話,返回nil,然后打印出...任務執行結束...,不過我們例子中,因為超時時間和任務的設定,結果是執行超時的。


2017/04/15 22:17:55 ...開始執行任務...
2017/04/15 22:17:55 正在執行任務0
2017/04/15 22:17:55 正在執行任務1
2017/04/15 22:17:56 正在執行任務2
2017/04/15 22:17:58 執行者執行超時


如果我們把超時時間改為 4 秒或者更多,就會打印...任務執行結束...。這里我們還可以測試另外一種系統中斷情況,在終端里運行程序后,快速不停地按Ctrl + C,就可以看到執行者被中斷的打印輸出信息了。


到這里,這篇文章已經要收尾了,這個例子中,我們演示使用通道通信、同步等待,監控程序等。


此外這個執行者也是一個很不錯的模式,比如我們寫好之后,交給定時任務去執行即可,比如cron,這個模式我們還可以擴展,更高效率的并發,更多靈活的控制程序的生命周期,更高效的監控等,這個大家自己可以試試,基于自己的需求修改就可以了。


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

郎溪县| 怀来县| 永顺县| 庄浪县| 内丘县| 武乡县| 阿拉善盟| 共和县| 临澧县| 华容县| 红桥区| 岑巩县| 朝阳市| 宁乡县| 武清区| 凤台县| 屯门区| 陵川县| 长泰县| 梧州市| 张掖市| 房产| 涿州市| 邵武市| 扶余县| 兰坪| 碌曲县| 育儿| 精河县| 晴隆县| 临武县| 方正县| 图片| 凌云县| 铜陵市| 晋中市| 泸定县| 虎林市| 阳山县| 平遥县| 西林县|