您好,登錄后才能下訂單哦!
這篇文章主要介紹Netty對JDK緩沖區中內存池零拷貝優化的示例分析,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
NIO中緩沖區是數據傳輸的基礎,JDK通過ByteBuffer實現,Netty框架中并未采用JDK原生的ByteBuffer,而是構造了ByteBuf。
ByteBuf對ByteBuffer做了大量的優化,比如說內存池,零拷貝,引用計數(不依賴GC),本文主要是分析這些優化,學習這些優化思想,學以致用,在實際工程中,借鑒這些優化方案和思想。
直接內存和堆內存
首先先講一下這里面需要用的基礎知識,在JVM中 內存可分為兩大塊,一個是堆內存,一個是直接內存。這里簡單介紹一下
堆內存:
堆內存是Jvm所管理的內存,相比方法區,棧內存,堆內存是最大的一塊。所有的對象實例實例以及數組都要在堆上分配。
Java的垃圾收集器是可以在堆上回收垃圾。
直接內存:
JVM使用Native函數在堆外分配內存,之后通過Java堆中的DirectByteBuffer對象作為這塊內存的引用進行操作。直接內存不會受到Java堆的限制,只受本機內存影響。
Java的GC只會在老年區滿了觸發Full GC時,才會去順便清理直接內存的廢棄對象。
JDK原生緩沖區ByteBuffer
在NIO中,所有數據都是用緩沖區處理的。讀寫數據,都是在緩沖區中進行的。緩存區實質是是一個數組,通常使用字節緩沖區——ByteBuffer。
屬性:
使用方式:
ByteBuffer可以申請兩種方式的內存,分別為堆內存和直接內存,首先看申請堆內存。
// 申請堆內存 ByteBuffer HeapbyteBuffer = ByteBuffer.allocate(1024);
很簡單,就一行代碼,再看看allocate方法。
public static ByteBuffer allocate(int capacity) { if (capacity < 0) throw new IllegalArgumentException(); return new HeapByteBuffer(capacity, capacity); }
其實就是new一個HeapByteBuffer對象。這個 HeapByteBuffer繼承自ByteBuffer,構造器采用了父類的構造器,如下所示:
HeapByteBuffer(int cap, int lim) { // package-private super(-1, 0, lim, cap, new byte[cap], 0); /* hb = new byte[cap]; offset = 0; */ }//ByteBuffer構造器 ByteBuffer(int mark, int pos, int lim, int cap, // package-private byte[] hb, int offset) { super(mark, pos, lim, cap); this.hb = hb; this.offset = offset; }
結合ByteBuffer的四個屬性,初始化的時候就可以賦值capaticy,limit,position,mark,至于byte[] hb, int offsef這兩個屬性,JDK文檔給出的解釋是 backing array , and array offset 。它是一個回滾數組,offset是數組的偏移值。
申請直接內存:
// 申請直接內存 ByteBuffer DirectbyteBuffer = ByteBuffer.allocateDirect(1024);
allocateDirect()實際上就是new的一個DirectByteBuffer對象,不過這個new 一個普通對象不一樣。這里使用了Native函數來申請內存,在Java中就是調用unsafe對象
public static ByteBuffer allocateDirect(int capacity) { return new DirectByteBuffer(capacity); } DirectByteBuffer(int cap) { // package-private super(-1, 0, cap, cap); boolean pa = VM.isDirectMemoryPageAligned(); int ps = Bits.pageSize(); long size = Math.max(1L, (long)cap + (pa ? ps : 0)); Bits.reserveMemory(size, cap); long base = 0; try { base = unsafe.allocateMemory(size); } catch (OutOfMemoryError x) { Bits.unreserveMemory(size, cap); throw x; } unsafe.setMemory(base, size, (byte) 0); if (pa && (base % ps != 0)) { // Round up to page boundary address = base + ps - (base & (ps - 1)); } else { address = base; } cleaner = Cleaner.create(this, new Deallocator(base, size, cap)); att = null; } View Code
申請方法不同的內存有不同的用法。接下來看一看ByteBuffer的常用方法與如何使用
ByteBuffer的常用方法與使用方式
Bytebuf的讀和寫是使用put()和get()方法實現的
// 讀操作public byte get() { return hb[ix(nextGetIndex())]; }final int nextGetIndex() { if (position >= limit) throw new BufferUnderflowException(); return position++; }// 寫操作public ByteBuffer put(byte x) { hb[ix(nextPutIndex())] = x; return this; }final int nextPutIndex() { if (position >= limit) throw new BufferOverflowException(); return position++; }
從代碼中可以看出,讀和寫操作都會改變ByteBuffer的position屬性,這兩個操作是共用的position屬性。這樣就會帶來一個問題,讀寫操作會導致數據出錯啊,數據位置出錯。
ByteBuffer提供了flip()方法,讀寫模式切換,切換的時候會改變position和limit的位置。看看flip()怎么實現的:
public final Buffer flip() { // 1. 設置 limit 為當前位置 limit = position; // 2. 設置 position 為0 position = 0; mark = -1; return this; }
這里就不重點介紹了,有些細節可以自己去深究。
Netty的ByteBuf
Netty使用的自身的ByteBuf對象來進行數據傳輸,本質上使用了外觀模式對JDK的ByteBuffer進行封裝。
相較于原生的ByteBuffer,Netty的ByteBuf做了很多優化,零拷貝,內存池加速,讀寫索引。
為什么要使用內存池?
首先要明白一點,Netty的內存池是不依賴于JVM本身的GC的。
回顧一下直接內存的GC:
上文提到Java的GC只會在老年區滿了觸發Full GC時,才會去順便清理直接內存的廢棄對象。
JVM中的直接內存,存在堆內存中其實就是DirectByteBuffer類,它本身其實很小,真的內存是在堆外,這里是映射關系。
每次申請直接內存,都先看看是否超限 —— 直接內存的限額默認(可用 -XX:MaxDirectMemorySize 重新設定)。
如果超過限額,就會主動執行System.gc(),這樣會帶來一個影響,系統會中斷100ms。如果沒有成功回收直接內存,并且還是超過直接內存的限額,就會拋出OOM——內存溢出。
繼續從GC角度分析,DirectByteBuffer熬過了幾次young gc之后,會進入老年代。當老年代滿了之后,會觸發Full GC。
因為本身很小,很難占滿老年代,因此基本不會觸發Full GC,帶來的后果是大量堆外內存一直占著不放,無法進行內存回收。
還有最后一個辦法,就是依靠申請額度超限時觸發的system.gc(),但是前面提到,它會中斷進程100ms,如果在這100ms的之間,系統未完成GC,仍會拋出OOM。
所以這個最后一個辦法也不是完全保險的。
Netty使用了引用計數的方式,主動回收內存。回收的對象包括非池直接內存,和內存池中的內存。
內存池的內存泄露檢測?
Netty中使用引用計數機制來管理資源,ByteBuf實際上是實現了ReferenceCounted接口,當實例化ByteBuf對象時,引用計數加1。
當應用代碼保持一個對象引用時,會調用retain方法將計數增加1,對象使用完畢進行釋放,調用release將計數器減1.
當引用計數變為0時,對象將釋放所有的資源,返回內存池。
Netty內存泄漏檢測級別:
禁用(DISABLED) - 完全禁止泄露檢測。不推薦。
簡單(SIMPLE) - 告訴我們取樣的1%的緩沖是否發生了泄露。默認。
高級(ADVANCED) - 告訴我們取樣的1%的緩沖發生泄露的地方
偏執(PARANOID) - 跟高級選項類似,但此選項檢測所有緩沖,而不僅僅是取樣的那1%。此選項在自動測試階段很有用。如果構建(build)輸出包含了LEAK,可認為構建失敗也可以使用JVM的-Dio.netty.leakDetectionLevel選項來指定泄漏檢測級別。
內存跟蹤
在內存池中分配內存,得到的ByteBuf對象都是經過 toLeakAwareBuffer()方法封裝的,該方法作用就是對ByteBuf對象進行引用計數,使用 SimpleLeakAwareByteBuf或者 AdvancedLeakAwareByteBuf 來包裝ByteBuf。此外該方法只對非池內存中的直接內存和內存池中的內存進行內存泄露檢測。
//裝飾器模式,用SimpleLeakAwareByteBuf或AdvancedLeakAwareByteBuf來包裝原始的ByteBufprotected static ByteBuf toLeakAwareBuffer(ByteBuf buf) { ResourceLeakTracker<ByteBuf> leak; //根據設置的Level來選擇使用何種裝飾器 switch (ResourceLeakDetector.getLevel()) { case SIMPLE://創建用于跟蹤和表示內容泄露的ResourcLeak對象 leak = AbstractByteBuf.leakDetector.track(buf); if (leak != null) { //只在ByteBuf.order方法中調用ResourceLeak.record buf = new SimpleLeakAwareByteBuf(buf, leak); } break; case ADVANCED: case PARANOID: leak = AbstractByteBuf.leakDetector.track(buf); if (leak != null) { //只在ByteBuf.order方法中調用ResourceLeak.record buf = new AdvancedLeakAwareByteBuf(buf, leak); } break; default: break; } return buf; }
實際上,內存泄露檢測是在 AbstractByteBuf.leakDetector.track(buf)進行的,來看看track方法的具體實現。
/** * Creates a new {@link ResourceLeakTracker} which is expected to be closed via * {@link ResourceLeakTracker#close(Object)} when the related resource is deallocated. * * @return the {@link ResourceLeakTracker} or {@code null} */ @SuppressWarnings("unchecked") public final ResourceLeakTracker<T> track(T obj) { return track0(obj); } @SuppressWarnings("unchecked") private DefaultResourceLeak track0(T obj) { Level level = ResourceLeakDetector.level; // 不進行內存跟蹤 if (level == Level.DISABLED) { return null; } if (level.ordinal() < Level.PARANOID.ordinal()) { //如果監控級別低于PARANOID,在一定的采樣頻率下報告內存泄露 if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) { reportLeak(); return new DefaultResourceLeak(obj, refQueue, allLeaks); } return null; } //每次需要分配 ByteBuf 時,報告內存泄露情況 reportLeak(); return new DefaultResourceLeak(obj, refQueue, allLeaks); }
再來看看返回對象——DefaultResourceLeak,他的實現方式如下:
private static final class DefaultResourceLeak<T> extends WeakReference<Object> implements ResourceLeakTracker<T>, ResourceLeak {
它繼承了虛引用WeakReference,虛引用完全不影響目標對象的垃圾回收,但是會在目標對象被VM垃圾回收時加入到引用隊列,
正常情況下ResourceLeak對象,會將監控的資源的引用計數為0時被清理掉。
但是當資源的引用計數失常,ResourceLeak對象也會被加入到引用隊列.
存在著這樣一種情況:沒有成對調用ByteBuf的retain和relaease方法,導致ByteBuf沒有被正常釋放,當 ResourceLeak(引用隊列) 中存在元素時,即表明有內存泄露。
Netty中的 reportLeak()方法來報告內存泄露情況,通過檢查引用隊列來判斷是否有內存泄露,并報告跟蹤情況.
方法代碼如下:
View Code
Handler中的內存處理機制
Netty中有handler鏈,消息有本Handler傳到下一個Handler。所以Netty引入了一個規則,誰是最后使用者,誰負責釋放。
根據誰最后使用誰負責釋放的原則,每個Handler對消息可能有三種處理方式
對原消息不做處理,調用 ctx.fireChannelRead(msg)把原消息往下傳,那不用做什么釋放。
將原消息轉化為新的消息并調用 ctx.fireChannelRead(newMsg)往下傳,那必須把原消息release掉。
如果已經不再調用ctx.fireChannelRead(msg)傳遞任何消息,那更要把原消息release掉。
假設每一個Handler都把消息往下傳,Handler并也不知道誰是啟動Netty時所設定的Handler鏈的最后一員,所以Netty在Handler鏈的最末補了一個TailHandler,如果此時消息仍然是ReferenceCounted類型就會被release掉。
總結:
1.Netty在不同的內存泄漏檢測級別情況下,采樣概率是不一樣的,在Simple情況下出現了Leak,要設置“-Dio.netty.leakDetectionLevel=advanced”再跑一次代碼,找到創建和訪問的地方。
2.Netty中的內存泄露檢測是通過對ByteBuf對象進行裝飾,利用虛引用和引用計數來對非池中的直接內存和內存池中內存進行跟蹤,判斷是否發生內存泄露。
3.計數器基于 AtomicIntegerFieldUpdater,因為ByteBuf對象很多,如果都把int包一層AtomicInteger花銷較大,而AtomicIntegerFieldUpdater只需要一個全局的靜態變量。
Netty中的內存單位
Netty中將內存池分為五種不同的形態:Arena,ChunkList,Chunk,Page,SubPage.
首先來看Netty最大的內存單位PoolArena——連續的內存塊。它是由多個PoolChunkList和兩個SubPagePools(一個是tinySubPagePool,一個是smallSubPagePool)組成的。如下圖所示:
1.PoolChunkList是一個雙向的鏈表,PoolChunkList負責管理多個PoolChunk的生命周期。
2.PoolChunk中包含多個Page,Page的大小默認是8192字節,也可以設置系統變量io.netty.allocator.pageSize來改變頁的大小。自定義頁大小有如下限制:1.必須大于4096字節,2.必須是2的整次數冪。
3.塊(PoolChunk)的大小是由頁的大小和maxOrder算出來的,計算公式是: chunkSize = 2^{maxOrder} * pageSize。 maxOrder的默認值是11,也可以通過io.netty.allocator.maxOrder系統變量設置,只能是0-14的范圍,所以chunksize的默認大小為:(2^11)*8192=16MB
Page中包含多個SubPage。
PoolChunk內部維護了一個平衡二叉樹,如下圖所示:
PoolSubPage
通常一個頁(page)的大小就達到了10^13(8192字節),通常一次申請分配內存沒有這么大,可能很小。
于是Netty將頁(page)劃分成更小的片段——SubPage
Netty定義這樣的內存單元是為了更好的分配內存,接下來看一下一個ByteBuf是如何在內存池中申請內存的。
Netty如何分配內存池中的內存?
分配原則:
內存池中的內存分配是在PoolArea中進行的。
申請小于PageSize(默認8192字節)的內存,會在SubPagePools中進行分配,如果申請內存小于512字節,則會在tingSubPagePools中進行分配,如果大于512小于PageSize字節,則會在smallSubPagePools進行分配。
申請大于PageSize的內存,則會在PoolChunkList中進行分配。
申請大于ChunkSize的內存,則不會在內存池中申請,而且也不會重用該內存。
應用中在內存池中申請內存的方法:
// 在內存池中申請 直接內存 ByteBuf directByteBuf = ByteBufAllocator.DEFAULT.directBuffer(1024); // 在內存池中申請 堆內存 ByteBuf heapByteBuf = ByteBufAllocator.DEFAULT.heapBuffer(1024);
接下來,一層一層的看下來,在Netty中申請內存是如何實現的。就拿申請直接內存舉例,首先看directBuffer方法。
// directBuffer方法實現 @Override public ByteBuf directBuffer(int initialCapacity) { return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY); } // 校驗申請大小,返回申請的直接內存 @Override public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { if (initialCapacity == 0 && maxCapacity == 0) { return emptyBuf; } validate(initialCapacity, maxCapacity); return newDirectBuffer(initialCapacity, maxCapacity); } //PooledByteBufAllocator類中的 newDirectBuffer方法的實現 @Override protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { // Netty避免每個線程對內存池的競爭,在每個線程都提供了PoolThreadCache線程內的內存池 PoolThreadCache cache = threadCache.get(); PoolArena<ByteBuffer> directArena = cache.directArena; // 如果緩存存在,則分配內存 final ByteBuf buf; if (directArena != null) { buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { // 緩存不存在,則分配非池內存 buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } // 通過toLeakAwareBuffer包裝成內存泄漏檢測的buffer return toLeakAwareBuffer(buf); }
一般情況下,內存都是在buf = directArena.allocate(cache, initialCapacity, maxCapacity)這行代碼進行內存分配的,也就是說在內存的連續塊PoolArena中進行的內存分配。
接下來,我們根據內存分配原則來進行內存研讀PoolArena中的allocate方法。
1 PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { 2 PooledByteBuf<T> buf = newByteBuf(maxCapacity); 3 allocate(cache, buf, reqCapacity); 4 return buf; 5 } 6 7 private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { 8 final int normCapacity = normalizeCapacity(reqCapacity); 9 if (isTinyOrSmall(normCapacity)) { // capacity < pageSize10 int tableIdx;11 PoolSubpage<T>[] table;12 boolean tiny = isTiny(normCapacity);13 if (tiny) { // < 51214 15 // 如果申請內存小于512字節,則會在tingSubPagePools中進行分配16 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {17 // was able to allocate out of the cache so move on18 return;19 }20 tableIdx = tinyIdx(normCapacity);21 table = tinySubpagePools;22 } else {23 // 如果大于512小于PageSize字節,則會在smallSubPagePools進行分配24 if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {25 // was able to allocate out of the cache so move on26 return;27 }28 tableIdx = smallIdx(normCapacity);29 table = smallSubpagePools;30 }31 32 final PoolSubpage<T> head = table[tableIdx];33 34 /** 35 * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and 36 * {@link PoolChunk#free(long)} may modify the doubly linked list as well. 37 */38 synchronized (head) {39 final PoolSubpage<T> s = head.next;40 if (s != head) {41 assert s.doNotDestroy && s.elemSize == normCapacity;42 long handle = s.allocate();43 assert handle >= 0;44 s.chunk.initBufWithSubpage(buf, handle, reqCapacity);45 incTinySmallAllocation(tiny);46 return;47 }48 }49 synchronized (this) {50 allocateNormal(buf, reqCapacity, normCapacity);51 }52 53 incTinySmallAllocation(tiny);54 return;55 }56 if (normCapacity <= chunkSize) {57 if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {58 // was able to allocate out of the cache so move on59 return;60 }61 synchronized (this) {62 allocateNormal(buf, reqCapacity, normCapacity);63 ++allocationsNormal;64 }65 } else {66 // Huge allocations are never served via the cache so just call allocateHuge67 allocateHuge(buf, reqCapacity);68 }69 }
如何使用內存池?
底層IO處理線程的緩沖區使用堆外直接緩沖區,減少一次IO復制。業務消息的編解碼使用堆緩沖區,分配效率更高,而且不涉及到內核緩沖區的復制問題。
Netty默認不使用內存池,需要在創建服務端或者客戶端的時候進行配置。
//Boss線程池內存池配置. .option(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT) //Work線程池內存池配置. .childOption(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT);
本人的想法是:
1.I/O處理線程使內存池中的直接內存,開啟以上配置
2.在handler處理業務的時候,使用內存池中的堆內存
還有一點值得注意的是:在使用完內存池中的ByteBuf,一定要記得釋放,即調用release():
// 在內存池中申請 直接內存 ByteBuf directByteBuf = ByteBufAllocator.DEFAULT.directBuffer(1024); // 歸還到內存池 directByteBuf.release();
如果handler繼承了SimpleChannelInboundHandler,那么它將會自動釋放Bytefuf.詳情可見:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") I imsg = (I) msg; channelRead0(ctx, imsg); } else { release = false; ctx.fireChannelRead(msg); } } finally { // autoRelease默認為true if (autoRelease && release) { // 釋放Bytebuf,歸還到內存池 ReferenceCountUtil.release(msg); } } }
零拷貝:
該部分是重點介紹的部分,首先將它與傳統的I/O read和write操作作對比,看看有什么不同,首先需要理解一下用戶態和內存態的概念
用戶態(User Mode)和內核態(Kernel Mode),也可以叫用戶空間和內核
用戶態:受限的訪問內存,并且不允許訪問硬件設備。
內核態:本質上是一個軟件,可以控制計算機的硬件資源(如網卡,硬盤),可以訪問內存所有數據。
用戶程序都是運行在用戶態中的,比如JVM,就是用戶程序,所以它運行在用戶態中。
用戶態是不能直接訪問硬件設備的,如果需要一次I/O操作,那就必須利用系統調用機制切換到內核態(用戶態與內核態之間的轉換稱為上下文切換),進行硬盤讀寫。
比如說一次傳統網絡I/O:
第一步,從用戶態切換到內核態,將用戶緩沖區的數據拷貝到內核緩沖區,執行send操作。
第二步,數據發送由底層的操作系統進行,此時從內核態切換到用戶態,將內核緩存區的數據拷貝到網卡的緩沖區
總結:也就是一次普通的網絡I/O,至少經過兩次上下文切換,和兩次內存拷貝。
什么是零拷貝?
當需要傳輸的數據遠大于內核緩沖區的大小時,內核緩沖區就成為I/O的性能瓶頸。零拷貝就是杜絕了內核緩沖區與用戶緩沖區的的數據拷貝。
所以零拷貝適合大數據量的傳輸。
拿傳統的網絡I/O做對比,零拷貝I/O是怎樣的一個過程:
用戶程序執行transferTo(),將用戶緩沖區待發送的數據拷貝到網卡緩沖區。
很簡單,一步完成,中間少了用戶態到內存態的拷貝。
Netty中零拷貝如何實現
Netty的中零拷貝與上述零拷貝是不一樣的,它并不是系統層面上的零拷貝,只是相對于ByteBuf而言的。
Netty中的零拷貝:
1.CompositeByteBuf,將多個ByteBuf合并為一個邏輯上的ByteBuf,避免了各個ByteBuf之間的拷貝。
使用方式:
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer(); compositeByteBuf.addComponents(true, ByteBuf1, ByteBuf1);
注意: addComponents第一個參數必須為true,那么writeIndex才不為0,才能從compositeByteBuf中讀到數據。
2.wrapedBuffer()方法,將byte[]數組包裝成ByteBuf對象。
byte[] bytes = data.getBytes();ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
Unpooled.wrappedBuffer(bytes)就是進行了byte[]數組的包裝工作,過程中不存在內存拷貝。
即包裝出來的ByteBuf和byte[]數組指向了同一個存儲空間。因為值引用,所以bytes修改也會影響 byteBuf 的值。
3.ByteBuf的分割,slice()方法。將一個ByteBuf對象切分成多個ByteBuf對象。
ByteBuf directByteBuf = ByteBufAllocator.DEFAULT.directBuffer(1024);ByteBuf header = directByteBuf.slice(0,50);ByteBuf body = directByteBuf.slice(51,1024);
header和body兩個ByteBuf對象實際上還是指向directByteBuf的存儲空間。
以上是“Netty對JDK緩沖區中內存池零拷貝優化的示例分析”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。