中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Kafka丟失數據問題優化分析

發布時間:2021-11-22 10:09:35 來源:億速云 閱讀:156 作者:iii 欄目:大數據

本篇內容主要講解“Kafka丟失數據問題優化分析”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Kafka丟失數據問題優化分析”吧!

數據丟失是一件非常嚴重的事情事,針對數據丟失的問題我們需要有明確的思路來確定問題所在,針對這段時間的總結,我個人面對kafka 數據丟失問題的解決思路如下:

1、是否真正的存在數據丟失問題,比如有很多時候可能是其他同事操作了測試環境,所以首先確保數據沒有第三方干擾。

2、理清你的業務流程,數據流向,數據到底是在什么地方丟失的數據,在kafka 之前的環節或者kafka之后的流程丟失?比如kafka的數據是由flume提供的,也許是flume丟失了數據,kafka 自然就沒有這一部分數據。

3、如何發現有數據丟失,又是如何驗證的。從業務角度考慮,例如:教育行業,每年高考后數據量巨大,但是卻反常的比高考前還少,或者源端數據量和目的端數據量不符

4、 定位數據是否在kafka之前就已經丟失還事消費端丟失數據的

kafka支持數據的重新回放功能(換個消費group),清空目的端所有數據,重新消費。
如果是在消費端丟失數據,那么多次消費結果完全一模一樣的幾率很低。
如果是在寫入端丟失數據,那么每次結果應該完全一樣(在寫入端沒有問題的前提下)。
 

5、kafka環節丟失數據,常見的kafka環節丟失數據的原因有:

如果auto.commit.enable=true,當consumer fetch了一些數據但還沒有完全處理掉的時候,剛好到commit interval出發了提交offset操作,接著consumer crash掉了。這時已經fetch的數據還沒有處理完成但已經被commit掉,因此沒有機會再次被處理,數據丟失。

網絡負載很高或者磁盤很忙寫入失敗的情況下,沒有自動重試重發消息。沒有做限速處理,超出了網絡帶寬限速。kafka一定要配置上消息重試的機制,并且重試的時間間隔一定要長一些,默認1秒鐘并不符合生產環境(網絡中斷時間有可能超過1秒)。

如果磁盤壞了,會丟失已經落盤的數據
 

單批數據的長度超過限制會丟失數據,報kafka.common.MessageSizeTooLargeException異常解決:

Consumer side:fetch.message.max.bytes- this will determine the largest size of a message that can be fetched by the consumer.

Broker side:replica.fetch.max.bytes- this will allow for the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore, the consumer will never see the message because the message will never be committed (fully replicated).

Broker side:message.max.bytes- this is the largest size of the message that can be received by the broker from a producer.

