中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Kafka的知識點有哪些

發布時間:2021-11-16 16:37:27 來源:億速云 閱讀:177 作者:iii 欄目:大數據

這篇文章主要講解了“Kafka的知識點有哪些”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Kafka的知識點有哪些”吧!

  • kafka具有高吞吐量、低延時的主要原因有三個:

    • 一是其在每次寫入數據時只是將數據寫入到操作系統的頁緩存中,這就相當于只是在內存中寫入數據,而繁雜的磁盤IO工作則交由操作系統自行進行;

    • 二是kafka在寫入數據的時候是采用追加的方式寫入到磁盤中的,這種方式省略了磁頭的隨機移動而產生的隨機IO,其效率甚至比內存的隨機讀取都要高;

    • 三是在為kafka配置了較大的頁緩存時,數據大部分的數據讀取和寫入工作都直接在頁緩存中進行了,讀取和寫入的時候甚至不需要進行磁盤的IO工作,而磁盤的IO也只在操作系統將數據從頁緩存寫入到磁盤中才會進行。

  • kafka使用的零拷貝技術是通過操作系統的sendfile指令來實現的。在正常情況下,數據從磁盤讀取然后發送到網卡中需要經過如下步驟:

    在這個過程中,數據的復制總共發生了四次,其中與DMA相關的有兩次,分別是從磁盤上讀取文件數據到內核緩存和將數據從socket相關的內核緩存復制到網卡中,這兩次復制是不可避免的。但是在數據在內核態與用戶態之間的兩次拷貝則非常的消耗CPU的資源,這兩次也是可以避免的,而零拷貝技術就是省去了這兩次的拷貝過程,從而將CPU給釋放出來,以節約資源。sendfile指令指的是,在DMA將磁盤數據拷貝到內核態緩存之后,其會將該緩存中數據的地址值直接當做網絡相關的內核態的緩存地址,從而直接從該緩存中通過DMA技術將數據寫入到網卡中。通過這種方式就省去了數據在內核態與用戶態之間的來回復制。

    • 首先通過DMA(Direct Memory Access)直接存儲器訪問技術讀取磁盤上的文件數據,將其存儲到操作系統內核態緩存中;

    • 然后CPU會將內核態的緩存讀取到應用程序的緩存中;

    • 應用程序獲取到數據之后,會將該數據寫入到socket相關的內核態緩存中;

    • 最后通過DMA技術將內核態緩存中的數據發送到網卡中。

  • 有了消息的持久化,kafka實現了高可靠性;有了負載均衡和使用文件系統的獨特設計,kafka實現了高吞吐量;有了故障轉移,kafka實現了高可用性。

  • kafka的topic本質上指的是一類消息,其可以理解為消息的目標存儲地址,并且kafka并沒有采用topic-message的二級存儲結構,而是采用了topic-partition-message的三級存儲結構。這樣做的原因在于能夠分散數據到不同的partition上,然后將partition分散到不同的物理機器上,這樣就達到了負載均衡提升系統吞吐量的目的。如下是topic-partition-message的三級存儲結構的示意圖:

