您好,登錄后才能下訂單哦!
一、簡介
1、消息傳輸流程
Kafka is a distributed,partitioned,replicated commit logservice。它提供了類似于JMS的特性,但是在設計實現上完全不同,此外它并不是JMS規范的實現。kafka對消息保存時根據Topic進行歸類,發送消息者成為Producer,消息接受者成為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。無論是kafka集群,還是producer和consumer都依賴于zookeeper來保證系統可用性集群保存一些meta信息。
Producer即生產者,向Kafka集群發送消息,在發送消息之前,會對消息進行分類,即Topic,上圖展示了兩個producer發送了分類為topic1的消息,另外一個發送了topic2的消息。
Topic即主題,通過對消息指定主題可以將消息分類,消費者可以只關注自己需要的Topic中的消息
Consumer即消費者,消費者通過與kafka集群建立長連接的方式,不斷地從集群中拉取消息,然后可以對這些消息進行處理。
2、Topics/logs
一個Topic可以認為是一類消息,每個topic將被分成多個partition(區),每個partition在存儲層面是append log文件。任何發布到此partition的消息都會被直接追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為一個long型數字,它是唯一標記一條消息。它唯一的標記一條消息。kafka并沒有提供其他額外的索引機制來存儲offset,因為在kafka中幾乎不允許對消息進行“隨機讀寫”。
談到kafka的存儲,就不得不提到分區,即partitions,創建一個topic時,同時可以指定分區數目,分區數越多,其吞吐量也越大,但是需要的資源也越多,同時也會導致更高的不可用性,kafka在接收到生產者發送的消息之后,會根據均衡策略將消息存儲到不同的分區中。
kafka服務器消息存儲策略如圖
kafka和JMS(Java Message Service)實現(activeMQ)不同的是:即使消息被消費,消息仍然不會被立即刪除.日志文件將會根據broker中的配置要求,保留一定的時間之后刪除;比如log文件保留2天,那么兩天后,文件會被清除,無論其中的消息是否被消費.kafka通過這種簡單的手段,來釋放磁盤空間,以及減少消息消費之后對文件內容改動的磁盤IO開支.
對于consumer而言,它需要保存消費消息的offset,對于offset的保存和使用,有consumer來控制;當consumer正常消費消息時,offset將會"線性"的向前驅動,即消息將依次順序被消費.事實上consumer可以使用任意順序消費消息,它只需要將offset重置為任意值..(offset將會保存在zookeeper中,參見下文)
kafka集群幾乎不需要維護任何consumer和producer狀態信息,這些信息有zookeeper保存;因此producer和consumer的客戶端實現非常輕量級,它們可以隨意離開,而不會對集群造成額外的影響.
partitions的設計目的有多個.最根本原因是kafka基于文件存儲.通過分區,可以將日志內容分散到多個server上,來避免文件尺寸達到單機磁盤的上限,每個partiton都會被當前server(kafka實例)保存;可以將一個topic切分多任意多個partitions,來消息保存/消費的效率.此外越多的partitions意味著可以容納更多的consumer,有效提升并發消費的能力.(具體原理參見下文).
3、Distribution(分布)
一個Topic的多個partitions,被分布在kafka集群中的多個server上;每個server(kafka實例)負責partitions中消息的讀寫操作;此外kafka還可以配置partitions需要備份的個數(replicas),每個partition將會被備份到多臺機器上,以提高可用性.
基于replicated方案,那么就意味著需要對多個備份進行調度;每個partition都有一個server為"leader";leader負責所有的讀寫操作,如果leader失效,那么將會有其他follower來接管(成為新的leader);follower只是單調的和leader跟進,同步消息即可..由此可見作為leader的server承載了全部的請求壓力,因此從集群的整體考慮,有多少個partitions就意味著有多少個"leader",kafka會將"leader"均衡的分散在每個實例上,來確保整體的性能穩定.
Producers
Producer將消息發布到指定的Topic中,同時Producer也能決定將此消息歸屬于哪個partition;比如基于"round-robin"方式或者通過其他的一些算法等.
Consumers
本質上kafka只支持Topic.每個consumer屬于一個consumer group;反過來說,每個group中可以有多個consumer.發送到Topic的消息,只會被訂閱此Topic的每個group中的一個consumer消費.
如果所有的consumer都具有相同的group,這種情況和queue模式很像;消息將會在consumers之間負載均衡.
如果所有的consumer都具有不同的group,那這就是"發布-訂閱";消息將會廣播給所有的消費者.
在kafka中,一個partition中的消息只會被group中的一個consumer消費;每個group中consumer消息消費互相獨立;我們可以認為一個group是一個"訂閱"者,一個Topic中的每個partions,只會被一個"訂閱者"中的一個consumer消費,不過一個consumer可以消費多個partitions中的消息.kafka只能保證一個partition中的消息被某個consumer消費時,消息是順序的.事實上,從Topic角度來說,消息仍不是有序的.
kafka的設計原理決定,對于一個topic,同一個group中不能有多于partitions個數的consumer同時消費,否則將意味著某些consumer將無法得到消息.
Guarantees
1) 發送到partitions中的消息將會按照它接收的順序追加到日志中
2) 對于消費者而言,它們消費消息的順序和日志中消息順序一致.
3) 如果Topic的"replicationfactor"為N,那么允許N-1個kafka實例失效。
與生產者的交互
生產者在向kafka集群發送消息的時候,可以通過指定分區來發送到指定的分區中,也可以通過指定均衡策略來將消息發送到不同的分區中,如果不指定,就會采用默認的隨機均衡策略,將消息隨機的存儲到不同的分區中
與消費者的交互
在消費者消費消息時,kafka使用offset來記錄當前消費的位置,在kafka的設計中,可以有多個不同的group來同時消費同一個topic下的消息,如圖,我們有兩個不同的group同時消費,他們的的消費的記錄位置offset各不項目,不互相干擾。
對于一個group而言,消費者的數量不應該多余分區的數量,因為在一個group中,每個分區至多只能綁定到一個消費者上,即一個消費者可以消費多個分區,一個分區只能給一個消費者消費
因此,若一個group中的消費者數量大于分區數量的話,多余的消費者將不會收到任何消息。
二、使用場景
1、Messaging
對于一些常規的消息系統,kafka是個不錯的選擇;partitons/replication和容錯,可以使kafka具有良好的擴展性和性能優勢.不過到目前為止,我們應該很清楚認識到,kafka并沒有提供JMS中的"事務性""消息傳輸擔保(消息確認機制)""消息分組"等企業級特性;kafka只能使用作為"常規"的消息系統,在一定程度上,尚未確保消息的發送與接收絕對可靠(比如,消息重發,消息發送丟失等)
2、Websit activity tracking
kafka可以作為"網站活性跟蹤"的最佳工具;可以將網頁/用戶操作等信息發送到kafka中.并實時監控,或者離線統計分析等
3、Log Aggregation
kafka的特性決定它非常適合作為"日志收集中心";application可以將操作日志"批量""異步"的發送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/壓縮消息等,這對producer端而言,幾乎感覺不到性能的開支.此時consumer端可以使hadoop等其他系統化的存儲和分析系統.
三、設計原理
kafka的設計初衷是希望作為一個統一的信息收集平臺,能夠實時的收集反饋信息,并需要能夠支撐較大的數據量,且具備良好的容錯能力。
1、持久性
2、性能
3、生產者
4、消費者
5、消息傳送機制
6、復制備份
7、日志
8、分配
四、主要配置
1、Broker配置
2、Consumer主要配置
3、Producer主要配置
五、kafka集群搭建步驟
1、系統環境
主機名 | 系統 | zookeeper版本 | IP |
master | CentOS7.4 | 3.4.12 | 192.168.56.129 |
slave1 | CentOS7.4 | 3.4.12 | 192.168.56.130 |
slave2 | CentOS7.4 | 3.4.12 | 192.168.56.131 |
2、暫時關閉防火墻和selinux
3、軟件下載
下載地址:http://kafka.apache.org/downloads.html
備注:下載最新的二進制tgz包
4、搭建zookeeper集群
備注:小伙伴可以參考上一篇文章即可
5、kafka集群
5.1、根據上面的zookeeper集群服務器,把kafka上傳到/home下
5.2、解壓
[root@master home]# tar -zxvf kafka_2.12-2.0.0.tgz
[root@master home]# mv kafka_2.12-2.0.0 kafka01
5.3、配置文件
[root@master home]# cd /home/kafka01/config/
備注:server.properties文件里的broker.id,log.dirs,zookeeper.connect必須根據實際情況進行修改,其他項根據需要自行斟酌,master配置如下:
broker.id=1
port=9091
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/var/log/kafka/kafka-logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=536870912
num.replica.fetchers=2
log.cleanup.interval.mins=10
zookeeper.connect=192.168.56.129:2181,192.168.56.130:2181,192.168.56.131:2181
zookeeper.connection.timeout.ms=1000000
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics
kafka.csv.metrics.reporter.enabled=false
5.4、啟動服務(master)----前提是三個節點的zookeeper已啟動
[root@master kafka01]# ./bin/kafka-server-start.sh config/server.properties &
補充:
問題:&可以使程序在后臺運行,但一旦斷開ssh終端,后臺Java程序也會終止。
解決辦法:使用shell腳本啟動
[root@master kafka01]# cat start.sh
#!/bin/bash
cd /home/kafka01/
./bin/kafka-server-start.sh config/server.properties &
exit
授權,運行即可
[root@master kafka01]#chmod +x start.sh
5.5、配置slave1和slave2
slave1配置如下:
broker.id=2
port=9092
log.dirs=/var/log/kafka
zookeeper.connect=192.168.56.129:2181,192.168.56.130:2181,192.168.56.131:2181
啟動即可
slave2配置如下:
broker.id=3
port=9093
log.dirs=/var/log/kafka
zookeeper.connect=192.168.56.129:2181,192.168.56.130:2181,192.168.56.131:2181
啟動即可
6、測試
Kafka通過topic對同一類的數據進行管理,同一類的數據使用同一個topic可以在處理數據時更加的便捷
6.1、創建一個Topic
[root@master kafka01]# bin/kafka-topics.sh --create --zookeeper 192.168.56.129:2181 --replication-factor 1 --partitions 1 --topic test
查看
[root@master kafka01]# bin/kafka-topics.sh --list --zookeeper 192.168.56.129:2181
6.2、創建一個消息消費者
[root@master kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.129:9091 --topic test --from-beginning
消費者創建完成之后,因為還沒有發送任何數據,因此這里在執行后沒有打印出任何數據
不過別著急,不要關閉這個終端,打開一個新的終端,接下來我們創建第一個消息生產者
6.3、創建一個消息生產者
在kafka解壓目錄打開一個新的終端,輸入
[root@master kafka01]# bin/kafka-console-producer.sh --broker-list 192.168.56.129:9091 --topic test
在發送完消息之后,可以回到我們的消息消費者終端中,可以看到,終端中已經打印出了我們剛才發送的消息
zookeeper查看topic
到此即可,共同進步之路!!!!!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。