中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

什么是go-stash組件

發布時間:2021-10-14 20:54:48 來源:億速云 閱讀:356 作者:iii 欄目:編程語言

本篇內容主要講解“什么是go-stash組件”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“什么是go-stash組件”吧!

前言

今天來介紹 go-zero 生態的另一個組件 go-stash。這是一個 logstash 的 Go 語言替代版,我們用 go-stash 相比原先的 logstash 節省了2/3的服務器資源。如果你在用 logstash,不妨試試,也可以看看基于 go-zero 實現這樣的工具是多么的容易,這個工具作者僅用了兩天時間。

整體架構

先從它的配置中,我們來看看設計架構。

Clusters:
  - Input:
      Kafka:
        # Kafka 配置 --> 聯動 go-queue
    Filters:
    	# filter action
      - Action: drop            
      - Action: remove_field
      - Action: transfer      
    Output:
      ElasticSearch:
        # es 配置 {host, index}

看配置名:kafka 是數據輸出端,es 是數據輸入端,filter 抽象了數據處理過程。

對,整個 go-stash 就是如 config 配置中顯示的,所見即所得。

什么是go-stash組件

啟動

stash.go 的啟動流程大致分為幾個部分。因為可以配置多個 cluster,那從一個 cluster 分析:

  1. 建立與 es 的連接【傳入 es 配置】

  2. 構建 filter processorses 前置處理器,做數據過濾以及處理,可以設置多個】

  3. 完善對 es 中 索引配置,啟動 handle ,同時將 filter 加入handle【處理輸入輸出】

  4. 連接下游的 kafka,將上面創建的 handle 傳入,完成 kafkaes 之間的數據消費和數據寫入

MessageHandler

在上面架構圖中,中間的 filter 只是從 config 中看到,其實更詳細是 MessageHandler 的一部分,做數據過濾和轉換,下面來說說這塊。

> 以下代碼:https://github.com/tal-tech/go-stash/tree/master/stash/handler/handler.go

type MessageHandler struct {
	writer  *es.Writer
	indexer *es.Index
	filters []filter.FilterFunc
}

這個就對應上面說的,filter 只是其中一部分,在結構上 MessageHandler 是對接下游 es ,但是沒有看到對 kafka 的操作。

別急,從接口設計上 MessageHandler 實現了 go-queueConsumeHandler 接口。

這里,上下游就串聯了:

  1. MessageHandler 接管了 es 的操作,負責數據處理到數據寫入

  2. 對上實現了 kafkaConsume 操作。這樣在消費過程中執行 handler 的操作,從而寫入 es

實際上,Consume() 也是這么處理的:

func (mh *MessageHandler) Consume(_, val string) error {
	var m map[string]interface{}
  // 反序列化從 kafka 中的消息
	if err := jsoniter.Unmarshal([]byte(val), &m); err != nil {
		return err
	}
	// es 寫入index配置
	index := mh.indexer.GetIndex(m)
  // filter 鏈式處理【因為沒有泛型,整個處理都是 `map進map出`】
	for _, proc := range mh.filters {
		if m = proc(m); m == nil {
			return nil
		}
	}
	bs, err := jsoniter.Marshal(m)
	if err != nil {
		return err
	}
	// es 寫入
	return mh.writer.Write(index, string(bs))
}

數據流

說完了數據處理,以及上下游的連接點。但是數據要從 kafka -> es ,數據流出這個動作從 kafka 角度看,應該是由開發者主動 pull data from kafka

那么數據流是怎么動起來?我們回到主程序 https://github.com/tal-tech/go-stash/blob/master/stash/stash.go

其實 啟動 整個流程中,其實就是一個組合模式:

func main() {
	// 解析命令行參數,啟動優雅退出
	...
  // service 組合模式
	group := service.NewServiceGroup()
	defer group.Stop()

	for _, processor := range c.Clusters {
		// 連接es
    ...
		// filter processors 構建
    ...
    // 準備es的寫入操作 {寫入的index, 寫入器writer}
		handle := handler.NewHandler(writer, indexer)
		handle.AddFilters(filters...)
		handle.AddFilters(filter.AddUriFieldFilter("url", "uri"))
    // 按照配置啟動kafka,并將消費操作傳入,同時加入組合器
		for _, k := range toKqConf(processor.Input.Kafka) {
			group.Add(kq.MustNewQueue(k, handle))
		}
	}
	// 啟動這個組合器
	group.Start()
}

整個數據流,就和這個 group 組合器有關了。

group.Start()
	|- group.doStart()
		|- [service.Start() for service in group.services]

那么說明加入 groupservice 都是實現 Start()。也就是說 kafka 端的啟動邏輯在 Start()

func (q *kafkaQueue) Start() {
	q.startConsumers()
	q.startProducers()

	q.producerRoutines.Wait()
	close(q.channel)
	q.consumerRoutines.Wait()
}
  1. 啟動 kafka 消費程序

  2. 啟動 kafka 消費拉取端【可能會被名字迷惑,實際上是從 kafka 拉取消息到 q.channel

  3. 消費程序終止,收尾工作

而我們傳入 kafka 中的 handler,上文說過其實是 Consume,而這個方法就是在 q.startConsumers() 中執行的:

q.startConsumers()
	|- [q.consumeOne(key, value) for msg in q.channel]
		|- q.handler.Consume(key, value)

這樣整個數據流就徹底串起來了:

什么是go-stash組件

到此,相信大家對“什么是go-stash組件”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

平乡县| 巴青县| 汪清县| 昌平区| 临桂县| 左云县| 新津县| 志丹县| 莆田市| 平乐县| 射洪县| 昌吉市| 清原| 堆龙德庆县| 太原市| 界首市| 东阳市| 崇义县| 钦州市| 塔河县| 平遥县| 普洱| 松潘县| 建湖县| 富蕴县| 盐源县| 沾化县| 丹凤县| 丹巴县| 无极县| 全椒县| 丹寨县| 三都| 扬中市| 建水县| 庆阳市| 阳泉市| 五河县| 宣恩县| 兴山县| 镇平县|