中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

java分布式流處理組件Producer怎么使用

發布時間:2023-03-07 11:27:50 來源:億速云 閱讀:112 作者:iii 欄目:開發技術

這篇文章主要講解了“java分布式流處理組件Producer怎么使用”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“java分布式流處理組件Producer怎么使用”吧!

    基于Java的API

    首先, 在了解生產者發送消息的原理之前,我們應該先學會如何去發送消息。

    Kafka為我們提供了很多項目可以操作的API客戶端,包括:

    • C/C++

    • GO

    • Python

    • ...

    通過官網查看API菜單,官方文檔上也是Java的版本。我們根據提示一步步操作即可~

    先新建maven項目,并且引入對應的****kafka-clients依賴

    建議:Kafka-clients依賴版本,最好和安裝的kafka版本一致

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.3.1</version>
    </dependency>

    同步發送

    Kafka生產者主要靠KafkaProducer來進行操作。點擊到對應的文檔頁面,我們可以看到關于KafkaProducer<K,V> 的詳細信息。

    一個好的組件是非常貼心的, 甚至我們都不用去網上搜任何相關的資料,只需要通過查看對應的注釋就可以知道這個東西該怎么用。

    Properties config = new Properties();
    // --bootstrap-server
    config.setProperty(
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
      "master:9092,node01:9092,node02:9092"
    );
    // key 序列化器
    config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    // value 序列化器
    config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    try(Producer<String, String> producer = new KafkaProducer<>(config)) {
        ProducerRecord<String, String> record = new ProducerRecord<>(
                "newTopic001",
                "key01",
                "data from " + KafkaQuickProducer.class.getName()
        ); 
        RecordMetadata recordMetadata = producer.send(record).get();
        System.out.println(
                MessageFormat.format("{0}\t{1}\t{2}\t{3}", 
                        recordMetadata.topic(), 
                        recordMetadata.partition(),
                        recordMetadata.offset(), 
                        recordMetadata.timestamp()
                )
        );
    } catch (Exception e) {
        e.printStackTrace();
    }

    以上代碼就是同步發送的過程,這已經是在開發過程中需要配置的最小單元,而其他關于生產者的配置,我們可以通過ProducerConfig來進行查看

    ** 與命令行上的參數,基本上是一模一樣的**

    而關于序列化器的問題,我們在下面原理的部分說明

    異步發送

    我們在調用同步send的時候,發現有兩個參數的方法, 而這個方法實現的就是****異步發送

    Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

    異步發送會將發送結果以事件驅動的形式傳遞,那么這里,我們就需要注意一點:

    • 程序調用完成之后,不能讓他立即執行,否則我們無法查看到具體的發送結果

    接下來我們看具體的程序實現。理論上:我們只需要改最后發送的部分

    Properties config = new Properties();
    // --bootstrap-server
    config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,node01:9092,node02:9092");
    // key 序列化器
    config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    // value 序列化器
    config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    try(Producer<String, String> producer = new KafkaProducer<>(config)) {
        ProducerRecord<String, String> record = new ProducerRecord<>(
                "newTopic001",
                "key01",
                "data from " + KafkaQuickProducer.class.getName()
        );
        async(producer, record);
    } catch (Exception e) {
        e.printStackTrace();
    }
    // 異步發送
    private static void async(Producer<String, String> producer, ProducerRecord<String, String> record) {
        producer.send(record, (recordMetadata, exception) -> {
            if (null != exception) {
                exception.printStackTrace();
                return;
            }
            System.out.println(
                    MessageFormat.format("{0}\t{1}\t{2}\t{3}",
                            recordMetadata.topic(),
                            recordMetadata.partition(),
                            recordMetadata.offset(),
                            recordMetadata.timestamp()
                    )
            );
        });
        try {
            // 將程序進行阻塞,防止由于消息發送成功之后進程停止而無法接收到事件反饋
            System.in.read();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    這屬于整個生產者發送消息方式的最小單元,本文屬于Producer入門階段。

    在ProducerConfig中還包含了非常多的配置項,更多的配置信息我們會在優化章節中說明。

    原理

    java分布式流處理組件Producer怎么使用

    在第一部分,我們已經了解到,關于生產者最基本的使用方式,到這里,其實我想跟大家聊一聊:

    • 生產者在發送消息的時候中間到底經歷了什么?

    大家應該已經看到上面的那張原理圖,我們可以從中找出答案!

    主線程

    **這里我們分為兩個線程塊來說明, 第一部分是Main主線程, 也就是生產者在調用****send()**方法時所在的線程

    在這里,我們可以看到:

    • 外部數據首先被封裝為ProducerRecord**,然后調用**send()**方法。

    • 在send()過程中,經過攔截器、序列化器、分區器等處理之后進入到RecordAccumulator中。

    接下來我們仔細聊一聊攔截器、序列化器、分區器的作用

    攔截器

    攔截器很類似于我們在SpringMVC中Interceptor的功能,而且在Producer中我們是可以自定義攔截器的。

    我們可以在發送之前對數據進行攔截處理,比如說:統計生產者發送數據的總量等等。

    當然目前來講,我們如果不開發Kafka監控平臺的話,這里攔截器的用處并不大。我們忽略不計即可

    后續如果有機會的話,我們可以專門寫篇文章,用來介紹如何開發一個攔截器

    序列化器

    而序列化器,主要對兩個部分的數據進行處理:

    • Key

    • Value

    byte[] serializedKey 
      = serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
    byte[] serializedValue
      = valueSerializer.serialize(record.topic(), record.headers(), record.value());

    從本質上來講,外部數據屬于屬于對象,而對象不能直接通過網絡進行傳輸。 所以我們就需要一個序列化器,將它轉換成字節數組,進而進行傳輸

    java分布式流處理組件Producer怎么使用

    Kafka本身為我們提供了很多可用的序列化器,不過我們能用到最多的還是StringSerializer。

    在生產端將消息進行序列話,那么在消費端必然會進行反序列化操作

    分區器

    我們知道Kafka是以Topic為消息發送的主體,不過由于Topic是一個虛擬的概念, 所以我們沒有辦法在實際中查看到關于Topic的相關信息。 但是前面我們也說過, 當前Topic下的消息數據都是通過Partition進行存儲的。

    發送出去的消息需要存儲在哪個分區中就是通過分區器來進行指定的,在我們沒有指定分區策略的情況下,生產者會通過默認的分區策略指定當前消息應該存儲在哪個分區下

    java分布式流處理組件Producer怎么使用

    分區的內容還是比較多的,我們會在下一節做詳細的說明

    RecordAccumulator

    此時,在主線程的區域中,當消息進入到默認大小為32m的記錄緩沖區時, 本區的工作就到此結束。

    緩沖區中有多個雙端隊列,分別對應Topic不同的分區。每一個分區就會創建一個雙端隊列。

    此時的消息將會被按照批次的方式存放在隊列中, 默認一批為16k大小。當緩沖區達到指定條件之后,****sender線程將會被喚醒,Sender程序將會沖隊列中不斷拉出消息進行下一步的發送

    Sender線程

    影響Sender線程喚醒的條件

    想要喚醒Sender線程有兩個因素,但不是說這兩個條件都必須滿足,他們是或的關系。

    batch.size是一個條件,這也是后期針對生產者優化的主要參數之一。

    當發送消息之后,生產者會將消息進行整合。將其按照一批一批的方式發送給Broker,從而減少網絡間的傳輸請求次數。默認情況下為16k。

    而如果一批數據的大小累計達到了設置的batch.size之后,sender才會做發送數據的操作

    這是第一個限制

    下面再來介紹一個非常強勢的參數:liner.ms。生產者優化的主要參數之二。

    這么說吧,如果你設置的liner.ms=0,表示不延遲直接發送。那么batch.size就不會生效了

    而liner.ms=0屬于默認配置

    如果數據一直沒有達到設置的batch.size大小,數據也不能不發對吧。所以Kafka也就為我們提供了這樣的參數:

    • 當sender等待liner.ms設置的時間之后【單位ms】,不管數據如何都會將消息進行發送

    • 如未設置當前參數,表示沒有延遲,直接發送

    下面舉個小例子

    config.setProperty(ProducerConfig.LINGER_MS_CONFIG, "5000");

    java分布式流處理組件Producer怎么使用

    開始發送

    RecordAccumulator內存儲的數據拉取出來之后,開始將其創建為一個個的Request請求。這里需要注意的是:

    • NetworkClient并非一股腦的將全部可發送數據進行傳輸請求

    正相反,為了能夠保證不同分區所對應DQueue的數據進入到對應的Broker所在的分區內,Kafka將按照<BrokerId, Request>的形式對請求進行傳輸。如果傳輸到達Broker之后沒有acks應答,那么當前節點下最多能夠保存5個未響應的請求。

    ACKS

    這里簡單聊一下它的應答方式。在ProducerConfig.ACKS_DOC下我們也可以看到相關的說明:

    • acks=0: 生產者不會等待Broker的應答,直接表示消息已經發送成功。而消息有沒有真正達到Broker,不關心。

    當然了,這種方式在性能上來講是最好的,適合一些數據不重要的場景

    • acks=1: 生產者將消息發送到Broker之后,由Leader在本地將消息進行存儲之后,返回發送成功的應答。

    如果Follower還沒有同步到消息,Leader就已經掛了。那么此時就會出現消息丟失的情況

    • acks=all:生產者將消息發送到Broker之后,由Leader在本地將消息進行存儲,并且Follower同步完消息之后才會返回發送成功的應答。

    這種方式是最能保證數據安全的情況,但是性能也是最低的~

    最后:

    • 當Broker返回成功應答之后,RecordAccumulator中的數據將會被清理

    • 如果失敗,可以嘗試重試等操作

    感謝各位的閱讀,以上就是“java分布式流處理組件Producer怎么使用”的內容了,經過本文的學習后,相信大家對java分布式流處理組件Producer怎么使用這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    扎兰屯市| 兴宁市| 恩平市| 云浮市| 乌兰浩特市| 郓城县| 鹤峰县| 岱山县| 常山县| 灵石县| 剑川县| 老河口市| 潍坊市| 大同县| 丹东市| 昭苏县| 淮滨县| 长白| 平江县| 旬阳县| 奉化市| 桃江县| 清镇市| 德令哈市| 余干县| 信宜市| 蓝田县| 兴隆县| 宿松县| 固始县| 凤阳县| 宜丰县| 湘阴县| 来凤县| 开封县| 郯城县| 师宗县| 昌江| 芮城县| 白银市| 嘉义市|