中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么在SpringBoot中使用Redis實現分布式鎖

發布時間:2023-03-29 11:20:22 來源:億速云 閱讀:135 作者:iii 欄目:開發技術

這篇文章主要介紹了怎么在SpringBoot中使用Redis實現分布式鎖的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇怎么在SpringBoot中使用Redis實現分布式鎖文章都會有所收獲,下面我們一起來看看吧。

一、Redis實現分布式鎖原理

為什么需要分布式鎖

在聊分布式鎖之前,有必要先解釋一下,為什么需要分布式鎖

與分布式鎖相對就的是單機鎖,我們在寫多線程程序時,避免同時操作一個共享變量產生數據問題,通常會使用一把鎖來互斥以保證共享變量的正確性,其使用范圍是在同一個進程中。如果換做是多個進程,需要同時操作一個共享資源,如何互斥呢?現在的業務應用通常是微服務架構,這也意味著一個應用會部署多個進程,多個進程如果需要修改MySQL中的同一行記錄,為了避免操作亂序導致臟數據,此時就需要引入分布式鎖了。

怎么在SpringBoot中使用Redis實現分布式鎖

想要實現分布式鎖,必須借助一個外部系統,所有進程都去這個系統上申請加鎖。而這個外部系統,必須要實現互斥能力,即兩個請求同時進來,只會給一個進程加鎖成功,另一個失敗。這個外部系統可以是數據庫,也可以是Redis或Zookeeper,但為了追求性能,我們通常會選擇使用Redis或Zookeeper來做。

Redis本身可以被多個客戶端共享訪問,正好就是一個共享存儲系統,可以用來保存分布式鎖。而且 Redis 的讀寫性能高,可以應對高并發的鎖操作場景。本文主要探討如何基于Redis實現分布式鎖以及實現過程中可能面臨的問題。

分布式鎖如何實現

作為分布式鎖實現過程中的共享存儲系統,Redis可以使用鍵值對來保存鎖變量,在接收和處理不同客戶端發送的加鎖和釋放鎖的操作請求。那么,鍵值對的鍵和值具體是怎么定的呢?我們要賦予鎖變量一個變量名,把這個變量名作為鍵值對的鍵,而鎖變量的值,則是鍵值對的值,這樣一來,Redis就能保存鎖變量了,客戶端也就可以通過Redis的命令操作來實現鎖操作。

想要實現分布式鎖,必須要求Redis有互斥的能力。可以使用SETNX命令,其含義是SET IF NOT EXIST,即如果key不存在,才會設置它的值,否則什么也不做。兩個客戶端進程可以執行這個命令,達到互斥,就可以實現一個分布式鎖。

以下展示了Redis使用key/value對保存鎖變量,以及兩個客戶端同時請求加鎖的操作過程。

怎么在SpringBoot中使用Redis實現分布式鎖

加鎖操作完成后,加鎖成功的客戶端,就可以去操作共享資源,例如,修改MySQL的某一行數據。操作完成后,還要及時釋放鎖,給后來者讓出操作共享資源的機會。如何釋放鎖呢?直接使用DEL命令刪除這個key即可。這個邏輯非常簡單,整體的流程寫成偽代碼就是下面這樣。

// 加鎖
SETNX lock_key 1
// 業務邏輯
DO THINGS
// 釋放鎖
DEL lock_key

但是,以上實現存在一個很大的問題,當客戶端1拿到鎖后,如果發生下面的場景,就會造成死鎖。

程序處理業務邏輯異常,沒及時釋放鎖進程掛了,沒機會釋放鎖

以上情況會導致已經獲得鎖的客戶端一直占用鎖,其他客戶端永遠無法獲取到鎖。

如何避免死鎖

為了解決以上死鎖問題,最容易想到的方案是在申請鎖時,在Redis中實現時,給鎖設置一個過期時間,假設操作共享資源的時間不會超過10s,那么加鎖時,給這個key設置10s過期即可。

