中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

java中Pulsar?InterruptedException異常怎么解決

發布時間:2023-02-23 14:02:57 來源:億速云 閱讀:128 作者:iii 欄目:開發技術

本篇內容主要講解“java中Pulsar InterruptedException異常怎么解決”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“java中Pulsar InterruptedException異常怎么解決”吧!

背景

java中Pulsar?InterruptedException異常怎么解決

今天收到業務團隊反饋線上有個應用往 Pulsar 中發送消息失敗了,經過日志查看得知是發送消息時候拋出了 java.lang.InterruptedException 異常。

和業務溝通后得知是在一個 gRPC 接口中觸發的消息發送,大約持續了半個小時的異常后便恢復正常了,這是整個問題的背景。

前置排查

拿到該問題后首先排查下是否是共性問題,查看了其他的應用沒有發現類似的異常;同時也查看了 Pulsar broker 的監控大盤,在這個時間段依然沒有波動和異常;

這樣可以初步排除是 Pulsar 服務端的問題。

接著便是查看應用那段時間的負載情況,從應用 QPS 到 JVM 的各個內存情況依然沒發現有什么明顯的變化。

Pulsar 源碼排查

既然看起來應用本身和 Pulsar broker 都沒有問題的話那就只能從異常本身來排查了。

首先第一步要得知具體使用的是 Pulsar-client 是版本是多少,因為業務使用的是內部基于官方 SDK 封裝 springboot starter 所以第一步還得排查這個 starter 是否有影響。

通過查看源碼基本排除了 starter 的嫌疑,里面只是簡單的封裝了 SDK 的功能而已。

org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1027) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:91) at 
java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) 
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:89) ... 49 common frames omitted Caused by: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException 
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775) 
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ(ProducerImpl.java:393) 
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ$accessor$i7NYMN6i(ProducerImpl.java) 
at org.apache.pulsar.client.impl.ProducerImpl$auxiliary$EfuVvJLT.call(Unknown Source) 
at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:86) 
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java) 
at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:292) 
at org.apache.pulsar.client.impl.ProducerImpl.internalSendWithTxnAsync(ProducerImpl.java:363) 
at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendWithTxnAsync(PartitionedProducerImpl.java:191) 
at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendAsync(PartitionedProducerImpl.java:167) 
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:103) 
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:82) ... 49 common frames omitted Caused by: java.lang.InterruptedException: null
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1343) 
at java.base/java.util.concurrent.Semaphore.acquire(Semaphore.java:318) 
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:758)

接下來便只能是分析堆棧了,因為 Pulsar-client 的部分實現源碼是沒有直接打包到依賴中的,反編譯的話許多代碼行數對不上,所以需要將官方的源碼拉到本地,切換到對于的分支進行查看。

這一步稍微有點麻煩,首先是代碼庫還挺大的,加上之前如果沒有準備好 Pulsar 的開發環境的話估計會勸退一部分人;

但其實大部分問題都是網絡造成的,只要配置一些 Maven 鏡像多試幾次總會編譯成功。

我這里直接將分支切換到 branch-2.8

從堆棧的頂部開始排查 TypedMessageBuilderImpl.java:91

java中Pulsar?InterruptedException異常怎么解決

看起來是內部異步發送消息的時候拋了異常。

接著往下看到這里:

java.lang.InterruptedException 
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775) at

java中Pulsar?InterruptedException異常怎么解決

看起來是這里沒錯,但是代碼行數明顯不對;因為 2.8 這個分支也是修復過幾個版本,所以中間有修改導致代碼行數與最新代碼對不上也正常。

semaphore.get().acquire();

不過初步來看應該是這行代碼拋出的線程終端異常,這里看起來只有他最有可能了。

java中Pulsar?InterruptedException異常怎么解決

為了確認是否是真的是這行代碼,這個文件再往前翻了幾個版本最終確認了就是這行代碼沒錯了。

我們點開java.util.concurrent.Semaphore#acquire()的源碼,

    /**
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for a permit,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        if (Thread.interrupted() ||
            (tryAcquireShared(arg) < 0 &&
             acquire(null, arg, true, true, false, 0L) < 0))
            throw new InterruptedException();
    }

通過源碼會發現 acquire() 函數確實會響應中斷,一旦檢測到當前線程被中斷后便會拋出 InterruptedException 異常。

定位問題

所以問題的原因基本確定了,就是在 Pulsar 的發送消息線程被中斷了導致的,但為啥會被中斷還需要繼續排查。

我們知道線程中斷是需要調用 Thread.currentThread().interrupt(); API的,首先猜測是否 Pulsar 客戶端內部有個線程中斷了這個發送線程。

于是我在 pulsar-client 這個模塊中搜索了相關代碼:

java中Pulsar?InterruptedException異常怎么解決

排除掉和 producer 不相關的地方,其余所有中斷線程的代碼都是在有了該異常之后繼續傳遞而已;所以初步來看 pulsar-client 內部沒有主動中斷的操作。

既然 Pulsar 自己沒有做,那就只可能是業務做的了?

于是我在業務代碼中搜索了一下:

java中Pulsar?InterruptedException異常怎么解決

果然在業務代碼中搜到了唯一一處中斷的地方,而且通過調用關系得知這段代碼是在消息發送前執行的,并且和 Pulsar 發送函數處于同一線程。

大概的偽代碼如下:

        List.of(1, 2, 3).stream().map(e -> {
                    return CompletableFuture.supplyAsync(() -> {
                        try {
                            TimeUnit.MILLISECONDS.sleep(10);
                        } catch (InterruptedException ex) {
                            throw new RuntimeException(ex);
                        }
                        return e;
                    });
                }
        ).collect(Collectors.toList()).forEach(f -> {
            try {
                Integer integer = f.get();
                log.info("====" + integer);
                if (integer==3){
                    TimeUnit.SECONDS.sleep(10);
                    Thread.currentThread().interrupt();
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
	   MessageId send = producer.newMessage().value(msg.getBytes()).send();

執行這段代碼可以完全復現同樣的堆棧。

幸好中斷這里還打得有日志:

java中Pulsar?InterruptedException異常怎么解決

java中Pulsar?InterruptedException異常怎么解決

通過日志搜索發現異常的時間和這個中斷的日志時間點完全重合,這樣也就知道根本原因了。

因為業務線程和消息發送線程是同一個,在某些情況下會執行 Thread.currentThread().interrupt();,其實單純執行這行函數并不會發生什么,只要沒有去響應這個中斷,也就是 Semaphore 源碼中的判斷了線程中斷的標記:

    public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        if (Thread.interrupted() ||
            (tryAcquireShared(arg) < 0 &&
             acquire(null, arg, true, true, false, 0L) < 0))
            throw new InterruptedException();
    }

但恰好這里業務中斷后自己并沒有去判斷這個標記,導致 Pulsar 內部去判斷了,最終拋出了這個異常。

到此,相信大家對“java中Pulsar InterruptedException異常怎么解決”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

习水县| 呼和浩特市| 保亭| 石首市| 伊金霍洛旗| 景泰县| 墨玉县| 金沙县| 衢州市| 广汉市| 仁寿县| 延吉市| 嘉禾县| 文安县| 龙门县| 石柱| 五大连池市| 宣威市| 武定县| 青海省| 邮箱| 三河市| 南木林县| 洛隆县| 宜丰县| 青浦区| 平泉县| 武功县| 张家界市| 卢湾区| 治县。| 肇庆市| 扶沟县| 永济市| 苍梧县| 酉阳| 洪洞县| 灵宝市| 饶平县| 温宿县| 寿宁县|