中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

python如何實現對kafka的基本操作

發布時間:2021-10-18 15:25:40 來源:億速云 閱讀:215 作者:小新 欄目:編程語言

這篇文章主要為大家展示了“python如何實現對kafka的基本操作”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“python如何實現對kafka的基本操作”這篇文章吧。

-- coding:utf-8 --

from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

bootstrap_servers = []
class OperateKafka:
def init(self,bootstrap_servers,topic):
self.bootstrap_servers = bootstrap_servers
self.topic = topic

"""生產者"""
def produce(self):
    producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers)
    for i in range(4):
        msg = "msg%d" %i
        producer.send(self.topic,key=str(i),value=msg)
    producer.close()

"""一個消費者消費一個topic"""
def consume(self):
    #consumer = KafkaConsumer(self.topic,auto_offset_reset='earliest',group_id="testgroup",bootstrap_servers=self.bootstrap_servers)
    consumer = KafkaConsumer(self.topic,bootstrap_servers=self.bootstrap_servers)
    print consumer.partitions_for_topic(self.topic)  #獲取test主題的分區信息
print consumer.topics()  #獲取主題列表
print consumer.subscription()  #獲取當前消費者訂閱的主題
print consumer.assignment()  #獲取當前消費者topic、分區信息
print consumer.beginning_offsets(consumer.assignment()) #獲取當前消費者可消費的偏移量
consumer.seek(TopicPartition(topic=self.topic, partition=0), 1)  #重置偏移量,從第1個偏移量消費
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" 
        % (message.topic,message.partition,message.offset, message.key,message.value))

"""一個消費者訂閱多個topic """
def consume2(self):
    consumer = KafkaConsumer(bootstrap_servers=['192.168.124.201:9092'])
consumer.subscribe(topics=('TEST','TEST2'))  #訂閱要消費的主題
print consumer.topics()
print consumer.position(TopicPartition(topic='TEST', partition=0)) #獲取當前主題的最新偏移量
for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                      message.offset, message.key,
                                      message.value))
"""消費者(手動拉取消息)"""
def consume3(self):
    consumer = KafkaConsumer(group_id="mygroup",max_poll_records=3,bootstrap_servers=['192.168.124.201:9092'])
consumer.subscribe(topics=('TEST','TEST2'))
while True:
        message = consumer.poll(timeout_ms=5)   #從kafka獲取消息
        if message:
        print message
        time.sleep(1)

def main():
bootstrap_servers = ['192.168.124.201:9092']
topic = "TEST"
operateKafka = OperateKafka(bootstrap_servers,topic)
operateKafka.produce()
#operateKafka.consume()
#operateKafka.consume2()
operateKafka.consume3()
main()

以上是“python如何實現對kafka的基本操作”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

连平县| 泉州市| 托里县| 常山县| 玉溪市| 石门县| 通化县| 八宿县| 二连浩特市| 嵊泗县| 崇左市| 扎赉特旗| 伊宁市| 金平| 修水县| 五华县| 东乡族自治县| 金乡县| 石柱| 亳州市| 治多县| 红桥区| 和硕县| 定兴县| 汾阳市| 漳平市| 南安市| 大石桥市| 玉环县| 凌源市| 普兰县| 鲁甸县| 松滋市| 东光县| 德钦县| 盱眙县| 吴堡县| 章丘市| 涞水县| 浦城县| 吉木乃县|