中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

kafka分布式集群

發布時間:2020-07-19 07:00:21 來源:網絡 閱讀:1992 作者:蔣將將 欄目:建站服務器

一、簡介

1、消息傳輸流程

Kafka is a distributed,partitioned,replicated commit logservice。它提供了類似于JMS的特性,但是在設計實現上完全不同,此外它并不是JMS規范的實現。kafka對消息保存時根據Topic進行歸類,發送消息者成為Producer,消息接受者成為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。無論是kafka集群,還是producer和consumer都依賴于zookeeper來保證系統可用性集群保存一些meta信息。

kafka分布式集群kafka分布式集群

Producer即生產者,向Kafka集群發送消息,在發送消息之前,會對消息進行分類,即Topic,上圖展示了兩個producer發送了分類為topic1的消息,另外一個發送了topic2的消息。

Topic即主題,通過對消息指定主題可以將消息分類,消費者可以只關注自己需要的Topic中的消息

Consumer即消費者,消費者通過與kafka集群建立長連接的方式,不斷地從集群中拉取消息,然后可以對這些消息進行處理。

2、Topics/logs

一個Topic可以認為是一類消息,每個topic將被分成多個partition(區),每個partition在存儲層面是append log文件。任何發布到此partition的消息都會被直接追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為一個long型數字,它是唯一標記一條消息。它唯一的標記一條消息。kafka并沒有提供其他額外的索引機制來存儲offset,因為在kafka中幾乎不允許對消息進行“隨機讀寫”。

kafka分布式集群kafka分布式集群

談到kafka的存儲,就不得不提到分區,即partitions,創建一個topic時,同時可以指定分區數目,分區數越多,其吞吐量也越大,但是需要的資源也越多,同時也會導致更高的不可用性,kafka在接收到生產者發送的消息之后,會根據均衡策略將消息存儲到不同的分區中。

kafka服務器消息存儲策略如圖

kafka分布式集群kafka分布式集群

kafka和JMS(Java Message Service)實現(activeMQ)不同的是:即使消息被消費,消息仍然不會被立即刪除.日志文件將會根據broker中的配置要求,保留一定的時間之后刪除;比如log文件保留2天,那么兩天后,文件會被清除,無論其中的消息是否被消費.kafka通過這種簡單的手段,來釋放磁盤空間,以及減少消息消費之后對文件內容改動的磁盤IO開支.

對于consumer而言,它需要保存消費消息的offset,對于offset的保存和使用,有consumer來控制;當consumer正常消費消息時,offset將會"線性"的向前驅動,即消息將依次順序被消費.事實上consumer可以使用任意順序消費消息,它只需要將offset重置為任意值..(offset將會保存在zookeeper中,參見下文)

kafka集群幾乎不需要維護任何consumer和producer狀態信息,這些信息有zookeeper保存;因此producer和consumer的客戶端實現非常輕量級,它們可以隨意離開,而不會對集群造成額外的影響.

partitions的設計目的有多個.最根本原因是kafka基于文件存儲.通過分區,可以將日志內容分散到多個server上,來避免文件尺寸達到單機磁盤的上限,每個partiton都會被當前server(kafka實例)保存;可以將一個topic切分多任意多個partitions,來消息保存/消費的效率.此外越多的partitions意味著可以容納更多的consumer,有效提升并發消費的能力.(具體原理參見下文).

3、Distribution(分布)

一個Topic的多個partitions,被分布在kafka集群中的多個server上;每個server(kafka實例)負責partitions中消息的讀寫操作;此外kafka還可以配置partitions需要備份的個數(replicas),每個partition將會被備份到多臺機器上,以提高可用性.

基于replicated方案,那么就意味著需要對多個備份進行調度;每個partition都有一個server為"leader";leader負責所有的讀寫操作,如果leader失效,那么將會有其他follower來接管(成為新的leader);follower只是單調的和leader跟進,同步消息即可..由此可見作為leader的server承載了全部的請求壓力,因此從集群的整體考慮,有多少個partitions就意味著有多少個"leader",kafka會將"leader"均衡的分散在每個實例上,來確保整體的性能穩定.

Producers

Producer將消息發布到指定的Topic中,同時Producer也能決定將此消息歸屬于哪個partition;比如基于"round-robin"方式或者通過其他的一些算法等.

Consumers

本質上kafka只支持Topic.每個consumer屬于一個consumer group;反過來說,每個group中可以有多個consumer.發送到Topic的消息,只會被訂閱此Topic的每個group中的一個consumer消費.

