在 Kafka 中,生產者發送消息后,可以通過以下幾種方式判斷消息是否發送成功:
send()
方法后,會等待消息的確認返回,如果發送成功,send()
方法會返回一個 RecordMetadata
對象,其中包含了消息的元數據信息;如果發送失敗,則可能拋出異常。ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "key", "value");
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("消息發送成功,offset:" + metadata.offset());
} catch (Exception e) {
System.out.println("消息發送失敗:" + e.getMessage());
}
send()
方法后,可以傳遞一個回調函數,在消息發送完成后,會調用該回調函數,通過回調函數可以獲取到發送結果。ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "key", "value");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
System.out.println("消息發送失敗:" + e.getMessage());
} else {
System.out.println("消息發送成功,offset:" + metadata.offset());
}
}
});
無論是同步發送還是異步發送,如果發送失敗,可以根據異常信息進行錯誤處理。