Kafka的消費者可以通過兩種方式來管理消息的偏移量:手動管理和自動管理。
示例代碼如下:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 處理消息
}
consumer.commitSync();
}
示例代碼如下:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 處理消息
}
}
消費者可以根據實際需求選擇手動管理或自動管理消息的偏移量。手動管理可以提供更精確的控制,但也需要消費者編寫更多的代碼來處理偏移量的提交。自動管理則更為方便,但可能會因為定期提交偏移量而導致消息重復消費的情況發生。