您好,登錄后才能下訂單哦!
這篇文章主要介紹了Redis怎么實現分布式鎖和等待序列,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
在集群下,經常會因為同時處理發生資源爭搶和并發問題,但是我們都知道同步鎖 synchronized 、 cas 、 ReentrankLock 這些鎖的作用范圍都是 JVM ,說白了在集群下沒啥用。這時我們就需要能在多臺 JVM 之間決定執行順序的鎖了,現在分布式鎖主要有 redis 、 Zookeeper 實現的,還有數據庫的方式,不過性能太差,也就是需要一個第三方的監管。
背景
最近在做一個消費 Kafka 消息的時候發現,由于線上的消費者過多,經常會遇到,多個機器同時處理一個主鍵類型的數據的情況發生,如果最后是執行更新操作的話,也就是一個更新順序的問題,但是如果恰好都需要插入數據的時候,會出現主鍵重復的問題。這是生產上不被允許的(因為公司有異常監管的機制,扣分啥的),這是就需要個分布式鎖了,斟酌后用了 Redis 的實現方式(因為網上例子多)
分析
redis 實現的分布式鎖,實現原理是 set 方法,因為多個線程同時請求的時候,只有一個線程可以成功并返回結果,還可以設置有效期,來避免死鎖的發生,一切都是這么的完美,不過有個問題,在 set 的時候,會直接返回結果,成功或者失敗,不具有阻塞效果,需要我們自己對失敗的線程進程處理,有兩種方式
丟棄
等待重試 由于我們的系統需要這些數據,那么只能重新嘗試獲取。這里使用 redis 的 List 類型實現等待序列的作用
代碼
直接上代碼 其實直接redis的工具類就可以解決了
package com.test import redis.clients.jedis.Jedis; import java.util.Collections; import java.util.List; /** * @desc redis隊列實現方式 * @anthor * @date **/ public class RedisUcUitl { private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; private static final Long RELEASE_SUCCESS = 1L; private RedisUcUitl() { } /** * logger **/ /** * 存儲redis隊列順序存儲 在隊列首部存入 * * @param key 字節類型 * @param value 字節類型 */ public static Long lpush(Jedis jedis, final byte[] key, final byte[] value) { return jedis.lpush(key, value); } /** * 移除列表中最后一個元素 并將改元素添加入另一個列表中 ,當列表為空時 將阻塞連接 直到等待超時 * * @param srckey * @param dstkey * @param timeout 0 表示永不超時 * @return */ public static byte[] brpoplpush(Jedis jedis,final byte[] srckey, final byte[] dstkey, final int timeout) { return jedis.brpoplpush(srckey, dstkey, timeout); } /** * 返回制定的key,起始位置的redis數據 * @param redisKey * @param start * @param end -1 表示到最后 * @return */ public static List<byte[]> lrange(Jedis jedis,final byte[] redisKey, final long start, final long end) { return jedis.lrange(redisKey, start, end); } /** * 刪除key * @param redisKey */ public static void delete(Jedis jedis, final byte[] redisKey) { return jedis.del(redisKey); } /** * 嘗試加鎖 * @param lockKey key名稱 * @param requestId 身份標識 * @param expireTime 過期時間 * @return */ public static boolean tryGetDistributedLock(Jedis jedis,final String lockKey, final String requestId, final int expireTime) { String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); return LOCK_SUCCESS.equals(result); } /** * 釋放鎖 * @param lockKey key名稱 * @param requestId 身份標識 * @return */ public static boolean releaseDistributedLock(Jedis jedis,final String lockKey, final String requestId) { final String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); return RELEASE_SUCCESS.equals(result); } }
業務邏輯主要代碼如下
1.先消耗隊列中的
while(true){ // 消費隊列 try{ // 被放入redis隊列的數據 序列化后的 byte[] bytes = RedisUcUitl.brpoplpush(keyStr.getBytes(UTF_8), dstKeyStr.getBytes(UTF_8), 1); if(bytes == null || bytes.isEmpty()){ // 隊列中沒數據時退出 break; } // 反序列化對象 Map<String, Object> singleMap = (Map<String, Object>) ObjectSerialUtil.bytesToObject(bytes); // 塞入唯一的值 防止被其他線程誤解鎖 String requestId = UUID.randomUUID().toString(); boolean lockGetFlag = RedisUcUitl.tryGetDistributedLock(keyStr,requestId, 100); if(lockGetFlag){ // 成功獲取鎖 進行業務處理 //TODO // 處理完畢釋放鎖 boolean freeLock = RedisUcUitl.releaseDistributedLock(keyStr, requestId); }else{ // 未能獲得鎖放入等待隊列 RedisUcUitl.lpush(keyStr.getBytes(UTF_8), ObjectSerialUtil.objectToBytes(param)); } }catch(Exception e){ break; } }
2.處理最新接到的數據
同樣是走嘗試獲取鎖,獲取不到放入隊列的流程
一般序列化用 fastJson 之列的就可以了,這里用的是 JDK 自帶的,工具類如下
public class ObjectSerialUtil { private ObjectSerialUtil() { // 工具類 } /** * 將Object對象序列化為byte[] * * @param obj 對象 * @return byte數組 * @throws Exception */ public static byte[] objectToBytes(Object obj) throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(obj); byte[] bytes = bos.toByteArray(); bos.close(); oos.close(); return bytes; } /** * 將bytes數組還原為對象 * * @param bytes * @return * @throws Exception */ public static Object bytesToObject(byte[] bytes) { try { ByteArrayInputStream bin = new ByteArrayInputStream(bytes); ObjectInputStream ois = new ObjectInputStream(bin); return ois.readObject(); } catch (Exception e) { throw new BaseException("反序列化出錯!", e); } } }
感謝你能夠認真閱讀完這篇文章,希望小編分享的“Redis怎么實現分布式鎖和等待序列”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。