您好,登錄后才能下訂單哦!
這篇“go zero微服務性能優化極致秒殺實例分析”文章的知識點大部分人都不太理解,所以小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“go zero微服務性能優化極致秒殺實例分析”文章吧。
在SeckillOrder這個方法中,每來一次秒殺搶購請求都往往Kafka中發送一條消息。假如這個時候有一千萬的用戶同時來搶購,就算我們做了各種限流策略,一瞬間還是可能會有上百萬的消息會發到Kafka,會產生大量的網絡IO和磁盤IO成本,大家都知道Kafka是基于日志的消息系統,寫消息雖然大多情況下都是順序IO,但當海量的消息同時寫入的時候還是可能會扛不住。
那怎么解決這個問題呢?答案是做消息的聚合。之前發送一條消息就會產生一次網絡IO和一次磁盤IO,我們做消息聚合后,比如聚合100條消息后再發送給Kafka,這個時候100條消息才會產生一次網絡IO和磁盤IO,對整個Kafka的吞吐和性能是一個非常大的提升。其實這就是一種小包聚合的思想,或者叫Batch或者批量的思想。這種思想也隨處可見,比如我們使用Mysql插入批量數據的時候,可以通過一條SQL語句執行而不是循環的一條一條插入,還有Redis的Pipeline操作等等。
那怎么來聚合呢,聚合策略是啥呢?聚合策略有兩個維度分別是聚合消息條數和聚合時間,比如聚合消息達到100條我們就往Kafka發送一次,這個條數是可以配置的,那如果一直也達不到100條消息怎么辦呢?通過聚合時間來兜底,這個聚合時間也是可以配置的,比如配置聚合時間為1秒鐘,也就是無論目前聚合了多少條消息只要聚合時間達到1秒,那么就往Kafka發送一次數據。聚合條數和聚合時間是或的關系,也就是只要有一個條件滿足就觸發。
在這里我們提供一個批量聚合數據的工具Batcher,定義如下
type Batcher struct { opts options Do func(ctx context.Context, val map[string][]interface{}) Sharding func(key string) int chans []chan *msg wait sync.WaitGroup }
Do方法:滿足聚合條件后就會執行Do方法,其中val參數為聚合后的數據
Sharding方法:通過Key進行sharding,相同的key消息寫入到同一個channel中,被同一個goroutine處理
在merge方法中有兩個觸發執行Do方法的條件,一是當聚合的數據條數大于等于設置的條數,二是當觸發設置的定時器
代碼實現比較簡單,如下為具體實現:
type msg struct { key string val interface{} } type Batcher struct { opts options Do func(ctx context.Context, val map[string][]interface{}) Sharding func(key string) int chans []chan *msg wait sync.WaitGroup } func New(opts ...Option) *Batcher { b := &Batcher{} for _, opt := range opts { opt.apply(&b.opts) } b.opts.check() b.chans = make([]chan *msg, b.opts.worker) for i := 0; i < b.opts.worker; i++ { b.chans[i] = make(chan *msg, b.opts.buffer) } return b } func (b *Batcher) Start() { if b.Do == nil { log.Fatal("Batcher: Do func is nil") } if b.Sharding == nil { log.Fatal("Batcher: Sharding func is nil") } b.wait.Add(len(b.chans)) for i, ch := range b.chans { go b.merge(i, ch) } } func (b *Batcher) Add(key string, val interface{}) error { ch, msg := b.add(key, val) select { case ch <- msg: default: return ErrFull } return nil } func (b *Batcher) add(key string, val interface{}) (chan *msg, *msg) { sharding := b.Sharding(key) % b.opts.worker ch := b.chans[sharding] msg := &msg{key: key, val: val} return ch, msg } func (b *Batcher) merge(idx int, ch <-chan *msg) { defer b.wait.Done() var ( msg *msg count int closed bool lastTicker = true interval = b.opts.interval vals = make(map[string][]interface{}, b.opts.size) ) if idx > 0 { interval = time.Duration(int64(idx) * (int64(b.opts.interval) / int64(b.opts.worker))) } ticker := time.NewTicker(interval) for { select { case msg = <-ch: if msg == nil { closed = true break } count++ vals[msg.key] = append(vals[msg.key], msg.val) if count >= b.opts.size { break } continue case <-ticker.C: if lastTicker { ticker.Stop() ticker = time.NewTicker(b.opts.interval) lastTicker = false } } if len(vals) > 0 { ctx := context.Background() b.Do(ctx, vals) vals = make(map[string][]interface{}, b.opts.size) count = 0 } if closed { ticker.Stop() return } } } func (b *Batcher) Close() { for _, ch := range b.chans { ch <- nil } b.wait.Wait() }
使用的時候需要先創建一個Batcher,然后定義Batcher的Sharding方法和Do方法,在Sharding方法中通過ProductID把不同商品的聚合投遞到不同的goroutine中處理,在Do方法中我們把聚合的數據一次性批量的發送到Kafka,定義如下:
b := batcher.New( batcher.WithSize(batcherSize), batcher.WithBuffer(batcherBuffer), batcher.WithWorker(batcherWorker), batcher.WithInterval(batcherInterval), ) b.Sharding = func(key string) int { pid, _ := strconv.ParseInt(key, 10, 64) return int(pid) % batcherWorker } b.Do = func(ctx context.Context, val map[string][]interface{}) { var msgs []*KafkaData for _, vs := range val { for _, v := range vs { msgs = append(msgs, v.(*KafkaData)) } } kd, err := json.Marshal(msgs) if err != nil { logx.Errorf("Batcher.Do json.Marshal msgs: %v error: %v", msgs, err) } if err = s.svcCtx.KafkaPusher.Push(string(kd)); err != nil { logx.Errorf("KafkaPusher.Push kd: %s error: %v", string(kd), err) } } s.batcher = b s.batcher.Start()
在SeckillOrder方法中不再是每來一次請求就往Kafka中投遞一次消息,而是先通過batcher提供的Add方法添加到Batcher中等待滿足聚合條件后再往Kafka中投遞。
err = l.batcher.Add(strconv.FormatInt(in.ProductId, 10), &KafkaData{Uid: in.UserId, Pid: in.ProductId}) if err!= nil { logx.Errorf("l.batcher.Add uid: %d pid: %d error: %v", in.UserId, in.ProductId, err) }
通過批量消息處理的思想,我們提供了Batcher工具,提升了性能,但這主要是針對生產端而言的。當我們消費到批量的數據后,還是需要串行的一條條的處理數據,那有沒有辦法能加速消費從而降低消費消息的延遲呢?有兩種方案分別是:
增加消費者的數量
在一個消費者中增加消息處理的并行度
因為在Kafka中,一個Topci可以配置多個Partition,數據會被平均或者按照生產者指定的方式寫入到多個分區中,那么在消費的時候,Kafka約定一個分區只能被一個消費者消費,為什么要這么設計呢?我理解的是如果有多個Consumer同時消費一個分區的數據,那么在操作這個消費進度的時候就需要加鎖,對性能影響比較大。所以說當消費者數量小于分區數量的時候,我們可以增加消費者的數量來增加消息處理能力,但當消費者數量大于分區的時候再繼續增加消費者數量就沒有意義了。
不能增加Consumer的時候,可以在同一個Consumer中提升處理消息的并行度,即通過多個goroutine來并行的消費數據,我們一起來看看如何通過多個goroutine來消費消息。
在Service中定義msgsChan,msgsChan為Slice,Slice的長度表示有多少個goroutine并行的處理數據,初始化如下:
func NewService(c config.Config) *Service { s := &Service{ c: c, ProductRPC: product.NewProduct(zrpc.MustNewClient(c.ProductRPC)), OrderRPC: order.NewOrder(zrpc.MustNewClient(c.OrderRPC)), msgsChan: make([]chan *KafkaData, chanCount), } for i := 0; i < chanCount; i++ { ch := make(chan *KafkaData, bufferCount) s.msgsChan[i] = ch s.waiter.Add(1) go s.consume(ch) } return s }
從Kafka中消費到數據后,把數據投遞到Channel中,注意投遞消息的時候按照商品的id做Sharding,這能保證在同一個Consumer中對同一個商品的處理是串行的,串行的數據處理不會導致并發帶來的數據競爭問題
func (s *Service) Consume(_ string, value string) error { logx.Infof("Consume value: %s\n", value) var data []*KafkaData if err := json.Unmarshal([]byte(value), &data); err != nil { return err } for _, d := range data { s.msgsChan[d.Pid%chanCount] <- d } return nil }
我們定義了chanCount個goroutine同時處理數據,每個channel的長度定義為bufferCount,并行處理數據的方法為consume,如下:
func (s *Service) consume(ch chan *KafkaData) { defer s.waiter.Done() for { m, ok := <-ch if !ok { log.Fatal("seckill rmq exit") } fmt.Printf("consume msg: %+v\n", m) p, err := s.ProductRPC.Product(context.Background(), &product.ProductItemRequest{ProductId: m.Pid}) if err != nil { logx.Errorf("s.ProductRPC.Product pid: %d error: %v", m.Pid, err) return } if p.Stock <= 0 { logx.Errorf("stock is zero pid: %d", m.Pid) return } _, err = s.OrderRPC.CreateOrder(context.Background(), &order.CreateOrderRequest{Uid: m.Uid, Pid: m.Pid}) if err != nil { logx.Errorf("CreateOrder uid: %d pid: %d error: %v", m.Uid, m.Pid, err) return } _, err = s.ProductRPC.UpdateProductStock(context.Background(), &product.UpdateProductStockRequest{ProductId: m.Pid, Num: 1}) if err != nil { logx.Errorf("UpdateProductStock uid: %d pid: %d error: %v", m.Uid, m.Pid, err) } } }
當秒殺活動開始后,大量用戶點擊商品詳情頁上的秒殺按鈕,會產生大量的并發請求查詢庫存,一旦某個請求查詢到有庫存,緊接著系統就會進行庫存的扣減。然后,系統生成實際的訂單,并進行后續的處理。如果請求查不到庫存,就會返回,用戶通常會繼續點擊秒殺按鈕,繼續查詢庫存。簡單來說,這個階段的操作就是三個:檢查庫存,庫存扣減、和訂單處理。因為每個秒殺請求都會查詢庫存,而請求只有查到庫存有余量后,后續的庫存扣減和訂單處理才會被執行,所以,這個階段中最大的并發壓力都在庫存檢查操作上。
為了支撐大量高并發的庫存檢查請求,我們需要使用Redis單獨保存庫存量。那么,庫存扣減和訂單處理是否都可以交給Mysql來處理呢?其實,訂單的處理是可以在數據庫中執行的,但庫存扣減操作不能交給Mysql直接處理。因為到了實際的訂單處理環節,請求的壓力已經不大了,數據庫完全可以支撐這些訂單處理請求。那為什么庫存扣減不能直接在數據庫中執行呢?這是因為,一旦請求查到有庫存,就意味著該請求獲得購買資格,緊接著就會進行下單操作,同時庫存量會減一,這個時候如果直接操作數據庫來扣減庫存可能就會導致超賣問題。
直接操作數據庫扣減庫存為什么會導致超賣呢?由于數據庫的處理速度較慢,不能及時更新庫存余量,這就會導致大量的查詢庫存的請求讀取到舊的庫存值,并進行下單,此時就會出現下單數量大于實際的庫存量,導致超賣。所以,就需要直接在Redis中進行庫存扣減,具體的操作是,當庫存檢查完后,一旦庫存有余量,我們就立即在Redis中扣減庫存,同時,為了避免請求查詢到舊的庫存值,庫存檢查和庫存扣減這兩個操作需要保證原子性。
我們使用Redis的Hash來存儲庫存,total為總庫存,seckill為已秒殺的數量,為了保證查詢庫存和減庫存的原子性,我們使用Lua腳本進行原子操作,讓秒殺量小于庫存的時候返回1,表示秒殺成功,否則返回0,表示秒殺失敗,代碼如下:
const ( luaCheckAndUpdateScript = ` local counts = redis.call("HMGET", KEYS[1], "total", "seckill") local total = tonumber(counts[1]) local seckill = tonumber(counts[2]) if seckill + 1 <= total then redis.call("HINCRBY", KEYS[1], "seckill", 1) return 1 end return 0 ` ) func (l *CheckAndUpdateStockLogic) CheckAndUpdateStock(in *product.CheckAndUpdateStockRequest) (*product.CheckAndUpdateStockResponse, error) { val, err := l.svcCtx.BizRedis.EvalCtx(l.ctx, luaCheckAndUpdateScript, []string{stockKey(in.ProductId)}) if err != nil { return nil, err } if val.(int64) == 0 { return nil, status.Errorf(codes.ResourceExhausted, fmt.Sprintf("insufficient stock: %d", in.ProductId)) } return &product.CheckAndUpdateStockResponse{}, nil } func stockKey(pid int64) string { return fmt.Sprintf("stock:%d", pid) }
對應的seckill-rmq代碼修改如下:
func (s *Service) consume(ch chan *KafkaData) { defer s.waiter.Done() for { m, ok := <-ch if !ok { log.Fatal("seckill rmq exit") } fmt.Printf("consume msg: %+v\n", m) _, err := s.ProductRPC.CheckAndUpdateStock(context.Background(), &product.CheckAndUpdateStockRequest{ProductId: m.Pid}) if err != nil { logx.Errorf("s.ProductRPC.CheckAndUpdateStock pid: %d error: %v", m.Pid, err) return } _, err = s.OrderRPC.CreateOrder(context.Background(), &order.CreateOrderRequest{Uid: m.Uid, Pid: m.Pid}) if err != nil { logx.Errorf("CreateOrder uid: %d pid: %d error: %v", m.Uid, m.Pid, err) return } _, err = s.ProductRPC.UpdateProductStock(context.Background(), &product.UpdateProductStockRequest{ProductId: m.Pid, Num: 1}) if err != nil { logx.Errorf("UpdateProductStock uid: %d pid: %d error: %v", m.Uid, m.Pid, err) } } }
到這里,我們已經了解了如何使用原子性的Lua腳本來實現庫存的檢查和扣減。其實要想保證庫存檢查和扣減的原子性,還有另外一種方法,那就是使用分布式鎖。
為了簡化分布式鎖、分布式選舉、分布式事務的實現,etcd社區提供了一個名為concurrency的包來幫助我們更簡單、正確的使用分布式鎖。它的實現非常簡單,主要流程如下:
首先通過concurrency.NewSession方法創建Session,本質上是創建了一個TTL為10的Lease
得到Session對象后,通過concurrency.NewMutex創建一個mutex對象,包括了Lease、key prefix等信息
然后聽過mutex對象的Lock方法嘗試獲取鎖
最后通過mutex對象的Unlock方法釋放鎖
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) if err != nil { log.Fatal(err) } defer cli.Close() session, err := concurrency.NewSession(cli, concurrency.WithTTL(10)) if err != nil { log.Fatal(err) } defer session.Close() mux := concurrency.NewMutex(session, "lock") if err := mux.Lock(context.Background()); err != nil { log.Fatal(err) } if err := mux.Unlock(context.Background()); err != nil { log.Fatal(err) }
以上就是關于“go zero微服務性能優化極致秒殺實例分析”這篇文章的內容,相信大家都有了一定的了解,希望小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。