在Java中連接Kafka并創建topic,可以使用KafkaAdminClient類的createTopics()方法。
以下是一個示例代碼:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Arrays;
import java.util.Properties;
public class KafkaTopicCreator {
public static void main(String[] args) {
// Kafka broker地址
String bootstrapServers = "localhost:9092";
// 創建AdminClient的配置
Properties adminClientConfig = new Properties();
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// 創建AdminClient實例
try (AdminClient adminClient = AdminClient.create(adminClientConfig)) {
// 創建一個NewTopic對象
NewTopic newTopic = new NewTopic("my-topic", 3, (short) 1);
// 使用AdminClient創建topic
adminClient.createTopics(Arrays.asList(newTopic)).all().get();
System.out.println("Topic created successfully");
} catch (Exception e) {
e.printStackTrace();
}
}
}
在上述代碼中,我們首先創建一個AdminClient實例,然后使用NewTopic類創建一個新的topic對象。接下來,我們使用AdminClient的createTopics()方法,并將新的topic對象作為參數傳遞給它。最后,通過調用all().get()方法等待創建topic的完成。
注意:為了能夠成功創建topic,您需要運行Kafka服務,并且您的Java應用程序需要能夠訪問Kafka broker。