Java程序可以通過Kafka提供的Java客戶端來獲取Kafka的topic。以下是一個獲取topic列表的示例代碼:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.KafkaFuture;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaTopicExample {
public static void main(String[] args) {
// Kafka配置
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// 創建AdminClient對象
try (AdminClient adminClient = AdminClient.create(properties)) {
// 獲取topic列表
ListTopicsResult topicsResult = adminClient.listTopics();
// 獲取Future對象
KafkaFuture<Collection<TopicListing>> topicListingFuture = topicsResult.listings();
// 獲取topic列表
Collection<TopicListing> topicListings = topicListingFuture.get();
// 遍歷輸出每個topic
for (TopicListing topicListing : topicListings) {
System.out.println(topicListing.name());
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
在上述代碼中,我們首先創建了一個AdminClient
對象,并傳入Kafka的配置。然后,我們通過listTopics
方法獲取一個ListTopicsResult
對象,該對象包含了獲取topic列表的方法。我們通過調用listings
方法獲取一個KafkaFuture
對象,該對象代表了一個異步的獲取topic列表的過程。最后,我們通過調用get
方法獲取真正的topic列表,并遍歷輸出每個topic的名稱。
請注意,這里的配置中使用了bootstrap.servers
參數來指定Kafka集群的地址,你需要根據你實際的Kafka集群配置來修改該參數。