要連接Kafka,可以使用Kafka-Python庫。以下是一個簡單的示例代碼,演示如何連接Kafka并發送消息:
from kafka import KafkaProducer, KafkaConsumer
# 連接Kafka生產者
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# 發送消息
producer.send('my_topic', b'Hello, Kafka!')
# 關閉生產者連接
producer.close()
# 連接Kafka消費者
consumer = KafkaConsumer('my_topic', bootstrap_servers='localhost:9092')
# 消費消息
for message in consumer:
print(message.value.decode('utf-8'))
# 關閉消費者連接
consumer.close()
在上面的示例中,我們首先使用KafkaProducer
類連接到Kafka,并使用send()
方法發送消息到名為my_topic
的主題。
然后,我們使用KafkaConsumer
類連接到Kafka,并使用consumer()
方法訂閱my_topic
主題。然后,使用for
循環遍歷消費者對象以接收消息,并使用value
屬性獲取消息的值。
最后,我們關閉生產者和消費者的連接。
請注意,上述示例假設Kafka服務器在本地運行,且端口號為9092。您需要根據您的Kafka服務器配置進行適當的更改。