您好,登錄后才能下訂單哦!
主要是要讓它來格式化:GroupMetadataManager.OffsetsMessageFormatter
最后用看了它的源碼,把這部分挑選出來,自己解析了得到的byte[]。核心代碼如下:
// com.sina.mis.app.ConsumerInnerTopic ConsumerRecords<byte[], byte[]> records = consumer.poll(512); for (ConsumerRecord<byte[], byte[]> record : records) { Object offsetKey = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key())); if (offsetKey instanceof OffsetKey) { GroupTopicPartition groupTopicPartition = ((OffsetKey) offsetKey).key(); OffsetAndMetadata value = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value())); LOG.info(groupTopicPartition.toString() + "---:---" + value); } else { LOG.info("############:{}", offsetKey); } }
#Create consumer configecho "exclude.internal.topics=false" > /tmp/consumer.config#Only consume the latest consumer offsets./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \ --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" \ --zookeeper localhost:2181 --topic __consumer_offsets
#Create consumer configecho "exclude.internal.topics=false" > /tmp/consumer.config#Only consume the latest consumer offsets./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \ --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" \ --zookeeper 10.39.40.98:2181/kafka10 --topic __consumer_offsets
Committing and fetching consumer offsets in Kafka : 使用Java API獲取Consumer Offset。
Since version 0.8.2, kafka has had the ability to store consumer offsets in an internal compacted topic called __consumer_offsets
Kafka 用 Yammer Metrics來存儲Server、Client的數據。可以使用插件式的方式獲取這些數據,寫入到CSV文件。
Kafka 實現了 KafkaCSVMetricsReporter.scala
,可以 將metrics寫入到CSV文件。
由于沒有實現寫入ganglia的實現類,所以無法直接從Kafka將metrics寫入到ganglia。
document
為什么某個topic的HDFS的數據多余Kafka自己統計的流量40%左右。
sina的KafkaProxy都使用了snappy壓縮后入kafka。
猜想 30%-40%
需要測試一下:找一批HDFS的文件,寫入Kafka,消費出來,寫成文件,看看大小差別。
High level Consumer的API,默認以Range的方式分配,還有另外一個是RoundRobin。
這是有DefaultPartitioner
決定的。
If a partition is specified in the record, use it.
If no partition is specified but a key is present choose a partition based on a hash of the key
If no partition or key is present choose a partition in a round-robin fashion
中文:
有key就hash
沒key就Round-robin
0.8.0 版本在沒key的時候,是Random的方式。
90%的broker GC暫停時間為21ms左右。每秒進行的young GC小于1次
傳統網絡IO流程,一次傳送過程:
從Disk把數據讀到內核區的Read Buffer。
把數據從內核區到用戶區Buffer。
再把數據寫入到內核區的Socket Buffer上。
把數據從Socket Buffer復制到網卡的NIC Buffer上。
Kafka少了中間兩步,這就是sendfile技術:
依賴OS文件系統的頁緩存 (當上層有寫操作時,操作系統只是將數據寫入PageCache,同時標記Page屬性為Dirty。當讀操作發生時,先從PageCache中查找,如果發生缺頁才進行磁盤調度,最終返回需要的數據。實際上PageCache是把盡可能多的空閑內存都當做了磁盤緩存來使用。同時如果有其他進程申請內存,回收PageCache的代價又很小。
總結:依賴OS的頁緩存能大量減少IO,高效利用內存來作為緩存)新航道雅思
不使用JVM緩存數據, 內存利用率高
順序IO以及O(1)常量時間get、put消息
sendfile技術(零拷貝)
每次發送數據時,Producer都是send()
之后就認為已經發送出去了,但其實大多數情況下消息還在內存的MessageSet當中,尚未發送到網絡,這時候如果Producer掛掉,那就會出現丟數據的情況。
解決辦法: ack機制,一般設置為acks=1,消息只需要被Leader接受并確認即可,這樣同時保證了可靠性和效率。
MessageSet手段批量順序寫入
數據支持壓縮
異步發送
push模式的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式則可以根據consumer的消費能力以適當的速率消費消息。
log:字節流,sendfile、zero copy技術
index:稀疏索引,mmap的數據結構-本質是個類,二分查找尋找到offset。
一種非常常用的選舉leader的方式是“Majority Vote”(“少數服從多數”)。
剛創建的topic一般"preferred replica"是leader。在ZooKeeper中動態維護了一個ISR(in-sync replicas),這個ISR里的所有Replica都跟上了leader,只有ISR里的成員才有被選為Leader的可能。
所有Partition的Leader選舉都由controller決定。controller會將Leader的改變直接通過RPC的方式,通知需要做此修改的Broker。
那么Controller是如何選舉leader的?
如果當前ISR中有至少一個Replica還幸存,則選擇其中一個作為新Leader。
如果replica都不在ISR列表里面,選擇該Partition中任意一個幸存的Replica作為新的Leader以及ISR(該場景下可能會有潛在的數據丟失)。
如果該Partition的所有Replica都宕機了,則將新的Leader設置為-1。
Producer在發布消息到某個Partition時,先通過ZooKeeper找到該Partition的Leader,然后寫入數據。
Consumer(0.8)通過zk找到leader,讀取數據。
Consumer(0.10)通過Coordinator找到Leader,讀取數據。
不會。擴容的時候,新的leader需要從舊有的broker復制數據,跟上以后,會切換成leader。
這個時間期間,Producer、Consumer會向舊有的leader通信。
如果請求過來的topic是__consumer_offsets,那就啟動OffsetManager的異步讀
這個異步讀會一直讀取__consumer_offsets并把消息解碼成消費進度放入緩存
I/O線程可以處理請求的隊列大小,若實際請求數超過此大小,網絡線程將停止接收新的請求。
單機partition數的最大值:100 * broker * replica
(if you care about latency, it’s probably a good idea to limit the number of partitions per broker to 100 x b x r, where b is the number of brokers in a Kafka cluster and r is the replication factor.)
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。