中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Redisson如何實現分布式鎖、鎖續約

發布時間:2023-03-07 11:52:05 來源:億速云 閱讀:302 作者:iii 欄目:開發技術

這篇文章主要介紹了Redisson如何實現分布式鎖、鎖續約的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇Redisson如何實現分布式鎖、鎖續約文章都會有所收獲,下面我們一起來看看吧。

一、基礎

0)Redisson版本說明、案例

使用當前(2022年12月初)最新的版本:3.18.1;

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.18.1</version>
</dependency>

案例

案例采用redis-cluster集群的方式;

public class Main {
    public static void main(String[] args) throws Exception {
        // 1.配置Redis-Cluster集群節點的ip和port 
        Config config = new Config();
        config.useClusterServers()
                .addNodeAddress("redis://127.0.0.1:7001")
                .addNodeAddress("redis://127.0.0.1:7002")
                .addNodeAddress("redis://127.0.0.1:7003")
                .addNodeAddress("redis://127.0.0.1:7004");
        // 2.創建Redisson的客戶端 
        RedissonClient redisson = Redisson.create(config);
        // 3.測試Redisson可重?鎖的加鎖、釋放鎖
        testLock(redisson);
    }

    private static void testLock(RedissonClient redisson) throws InterruptedException {
        // 1.獲取key為"anyLock"的鎖對象
        final RLock lock = redisson.getLock("test_lock");
        boolean locked = true;
        try {
            //2.1:加鎖 
            lock.lock();
            // 2.2:加鎖,并設置嘗試獲取鎖超時時間30s、鎖超時?動釋放的時間10s 
//            locked = lock.tryLock(30, 10, TimeUnit.SECONDS);
            if (locked)
                System.out.println("加鎖成功!" + new Date());
            
            Thread.sleep(20 * 1000);
            System.out.println("鎖邏輯執行完畢!" + new Date());

        } finally {
            // 3.釋放鎖 
            lock.unlock();
        }
    }
}

1)Redisson連接Redis的方式

redission支持4種連接redis方式,分別為單機、主從、Sentinel、Cluster 集群;在分布式鎖的實現上區別在于hash槽的獲取方式。

具體配置方式見Redisson的GitHub

Redisson如何實現分布式鎖、鎖續約

2)用到的Redis命令

分布式鎖主要需要以下redis命令:

EXISTS key:當 key 存在,返回1;不存在,返回0。

GETSET key value:將給定 key 的值設為 value ,并返回 key 的舊值 (old value);當 key 存在但不是字符串類型時,返回一個錯誤;當key不存在時,返回nil。

GET key:返回 key 所關聯的字符串值,如果 key 不存在那么返回 nil。

DEL key [KEY &hellip;]:刪除給定的一個或多個 key(不存在的 key 會被忽略),返回實際刪除的key的個數(integer)。

DEL key1 key2 key3

HSET key field value:給一個key 設置一個{field=value}的組合值,如果key沒有就直接賦值并返回1;如果field已有,那么就更新value的值,并返回0。

HEXISTS key field:當key中存儲著field的時候返回1,如果key或者field有一個不存在返回0。

HINCRBY key field increment:將存儲在key中的哈希(Hash)對象中的指定字段field的值加上增量increment;

如果鍵key不存在,一個保存了哈希對象{field=value}的key將被創建;如果字段field不存在,在進行當前操作前,feild將被創建,且對應的值被置為0;返回值是increment。

PEXPIRE key milliseconds:設置存活時間,單位是毫秒。EXPIRE操作單位是秒。

PUBLISH channel message:向channel post一個message內容的消息,返回接收消息的客戶端數。

3)用到的lua腳本語義

Redisson源碼中,執行redis命令的是lua腳本,其中主要有如下幾個概念:

  • redis.call():執行redis命令。

  • KEYS[n]:指腳本中第n個參數,比如KEYS[1]指腳本中的第一個參數。

  • ARGV[n]:指腳本中第n個參數的值,比如ARGV[1]指腳本中的第一個參數的值。

  • 返回值中nil與false同一個意思。

