中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

KAFKA是如何處理粘包拆包的

發布時間:2021-11-22 09:54:25 來源:億速云 閱讀:237 作者:iii 欄目:云計算

本篇內容主要講解“KAFKA是如何處理粘包拆包的”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“KAFKA是如何處理粘包拆包的”吧!

一、為什么會出現粘包拆包現象?

我們知道,TCP數據包都是按照協議進行拆包、編號然后分批發送的;
那么對應我們應用層有意義的數據包,傳輸層的協議并不了解其含義,更不會去根據你的業務內容去分包和發送,只會按照自己的協議棧去進行數據發送。
因此,就出現了網絡數據的粘包,拆包問題。
究其本質,其實就是傳輸層并不了解上層應用的數據含義,只會按照協議棧進行數據發送。


二、通常有哪些解決粘包拆包問題的方法?

在了解出現這個問題的本質后,那么要想解決這個問題就很簡單了。
不就是在進行數據接收的時候,我們應用層收到數據后根據標識判斷一下,數據是否完整,如果完整了我們再進行數據包解析,最后交給業務代碼不就好了?
通常解決粘包拆包的問題有三種方案:

  1. 定長,例如我保證我每一條數據都是200b,那么我每接收到200b就認為是一條完整的數據,接著就可以進行解析,并向業務代碼交付。

  2. 分隔符,一樣的意思,我每條數據末尾都用一個分隔符例如換行符,制表符這種來標識這條數據寫完了,那么我們收到數據判找一下這個分割符在哪兒,最后進行切割就可以得到完整的數據包了。

  3. 自定義協議,這個也很簡單,就是定義一個你的完整數據包的內容格式是什么樣子的,例如 len + data,其中len是代表data的字節長度。這樣每次根據前面4個字節的len,就能得到后面還需要多少數據才是一條完整的數據,少了就等,多了就截取。

最后,可能很多不熟悉網絡編程的同學會納悶,那萬一TCP的數據包丟失了,亂序了,上面這種方法不就出問題了嘛?
其實不是的,TCP一個可靠的消息傳輸協議,其協議的根本思想就是提供可靠的數據傳輸服務。
翻譯一下就是,你可以相信TCP傳輸的數據是可靠的,在交付給應用層數據的時候,是不會出現上述這種情況的。
出現這種情況只會在傳輸層出現,而TCP協議也為對應的情況設計了分批、編號、去重、校驗和、超時重傳等一系列的操作,來保證數據可靠。


三、kakfa是如何解決粘包拆包問題的呢?

最后,讓我們來看下kafka是如何解決粘包拆包問題的呢?是以上面提到的哪種方式來解決的呢?
首先看粘包,也就是接收到了多余的數據,該如何拆分數據包,讀取到正確完整的數據包?
如下面代碼所示,分為三個階段:

  1. 先讀取前4字節,轉換為一個int,即長度。

  2. 根據長度申請內存buffer。

  3. 最后讀取指定大小的數據到申請好的buffer

由此,就完整了一整條數據的正確讀取。整個過程其實就是上面提到的 len+data 這么一個簡單的自定義協議

public NetworkReceive read() throws IOException {
    NetworkReceive result = null;    // 新建一個receive    if (receive == null) {receive = new NetworkReceive(maxReceiveSize, id, memoryPool);    }    // 真正的數據read    receive(receive);    // 數據讀取完成的后置操作    if (receive.complete()) {        // 倒帶,等待讀receive.payload().rewind();        // 直接引用賦值        result = receive;        // 最后清空當前引用,然后等待下次進入read的時候,執行new 操作        receive = null;    } else if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {//pool must be out of memory, mute ourselves.        mute();    }return result;}
public long readFrom(ScatteringByteChannel channel) throws IOException {int read = 0;
    // 存在數據if (size.hasRemaining()) {        // len + dataint bytesRead = channel.read(size);        if (bytesRead < 0)throw new EOFException();        read += bytesRead;
        // 如果讀滿了長度,則直接倒帶得到具體的len值
        // 這里的size是一個byteBuffer類型的,也就是接收到的數據        if (!size.hasRemaining()) {size.rewind();            int receiveSize = size.getInt();            if (receiveSize < 0)throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");            if (maxSize != UNLIMITED && receiveSize > maxSize)throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");            requestedBufferSize = receiveSize; //may be 0 for some payloads (SASL)            if (receiveSize == 0) {buffer = EMPTY_BUFFER;            }
        }
    }    // 如果長度已經就緒了,那么就需要接下來的data需要多少空間,在這里進行申請if (buffer == null && requestedBufferSize != -1) { //we know the size we want but havent been able to allocate it yet        buffer = memoryPool.tryAllocate(requestedBufferSize);        if (buffer == null)log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);    }
    // 申請完畢之后,就調用read函數,直接read出來即可。if (buffer != null) {        int bytesRead = channel.read(buffer);        if (bytesRead < 0)throw new EOFException();        read += bytesRead;    }    // 返回讀取的總字節數return read;}

再先看拆包,也就是接收到數據不夠組成一條完整的數據,該如何等待完整的數據包?
下面代碼最核心的就是receive.complete()函數的判斷邏輯,這個判斷的三個條件分別意味著:

  • !size.hasRemaining():接收到的buffer數據已經讀取完成。

  • buffer != null:buffer已經創建。

  • !buffer.hasRemaining():buffer已經讀取完成。

翻譯一下,其實就是只要一條數據沒讀完整,那么receive.complete()函數返回值就是false,那么最終返回的結果就是null,等待下一次OP_READ事件的時候再接著上次沒讀完的數據讀取,直到讀取一條完整的數據為止。

public NetworkReceive read() throws IOException {
    NetworkReceive result = null;    if (receive == null) {receive = new NetworkReceive(maxReceiveSize, id, memoryPool);    }

    receive(receive);    if (receive.complete()) {receive.payload().rewind();        result = receive;        receive = null;    } else if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {//pool must be out of memory, mute ourselves.        mute();    }return result;}
public boolean complete() {    return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();}

最后,我們再補充一點,當我們一次性收到很多條數據的時候,會如何處理呢?
下面的源碼告訴了我們答案,就是一次性全部讀取出來,然后存入stageReceives這個數據結構中等待下一步業務處理。

private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {//if channel is ready and has bytes to read from socket or buffer, and has no    //previous receive(s) already staged or otherwise in progress then read from it    if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
        && !explicitlyMutedChannels.contains(channel)) {
        NetworkReceive networkReceive;        // 一次性讀取所有的receives,暫存到stageReceives中        while ((networkReceive = channel.read()) != null) {            madeReadProgressLastPoll = true;            addToStagedReceives(channel, networkReceive);        }// isMute是判斷當前channel是否關注了OP_READ事件        if (channel.isMute()) {outOfMemory = true; //channel has muted itself due to memory pressure.        } else {madeReadProgressLastPoll = true;        }
    }
}

到此,相信大家對“KAFKA是如何處理粘包拆包的”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

绥德县| 屏南县| 广西| 司法| 南投县| 长寿区| 如皋市| 汶川县| 兴城市| 道孚县| 承德县| 雷波县| 苗栗市| 龙泉市| 灵山县| 龙陵县| 瑞丽市| 宾川县| 崇礼县| 鸡泽县| 松溪县| 玉环县| 旌德县| 江油市| 淅川县| 绥棱县| 花莲县| 凤翔县| 浠水县| 白城市| 余姚市| 三河市| 德江县| 会昌县| 宁强县| 德兴市| 永登县| 虹口区| 东安县| 仲巴县| 荃湾区|