如果所有的consumer都具有相同的group,這種情況和queue模式很像;消息將會在consumers之間負載均衡.

如果所有的consumer都具有不同的group,那這就是"發布-訂閱";消息將會廣播給所有的消費者.

在kafka中,一個partition中的消息只會被group中的一個consumer消費;每個group中consumer消息消費互相獨立;我們可以認為一個group是一個"訂閱"者,一個Topic中的每個partions,只會被一個"訂閱者"中的一個consumer消費,不過一個consumer可以消費多個partitions中的消息.kafka只能保證一個partition中的消息被某個consumer消費時,消息是順序的.事實上,從Topic角度來說,消息仍不是有序的.

kafka的設計原理決定,對于一個topic,同一個group中不能有多于partitions個數的consumer同時消費,否則將意味著某些consumer將無法得到消息.

Guarantees

1) 發送到partitions中的消息將會按照它接收的順序追加到日志中

2) 對于消費者而言,它們消費消息的順序和日志中消息順序一致.

3) 如果Topic的"replicationfactor"為N,那么允許N-1個kafka實例失效。

與生產者的交互

kafka分布式集群kafka分布式集群

生產者在向kafka集群發送消息的時候,可以通過指定分區來發送到指定的分區中,也可以通過指定均衡策略來將消息發送到不同的分區中,如果不指定,就會采用默認的隨機均衡策略,將消息隨機的存儲到不同的分區中

與消費者的交互

kafka分布式集群kafka分布式集群

在消費者消費消息時,kafka使用offset來記錄當前消費的位置,在kafka的設計中,可以有多個不同的group來同時消費同一個topic下的消息,如圖,我們有兩個不同的group同時消費,他們的的消費的記錄位置offset各不項目,不互相干擾。

  對于一個group而言,消費者的數量不應該多余分區的數量,因為在一個group中,每個分區至多只能綁定到一個消費者上,即一個消費者可以消費多個分區,一個分區只能給一個消費者消費

  因此,若一個group中的消費者數量大于分區數量的話,多余的消費者將不會收到任何消息。

 

二、使用場景

1、Messaging

對于一些常規的消息系統,kafka是個不錯的選擇;partitons/replication和容錯,可以使kafka具有良好的擴展性和性能優勢.不過到目前為止,我們應該很清楚認識到,kafka并沒有提供JMS中的"事務性""消息傳輸擔保(消息確認機制)""消息分組"等企業級特性;kafka只能使用作為"常規"的消息系統,在一定程度上,尚未確保消息的發送與接收絕對可靠(比如,消息重發,消息發送丟失等)

2、Websit activity tracking

kafka可以作為"網站活性跟蹤"的最佳工具;可以將網頁/用戶操作等信息發送到kafka中.并實時監控,或者離線統計分析等

3、Log Aggregation

kafka的特性決定它非常適合作為"日志收集中心";application可以將操作日志"批量""異步"的發送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/壓縮消息等,這對producer端而言,幾乎感覺不到性能的開支.此時consumer端可以使hadoop等其他系統化的存儲和分析系統.

三、設計原理

kafka的設計初衷是希望作為一個統一的信息收集平臺,能夠實時的收集反饋信息,并需要能夠支撐較大的數據量,且具備良好的容錯能力。

1、持久性

2、性能

3、生產者

4、消費者

5、消息傳送機制

6、復制備份

7、日志

8、分配

四、主要配置

1、Broker配置

kafka分布式集群kafka分布式集群

2、Consumer主要配置

kafka分布式集群kafka分布式集群

3、Producer主要配置

kafka分布式集群kafka分布式集群

五、kafka集群搭建步驟

1、系統環境

主機名

系統

zookeeper版本

IP

master

CentOS7.4

3.4.12

192.168.56.129

slave1

CentOS7.4

3.4.12

192.168.56.130

slave2

CentOS7.4

3.4.12

192.168.56.131

2、暫時關閉防火墻和selinux

3、軟件下載

下載地址:http://kafka.apache.org/downloads.html

備注:下載最新的二進制tgz包

kafka分布式集群kafka分布式集群

kafka分布式集群kafka分布式集群

kafka分布式集群kafka分布式集群

4、搭建zookeeper集群

備注:小伙伴可以參考上一篇文章即可

5、kafka集群

5.1、根據上面的zookeeper集群服務器,把kafka上傳到/home下

