中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Kafka 0.10問題點滴

發布時間:2020-07-25 17:33:06 來源:網絡 閱讀:997 作者:張濤澤 欄目:網絡安全

15.如何消費內部topic: __consumer_offsets

  • 主要是要讓它來格式化:GroupMetadataManager.OffsetsMessageFormatter

  • 最后用看了它的源碼,把這部分挑選出來,自己解析了得到的byte[]。核心代碼如下:

// com.sina.mis.app.ConsumerInnerTopic
            ConsumerRecords<byte[], byte[]> records = consumer.poll(512);            for (ConsumerRecord<byte[], byte[]> record : records) {
                Object offsetKey = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));                if (offsetKey instanceof OffsetKey) {
                    GroupTopicPartition groupTopicPartition = ((OffsetKey) offsetKey).key();
                    OffsetAndMetadata value = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value()));
                    LOG.info(groupTopicPartition.toString() + "---:---" + value);
                } else {
                    LOG.info("############:{}", offsetKey);
                }
            }
1.For Kafka 0.8.2.x
#Create consumer configecho "exclude.internal.topics=false" > /tmp/consumer.config#Only consume the latest consumer offsets./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
--formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" \
--zookeeper localhost:2181 --topic __consumer_offsets
2.For Kafka 0.9.x.x and 0.10.0.0
#Create consumer configecho "exclude.internal.topics=false" > /tmp/consumer.config#Only consume the latest consumer offsets./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
--formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" \
--zookeeper 10.39.40.98:2181/kafka10 --topic __consumer_offsets
  • Committing and fetching consumer offsets in Kafka : 使用Java API獲取Consumer Offset。

Since version 0.8.2, kafka has had the ability to store consumer offsets in an internal compacted topic called __consumer_offsets

14.Kafka metrics

Kafka 用 Yammer Metrics來存儲Server、Client的數據。可以使用插件式的方式獲取這些數據,寫入到CSV文件。

Kafka 實現了 KafkaCSVMetricsReporter.scala,可以 將metrics寫入到CSV文件。

由于沒有實現寫入ganglia的實現類,所以無法直接從Kafka將metrics寫入到ganglia。

document

13.snappy的壓縮率

為什么某個topic的HDFS的數據多余Kafka自己統計的流量40%左右。

sina的KafkaProxy都使用了snappy壓縮后入kafka。

  • 猜想 30%-40%

  • 需要測試一下:找一批HDFS的文件,寫入Kafka,消費出來,寫成文件,看看大小差別。

12.Kafka的Consumer讀取數據的時候,讀哪個partition

High level Consumer的API,默認以Range的方式分配,還有另外一個是RoundRobin。

11.Kafka的Producer發送數據的時候,發送給哪個partition

這是有DefaultPartitioner決定的。
If a partition is specified in the record, use it.

  • If no partition is specified but a key is present choose a partition based on a hash of the key

  • If no partition or key is present choose a partition in a round-robin fashion

中文:

  • 有key就hash

  • 沒key就Round-robin

0.8.0 版本在沒key的時候,是Random的方式。

10.Linkedin的集群GC情況

90%的broker GC暫停時間為21ms左右。每秒進行的young GC小于1次

9.解釋一下什么是ZoroCopy(sendfile技術)

傳統網絡IO流程,一次傳送過程:

  1. 從Disk把數據讀到內核區的Read Buffer。

  2. 把數據從內核區到用戶區Buffer。

  3. 再把數據寫入到內核區的Socket Buffer上。

  4. 把數據從Socket Buffer復制到網卡的NIC Buffer上。


Kafka少了中間兩步,這就是sendfile技術:


8.kafka如何做到大吞吐量、強大消息堆積能力等特性

  1. 依賴OS文件系統的頁緩存 (當上層有寫操作時,操作系統只是將數據寫入PageCache,同時標記Page屬性為Dirty。當讀操作發生時,先從PageCache中查找,如果發生缺頁才進行磁盤調度,最終返回需要的數據。實際上PageCache是把盡可能多的空閑內存都當做了磁盤緩存來使用。同時如果有其他進程申請內存,回收PageCache的代價又很小。
    總結:依賴OS的頁緩存能大量減少IO,高效利用內存來作為緩存)新航道雅思

  2. 不使用JVM緩存數據, 內存利用率高

  3. 順序IO以及O(1)常量時間get、put消息

  4. sendfile技術(零拷貝)