但以上操作還是有問題,加鎖、設置過期時間是2條命令,有可能只執行了第一條,第二條卻執行失敗,例如:

1.SETNX執行成功,執行EXPIRE時由于網絡問題,執行失敗
2.SETNX執行成功,Redis異常宕機,EXPIRE沒有機會執行
3.SETNX執行成功,客戶端異常崩潰,EXPIRE沒有機會執行

總之這兩條命令如果不能保證是原子操作,就有潛在的風險導致過期時間設置失敗,依舊有可能發生死鎖問題。幸好在Redis 2.6.12之后,Redis擴展了SET命令的參數,可以在SET的同時指定EXPIRE時間,這條操作是原子的,例如以下命令是設置鎖的過期時間為10秒。

SET lock_key 1 EX 10 NX

至此,解決了死鎖問題,但還是有其他問題。想像下面這個這樣一種場景:

怎么在SpringBoot中使用Redis實現分布式鎖

  1. 客戶端1加鎖成功,開始操作共享資源

  2. 客戶端1操作共享資源耗時太久,超過了鎖的過期時間,鎖失效(鎖被自動釋放)

  3. 客戶端2加鎖成功,開始操作共享資源

  4. 客戶端1操作共享資源完成,在finally塊中手動釋放鎖,但此時它釋放的是客戶端2的鎖。

這里存在兩個嚴重的問題:

  • 鎖過期

  • 釋放了別人的鎖

第1個問題是評估操作共享資源的時間不準確導致的,如果只是一味增大過期時間,只能緩解問題降低出現問題的概率,依舊無法徹底解決問題。原因在于客戶端在拿到鎖之后,在操作共享資源時,遇到的場景是很復雜的,既然是預估的時間,也只能是大致的計算,不可能覆蓋所有導致耗時變長的場景

第2個問題是釋放了別人的鎖,原因在于釋放鎖的操作是無腦操作,并沒有檢查這把鎖的歸屬,這樣解鎖不嚴謹。如何解決呢?

鎖被別人給釋放了

解決辦法是,客戶端在加鎖時,設置一個只有自己知道的唯一標識進去,例如可以是自己的線程ID,如果是redis實現,就是SET key unique_value EX 10 NX。之后在釋放鎖時,要先判斷這把鎖是否歸自己持有,只有是自己的才能釋放它。

//釋放鎖 比較unique_value是否相等,避免誤釋放
if redis.get("key") == unique_value then
    return redis.del("key")

這里釋放鎖使用的是GET + DEL兩條命令,這時又會遇到原子性問題了。

  1. 客戶端1執行GET,判斷鎖是自己的

  2. 客戶端2執行了SET命令,強制獲取到鎖(雖然發生概念很低,但要嚴謹考慮鎖的安全性)

  3. 客戶端1執行DEL,卻釋放了客戶端2的鎖

由此可見,以上GET + DEL兩個命令還是必須原子的執行才行。怎樣原子執行兩條命令呢?答案是Lua腳本,可以把以上邏輯寫成Lua腳本,讓Redis執行。因為Redis處理每個請求是單線程執行的,在執行一個Lua腳本時其它請求必須等待,直到這個Lua腳本處理完成,這樣一來GET+DEL之間就不會有其他命令執行了。

以下是使用Lua腳本(unlock.script)實現的釋放鎖操作的偽代碼,其中,KEYS[1]表示lock_key,ARGV[1]是當前客戶端的唯一標識,這兩個值都是我們在執行 Lua腳本時作為參數傳入的。

//Lua腳本語言,釋放鎖 比較unique_value是否相等,避免誤釋放
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

最后我們執行以下命令,即可

redis-cli  --eval  unlock.script lock_key , unique_value

這樣一路優先下來,整個加鎖、解鎖流程就更嚴謹了,先小結一下,基于Redis實現的分布式鎖,一個嚴謹的流程如下:

  1. 加鎖時要設置過期時間SET lock_key unique_value EX expire_time NX

  2. 操作共享資源

  3. 釋放鎖:Lua腳本,先GET判斷鎖是否歸屬自己,再DEL釋放鎖

