中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Golang通道channel使用源碼分析

發布時間:2022-12-05 09:15:02 來源:億速云 閱讀:89 作者:iii 欄目:開發技術

這篇文章主要介紹了Golang通道channel使用源碼分析的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇Golang通道channel使用源碼分析文章都會有所收獲,下面我們一起來看看吧。

前言

channel是golang中標志性的概念之一,很好很強大!

channel(通道),顧名思義,是一種通道,一種用于并發環境中數據傳遞的通道。通常結合golang中另一重要概念goroutine(go協程)使用,使得在golang中的并發編程變得清晰簡潔同時又高效強大。

channel基礎結構

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx:    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters
    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

hchan結構就是channel的底層數據結構,看源碼定義,可以說是非常清晰了。

  • qcount:channel緩存隊列中已有的元素數量

  • dataqsiz:channel的緩存隊列大小(定義channel時指定的緩存大小,這里channel用的是一個環形隊列)

  • buf:指向channel緩存隊列的指針

  • elemsize:通過channel傳遞的元素大小

  • closed:channel是否關閉的標志

  • elemtype:通過channel傳遞的元素類型

  • sendx:channel中發送元素在隊列中的索引

  • recvx:channel中接受元素在隊列中的索引

  • recvq:等待從channel中接收元素的協程列表

  • sendq:等待向channel中發送元素的協程列表

  • lock:channel上的鎖

其中關于recvq和sendq的兩個列表所用的結構waitq簡單看下。

type waitq struct {
    first *sudog
    last  *sudog
}
type sudog struct {
    g          *g
    selectdone *uint32 // CAS to 1 to win select race (may point to stack)
    next       *sudog
    prev       *sudog
    elem       unsafe.Pointer // data element (may point to stack)
...
    c           *hchan // channel
}

可以看出waiq是一個雙向鏈表結構,鏈上的節點是sudog。從sudog的結構定義可以粗略看出,sudog是對g(即協程)的一個封裝。用于記錄一個等待在某個channel上的協程g、等待的元素elem等信息。

channel初始化

func makechan(t *chantype, size int64) *hchan {
    elem := t.elem
    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }
    if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size) {
        panic(plainError("makechan: size out of range"))
    }
    var c *hchan
    if elem.kind&kindNoPointers != 0 || size == 0 {
        // Allocate memory in one call.
        // Hchan does not contain pointers interesting for GC in this case:
        // buf points into the same allocation, elemtype is persistent.
        // SudoG's are referenced from their owning thread so they can't be collected.
        // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        if size > 0 && elem.size != 0 {
            c.buf = add(unsafe.Pointer(c), hchanSize)
        } else {
            // race detector uses this location for synchronization
            // Also prevents us from pointing beyond the allocation (see issue 9401).
            c.buf = unsafe.Pointer(c)
        }
    } else {
        c = new(hchan)
        c.buf = newarray(elem, int(size))
    }
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
    }
    return c
}

第一部分的3個if是對初始化參數的合法性檢查。

if elem.size >= 1<<16:

檢查channel元素大小,小于2字節

if hchanSize%maxAlign != 0 || elem.align > maxAlign

沒看懂(對齊?)

if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size)

第一個判斷緩存大小需要大于等于0

int64(uintptr(size)) != size這一句實際是用于判斷size是否為負數。由于uintptr實際是一個無符號整形,負數經過轉換后會變成一個與原數完全不同的很大的正整數,而正數經過轉換后并沒有變化。

最后一句判斷channel的緩存大小要小于heap中能分配的大小。_MaxMem是可分配的堆大小。

第二部分是具體的內存分配。

元素類型為kindNoPointers的時候,既非指針類型,則直接分配(hchanSize+uintptr(size)*elem.size)大小的連續空間。c.buf指向hchan后面的elem隊列首地址。

如果channel緩存大小為0,則c.buf實際上是沒有給他分配空間的

如果類型為非kindNoPointers,則channel的空間和buf的空間是分別分配的。

channel發送

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc(unsafe.Pointer(&c)))
}

channel發送,即協程向channel中發送數據,與此操作對應的go代碼如c <- x。