在redis執行lua腳本時,相當于一個redis級別的鎖,不能執行其他操作,類似于原子操作,這也是redisson實現的一個關鍵點。

另外,如果lua腳本執行過程中出現了異常或者redis服務器宕機了,會將腳本中已經執行的命令在AOF、RDB日志中刪除;即LUA腳本執行報錯會進行回滾操作。

二、源碼分析

1、RLock

Redisson如何實現分布式鎖、鎖續約

RLock接口主要繼承了Lock接口,并擴展了部分方法,比如:tryLock(long waitTime, long leaseTime, TimeUnit unit)方法中加入的leaseTime參數,用來設置鎖的過期時間,如果超過leaseTime還沒有解鎖的話,redis就強制解鎖;leaseTime的默認時間是30s。

獲取RLock對象

RLock lock = redissonClient.getLock("test_lock");

RLock對象表示?個鎖對象,我們要某一個key加鎖時,需要先獲取?個鎖對象。

Redisson如何實現分布式鎖、鎖續約

這里并沒有具體請求Redis進行加鎖的邏輯,而只是調用RedissonLock的構造函數,設置一些變量。

2、加鎖流程

進入到Rlock#lock()方法,先看主流程;關于競爭鎖等待時間、鎖超時釋放時間的配置、使用,在流程中穿插著聊。

0)加鎖流程圖

Redisson如何實現分布式鎖、鎖續約

1)加鎖到哪臺機器

lock()方法執行鏈路:

Redisson如何實現分布式鎖、鎖續約

走到這里,已經可以看到加鎖的底層邏輯:LUA腳本。

而lua腳本只是??串字符串,作為evalWriteAsync()?法的?個參數?已;所以下?步進到evalWriteAsync()?法中:

Redisson如何實現分布式鎖、鎖續約

走到這里會調用ConnectionManager#getEntry(String)方法;

在創建RedissonClient時,筆者配置的是Redis-Cluster,而走到這里卻會進入到MasterSlaveConnectionManager,實際上實例化的ConnectionManager就是RedisCluster模式下的ClusterConnectionManager,而ClusterConnectionManager繼承自MasterSlaveConnectionManager,并且ClusterConnectionManager沒有重寫getEntry(String)方法,所以會進入到MasterSlaveConnectionManager#getEntry(String)方法。

ConnectionManager#getEntry(String)方法會根據傳入的key名稱找到相應的Redis節點、目標master。

Redis-Cluster集群中的數據分布式是 通過?個?個的hash slot來實現的,Redis-Cluster集群總共16384個hash slot,它們都 會被均勻分布到所有的master節點上;這里ClusterConnectionManager通過key名稱計算出相應的hash slot方式如下:

?先通過key計算出CRC16值,然后 CRC16值對16384進?取模,進?得到hash slot。

@Override
public int calcSlot(String key) {
    if (key == null) {
        return 0;
    }

    int start = key.indexOf('{');
    if (start != -1) {
        int end = key.indexOf('}');
        if (end != -1 && start + 1 < end) {
            key = key.substring(start + 1, end);
        }
    }

    int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
    log.debug("slot {} for {}", result, key);
    return result;
}

這?計算出key的hash slot之后,就可以通過hash slot 去看?看哪個master上有這個hash slot,如果某個master上有個這個hash slot,那么這個 key當然就會落到該master節點上,執?加鎖指令也就應該在該master上執?。

下面進入本文重點,可重入鎖的各種加鎖、釋放鎖。

2)Client第一次加鎖

在尋找應該在哪臺Redis機器上加鎖時,在RedissonLock#tryLockInnerAsync()方法中我們看到了一堆LUA腳本:

Redisson如何實現分布式鎖、鎖續約

