中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ?broker文件清理源碼分析

發布時間:2023-04-03 17:08:18 來源:億速云 閱讀:277 作者:iii 欄目:開發技術

本篇內容介紹了“RocketMQ broker文件清理源碼分析”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

    1. broker 清理文件介紹

    1.1 哪些文件需要清理

    首先我們需要介紹下在RocketMQ中哪些文件需要清理,其實可以想一想,在RocketMQ中哪些文件是一直在往里面寫入東西的,最容易想到的就是commitlog 了,因為在一個broker 進程中,所有的普通消息,事務消息,系統消息啥的都往這個commitlog中寫,隨著時間的越來越長,然后commitlog就會越積攢越多,肯定會有磁盤放不下的那一天,而且我們消息消費完成后,那些被消費完成后的消息其實作用就很小了,可能會有這么一個場景,比如說我線上出現了某個問題,我想看下關于這個問題的消息有沒有被消費到,可能你會用到這個消息,但是這種問題一般就是比較緊急的,最近實效的,之前那些消息其實作用就基本沒有了,所以就需要清理掉之前的消息。其實不光commitlog需要清理,還需要清理一下ConsumeQueueindexFile , 因為你commitlog里面的消息都被清理了,ConsumeQueueindexFile 再保存著之前的一些數據,就是純粹浪費空間了。

    所以說 broker 文件清理主要是清理commitlog , ConsumeQueue , indexFile

    1.2 RocketMQ文件清理的機制

    我們介紹下RocketMQ文件清理的機制,RocketMQ默認是清理72小時之前的消息,然后它有幾個觸發條件, 默認是凌晨4點觸發清理, 除非你你這個磁盤空間占用到75% 以上了。在清理commitlog 的時候,并不是一條消息一條消息的清理,拿到所有的MappedFile(拋去現在還在用著的,也就是最后一個) ,然后比對每個MappedFile的最后一條消息的時間,如果是72小時之前的就把MappedFile對應的文件刪除了,銷毀對應MappedFile,這種情況的話只要你MappedFile 最后一條消息還在存活實效內的話,它就不會清理你這個MappedFile,就算你這個MappedFile 靠前的消息過期了。但是有一種情況它不管你消息超沒超過72小時,直接就是刪,那就是磁盤空間不足的時候,也就是占了85%以上了,就會立即清理。

    清理完成commitlog 之后,就會拿到commitlog中最小的offset ,然后去ConsumeQueueindexFile中把小于offset 的記錄刪除掉。清理ConsumeQueue 的時候也是遍歷MappedFile ,然后它的最后一條消息(unit)小于commitlog中最小的offset 的話,就說明這個MappedFile都小于offset ,因為他們是順序追加寫的,這個MappedFile 就會清理掉,如果你MappedFile 最后一個unit不是小于offset 的話,這個MappedFile 就不刪了。

    2. 源碼解析

    我們來看下源碼是怎樣實現的: 在broker 存儲器DefaultMessageStore 啟動(start)的時候,會添加幾個任務調度,其中有一個就是文件清理的:

    private void addScheduleTask() {
        // todo 清理過期文件 每隔10s
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // todo
                DefaultMessageStore.this.cleanFilesPeriodically();
            }
        }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
        ...
    }

    默認是10s執行一次,可以看到它調用了DefaultMessageStorecleanFilesPeriodically方法:

    private void cleanFilesPeriodically() {
        // todo 清除CommitLog文件
        this.cleanCommitLogService.run();
        // todo 清除ConsumeQueue文件
        this.cleanConsumeQueueService.run();
    }

    2.1 清理commitlog

    我們先來看下關于commitlog的清理工作:

    public void run() {
        try {
            // todo 刪除過期文件
            this.deleteExpiredFiles();
            this.redeleteHangedFile();
        } catch (Throwable e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    我們看下deleteExpiredFiles 方法的實現:

    private void deleteExpiredFiles() {
        int deleteCount = 0;
        // 文件保留時間,如果超過了該時間,則認為是過期文件,可以被刪除
        long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
        // 刪除物理文件的間隔時間,在一次清除過程中,可能需要被刪除的文件不止一個,該值指定兩次刪除文件的間隔時間
        int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
        // 在清除過期文件時,如
        //果該文件被其他線程占用(引用次數大于0,比如讀取消息),此時會
        //阻止此次刪除任務,同時在第一次試圖刪除該文件時記錄當前時間
        //戳,destroyMapedFileIntervalForcibly表示第一次拒絕刪除之后能
        //保留文件的最大時間,在此時間內,同樣可以被拒絕刪除,超過該時
        //間后,會將引用次數設置為負數,文件將被強制刪除
        int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
        // 指定刪除文件的時間點,RocketMQ通過deleteWhen設置每天在
        //固定時間執行一次刪除過期文件操作,默認凌晨4點
        boolean timeup = this.isTimeToDelete();
        // todo 檢查磁盤空間是否充足,如果磁盤空間不充足,則返回true,表示應該觸發過期文件刪除操作
        boolean spacefull = this.isSpaceToDelete();
        // 預留手工觸發機制,可以通過調用excuteDeleteFilesManualy
        //方法手工觸發刪除過期文件的操作,目前RocketMQ暫未封裝手工觸發
        //文件刪除的命令
        boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
        if (timeup || spacefull || manualDelete) {
            if (manualDelete)
                this.manualDeleteFileSeveralTimes--;
            boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
            log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
                fileReservedTime,
                timeup,
                spacefull,
                manualDeleteFileSeveralTimes,
                cleanAtOnce);
            fileReservedTime *= 60 * 60 * 1000;
            // todo 文件的銷毀和刪除
            deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
                destroyMapedFileIntervalForcibly, cleanAtOnce);
            if (deleteCount > 0) {
            } else if (spacefull) {
                log.warn("disk space will be full soon, but delete file failed.");
            }
        }
    }

    開始幾個參數,一個是文件保留實效默認是72小時,你可以使用fileReservedTime來配置,一個是刪除文件的間隔100ms,再就是強行銷毀MappedFile120s(這個為啥要強行銷毀,因為它還害怕還有地方用著這個MappedFile,它有個專門的引用計數器,比如說我還有地方要讀它的消息,這個時候計數器就是+1的)。

    接著就是判斷到沒到刪除的那個時間,它默認是凌晨4點才能刪除

    private boolean isTimeToDelete() {
        // 什么時候刪除,默認是凌晨4點 -> 04
        String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
        // 判斷是不是到點了 就是判斷的當前小時 是不是等于 默認的刪除時間
        if (UtilAll.isItTimeToDo(when)) {
            DefaultMessageStore.log.info("it's time to reclaim disk space, " + when);
            return true;
        }
        return false;
    }

    再接著就是看看空間是不是充足,看看磁盤空間使用占比是什么樣子的:

    private boolean isSpaceToDelete() {
        // 表示CommitLog文件、ConsumeQueue文件所在磁盤分區的最大使用量,如果超過該值,則需要立即清除過期文件
        double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
        // 表示是否需要立即執行清除過期文件的操作
        cleanImmediately = false;
        {
            // 當前CommitLog目錄所在的磁盤分區的磁盤使用率,通過File#getTotalSpace方法獲取文件所在磁盤分區的總容量,
            //通過File#getFreeSpace方法獲取文件所在磁盤分區的剩余容量
            double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic());
            // diskSpaceWarningLevelRatio:默認0.90。如果磁盤分區使用率超過該閾值,將設置磁盤為不可寫,此時會拒絕寫入新消息
            // 如果當前磁盤分區使用率大于diskSpaceWarningLevelRatio,應該立即啟動過期文件刪除操作
            if (physicRatio > diskSpaceWarningLevelRatio) {
                // 設置 磁盤不可寫
                boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
                if (diskok) {
                    DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
                }
                cleanImmediately = true;
            //diskSpaceCleanForciblyRatio:默認0.85 如果磁盤分區使用超過該閾值,建議立即執行過期文件刪除,但不會拒絕寫入新消息
            // 如果當前磁盤分區使用率大于diskSpaceCleanForciblyRatio,建議立即執行過期文件清除
            } else if (physicRatio > diskSpaceCleanForciblyRatio) {
                cleanImmediately = true;
            } else {
                // 設置 磁盤可以寫入
                boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
                if (!diskok) {
                    DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
                }
            }
            // 如果當前磁盤使用率小于diskMaxUsedSpaceRatio,則返回false,表示磁盤使用率正常,
            // 否則返回true,需要執行刪除過期文件
            if (physicRatio < 0 || physicRatio > ratio) {
                DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
                return true;
            }
        }
        /**
         * 對consumeQueue 做同樣的判斷
         */
        ...
        return false;
    }

    這里其實不光是判斷 commitlog的存儲區域,后面還有段判斷ConsumeQueue的存儲區域的,然后與這塊邏輯一樣,就沒有放上。這里就是獲取默認的最大使用占比 就是75% ,接著就是看看commitlog 存儲的那地方使用了多少了,如果是使用90% 了,就設置runningFlag 說磁盤滿了,立即清理設置成true,這個參數設置成true之后,就不會管你消息有沒有超過72小時,如果你使用了85% 以上了,也是設置立即清理,如果超過75% 返回true。好了,磁盤占用空間這塊我們就看完了。

    接著看上面deleteExpiredFiles方法實現,還有一個手動清除的,這塊我沒有找到哪里有用到的,如果后續找到,會補充上, 判斷 到了清理的點 或者是磁盤空間滿了 或者是手動刪除了,滿足一個條件就ok了,如果是立即清除是個true,它這里這個cleanAtOnce 變量就是true了,因為前面那個強制清理是默認開啟的。

    接著計算了一下fileReservedTime 就是將小時轉成了毫秒,為了后面好比對,最后就是調用commitlog的deleteExpiredFile 方法清理了:

    /**
     * 刪除過期的文件
     * @param expiredTime 過期時間 默認72小時
     * @param deleteFilesInterval 刪除文件的間隔 100ms
     * @param intervalForcibly  強制刪除 1000 * 120
     * @param cleanImmediately 是不是要一次性清理了
     * @return
     */
    public int deleteExpiredFile(
        final long expiredTime,
        final int deleteFilesInterval,
        final long intervalForcibly,
        final boolean cleanImmediately
    ) {
        // todo
        return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
    }

    可以看到commitlog 對象調用mappedFileQueuedeleteExpiredFileByTime 方法來處理的,這個mappedFileQueue 就是管理了一堆MappedFile:

    /**
     * 刪除文件
     *
     * 從倒數第二個文件開始遍歷,計算文件的最大存活時間,即文件的最后一次更新時間+文件存活時間(默認
     * 72小時),如果當前時間大于文件的最大存活時間或需要強制刪除文
     * 件(當磁盤使用超過設定的閾值)時,執行MappedFile#destory方
     * 法,清除MappedFile占有的相關資源,如果執行成功,將該文件加入
     * 待刪除文件列表中,最后統一執行File#delete方法將文件從物理磁盤
     * 中刪除。
     */
    public int deleteExpiredFileByTime(final long expiredTime,
        final int deleteFilesInterval,
        final long intervalForcibly,
        final boolean cleanImmediately) {
        // 拿到mappedFile的引用
        Object[] mfs = this.copyMappedFiles(0);
        if (null == mfs)
            return 0;
        int mfsLength = mfs.length - 1;
        int deleteCount = 0;
        List<MappedFile> files = new ArrayList<MappedFile>();
        if (null != mfs) {
            for (int i = 0; i < mfsLength; i++) {
                MappedFile mappedFile = (MappedFile) mfs[i];
                // 計算文件的最大存活時間,即文件的最后一次更新時間+文件存活時間(默認72小時)
                long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
                // 如果當前時間大于文件的最大存活時間 或 需要強制刪除文件(當磁盤使用超過設定的閾值)時
                if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                    // todo 執行destroy方法
                    if (mappedFile.destroy(intervalForcibly)) {
                        files.add(mappedFile);
                        deleteCount++;
                        // 一批 最多刪除10 個
                        if (files.size() >= DELETE_FILES_BATCH_MAX) {
                            break;
                        }
                        // 刪除間隔
                        if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                            try {
                                Thread.sleep(deleteFilesInterval);
                            } catch (InterruptedException e) {
                            }
                        }
                    } else {
                        break;
                    }
                } else {
                    //avoid deleting files in the middle
                    break;
                }
            }
        }
        // todo 統一執行File#delete方法將文件從物理磁盤中刪除
        deleteExpiredFile(files);
        return deleteCount;
    }

    這里首先是拿到所有MappedFile的引用,然后就是遍歷了,可以看到它這個length是-1的,也就是最后一個MappedFile 是遍歷不到的,這個是肯定的,因為最后一個MappedFile肯定是在用著的,如果你來個強制清理,一下清理了,就沒法提供服務了。

    遍歷的時候,拿到對應MappedFile 里面最后一條消息,看看它的寫入時間是不是已經過了這個過期時間了,或者直接強制刪除,就會執行MappedFile的銷毀方法,而且帶著銷毀時間:

    /**
     * 銷毀方法
     * @param intervalForcibly 表示拒絕被銷毀的最大存活時間
     * @return
     */
    public boolean destroy(final long intervalForcibly) {
        // todo
        this.shutdown(intervalForcibly);
        // 清理結束
        if (this.isCleanupOver()) {
            try {
                // 關閉文件通道,
                this.fileChannel.close();
                log.info("close file channel " + this.fileName + " OK");
                long beginTime = System.currentTimeMillis();
                // 刪除物理文件
                boolean result = this.file.delete();
                log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
                    + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
                    + this.getFlushedPosition() + ", "
                    + UtilAll.computeElapsedTimeMilliseconds(beginTime));
            } catch (Exception e) {
                log.warn("close file channel " + this.fileName + " Failed. ", e);
            }
            return true;
        } else {
            log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
                + " Failed. cleanupOver: " + this.cleanupOver);
        }
        return false;
    }
    public void shutdown(final long intervalForcibly) {
        // 關閉MappedFile
        if (this.available) {
            this.available = false;
            // 初次關閉的時間戳
            this.firstShutdownTimestamp = System.currentTimeMillis();
            // todo 嘗試釋放資源
            this.release();
        } else if (this.getRefCount() > 0) {
            if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
                this.refCount.set(-1000 - this.getRefCount());
                this.release();
            }
        }
    }

    這里就不詳細說了,其實就是shutdown,然后過了120s后強制把引用清了,之后就是關閉channel,刪除對應文件。

    接著往下說,就是銷毀成功了,會記錄刪除數量,判斷刪了多少了,一批是最多刪10個的,這塊應該是怕影響性能的,你一直刪的的話,這東西很消耗磁盤性能,容易影響其他寫入,讀取功能,如果你銷毀失敗,直接就停了。最后就是將刪除的這些MappedFileMappedFileQueue中刪除掉。再回到commitlog clean servicerun方法:

    public void run() {
        try {
            // todo 刪除過期文件
            this.deleteExpiredFiles();
            // todo 
            this.redeleteHangedFile();
        } catch (Throwable e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    我們deleteExpiredFiles 方法已經介紹完了,然后再來看看第二個方法是干嘛的,這個其實就是判斷第一個MappedFile 還可不可用了,如果不可用的話,就刪了,這塊有可能是上面 deleteExpiredFiles 方法MappedFile銷毀失敗,然后設置了不可用,但是沒有清理掉,所以這塊再來善后下:

    private void redeleteHangedFile() {
        // redeleteHangedFileInterval間隔 默認1000*120
        int interval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval();
        // 當前時間戳
        long currentTimestamp = System.currentTimeMillis();
        if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) {
            this.lastRedeleteTimestamp = currentTimestamp;
            // 獲取強制銷毀Mapped文件間隔
            int destroyMapedFileIntervalForcibly =
                DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
            // todo 重新刪除第一個MappedFile 
            if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) {
            }
        }
    }
    public boolean retryDeleteFirstFile(final long intervalForcibly) {
        // 獲取到 第一個mappedFile
        MappedFile mappedFile = this.getFirstMappedFile();
        if (mappedFile != null) {
            // 不可用
            if (!mappedFile.isAvailable()) {
                log.warn("the mappedFile was destroyed once, but still alive, " + mappedFile.getFileName());
                // 銷毀
                boolean result = mappedFile.destroy(intervalForcibly);
                if (result) {
                    log.info("the mappedFile re delete OK, " + mappedFile.getFileName());
                    List<MappedFile> tmpFiles = new ArrayList<MappedFile>();
                    tmpFiles.add(mappedFile);
                    this.deleteExpiredFile(tmpFiles);
                } else {
                    log.warn("the mappedFile re delete failed, " + mappedFile.getFileName());
                }
                return result;
            }
        }
        return false;
    }

    這塊就是看第一個MappedFile 還可不可用,不可用的話,就銷毀掉。好了commitlog 文件清理源碼就解析完成了。接下來看下這個ConsumeQueue與indexFile的清理。

    2.2 ConsumeQueue 清理

    private void cleanFilesPeriodically() {
        // todo 清除CommitLog文件
        this.cleanCommitLogService.run();
        // todo 清除ConsumeQueue文件
        this.cleanConsumeQueueService.run();
    }

    DefaultMessageStore.CleanConsumeQueueService#run:

    public void run() {
        try {
            // 刪除 過期的file
            this.deleteExpiredFiles();
        } catch (Throwable e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    接下來DefaultMessageStore.CleanConsumeQueueService#deleteExpiredFiles:

    private void deleteExpiredFiles() {
        // 刪除間隔 100
        int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
        // 獲取 commitLog 的最小offset
        long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
        if (minOffset > this.lastPhysicalMinOffset) {
            // 上次 清理 到哪了
            this.lastPhysicalMinOffset = minOffset;
            ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
            // 遍歷刪除
            for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
                for (ConsumeQueue logic : maps.values()) {
                    // 進行刪除
                    int deleteCount = logic.deleteExpiredFile(minOffset);
                    // 間隔
                    if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
                        try {
                            Thread.sleep(deleteLogicsFilesInterval);
                        } catch (InterruptedException ignored) {
                        }
                    }
                }
            }
            // todo 刪除 過期的 indexFile
            DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
        }
    }

    首先是獲取刪除間隔,然后拿到commitlog中最小的那個offset ,接著就是判斷上次清理位置與最小offset 比較,如果offset 大于它上次清理的位置的話,就說明 它得把最小offset之前的清理掉。先是記錄最后一次清理的offset是最小offset , 接著就是遍歷所有的ConsumeQueue ,調用每個ConsumeQueuedeleteExpiredFile 方法來清理,我們來看下這個方法:

    public int deleteExpiredFile(long offset) {
        // 進行銷毀 然后得到銷毀個數
        int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE);
        // 糾正最小偏移量
        this.correctMinOffset(offset);
        return cnt;
    }

    CQ_STORE_UNIT_SIZE 這個就是每個unit 占20個字節,見。

    /**
     * 刪除過期的file
     * @param offset 最小offset
     * @param unitSize 大小為20字節
     * @return
     */
    public int deleteExpiredFileByOffset(long offset, int unitSize) {
        Object[] mfs = this.copyMappedFiles(0);
        List<MappedFile> files = new ArrayList<MappedFile>();
        int deleteCount = 0;
        if (null != mfs) {
            int mfsLength = mfs.length - 1;
            for (int i = 0; i < mfsLength; i++) {
                boolean destroy;
                MappedFile mappedFile = (MappedFile) mfs[i];
                // 最后一個單元位置到這個MappedFile結束,其實就是獲取最后一個單元
                SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);
                if (result != null) {
                    // 獲取最大的offset
                    long maxOffsetInLogicQueue = result.getByteBuffer().getLong();
                    result.release();
                    // 判斷是否銷毀 如果小于offset 就要銷毀
                    destroy = maxOffsetInLogicQueue < offset;
                    if (destroy) {
                        log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
                            + maxOffsetInLogicQueue + ", delete it");
                    }
                } else if (!mappedFile.isAvailable()) { // Handle hanged file.
                    log.warn("Found a hanged consume queue file, attempting to delete it.");
                    destroy = true;
                } else {
                    log.warn("this being not executed forever.");
                    break;
                }
                // 進行銷毀
                if (destroy && mappedFile.destroy(1000 * 60)) {
                    files.add(mappedFile);
                    deleteCount++;
                } else {
                    break;
                }
            }
        }
        // 刪除引用
        deleteExpiredFile(files);
        return deleteCount;
    }

    它的刪除跟commitlog 的差不多,只不過commitlog 是根據時間來判斷的,它是根據commitlog 的offset 來判斷的,判斷要不要刪除這個MappedFile,如果這個MappedFile最后一個unit 存儲的offset 小于 commitlog 最小的offset 的話就要銷毀了。接著就是銷毀,超時時間是1分鐘,最后是刪除引用。

    2.3 indexFile 清理

    最后我們來看下 indexFile的清理工作: DefaultMessageStore.CleanConsumeQueueService#deleteExpiredFiles:

    private void deleteExpiredFiles() {
        // 刪除間隔 100
        int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
        // 獲取 commitLog 的最小offset
        long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
        if (minOffset > this.lastPhysicalMinOffset) {
            ...
            // todo 刪除 過期的 indexFile
            DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
        }
    }
    /**
     * 刪除 過期文件
     * @param offset 最小的offset 小于這個offset都要刪除
     */
    public void deleteExpiredFile(long offset) {
        Object[] files = null;
        try {
            // 獲取讀鎖
            this.readWriteLock.readLock().lock();
            if (this.indexFileList.isEmpty()) {
                return;
            }
            // 獲取第一個indexFile 的一個offset
            long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset();
            if (endPhyOffset < offset) {
                files = this.indexFileList.toArray();
            }
        } catch (Exception e) {
            log.error("destroy exception", e);
        } finally {
            this.readWriteLock.readLock().unlock();
        }
        if (files != null) {
            // 找到需要刪除的indexFile
            List<IndexFile> fileList = new ArrayList<IndexFile>();
            for (int i = 0; i < (files.length - 1); i++) {
                IndexFile f = (IndexFile) files[i];
                if (f.getEndPhyOffset() < offset) {
                    fileList.add(f);
                } else {
                    break;
                }
            }
            // 刪除
            this.deleteExpiredFile(fileList);
        }
    }

    可以看到,先是拿第一個indexFile 看看有沒有小于commitlog 最小offset 的情況發生,這里也是拿的indexFile最后一個offset 做的對比,因為這塊也是按照offset大小 前后順序處理的,最后一個的offest 肯定是這個indexFile中最大的了,如果第一個indexFile滿足了的話,就會拿到所有引用,然后遍歷找出符合條件的indexFile, 調用deleteExpiredFile方法遍歷銷毀:

    private void deleteExpiredFile(List<IndexFile> files) {
        if (!files.isEmpty()) {
            try {
                this.readWriteLock.writeLock().lock();
                for (IndexFile file : files) {
                    // 銷毀
                    boolean destroyed = file.destroy(3000);
                    // 從index 集合中移除
                    destroyed = destroyed && this.indexFileList.remove(file);
                    if (!destroyed) {
                        log.error("deleteExpiredFile remove failed.");
                        break;
                    }
                }
            } catch (Exception e) {
                log.error("deleteExpiredFile has exception.", e);
            } finally {
                this.readWriteLock.writeLock().unlock();
            }
        }
    }

    這里就是遍歷銷毀,然后移除對這個indexFile管理。

    “RocketMQ broker文件清理源碼分析”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    偏关县| 奉化市| 蒙山县| 广饶县| 罗源县| 锡林郭勒盟| 平陆县| 盐源县| 廉江市| 九台市| 无锡市| 阿巴嘎旗| 崇义县| 岑巩县| 青岛市| 乌审旗| 石棉县| 施秉县| 保定市| 凤冈县| 渝中区| 和平区| 九龙县| 岗巴县| 绥化市| 绥芬河市| 达拉特旗| 宜春市| 新邵县| 上杭县| 长阳| 尼勒克县| 红桥区| 阳信县| 长武县| 三河市| 浏阳市| 辽阳县| 东丽区| 禹城市| 女性|