7.一個隊列最重要的就是消息丟失問題,kafka是如何處理的

每次發送數據時,Producer都是send()之后就認為已經發送出去了,但其實大多數情況下消息還在內存的MessageSet當中,尚未發送到網絡,這時候如果Producer掛掉,那就會出現丟數據的情況。

解決辦法: ack機制,一般設置為acks=1,消息只需要被Leader接受并確認即可,這樣同時保證了可靠性和效率。

6.Kafka 0.10的Producer做了什么優化

  • MessageSet手段批量順序寫入

  • 數據支持壓縮

  • 異步發送

5.為什么kafka是pull模型

push模式的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式則可以根據consumer的消費能力以適當的速率消費消息。

4.底層的LogSegment、Index是怎么存儲的

  • log:字節流,sendfile、zero copy技術

  • index:稀疏索引,mmap的數據結構-本質是個類,二分查找尋找到offset。

3.一個很重要的問題是當Leader宕機了,怎樣在Follower中選舉出新的Leader

一種非常常用的選舉leader的方式是“Majority Vote”(“少數服從多數”)。

剛創建的topic一般"preferred replica"是leader。在ZooKeeper中動態維護了一個ISR(in-sync replicas),這個ISR里的所有Replica都跟上了leader,只有ISR里的成員才有被選為Leader的可能。

所有Partition的Leader選舉都由controller決定。controller會將Leader的改變直接通過RPC的方式,通知需要做此修改的Broker。

那么Controller是如何選舉leader的?

  • 如果當前ISR中有至少一個Replica還幸存,則選擇其中一個作為新Leader。

  • 如果replica都不在ISR列表里面,選擇該Partition中任意一個幸存的Replica作為新的Leader以及ISR(該場景下可能會有潛在的數據丟失)。

  • 如果該Partition的所有Replica都宕機了,則將新的Leader設置為-1。

2. Partition的leader選舉是怎么樣的

  • Producer在發布消息到某個Partition時,先通過ZooKeeper找到該Partition的Leader,然后寫入數據。

  • Consumer(0.8)通過zk找到leader,讀取數據。

  • Consumer(0.10)通過Coordinator找到Leader,讀取數據。

1. reassign一個topic,Producer、Consumer是否會丟失數據

不會。擴容的時候,新的leader需要從舊有的broker復制數據,跟上以后,會切換成leader。

這個時間期間,Producer、Consumer會向舊有的leader通信。

內部topic:__consumer_offsets
這個topic是用來管理所有的consumer的進度的,這樣避免了把消費進度存zk上面影響擴展性。它是由Coordinator來管理的。

如果請求過來的topic是__consumer_offsets,那就啟動OffsetManager的異步讀
這個異步讀會一直讀取__consumer_offsets并把消息解碼成消費進度放入緩存

queued.max.requests=16

I/O線程可以處理請求的隊列大小,若實際請求數超過此大小,網絡線程將停止接收新的請求。

confluence

  • 單機partition數的最大值:100 * broker * replica (if you care about latency, it’s probably a good idea to limit the number of partitions per broker to 100 x b x r, where b is the number of brokers in a Kafka cluster and r is the replication factor.)



向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

三门峡市| 安仁县| 泰安市| 远安县| 山西省| 张家口市| 孙吴县| 平乡县| 遵义县| 怀宁县| 安徽省| 永康市| 汽车| 万源市| 樟树市| 通渭县| 三亚市| 临泉县| 潢川县| 香格里拉县| 克什克腾旗| 渝中区| 峨边| 新巴尔虎左旗| 阿克苏市| 西青区| 南汇区| 卫辉市| 望江县| 永清县| 天祝| 永春县| 亳州市| 内丘县| 留坝县| 陇西县| 三门峡市| 遂昌县| 秦安县| 衡东县| 板桥市|