您好,登錄后才能下訂單哦!
本篇內容主要講解“什么是go-stash組件”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“什么是go-stash組件”吧!
今天來介紹 go-zero
生態的另一個組件 go-stash
。這是一個 logstash
的 Go 語言替代版,我們用 go-stash
相比原先的 logstash
節省了2/3的服務器資源。如果你在用 logstash
,不妨試試,也可以看看基于 go-zero
實現這樣的工具是多么的容易,這個工具作者僅用了兩天時間。
先從它的配置中,我們來看看設計架構。
Clusters: - Input: Kafka: # Kafka 配置 --> 聯動 go-queue Filters: # filter action - Action: drop - Action: remove_field - Action: transfer Output: ElasticSearch: # es 配置 {host, index}
看配置名:kafka
是數據輸出端,es
是數據輸入端,filter
抽象了數據處理過程。
對,整個 go-stash
就是如 config 配置中顯示的,所見即所得。
從 stash.go
的啟動流程大致分為幾個部分。因為可以配置多個 cluster
,那從一個 cluster
分析:
建立與 es
的連接【傳入 es
配置】
構建 filter processors
【es
前置處理器,做數據過濾以及處理,可以設置多個】
完善對 es
中 索引配置,啟動 handle
,同時將 filter
加入handle【處理輸入輸出】
連接下游的 kafka
,將上面創建的 handle
傳入,完成 kafka
和 es
之間的數據消費和數據寫入
在上面架構圖中,中間的 filter
只是從 config 中看到,其實更詳細是 MessageHandler
的一部分,做數據過濾和轉換,下面來說說這塊。
> 以下代碼:https://github.com/tal-tech/go-stash/tree/master/stash/handler/handler.go
type MessageHandler struct { writer *es.Writer indexer *es.Index filters []filter.FilterFunc }
這個就對應上面說的,filter
只是其中一部分,在結構上 MessageHandler
是對接下游 es
,但是沒有看到對 kafka
的操作。
別急,從接口設計上 MessageHandler
實現了 go-queue
中 ConsumeHandler
接口。
這里,上下游就串聯了:
MessageHandler
接管了 es
的操作,負責數據處理到數據寫入
對上實現了 kafka
的 Consume
操作。這樣在消費過程中執行 handler
的操作,從而寫入 es
實際上,Consume()
也是這么處理的:
func (mh *MessageHandler) Consume(_, val string) error { var m map[string]interface{} // 反序列化從 kafka 中的消息 if err := jsoniter.Unmarshal([]byte(val), &m); err != nil { return err } // es 寫入index配置 index := mh.indexer.GetIndex(m) // filter 鏈式處理【因為沒有泛型,整個處理都是 `map進map出`】 for _, proc := range mh.filters { if m = proc(m); m == nil { return nil } } bs, err := jsoniter.Marshal(m) if err != nil { return err } // es 寫入 return mh.writer.Write(index, string(bs)) }
說完了數據處理,以及上下游的連接點。但是數據要從 kafka -> es
,數據流出這個動作從 kafka
角度看,應該是由開發者主動 pull data from kafka
。
那么數據流是怎么動起來?我們回到主程序 https://github.com/tal-tech/go-stash/blob/master/stash/stash.go
其實 啟動 整個流程中,其實就是一個組合模式:
func main() { // 解析命令行參數,啟動優雅退出 ... // service 組合模式 group := service.NewServiceGroup() defer group.Stop() for _, processor := range c.Clusters { // 連接es ... // filter processors 構建 ... // 準備es的寫入操作 {寫入的index, 寫入器writer} handle := handler.NewHandler(writer, indexer) handle.AddFilters(filters...) handle.AddFilters(filter.AddUriFieldFilter("url", "uri")) // 按照配置啟動kafka,并將消費操作傳入,同時加入組合器 for _, k := range toKqConf(processor.Input.Kafka) { group.Add(kq.MustNewQueue(k, handle)) } } // 啟動這個組合器 group.Start() }
整個數據流,就和這個 group
組合器有關了。
group.Start() |- group.doStart() |- [service.Start() for service in group.services]
那么說明加入 group
的 service
都是實現 Start()
。也就是說 kafka
端的啟動邏輯在 Start()
:
func (q *kafkaQueue) Start() { q.startConsumers() q.startProducers() q.producerRoutines.Wait() close(q.channel) q.consumerRoutines.Wait() }
啟動 kafka
消費程序
啟動 kafka
消費拉取端【可能會被名字迷惑,實際上是從 kafka
拉取消息到 q.channel
】
消費程序終止,收尾工作
而我們傳入 kafka
中的 handler
,上文說過其實是 Consume
,而這個方法就是在 q.startConsumers()
中執行的:
q.startConsumers() |- [q.consumeOne(key, value) for msg in q.channel] |- q.handler.Consume(key, value)
這樣整個數據流就徹底串起來了:
到此,相信大家對“什么是go-stash組件”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。