LUA腳本參數解析:

  • KEYS[1] 表示的是 getName() ,即鎖key的名稱,比如案例中的 test_lock;

  • ARGV[1] 表示的是 internalLockLeaseTime 默認值是30s;

  • ARGV[2] 表示的是 getLockName(threadId) ,唯一標識當前訪問線程,使用鎖對象id+線程id(UUID:ThreadId)方式表示,用于區分不同服務器上的線程。

    • UUID用來唯?標識?個客戶端,因為會有多個客戶端的多個線程加鎖;

    • 結合起來的UUID:ThreadId 表示:具體哪個客戶端上的哪個線程過來加鎖,通 過這樣的組合?式唯?標識?個線程。

LUA腳本邏輯:

  • 如果鎖名稱不存在;

    • 則向redis中添加一個key為test_lock的HASH結構、添加一個field為線程id,值=1的鍵值對{field:increment},表示此線程的重入次數為1;

    • 設置test_lock的過期時間,防止當前服務器出問題后導致死鎖,然后return nil; end;返回nil,lua腳本執行完畢;

  • 如果鎖存在,檢測當前線程是否持有鎖;

    • 如果是當前線程持有鎖,hincrby將該線程重入的次數++;并重新設置鎖的過期時間;返回nil,lua腳本執行完畢;

    • 如果不是當前線程持有鎖,pttl返回鎖的過期時間,單位ms。

第一次加鎖時,key肯定不存在與master節點上;

會執行下列LUA腳本對應的Redis指令:

hset test_lock UUID:ThreadId 1 
pexpire test_lock 30000

此時,Redis中多一個Hash結構的key(test_lock):

test_lock : 
{
    UUID:ThreadId:1
}

這里的1使用來做鎖重入的。

pexpire指令為test_lock這個key設置過期時間為30s,即:30s后這個key會?動過期被刪除,key對應的鎖在那時也就被釋放了。

總體來看,加鎖的邏輯很簡單:

在key對應的hash數據結構中記錄了? 下當前是哪個客戶端的哪個線程過來加鎖了,然后設置了?下key的過期時間為30s。 3)加鎖成功之后的鎖續約

成功加鎖后,lua腳本返回nil,即null。

Redisson如何實現分布式鎖、鎖續約

加鎖成功之后,tryLockInnerAsync()?法返回;再結合Java8的Stream,對加鎖結果進一步處理;

因為加鎖成功后返回的是nil,這是lua腳本的返回形式,體現到java代碼中的返回值為:null。
又由于RLock#lock()方法傳入的leaseTime是-1,所以進入到scheduleExpirationRenewal(long)方法做鎖續約。

Redisson如何實現分布式鎖、鎖續約

renewExpirationAsync()方法負責做具體的鎖續約:

protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

這里LUA腳本的邏輯很簡單:

  • 判斷當前key中,是否還被線程UUID:ThreadId持有鎖,持有則設置過期時間為30s(續命)。

鎖續約(看門狗機制)其實就是每次加鎖成功后,會?上開啟?個后臺線程, 每隔10s檢查?下key是否存在,如果存在就為key續期30s。

  • 這里的10s,取自配置的lockWatchdogTimeout參數,默認為30 * 1000 ms;

  • 所以?個key往往當過期時間慢慢消逝到20s左右時就?會被定時任務重置為了30s,這樣就能保證:只要這個定時任務還在、這個key還在,就?直維持加鎖。

如果當前持有鎖的線程被中斷了,會停止鎖續約,即殺死看門狗;

Redisson如何實現分布式鎖、鎖續約

protected void cancelExpirationRenewal(Long threadId) {
    ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (task == null) {
        return;
    }
    
    if (threadId != null) {
        task.removeThreadId(threadId);
    }

    if (threadId == null || task.hasNoThreads()) {
        Timeout timeout = task.getTimeout();
        if (timeout != null) {
            timeout.cancel();
        }
        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
    }
}

所謂的停止鎖續約,實際就是將當前線程的threadId從看門狗緩存中移除,后續在執行鎖續約時,如果發現看門狗緩存中已經沒有了當前線程threadId,則直接退出鎖續約 并且 不再延時10s開啟一個定時任務。

