中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Kafka多線程Consumer的實例代碼

發布時間:2021-09-08 16:17:34 來源:億速云 閱讀:152 作者:chen 欄目:云計算

這篇文章主要介紹“Kafka多線程Consumer的實例代碼”,在日常操作中,相信很多人在Kafka多線程Consumer的實例代碼問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Kafka多線程Consumer的實例代碼”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

 
多線程示例代碼:
這里要根據自身需求開發,我這里只舉一個簡單的例子,就是幾個分區就啟動幾個consumer,一一對應。
三個類:
Main:
public static void main(String[] args) {

String bootstrapServers = "kafka01:9092,kafka02:9092";
String groupId = "test";
String topic = "testtopic";
int consumerNum = 3;
ConsumerGroup cg = new ConsumerGroup(consumerNum,bootstrapServers,groupId,topic);
cg.execute();
}



import java.util.ArrayList;
import java.util.List;


public class ConsumerGroup {

private List<ConsumerRunnable> consumers;

public ConsumerGroup(int consumerNum,String bootstrapServers,String groupId,String topic){

consumers = new ArrayList<>(consumerNum);

for(int i=0;i < consumerNum;i++){
ConsumerRunnable ConsumerRunnable = new ConsumerRunnable(bootstrapServers,groupId,topic);
consumers.add(ConsumerRunnable);
}
}

public void execute(){

for(ConsumerRunnable consumerRunnable:consumers){
new Thread(consumerRunnable).start();
}
}
}



import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerRunnable implements Runnable{

private final KafkaConsumer<String,String> consumer;

public ConsumerRunnable(String bootstrapServers,String groupId,String topic){

Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset","earliest");
this.consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
}

@Override
public void run() {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(10);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
poll方法詳解:

(舊版本:多分區多線程     新版本:一個線程管理多個socket連接)

但新版本KafkaConsumer是雙線程的,主線程負責:消息獲取,rebalance,coordinator,位移提交等等,

另一個是后臺心跳線程。

根據上邊的各種配置,poll方法會找到offset,當獲取了足夠多的可用數據,或者等待時間超過了指定的超時時間,就會返回。

java consumer不是線程安全的,同一個KafkaConsumer用在了多個線程中,將會報Kafka Consumer is not safe for multi-threaded assess異常。可以加一個同步鎖進行保護。

poll的超時參數,已經說過1000的話是超時設定,如果沒有很多數據,也就等一秒,就返回了,比如定時5秒的將消息寫入,就可以將超時參數設置為5000,達到效率最大化。

如果沒有定時任務呢,那就設置為  Long.MAX_VALUE   未獲取足夠多的數據就無限等待。這里要捕獲一下WakeupException。

到此,關于“Kafka多線程Consumer的實例代碼”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

吴堡县| 汝南县| 如皋市| 永嘉县| 福建省| 新竹市| 怀化市| 海安县| 扶风县| 亳州市| 乐至县| 朝阳县| 湖北省| 荥经县| 彩票| 阿拉善左旗| 淄博市| 泸水县| 鹿泉市| 灌南县| 仪征市| 青田县| 肇州县| 巨鹿县| 久治县| 江川县| 新邵县| 靖宇县| 高尔夫| 陆河县| 祁门县| 锡林浩特市| 达尔| 益阳市| 嘉兴市| 曲麻莱县| 吉隆县| 绥芬河市| 安康市| 桦甸市| 光山县|