中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

SpringBoot怎么整合Kafka

發布時間:2022-03-03 15:40:28 來源:億速云 閱讀:139 作者:iii 欄目:web開發

本文小編為大家詳細介紹“SpringBoot怎么整合Kafka”,內容詳細,步驟清晰,細節處理妥當,希望這篇“SpringBoot怎么整合Kafka”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。

一、準備工作
提前說明:如果你運行出問題,請檢查Kafka的版本與SpringBoot的版本是否與我文中的一致,本文中的環境已經經過測試。

Kafka服務版本為 kafka_2.11-1.1.0 (Scala), 也就是1.1.0
SpringBoot版本:1.5.10.RELEASE

提前啟動zk,kafka,并且創建一個Topic

[root@Basic kafka_2.11-1.1.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_topic 

確保你的kafka能夠訪問,如果訪問不了,需要打開外網訪問。
config/server.properties

advertised.listeners=PLAINTEXT://192.168.239.128:9092

Maven 依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>

二、項目結構
為了更加體現實際開發需求,一般生產者都是在調用某些接口的服務處理完邏輯之后然后往kafka里面扔數據,然后有一個消費者不停的監控這個Topic,然后處理數據,所以這里把生產者作為一個接口,消費者放到kafka這個目錄下,注意@Component注解,不然掃描不到@KafkaListener

三、具體實現代碼
SpringBoot配置文件
application.yml

spring:
kafka:
bootstrap-servers: 192.168.239.128:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: test
enable-auto-commit: true
auto-commit-interval: 1000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

生產者
package cn.saytime.web;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* 測試kafka生產者
*/
@RestController
@RequestMapping("kafka")
public class TestKafkaProducerController {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@RequestMapping("send")
public String send(String msg){
kafkaTemplate.send("test_topic", msg);
return "success";
}

}
消費者
這里的消費者會監聽這個主題,有消息就會執行,不需要進行while(true)

package cn.saytime.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
* kafka消費者測試
*/
@Component
public class TestConsumer {

@KafkaListener(topics = "test_topic")
public void listen (ConsumerRecord<?, ?> record) throws Exception {
System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
}
}

項目啟動類

package cn.saytime;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TestApplication{

public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}
}

四、測試
運行項目,執行:http://localhost:8080/kafka/send?msg=hello

控制臺輸出:

topic = test_topic, offset = 19, value = hello 
1
為了體現消費者不止執行一次就結束,再調用一次接口: 
http://localhost:8080/kafka/send?msg=kafka

topic = test_topic, offset = 20, value = kafka 
1
所以可以看到這里消費者實際上是不停的poll Topic數據的。

讀到這里,這篇“SpringBoot怎么整合Kafka”文章已經介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領會,如果想了解更多相關內容的文章,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

石棉县| 白水县| 诸暨市| 宁波市| 孝义市| 石柱| 罗源县| 东光县| 乐陵市| 交城县| 苍梧县| 叶城县| 盘山县| 民权县| 麻栗坡县| 广宗县| 广昌县| 广东省| 台中县| 吐鲁番市| 林甸县| 行唐县| 贡山| 蕉岭县| 阳谷县| 新密市| 龙川县| 确山县| 栾城县| 沐川县| 怀宁县| 和田市| 遂平县| 保定市| 宁陵县| 福贡县| 东乌珠穆沁旗| 晋州市| 烟台市| 民勤县| 深泽县|