您好,登錄后才能下訂單哦!
這篇文章主要講解了“如何使用KafkaAPI-ProducerAPI”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“如何使用KafkaAPI-ProducerAPI”吧!
Kafka 的 Producer 發送消息采用的是異步發送的方式。在消息發送的過程中,涉及到了兩個線程——main 線程和 Sender 線程,以及一個線程共享變量——RecordAccumulator。main 線程將消息發送給 RecordAccumulator, Sender 線程不斷從 RecordAccumulator 中拉取消息發送到 Kafka broker。
相關參數:
batch.size: 只有數據積累到 batch.size 之后, sender 才會發送數據。
linger.ms: 如果數據遲遲未達到 batch.size, sender 等待 linger.time 之后就會發送數據。
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.7.0</version> </dependency>
需要用到的類:
KafkaProducer:需要創建一個生產者對象,用來發送數據
ProducerConfig:獲取所需的一系列配置參數
ProducerRecord:每條數據都要封裝成一個 ProducerRecord 對象
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class MyProducer { public static void main(String[] args) { //生產者配置信息可以從ProducerConfig中取Key //1.創建kafka生產者的配置信息 Properties properties=new Properties(); //2.指定連接的kafka集群 //properties.put("bootstrap.servers","192.168.1.106:9091"); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.106:9091"); //3.ACK應答級別 //properties.put("acks","all"); properties.put(ProducerConfig.ACKS_CONFIG,"all"); //4.重試次數 //properties.put("retries",3); properties.put(ProducerConfig.RETRIES_CONFIG,3); //5.批次大小 16k //properties.put("batch.size",16384); properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384); //6.等待時間 //properties.put("linger.ms",1); properties.put(ProducerConfig.LINGER_MS_CONFIG,1); //7.RecordAccumulator 緩沖區大小 32M properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432); //properties.put("buffer.memory",33554432); //8.Key,Value 的序列化類 //properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); //properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); //9.創建生產者對象 KafkaProducer<String, String> producer = new KafkaProducer<>(properties); //10.發送數據 for (int i = 0; i < 10; i++) { ProducerRecord<String, String> producerRecord = new ProducerRecord<>("first","atguigu--"+i); producer.send(producerRecord); } //11.關閉資源 producer.close(); } }
回調函數會在 producer 收到 ack 時調用,為異步調用, 該方法有兩個參數,分別是RecordMetadata 和 Exception,如果 Exception 為 null,說明消息發送成功,如果Exception 不為 null,說明消息發送失敗。
注意:消息發送失敗會自動重試,不需要我們在回調函數中手動重試。
import org.apache.kafka.clients.producer.*; import java.util.Properties; public class CallBackProducer { public static void main(String[] args) { //生產者配置信息可以從ProducerConfig中取Key Properties properties=new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.106:9091,192.168.1.106:9092,192.168.1.106:9093"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); //創建生產者對象 KafkaProducer<String, String> producer = new KafkaProducer<>(properties); /*創建topic /opt/kafka/kafka03/bin/kafka-topics.sh --create --zookeeper 192.168.1.106:2181,192.168.1.106:2182,192.168.1.106:2183 --replication-factor 3 --partitions 2 --topic aaa * */ //發送數據 for (int i = 0; i < 10; i++) { ProducerRecord<String, String> producerRecord = new ProducerRecord<>("bbb","d","bbb-atguigu++"+i); producer.send(producerRecord, (recordMetadata, e) -> { if (e==null){ System.out.println("aaa "+recordMetadata.partition()+ "--"+recordMetadata.offset()); }else { e.printStackTrace(); } }); } //11.關閉資源 producer.close(); } }
同步發送的意思就是,一條消息發送之后,會阻塞當前線程, 直至返回 ack。由于 send 方法返回的是一個 Future 對象,根據 Futrue 對象的特點,我們也可以實現同步發送的效果,只需在調用 Future 對象的 get 方發即可、
//10.發送數據 for (int i = 0; i < 10; i++) { ProducerRecord<String, String> producerRecord = new ProducerRecord<>("first","atguigu--"+i); producer.send(producerRecord).get(); }
默認分區策略源碼:
org.apache.kafka.clients.producer.internals.DefaultPartitioner
import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import java.util.List; import java.util.Map; public class MyPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { /*自定義分區規則*/ List<PartitionInfo> partitionInfos = cluster.availablePartitionsForTopic(topic); Integer integer =partitionInfos.size(); return key.toString().hashCode()%integer; /*指定分區*/ /* return 1;*/ } @Override public void close() { } @Override public void configure(Map<String, ?> map) { } }
//配置方法 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.zhl.kafkademo.partitioner.MyPartitioner");
完整代碼:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class PartitionProducer { public static void main(String[] args) { //生產者配置信息可以從ProducerConfig中取Key Properties properties=new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.106:9091,192.168.1.106:9092,192.168.1.106:9093"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); //配置分區器的全類名 partitioner.class properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.zhl.kafkademo.partitioner.MyPartitioner"); //創建生產者對象 KafkaProducer<String, String> producer = new KafkaProducer<>(properties); //發送數據 for (int i = 0; i < 10; i++) { ProducerRecord<String, String> producerRecord = new ProducerRecord<>("bbb","d","bbb-atguigu++"+i); producer.send(producerRecord, (recordMetadata, e) -> { if (e==null){ System.out.println(recordMetadata.topic()+"--"+ recordMetadata.partition()+ "--"+recordMetadata.offset()); }else { e.printStackTrace(); } }); } //11.關閉資源 producer.close(); } }
感謝各位的閱讀,以上就是“如何使用KafkaAPI-ProducerAPI”的內容了,經過本文的學習后,相信大家對如何使用KafkaAPI-ProducerAPI這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。