中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Kafka Network層解析,還是有人把它說清楚了

發布時間:2020-08-13 00:33:41 來源:網絡 閱讀:496 作者:Java筆記丶 欄目:編程語言

我們知道kafka是基于TCP連接的。其并沒有像很多中間件使用netty作為TCP服務器。而是自己基于Java NIO寫了一套。

幾個重要類

先看下Kafka Client的網絡層架構。


Kafka Network層解析,還是有人把它說清楚了


本文主要分析的是Network層。

Network層有兩個重要的類:SelectorKafkaChannel

這兩個類和Java NIO層的java.nio.channels.SelectorChannel有點類似。

Selector幾個關鍵字段如下

//?jdk?nio中的Selector
java.nio.channels.Selector?nioSelector;
//?記錄當前Selector的所有連接信息
Map<String,?KafkaChannel>?channels;
//?已發送完成的請求
List<Send>?completedSends;
//?已收到的請求
List<NetworkReceive>?completedReceives;
//?還沒有完全收到的請求,對上層不可見
Map<KafkaChannel,?Deque<NetworkReceive>>?stagedReceives;
//?作為client端,調用connect連接遠端時返回true的連接
Set<SelectionKey>?immediatelyConnectedKeys;
//?已經完成的連接
List<String>?connected;
//?一次讀取的最大大小
int?maxReceiveSize;

從網絡層來看kafka是分為client端(producer和consumer,broker作為從時也是client)和server端(broker)的。本文將分析client端是如何建立連接,以及收發數據的。server也是依靠SelectorKafkaChannel進行網絡傳輸。在Network層兩端的區別并不大。

建立連接

kafka的client端啟動時會調用Selector#connect(下文中如無特殊注明,均指org.apache.kafka.common.network.Selector)方法建立連接。

public?void?connect(String?id,?InetSocketAddress?address,?int?sendBufferSize,?int?receiveBufferSize)?throws?IOException?{
????if?(this.channels.containsKey(id))
????????throw?new?IllegalStateException("There?is?already?a?connection?for?id?"?+?id);
????//?創建一個SocketChannel
????SocketChannel?socketChannel?=?SocketChannel.open();
????//?設置為非阻塞模式
????socketChannel.configureBlocking(false);
????//?創建socket并設置相關屬性
????Socket?socket?=?socketChannel.socket();
????socket.setKeepAlive(true);
????if?(sendBufferSize?!=?Selectable.USE_DEFAULT_BUFFER_SIZE)
????????socket.setSendBufferSize(sendBufferSize);
????if?(receiveBufferSize?!=?Selectable.USE_DEFAULT_BUFFER_SIZE)
????????socket.setReceiveBufferSize(receiveBufferSize);
????socket.setTcpNoDelay(true);
????boolean?connected;
????try?{
????????//?調用SocketChannel的connect方法,該方法會向遠端發起tcp建連請求
????????//?因為是非阻塞的,所以該方法返回時,連接不一定已經建立好(即完成3次握手)。連接如果已經建立好則返回true,否則返回false。一般來說server和client在一臺機器上,該方法可能返回true。
????????connected?=?socketChannel.connect(address);
????}?catch?(UnresolvedAddressException?e)?{
????????socketChannel.close();
????????throw?new?IOException("Can't?resolve?address:?"?+?address,?e);
????}?catch?(IOException?e)?{
????????socketChannel.close();
????????throw?e;
????}
????//?對CONNECT事件進行注冊
????SelectionKey?key?=?socketChannel.register(nioSelector,?SelectionKey.OP_CONNECT);
????KafkaChannel?channel;
????try?{
????????//?構造一個KafkaChannel
????????channel?=?channelBuilder.buildChannel(id,?key,?maxReceiveSize);
????}?catch?(Exception?e)?{
??????...
????}
????//?將kafkachannel綁定到SelectionKey上
????key.attach(channel);
????//?放入到map中,id是遠端服務器的名稱
????this.channels.put(id,?channel);
????//?connectct為true代表該連接不會再觸發CONNECT事件,所以這里要單獨處理
????if?(connected)?{
????????//?OP_CONNECT?won't?trigger?for?immediately?connected?channels
????????log.debug("Immediately?connected?to?node?{}",?channel.id());
????????//?加入到一個單獨的集合中
????????immediatelyConnectedKeys.add(key);
????????//?取消對該連接的CONNECT事件的監聽
????????key.interestOps(0);
????}
}

