中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

?基于redis的分布式鎖怎么實現

發布時間:2021-12-23 12:00:43 來源:億速云 閱讀:142 作者:iii 欄目:云計算

本篇內容介紹了“基于redis的分布式鎖怎么實現”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

基于redis的分布式鎖

/**
*分布式鎖工廠類
*/
public class RedisLockUtil {
	private static final Logger logger =  Logger.getLogger(RedisLockUtil.class);
	private static Object schemeLock = new Object();
	private static Map<String,RedisLockUtil> instances = new ConcurrentHashMap();
	public static RedisLockUtil getInstance(String schema){
		RedisLockUtil u = instances.get(schema);
		if(u==null){
			synchronized(schemeLock){
				u = instances.get(schema);
				if(u == null){
					LockObserver lo = new LockObserver(schema);		
					u = new RedisLockUtil(schema,lo);
					instances.put(schema, u);
				}
			}
		}
		return u;
	}

	private Object mutexLock = new Object();
	private Map<String,Object> mutexLockMap = new ConcurrentHashMap();
	private Map<String,RedisReentrantLock> cache  = new ConcurrentHashMap<String,RedisReentrantLock>();
	private DelayQueue<RedisReentrantLock> dq = new DelayQueue<RedisReentrantLock>();
	private AbstractLockObserver lo;
	public RedisLockUtil(String schema, AbstractLockObserver lo){	
		Thread th = new Thread(lo);
		th.setDaemon(false);
		th.setName("Lock Observer:"+schema);
		th.start();
		clearUselessLocks(schema);
		this.lo = lo;
	}

	public void clearUselessLocks(String schema){
		Thread th = new Thread(new Runnable(){
			@Override
			public void run() {
				while(!SystemExitListener.isOver()){
					try {
						RedisReentrantLock t = dq.take();						
						if(t.clear()){
							String key = t.getKey();
							synchronized(getMutex(key)){
								cache.remove(key);
							}
						}
						t.resetCleartime();
					} catch (InterruptedException e) {
					}
				}
			}

		});
		th.setDaemon(true);
		th.setName("Lock cleaner:"+schema);
		th.start();
	}

	private Object getMutex(String key){
		Object mx = mutexLockMap.get(key);
		if(mx == null){
			synchronized(mutexLock){
				mx = mutexLockMap.get(key);
				if(mx==null){
					mx = new Object();
					mutexLockMap.put(key,mx);
				}
			}
		}
		return mx;
	}

	private RedisReentrantLock getLock(String key,boolean addref){
		RedisReentrantLock lock = cache.get(key);
		if(lock == null){
			synchronized(getMutex(key)){
				lock = cache.get(key);
				if(lock == null){
					lock = new RedisReentrantLock(key,lo);
					cache.put(key, lock);
				}
			}
		}
		if(addref){
			if(!lock.incRef()){
				synchronized(getMutex(key)){
					lock = cache.get(key);
					if(!lock.incRef()){
						lock = new RedisReentrantLock(key,lo);
						cache.put(key, lock);
					}
				}
			}
		}
		return lock;
	}

	public void reset(){
		for(String s : cache.keySet()){
			getLock(s,false).unlock();
		}
	}

	/**
	 * 嘗試加鎖
	 * 如果當前線程已經擁有該鎖的話,直接返回,表示不用再次加鎖,此時不應該再調用unlock進行解鎖
	 * 
	 * @param key
	 * @return
	 * @throws Exception 
	 * @throws InterruptedException
	 * @throws KeeperException
	 */
	public LockStat lock(String key) {
		return lock(key,-1);
	}

	public LockStat lock(String key,int timeout) {
		RedisReentrantLock ll = getLock(key,true);
		ll.incRef();
		try{
			if(ll.isOwner(false)){
				ll.descrRef();
				return LockStat.NONEED;
			}
			if(ll.lock(timeout)){
				return LockStat.SUCCESS;
			}else{
				ll.descrRef();
				if(ll.setCleartime()){
					dq.put(ll);
				}
				return null;
			}
		}catch(LockNotExistsException e){
			ll.descrRef();
			return lock(key,timeout);
		}catch(RuntimeException e){
			ll.descrRef();
			throw e;
		}
	}

