您好,登錄后才能下訂單哦!
這篇文章主要介紹“怎么使用Go建開發高負載WebSocket服務器”,在日常操作中,相信很多人在怎么使用Go建開發高負載WebSocket服務器問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”怎么使用Go建開發高負載WebSocket服務器”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
實現方式
讓我們看看如何使用Go函數實現服務器的某些部分,而無需任何優化。
在進行net/http ,我們來談談我們如何發送和接收數據。 站在WebSocket協議(例如JSON對象) 之上的數據在下文中將被稱為分組 。
我們開始實現包含通過WebSocket連接發送和接收這些數據包的Channel結構。
channel 結構
// Packet represents application level data. type Packet struct { ... } // Channel wraps user connection. type Channel struct { conn net.Conn // WebSocket connection. send chan Packet // Outgoing packets queue. } func NewChannel(conn net.Conn) *Channel { c := &Channel{ conn: conn, send: make(chan Packet, N), } go c.reader() go c.writer() return c }
注意這里有reader和writer連個goroutines。 每個goroutine都需要自己的內存棧, 根據操作系統和Go版本可能具有2到8 KB的初始大小。
在300萬個在線連接的時候,我們將需要24 GB的內存 (堆棧為4 KB)用于維持所有連接。 這還沒有計算為Channel結構分配的內存,傳出的數據包ch.send和其他內部字段消耗的內存。
I/O goroutines
我們來看看“reader”的實現:
func (c *Channel) reader() { // We make a buffered read to reduce read syscalls. buf := bufio.NewReader(c.conn) for { pkt, _ := readPacket(buf) c.handle(pkt) } }
這里我們使用bufio.Reader來減少read() syscalls的數量,并讀取與buf緩沖區大小一樣的數量。 在***循環中,我們期待新數據的到來。 請記住: 預計新數據將會來臨。 我們稍后會回來。
我們將離開傳入數據包的解析和處理,因為對我們將要討論的優化不重要。 但是, buf現在值得我們注意:默認情況下,它是4 KB,這意味著我們需要另外12 GB內存。 “writer”有類似的情況:
func (c *Channel) writer() { // We make buffered write to reduce write syscalls. buf := bufio.NewWriter(c.conn) for pkt := range c.send { _ := writePacket(buf, pkt) buf.Flush() } }
我們遍歷c.send ,并將它們寫入緩沖區。細心讀者已經猜到的,我們的300萬個連接還將消耗12 GB的內存。
HTTP
我們已經有一個簡單的Channel實現,現在我們需要一個WebSocket連接才能使用。
注意:如果您不知道WebSocket如何工作。客戶端通過稱為升級的特殊HTTP機制切換到WebSocket協議。 在成功處理升級請求后,服務器和客戶端使用TCP連接來交換二進制WebSocket幀。 這是連接中的框架結構的描述。
import ( "net/http" "some/websocket" ) http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) { conn, _ := websocket.Upgrade(r, w) ch := NewChannel(conn) //... })
請注意, http.ResponseWriter為bufio.Reader和bufio.Writer (使用4 KB緩沖區)進行內存分配,用于*http.Request初始化和進一步的響應寫入。
無論使用什么WebSocket庫,在成功響應升級請求后, 服務器在responseWriter.Hijack()調用之后,連同TCP連接一起接收 I/O緩沖區。
提示:在某些情況下, go:linkname 可用于 通過調用 net/http.putBufio{Reader,Writer} 將緩沖區返回到 net/http 內 的 sync.Pool 。
因此,我們需要另外24 GB的內存來維持300萬個鏈接。
所以,我們的程序即使什么都沒做,也需要72G內存。
優化
我們來回顧介紹部分中談到的內容,并記住用戶連接的行為。 切換到WebSocket之后,客戶端發送一個包含相關事件的數據包,換句話說就是訂閱事件。 然后(不考慮諸如ping/pong等技術信息),客戶端可能在整個連接壽***不發送任何其他信息。
連接壽命可能是幾秒到幾天。
所以在最多的時候,我們的Channel.reader()和Channel.writer()正在等待接收或發送數據的處理。 每個都有4 KB的I/O緩沖區。
現在很明顯,某些事情可以做得更好,不是嗎?
Netpoll
你還記得bufio.Reader.Read()內部,Channel.reader()實現了在沒有新數據的時候conn.read()會被鎖。如果連接中有數據,Go運行時“喚醒”我們的goroutine并允許它讀取下一個數據包。 之后,goroutine再次鎖定,期待新的數據。 讓我們看看Go運行時如何理解goroutine必須被“喚醒”。 如果我們看看conn.Read()實現 ,我們將在其中看到net.netFD.Read()調用 :
// net/fd_unix.go func (fd *netFD) Read(p []byte) (n int, err error) { //... for { n, err = syscall.Read(fd.sysfd, p) if err != nil { n = 0 if err == syscall.EAGAIN { if err = fd.pd.waitRead(); err == nil { continue } } } //... break } //... }
Go在非阻塞模式下使用套接字。 EAGAIN表示,套接字中沒有數據,并且在從空套接字讀取時不會被鎖定,操作系統將控制權返還給我們。
我們從連接文件描述符中看到一個read()系統調用。 如果讀取返回EAGAIN錯誤 ,則運行時會使pollDesc.waitRead()調用 :
// net/fd_poll_runtime.go func (pd *pollDesc) waitRead() error { return pd.wait('r') } func (pd *pollDesc) wait(mode int) error { res := runtime_pollWait(pd.runtimeCtx, mode) //... }
如果我們深入挖掘 ,我們將看到netpoll是使用Linux中的epoll和BSD中的kqueue來實現的。 為什么不使用相同的方法來進行連接? 我們可以分配一個讀緩沖區,只有在真正有必要時才使用goroutine:當套接字中有真實可讀的數據時。
在github.com/golang/go上, 導出netpoll函數有問題 。
擺脫goroutines
假設我們有Go的netpoll實現 。 現在我們可以避免使用內部緩沖區啟動Channel.reader() goroutine,并在連接中訂閱可讀數據的事件:
ch := NewChannel(conn) // Make conn to be observed by netpoll instance. poller.Start(conn, netpoll.EventRead, func() { // We spawn goroutine here to prevent poller wait loop // to become locked during receiving packet from ch. go Receive(ch) }) // Receive reads a packet from conn and handles it somehow. func (ch *Channel) Receive() { buf := bufio.NewReader(ch.conn) pkt := readPacket(buf) c.handle(pkt) }
使用Channel.writer()更容易,因為只有當我們要發送數據包時,我們才能運行goroutine并分配緩沖區:
func (ch *Channel) Send(p Packet) { if c.noWriterYet() { go ch.writer() } ch.send <- p }
請注意,當操作系統在 write() 系統調用時返回 EAGAIN 時,我們不處理這種情況 。 對于這種情況,我們傾向于Go運行時那樣處理。 如果需要,它可以以相同的方式來處理。
從ch.send (一個或幾個)讀出傳出的數據包后,writer將完成其操作并釋放goroutine棧和發送緩沖區。
***! 通過擺脫兩個連續運行的goroutine中的堆棧和I/O緩沖區,我們節省了48 GB 。
資源控制
大量的連接不僅涉及高內存消耗。 在開發服務器時,我們會經歷重復的競爭條件和死鎖,常常是所謂的自動DDoS,這種情況是當應用程序客戶端肆意嘗試連接到服務器,從而破壞服務器。
例如,如果由于某些原因我們突然無法處理ping/pong消息,但是空閑連接的處理程序會關閉這樣的連接(假設連接斷開,因此沒有提供數據),客戶端會不斷嘗試連接,而不是等待事件。
如果鎖定或超載的服務器剛剛停止接受新連接,并且負載均衡器(例如,nginx)將請求都傳遞給下一個服務器實例,那壓力將是巨大的。
此外,無論服務器負載如何,如果所有客戶端突然想要以任何原因發送數據包(大概是由于錯誤原因),則先前節省的48 GB將再次使用,因為我們將實際恢復到初始狀態goroutine和并對每個連接分配緩沖區。
Goroutine池
我們可以使用goroutine池來限制同時處理的數據包數量。 這是一個go routine池的簡單實現:
package gopool func New(size int) *Pool { return &Pool{ work: make(chan func()), sem: make(chan struct{}, size), } } func (p *Pool) Schedule(task func()) error { select { case p.work <- task: case p.sem <- struct{}{}: go p.worker(task) } } func (p *Pool) worker(task func()) { defer func() { <-p.sem } for { task() task = <-p.work } }
現在我們的netpoll代碼如下:
pool := gopool.New(128) poller.Start(conn, netpoll.EventRead, func() { // We will block poller wait loop when // all pool workers are busy. pool.Schedule(func() { Receive(ch) }) })
所以現在我們讀取數據包可以在池中使用了空閑的goroutine。
同樣,我們將更改Send() :
pool := gopool.New(128) func (ch *Channel) Send(p Packet) { if c.noWriterYet() { pool.Schedule(ch.writer) } ch.send <- p }
而不是go ch.writer() ,我們想寫一個重用的goroutine。 因此,對于N goroutines池,我們可以保證在N請求同時處理并且到達N + 1我們不會分配N + 1緩沖區進行讀取。 goroutine池還允許我們限制新連接的Accept()和Upgrade() ,并避免大多數情況下被DDoS打垮。
零拷貝升級
讓我們從WebSocket協議中偏離一點。 如前所述,客戶端使用HTTP升級請求切換到WebSocket協議。 協議是樣子:
GET /ws HTTP/1.1 Host: mail.ru Connection: Upgrade Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA== Sec-Websocket-Version: 13 Upgrade: websocket HTTP/1.1 101 Switching Protocols Connection: Upgrade Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4= Upgrade: websocket
也就是說,在我們的例子中,我們需要HTTP請求和header才能切換到WebSocket協議。 這個知識點和http.Request的內部實現表明我們可以做優化。我們會在處理HTTP請求時拋棄不必要的內存分配和復制,并放棄標準的net/http服務器。
例如, http.Request 包含一個具有相同名稱的頭文件類型的字段,它通過將數據從連接復制到值字符串而無條件填充所有請求頭。 想像一下這個字段中可以保留多少額外的數據,例如大型Cookie頭。
但是要做什么呢?
WebSocket實現
不幸的是,在我們的服務器優化時存在的所有庫都允許我們對標準的net/http服務器進行升級。 此外,所有庫都不能使用所有上述讀寫優化。 為使這些優化能夠正常工作,我們必須使用一個相當低級別的API來處理WebSocket。 要重用緩沖區,我們需要procotol函數看起來像這樣:
func ReadFrame(io.Reader) (Frame, error) func WriteFrame(io.Writer, Frame) error
如果我們有一個這樣的API的庫,我們可以從連接中讀取數據包,如下所示(數據包寫入看起來差不多):
// getReadBuf, putReadBuf are intended to // reuse *bufio.Reader (with sync.Pool for example). func getReadBuf(io.Reader) *bufio.Reader func putReadBuf(*bufio.Reader) // readPacket must be called when data could be read from conn. func readPacket(conn io.Reader) error { buf := getReadBuf() defer putReadBuf(buf) buf.Reset(conn) frame, _ := ReadFrame(buf) parsePacket(frame.Payload) //... }
簡而言之,現在是制作我們自己庫的時候了。
github.com/gobwas/ws
為了避免將協議操作邏輯強加給用戶,我們編寫了WS庫。 所有讀寫方法都接受標準的io.Reader和io.Writer接口,可以使用或不使用緩沖或任何其他I/O包裝器。
除了來自標準net/http升級請求之外, ws支持零拷貝升級 ,升級請求的處理和切換到WebSocket,而無需內存分配或復制。 ws.Upgrade()接受io.ReadWriter ( net.Conn實現了這個接口)。 換句話說,我們可以使用標準的net.Listen()并將接收到的連接從ln.Accept()立即傳遞給ws.Upgrade() 。 該庫可以復制任何請求數據以供將來在應用程序中使用(例如, Cookie以驗證會話)。
以下是升級請求處理的基準 :標準net/http服務器與net.Listen()加零拷貝升級:
BenchmarkUpgradeHTTP 5156 ns/op 8576 B/op 9 allocs/op BenchmarkUpgradeTCP 973 ns/op 0 B/op 0 allocs/op
切換到ws和零拷貝升級節省了另外24 GB內存 - 這是由net/http處理程序請求處理時為I/O緩沖區分配的空間。
概要
讓我們結合代碼告訴你我們做的優化。
讀取內部緩沖區的goroutine是非常昂貴的。 解決方案 :netpoll(epoll,kqueue); 重用緩沖區。
寫入內部緩沖區的goroutine是非常昂貴的。 解決方案 :必要時啟動goroutine; 重用緩沖區。
DDOS,netpoll將無法工作。 解決方案 :重新使用數量限制的goroutines。
net/http不是處理升級到WebSocket的最快方法。 解決方案 :在連接上使用零拷貝升級。
這就是服務器代碼的樣子:
import ( "net" "github.com/gobwas/ws" ) ln, _ := net.Listen("tcp", ":8080") for { // Try to accept incoming connection inside free pool worker. // If there no free workers for 1ms, do not accept anything and try later. // This will help us to prevent many self-ddos or out of resource limit cases. err := pool.ScheduleTimeout(time.Millisecond, func() { conn := ln.Accept() _ = ws.Upgrade(conn) // Wrap WebSocket connection with our Channel struct. // This will help us to handle/send our app's packets. ch := NewChannel(conn) // Wait for incoming bytes from connection. poller.Start(conn, netpoll.EventRead, func() { // Do not cross the resource limits. pool.Schedule(func() { // Read and handle incoming packet(s). ch.Recevie() }) }) }) if err != nil { time.Sleep(time.Millisecond) } }
到此,關于“怎么使用Go建開發高負載WebSocket服務器”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。