中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

springboot自動配置方式有哪些

發布時間:2021-09-28 17:50:52 來源:億速云 閱讀:226 作者:小新 欄目:開發技術

這篇文章給大家分享的是有關springboot自動配置方式有哪些的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。

    spring boot自動配置方式整合

    spring boot具有許多自動化配置,對于kafka的自動化配置當然也包含在內,基于spring boot自動配置方式整合kafka,需要做以下步驟。

    引入kafka的pom依賴包

    <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.2.2.RELEASE</version>
    </dependency>

    在配置文件中配置kafka相關屬性配置,分別配置生產者和消費者的屬性,在程序啟動時,spring boot框架會自動讀取這些配置的屬性,創建相關的生產者、消費者等。下面展示一個簡單的配置。

    #kafka默認消費者配置
    spring.kafka.consumer.bootstrap-servers=192.168.0.15:9092
    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.consumer.auto-offset-reset=earliest
    #kafka默認生產者配置
    spring.kafka.producer.bootstrap-servers=192.168.0.15:9092
    spring.kafka.producer.acks=-1
    spring.kafka.client-id=kafka-producer
    spring.kafka.producer.batch-size=5

    當然,在實際生產中的配置肯定比上面的配置復雜,需要一些定制化的操作,那么spring boot的自動化配置創建的生產者或者消費者都不能滿足我們時,應該需要自定義化相關配置,這個在后續舉例,這里先分析自動化配置。

    在進行了如上配置之后,需要生產者時,使用方式為下代碼所示。

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = {UserSSOApplication.class})
    public class UserSSOApplicationTests {
       @Resource
        //注入kafkatemplete,這個由spring boot自動創建
       KafkaTemplate kafkaTemplate;
       @Test
       public void testKafkaSendMsg() {
           //發送消息
          kafkaTemplate.send("test", 0,12,"1222");
       }
    }

    消費者的使用注解方式整合,代碼如下。

    @Component
    @Slf4j
    public class KafkaMessageReceiver2 {
        //指定監聽的topic,當前消費者組id
        @KafkaListener(topics = {"test"}, groupId = "receiver")
        public void registryReceiver(ConsumerRecord<Integer, String> integerStringConsumerRecords) {
            log.info(integerStringConsumerRecords.value());
        }
    }

    上面是最簡單的配置,實現的一個簡單例子,如果需要更加定制化的配置,可以參考類

    org.springframework.boot.autoconfigure.kafka.KafkaProperties這里面包含了大部分需要的kafka配置。針對配置,在properties文件中添加即可。

    spring boot自動配置的不足

    上面是依賴spring boot自動化配置完成的整合方式,實際上所有的配置實現都是在org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration中完成。可以看出個是依賴于@Configuration完成bean配置,這種配置方式基本能夠實現大部分情況,只要熟悉org.springframework.boot.autoconfigure.kafka.KafkaProperties中的配置即可。

    但是這種方式還有一個問題,就是org.springframework.boot.autoconfigure.kafka.KafkaProperties中并沒有涵蓋所有的org.apache.kafka.clients.producer.ProducerConfig中的配置,這就導致某些特殊配置不能依賴spring boot自動創建,需要我們手動創建Producer和comsumer。

    @Configuration
    @ConditionalOnClass(KafkaTemplate.class)
    @EnableConfigurationProperties(KafkaProperties.class)
    @Import(KafkaAnnotationDrivenConfiguration.class)
    public class KafkaAutoConfiguration {
       private final KafkaProperties properties;
       private final RecordMessageConverter messageConverter;
       public KafkaAutoConfiguration(KafkaProperties properties,
             ObjectProvider<RecordMessageConverter> messageConverter) {
          this.properties = properties;
          this.messageConverter = messageConverter.getIfUnique();
       }
       @Bean
       @ConditionalOnMissingBean(KafkaTemplate.class)
       public KafkaTemplate<?, ?> kafkaTemplate(
             ProducerFactory<Object, Object> kafkaProducerFactory,
             ProducerListener<Object, Object> kafkaProducerListener) {
          KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(
                kafkaProducerFactory);
          if (this.messageConverter != null) {
             kafkaTemplate.setMessageConverter(this.messageConverter);
          }
          kafkaTemplate.setProducerListener(kafkaProducerListener);
          kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
          return kafkaTemplate;
       }
       @Bean
       @ConditionalOnMissingBean(ConsumerFactory.class)
       public ConsumerFactory<?, ?> kafkaConsumerFactory() {
          return new DefaultKafkaConsumerFactory<>(
                this.properties.buildConsumerProperties());
       }
       @Bean
       @ConditionalOnMissingBean(ProducerFactory.class)
       public ProducerFactory<?, ?> kafkaProducerFactory() {
          DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
                this.properties.buildProducerProperties());
          String transactionIdPrefix = this.properties.getProducer()
                .getTransactionIdPrefix();
          if (transactionIdPrefix != null) {
             factory.setTransactionIdPrefix(transactionIdPrefix);
          }
          return factory;
       }
      //略略略
    }

    spring boot下手動配置kafka

    由于需要對某些特殊配置進行配置,我們可能需要手動配置kafka相關的bean,創建一個配置類如下,類似于

    org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration,這里創建了對應類型的bean之后,org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration中的對應Bean定義將不起作用。

    所有的生產者配置可以參考ProducerConfig類,所有的消費者配置可以參考ConsumerConfig類。

    /**
     * kafka配置,實際上,在KafkaAutoConfiguration中已經有默認的根據配置文件信息創建配置,但是自動配置屬性沒有涵蓋所有
     * 我們可以自定義創建相關bean,進行如下配置
     *
     * @author zhoujy
     * @date 2018年12月17日
     **/
    @Configuration
    public class KafkaConfig {
        @Value("${spring.kafka.consumer.bootstrap-servers}")
        private String bootstrapServers;
        //構造消費者屬性map,ConsumerConfig中的可配置屬性比spring boot自動配置要多
        private Map<String, Object> consumerProperties(){
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "activity-service");
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            return props;
        }
        /**
         * 不使用spring boot默認方式創建的DefaultKafkaConsumerFactory,重新定義創建方式
         * @return
         */
        @Bean("consumerFactory")
        public DefaultKafkaConsumerFactory consumerFactory(){
            return new DefaultKafkaConsumerFactory(consumerProperties());
        }
        @Bean("listenerContainerFactory")
        //個性化定義消費者
        public ConcurrentKafkaListenerContainerFactory listenerContainerFactory(DefaultKafkaConsumerFactory consumerFactory) {
            //指定使用DefaultKafkaConsumerFactory
            ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
            factory.setConsumerFactory(consumerFactory);
            
     //設置消費者ack模式為手動,看需求設置  
            factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
            //設置可批量拉取消息消費,拉取數量一次3,看需求設置
            factory.setConcurrency(3);
            factory.setBatchListener(true);
            return factory;
        }
       /* @Bean
         //代碼創建方式topic
        public NewTopic batchTopic() {
            return new NewTopic("topic.quick.batch", 8, (short) 1);
        }*/
        //創建生產者配置map,ProducerConfig中的可配置屬性比spring boot自動配置要多
        private Map<String, Object> producerProperties(){
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.ACKS_CONFIG, "-1");
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 5);
            props.put(ProducerConfig.LINGER_MS_CONFIG, 500);
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            return props;
        }
        /**
         * 不使用spring boot的KafkaAutoConfiguration默認方式創建的DefaultKafkaProducerFactory,重新定義
         * @return
         */
        @Bean("produceFactory")
        public DefaultKafkaProducerFactory produceFactory(){
            return new DefaultKafkaProducerFactory(producerProperties());
        }
        /**
         * 不使用spring boot的KafkaAutoConfiguration默認方式創建的KafkaTemplate,重新定義
         * @param produceFactory
         * @return
         */
        @Bean
        public KafkaTemplate kafkaTemplate(DefaultKafkaProducerFactory produceFactory){
            return new KafkaTemplate(produceFactory);
        }
    }

    生產者的使用方式,跟自動配置一樣,直接注入KafkaTemplate即可。主要是消費者的使用有些不同。

    批量消費消息

    上面的消費者配置配置了一個bean,@Bean(“listenerContainerFactory”),這個bean可以指定為消費者,注解方式中是如下的使用方式。

    containerFactory = "listenerContainerFactory"指定了使用listenerContainerFactory作為消費者。

    • 注意registryReceiver中的參數,ConsumerRecord對比之前的消費者,因為設置listenerContainerFactory是批量消費,因此ConsumerRecord是一個List,如果不是批量消費的話,相對應就是一個對象。

    • 注意第二個參數Acknowledgment,這個參數只有在設置消費者的ack應答模式為AckMode.MANUAL_IMMEDIATE才能注入,意思是需要手動ack。

    @Component
    @Slf4j
    public class KafkaMessageReceiver {   
        /**
         * listenerContainerFactory設置了批量拉取消息,因此參數是List<ConsumerRecord<Integer, String>>,否則是ConsumerRecord
         * @param integerStringConsumerRecords
         * @param acknowledgment
         */
        @KafkaListener(topics = {"test"}, containerFactory = "listenerContainerFactory")
        public void registryReceiver(List<ConsumerRecord<Integer, String>> integerStringConsumerRecords, Acknowledgment acknowledgment) {
            Iterator<ConsumerRecord<Integer, String>> it = integerStringConsumerRecords.iterator();
            while (it.hasNext()){
                ConsumerRecord<Integer, String> consumerRecords = it.next();
                //dosome
                 acknowledgment.acknowledge();
            }
        } 
    }

    如果不想要批量消費消息,那就可以另外定義一個bean類似于@Bean(“listenerContainerFactory”),如下,只要不設置批量消費即可。

    @Bean("listenerContainerFactory2")
        //個性化定義消費者
        public ConcurrentKafkaListenerContainerFactory listenerContainerFactory2(DefaultKafkaConsumerFactory consumerFactory) {
            //指定使用DefaultKafkaConsumerFactory
            ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
            factory.setConsumerFactory(consumerFactory);
            
     //設置消費者ack模式為手動,看需求設置  
            factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
            return factory;
        }

    spring boot整合kafka報錯

    Timeout expired while fetching topic metadata

    這種報錯應檢查 kafka連接問題,服務是否啟動,端口是否正確。

    Kafka Producer error Expiring 10 record(s) for TOPIC:XXXXXX: xxx ms has passed since batch creation plus linger time

    這種報錯要考慮kafka和spring對應的版本問題,我的springboot 2.1.2在使用kafka_2.12-2.1.1時出現此問題,將kafka版本換為2.11-1.1.1后,問題解決。

    感謝各位的閱讀!關于“springboot自動配置方式有哪些”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    巩义市| 内丘县| 大同县| 承德县| 惠州市| 临朐县| 龙山县| 菏泽市| 丰县| 铜山县| 昭苏县| 怀集县| 布尔津县| 英山县| 霍林郭勒市| 准格尔旗| 峨山| 三门峡市| 青铜峡市| 肇源县| 建阳市| 蒲江县| 保山市| 成都市| 甘洛县| 浪卡子县| 常德市| 大足县| 遂溪县| 尉氏县| 乌拉特前旗| 修武县| 肇源县| 尼勒克县| 湟中县| 南陵县| 筠连县| 措美县| 肇东市| 万荣县| 北碚区|