您好,登錄后才能下訂單哦!
這篇文章給大家分享的是有關Kafka配置參數Consumer的示例分析的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。
#############################Consumer #############################
# Consumer端核心的配置是group.id、zookeeper.connect
# 決定該Consumer歸屬的唯一組ID,By setting the same group id multiple processes indicate that they are all part of the same consumer group.
group.id
# 消費者的ID,若是沒有設置的話,會自增
consumer.id
# 一個用于跟蹤調查的ID ,最好同group.id相同
client.id = <group_id>
# 對于zookeeper集群的指定,必須和broker使用同樣的zk配置
zookeeper.connect=debugo01:2182,debugo02:2182,debugo03:2182
# zookeeper的心跳超時時間,查過這個時間就認為是無效的消費者
zookeeper.session.timeout.ms = 6000
# zookeeper的等待連接時間
zookeeper.connection.timeout.ms = 6000
# zookeeper的follower同leader的同步時間
zookeeper.sync.time.ms = 2000
# 當zookeeper中沒有初始的offset時,或者超出offset上限時的處理方式 。
# smallest :重置為最小值
# largest:重置為最大值
# anything else:拋出異常給consumer
auto.offset.reset = largest
# socket的超時時間,實際的超時時間為max.fetch.wait + socket.timeout.ms.
socket.timeout.ms= 30 * 1000
# socket的接收緩存空間大小
socket.receive.buffer.bytes=64 * 1024
#從每個分區fetch的消息大小限制
fetch.message.max.bytes = 1024 * 1024
# true時,Consumer會在消費消息后將offset同步到zookeeper,這樣當Consumer失敗后,新的consumer就能從zookeeper獲取最新的offset
auto.commit.enable = true
# 自動提交的時間間隔
auto.commit.interval.ms = 60 * 1000
# 用于消費的最大數量的消息塊緩沖大小,每個塊可以等同于fetch.message.max.bytes中數值
queued.max.message.chunks = 10
# 當有新的consumer加入到group時,將嘗試reblance,將partitions的消費端遷移到新的consumer中, 該設置是嘗試的次數
rebalance.max.retries = 4
# 每次reblance的時間間隔
rebalance.backoff.ms = 2000
# 每次重新選舉leader的時間
refresh.leader.backoff.ms
# server發送到消費端的最小數據,若是不滿足這個數值則會等待直到滿足指定大小。默認為1表示立即接收。
fetch.min.bytes = 1
# 若是不滿足fetch.min.bytes時,等待消費端請求的最長等待時間
fetch.wait.max.ms = 100
# 如果指定時間內沒有新消息可用于消費,就拋出異常,默認-1表示不受限
consumer.timeout.ms = -1
------------------------------------Kafka配置參數--Consumer詳解-----------------------------
group.id 默認值:無
唯一的指明了consumer的group的名字,group名一樣的進程屬于同一個consumer group。
zookeeper.connect 默認值:無
指定了ZooKeeper的connect string,以hostname:port的形式,hostname和port就是ZooKeeper集群各個節點的hostname和port。 ZooKeeper集群中的某個節點可能會掛掉,所以可以指定多個節點的connect string。如下所式:
hostname1:port1,hostname2:port2,hostname3:port3.
ZooKeeper也可以允許你指定一個"chroot"的路徑,可以讓Kafka集群將需要存儲在ZooKeeper的數據存儲到指定的路徑下這可以讓多個Kafka集群或其他應用程序公用同一個ZooKeeper集群。可以使用如下的connect string:
hostname1:port1,hostname2:port2,hostname3:port3/chroot/path
consumer.id 默認值:null
如果沒有設置的話則自動生成。
socket.timeout.ms 默認值:30 * 1000
socket請求的超時時間。實際的超時時間為max.fetch.wait + socket.timeout.ms。
socket.receive.buffer.bytes 默認值:64 * 1024
socket的receiver buffer的字節大小。
fetch.message.max.bytes 默認值:1024 * 1024
每一個獲取某個topic的某個partition的請求,得到最大的字節數,每一個partition的要被讀取的數據會加載入內存,所以這可以幫助控制consumer使用的內存。這個值的設置不能小于在server端設置的最大消息的字節數,否則producer可能會發送大于consumer可以獲取的字節數限制的消息。
auto.commit.enable 默認值:true
如果設為true,consumer會定時向ZooKeeper發送已經獲取到的消息的offset。當consumer進程掛掉時,已經提交的offset可以繼續使用,讓新的consumer繼續工作。
auto.commit.interval.ms 默認值:60 * 1000
consumer向ZooKeeper發送offset的時間間隔。
queued.max.message.chunks 默認值:10
緩存用來消費的消息的chunk的最大數量,每一個chunk最大可以達到fetch.message.max.bytes。
rebalance.max.retries 默認值:4
當一個新的consumer加入一個consumer group時,會有一個rebalance的操作,導致每一個consumer和partition的關系重新分配。如果這個重分配失敗的話,會進行重試,此配置就代表最大的重試次數。
fetch.min.bytes 默認值:1
一個fetch請求最少要返回多少字節的數據,如果數據量比這個配置少,則會等待,知道有足夠的數據為止。
fetch.wait.max.ms 默認值:100
在server回應fetch請求前,如果消息不足,就是說小于fetch.min.bytes時,server最多阻塞的時間。如果超時,消息將立即發送給consumer.。
rebalance.backoff.ms 默認值:2000
在rebalance重試時的backoff時間。
refresh.leader.backoff.ms 默認值:200
在consumer發現失去某個partition的leader后,在leader選出來前的等待的backoff時間。
auto.offset.reset 默認值:largest
在Consumer在ZooKeeper中發現沒有初始的offset時或者發現offset不在范圍呢,該怎么做:
* smallest : 自動把offset設為最小的offset。
* largest : 自動把offset設為最大的offset。
* anything else: 拋出異常。
consumer.timeout.ms 默認值:-1
如果在指定的時間間隔后,沒有發現可用的消息可消費,則拋出一個timeout異常。
client.id 默認值: group id value
每一個請求中用戶自定義的client id,可幫助追蹤調用情況。
zookeeper.session.timeout.ms 默認值:6000
ZooKeeper的session的超時時間,如果在這段時間內沒有收到ZK的心跳,則會被認為該Kafka server掛掉了。如果把這個值設置得過低可能被誤認為掛掉,如果設置得過高,如果真的掛了,則需要很長時間才能被server得知。
zookeeper.connection.timeout.ms 默認值:6000
client連接到ZK server的超時時間。
zookeeper.sync.time.ms 默認值:2000
一個ZK follower能落后leader多久。
感謝各位的閱讀!關于“Kafka配置參數Consumer的示例分析”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。