	public void unlock(String key,LockStat stat) {
		unlock(key,stat,false);
	}

	public void unlock(String key,LockStat stat,boolean keepalive){
		if(stat == null) return;
		if(LockStat.SUCCESS.equals(stat)){
			RedisReentrantLock lock =  getLock(key,false);
			boolean candestroy = lock.unlock();
			if(candestroy && !keepalive){
				if(lock.setCleartime()){
					dq.put(lock);
				}
			}
		}
	}

	public static enum LockStat{
		NONEED,
		SUCCESS
	}
}



/**
*分布式鎖本地代理類
*/
public class RedisReentrantLock implements Delayed{
private static final Logger logger =  Logger.getLogger(RedisReentrantLock.class);
   private ReentrantLock reentrantLock = new ReentrantLock();

   private RedisLock redisLock;
   private long timeout = 3*60;
   private CountDownLatch lockcount = new CountDownLatch(1);

   private String key;
   private AbstractLockObserver observer;

   private int ref = 0;
   private Object refLock = new Object();
   private boolean destroyed = false;

   private long cleartime = -1;

   public RedisReentrantLock(String key,AbstractLockObserver observer) {
    this.key = key;
    this.observer = observer;
    initWriteLock();
   }

public boolean isDestroyed() {
return destroyed;
}

private synchronized void initWriteLock(){
redisLock = new RedisLock(key,new LockListener(){
@Override
public void lockAcquired() {
lockcount.countDown();
}
@Override
public long getExpire() {
return 0;
}

@Override
public void lockError() {
/*synchronized(mutex){
mutex.notify();
}*/
lockcount.countDown();
}
    },observer);
}

public boolean incRef(){
synchronized(refLock){
if(destroyed) return false;
    ref ++;
    }
return true;
}

public void descrRef(){
synchronized(refLock){
    ref --;
    }
}

public boolean clear() {
if(destroyed) return true;
synchronized(refLock){
    if(ref > 0){
    return false;
    }
    destroyed = true;
    redisLock.clear();
    redisLock = null;
    return true;
    }
}

   public boolean lock(long timeout) throws LockNotExistsException{
    if(timeout <= 0) timeout = this.timeout;
    //incRef();
       reentrantLock.lock();//多線程競爭時,先拿到第一層鎖
       if(redisLock == null){
        reentrantLock.unlock();
        //descrRef();
        throw new LockNotExistsException();
       }
       try{
        lockcount = new CountDownLatch(1);
        boolean res = redisLock.trylock(timeout);
        if(!res){  
        lockcount.await(timeout, TimeUnit.SECONDS);
//mutex.wait(timeout*1000);
        if(!redisLock.doExpire()){
        reentrantLock.unlock();
return false;
}
        }
        return true;
       }catch(InterruptedException e){
        reentrantLock.unlock();
        return false;
       }
   }

   public boolean lock() throws LockNotExistsException {
    return lock(timeout);
   }

   public boolean unlock(){
    if(!isOwner(true)) {
    try{
    throw new RuntimeException("big ================================================ error.key:"+key);
    }catch(Exception e){
    logger.error("err:"+e,e);
    }
    return false;
    }
       try{
        redisLock.unlock();
        reentrantLock.unlock();//多線程競爭時,釋放最外層鎖
       }catch(RuntimeException e){
        reentrantLock.unlock();//多線程競爭時,釋放最外層鎖
        throw e;
       }finally{
        descrRef();
       }
       return canDestroy();
   }

   public boolean canDestroy(){
    synchronized(refLock){
    return ref <= 0;
    }
   }

