您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關kafka流量監控的原理及實現方法是什么,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
工程能力
作為一個優秀的開發人員,項目開發的過程中監控告警系統的可靠性是可以體現出一個人的工程管理能力的。優秀的監控告警系統可以免去很多精力消耗,比如維護,故障預判,故障及時準確通知,故障定位排查等。
可以想像項目上線后,假如沒有監控告警系統,這么一個暗箱是多么可怕。
對于大數據項目,數據一般需要先入消息隊列,如kafka,然后分離線和實時將數據進行解耦分流,用于實時處理和離線處理。消息隊列存在的好處:
消息隊列的訂閱者可以根據需要隨時擴展,可以很好的擴展數據的使用者。
消息隊列的橫向擴展,增加吞吐量,做起來還是很簡單的。這個用傳統數據庫,分庫分表還是很麻煩的。
由于消息隊列的存在,也可以幫助我們抗高峰,避免高峰時期后端處理壓力過大導致整個業務處理宕機。
kafka在大數據項目中作用至關重要,那么對其的監控告警就至關重要了,我們這里主要是講針對kafka流量的監控告警,其目的也是很明顯的便于我們了解數據的整體情況及波動情況,以調整處理后端,如spark streaming,flume等。
kafka 監控工具很多,常見的有kafka manager,KafkaOffsetMonitor,kafka eagle,kafka tools等,浪尖最經常使用的是kafka manager,也建議大家使用該工具,其不僅有監控功能還有管理功能。具體使用方法可以參看:
kafka管理神器-kafkamanager
監控指標
kafka的指標服務器和客戶端都有的。具體指標內容,可以參看kafka官網:
http://kafka.apache.org/0102/documentation.html#monitoring
查看可用指標的最簡單方法是啟動jconsole并將其指向正在運行的kafka客戶端或服務器; 這將允許使用JMX瀏覽所有指標。
對于熟悉kafka manager的朋友都應該看過broker相關信息,比如每秒鐘的流入的消息條數,每秒鐘的流入的消息大小,流出的消息大小等。
使用kafka manager可以很方便的查看。但是,這其實不能讓我們及時的發現數據流量波動,或者說我們想畫個曲線的詳細對比歷史流量,它是做不到的。所以,我們要想辦法去獲取出來這些指標,然后做我們自己的展示。還有一點就是,流量波動告警。
浪尖這里只做了圖中幾個指標的接口:
def getBytesInPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "BytesInPerSec", topicOption)
}
def getBytesOutPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "BytesOutPerSec", topicOption)
}
def getBytesRejectedPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "BytesRejectedPerSec", topicOption)
}
def getFailedFetchRequestsPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "FailedFetchRequestsPerSec", topicOption)
}
def getFailedProduceRequestsPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "FailedProduceRequestsPerSec", topicOption)
}
def getMessagesInPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "MessagesInPerSec", topicOption)
}
jmx客戶端
連接jmx的server是可以使用jconsole,但是滿足不了我們的需求。所以,我們使用JMXConnectorFactory 方式連接jmx。使用JMXConnectorFactory 鏈接jmx時,JMXServiceURL 的參數 url 必須使用 service:jmx 方式進行連接,具體鏈接創建方式很簡單,幾行代碼而已,如下:
val jmxHost = "hostname"
val jmxPort = 9999
val urlString = s"service:jmx:rmi:///jndi/rmi://$jmxHost:$jmxPort/jmxrmi"
val url = new JMXServiceURL(urlString)
val jmxc = JMXConnectorFactory.connect(url )
val mbsc = jmxc.getMBeanServerConnection;
println(KafkaMetrics.getMessagesInPerSec(Kafka_0_10_2_1,mbsc,Some("test")).fifteenMinuteRate)
jmxc.close()
開啟kafka的jmx端口
kafka的jmx服務默認時關閉的,開啟的話很簡單,只需要在kafka server的啟動腳本kafka-server-start.sh里增加一行代碼即可,內容export JMX_PORT="9999",增加位置如下:
if [ "x$KAFKA_HEAP_OPTS" = "x"]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export JMX_PORT="9999"
fi
測試
我這里測試就比較簡單了,主要是將消息條數打出來,大家可以根據需要自行調整,比如均值大于閾值發短信告警等。
一套完整的kafka監控,包括:
消費者監控,主要是存活告警,消費滯后告警。
生產者監控,主要是存活告警,生產者消費上游數據能力告警。
broker監控,主要是存活告警,流量告警,isr列表,topic異常告警,control變換告警。
以上就是kafka流量監控的原理及實現方法是什么,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。