您好,登錄后才能下訂單哦!
這篇文章主要介紹“kafka consumer怎么使用”,在日常操作中,相信很多人在kafka consumer怎么使用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”kafka consumer怎么使用”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
consumer作為kafka當中一個重要元素,它的常用操作并不復雜,說白了無非就是2點,1、把數據poll出來,2、把位置標記上。我們找到kafka的java api doc,找到了官方提供的幾種consumer操作的例子,逐一進行分析,看看都有幾種操作類型。
自動 Offset 提交
這個例子顯示了一個基于offset自動提交的consumer api的簡單應用。
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); }
enable.auto.commit
意味著offset將會得到自動提交,而這個自動提交的時間間隔由 auto.commit.interval.ms
來進行控制。
客戶端通過 bootstrap.servers
的配置來連接服務器,這個配值當中可以是一個或多個broker,需要注意的是,這個配置僅僅用來讓客戶端找到我們的server集群,而不需要把集群當中的所有服務器地址都列上。
在這個例子當中,客戶端作為test group的一員,訂閱了foo和bar2個topic。
( 這一段直接翻譯很蹩腳,我會試著根據自己的理解翻譯出來)首先假設,foo和bar這2個topic,都分別有3個partitions,同時我們將上面的代碼在我們的機器上起3個進程,也就是說,在test group當中,目前有了3個consumer,一般來講,這3個consumer會分別獲得 foo和bar 的各一個partitions,這是前提。3個consumer會周期性的執行一個poll的動作(這個動作當中隱含的有一個heartbeat的發送,來告訴cluster我是活的),這樣3個consumer會持續的保有他們對分配給自己的partition的訪問的權利,如果某一個consumer失效了,也就是poll不再執行了,cluster會在一段時間( session.timeout.ms
)之后把partitions分配給其他的consumer。
反序列化的設置,定義了如何轉化bytes,這里我們把key和value都直接轉化為string。
手動的offset控制
除了周期性的自動提交offset之外,用戶也可以在消息被消費了之后提交他們的offset。
某些情況下,消息的消費是和某些處理邏輯相關聯的,我們可以用這樣的方式,手動的在處理邏輯結束之后提交offset。
簡要地說,在這個例子當中,我們希望每次至少消費200條消息并將它們插入數據庫,之后再提交offset。如果仍然使用前面的自動提交方式,就可能出現消息已經被消費,但是插入數據庫失敗的情況。這里可以視作一個簡單的事務封裝。
但是,有沒有另一種可能性,在插入數據庫成功之后,提交offset之前,發生了錯誤,或者說是提交offset本身發生了錯誤,那么就可能出現某些消息被重復消費的情況。
個人認為這段話說的莫名其妙,簡單地說,采用這樣的方式,消息不會被丟失,但是有可能出現重復消費。
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); final int minBatchSize = 200; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { insertIntoDb(buffer); consumer.commitSync(); buffer.clear(); } }
上面的例子當中,我們用commitSync來標記所有的消息;在有些情況下,我們可能希望更加精確的控制offset,那么在下面的例子當中,我們可以在每一個partition當中分別控制offset的提交。
try { while(running) { ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.offset() + ": " + record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } } finally { consumer.close(); }
注意:提交的offset應該是next message,所以,提交的時候需要在當前最后一條的基礎上+1.
手動的分區分配
前面的例子當中,我們訂閱一個topic,然后讓kafka把該topic當中的不同partitions,公平的在一個consumer group內部進行分配。那么,在某些情況下,我們希望能夠具體的指定partitions的分配關系。
如果某個進程在本地管理了和partition相關的狀態,那么它只需要獲得跟他相關partition。
如果某個進程自身具備高可用性,那么就不需要kafka來檢測錯誤并重新分配partition,因為消費者進程會在另一臺設備上重新啟動。
要使用這種模式,可以用assign方法來代替subscribe,具體指定一個partitions列表。
String topic = "foo"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1));
分配之后,就可以像前面的例子一樣,在循環當中調用poll來消費消息。手動的分區分配不需要組協調,所以消費進程失效之后,不會引發partition的重新分配,每一個消費者都是獨立工作的,即使它和其他消費者屬于同一個group。為了避免offset提交的沖突,在這種情況下,通常我們需要保證每一個consumer使用自己的group id。
需要注意的是,手動partition分配和通過subscribe實現的動態的分區分配,2種方式是不能混合使用的。
到此,關于“kafka consumer怎么使用”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。