Kafka的知識點有哪些

  • kafka中每個消息都會被分配一個位移offset,這個位移指的是該消息在當前partition中的一個偏移量,對于每個新增的數據,其位移是依次增加的。在consumer端也有一個位移的概念,這個位移指的是該consumer在當前partition中所消費的一個位置,其一定是小于等于該partition中最新的消息的位移的。

  • 根據配置,kafka的每個partition都可以設置一個或多個副本,這些副本分為leader和follower,leader副本會對外提供寫入和讀取消息的功能,而follower只會從leader副本上讀取數據并保存下來,其不對外提供服務。另外,kafka會保證一個partition的多個副本被平均的分配到多個服務器上。通過這種方式,kafka就實現了服務的高可用,當leaderpartition所在的機器宕機之后,kafka就會在follower副本中選舉出一個作為leader并對外提供服務。在副本中,還有一個ISR的概念,所謂的ISR就是in-sync-replica,也就是處于完全同步狀態的副本,有這個概念的原因主要是follower在同步leader的數據的時候,可能由于網絡等原因導致數據不是完全同步的,這個時候leader就會通過ISR來標記哪些副本是處于完全同步狀態的,這樣在leader宕機的時候,就只會在處于ISR狀態的副本中選舉新的leader,從而保證數據的一致性。

  • 對于kafka硬盤的規劃建議點:

    • 追求性價比的公司可以使用JBOD磁盤;

    • 使用機械硬盤完全可以滿足kafka集群的使用,SSD更好。

  • 磁盤大小規劃的影響因素:

    • 新增消息數

    • 消息留存時間;

    • 平均消息大小;

    • 副本數;

    • 是否啟用壓縮;

  • 對于內存大小設置的建議:

    • 盡量分配更多的內存給操作系統的page cache;

    • 不要為broker設置過大的堆內存,最好不超過6GB;

    • page cache大小至少要大于一個日志段的大小;

  • 對CPU規劃的建議:

    • 使用多核系統,CPU數最好大于8;

    • 如果使用kafka 0.10.0.0之前的版本或clients端與broker端消息版本不一致,則考慮多配置一些資源以防止消息解壓縮操作消耗過多的CPU。

  • 對帶寬資源規劃的建議:

    • 盡量使用高速網絡;

    • 根據自身網絡條件和帶寬來評估kafka集群機器數量;

    • 避免使用跨機房網絡。

  • broker參數

    • broker.id:該參數指的是當前broker的id,每個broker的id不能重復,建議讀者自行指定該參數,數值從0依次網上增長即可,如果不指定該參數,那么broker將會自動生成一個隨機數;

    • log.dirs:該參數指定了kafka持久化消息的目錄,建議讀者根據自己的磁盤數量來指定相應的目錄數,這是因為每個磁盤都有一個磁頭,指定相同數量的目錄之后,kafka寫入日志就可以利用多個磁頭并行的寫入;

    • zookeeper.connect:該參數指定了zookeeper服務器的地址,多個地址可以用逗號隔開,形如zk1:port1,zk2:port2,zk3:port3

    • listeners:broker監聽器列表,格式為[協議]://[主機名]:[端口],[協議]://[主機名]:[端口]。該參數主要用于客戶端連接broker使用,可以認為是broker端開放給clients的監聽端口;

    • advertised.listeners:和listeners類似,該參數也是用于發布給clientss的監聽器,不過該參數主要用于IaaS環境;

    • unclean.leader.election.enable:是否開啟unclean leader選舉,所謂的unclean leader指的是在leader宕機時,會從ISR中選舉一個follower作為新的leader,但是如果ISR中為空的,說明所有的replica都處于未同步狀態,而unclean leader選舉指的就是在這種情況下,是否使用這種未同步狀態的replica作為新的leader;

    • delete.topic.enable:是否允許kafka刪除topic,默認是開啟的,也建議開啟,因為控制是否刪除可以通過權限來進行;

    • log.retention.{hours|minutes|ms}:該參數指定了每個partition的日志的留存時間,默認是7天,超過7天的日志將會被自動刪除,這個參數中的三個選項如果同時配置,那么優先級是ms>minutes>hours;

    • log.retention.bytes:指定了每個消息日志最多保存多大的數據,對于超過該參數的分區日志而言,其會被自動刪除,該參數默認值為-1;

    • min.insync.replicas:該參數指定了broker最少響應client消息發送的最少副本數。需要注意的是,該參數不能設置得與當前broker副本數一樣,比如當前副本有3個,如果設置為3,那么客戶端發送的消息必須在3個副本中都保存才算保存成功,此時如果某個副本宕機了,那么客戶端寫入消息之后就始終不會有目標數量的副本數響應,因而該消息始終無法寫入成功。該參數還可以與客戶端的acks參數配合使用以達到消息持久保存,并且需要注意的是,該參數只有在客戶端的acks參數指定為-1時才有效;

    • num.network.threads:指定了broker在后臺用于處理網絡請求的線程數,默認為3。需要注意的是,這里的“處理”其實只是負責轉發請求,它會將接收到的請求轉發到后面的處理線程中,在真實環境中可以通過NetworkProcessorAvgIdlePercent JMX指標來監控,如果該值持續低于0.3,建議適當提高該參數的值;

    • num.io.threads:這個參數控制了broker端實際處理網絡請求的線程數,默認值是8,可以通過Request HandlerAvgIdlePercent JMX指標來監控該數據,如果持續低于0.3,則可以考慮適當增加該參數值;

    • message.max.bytes:該參數指定了每條消息的最大字節數,默認為977KB,真實環境中一般沒有這么大的消息。

  • topic級別參數

    • delete.retention.ms:每個topic設置自己的日志留存時間,以覆蓋全局默認值;

    • max.message.bytes:每個topic設置自己的消息最大字節數,以覆蓋全局默認值;

    • retention.bytes:每個topic設置自己的日志留存大小,以覆蓋全局默認值。

  • GC參數

    • 建議使用G1 GC;

  • JVM參數

    • 主要是關于堆內存大小的,因為kafka主要使用的是堆外內存,因而建議堆內存不要超過6GB;

  • OS參數

    • 文件描述符限制:由于kafka對打開大量的文件,因而建議將kafka的文件描述符限制設置一個比較大的值,命令如:ulimit -n 100000

    • socket緩沖區大小:一般的內網環境的socket緩沖區大小為64KB,這對于內網是完全足夠的,因為內網的往返時間RTT是非常短的,但是如果消息要經過長距離傳輸,那么建議提升該值,比如128KB,以防止數據堆積;

    • 最好使用Ext4或者XFS文件系統:使用這種文件系統會提供更好的寫入性能,尤其是XFS文件系統;

    • 關閉swap:降低對swap空間的使用,命令為sysctl vm.swappiness=<一個較小的數>

    • 設置更長的flush時間:默認情況下,OS的刷盤時間是5s,但是這個事件太短了,建議提升該值為2分鐘,以更大程度的提升OS物理寫入操作的性能。

  • Producer在發送消息時,其會根據一定的分區篩選策略來選擇將當前消息發送到哪個分區。如果當前消息中指定了key,那么就會根據這個key的hash值將該消息發送給某個分區,也就是說具有同一個key的所有消息將會發送到同一個分區。如果當前消息沒有key,那么就會采用輪詢的方式,依次將消息均勻的發送給各個分區。

  • kafka的producer的工作流程如下:

    ![image-20190826202129576](/Users/zhangxufeng/Library/Application Support/typora-user-images/image-20190826202129576.png)

    首先,producer會將消息封裝到一個ProducerRecord中,然后根據設置的序列化器將其序列化為二進制數據,并且將其放置到producer端的一個消息緩沖池中;接著,另一個線程會不斷的從消息緩沖池中批量的讀取數據,將其封裝在一起后一次性的發送給broker進行處理;處理完成之后,由broker返回處理結果,如果某個消息處理失敗,那么就戶根據設置的重試次數,對其進行重試;如果沒有失敗,則將消息結果返回給客戶端線程。

  • 在發送消息的時候可以指定消息的時間戳,但是建議producer不要設置該參數,因為kafka保存消息是嚴格按照時間戳順序來排列的,如果隨意指定時間戳,那么可能會導致消息混亂,從而找不到消息,并且也可能會影響消息的保存策略。

  • Kafka的producer的異步回調函數中有一個異常參數,該異常分為可重試異常和不可重試異常。可重試異常主要有以下幾類:

    不可重試的異常主要有以下幾類:

    • RecordTooLargeException:發送的消息尺寸過大,超過了規定的大小上線,這種異常一般重試之后也是無法恢復的;

    • SerializationException:序列化異常,重試無法恢復;

    • KafkaException:其他類型的異常。

    • NetworkException:網絡瞬時故障引起的異常,可重試;

    • LeaderNotAvailableException:這種異常一般發生在leader換屆選舉的時候,一般重試之后就會恢復;

    • NotControllerException:當前的controller不可用,一般發生在controller換屆選舉的時候,重試之后可恢復;

  • producer的關閉有兩種方法:不帶超時時間的和帶超時時間的close()方法。不帶超時時間的close()方法會等待之前發送的所有的消息都處理完畢之后再關閉,而帶超時時間的close()方法則需要指定一個超時時間,如果在超時時間結束了,消息還未處理完畢,那么就會終止所有的消息發送,這種情況下是可能丟失消息的。

  • producer端主要參數:

    • 重試可能會造成消息的重復發送。比如某個消息已經成功寫入到了broker端,但是由于網絡抖動,導致producer端沒有接收到響應,或者響應超時,那么producer會嘗試重新發送該消息,這樣就會產生重復消息。在kafka 0.11.0.0版本中已經開始支持”精確一次“的處理語義;

    • 重試可能會造成消息的亂序。在producer發送消息的時候,默認是會將5條消息作為一個批次進行發送,但是如果其中某個消息寫入失敗,而其余四條消息寫入成功,此時該消息就會被重試,這種情況下,本來應該在該消息后面的消息反而被保存在了該消息前面。

    • bootstrap.servers:該參數指定了當前kafka服務器的地址,格式如:ip1:port1,ip2:port2,ip3:port3

    • key.serializer:該參數指定了key的序列化器,必須是類的全限定名;

    • value.serializer:該參數指定了value的序列化器,必須是類的全限定名;

    • acks:這個參數的主要作用是控制producer發送消息后對消息可靠性的管理級別。若設置為0,則表示producer發送消息后,不管消息是否發送成功,即使發生任何異常也無法接收到該異常,這種情況下,消息的可靠性是最低的,但是吞吐量最高;若設置為1,則表示消息發送后,只要在接收消息的broker上成功寫入該消息,并且返回響應之后即可進行后續的操作,這個情況下,消息的可靠性和吞吐量都適中,因為寫入消息的broker如果宕機,那么消息還是會丟失的;若設置為all或-1,則表示當前消息會被寫入到接收消息的broker的日志中,并且還會等待所有ISR集合中的副本都寫入日志成功之后才返回響應,這種情況下,消息的可靠性是最高的,但是吞吐量最低。

    • buffer.memory:該參數指定了當前producer所使用的發送緩沖區的大小;

    • compression.type:該參數指定了producer端壓縮消息的類型,可選擇的有GZIP、Snappy和LZ4。這三種壓縮方式中,LZ4的效率是最好的。如果為消息設置了壓縮類型,這將會顯著的減少網絡IO的開銷,但是會增加producer端的CPU負擔。另外需要注意的是,如果producer端和broker端設置的壓縮類型不同,那么broker端就會對消息進行解壓縮,然后又進行壓縮后再保存,這將增加broker的CPU負擔。

    • retries:該參數指定了在producer端發生了諸如網絡抖動或leader選舉等可重試異常時進行重試的次數。該參數的默認值為0,即不進行重試,建議設置為一個比0大的數,比如3。需要注意的是,設置了重試次數之后,可能會造成兩個問題:

    • max.in.flight.requests.per.connection:指定了在給broker發送消息時,同一時刻發送的請求數量,如果指定為1,那么producer一次就只會發送一個請求;

    • retry.backoff.ms:在進行重試時,producer會等待一段時間再進行重試,以防止過多的重試對broker造成負擔,該參數就是指定這個重試時間間隔的,默認為100ms,建議設置得比當前kafka集群中分區leader選舉的時間稍微長一點,因為這種選舉是最頻繁的;

    • batch.size:在producer發送消息的時候,會緩存一個批次的消息后再發送該消息,該參數就指定了這個緩沖區的大小的,默認是16384,即16KB,這是一個比較保守的數字,建議稍微提高一些該參數,因為一個適當的緩沖區大小將會極大的提升系統的吞吐量;

    • linger.ms:在producer發送消息的時候,有的時候并不會等待一個batch都滿了才發送消息,因為這可能造成極大的延遲,而是會在等待一個時間間隔之后就直接發送這個batch的消息。默認情況下,該參數值為0,表示消息需要被立即發送,但是這樣會拉低系統的吞吐量,為其設置一個適當的值可以提升系統的吞吐量。

    • max.request.size:該參數指定了producer發送請求的大小,默認為1048576;

    • request.timeout.ms:該參數指定了producer發送消息后等待broker響應的超時時間,默認為30S;

  • 自定義Partition的步驟如下:

    • 首先定義一個類實現Partitioner接口;

    • 然后在Producer的Property中配置partitioner.class屬性,值為自定義Partitioner的類的全限定名。

  • 自定義producer的serializer的步驟:

    • 定義數據對象格式;

    • 創建自定義序列化類,實現org.apache.kafka.common.serialization.Serializer接口,在serializer方法中實現序列化邏輯;

    • 在用于構造KafkaProducer的Properties對象中設置key.serializervalue.serializer

  • 自定義producer的攔截器使用步驟:

    • onSend():該方法會保證在消息被序列化以計算分區之前調用,主要是做一些消息發送前的處理工作;

    • onAcknowledgement():該方法主要是在消息被應答之前或者消息發送失敗時調用,并且通常是在producer回調邏輯觸發之前調用;

    • close():該方法主要是關閉interceptor時觸發,作用是進行一些清理工作。

    • 創建攔截器類,實現org.apache.kafka.clients.producer.ProducerInterceptor接口,其有三個方法:

    • 在構造KafkaProducer的Properties對象中設置interceptor.classes屬性,值為元素為ProducerInterceptor的list。

  • Producer端的消息可靠性保證。在Producer端發送消息的時候,kafka會將消息放到一個batch里,然后進行整批數據進行發送,那么這就會產生一個時間窗口,若在這個窗口內producer宕機,那么就有可能丟失消息。producer的這種批量發送的機制會產生兩個問題:

    producer端的無消息丟失配置如下:

    block.on.buffer.full=true
    # 該參數的目的主要是在producer的發送緩沖區滿的時候,必須等到緩沖區的數據處理完畢才會開始下一個批次的消息處理
    
    acks=all or -1
    # 該參數的作用是控制broker只有在所有的ISR中的副本都寫入消息完畢才會發送響應給producer
    
    retries=Integer.MAX_VALUE
    # 該參數指定了重試次數,這樣可以保證可重試的消息一定會寫入成功,而對于不可重試的異常,kafka會將其直接返回
    
    max.in.flight.requests.per.connection=1
    # 該參數指定了producer同一時刻只會給一個broker發送消息,并且會等待該消息處理完畢
    
    使用帶回調機制的send()發送消息,即KafkaProducer.send(record, callback);
    # 使用這種機制是因為可以通過這種方式來確保消息處理完成,這里一定不能使用不帶任何參數的send()方法,因為該方法是一個異步方式,其無法保證消息的順序性
    
    Callback邏輯中顯示地立即關閉producer,使用close(0)
    # 在Callback中顯示的調用close(0)方法可以保證producer不會將未完成的消息發送出去
    
    unclean.leader.election.enable=false
    # 關閉該參數可以保證broker在宕機重新選舉時,不會將沒有完全同步的broker副本選為新的leader,因為這樣會導致消息丟失
    
    replication.factor=3
    # 該參數指定了每個topic的副本數
    
    min.insync.replicas=2
    # 該參數指定了ISR中至少需要有多少個副本處于同步狀態,其必須大于1,因為1個的時候表示只有leader副本,需要注意的是,只有producer的acks為-1或者all時該參數才有效
    
    replication.factor > min.insync.replicas
    # 這種配置可以保證當前分區的副本數大于等于ISR中的副本數,這樣就可以允許至少一個或多個副本宕機,否則,如果某個副本宕機,那么如果producer設置的acks為-1或all時,始終沒法保證ISR中的數量達到目標副本數
    
    enable.auto.commit=false
    # 設置自動提交為false,這樣可以保證消費者消費完成之后再提交消費信息


    • 如果在發送batch數據之前producer宕機,那么就會丟失這一部分數據;

    • 如果在發送batch數據的過程中,某個數據發送失敗了,該消息就會被重試,而其后面成功發送的數據是不會重試的,因而如果重試后成功,那么這條本應該在前面的消息,其就會排到后面了;

  • producer端的時間消耗主要發生在壓縮上,而壓縮的效率與batch的大小是有一定關系的。batch大小越大,壓縮時間就越長,不過時間的增長不是線性的,而是越來越平緩的。如果發現壓縮很慢,說明系統的瓶頸在用戶主進程而不是IO線程,因此可以考慮增加多個用戶線程同時發送消息,這樣通常能顯著的提升producer的吞吐量。

  • 消費者組:消費者所使用一個消費者組名(即group.id)來標記自己,topic的每條消息都只會被發送到每個訂閱它的消費者組的一個消費者實例上。

  • 消費者組的要點:a. 一個consumer group可能有若干個consumer實例(一個group只有一個示例也是允許的);b. 對于同一個group而言,topic的每條消息只能被發送到group下的一個consumer實例上;③topic消息可以被發送到多個group中。

  • 對于使用consumer group的優點,其主要是實現消費者的高伸縮性、容錯性的目的。在正常情況下,消費者組里的消費者會被平均的分配各個partition以進行消息的消費,當某個消費者宕機時,consumer group會將已經崩潰的消費者所消費的分區分配給其他的消費者進行消費,通過這種方式實現集群容錯。如果某個topic的消息比較多,分區數也比消費者數量多,此時如果消費者處理速度比生產者生產消息的速度低,那么就會出現消息積壓的情況,此時就可以通過增加消費者數量的方式提升消費效率,只需要將新加入的消費者實例的groupId指定為當前產生積壓的group的id即可,consumer group就會自動將多余的分區分配給該實例。這里說的這兩種重分配partition的方式稱為再平衡。

  • kafka將保存位移信息沒有保存在zookeeper的原因有兩點:①zookeeper本質上是一個分布式的服務協調工具,其不適合做數據存儲服務;②zookeeper對于讀的性能是非常高的,但是寫性能比較低,而對于kafka這種需要頻繁寫入消費進度的場景,其是不適合的。在新版本中,kafka將每個topic的消費進度都保存在了一個特殊的topic下,即__consumer_offsets

  • kafka將位移信息提交到__consumer_offsets這個topic下的原理如下:首先consumer會將其要提交的信息組成一個KV形式的消息,其key是groupId+topic+partition的一個字符串連接的形式,其值則為當前的偏移量;然后在消息發送到該topic之后,這個topic會對消息進行壓實處理,也就是說,會取每個key下面的最大的value,而只保存該value。通過這種方式,在__consumer_offsets下就為每個group的某個topic下的某個分區唯一保存了一條數據,這條數據就是最新的offset。

感謝各位的閱讀,以上就是“Kafka的知識點有哪些”的內容了,經過本文的學習后,相信大家對Kafka的知識點有哪些這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

信丰县| 娄烦县| 清丰县| 余江县| 昔阳县| 肇源县| 酒泉市| 巴东县| 兴隆县| 惠东县| 海门市| 全州县| 关岭| 于都县| 吴忠市| 呼和浩特市| 仁布县| 潞城市| 苍南县| 宁津县| 防城港市| 榆中县| 江山市| 贵溪市| 鞍山市| 讷河市| 商都县| 于田县| 保山市| 龙口市| 保亭| 松潘县| 信丰县| 西贡区| 靖西县| 西乌珠穆沁旗| 海林市| 仲巴县| 襄汾县| 吉首市| 新宾|