如果加鎖時指定了leaseTime > 0,則不會開門狗機制,表示強制鎖leaseTime 毫秒后過期。一共有三種加鎖方式可以做到,如下:

  • RLock#lock(long leaseTime, TimeUnit unit)

  • RLock#tryLock(long waitTime, long leaseTime, TimeUnit unit)

  • RLock#lockInterruptibly(long leaseTime, TimeUnit unit)

4)重入加鎖(相同線程多次加鎖)

再次回到加鎖的LUA腳本:

Redisson如何實現分布式鎖、鎖續約

同一個線程對分布式鎖多次加鎖時,會走以下邏輯:

  • 判斷當前key是否被當前線程持有,如果是則增加鎖重入的次數,并重新設置鎖的過期時間為30s;

對應的Redis命令為:

hexists test_lock UUID:ThreadId
hincrby test_lock UUID:ThreadId 1
pexpire test_lock 30000

此時Redis中test_key對應的數據結構從

test_lock : 
{
    UUID:ThreadId:1
}

變成:

test_lock : 
{
    UUID:ThreadId:2
}

并將key的過期時間重新設置為30s。

鎖重入成功之后,后臺也會開啟?個watchdog后臺線程做鎖續約,每隔10s檢查?下key,如果key存在就將key的過期時間重新設置為30s。

Redisson可重?加鎖的語義,實際是通過Hash結構的key中某個線程(UUID:ThreadId)對應的加鎖次數來表示的。

5)鎖競爭(其他線程加鎖失敗)

再再次回到加鎖的LUA腳本:

Redisson如何實現分布式鎖、鎖續約

如果分布式鎖已經被其他線程持有,LUA腳本會執行以下邏輯:

返回當前key的剩余存活時間,因為不是返回nil,也就代表著加鎖失敗;

對應的Redis的命令為:

pttl test_lock

針對加鎖方式的不同,加鎖失敗的邏輯也不同;可以分兩大類:指定了加鎖失敗的等待時間waitTime和未指定waitTime。

  • 未執行加鎖失敗的等待時間waitTime:獲取分布式鎖失敗會一直重試,直到獲取鎖成功。比如下列加鎖方法:

    • Rlock#lock():一直嘗試獲取分布式鎖,直到獲取鎖成功。

    • RLock#lockInterruptibly(long leaseTime, TimeUnit unit)

    • RLock#lock(long leaseTime, TimeUnit unit)

  • 指定了加鎖失敗的等待時間waitTime:獲取分布式鎖會超時,超時之后返回加鎖失敗;

    • Rlock#tryLock(long waitTime, TimeUnit unit):指定獲取鎖失敗的等待時間。在等待時間范圍之內進行重試,超時則返回加鎖失敗。

    • Rlock#tryLock(long waitTime, long leaseTime, TimeUnit unit):同樣是指定獲取鎖失敗的等待時間,并且強制指定鎖過期的時間(不開啟看門狗)。在等待時間范圍之內進行重試,超時則返回加鎖失敗。

可以簡單的概述為RLock接口下的tryLock()方法獲取鎖會失敗,lock()方法獲取鎖一定會成功。

1> 一直重試直到加鎖成功

