中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何使用線程執行框架

發布時間:2021-12-04 13:48:13 來源:億速云 閱讀:156 作者:小新 欄目:開發技術

這篇文章給大家分享的是有關如何使用線程執行框架的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。

場景

一個線程從某個地方接收消息(數據),可以是其他主機或者消息隊列,然后轉由另外的一個線程池來執行具體處理消息的邏輯,并且消息的處理速度小于接收消息的速度。這種情景很常見,試想一下,你會怎么設計和實現?

直觀想法

很顯然采用JUC的線程框架,可以迅速寫出代碼。

消息接收者:

public class Receiver {     private static volatile boolean inited = false;     private static volatile boolean shutdown = false;     private static volatile int cnt = 0;      private MessageHandler messageHandler;      public void start(){         Executors.newSingleThreadExecutor().execute(new Runnable() {             @Override             public void run() {                 while(!shutdown){                     init();                     recv();                 }             }         });     }      /**      * 模擬消息接收      */     public void recv(){             Message msg = new Message("Msg" + System.currentTimeMillis()); System.out.println(String.format("接收到消息(%d): %s", ++cnt, msg)); messageHandler.handle(msg); } public void init(){ if(!inited){ messageHandler = new MessageHandler(); inited = true; } } public static void main(String[] args) { new Receiver().start();     } }

消息處理:

public class MessageHandler {      private static final int THREAD_POOL_SIZE = 4;      private ExecutorService service = Executors.newFixedThreadPool(THREAD_POOL_SIZE);      public void handle(Message msg) {         try {             service.execute(new Runnable() {                 @Override                 public void run() {                     parseMsg(msg);                 }             });         } catch (Throwable e) {             System.out.println("消息處理異常" + e); } } /** * 比較耗時的消息處理流程 */ public void parseMsg(Message message) { while (true) { try { System.out.println("解析消息:" + message); Thread.sleep(5000); System.out.println("============================"); } catch (InterruptedException e) {                 e.printStackTrace();             }          }     } }

效果:這種方案導致的現象是接收到的消息會迅速堆積,我們從消息隊列(或者其他地方)取出了大量消息,但是處理線程的速度又跟不上,所以導致的問題是大量的Task會堆積在線程池底層維護的一個阻塞隊列中,這會極大的耗費存儲空間,影響系統的性能。

分析:當execute()一個任務的時候,如果有空閑的worker線程,那么投入運行,否則看設置的***線程個數,沒有達到線程個數限制就創建新線程,接新任務,否則就把任務緩沖到一個阻塞隊列中,問題就是這個隊列,默認的大小是沒有限制的,所以就會大量的堆積任務,必然耗費heap空間。

