中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

kafka手動提交偏移量怎么實現

小億
162
2023-11-28 17:11:46
欄目: 大數據

Kafka 提供了兩種方式來手動提交偏移量:

  1. 使用 commitSync() 方法同步提交偏移量:
import org.apache.kafka.clients.consumer.*;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 關閉自動提交

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // 處理消息
        }
        consumer.commitSync(); // 手動提交偏移量
    }
} finally {
    consumer.close();
}
  1. 使用 commitAsync() 方法異步提交偏移量:
import org.apache.kafka.clients.consumer.*;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 關閉自動提交

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // 處理消息
        }
        consumer.commitAsync(); // 異步提交偏移量
    }
} finally {
    consumer.close();
}

在這兩種方式中,commitSync() 方法會一直阻塞直到偏移量提交成功或發生錯誤。而 commitAsync() 方法則會在提交請求發送后立即返回,不會等待確認。如果發生錯誤,可以在 commitAsync() 方法的回調函數中處理。

0
南平市| 图木舒克市| 长阳| 北流市| 桂林市| 商丘市| 增城市| 绵阳市| 南城县| 长治市| 丘北县| 祁门县| 平武县| 屏边| 仪征市| 长顺县| 辽阳县| 电白县| 杂多县| 奉化市| 长丰县| 九龙县| 师宗县| 亚东县| 宜兰县| 白银市| 兰考县| 三门峡市| 武冈市| 千阳县| 安平县| 双辽市| 永济市| 图片| 张掖市| 德阳市| 石首市| 浮山县| 江北区| 永吉县| 巴中市|