   public String getKey() {
return key;
}

public void setKey(String key) {
this.key = key;
}

public boolean isOwner(boolean check) {
    synchronized(refLock){
    if(redisLock == null) {
    logger.error("reidsLock is null:key="+key);
    return false;
    }
    boolean a = reentrantLock.isHeldByCurrentThread();
    boolean b = redisLock.isOwner();
    if(check){
    if(!a || !b){
    logger.error(key+";a:"+a+";b:"+b);
    }
    }
    return a && b;
    }
   }

public boolean setCleartime() {
synchronized(this){
if(cleartime>0) return false;
this.cleartime = System.currentTimeMillis() + 10*1000;
return true;
}
}

public void resetCleartime(){
synchronized(this){
this.cleartime = -1;
}
}

@Override
public int compareTo(Delayed object) {
if(object instanceof RedisReentrantLock){
RedisReentrantLock t = (RedisReentrantLock)object;
       long l = this.cleartime - t.cleartime;

if(l > 0) return 1 ; //比當前的小則返回1,比當前的大則返回-1,否則為0
else if(l < 0 ) return -1;
else return 0;
}
return 0;
}

@Override
public long getDelay(TimeUnit unit) {
long d = unit.convert(cleartime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    return d;
}

}



/**
*使用Redis實現的分布式鎖
*基本工作原理如下:
*1. 使用setnx(key,時間戮+超時),如果設置成功,則直接拿到鎖
*2. 如果設置不成功,獲取key的值v1(它的到期時間戮),跟當前時間對比,看是否已經超時
*3. 如果超時(說明拿到鎖的結點已經掛掉),v2=getset(key,時間戮+超時+1),判斷v2是否等于v1,如果相等,加鎖成功,否則加鎖失敗,等過段時間再重試(200MS)
*/
public class RedisLock implements LockListener{
	private String key;
	private boolean owner = false;
	private AbstractLockObserver observer = null;
	private LockListener lockListener = null;
	private boolean waiting = false;
	private long expire;//鎖超時時間,以秒為單位
	private boolean expired = false;

	public RedisLock(String key, LockListener lockListener, AbstractLockObserver observer) {
		this.key = key;
		this.lockListener = lockListener;
		this.observer = observer;
	}

	public boolean trylock(long expire) {
		synchronized(this){
			if(owner){
				return true;
			}
			this.expire = expire;
			this.expired = false;
			if(!waiting){
				owner = observer.tryLock(key,expire);
				if(!owner){
					waiting = true;
					observer.addLockListener(key, this);
				}
			}
			return owner;
		}
	}

	public boolean isOwner() {
		return owner;
	}

	public void unlock() {
		synchronized(this){
			observer.unLock(key);
			owner = false;
		}
	}

	public void clear() {
		synchronized(this){
			if(waiting) {
				observer.removeLockListener(key);
				waiting = false;
			}
		}
	}

	public boolean doExpire(){
		synchronized(this){
			if(owner) return true;
			if(expired) return false;
			expired =  true;
			clear();
		}
		return false;
	}

	@Override
	public void lockAcquired() {
		synchronized(this){
			if(expired){
				unlock();
				return;
			}
			owner = true;
			waiting = false;
		}
		lockListener.lockAcquired();
	}

	@Override
	public long getExpire() {
		return this.expire;
	}

	@Override
	public void lockError() {
		synchronized(this){
			owner = false;
			waiting = false;
			lockListener.lockError();
		}
	}

}



public class LockObserver extends AbstractLockObserver implements Runnable{
	private CacheRedisClient client;
	private Object mutex = new Object();
	private Map<String,LockListener> lockMap = new ConcurrentHashMap();
	private boolean stoped = false;
	private long interval = 500;
	private boolean terminated = false;
	private CountDownLatch doneSignal = new CountDownLatch(1);
	
