中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

使用Redisson怎么實現一個分布式鎖

發布時間:2021-06-17 11:18:41 來源:億速云 閱讀:154 作者:Leah 欄目:編程語言

使用Redisson怎么實現一個分布式鎖,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

Redisson鎖繼承Implements Reentrant Lock,所以具備 Reentrant Lock 鎖中的一些特性:超時,重試,可中斷等。加上Redisson中Redis具備分布式的特性,所以非常適合用來做Java中的分布式鎖。 下面我們對其加鎖、解鎖過程中的源碼細節進行一一分析。

鎖的接口定義了一下方法:

使用Redisson怎么實現一個分布式鎖

分布式鎖當中加鎖,我們常用的加鎖接口:

boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;

下面我們來看一下方法的具體實現:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  long time = unit.toMillis(waitTime);
  long current = System.currentTimeMillis();
  final long threadId = Thread.currentThread().getId();
  Long ttl = tryAcquire(leaseTime, unit, threadId);
  // lock acquired
  if (ttl == null) {
   return true;
  }
  
  time -= (System.currentTimeMillis() - current);
  if (time <= 0) {
   acquireFailed(threadId);
   return false;
  }
  
  current = System.currentTimeMillis();
  final RFuture subscribeFuture = subscribe(threadId);
  if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
   if (!subscribeFuture.cancel(false)) {
    subscribeFuture.addListener(new FutureListener() {
     @Override
     public void operationComplete(Future future) throws Exception {
      if (subscribeFuture.isSuccess()) {
       unsubscribe(subscribeFuture, threadId);
      }
     }
    });
   }
   acquireFailed(threadId);
   return false;
  }

  try {
   time -= (System.currentTimeMillis() - current);
   if (time <= 0) {
    acquireFailed(threadId);
    return false;
   }
  
   while (true) {
    long currentTime = System.currentTimeMillis();
    ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
     return true;
    }

    time -= (System.currentTimeMillis() - currentTime);
    if (time = 0 && ttl < time) {
     getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    } else {
     getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
    }

    time -= (System.currentTimeMillis() - currentTime);
    if (time <= 0) {
     acquireFailed(threadId);
     return false;
    }
   }
  } finally {
   unsubscribe(subscribeFuture, threadId);
  }
//  return get(tryLockAsync(waitTime, leaseTime, unit));
 }

首先我們看到調用tryAcquire嘗試獲取鎖,在這里是否能獲取到鎖,是根據鎖名稱的過期時間TTL來判定的(TTL

下面我們接著看一下tryAcquire的實現:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
 return get(tryAcquireAsync(leaseTime, unit, threadId));
}

可以看到真正獲取鎖的操作經過一層get操作里面執行的,這里為何要這么操作,本人也不是太理解,如有理解錯誤,歡迎指正。

get 是由CommandAsyncExecutor(一個線程Executor)封裝的一個Executor

設置一個單線程的同步控制器CountDownLatch,用于控制單個線程的中斷信息。個人理解經過中間的這么一步:主要是為了支持線程可中斷操作。

public V get(RFuture future) {
 if (!future.isDone()) {
  final CountDownLatch l = new CountDownLatch(1);
  future.addListener(new FutureListener() {
   @Override
   public void operationComplete(Future future) throws Exception {
    l.countDown();
   }
  });
  
  boolean interrupted = false;
  while (!future.isDone()) {
   try {
    l.await();
   } catch (InterruptedException e) {
    interrupted = true;
   }
  }
  
  if (interrupted) {
   Thread.currentThread().interrupt();
  }
 }

 // commented out due to blocking issues up to 200 ms per minute for each thread:由于每個線程的阻塞問題,每分鐘高達200毫秒
 // future.awaitUninterruptibly();
 if (future.isSuccess()) {
  return future.getNow();
 }

 throw convertException(future);
}

我們進一步往下看:

private RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
 if (leaseTime != -1) {
  return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
 }
 RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
 ttlRemainingFuture.addListener(new FutureListener() {
  @Override
  public void operationComplete(Future future) throws Exception {
   if (!future.isSuccess()) {
    return;
   }

   Long ttlRemaining = future.getNow();
   // lock acquired
   if (ttlRemaining == null) {
    scheduleExpirationRenewal(threadId);
   }
  }
 });
 return ttlRemainingFuture;
}

首先判斷鎖是否有超時時間,有過期時間的話,會在后面獲取鎖的時候設置進去。沒有過期時間的話,則會用默認的

private long lockWatchdogTimeout = 30 * 1000;

下面我們在進一步往下分析真正獲取鎖的操作:

RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
 internalLockLeaseTime = unit.toMillis(leaseTime);

 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
    "if (redis.call('exists', KEYS[1]) == 0) then " +
     "redis.call('hset', KEYS[1], ARGV[2], 1); " +
     "redis.call('pexpire', KEYS[1], ARGV[1]); " +
     "return nil; " +
    "end; " +
    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
     "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
     "redis.call('pexpire', KEYS[1], ARGV[1]); " +
     "return nil; " +
    "end; " +
    "return redis.call('pttl', KEYS[1]);",
    Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

我把里面的重點信息做了以下三點總結:

1:真正執行的是一段具有原子性的Lua腳本,并且最終也是由CommandAsynExecutor去執行。

2:鎖真正持久化到Redis時,用的hash類型key field value

3:獲取鎖的三個參數:getName()是邏輯鎖名稱,例如:分布式鎖要鎖住的methodName+params;internalLockLeaseTime是毫秒單位的鎖過期時間;getLockName則是鎖對應的線程級別的名稱,因為支持相同線程可重入,不同線程不可重入,所以這里的鎖的生成方式是:UUID+":"threadId。有的同學可能會問,這樣不是很縝密:不同的JVM可能會生成相同的threadId,所以Redission這里加了一個區分度很高的UUID;

Lua腳本中的執行分為以下三步:

1:exists檢查redis中是否存在鎖名稱;如果不存在,則獲取成功;同時把邏輯鎖名稱KEYS[1],線程級別的鎖名稱[ARGV[2],value=1,設置到redis。并設置邏輯鎖名稱的過期時間ARGV[2],返回;

2:如果檢查到存在KEYS[1],[ARGV[2],則說明獲取成功,此時會自增對應的value值,記錄重入次數;并更新鎖的過期時間

3:key不存,直接返回key的剩余過期時間(-2)

關于使用Redisson怎么實現一個分布式鎖問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

湖北省| 广宁县| 揭阳市| 二连浩特市| 黄石市| 邵阳县| 江山市| 金门县| 北辰区| 沙田区| 营山县| 安福县| 夏河县| 安乡县| 元朗区| 高密市| 盐城市| 自治县| 朝阳市| 平顶山市| 吴桥县| 泌阳县| 城口县| 闻喜县| 闵行区| 开化县| 安阳市| 罗源县| 连云港市| 龙南县| 宁海县| 桦甸市| 乌拉特前旗| 广饶县| 凤凰县| 松溪县| 密山市| 墨玉县| 遵义市| 永定县| 合水县|