channel發送的實現源碼中,通過chansend1(),調用chansend(),其中block參數為true。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }
... 
}

chansend()首先對c進行判斷, if c == nil:即channel沒有被初始化,這個時候會直接調用gopark使得當前協程進入等待狀態。而且用于喚醒的參數unlockf傳的nil,即沒有人來喚醒它,這樣系統進入死鎖。所以channel必須被初始化之后才能使用,否則死鎖。

接下來是正式的發送處理,且后續操作會加鎖。

lock(&c.lock)

close判斷

if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

如果channel已經是closed狀態,解鎖然后直接panic。也就是說我們不可以向已經關閉的通道內在發送數據。

將數據發給接收協程

if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

嘗試從接收等待協程隊列中取出一個協程,如果有則直接數據發給它。也就是說發送到channel的數據會優先檢查接收等待隊列,如果有協程等待取數,就直接給它。發完解鎖,操作完成。

這里send()方法會將數據寫到從隊列里取出來的sg中,通過goready()喚醒sg.g(即等待的協程),進行后續處理。

數據放到緩存

if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

如果沒有接收協程在等待,則去檢查channel的緩存隊列是否還有空位。如果有空位,則將數據放到緩存隊列中。

通過c.sendx游標找到隊列中的空余位置,然后將數據存進去。移動游標,更新數據,然后解鎖,操作完成。

if c.sendx == c.dataqsiz {
        c.sendx = 0
    }

通過這一段游標的處理可以看出,緩存隊列是一個環形。

阻塞發送協程

gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

如果緩存也慢了,這時候就只能阻塞住發送協程了, 等有合適的機會了,再將數據發送出去。

getg()獲取當前協程對象g的指針,acquireSudog()生成一個sudog,然后將當前協程及相關數據封裝好鏈接到sendq列表中。然年通過goparkunlock()將其轉為等待狀態,并解鎖。操作完成。

channel接收

// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

channel接收,即協程從channel中接收數據,與此操作對應的go代碼如<- c。

channel接收的實現源碼中,通過chanrecv1(),調用chanrecv(),其中block參數為true。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }
...
}

同發送一樣,接收也會首先檢查c是否為nil,如果為nil,會調用gopark()休眠當前協程,從而最終造成死鎖。

接收操作同樣先進行加鎖,然后開始正式操作。

close處理

if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(unsafe.Pointer(c))
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

接收和發送略有不同,當channel關閉并且channel的緩存隊列里沒有數據了,那么接收動作會直接結束,但不會報錯。

也就是說,允許從已關閉的channel中接收數據。

從發送等待協程中接收

if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

嘗試從發送等待協程列表中取出一個等待協程,如果存在,則調用recv()方法接收數據。

這里的recv()方法比send()方法稍微復雜一點,我們簡單分析下。

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if c.dataqsiz == 0 {
        ...
        if ep != nil {
            // copy data from sender
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        qp := chanbuf(c, c.recvx)
        ...
        // copy data from queue to receiver
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // copy data from sender to queue
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)
}

recv()的接收動作分為兩種情況:

  • c.dataqsiz == 0:即當channel為無緩存channel時,直接將發送協程中的數據,拷貝給接收者。

  • c.dataqsiz != 0:如果channel有緩存,則:根據緩存的接收游標,從緩存隊列中取出一個,拷貝給接受者。

關于“Golang通道channel使用源碼分析”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“Golang通道channel使用源碼分析”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

林口县| 南京市| 安泽县| 南漳县| 上林县| 永川市| 邹城市| 蓬溪县| 南京市| 武定县| 辉县市| 娄烦县| 宁乡县| 深水埗区| 巧家县| 武城县| 墨脱县| 桦川县| 南召县| 麻江县| 庆阳市| 浮山县| 昆山市| 兴城市| 双城市| 北川| 六盘水市| 巴楚县| 开江县| 彭泽县| 湘乡市| 闵行区| 酉阳| 新田县| 连江县| 镶黄旗| 砚山县| 嵊泗县| 喜德县| 平遥县| 沾益县|