Broker side (per topic):max.message.bytes- this is the largest size of the message the broker will allow to be appended to the topic. This size is validated pre-compression. (Defaults to broker'smessage.max.bytes.)
 

6、partition leader在未完成副本數follows的備份時就宕機的情況,即使選舉出了新的leader但是已經push的數據因為未備份就丟失了!kafka是多副本的,當你配置了同步復制之后。多個副本的數據都在PageCache里面,出現多個副本同時掛掉的概率比1個副本掛掉的概率就很小了。(官方推薦是通過副本來保證數據的完整性的)

7、kafka的數據一開始就是存儲在PageCache上的,定期flush到磁盤上的,也就是說,不是每個消息都被存儲在磁盤了,如果出現斷電或者機器故障等,PageCache上的數據就丟失了。可以通過log.flush.interval.messages和log.flush.interval.ms來配置flush間隔,interval大丟的數據多些,小會影響性能但在0.8版本,可以通過replica機制保證數據不丟,代價就是需要更多資源,尤其是磁盤資源,kafka當前支持GZip和Snappy壓縮,來緩解這個問題 是否使用replica取決于在可靠性和資源代價之間的balance。

同時kafka也提供了相關的配置參數,來讓你在性能與可靠性之間權衡(一般默認):

當達到下面的消息數量時,會將數據flush到日志文件中。默認10000

log.flush.interval.messages=10000
 

當達到下面的時間(ms)時,執行一次強制的flush操作。interval.ms和interval.messages無論哪個達到,都會flush。默認3000ms

log.flush.interval.ms=1000
 

檢查是否需要將日志flush的時間間隔

log.flush.scheduler.interval.ms = 3000  
  
Kafka的優化建議

producer端

  • 設計上保證數據的可靠安全性,依據分區數做好數據備份,設立副本數等。push數據的方式:同步異步推送數據:權衡安全性和速度性的要求,選擇相應的同步推送還是異步推送方式,當發現數據有問題時,可以改為同步來查找問題。

  • flush是kafka的內部機制,kafka優先在內存中完成數據的交換,然后將數據持久化到磁盤.kafka首先會把數據緩存(緩存到內存中)起來再批量flush。可以通過log.flush.interval.messages和log.flush.interval.ms來配置flush間隔

  • 可以通過replica機制保證數據不丟。代價就是需要更多資源,尤其是磁盤資源,kafka當前支持GZip和Snappy壓縮,來緩解這個問題。是否使用replica(副本)取決于在可靠性和資源代價之間的balance(平衡)

  • broker到 Consumer kafka的consumer提供兩種接口。

high-level版本已經封裝了對partition和offset的管理,默認是會定期自動commit offset,這樣可能會丟數據的
low-level版本自己管理spout線程和partition之間的對應關系和每個partition上的已消費的offset(定期寫到zk)
并且只有當這個offset被ack后,即成功處理后,才會被更新到zk,所以基本是可以保證數據不丟的即使spout線程crash(崩潰),重啟后還是可以從zk中讀到對應的offset
 
  • 異步要考慮到partition leader在未完成副本數follows的備份時就宕機的情況,即使選舉出了新的leader但是已經push的數據因為未備份就丟失了!

不能讓內存的緩沖池太滿,如果滿了內存溢出,也就是說數據寫入過快,kafka的緩沖池數據落盤速度太慢,這時肯定會造成數據丟失。
盡量保證生產者端數據一直處于線程阻塞狀態,這樣一邊寫內存一邊落盤。
異步寫入的話還可以設置類似flume回滾類型的batch數,即按照累計的消息數量,累計的時間間隔,累計的數據大小設置batch大小。
 
  • 設置合適的方式,增大batch 大小來減小網絡IO和磁盤IO的請求,這是對于kafka效率的思考。

不過異步寫入丟失數據的情況還是難以控制
還是得穩定整體集群架構的運行,特別是zookeeper,當然正對異步數據丟失的情況盡量保證broker端的穩定運作吧
 

kafka不像hadoop更致力于處理大量級數據,kafka的消息隊列更擅長于處理小數據。針對具體業務而言,若是源源不斷的push大量的數據(eg:網絡爬蟲),可以考慮消息壓縮。但是這也一定程度上對CPU造成了壓力,還是得結合業務數據進行測試選擇 

broker端

topic設置多分區,分區自適應所在機器,為了讓各分區均勻分布在所在的broker中,分區數要大于broker數。分區是kafka進行并行讀寫的單位,是提升kafka速度的關鍵。

  1. broker能接收消息的最大字節數的設置一定要比消費端能消費的最大字節數要小,否則broker就會因為消費端無法使用這個消息而掛起。

  2. broker可賦值的消息的最大字節數設置一定要比能接受的最大字節數大,否則broker就會因為數據量的問題無法復制副本,導致數據丟失。 

comsumer端

關閉自動更新offset,等到數據被處理后再手動跟新offset。

在消費前做驗證前拿取的數據是否是接著上回消費的數據,不正確則return先行處理排錯。

一般來說zookeeper只要穩定的情況下記錄的offset是沒有問題,除非是多個consumer group 同時消費一個分區的數據,其中一個先提交了,另一個就丟失了。 

問題

kafka的數據一開始就是存儲在PageCache上的,定期flush到磁盤上的,也就是說,不是每個消息都被存儲在磁盤了,如果出現斷電或者機器故障等,PageCache上的數據就丟失了。這個是總結出的到目前為止沒有發生丟失數據的情況

//producer用于壓縮數據的壓縮類型。默認是無壓縮。正確的選項值是none、gzip、snappy。壓縮最好用于批量處理,批量處理消息越多,壓縮性能越好
    props.put("compression.type", "gzip");
    //增加延遲
    props.put("linger.ms", "50");
    //這意味著leader需要等待所有備份都成功寫入日志,這種策略會保證只要有一個備份存活就不會丟失數據。這是最強的保證。,
    props.put("acks", "all");
    //無限重試,直到你意識到出現了問題,設置大于0的值將使客戶端重新發送任何數據,一旦這些數據發送失敗。注意,這些重試與客戶端接收到發送錯誤時的重試沒有什么不同。允許重試將潛在的改變數據的順序,如果這兩個消息記錄都是發送到同一個partition,則第一個消息失敗第二個發送成功,則第二條消息會比第一條消息出現要早。
    props.put("retries ", MAX_VALUE);
    props.put("reconnect.backoff.ms ", 20000);
    props.put("retry.backoff.ms", 20000);

    //關閉unclean leader選舉,即不允許非ISR中的副本被選舉為leader,以避免數據丟失
    props.put("unclean.leader.election.enable", false);
    //關閉自動提交offset
    props.put("enable.auto.commit", false);
    限制客戶端在單個連接上能夠發送的未響應請求的個數。設置此值是1表示kafka broker在響應請求之前client不能再向同一個broker發送請求。注意:設置此參數是為了避免消息亂序
    props.put("max.in.flight.requests.per.connection", 1);
   
Kafka重復消費原因

強行kill線程,導致消費后的數據,offset沒有提交,partition就斷開連接。比如,通常會遇到消費的數據,處理很耗時,導致超過了Kafka的session timeout時間(0.10.x版本默認是30秒),那么就會re-blance重平衡,此時有一定幾率offset沒提交,會導致重平衡后重復消費。

如果在close之前調用了consumer.unsubscribe()則有可能部分offset沒提交,下次重啟會重復消費。

kafka數據重復 kafka設計的時候是設計了(at-least once)至少一次的邏輯,這樣就決定了數據可能是重復的,kafka采用基于時間的SLA(服務水平保證),消息保存一定時間(通常為7天)后會被刪除。

kafka的數據重復一般情況下應該在消費者端,這時log.cleanup.policy = delete使用定期刪除機制。

到此,相信大家對“Kafka丟失數據問題優化分析”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

治县。| 黑山县| 江油市| 山东省| 德化县| 上高县| 木兰县| 博野县| 禹州市| 吉木萨尔县| 迁西县| 孟州市| 惠水县| 洮南市| 永昌县| 宁化县| 万山特区| 乌鲁木齐市| 京山县| 色达县| 澳门| 革吉县| 上栗县| 秦皇岛市| 玉山县| 吉首市| 桂林市| 溆浦县| 南江县| 恩平市| 榆树市| 通化县| 佛坪县| 鄢陵县| 饶河县| 田林县| 徐州市| 惠来县| 青铜峡市| 灵川县| 顺平县|