中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Golang怎么監聽日志文件并發送到kafka中

發布時間:2022-04-14 17:31:50 來源:億速云 閱讀:266 作者:zzz 欄目:開發技術

這篇文章主要講解了“Golang怎么監聽日志文件并發送到kafka中”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Golang怎么監聽日志文件并發送到kafka中”吧!

涉及的golang庫和可視化工具:

go-ini,sarama,tail其中:

  • go-ini:用于讀取配置文件,統一管理配置項,有利于后其的維護

  • sarama:是一個go操作kafka的客戶端。目前我用于向kefka發送消息

  • tail:類似于linux的tail命令了,讀取文件的后幾行。如果文件有追加數據,會檢測到。就是通過它來監聽日志文件

可視化工具:

offsetexplorer:是kafka的可視化工具,這里用來查看消息是否投遞成功

工作的流程

  • 加載配置,初始化saramakafka

  • 起一個的協程,利用tail不斷去監聽日志文件的變化。

  • 主協程中一直阻塞等待tail發送消息,兩者通過一個管道通訊。一旦主協程接收到新日志,組裝格式,然后發送到kafka中

Golang怎么監聽日志文件并發送到kafka中

環境準備

環境的話,確保zookeeperkafka正常運行。因為還沒有使用sarama讀取數據,使用offsetexplorer來查看任務是否真的投遞成功了。

代碼分層

serve來存放寫tail服務類和sarama服務類,conf存放ini配置文件

main函數為程序入口

Golang怎么監聽日志文件并發送到kafka中

關鍵的代碼

main.go

main函數做的有:構建配置結構體,映射配置文件。調用和初始化tail,srama服務。

package main

import (
	"fmt"
	"sarama/serve"

	"github.com/go-ini/ini"
)

type KafkaConfig struct {
	Address     string `ini:"address"`
	ChannelSize int    `ini:"chan_size"`
}
type TailConfig struct {
	Path     string `ini:"path"`
	Filename string `ini:"fileName"`
	// 如果是結構體,則指明分區名
	Children `ini:"tailfile.children"`
}
type Config struct {
	KafkaConfig `ini:"kafka"`
	TailConfig  `ini:"tailfile"`
}
type Children struct {
	Name string `ini:"name"`
}

func main() {
	// 加載配置
	var cfg = new(Config)
	err := ini.MapTo(cfg, "./conf/go-conf.ini")
	if err != nil {
		fmt.Print(err)
	}
	// 初始化kafka
	ks := &serve.KafukaServe{}
	// 啟動kafka消息監聽。異步
	ks.InitKafka([]string{cfg.KafkaConfig.Address}, int64(cfg.KafkaConfig.ChannelSize))
	// 關閉主協程時,關閉channel
	defer ks.Destruct()

	// 初始化tail
	ts := &serve.TailServe{}
	ts.TailInit(cfg.TailConfig.Path + "/" + cfg.TailConfig.Filename)
	// 阻塞
	ts.Listener(ks.MsgChan)

}

kafka.go

有3個方法 :

  • InitKafka,組裝配置項以及初始化接收消息的管道,

  • Listener,監聽管道消息,收到消息后,將消息組裝,發送到kafka

  • Destruct, 關閉管道

package serve

import (
	"fmt"

	"github.com/Shopify/sarama"
)

type KafukaServe struct {
	MsgChan chan string
	//err         error
}

func (ks *KafukaServe) InitKafka(addr []string, chanSize int64) {

	// 讀取配置
	config := sarama.NewConfig()
	// 1. 初始化生產者配置
	config.Producer.RequiredAcks = sarama.WaitForAll
	// 選擇分區
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// 成功交付的信息
	config.Producer.Return.Successes = true

	ks.MsgChan = make(chan string, chanSize)

	go ks.Listener(addr, chanSize, config)

}

func (ks *KafukaServe) Listener(addr []string, chanSize int64, config *sarama.Config) {
	//  連接kafka
	var kafkaClient, _ = sarama.NewSyncProducer(addr, config)
	defer kafkaClient.Close()
	for {
		select {
		case content := <-ks.MsgChan:
			//
			msg := &sarama.ProducerMessage{
				Topic: "weblog",
				Value: sarama.StringEncoder(content),
			}
			partition, offset, err := kafkaClient.SendMessage(msg)
			if err != nil {
				fmt.Println(err)
			}
			fmt.Println("分區,偏移量:")
			fmt.Println(partition, offset)
			fmt.Println("___")
		}

	}
}

func (ks *KafukaServe) Destruct() {
	close(ks.MsgChan)
}

tail.go

主要包括了兩個方法:

  • TailInit初始化,組裝tail配置。Listener

  • Listener,保存kafka服務類初始化之后的管道。監聽日志文件,如果有新日志,就往管道里發送

package serve

import (
	"fmt"

	"github.com/hpcloud/tail"
)

type TailServe struct {
	tails *tail.Tail
}

func (ts *TailServe) TailInit(filenName string) {
	config := tail.Config{
		ReOpen:    true,
		Follow:    true,
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
		MustExist: false,
		Poll:      true,
	}
	// 打開文件開始讀取數據

	ts.tails, _ = tail.TailFile(filenName, config)

	// if err != nil {
	// 	fmt.Println("tails %s failed,err:%v\n", filenName, err)
	// 	return nil, err
	// }
	fmt.Println("啟動," + filenName + "監聽")
}

func (ts *TailServe) Listener(MsgChan chan string) {
	for {
		msg, ok := <-ts.tails.Lines
		if !ok {
			// todo
			fmt.Println("數據接收失敗")
			return
		}
		fmt.Println(msg.Text)
		MsgChan <- msg.Text
	}
}

// 測試案例
func Demo() {
	filename := `E:\xx.log`
	config := tail.Config{
		ReOpen:    true,
		Follow:    true,
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
		MustExist: false,
		Poll:      true,
	}
	// 打開文件開始讀取數據
	tails, err := tail.TailFile(filename, config)
	if err != nil {
		fmt.Println("tails %s failed,err:%v\n", filename, err)
		return
	}
	var (
		msg *tail.Line
		ok  bool
	)
	fmt.Println("啟動")
	for {
		msg, ok = <-tails.Lines
		if !ok {
			fmt.Println("tails file close reopen,filename:$s\n", tails.Filename)
		}
		fmt.Println("msg:", msg.Text)
	}
}

感謝各位的閱讀,以上就是“Golang怎么監聽日志文件并發送到kafka中”的內容了,經過本文的學習后,相信大家對Golang怎么監聽日志文件并發送到kafka中這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

苗栗市| 双峰县| 兴义市| 扎鲁特旗| 庆元县| 乡宁县| SHOW| 左贡县| 上饶县| 东城区| 怀化市| 军事| 蓝山县| 星座| 包头市| 三台县| 平凉市| 三明市| 弥勒县| 津南区| 馆陶县| 建昌县| 兴和县| 宁国市| 新干县| 厦门市| 介休市| 明星| 晋宁县| 会理县| 绥宁县| 仙桃市| 原平市| 都兰县| 郧西县| 台南县| 隆安县| 麦盖提县| 博爱县| 昌江| 平原县|