您好,登錄后才能下訂單哦!
使用Redisson怎么實現一個分布式鎖,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
Redisson鎖繼承Implements Reentrant Lock,所以具備 Reentrant Lock 鎖中的一些特性:超時,重試,可中斷等。加上Redisson中Redis具備分布式的特性,所以非常適合用來做Java中的分布式鎖。 下面我們對其加鎖、解鎖過程中的源碼細節進行一一分析。
鎖的接口定義了一下方法:
分布式鎖當中加鎖,我們常用的加鎖接口:
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
下面我們來看一下方法的具體實現:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); final long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= (System.currentTimeMillis() - current); if (time <= 0) { acquireFailed(threadId); return false; } current = System.currentTimeMillis(); final RFuture subscribeFuture = subscribe(threadId); if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (subscribeFuture.isSuccess()) { unsubscribe(subscribeFuture, threadId); } } }); } acquireFailed(threadId); return false; } try { time -= (System.currentTimeMillis() - current); if (time <= 0) { acquireFailed(threadId); return false; } while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= (System.currentTimeMillis() - currentTime); if (time = 0 && ttl < time) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= (System.currentTimeMillis() - currentTime); if (time <= 0) { acquireFailed(threadId); return false; } } } finally { unsubscribe(subscribeFuture, threadId); } // return get(tryLockAsync(waitTime, leaseTime, unit)); }
首先我們看到調用tryAcquire嘗試獲取鎖,在這里是否能獲取到鎖,是根據鎖名稱的過期時間TTL來判定的(TTL
下面我們接著看一下tryAcquire的實現:
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); }
可以看到真正獲取鎖的操作經過一層get操作里面執行的,這里為何要這么操作,本人也不是太理解,如有理解錯誤,歡迎指正。
get 是由CommandAsyncExecutor(一個線程Executor)封裝的一個Executor
設置一個單線程的同步控制器CountDownLatch,用于控制單個線程的中斷信息。個人理解經過中間的這么一步:主要是為了支持線程可中斷操作。
public V get(RFuture future) { if (!future.isDone()) { final CountDownLatch l = new CountDownLatch(1); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { l.countDown(); } }); boolean interrupted = false; while (!future.isDone()) { try { l.await(); } catch (InterruptedException e) { interrupted = true; } } if (interrupted) { Thread.currentThread().interrupt(); } } // commented out due to blocking issues up to 200 ms per minute for each thread:由于每個線程的阻塞問題,每分鐘高達200毫秒 // future.awaitUninterruptibly(); if (future.isSuccess()) { return future.getNow(); } throw convertException(future); }
我們進一步往下看:
private RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { return; } Long ttlRemaining = future.getNow(); // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }
首先判斷鎖是否有超時時間,有過期時間的話,會在后面獲取鎖的時候設置進去。沒有過期時間的話,則會用默認的
private long lockWatchdogTimeout = 30 * 1000;
下面我們在進一步往下分析真正獲取鎖的操作:
RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
我把里面的重點信息做了以下三點總結:
1:真正執行的是一段具有原子性的Lua腳本,并且最終也是由CommandAsynExecutor去執行。
2:鎖真正持久化到Redis時,用的hash類型key field value
3:獲取鎖的三個參數:getName()是邏輯鎖名稱,例如:分布式鎖要鎖住的methodName+params;internalLockLeaseTime是毫秒單位的鎖過期時間;getLockName則是鎖對應的線程級別的名稱,因為支持相同線程可重入,不同線程不可重入,所以這里的鎖的生成方式是:UUID+":"threadId。有的同學可能會問,這樣不是很縝密:不同的JVM可能會生成相同的threadId,所以Redission這里加了一個區分度很高的UUID;
Lua腳本中的執行分為以下三步:
1:exists檢查redis中是否存在鎖名稱;如果不存在,則獲取成功;同時把邏輯鎖名稱KEYS[1],線程級別的鎖名稱[ARGV[2],value=1,設置到redis。并設置邏輯鎖名稱的過期時間ARGV[2],返回;
2:如果檢查到存在KEYS[1],[ARGV[2],則說明獲取成功,此時會自增對應的value值,記錄重入次數;并更新鎖的過期時間
3:key不存,直接返回key的剩余過期時間(-2)
關于使用Redisson怎么實現一個分布式鎖問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。