有了這個嚴謹的鎖模型,我們還需要重新思考之前的那個問題,鎖的過期時間不好評估怎么辦。

如何確定鎖的過期時間

前面提到過,過期時間如果評估得不好,這個鎖就會有提前過期的風險,一種妥協的解決方案是,盡量冗余過期時間,降低鎖提前過期的概率,但這個方案并不能完美解決問題。是否可以設置這樣的方案,加鎖時,先設置一個預估的過期時間,然后開啟一個守護線程,定時去檢測這個鎖的失效時間,如果鎖快要過期了,操作共享資源還未完成,那么就自動對鎖進行續期,重新設置過期時間

這是一種比較好的方案,已經有一個庫把這些工作都封裝好了,它就是Redisson。Redisson是一個Java語言實現的Redis SDK客戶端,在使用分布式鎖時,它就采用了自動續期的方案來避免鎖過期,這個守護線程我們一般叫它看門狗線程。這個SDK提供的API非常友好,它可以像操作本地鎖一樣操作分布式鎖。客戶端一旦加鎖成功,就會啟動一個watch dog看門狗線程,它是一個后臺線程,會每隔一段時間(這段時間的長度與設置的鎖的過期時間有關)檢查一下,如果檢查時客戶端還持有鎖key(也就是說還在操作共享資源),那么就會延長鎖key的生存時間。

怎么在SpringBoot中使用Redis實現分布式鎖

那如果客戶端在加鎖成功后就宕機了呢?宕機了那么看門狗任務就不存在了,也就無法為鎖續期了,鎖到期自動失效。

Redis的部署方式對鎖的影響

上面討論的情況,都是鎖在單個Redis 實例中可能產生的問題,并沒有涉及到Redis的部署架構細節。

Redis發展到現在,幾種常見的部署架構有:

  • 單機模式;

  • 主從模式;

  • 哨兵(sentinel)模式;

  • 集群模式;

我們使用Redis時,一般會采用主從集群+哨兵的模式部署,哨兵的作用就是監測redis節點的運行狀態。普通的主從模式,當master崩潰時,需要手動切換讓slave成為master,使用主從+哨兵結合的好處在于,當master異常宕機時,哨兵可以實現故障自動切換,把slave提升為新的master,繼續提供服務,以此保證可用性。那么當主從發生切換時,分布式鎖依舊安全嗎?

怎么在SpringBoot中使用Redis實現分布式鎖

想像這樣的場景:

  1. 客戶端1在master上執行SET命令,加鎖成功

  2. 此時,master異常宕機,SET命令還未同步到slave上(主從復制是異步的)

  3. 哨兵將slave提升為新的master,但這個鎖在新的master上丟失了,導致客戶端2來加鎖成功了,兩個客戶端共同操作共享資源

可見,當引入Redis副本后,分布式鎖還是可能受到影響。即使Redis通過sentinel保證高可用,如果這個master節點由于某些原因發生了主從切換,那么就會出現鎖丟失的情況。

集群模式+Redlock實現高可靠的分布式鎖

為了避免Redis實例故障而導致的鎖無法工作的問題,Redis的開發者 Antirez提出了分布式鎖算法Redlock。Redlock算法的基本思路,是讓客戶端和多個獨立的Redis實例依次請求加鎖,如果客戶端能夠和半數以上的實例成功地完成加鎖操作,那么我們就認為,客戶端成功地獲得分布式鎖了,否則加鎖失敗。這樣一來,即使有單個Redis實例發生故障,因為鎖變量在其它實例上也有保存,所以,客戶端仍然可以正常地進行鎖操作,鎖變量并不會丟失。

來具體看下Redlock算法的執行步驟。Redlock算法的實現要求Redis采用集群部署模式,無哨兵節點,需要有N個獨立的Redis實例(官方推薦至少5個實例)。接下來,我們可以分成3步來完成加鎖操作。