	public LockObserver(String schema){
		client = new CacheRedisClient(schema);
		
		SystemExitListener.addTerminateListener(new ExitHandler(){
			public void run() {
				stoped = true;
				try {
					doneSignal.await();
				} catch (InterruptedException e) {
				}
			}
		});
	}
	
	
	public void addLockListener(String key,LockListener listener){
		if(terminated){
			listener.lockError();
			return;
		}
		synchronized(mutex){
			lockMap.put(key, listener);
		}
	}
	
	public void removeLockListener(String key){
		synchronized(mutex){
			lockMap.remove(key);
		}
	}
	
	@Override
	public void run() {
		while(!terminated){
			long p1 = System.currentTimeMillis();
			Map<String,LockListener> clone = new HashMap();
			synchronized(mutex){
				clone.putAll(lockMap);
			}			
			Set<String> keyset = clone.keySet();
			if(keyset.size() > 0){
				ConnectionFactory.setSingleConnectionPerThread(keyset.size());
				for(String key : keyset){
					LockListener ll = clone.get(key);
					try{
					    if(tryLock(key,ll.getExpire())) {
					    	ll.lockAcquired();
					    	removeLockListener(key);
					    }
					}catch(Exception e){
						ll.lockError();
						removeLockListener(key);
					}
				}
				ConnectionFactory.releaseThreadConnection();
			}else{
				if(stoped){
					terminated = true;
					doneSignal.countDown();
					return;
				}
			}
			try {
				long p2 = System.currentTimeMillis();
				long cost = p2 - p1;
				if(cost <= interval){
					Thread.sleep(interval - cost);
				}else{
					Thread.sleep(interval*2);
				}
			} catch (InterruptedException e) {
			}
		}
		
	}
	
	
	/**
	 * 超時時間單位為s!!!
	 * @param key
	 * @param expire
	 * @return
	 */
	public boolean tryLock(final String key,final long expireInSecond){
		if(terminated) return false;
		final long tt = System.currentTimeMillis();
		final long expire = expireInSecond * 1000;
		final Long ne = tt + expire;
		List<Object> mm = client.multi(key, new MultiBlock(){
			@Override
			public void execute() {
				transaction.setnxObject(key, ne);
				transaction.get(SafeEncoder.encode(key));
			}
		});
		Long res = (Long)mm.get(0);
	    if(new Long(1).equals(res)) {
	    	return true;
	    }else{
	    	byte[] bb = (byte[])mm.get(1);
			Long ex = client.deserialize(bb);
	    	if(ex == null || tt > ex){
	    		Long old = client.getSet(key, new Long(ne+1));
	    		if(old == null || (ex == null&&old==null) || (ex!=null&&ex.equals(old))){
	    			return true;
	    		}
	    	}
	    }
	    return false;
	}

	public void unLock(String key){
		client.del(key);
	}
}

使用本方案實現的分布式鎖,可以完美地解決鎖重入問題;通過引入超時也避免了死鎖問題;性能方面,筆者自測試結果如下:

500線程 tps = 35000
[root@DB1 benchtest-util]# target/benchtest/bin/TestFastRedis /data/config/util/config_0_11.properties lock 500 500000
線程總時間:6553466;平均:13.106932
實際總時間:13609; 平均:0.027218

TPS達到35000,比方案1強了整整一個數量級

“基于redis的分布式鎖怎么實現”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

尖扎县| 于都县| 大同市| 灌南县| 梅河口市| 东至县| 永和县| 米泉市| 黄山市| 鄂托克旗| 沅江市| 衢州市| 平乐县| 拜泉县| 龙南县| 博兴县| 盐亭县| 嘉黎县| 九台市| 葫芦岛市| 昭平县| 蓬安县| 鹿泉市| 大城县| 长宁区| 灵石县| 辽阳县| 恭城| 福贡县| 淳化县| 贺兰县| 华宁县| 攀枝花市| 津市市| 鸡东县| 玉屏| 淮阳县| 萝北县| 清镇市| 滨海县| 平南县|