您好,登錄后才能下訂單哦!
這篇文章主要介紹了Netty分布式客戶端接入流程是什么的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇Netty分布式客戶端接入流程是什么文章都會有所收獲,下面我們一起來看看吧。
在剖析接入流程之前我們首先補充下第一章有關創建channel的知識:
我們在第一章剖析過channel的創建, 其中NioServerSocketChannel中有個構造方法:
public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
當時我們并沒有剖析config相關知識, 在這一章首先對此做一個補充, 這里我們看到每一個NioServerSocketChannel都擁有一個config屬性, 這個屬性存放著NioServerSocketChannel的相關配置, 這里創建一個NioServerSocketChannelConfig對象, 并將當前channel, 和channel對應的java底層的socket對象進行了傳入, NioServerSocketChannelConfig其實是NioServerSocketChannel的內部類
我們跟到NioServerSocketChannelConfig類的構造方法中:
private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) { super(channel, javaSocket); }
我們繼續跟入其父類DefaultServerSocketChannelConfig的構造方法中:
public DefaultServerSocketChannelConfig(ServerSocketChannel channel, ServerSocket javaSocket) { super(channel); if (javaSocket == null) { throw new NullPointerException("javaSocket"); } this.javaSocket = javaSocket; }
這里繼續調用了其父類的構造方法, 并保存了jdk底層的socket對象, 并且調用其父類DefaultChannelConfig的構造方法
public DefaultChannelConfig(Channel channel) { this(channel, new AdaptiveRecvByteBufAllocator()); }
這里調用了自身的構造方法, 傳入了channel和一個AdaptiveRecvByteBufAllocator對象
AdaptiveRecvByteBufAllocator是一個緩沖區分配器, 用于分配一個緩沖區Bytebuf的, 有關Bytebuf的相關內容會在后面的章節詳細講解, 這里可以簡單介紹作為了解, 就當對于之后知識的預習
Bytebuf相當于jdk的ByetBuffer, Netty對其做了重新的封裝, 用于讀寫channel中的字節流, 熟悉Nio的同學對此應該并不陌生, AdaptiveRecvByteBufAllocator就是用于分配netty中ByetBuff的緩沖區分配器, 根據名字, 我們不難看出這個緩沖區是一個可變大小的字節緩沖區
我們跟到AdaptiveRecvByteBufAllocator的構造方法中:
public AdaptiveRecvByteBufAllocator() { //DEFAULT_MINIMUM:最小緩沖區長度64字節 //DEFAULT_INITIAL:初始容量1024字節 //最大容量65536字節 this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM); }
這里調用自身的構造方法并且傳入了三個屬性, 這三個屬性的含義分別為:
DEFAULT_MINIMUM
:代表要分配的緩沖區長度最少為64個字節
DEFAULT_INITIAL
:代表要分配的緩沖區的初始容量為1024個字節
DEFAULT_MAXIMUM
:代表要分配的緩沖區最大容量為65536個字節
我們跟到this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM)方法中
public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) { //忽略驗證代碼 //最小容量在table中的下標 int minIndex = getSizeTableIndex(minimum); if (SIZE_TABLE[minIndex] < minimum) { this.minIndex = minIndex + 1; } else { this.minIndex = minIndex; } //最大容量在table中的下標 int maxIndex = getSizeTableIndex(maximum); if (SIZE_TABLE[maxIndex] > maximum) { this.maxIndex = maxIndex - 1; } else { this.maxIndex = maxIndex; } this.initial = initial; }
其中這里初始化了三個屬性, 分別是:
minIndex
:最小容量在size_table中的下標
maxIndex
:最大容量在table中的下標
initial
:初始容量1024個字節
這里的size_table就是一個數組, 里面盛放著byteBuf可分配的內存大小的集合, 分配的bytebuf無論是擴容還是收縮, 內存大小都屬于size_table中的元素, 那么這個數組是如何初始化的, 我們跟到這個屬性中:
private static final int[] SIZE_TABLE;
我們看到是一個final修飾的靜態成員變量, 我們跟到static塊中看它的初始化過程:
static { //List集合 List<Integer> sizeTable = new ArrayList<Integer>(); //從16開始, 每遞增16添加到List中, 直到大于等于512 for (int i = 16; i < 512; i += 16) { sizeTable.add(i); } //從512開始, 倍增添加到List中, 直到內存溢出 for (int i = 512; i > 0; i <<= 1) { sizeTable.add(i); } //初始化數組 SIZE_TABLE = new int[sizeTable.size()]; //將list的內容放入數組中 for (int i = 0; i < SIZE_TABLE.length; i ++) { SIZE_TABLE[i] = sizeTable.get(i); } }
首先創建一個Integer類型的list用于盛放內存元素
這里通過兩組循環為list添加元素
首先看第一組循環:
for (int i = 16; i < 512; i += 16) { sizeTable.add(i); }
這里是通過16平移的方式, 直到512個字節, 將每次平移之后的內存大小添加到list中
再看第二組循環
for (int i = 512; i > 0; i <<= 1) { sizeTable.add(i); }
超過512之后, 再通過倍增的方式循環, 直到int類型內存溢出, 將每次倍增之后大小添加到list中
最后初始化SIZE_TABLE數組, 將list中的元素按下表存放到數組中
這樣就初始化了內存數組
public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) { //忽略驗證代碼 //最小容量在table中的下標 int minIndex = getSizeTableIndex(minimum); if (SIZE_TABLE[minIndex] < minimum) { this.minIndex = minIndex + 1; } else { this.minIndex = minIndex; } //最大容量在table中的下標 int maxIndex = getSizeTableIndex(maximum); if (SIZE_TABLE[maxIndex] > maximum) { this.maxIndex = maxIndex - 1; } else { this.maxIndex = maxIndex; } this.initial = initial; }
這里分別根據傳入的最小和最大容量去SIZE_TABLE中獲取其下標
我們跟到getSizeTableIndex(minimum)中:
private static int getSizeTableIndex(final int size) { for (int low = 0, high = SIZE_TABLE.length - 1;;) { if (high < low) { return low; } if (high == low) { return high; } int mid = low + high >>> 1; int a = SIZE_TABLE[mid]; int b = SIZE_TABLE[mid + 1]; if (size > b) { low = mid + 1; } else if (size < a) { high = mid - 1; } else if (size == a) { return mid; } else { return mid + 1; } } }
這里是通過二分查找去獲取其下表
if (SIZE_TABLE[minIndex] < minimum)這里判斷最小容量下標所屬的內存大小是否小于最小值, 如果小于最小值則下標+1
最大容量的下標獲取原理同上, 判斷最大容量下標所屬內存大小是否大于最大值, 如果是則下標-1
我們回到DefaultChannelConfig的構造方法:
public DefaultChannelConfig(Channel channel) { this(channel, new AdaptiveRecvByteBufAllocator()); }
剛才我們剖析過了AdaptiveRecvByteBufAllocator()的創建過程, 我們繼續跟到this()中:
protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) { setRecvByteBufAllocator(allocator, channel.metadata()); this.channel = channel; }
我們看到這里初始化了channel, 在channel初始化之前, 調用了setRecvByteBufAllocator(allocator, channel.metadata())方法, 顧名思義, 這是用于設置緩沖區分配器的方法, 第一個參數是我們剛剛分析過的新建的AdaptiveRecvByteBufAllocator對象, 第二個傳入的是與channel綁定的ChannelMetadata對象, ChannelMetadata對象是什么?
我們跟進到metadata()方法當中, 由于是channel是NioServerSocketChannel, 所以調用到了NioServerSocketChannel的metadata()方法:
public ChannelMetadata metadata() { return METADATA; }
這里返回了一個成員變量METADATA, 跟到這個成員變量中:
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
這里創建了一個ChannelMetadata對象, 并在構造方法中傳入false和16
public ChannelMetadata(boolean hasDisconnect, int defaultMaxMessagesPerRead) { //省略驗證代碼 //false this.hasDisconnect = hasDisconnect; //16 this.defaultMaxMessagesPerRead = defaultMaxMessagesPerRead; }
這里做的事情非常簡單, 只初始化了兩個屬性:
hasDisconnect=false
defaultMaxMessagesPerRead=16
defaultMaxMessagesPerRead=16代表在讀取對方的鏈接或者channel的字節流時(無論server還是client), 最多只循環16次, 后面的講解將會看到
剖析完了ChannelMetadata對象的創建, 我們回到DefaultChannelConfig的構造方法:
protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) { setRecvByteBufAllocator(allocator, channel.metadata()); this.channel = channel; }
跟到setRecvByteBufAllocator(allocator, channel.metadata())方法中:
private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) { if (allocator instanceof MaxMessagesRecvByteBufAllocator) { ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead()); } else if (allocator == null) { throw new NullPointerException("allocator"); } rcvBufAllocator = allocator; }
首先會判斷傳入的緩沖區分配器是不是MaxMessagesRecvByteBufAllocator類型的, 因為AdaptiveRecvByteBufAllocator實現了MaxMessagesRecvByteBufAllocator接口, 所以此條件成立
之后將其轉換成MaxMessagesRecvByteBufAllocator類型,
然后調用其maxMessagesPerRead(metadata.defaultMaxMessagesPerRead())方法,
這里會走到其子類DefaultMaxMessagesRecvByteBufAllocator的maxMessagesPerRead(int maxMessagesPerRead)方法中,
其中參數metadata.defaultMaxMessagesPerRead()返回就是ChannelMetadata的屬性defaultMaxMessagesPerRead,
也就是16
跟到maxMessagesPerRead(int maxMessagesPerRead)方法中:
public MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead) { //忽略驗證代碼 //初始化為16 this.maxMessagesPerRead = maxMessagesPerRead; return this; }
這里將自身屬性maxMessagesPerRead設置為16, 然后返回自身
private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) { if (allocator instanceof MaxMessagesRecvByteBufAllocator) { ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead()); } else if (allocator == null) { throw new NullPointerException("allocator"); } rcvBufAllocator = allocator; }
設置完了內存分配器的maxMessagesPerRead屬性, 最后將DefaultChannelConfig自身的成員變量rcvBufAllocator設置成我們初始化完畢的allocator對象
關于“Netty分布式客戶端接入流程是什么”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“Netty分布式客戶端接入流程是什么”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。