5.2、解壓

[root@master home]# tar -zxvf kafka_2.12-2.0.0.tgz

[root@master home]# mv kafka_2.12-2.0.0 kafka01

5.3、配置文件

[root@master home]# cd /home/kafka01/config/

備注:server.properties文件里的broker.id,log.dirs,zookeeper.connect必須根據實際情況進行修改,其他項根據需要自行斟酌,master配置如下:

broker.id=1  

port=9091

num.network.threads=2

num.io.threads=2

socket.send.buffer.bytes=1048576

socket.receive.buffer.bytes=1048576

socket.request.max.bytes=104857600

 log.dirs=/var/log/kafka/kafka-logs

num.partitions=2

log.flush.interval.messages=10000

log.flush.interval.ms=1000

log.retention.hours=168

#log.retention.bytes=1073741824

log.segment.bytes=536870912

num.replica.fetchers=2

log.cleanup.interval.mins=10

zookeeper.connect=192.168.56.129:2181,192.168.56.130:2181,192.168.56.131:2181

zookeeper.connection.timeout.ms=1000000

kafka.metrics.polling.interval.secs=5

kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter

kafka.csv.metrics.dir=/tmp/kafka_metrics

kafka.csv.metrics.reporter.enabled=false

5.4、啟動服務(master)----前提是三個節點的zookeeper已啟動

[root@master kafka01]# ./bin/kafka-server-start.sh config/server.properties &

kafka分布式集群kafka分布式集群

補充:

問題:&可以使程序在后臺運行,但一旦斷開ssh終端,后臺Java程序也會終止。

解決辦法:使用shell腳本啟動

[root@master kafka01]# cat start.sh

#!/bin/bash

cd /home/kafka01/

./bin/kafka-server-start.sh config/server.properties &

exit

授權,運行即可

[root@master kafka01]#chmod +x start.sh

5.5、配置slave1和slave2

slave1配置如下:

broker.id=2

port=9092

log.dirs=/var/log/kafka

zookeeper.connect=192.168.56.129:2181,192.168.56.130:2181,192.168.56.131:2181

啟動即可

kafka分布式集群kafka分布式集群

kafka分布式集群kafka分布式集群


slave2配置如下:

broker.id=3

port=9093

log.dirs=/var/log/kafka

zookeeper.connect=192.168.56.129:2181,192.168.56.130:2181,192.168.56.131:2181

啟動即可

kafka分布式集群kafka分布式集群

6、測試

Kafka通過topic對同一類的數據進行管理,同一類的數據使用同一個topic可以在處理數據時更加的便捷

6.1、創建一個Topic

[root@master kafka01]# bin/kafka-topics.sh --create --zookeeper 192.168.56.129:2181 --replication-factor 1 --partitions 1 --topic test

查看

[root@master kafka01]# bin/kafka-topics.sh --list --zookeeper 192.168.56.129:2181

kafka分布式集群kafka分布式集群

6.2、創建一個消息消費者

[root@master kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.129:9091 --topic test --from-beginning

kafka分布式集群kafka分布式集群

消費者創建完成之后,因為還沒有發送任何數據,因此這里在執行后沒有打印出任何數據

         不過別著急,不要關閉這個終端,打開一個新的終端,接下來我們創建第一個消息生產者

6.3、創建一個消息生產者

在kafka解壓目錄打開一個新的終端,輸入

[root@master kafka01]# bin/kafka-console-producer.sh --broker-list 192.168.56.129:9091 --topic test

kafka分布式集群kafka分布式集群

在發送完消息之后,可以回到我們的消息消費者終端中,可以看到,終端中已經打印出了我們剛才發送的消息

kafka分布式集群kafka分布式集群

zookeeper查看topic

kafka分布式集群kafka分布式集群

到此即可,共同進步之路!!!!!


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

盖州市| 桦甸市| 云梦县| 庆阳市| 和平县| 临夏县| 汤原县| 汤阴县| 新邵县| 象州县| 广宁县| 琼海市| 福州市| 塔河县| 黎城县| 泾阳县| 通城县| 赤壁市| 融水| 胶州市| 锡林浩特市| 木里| 九龙坡区| 平潭县| 平南县| 墨脱县| 石楼县| 诏安县| 成都市| 秦安县| 乌什县| 岐山县| 宣恩县| 泰兴市| 扎鲁特旗| 乐平市| 鞍山市| 通州区| 色达县| 汨罗市| 邮箱|