您好,登錄后才能下訂單哦!
本文小編為大家詳細介紹“Redisson分布式信號量RSemaphore如何使用”,內容詳細,步驟清晰,細節處理妥當,希望這篇“Redisson分布式信號量RSemaphore如何使用”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。
@Test public void testRSemaphore() { Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379"); RedissonClient redissonClient = Redisson.create(config); RSemaphore rSemaphore = redissonClient.getSemaphore("semaphore"); // 設置5個許可,模擬五個停車位 rSemaphore.trySetPermits(5); // 創建10個線程,模擬10輛車過來停車 for (int i = 1; i <= 10; i++) { new Thread(() -> { try { rSemaphore.acquire(); System.out.println(Thread.currentThread().getName() + "進入停車場..."); TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100)); System.out.println(Thread.currentThread().getName() + "離開停車場..."); rSemaphore.release(); } catch (InterruptedException e) { throw new RuntimeException(e); } }, "A" + i).start(); } try { TimeUnit.MINUTES.sleep(1); } catch (InterruptedException e) { throw new RuntimeException(e); } }
初始化RSemaphore,需要調用trySetPermits()設置許可數量:
/** * 嘗試設置許可數量,設置成功,返回true,否則返回false */ boolean trySetPermits(int permits);
trySetPermits()內部調用了trySetPermitsAsync():
// 異步設置許可 @Override public RFuture<Boolean> trySetPermitsAsync(int permits) { RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 判斷分布式信號量的key是否存在,如果不存在,才設置 "local value = redis.call('get', KEYS[1]); " + "if (value == false) then " // set "semaphore" permits // 使用String數據結構設置信號量的許可數 + "redis.call('set', KEYS[1], ARGV[1]); " // 發布一條消息到redisson_sc:{semaphore}通道 + "redis.call('publish', KEYS[2], ARGV[1]); " // 設置成功,返回1 + "return 1;" + "end;" // 否則返回0 + "return 0;", Arrays.asList(getRawName(), getChannelName()), permits); if (log.isDebugEnabled()) { future.thenAccept(r -> { if (r) { log.debug("permits set, permits: {}, name: {}", permits, getName()); } else { log.debug("unable to set permits, permits: {}, name: {}", permits, getName()); } }); } return future; }
可以看到,設置許可數量底層使用LUA腳本,實際上就是使用redis的String數據結構,保存了我們指定的許可數量。如下圖:
參數說明:
KEYS[1]: 我們指定的分布式信號量key,例如redissonClient.getSemaphore("semaphore")中的"semaphore")
KEYS[2]: 釋放鎖的channel名稱,redisson_sc:{分布式信號量key},在本例中,就是redisson_sc:{semaphore}
ARGV[1]: 設置的許可數量
總結設置許可執行流程為:
get semaphore,獲取到semaphore信號量的當前的值
第一次數據為0, 然后使用set semaphore 3,將這個信號量同時能夠允許獲取鎖的客戶端的數量設置為3。(注意到,如果之前設置過了信號量,將無法再次設置,直接返回0。想要更改信號量總數可以使用addPermits方法)
然后redis發布一些消息,返回1
許可數量設置好之后,我們就可以調用acquire()方法獲取了,如果未傳入許可數量,默認獲取一個許可。
public void acquire() throws InterruptedException { acquire(1); } public void acquire(int permits) throws InterruptedException { // 嘗試獲取鎖成功,直接返回 if (tryAcquire(permits)) { return; } // 對于沒有獲取鎖的那些線程,訂閱redisson_sc:{分布式信號量key}通道的消息 CompletableFuture<RedissonLockEntry> future = subscribe(); semaphorePubSub.timeout(future); RedissonLockEntry entry = commandExecutor.getInterrupted(future); try { // 不斷循環嘗試獲取許可 while (true) { if (tryAcquire(permits)) { return; } entry.getLatch().acquire(); } } finally { // 取消訂閱 unsubscribe(entry); } // get(acquireAsync(permits)); }
可以看到,獲取許可的核心邏輯在tryAcquire()方法中,如果tryAcquire()返回true說明獲取許可成功,直接返回;如果返回false,說明當前沒有許可可以使用,則對于沒有獲取鎖的那些線程,訂閱redisson_sc:{分布式信號量key}通道的消息,并通過死循環不斷嘗試獲取鎖。
我們看一下tryAcquire()方法的邏輯,內部調用了tryAcquireAsync()方法:
// 異步獲取許可 @Override public RFuture<Boolean> tryAcquireAsync(int permits) { if (permits < 0) { throw new IllegalArgumentException("Permits amount can't be negative"); } if (permits == 0) { return new CompletableFutureWrapper<>(true); } return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 獲取當前剩余的許可數量 "local value = redis.call('get', KEYS[1]); " + // 許可不為空,并且許可數量 大于等于 當前線程申請的許可數量 "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " + // 通過decrby減少剩余可用許可 "local val = redis.call('decrby', KEYS[1], ARGV[1]); " + // 返回1 "return 1; " + "end; " + // 其它情況,返回0 "return 0;", Collections.<Object>singletonList(getRawName()), permits); }
從源碼可以看到,獲取許可就是操作redis中的數據,首先獲取到redis中剩余的許可數量,只有當剩余的許可數量大于線程申請的許可數量時,才獲取成功,返回1;否則獲取失敗,返回0;
總結加鎖執行流程為:
get semaphore,獲取到一個當前的值,比如說是3,3 > 1
decrby semaphore 1,將信號量允許獲取鎖的客戶端的數量遞減1,變成2
decrby semaphore 1
decrby semaphore 1
執行3次加鎖后,semaphore值為0
此時如果再來進行加鎖則直接返回0,然后進入死循環去獲取鎖
通過前面對RSemaphore獲取鎖的分析,我們很容易能猜到,釋放鎖,無非就是歸還許可數量到redis中。我們查看具體的源碼:
public RFuture<Void> releaseAsync(int permits) { if (permits < 0) { throw new IllegalArgumentException("Permits amount can't be negative"); } if (permits == 0) { return new CompletableFutureWrapper<>((Void) null); } RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID, // 通過incrby增加許可數量 "local value = redis.call('incrby', KEYS[1], ARGV[1]); " + // 發布一條消息到redisson_sc:{semaphore}中 "redis.call('publish', KEYS[2], value); ", Arrays.asList(getRawName(), getChannelName()), permits); if (log.isDebugEnabled()) { future.thenAccept(o -> { log.debug("released, permits: {}, name: {}", permits, getName()); }); } return future; }
讀到這里,這篇“Redisson分布式信號量RSemaphore如何使用”文章已經介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領會,如果想了解更多相關內容的文章,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。