public static ExecutorService newFixedThreadPool(int nThreads) {         return new ThreadPoolExecutor(nThreads, nThreads,                                       0L, TimeUnit.MILLISECONDS,                                       new LinkedBlockingQueue<Runnable>());     }  public LinkedBlockingQueue() {         this(Integer.MAX_VALUE); // capacity     }

如何使用線程執行框架

計數限制

面對上述問題,想到了要限制消息接收的速度,自然就想到了各種線程同步的原語,不過在這里最簡單的就是使用一個Volatile的計數器。

消息接收者:

public class Receiver {     private static volatile boolean inited = false;     private static volatile boolean shutdown = false;     private static volatile int cnt = 0;     private MessageHandler messageHandler;     public void start(){         Executors.newSingleThreadExecutor().execute(new Runnable() {             @Override             public void run() {                 while(!shutdown){                     init();                     recv();                 }             }         });     }      /**      * 模擬消息接收      */     public void recv(){             Message msg = new Message("Msg" + System.currentTimeMillis()); System.out.println(String.format("接收到消息(%d): %s", ++cnt, msg)); messageHandler.handle(msg); } public void init(){ if(!inited){ messageHandler = new MessageHandler(); inited = true; } } public static void main(String[] args) { new Receiver().start();     } }

消息處理:

public class MessageHandler {     private static final int THREAD_POOL_SIZE = 1;     private ExecutorService service = Executors.newFixedThreadPool(THREAD_POOL_SIZE);      public void handle(Message msg){         try {             service.execute(new Runnable() {                  @Override                 public void run() {                     parseMsg(msg);                 }             });         } catch (Throwable e) {             System.out.println("消息處理異常" + e); } } /** * 比較耗時的消息處理流程 */ public void parseMsg(Message message){ try { Thread.sleep(10000); System.out.println("解析消息:" + message); } catch (InterruptedException e) { e.printStackTrace(); }finally {             Receiver.limit --;         }      } }

效果:通過控制消息的個數來阻塞消息的接收過程,就不會導致任務的堆積,系統的內存消耗會比較平緩,限制消息的個數本質就和下面限制任務隊列大小一樣。

如何使用線程執行框架

使用同步隊列 SynchronousQueue

SynchronousQueue  雖名為隊列,但是其實不會緩沖任務的對象,只是作為對象傳遞的控制點,如果有空閑線程或者沒有達到***線程限制,就會交付給worker線程去執行,否則就會拒絕,我們需要自己實現對應的拒絕策略RejectedExecutionHandler,默認的是拋出異常RejectedExecutionException。

消息接收者同上。

消息處理:

public class MessageHandler {     private static final int THREAD_POOL_SIZE = 4;      ThreadPoolExecutor service = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS,             new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() {         @Override         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {             System.out.println("自定義拒絕策略"); try { executor.getQueue().put(r); System.out.println("重新放任務回隊列"); } catch (InterruptedException e) { e.printStackTrace(); } } }); public void handle(Message msg) { try { System.out.println(service.getTaskCount()); System.out.println(service.getQueue().size()); System.out.println(service.getCompletedTaskCount()); service.execute(new Runnable() { @Override public void run() { parseMsg(msg); } }); } catch (Throwable e) { System.out.println("消息處理異常" + e); } } /** * 比較耗時的消息處理流程 */ public void parseMsg(Message message) { while (true) { try { System.out.println("線程名:" + Thread.currentThread().getName()); System.out.println("解析消息:" + message); Thread.sleep(1000); } catch (InterruptedException e) {                 e.printStackTrace();             }         }     } }

效果:能夠控制消息的接收速度,但是我們需要在rejectedExecution中實現某種阻塞的操作,但是選擇在發生拒絕的時候把任務重新放回隊列,帶來的問題就是這個Task會發生饑餓現象。

如何使用線程執行框架

使用大小限制的阻塞隊列

使用LinkedBlockingQueue作為線程框架底層的任務緩沖區,并且設置大小限制,思想上和上述方案一樣,都是有一個阻塞的點,但是通過***的jvm monitor看到這里的CPU消耗更少,內存使用有所降低,并且波動小(具體原因有待探索)。

消息接收者同上。

消息處理:

public class MessageHandler {     private static final int THREAD_POOL_SIZE = 4;     private static final int BLOCK_QUEUE_CAP = 500;     ThreadPoolExecutor service = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS,             new LinkedBlockingQueue<Runnable>(BLOCK_QUEUE_CAP), new SimpleThreadFactory(), new RejectedExecutionHandler() {         @Override         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {             System.out.println("自定義拒絕策略"); try { executor.getQueue().put(r); System.out.println("重新放任務回隊列"); } catch (InterruptedException e) { e.printStackTrace(); } } }); public void handle(Message msg) { try { service.execute(new Runnable() { @Override public void run() { parseMsg(msg); } }); } catch (Throwable e) { System.out.println("消息處理異常" + e); } } /** * 比較耗時的消息處理流程 */ public void parseMsg(Message message) { try { Thread.sleep(5000); System.out.println("線程名:" + Thread.currentThread().getName()); System.out.println("解析消息:" + message); } catch (InterruptedException e) { e.printStackTrace(); } } static class SimpleThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("Thread-" + System.currentTimeMillis()); return thread;         }     } }

如何使用線程執行框架

感謝各位的閱讀!關于“如何使用線程執行框架”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

双城市| 马龙县| 台东县| 宁强县| 顺平县| 沾化县| 霍城县| 镇原县| 蒙自县| 华容县| 蕲春县| 尼玛县| 莱阳市| 宁河县| 达孜县| 上高县| 郧西县| 西宁市| 济阳县| 宽城| 富阳市| 达孜县| 多伦县| 鄄城县| 湾仔区| 湖州市| 兴文县| 贵州省| 津南区| 柯坪县| 南平市| 三台县| 云南省| 云浮市| 临潭县| 湄潭县| 泉州市| 营口市| 和静县| 镇平县| 开原市|