在C++中使用librdkafka庫可以很方便地發送JSON數據到Kafka。下面是一個簡單的示例代碼:
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <string>
int main() {
std::string brokers = "localhost:9092";
std::string topic = "test_topic";
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
conf->set("metadata.broker.list", brokers, errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
return 1;
}
RdKafka::Topic *kafka_topic = RdKafka::Topic::create(producer, topic, tconf, errstr);
if (!kafka_topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
return 1;
}
std::string json_data = "{\"key\": \"value\"}";
RdKafka::ErrorCode resp = producer->produce(kafka_topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
const_cast<char *>(json_data.c_str()), json_data.size(), NULL, NULL);
if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to produce message: " << RdKafka::err2str(resp) << std::endl;
return 1;
}
producer->flush(1000);
delete kafka_topic;
delete producer;
return 0;
}
在這個示例中,我們首先創建一個生產者和一個主題對象,然后使用produce
方法發送一個JSON數據到Kafka主題。在實際使用中,你可能需要根據你的需求調整代碼,并添加錯誤處理邏輯。