這里的流程和標準的NIO流程差不多,需要單獨說下的是socketChannel#connect方法返回true的場景,該方法的注釋中有提到

*?<p>?If?this?channel?is?in?non-blocking?mode?then?an?invocation?of?this
*?method?initiates?a?non-blocking?connection?operation.??If?the?connection
*?is?established?immediately,?as?can?happen?with?a?local?connection,?then
*?this?method?returns?<tt>true</tt>.??Otherwise?this?method?returns
*?<tt>false</tt>?and?the?connection?operation?must?later?be?completed?by
*?invoking?the?{@link?#finishConnect?finishConnect}?method.

也就是說在非阻塞模式下,對于local connection,連接可能在馬上就建立好了,那該方法會返回true,對于這種情況,不會再觸發之后的connect事件。因此kafka用一個單獨的集合immediatelyConnectedKeys將這些特殊的連接記錄下來。在接下來的步驟會進行特殊處理。

之后會調用poll方法對網絡事件監聽:

public?void?poll(long?timeout)?throws?IOException?{
...
//?select方法是對java.nio.channels.Selector#select的一個簡單封裝
int?readyKeys?=?select(timeout);
...
//?如果有就緒的事件或者immediatelyConnectedKeys非空
if?(readyKeys?>?0?||?!immediatelyConnectedKeys.isEmpty())?{
????//?對已就緒的事件進行處理,第2個參數為false
????pollSelectionKeys(this.nioSelector.selectedKeys(),?false,?endSelect);
????//?對immediatelyConnectedKeys進行處理。第2個參數為true
????pollSelectionKeys(immediatelyConnectedKeys,?true,?endSelect);
}

addToCompletedReceives();

...
}

private?void?pollSelectionKeys(Iterable<SelectionKey>?selectionKeys,
???????????????????????????boolean?isImmediatelyConnected,
???????????????????????????long?currentTimeNanos)?{
Iterator<SelectionKey>?iterator?=?selectionKeys.iterator();
//?遍歷集合
while?(iterator.hasNext())?{
????SelectionKey?key?=?iterator.next();
????//?移除當前元素,要不然下次poll又會處理一遍
????iterator.remove();
????//?得到connect時創建的KafkaChannel
????KafkaChannel?channel?=?channel(key);
???...

????try?{
????????//?如果當前處理的是immediatelyConnectedKeys集合的元素或處理的是CONNECT事件
????????if?(isImmediatelyConnected?||?key.isConnectable())?{
????????????//?finishconnect中會增加READ事件的監聽
????????????if?(channel.finishConnect())?{
????????????????this.connected.add(channel.id());
????????????????this.sensors.connectionCreated.record();
????????????????...
????????????}?else
????????????????continue;
????????}

????????//?對于ssl的連接還有些額外的步驟
????????if?(channel.isConnected()?&&?!channel.ready())
????????????channel.prepare();

????????//?如果是READ事件
????????if?(channel.ready()?&&?key.isReadable()?&&?!hasStagedReceive(channel))?{
????????????NetworkReceive?networkReceive;
????????????while?((networkReceive?=?channel.read())?!=?null)
????????????????addToStagedReceives(channel,?networkReceive);
????????}

????????//?如果是WRITE事件
????????if?(channel.ready()?&&?key.isWritable())?{
????????????Send?send?=?channel.write();
????????????if?(send?!=?null)?{
????????????????this.completedSends.add(send);
????????????????this.sensors.recordBytesSent(channel.id(),?send.size());
????????????}
????????}

????????//?如果連接失效
????????if?(!key.isValid())
????????????close(channel,?true);

????}?catch?(Exception?e)?{
????????String?desc?=?channel.socketDescription();
????????if?(e?instanceof?IOException)
????????????log.debug("Connection?with?{}?disconnected",?desc,?e);
????????else
????????????log.warn("Unexpected?error?from?{};?closing?connection",?desc,?e);
????????close(channel,?true);
????}?finally?{
????????maybeRecordTimePerConnection(channel,?channelStartTimeNanos);
????}
}
}

因為immediatelyConnectedKeys中的連接不會觸發CONNNECT事件,所以在poll時會單獨對immediatelyConnectedKeys的channel調用finishConnect方法。在明文傳輸模式下該方法會調用到PlaintextTransportLayer#finishConnect,其實現如下:

public?boolean?finishConnect()?throws?IOException?{
????//?返回true代表已經連接好了
????boolean?connected?=?socketChannel.finishConnect();
????if?(connected)
????????//?取消監聽CONNECt事件,增加READ事件的監聽
????????key.interestOps(key.interestOps()?&?~SelectionKey.OP_CONNECT?|?SelectionKey.OP_READ);
????return?connected;
}

關于immediatelyConnectedKeys更詳細的內容可以看看這里。

發送數據

kafka發送數據分為兩個步驟:

1.調用Selector#send將要發送的數據保存在對應的KafkaChannel中,該方法并沒有進行真正的網絡IO

//?Selector#send
public?void?send(Send?send)?{
????String?connectionId?=?send.destination();
????//?如果所在的連接正在關閉中,則加入到失敗集合failedSends中
????if?(closingChannels.containsKey(connectionId))
????????this.failedSends.add(connectionId);
????else?{
????????KafkaChannel?channel?=?channelOrFail(connectionId,?false);
????????try?{
????????????channel.setSend(send);
????????}?catch?(CancelledKeyException?e)?{
????????????this.failedSends.add(connectionId);
????????????close(channel,?false);
????????}
????}
}

//KafkaChannel#setSend
public?void?setSend(Send?send)?{
????//?如果還有數據沒有發送出去則報錯
????if?(this.send?!=?null)
????????throw?new?IllegalStateException("Attempt?to?begin?a?send?operation?with?prior?send?operation?still?in?progress.");
????//?保存下來
????this.send?=?send;
????//?添加對WRITE事件的監聽
????this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
  1. 調用Selector#poll,在第一步中已經對該channel注冊了WRITE事件的監聽,所以在當channel可寫時,會調用到pollSelectionKeys將數據真正的發送出去。

private?void?pollSelectionKeys(Iterable<SelectionKey>?selectionKeys,
???????????????????????????boolean?isImmediatelyConnected,
???????????????????????????long?currentTimeNanos)?{
Iterator<SelectionKey>?iterator?=?selectionKeys.iterator();
//?遍歷集合
while?(iterator.hasNext())?{
????SelectionKey?key?=?iterator.next();
????//?移除當前元素,要不然下次poll又會處理一遍
????iterator.remove();
????//?得到connect時創建的KafkaChannel
????KafkaChannel?channel?=?channel(key);
???...

????try?{
????????...
?

????????//?如果是WRITE事件
????????if?(channel.ready()?&&?key.isWritable())?{
????????????//?真正的網絡寫
????????????Send?send?=?channel.write();
????????????//?一個Send對象可能會被拆成幾次發送,write非空代表一個send發送完成
????????????if?(send?!=?null)?{
????????????????//?completedSends代表已發送完成的集合
????????????????this.completedSends.add(send);
????????????????this.sensors.recordBytesSent(channel.id(),?send.size());
????????????}
????????}
		...
????}?catch?(Exception?e)?{
?????...
????}?finally?{
????????maybeRecordTimePerConnection(channel,?channelStartTimeNanos);
????}
}
}

當可寫時,會調用KafkaChannel#write方法,該方法中會進行真正的網絡IO:

public?Send?write()?throws?IOException?{
????Send?result?=?null;
????if?(send?!=?null?&&?send(send))?{
????????result?=?send;
????????send?=?null;
????}
????return?result;
}
private?boolean?send(Send?send)?throws?IOException?{
????//?最終調用SocketChannel#write進行真正的寫
????send.writeTo(transportLayer);
????if?(send.completed())
????????//?如果寫完了,則移除對WRITE事件的監聽
????????transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

????return?send.completed();
}

接收數據

如果遠端有發送數據過來,那調用poll方法時,會對接收到的數據進行處理。

public?void?poll(long?timeout)?throws?IOException?{
...
//?select方法是對java.nio.channels.Selector#select的一個簡單封裝
int?readyKeys?=?select(timeout);
...
//?如果有就緒的事件或者immediatelyConnectedKeys非空
if?(readyKeys?>?0?||?!immediatelyConnectedKeys.isEmpty())?{
????//?對已就緒的事件進行處理,第2個參數為false
????pollSelectionKeys(this.nioSelector.selectedKeys(),?false,?endSelect);
????//?對immediatelyConnectedKeys進行處理。第2個參數為true
????pollSelectionKeys(immediatelyConnectedKeys,?true,?endSelect);
}

addToCompletedReceives();

...
}

private?void?pollSelectionKeys(Iterable<SelectionKey>?selectionKeys,
???????????????????????????boolean?isImmediatelyConnected,
???????????????????????????long?currentTimeNanos)?{
Iterator<SelectionKey>?iterator?=?selectionKeys.iterator();
//?遍歷集合
while?(iterator.hasNext())?{
????SelectionKey?key?=?iterator.next();
????//?移除當前元素,要不然下次poll又會處理一遍
????iterator.remove();
????//?得到connect時創建的KafkaChannel
????KafkaChannel?channel?=?channel(key);
???...

????try?{
????????...
?

????????//?如果是READ事件
????????if?(channel.ready()?&&?key.isReadable()?&&?!hasStagedReceive(channel))?{
????????????NetworkReceive?networkReceive;
????????????//?read方法會從網絡中讀取數據,但可能一次只能讀取一個req的部分數據。只有讀到一個完整的req的情況下,該方法才返回非null
????????????while?((networkReceive?=?channel.read())?!=?null)
????????????????//?將讀到的請求存在stagedReceives中
????????????????addToStagedReceives(channel,?networkReceive);
????????}
		...
????}?catch?(Exception?e)?{
?????...
????}?finally?{
????????maybeRecordTimePerConnection(channel,?channelStartTimeNanos);
????}
}
}

private?void?addToStagedReceives(KafkaChannel?channel,?NetworkReceive?receive)?{
????if?(!stagedReceives.containsKey(channel))
????????stagedReceives.put(channel,?new?ArrayDeque<NetworkReceive>());

????Deque<NetworkReceive>?deque?=?stagedReceives.get(channel);
????deque.add(receive);
}

在之后的addToCompletedReceives方法中會對該集合進行處理。

private?void?addToCompletedReceives()?{
????if?(!this.stagedReceives.isEmpty())?{
????????Iterator<Map.Entry<KafkaChannel,?Deque<NetworkReceive>>>?iter?=?this.stagedReceives.entrySet().iterator();
????????while?(iter.hasNext())?{
????????????Map.Entry<KafkaChannel,?Deque<NetworkReceive>>?entry?=?iter.next();
????????????KafkaChannel?channel?=?entry.getKey();
????????????//?對于client端來說該isMute返回為false,server端則依靠該方法保證消息的順序
????????????if?(!channel.isMute())?{
????????????????Deque<NetworkReceive>?deque?=?entry.getValue();
????????????????addToCompletedReceives(channel,?deque);
????????????????if?(deque.isEmpty())
????????????????????iter.remove();
????????????}
????????}
????}
}
private?void?addToCompletedReceives(KafkaChannel?channel,?Deque<NetworkReceive>?stagedDeque)?{
????//?將每個channel的第一個NetworkReceive加入到completedReceives
????NetworkReceive?networkReceive?=?stagedDeque.poll();
????this.completedReceives.add(networkReceive);
????this.sensors.recordBytesReceived(channel.id(),?networkReceive.payload().limit());
}

讀出數據后,會先放到stagedReceives集合中,然后在addToCompletedReceives方法中對于每個channel都會從stagedReceives取出一個NetworkReceive(如果有的話),放入到completedReceives中。

這樣做的原因有兩點:

  1. 對于SSL的連接來說,其數據內容是加密的,所以不能精準的確定本次需要讀取的數據大小,只能盡可能的多讀,這樣會導致可能會比請求的數據讀的要多。那如果該channel之后沒有數據可以讀,會導致多讀的數據將不會被處理。

  2. kafka需要確保一個channel上request被處理的順序是其發送的順序。因此對于每個channel而言,每次poll上層最多只能看見一個請求,當該請求處理完成之后,再處理其他的請求。在sever端,每次poll后都會將該channel給mute掉,即不再從該channel上讀取數據。當處理完成之后,才將該channelunmute,即之后可以從該socket上讀取數據。而client端則是通過InFlightRequests#canSendMore控制。

代碼中關于這段邏輯的注釋如下:

/*?In?the?"Plaintext"?setting,?we?are?using?socketChannel?to?read?&?write?to?the?network.?But?for?the?"SSL"?setting,
*?we?encrypt?the?data?before?we?use?socketChannel?to?write?data?to?the?network,?and?decrypt?before?we?return?the?responses.
*?This?requires?additional?buffers?to?be?maintained?as?we?are?reading?from?network,?since?the?data?on?the?wire?is?encrypted
*?we?won't?be?able?to?read?exact?no.of?bytes?as?kafka?protocol?requires.?We?read?as?many?bytes?as?we?can,?up?to?SSLEngine's
*?application?buffer?size.?This?means?we?might?be?reading?additional?bytes?than?the?requested?size.
*?If?there?is?no?further?data?to?read?from?socketChannel?selector?won't?invoke?that?channel?and?we've?have?additional?bytes
*?in?the?buffer.?To?overcome?this?issue?we?added?"stagedReceives"?map?which?contains?per-channel?deque.?When?we?are
*?reading?a?channel?we?read?as?many?responses?as?we?can?and?store?them?into?"stagedReceives"?and?pop?one?response?during
*?the?poll?to?add?the?completedReceives.?If?there?are?any?active?channels?in?the?"stagedReceives"?we?set?"timeout"?to?0
*?and?pop?response?and?add?to?the?completedReceives.

*?Atmost?one?entry?is?added?to?"completedReceives"?for?a?channel?in?each?poll.?This?is?necessary?to?guarantee?that
?????*?requests?from?a?channel?are?processed?on?the?broker?in?the?order?they?are?sent.?Since?outstanding?requests?added
?????*?by?SocketServer?to?the?request?queue?may?be?processed?by?different?request?handler?threads,?requests?on?each
?????*?channel?must?be?processed?one-at-a-time?to?guarantee?ordering.
*/

End

本文分析了kafka network層的實現,在閱讀kafka源碼時,如果不把network層搞清楚會比較迷,比如req/resp的順序保障機制、真正進行網絡IO的不是send方法等等。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

博湖县| 麻江县| 封开县| 会泽县| 江孜县| 久治县| 玛沁县| 大渡口区| 邵阳市| 临猗县| 米易县| 梨树县| 昌宁县| 永清县| 开平市| 连平县| 阳原县| 镇赉县| 西城区| 商河县| 澄江县| 宜宾市| 青铜峡市| 凤台县| 兴仁县| 自治县| 合川市| 加查县| 台山市| 汉寿县| 崇明县| 临西县| 策勒县| 怀化市| 海林市| 乐东| 万州区| 关岭| 靖远县| 綦江县| 申扎县|