Rlock#lock()方法為例:

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }

    CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
    pubSub.timeout(future);
    RedissonLockEntry entry;
    if (interruptibly) {
        entry = commandExecutor.getInterrupted(future);
    } else {
        entry = commandExecutor.get(future);
    }

    try {
        while (true) {
            // lock() 或 lockInterruptibly()為入口走到這里時。leaseTime為-1,表示會開始開門狗;如果leaseTime大于0,則不會開啟開門狗;
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message
            if (ttl >= 0) {
                try {
                    // 因為Semaphore的可用資源為0,所以這里就等價于Thread.sleep(ttl);
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    entry.getLatch().acquire();
                } else {
                    entry.getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        unsubscribe(entry, threadId);
    }
}

首先訂閱解鎖channel(命名格式:redisson_lock__channel:{keyName}),其他線程解鎖后,會發布解鎖的消息;這里收到消息會立即嘗試獲取鎖;訂閱解鎖channel的超時時間默認為7.5s。也就說獲取鎖失敗7.5s之內,如果其他線程釋放鎖,當前線程可以立即嘗試獲取到鎖。

獲取鎖失敗之后會進??個while死循環中:

每休息鎖的存活時間ttl之后,就嘗試去獲取鎖,直到成功獲取到鎖才會跳出while死循環。

2> 等待鎖超時返回加鎖失敗

Rlock#tryLock(long waitTime, TimeUnit unit)為例:

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return true;
    }

    // 獲取鎖剩余的等待時長
    time -= System.currentTimeMillis() - current;
    if (time <= 0) {
        // 獲取鎖超時,返回獲取分布式鎖失敗
        acquireFailed(waitTime, unit, threadId);
        return false;
    }
    
    current = System.currentTimeMillis();
    CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    try {
        // 訂閱解鎖channel的超時時長為 獲取鎖剩余的等待時長
        subscribeFuture.get(time, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        if (!subscribeFuture.completeExceptionally(new RedisTimeoutException(
                "Unable to acquire subscription lock after " + time + "ms. " +
                        "Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
            subscribeFuture.whenComplete((res, ex) -> {
                if (ex == null) {
                    unsubscribe(res, threadId);
                }
            });
        }
        acquireFailed(waitTime, unit, threadId);
        return false;
    } catch (ExecutionException e) {
        acquireFailed(waitTime, unit, threadId);
        return false;
    }

    try {
        // 收到解鎖channel的消息之后,走到這里,再次判斷獲取鎖等待時長是否超時
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
    
        // while循環中嘗試去獲取鎖
        while (true) {
            long currentTime = System.currentTimeMillis();
            ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return true;
            }

            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }

            // waiting for message
            currentTime = System.currentTimeMillis();
            if (ttl >= 0 && ttl < time) {
                // 如果獲取鎖失敗后,鎖存活時長 小于 剩余鎖等待時長,則線程睡眠 鎖存活時長
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                // 如果獲取鎖失敗后,鎖存活時間 大于等于 剩余鎖等待時長,則線程睡眠 鎖等待時長
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }

            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        }
    } finally {
        unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
    }
}

加鎖存在超時時間 相比于 一直重試直到加鎖成功,只是多一個時間限制,具體差異體現在:訂閱解鎖channel的超時時長、獲取鎖失敗后線程的睡眠時長、重試獲取鎖次數的限制;

獲取分布式鎖失敗之后,立即判斷當前獲取鎖是否超時,如果超時,則返回加鎖失敗;
否者,訂閱解鎖channel(命名格式:redisson_lock__channel:{keyName}),其他線程解鎖后,會發布解鎖的消息;
訂閱解鎖channel的超時時間為 獲取鎖剩余的等待時長。 在這個時間范圍之內,如果其他線程釋放鎖,當前線程收到解鎖channel的消息之后再次判斷獲取鎖是否超時,如果不超時,嘗試獲取鎖。
獲取鎖之后會進??個while死循環中: 如果獲取鎖超時,則返回加鎖失敗;
否者讓線程睡眠: 如果鎖存活時長ttl 小于 剩余鎖等待時長,則線程睡眠 鎖存活時長;
如果鎖存活時間ttl 大于等于 剩余鎖等待時長,則線程睡眠 鎖等待時長;
線程睡眠完之后,判斷獲取鎖是否超時,不超時則嘗試去獲取鎖。

3、釋放鎖流程

1)Client主動嘗試釋放鎖

進入到Rlock#unlock()方法;

Redisson如何實現分布式鎖、鎖續約

和加鎖的方式?樣,釋放鎖也是通過lua腳本來完成的;