怎么在SpringBoot中使用Redis實現分布式鎖

第一步是,客戶端獲取當前時間。

第二步是,客戶端按順序依次向N個Redis實例執行加鎖操作。

這里的加鎖操作和在單實例上執行的加鎖操作一樣,使用SET命令,帶上NX、EX/PX選項,以及帶上客戶端的唯一標識。當然,如果某個Redis實例發生故障了,為了保證在這種情況下,Redlock算法能夠繼續運行,我們需要給加鎖操作設置一個超時時間。如果客戶端在和一個Redis實例請求加鎖時,一直到超時都沒有成功,那么此時,客戶端會和下一個Redis實例繼續請求加鎖。加鎖操作的超時時間需要遠遠地小于鎖的有效時間,一般也就是設置為幾十毫秒。

第三步是,一旦客戶端完成了和所有Redis實例的加鎖操作,客戶端就要計算整個加鎖過程的總耗時。

客戶端只有在滿足兩個條件時,才能認為是加鎖成功,條件一是客戶端從超過半數(大于等于 N/2+1)的Redis實例上成功獲取到了鎖;條件二是客戶端獲取鎖的總耗時沒有超過鎖的有效時間。

為什么大多數實例加鎖成功才能算成功呢?多個Redis實例一起來用,其實就組成了一個分布式系統。在分布式系統中總會出現異常節點,所以在談論分布式系統時,需要考慮異常節點達到多少個,也依舊不影響整個系統的正確運行。這是一個分布式系統的容錯問題,這個問題的結論是:如果只存在故障節點,只要大多數節點正常,那么整個系統依舊可以提供正確服務。

在滿足了這兩個條件后,我們需要重新計算這把鎖的有效時間,計算的結果是鎖的最初有效時間減去客戶端為獲取鎖的總耗時。如果鎖的有效時間已經來不及完成共享數據的操作了,我們可以釋放鎖,以免出現還沒完成共享資源操作,鎖就過期了的情況

當然,如果客戶端在和所有實例執行完加鎖操作后,沒能同時滿足這兩個條件,那么,客戶端就要向所有Redis節點發起釋放鎖的操作。為什么釋放鎖,要操作所有的節點呢,不能只操作那些加鎖成功的節點嗎?因為在某一個Redis節點加鎖時,可能因為網絡原因導致加鎖失敗,例如一個客戶端在一個Redis實例上加鎖成功,但在讀取響應結果時由于網絡問題導致讀取失敗,那這把鎖其實已經在Redis上加鎖成功了。所以釋放鎖時,不管之前有沒有加鎖成功,需要釋放所有節點上的鎖以保證清理節點上的殘留的鎖

在Redlock算法中,釋放鎖的操作和在單實例上釋放鎖的操作一樣,只要執行釋放鎖的 Lua腳本就可以了。這樣一來,只要N個Redis實例中的半數以上實例能正常工作,就能保證分布式鎖的正常工作了。所以,在實際的業務應用中,如果你想要提升分布式鎖的可靠性,就可以通過Redlock算法來實現。

二、代碼實現Redis分布式鎖

1.SpringBoot整合redis用到最多的當然屬于我們的老朋友RedisTemplate,pom依賴如下:

<!-- springboot整合redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2.Redis配置類:

package com.example.redisdemo.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * @description: Redis配置類
 * @author Keson
 * @date 21:20 2022/11/14
 * @Param
 * @return
 * @version 1.0
 */
@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        // 設置序列化
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // 配置redisTemplate
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
        redisTemplate.setConnectionFactory(lettuceConnectionFactory);
        RedisSerializer<?> stringSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringSerializer);// key序列化
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);// value序列化
        redisTemplate.setHashKeySerializer(stringSerializer);// Hash key序列化
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);// Hash value序列化
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}

3.Service層面

package com.example.redisdemo.service;

import com.example.redisdemo.entity.CustomerBalance;
import java.util.concurrent.Callable;

