中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何使用KafkaAPI-ProducerAPI

發布時間:2021-10-13 14:24:20 來源:億速云 閱讀:127 作者:iii 欄目:編程語言

這篇文章主要講解了“如何使用KafkaAPI-ProducerAPI”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“如何使用KafkaAPI-ProducerAPI”吧!

1.消息發送流程

    Kafka 的 Producer 發送消息采用的是異步發送的方式。在消息發送的過程中,涉及到了兩個線程——main 線程和 Sender 線程,以及一個線程共享變量——RecordAccumulator。main 線程將消息發送給 RecordAccumulator, Sender 線程不斷從 RecordAccumulator 中拉取消息發送到 Kafka broker。

如何使用KafkaAPI-ProducerAPI

相關參數:
batch.size: 只有數據積累到 batch.size 之后, sender 才會發送數據。
linger.ms: 如果數據遲遲未達到 batch.size, sender 等待 linger.time 之后就會發送數據。

2.異步發送API

1)導入依賴
 

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.0</version>
        </dependency>

2)編寫代碼

需要用到的類:
KafkaProducer:需要創建一個生產者對象,用來發送數據
ProducerConfig:獲取所需的一系列配置參數
ProducerRecord:每條數據都要封裝成一個 ProducerRecord 對象

2.1). 不帶回調函數的API
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class MyProducer {
    public static void main(String[] args) {
        //生產者配置信息可以從ProducerConfig中取Key
         //1.創建kafka生產者的配置信息
        Properties properties=new Properties();
        //2.指定連接的kafka集群
        //properties.put("bootstrap.servers","192.168.1.106:9091");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.106:9091");
        //3.ACK應答級別
        //properties.put("acks","all");
        properties.put(ProducerConfig.ACKS_CONFIG,"all");
        //4.重試次數
        //properties.put("retries",3);
        properties.put(ProducerConfig.RETRIES_CONFIG,3);
        //5.批次大小 16k
        //properties.put("batch.size",16384);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        //6.等待時間
        //properties.put("linger.ms",1);
        properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
        //7.RecordAccumulator 緩沖區大小 32M
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
        //properties.put("buffer.memory",33554432);
        //8.Key,Value 的序列化類
        //properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //9.創建生產者對象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        //10.發送數據
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("first","atguigu--"+i);
            producer.send(producerRecord);
        }
        //11.關閉資源
        producer.close();
    }
}
2.2) 帶回調函數的API

    回調函數會在 producer 收到 ack 時調用,為異步調用, 該方法有兩個參數,分別是RecordMetadata 和 Exception,如果 Exception 為 null,說明消息發送成功,如果Exception 不為 null,說明消息發送失敗。
    注意:消息發送失敗會自動重試,不需要我們在回調函數中手動重試。

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class CallBackProducer {
    public static void main(String[] args) {
        //生產者配置信息可以從ProducerConfig中取Key
        Properties properties=new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.106:9091,192.168.1.106:9092,192.168.1.106:9093");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //創建生產者對象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        /*創建topic
        /opt/kafka/kafka03/bin/kafka-topics.sh  --create --zookeeper 192.168.1.106:2181,192.168.1.106:2182,192.168.1.106:2183  --replication-factor 3  --partitions 2 --topic aaa
        * */

        //發送數據
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("bbb","d","bbb-atguigu++"+i);
            producer.send(producerRecord, (recordMetadata, e) -> {
                if (e==null){
                    System.out.println("aaa  "+recordMetadata.partition()+ "--"+recordMetadata.offset());
                }else {
                    e.printStackTrace();
                }
            });
        }
        //11.關閉資源
        producer.close();
    }
}

3.    同步發送API

    同步發送的意思就是,一條消息發送之后,會阻塞當前線程, 直至返回 ack。由于 send 方法返回的是一個 Future 對象,根據 Futrue 對象的特點,我們也可以實現同步發送的效果,只需在調用 Future 對象的 get 方發即可、

        //10.發送數據
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("first","atguigu--"+i);
            producer.send(producerRecord).get();
        }

4.自定義分區器

默認分區策略源碼:

org.apache.kafka.clients.producer.internals.DefaultPartitioner

1.1.    自定義分區器代碼:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        /*自定義分區規則*/
        List<PartitionInfo> partitionInfos = cluster.availablePartitionsForTopic(topic);
        Integer integer =partitionInfos.size();
        return key.toString().hashCode()%integer;
        /*指定分區*/
       /* return 1;*/
    }
    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}

1.2.    生產者使用自定義分區器

//配置方法
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.zhl.kafkademo.partitioner.MyPartitioner");

完整代碼:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PartitionProducer {
    public static void main(String[] args) {
        //生產者配置信息可以從ProducerConfig中取Key
        Properties properties=new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.106:9091,192.168.1.106:9092,192.168.1.106:9093");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //配置分區器的全類名 partitioner.class
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.zhl.kafkademo.partitioner.MyPartitioner");
        //創建生產者對象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        //發送數據
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("bbb","d","bbb-atguigu++"+i);
            producer.send(producerRecord, (recordMetadata, e) -> {
                if (e==null){
                    System.out.println(recordMetadata.topic()+"--"+ recordMetadata.partition()+ "--"+recordMetadata.offset());
                }else {
                    e.printStackTrace();
                }
            });
        }
        //11.關閉資源
        producer.close();
    }
}

感謝各位的閱讀,以上就是“如何使用KafkaAPI-ProducerAPI”的內容了,經過本文的學習后,相信大家對如何使用KafkaAPI-ProducerAPI這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

华安县| 井冈山市| 安丘市| 曲靖市| 田东县| 麻城市| 枝江市| 奈曼旗| 大理市| 阿坝县| 福清市| 涞源县| 孝昌县| 泾阳县| 无锡市| 湖南省| 隆林| 禹城市| 达日县| 繁峙县| 沂南县| 黔江区| 城口县| 防城港市| 探索| 塔城市| 银川市| 乌鲁木齐市| 阿巴嘎旗| 闽清县| 离岛区| 皋兰县| 武乡县| 丹凤县| 济宁市| 乌兰浩特市| 滁州市| 黑山县| 綦江县| 唐海县| 阳谷县|