中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何進行kafka批量消費多消費者問題分析

發布時間:2021-12-15 09:29:26 來源:億速云 閱讀:741 作者:柒染 欄目:大數據

今天就跟大家聊聊有關如何進行kafka批量消費多消費者問題分析,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

package com.llw.medical.bs.listener;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.annotation.TopicPartition;import org.springframework.stereotype.Component;import java.util.List;import java.util.Optional;@Componentpublic class KafakaListener {@KafkaListener(id = "1", topics = {"topic2"})public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("----------------- record =" + record);
            System.out.println("----------------- message =" + message);
        }
    }@KafkaListener(id = "2", topicPartitions =
            {@TopicPartition(topic = "topic1",
                    partitions = {"1", "2", "3"}// partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")            )
            })public void listen2(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("----------------- record 1=" + record);
            System.out.println("------------------ message 1=" + message);
        }
    }//id = "4",    //id="4"    @KafkaListener( id= "4",groupId = "1",topics="topic1", /*topicPartitions =            {@TopicPartition(topic = "topic1",                    partitions = {"0"}                    // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")            )*//*            },*/ containerFactory = "kafkaBatchListener6")public void listen3(List<ConsumerRecord<?, ?>> records) {//, Acknowledgment ack        try {for (ConsumerRecord<?, ?> record : records) {
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
                    Object message = kafkaMessage.get();
                    System.out.println("----------------- record 4=" + record);//   System.out.println("------------------ message 4=" + message);                }
            }
        } finally {//   ack.acknowledge();        }
    }//id="5"    @KafkaListener(id = "5",groupId = "1",topics="topic1", /*topicPartitions =            {@TopicPartition(topic = "topic1",                    partitions = {"0"}                    // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")            )            },*/ containerFactory = "kafkaBatchListener6")public void listen2(List<ConsumerRecord<?, ?>> records) {//, Acknowledgment ack        try {for (ConsumerRecord<?, ?> record : records) {
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
                    Object message = kafkaMessage.get();
                    System.out.println("----------------- record 6=" + record);//   System.out.println("------------------ message 6=" + message);                }
            }
        } finally {//   ack.acknowledge();        }
    }//https://www.cnblogs.com/linjiqin/p/13171789.html    @KafkaListener(id = "6",groupId = "1",topics="topic1",/* topicPartitions =            {@TopicPartition(topic = "topic1",                    partitions = {"0"}                    // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")            )            }, */containerFactory = "kafkaBatchListener6")public void listen4(List<ConsumerRecord<?, ?>> records) {try {for (ConsumerRecord<?, ?> record : records) {
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
                    Object message = kafkaMessage.get();
                    System.out.println("----------------- record 3=" + record);//   System.out.println("------------------ message 6=" + message);                }
            }
        } finally {//   ack.acknowledge();        }
    }

}

一個partition只能有一個消費者,如果多個消費者會是廣播模式,每個消費者都會有一條數據,kafka是一個發布和訂閱模式的主鍵,并不是隊列模式,

spring boot整合時,如果使用topicPartitions 注解參數指定partition會有消息重復消費的問題,最好使用topics注解,并指定groupId。

看完上述內容,你們對如何進行kafka批量消費多消費者問題分析有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

乐东| 湘阴县| 禹州市| 桃江县| 宝应县| 扬州市| 深泽县| 南部县| 钦州市| 三江| 醴陵市| 襄垣县| 同仁县| 瓦房店市| 阜康市| 洪泽县| 木里| 墨竹工卡县| 南阳市| 武隆县| 关岭| 潼关县| 民和| 苍梧县| 隆昌县| 山阳县| 蒙自县| 韶关市| 繁昌县| 扬中市| 和顺县| 民勤县| 揭西县| 奉新县| 诸城市| 徐闻县| 武乡县| 墨竹工卡县| 永丰县| 蒙自县| 张北县|