/**
 * @author Keson
 * @version 1.0
 * @description: TODO
 * @date 2022/11/14 15:12
 */
public interface RedisService {

    <T> T callWithLock(CustomerBalance customerBalance, Callable<T> callable) throws Exception;
}
package com.example.redisdemo.service.impl;

import com.example.redisdemo.entity.CustomerBalance;
import com.example.redisdemo.service.RedisService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/**
 * @author Keson
 * @version 1.0
 * @description: TODO Redis實現分布式鎖
 * @date 2022/11/14 15:13
 */
@Service
@Slf4j
public class RedisServiceImpl implements RedisService {

    //設置默認過期時間
    private final static int DEFAULT_LOCK_EXPIRY_TIME = 20;
    //自定義lock key前綴
    private final static String LOCK_PREFIX = "LOCK:CUSTOMER_BALANCE";

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public <T> T callWithLock(CustomerBalance customerBalance, Callable<T> callable) throws Exception{
        //自定義lock key
        String lockKey = getLockKey(customerBalance.getCustomerNumber(), customerBalance.getSubAccountNumber(), customerBalance.getCurrencyCode());
        //將UUID當做value,確保唯一性
        String lockReference = UUID.randomUUID().toString();

        try {
            if (!lock(lockKey, lockReference, DEFAULT_LOCK_EXPIRY_TIME, TimeUnit.SECONDS)) {
                throw new Exception("lock加鎖失敗");
            }
            return callable.call();
        } finally {
            unlock(lockKey, lockReference);
        }
    }

    //定義lock key
    String getLockKey(String customerNumber, String subAccountNumber, String currencyCode) {
        return String.format("%s:%s:%s:%s", LOCK_PREFIX, customerNumber, subAccountNumber, currencyCode);
    }

    //redis加鎖
    private boolean lock(String key, String value, long timeout, TimeUnit timeUnit) {
        Boolean locked;
        try {
            //SET_IF_ABSENT --> NX: Only set the key if it does not already exist.
            //SET_IF_PRESENT --> XX: Only set the key if it already exist.
            locked = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection ->
                    connection.set(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8),
                            Expiration.from(timeout, timeUnit), RedisStringCommands.SetOption.SET_IF_ABSENT));
        } catch (Exception e) {
            log.error("Lock failed for redis key: {}, value: {}", key, value);
            locked = false;
        }
        return locked != null && locked;
    }

    //redis解鎖
    private boolean unlock(String key, String value) {
        try {
            //使用lua腳本保證刪除的原子性,確保解鎖
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] " +
                            "then return redis.call('del', KEYS[1]) " +
                            "else return 0 end";
            Boolean unlockState = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection ->
                    connection.eval(script.getBytes(), ReturnType.BOOLEAN, 1,
                            key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8)));
            return unlockState == null || !unlockState;
        } catch (Exception e) {
            log.error("unLock failed for redis key: {}, value: {}", key, value);
            return false;
        }
    }
}

4.業務調用實現分布式鎖示例:

    @Override
    public int updateById(CustomerBalance customerBalance) throws Exception {
        return redisService.callWithLock(customerBalance, ()-> customerBalanceMapper.updateById(customerBalance));
    }

關于“怎么在SpringBoot中使用Redis實現分布式鎖”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“怎么在SpringBoot中使用Redis實現分布式鎖”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

新邵县| 峨眉山市| 获嘉县| 桦川县| 呼玛县| 新干县| 舞钢市| 平原县| 宣化县| 牟定县| 隆回县| 西丰县| 百色市| 定州市| 焦作市| 富蕴县| 长岛县| 陇西县| 繁峙县| 石门县| 合江县| 彭阳县| 正定县| 岢岚县| 皮山县| 平果县| 揭西县| 上蔡县| 郸城县| 四平市| 平凉市| 安化县| 穆棱市| 巴楚县| 高要市| 永嘉县| 霍州市| 鹿邑县| 和田县| 庆元县| 石嘴山市|