您好,登錄后才能下訂單哦!
本篇內容介紹了“基于redis的分布式鎖怎么實現”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
基于redis的分布式鎖
/** *分布式鎖工廠類 */ public class RedisLockUtil { private static final Logger logger = Logger.getLogger(RedisLockUtil.class); private static Object schemeLock = new Object(); private static Map<String,RedisLockUtil> instances = new ConcurrentHashMap(); public static RedisLockUtil getInstance(String schema){ RedisLockUtil u = instances.get(schema); if(u==null){ synchronized(schemeLock){ u = instances.get(schema); if(u == null){ LockObserver lo = new LockObserver(schema); u = new RedisLockUtil(schema,lo); instances.put(schema, u); } } } return u; } private Object mutexLock = new Object(); private Map<String,Object> mutexLockMap = new ConcurrentHashMap(); private Map<String,RedisReentrantLock> cache = new ConcurrentHashMap<String,RedisReentrantLock>(); private DelayQueue<RedisReentrantLock> dq = new DelayQueue<RedisReentrantLock>(); private AbstractLockObserver lo; public RedisLockUtil(String schema, AbstractLockObserver lo){ Thread th = new Thread(lo); th.setDaemon(false); th.setName("Lock Observer:"+schema); th.start(); clearUselessLocks(schema); this.lo = lo; } public void clearUselessLocks(String schema){ Thread th = new Thread(new Runnable(){ @Override public void run() { while(!SystemExitListener.isOver()){ try { RedisReentrantLock t = dq.take(); if(t.clear()){ String key = t.getKey(); synchronized(getMutex(key)){ cache.remove(key); } } t.resetCleartime(); } catch (InterruptedException e) { } } } }); th.setDaemon(true); th.setName("Lock cleaner:"+schema); th.start(); } private Object getMutex(String key){ Object mx = mutexLockMap.get(key); if(mx == null){ synchronized(mutexLock){ mx = mutexLockMap.get(key); if(mx==null){ mx = new Object(); mutexLockMap.put(key,mx); } } } return mx; } private RedisReentrantLock getLock(String key,boolean addref){ RedisReentrantLock lock = cache.get(key); if(lock == null){ synchronized(getMutex(key)){ lock = cache.get(key); if(lock == null){ lock = new RedisReentrantLock(key,lo); cache.put(key, lock); } } } if(addref){ if(!lock.incRef()){ synchronized(getMutex(key)){ lock = cache.get(key); if(!lock.incRef()){ lock = new RedisReentrantLock(key,lo); cache.put(key, lock); } } } } return lock; } public void reset(){ for(String s : cache.keySet()){ getLock(s,false).unlock(); } } /** * 嘗試加鎖 * 如果當前線程已經擁有該鎖的話,直接返回,表示不用再次加鎖,此時不應該再調用unlock進行解鎖 * * @param key * @return * @throws Exception * @throws InterruptedException * @throws KeeperException */ public LockStat lock(String key) { return lock(key,-1); } public LockStat lock(String key,int timeout) { RedisReentrantLock ll = getLock(key,true); ll.incRef(); try{ if(ll.isOwner(false)){ ll.descrRef(); return LockStat.NONEED; } if(ll.lock(timeout)){ return LockStat.SUCCESS; }else{ ll.descrRef(); if(ll.setCleartime()){ dq.put(ll); } return null; } }catch(LockNotExistsException e){ ll.descrRef(); return lock(key,timeout); }catch(RuntimeException e){ ll.descrRef(); throw e; } } public void unlock(String key,LockStat stat) { unlock(key,stat,false); } public void unlock(String key,LockStat stat,boolean keepalive){ if(stat == null) return; if(LockStat.SUCCESS.equals(stat)){ RedisReentrantLock lock = getLock(key,false); boolean candestroy = lock.unlock(); if(candestroy && !keepalive){ if(lock.setCleartime()){ dq.put(lock); } } } } public static enum LockStat{ NONEED, SUCCESS } }
/**
*分布式鎖本地代理類
*/
public class RedisReentrantLock implements Delayed{
private static final Logger logger = Logger.getLogger(RedisReentrantLock.class);
private ReentrantLock reentrantLock = new ReentrantLock();
private RedisLock redisLock;
private long timeout = 3*60;
private CountDownLatch lockcount = new CountDownLatch(1);
private String key;
private AbstractLockObserver observer;
private int ref = 0;
private Object refLock = new Object();
private boolean destroyed = false;
private long cleartime = -1;
public RedisReentrantLock(String key,AbstractLockObserver observer) {
this.key = key;
this.observer = observer;
initWriteLock();
}
public boolean isDestroyed() {
return destroyed;
}
private synchronized void initWriteLock(){
redisLock = new RedisLock(key,new LockListener(){
@Override
public void lockAcquired() {
lockcount.countDown();
}
@Override
public long getExpire() {
return 0;
}
@Override
public void lockError() {
/*synchronized(mutex){
mutex.notify();
}*/
lockcount.countDown();
}
},observer);
}
public boolean incRef(){
synchronized(refLock){
if(destroyed) return false;
ref ++;
}
return true;
}
public void descrRef(){
synchronized(refLock){
ref --;
}
}
public boolean clear() {
if(destroyed) return true;
synchronized(refLock){
if(ref > 0){
return false;
}
destroyed = true;
redisLock.clear();
redisLock = null;
return true;
}
}
public boolean lock(long timeout) throws LockNotExistsException{
if(timeout <= 0) timeout = this.timeout;
//incRef();
reentrantLock.lock();//多線程競爭時,先拿到第一層鎖
if(redisLock == null){
reentrantLock.unlock();
//descrRef();
throw new LockNotExistsException();
}
try{
lockcount = new CountDownLatch(1);
boolean res = redisLock.trylock(timeout);
if(!res){
lockcount.await(timeout, TimeUnit.SECONDS);
//mutex.wait(timeout*1000);
if(!redisLock.doExpire()){
reentrantLock.unlock();
return false;
}
}
return true;
}catch(InterruptedException e){
reentrantLock.unlock();
return false;
}
}
public boolean lock() throws LockNotExistsException {
return lock(timeout);
}
public boolean unlock(){
if(!isOwner(true)) {
try{
throw new RuntimeException("big ================================================ error.key:"+key);
}catch(Exception e){
logger.error("err:"+e,e);
}
return false;
}
try{
redisLock.unlock();
reentrantLock.unlock();//多線程競爭時,釋放最外層鎖
}catch(RuntimeException e){
reentrantLock.unlock();//多線程競爭時,釋放最外層鎖
throw e;
}finally{
descrRef();
}
return canDestroy();
}
public boolean canDestroy(){
synchronized(refLock){
return ref <= 0;
}
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public boolean isOwner(boolean check) {
synchronized(refLock){
if(redisLock == null) {
logger.error("reidsLock is null:key="+key);
return false;
}
boolean a = reentrantLock.isHeldByCurrentThread();
boolean b = redisLock.isOwner();
if(check){
if(!a || !b){
logger.error(key+";a:"+a+";b:"+b);
}
}
return a && b;
}
}
public boolean setCleartime() {
synchronized(this){
if(cleartime>0) return false;
this.cleartime = System.currentTimeMillis() + 10*1000;
return true;
}
}
public void resetCleartime(){
synchronized(this){
this.cleartime = -1;
}
}
@Override
public int compareTo(Delayed object) {
if(object instanceof RedisReentrantLock){
RedisReentrantLock t = (RedisReentrantLock)object;
long l = this.cleartime - t.cleartime;
if(l > 0) return 1 ; //比當前的小則返回1,比當前的大則返回-1,否則為0
else if(l < 0 ) return -1;
else return 0;
}
return 0;
}
@Override
public long getDelay(TimeUnit unit) {
long d = unit.convert(cleartime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return d;
}
}
/** *使用Redis實現的分布式鎖 *基本工作原理如下: *1. 使用setnx(key,時間戮+超時),如果設置成功,則直接拿到鎖 *2. 如果設置不成功,獲取key的值v1(它的到期時間戮),跟當前時間對比,看是否已經超時 *3. 如果超時(說明拿到鎖的結點已經掛掉),v2=getset(key,時間戮+超時+1),判斷v2是否等于v1,如果相等,加鎖成功,否則加鎖失敗,等過段時間再重試(200MS) */ public class RedisLock implements LockListener{ private String key; private boolean owner = false; private AbstractLockObserver observer = null; private LockListener lockListener = null; private boolean waiting = false; private long expire;//鎖超時時間,以秒為單位 private boolean expired = false; public RedisLock(String key, LockListener lockListener, AbstractLockObserver observer) { this.key = key; this.lockListener = lockListener; this.observer = observer; } public boolean trylock(long expire) { synchronized(this){ if(owner){ return true; } this.expire = expire; this.expired = false; if(!waiting){ owner = observer.tryLock(key,expire); if(!owner){ waiting = true; observer.addLockListener(key, this); } } return owner; } } public boolean isOwner() { return owner; } public void unlock() { synchronized(this){ observer.unLock(key); owner = false; } } public void clear() { synchronized(this){ if(waiting) { observer.removeLockListener(key); waiting = false; } } } public boolean doExpire(){ synchronized(this){ if(owner) return true; if(expired) return false; expired = true; clear(); } return false; } @Override public void lockAcquired() { synchronized(this){ if(expired){ unlock(); return; } owner = true; waiting = false; } lockListener.lockAcquired(); } @Override public long getExpire() { return this.expire; } @Override public void lockError() { synchronized(this){ owner = false; waiting = false; lockListener.lockError(); } } }
使用本方案實現的分布式鎖,可以完美地解決鎖重入問題;通過引入超時也避免了死鎖問題;性能方面,筆者自測試結果如下:
public class LockObserver extends AbstractLockObserver implements Runnable{
private CacheRedisClient client;
private Object mutex = new Object();
private Map<String,LockListener> lockMap = new ConcurrentHashMap();
private boolean stoped = false;
private long interval = 500;
private boolean terminated = false;
private CountDownLatch doneSignal = new CountDownLatch(1);
public LockObserver(String schema){
client = new CacheRedisClient(schema);
SystemExitListener.addTerminateListener(new ExitHandler(){
public void run() {
stoped = true;
try {
doneSignal.await();
} catch (InterruptedException e) {
}
}
});
}
public void addLockListener(String key,LockListener listener){
if(terminated){
listener.lockError();
return;
}
synchronized(mutex){
lockMap.put(key, listener);
}
}
public void removeLockListener(String key){
synchronized(mutex){
lockMap.remove(key);
}
}
@Override
public void run() {
while(!terminated){
long p1 = System.currentTimeMillis();
Map<String,LockListener> clone = new HashMap();
synchronized(mutex){
clone.putAll(lockMap);
}
Set<String> keyset = clone.keySet();
if(keyset.size() > 0){
ConnectionFactory.setSingleConnectionPerThread(keyset.size());
for(String key : keyset){
LockListener ll = clone.get(key);
try{
if(tryLock(key,ll.getExpire())) {
ll.lockAcquired();
removeLockListener(key);
}
}catch(Exception e){
ll.lockError();
removeLockListener(key);
}
}
ConnectionFactory.releaseThreadConnection();
}else{
if(stoped){
terminated = true;
doneSignal.countDown();
return;
}
}
try {
long p2 = System.currentTimeMillis();
long cost = p2 - p1;
if(cost <= interval){
Thread.sleep(interval - cost);
}else{
Thread.sleep(interval*2);
}
} catch (InterruptedException e) {
}
}
}
/**
* 超時時間單位為s!!!
* @param key
* @param expire
* @return
*/
public boolean tryLock(final String key,final long expireInSecond){
if(terminated) return false;
final long tt = System.currentTimeMillis();
final long expire = expireInSecond * 1000;
final Long ne = tt + expire;
List<Object> mm = client.multi(key, new MultiBlock(){
@Override
public void execute() {
transaction.setnxObject(key, ne);
transaction.get(SafeEncoder.encode(key));
}
});
Long res = (Long)mm.get(0);
if(new Long(1).equals(res)) {
return true;
}else{
byte[] bb = (byte[])mm.get(1);
Long ex = client.deserialize(bb);
if(ex == null || tt > ex){
Long old = client.getSet(key, new Long(ne+1));
if(old == null || (ex == null&&old==null) || (ex!=null&&ex.equals(old))){
return true;
}
}
}
return false;
}
public void unLock(String key){
client.del(key);
}
}
500線程 tps = 35000
[root@DB1 benchtest-util]# target/benchtest/bin/TestFastRedis /data/config/util/config_0_11.properties lock 500 500000
線程總時間:6553466;平均:13.106932
實際總時間:13609; 平均:0.027218
TPS達到35000,比方案1強了整整一個數量級
“基于redis的分布式鎖怎么實現”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。