LUA腳本參數解析:

  • KEYS[1] 表示的是 getName() ,代表的是鎖名 test_lock;

  • KEYS[2] 表示getChanelName() 表示的是發布訂閱過程中使用的Chanel;

  • ARGV[1] 表示的是LockPubSub.unLockMessage,解鎖消息,實際代表的是數字 0,代表解鎖消息;

  • ARGV[2] 表示的是internalLockLeaseTime 默認的有效時間 30s;

  • ARGV[3] 表示的是 getLockName(thread.currentThread().getId()) 代表的是 UUID:ThreadId 用鎖對象id+線程id, 表示當前訪問線程,用于區分不同服務器上的線程。

LUA腳本邏輯:

  • 如果鎖名稱不存在;

  • 可能是因為鎖過期導致鎖不存在,也可能是并發解鎖。

  • 則發布鎖解除的消息,返回1,lua腳本執行完畢;

  • 如果鎖存在,檢測當前線程是否持有鎖;

  • 如果是當前線程持有鎖,定義變量counter,接收執行incrby將該線程重入的次數&ndash;的結果;

  • 如果重入次數大于0,表示該線程還有其他任務需要執行;重新設置鎖的過期時間;返回0,lua腳本執行完畢;

  • 否則表示該線程執行結束,del刪除該鎖;并且publish發布該鎖解除的消息;返回1,lua腳本執行完畢;

  • 如果不是當前線程持有鎖 或 其他情況,都返回nil,lua腳本執行完畢。

腳本執行結束之后,如果返回值不是0或1,即當前線程去釋放其他線程的加鎖時,拋出異常。

通過LUA腳本釋放鎖成功之后,會將看門狗殺死;

Redisson如何實現分布式鎖、鎖續約

2)Client主動強制釋放鎖

forceUnlockAsync()方法被調用的地方很多,大多都是在清理資源時刪除鎖。

@Override
public RFuture<Boolean> forceUnlockAsync() {
    cancelExpirationRenewal(null);
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('del', KEYS[1]) == 1) then "
                    + "redis.call('publish', KEYS[2], ARGV[1]); "
                    + "return 1 "
                    + "else "
                    + "return 0 "
                    + "end",
            Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE);
}

LUA腳本邏輯:

邏輯比較簡單粗暴:刪除鎖成功則并發布鎖被刪除的消息,返回1結束,否則返回0結束。

3)Client宕機,鎖超時釋放

如果Redisson客戶端剛加鎖成功,并且未指定releaseTime,后臺會啟動一個定時任務watchdog每隔10s檢查key:key如果存在就為它?動續命到30s;在watchdog定時任務存在的情況下,如果不是主動釋放鎖,那么key將會?直的被watchdog這個定時任務維持加鎖。

但是如果客戶端宕機了,定時任務watchdog也就沒了,也就沒有鎖續約機制了,那么過完30s之后,key會?動被刪除、key對應的鎖也自動被釋放了。

4)不啟動鎖續約的超時釋放鎖

如果在加鎖時指定了leaseTime,加鎖成功之后,后臺并不會啟動一個定時任務watchdog做鎖續約;key存活leaseTime 毫秒之后便會自動被刪除、key對應的鎖也就自動被釋放了;無論當前線程的業務邏輯是否執行完畢。

比如使用如下方式加鎖:

  • RLock#lock(long leaseTime, TimeUnit unit)

  • RLock#tryLock(long waitTime, long leaseTime, TimeUnit unit)

  • RLock#lockInterruptibly(long leaseTime, TimeUnit unit)

關于“Redisson如何實現分布式鎖、鎖續約”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“Redisson如何實現分布式鎖、鎖續約”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

敦煌市| 磐石市| 绥滨县| 东海县| 东至县| 溧水县| 丰台区| 旬阳县| 东城区| 永平县| 鲁甸县| 庆阳市| 义乌市| 庄浪县| 凌海市| 喜德县| 徐水县| 青神县| 元朗区| 广灵县| 富蕴县| 七台河市| 水城县| 新源县| 石门县| 太白县| 团风县| 璧山县| 镇康县| 平顶山市| 博爱县| 昌乐县| 彰武县| 西贡区| 温宿县| 渭源县| 仙桃市| 秦皇岛市| 青铜峡市| 潮安县| 巍山|