您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關kafka集群發送消息報錯怎么辦,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
因為logstash采集的日志要發往kafka做一個隊列機制,搭建完kafka集群后發送消息出現問題
ERROR fetching topic metadata for topics [Set(order)] from broker [ArrayBuffer(id:0,host:slave4,port:9092)] failed (kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(order)] from broker [ArrayBuffer(id:0,host:slave4,port:9092)] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:484)
at sun.nio.ch.Net.connect(Net.java:476)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:675)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
... 12 more
ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
因為看到 Caused by: java.net.ConnectException: Connection refused ,所以查看 selinux
vim /etc/selinux/config
# This file controls the state of SELinux on the system. # SELINUX= can take one of these three values: # enforcing - SELinux security policy is enforced. # permissive - SELinux prints warnings instead of enforcing. # disabled - No SELinux policy is loaded. SELINUX=disabled # SELINUXTYPE= can take one of these two values: # targeted - Targeted processes are protected, # mls - Multi Level Security protection. SELINUXTYPE=targeted |
強制關閉 selinux
setenforce 0
發送消息還是報一樣的錯
查看zookeeper
[zk: master:2181(CONNECTED) 18] get /brokers/ids/0
{"jmx_port":-1,"timestamp":"1465262832441","host":"localhost","version":1,"port":9092}
cZxid = 0x70000005e
ctime = Tue Jun 07 09:27:12 CST 2016
mZxid = 0x70000005e
mtime = Tue Jun 07 09:27:12 CST 2016
pZxid = 0x70000005e
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x255213e787e0002
dataLength = 86
numChildren = 0
host是localhost,而我的kafka集群是slave4、5上,zk在master,slave2、3上,所以分別修改 slave4、5 上的server.properties的 host.name=slave4, host.name=slave5
重新啟動kafka,查看zk
[zk: master:2181(CONNECTED) 27] get /brokers/ids/0
{"jmx_port":-1,"timestamp":"1465266328389","host":"slave4","version":1,"port":9092}
cZxid = 0x70000008a
ctime = Tue Jun 07 10:25:27 CST 2016
mZxid = 0x70000008a
mtime = Tue Jun 07 10:25:27 CST 2016
pZxid = 0x70000008a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x355213e7be80003
dataLength = 83
numChildren = 0
0上的host變成了slave4
在發送消息成功,接收消息成功
關于“kafka集群發送消息報錯怎么辦”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。