您好,登錄后才能下訂單哦!
這篇文章主要介紹Java中如何實現消息隊列任務的平滑關閉,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
前言
消息隊列中間件是分布式系統中重要的組件,主要解決應用解耦,異步消息,流量削鋒等問題,實現高性能,高可用,可伸縮和最終一致性架構。目前使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
消息隊列應用場景
消息隊列在實際應用中常用的使用場景:異步處理,應用解耦,流量削鋒和消息通訊四個場景。
1.問題背景
對于消息隊列任務的監聽,我們一般使用Java寫一個獨立的程序,在Linux服務器上運行。當訂閱者程序啟動后,會通過消息隊列客戶端接收消息,放入線程池中并發的處理。
那么問題來了,當我們修改程序后,需要重新啟動時,如何保證消息都能夠被處理呢?
一些開源的消息隊列中間件,會提供ACK機制(消息確認機制),當訂閱者處理完消息后,會通知服務端刪除對應消息,如果訂閱者出現異常,服務端未收到確認消費,則會重試發送。
那如果消息隊列中間件沒有提供ACK機制,或者為了高吞度量的考慮關閉了ACK功能,如何最大可能保證消息都能夠被處理呢?
正常來說,訂閱者程序關閉后,消息會在隊列中堆積,等待訂閱者下次訂閱消費,所以未接收的消息是不會丟失的。可能出現的問題就是在關閉的一瞬間,已經從消息隊列中取出,但還沒有被處理的消息。
因此我們需要一套平滑關閉的機制,保證在重啟的時候,已接收的消息可以得到正常處理。
2.問題分析
平滑關閉的思路如下:
在關閉程序時,首先關閉消息訂閱,保證不再接收新的消息。
關閉線程池,等待線程池中的消息處理完畢。
程序退出。
關閉消息訂閱:消息隊列的客戶端都會提供關閉連接的方法,具體可以自行查看API。
關閉線程池:Java的ThreadPoolExecutor線程池提供shutdown()
和shutdownNow()
兩個方法,區別是前者會等待線程池中的消息都處理完畢,后者會直接停止所有線程并返回未處理完的線程List。因為我們需要使用shutdown()
方法進行關閉,并通過isTerminated()
方法,判斷線程池是否已經關閉。
那么問題又來了,我們如何通知到程序,需要執行關閉操作呢?
在Linux中,進程的關閉是通過信號傳遞的,我們可以用kill -9 pid關閉進程,除了-9之外,我們可以通過 kill -l,查看kill命令的其它信號量。
這里提供兩種關閉方法:
程序中添加Runtime.getRuntime().addShutdownHook
鉤子方法,SIGTERM,SIGINT,SIGHUP三種信號都會觸發該方法(分別對應kill -1/kill -2/kill -15,Ctrl+C也會觸發SIGINT信號)。
程序中通過Signal類注冊信號監聽,比如USR2(對應kill -12),在handle方法中執行關閉操作。
補充說明:addShutdownHook方法和handle方法中如果再調用System.exit
,會造成deadlock,使進程無法正常退出。
偽代碼分別如下
Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { //關閉訂閱者 //關閉線程池 //退出 } });
//注冊linux kill信號量 kill -12 Signal sig = new Signal("USR2"); Signal.handle(sig, new SignalHandler() { @Override public void handle(Signal signal) { //關閉訂閱者 //關閉線程池 //退出 } });
模擬Demo
下面通過一個demo模擬相關邏輯操作
首先模擬一個生產者,每秒生產5個消息
然后模擬一個訂閱者,收到消息后,放入線程池進行處理,線程池固定4個線程,每個線程處理時間1秒,這樣線程池每秒會積壓1個消息。
package com.lujianing.demo; import sun.misc.Signal; import sun.misc.SignalHandler; import java.util.concurrent.*; /** * @author lujianing01@58.com * @Description: * @date 2016/11/14 */ public class MsgClient { //模擬消費線程池 同時4個線程處理 private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(4); //模擬消息生產任務 private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor(); //用于判斷是否關閉訂閱 private static volatile boolean isClose = false; public static void main(String[] args) throws InterruptedException { //注冊鉤子方法 Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { close(); } }); BlockingQueue <String> queue = new ArrayBlockingQueue<String>(100); producer(queue); consumer(queue); } //模擬消息隊列生產者 private static void producer(final BlockingQueue queue){ //每200毫秒向隊列中放入一個消息 SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() { public void run() { queue.offer(""); } }, 0L, 200L, TimeUnit.MILLISECONDS); } //模擬消息隊列消費者 生產者每秒生產5個 消費者4個線程消費1個1秒 每秒積壓1個 private static void consumer(final BlockingQueue queue) throws InterruptedException { while (!isClose){ getPoolBacklogSize(); //從隊列中拿到消息 final String msg = (String)queue.take(); //放入線程池處理 if(!THREAD_POOL.isShutdown()) { THREAD_POOL.execute(new Runnable() { public void run() { try { //System.out.println(msg); TimeUnit.MILLISECONDS.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } } //查看線程池堆積消息個數 private static long getPoolBacklogSize(){ long backlog = THREAD_POOL.getTaskCount()- THREAD_POOL.getCompletedTaskCount(); System.out.println(String.format("[%s]THREAD_POOL backlog:%s",System.currentTimeMillis(),backlog)); return backlog; } private static void close(){ System.out.println("收到kill消息,執行關閉操作"); //關閉訂閱消費 isClose = true; //關閉線程池,等待線程池積壓消息處理 THREAD_POOL.shutdown(); //判斷線程池是否關閉 while (!THREAD_POOL.isTerminated()) { try { //每200毫秒 判斷線程池積壓數量 getPoolBacklogSize(); TimeUnit.MILLISECONDS.sleep(200L); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("訂閱者關閉,線程池處理完畢"); } static { String osName = System.getProperty("os.name").toLowerCase(); if(osName != null && osName.indexOf("window") == -1) { //注冊linux kill信號量 kill -12 Signal sig = new Signal("USR2"); Signal.handle(sig, new SignalHandler() { @Override public void handle(Signal signal) { close(); } }); } } }
當我們在服務上運行時,通過控制臺可以看到相關的輸出信息,demo中輸出了線程池的積壓消息個數
java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.MsgClient
另打開一個終端,通過ps命令查看進程號,或者通過nohup啟動Java進程拿到進程id
ps -fe|grep MsgClient
當我們執行kill -12 pid
的時候 可以看到關閉業務邏輯
以上是“Java中如何實現消息隊列任務的平滑關閉”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。