您好,登錄后才能下訂單哦!
在分布式系統的很多場景中,我們為了保證數據的最終一致性,需要很多的技術方案來支持,比如分布式事務、分布式鎖等。
有的時候,我們需要保證一個方法在同一時間內只能被同一個線程執行。在單機環境中,Java中其實提供了很多并發處理相關的API,但是這些API在分布式場景中就無能為力了。也就是說單純的Java Api并不能提供分布式鎖的能力。
目前針對分布式鎖的實現目前有多種方案:
在分析這幾種實現方案之前我們先來想一下,我們需要的分布式鎖應該是怎么樣的?(這里以方法鎖為例,資源鎖同理)
可以保證在分布式部署的應用集群中,同一個方法在同一時間只能被一臺機器上的一個線程執行。
要實現分布式鎖,最簡單的方式可能就是直接創建一張鎖表,然后通過操作該表中的數據來實現了。
當我們要鎖住某個方法或資源時,我們就在該表中增加一條記錄,想要釋放鎖的時候就刪除這條記錄。
創建這樣一張數據庫表:
CREATE TABLE `methodLock` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
`method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '鎖定的方法名',
`desc` varchar(1024) NOT NULL DEFAULT '備注信息',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON
UPDATE CURRENT_TIMESTAMP COMMENT '保存數據時間,自動生成',
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='鎖定中的方法';
當我們想要鎖住某個方法時,執行以下SQL:
insert into methodLock(method_name,desc) values (‘method_name’,‘desc’)
因為我們對method_name做了唯一性約束,這里如果有多個請求同時提交到數據庫的話,數據庫會保證只有一個操作可以成功,那么我們就可以認為操作成功的那個線程獲得了該方法的鎖,可以執行方法體內容。
當方法執行完畢之后,想要釋放鎖的話,需要執行以下Sql:
delete from methodLock where method_name ='method_name'
上面這種簡單的實現有以下幾個問題:
這把鎖強依賴數據庫的可用性,數據庫是一個單點,一旦數據庫掛掉,會導致業務系統不可用。
這把鎖沒有失效時間,一旦解鎖操作失敗,就會導致鎖記錄一直在數據庫中,其他線程無法再獲得到鎖。
這把鎖只能是非阻塞的,因為數據的insert操作,一旦插入失敗就會直接報錯。沒有獲得鎖的線程并不會進入排隊隊列,要想再次獲得鎖就要再次觸發獲得鎖操作。
當然,我們也可以有其他方式解決上面的問題。
針對 數據庫是單點問題搞兩個數據庫,數據之前雙向同步。一旦掛掉快速切換到備庫上。
針對 沒有失效時間?只要做一個定時任務,每隔一定時間把數據庫中的超時數據清理一遍。
針對 非阻塞的?搞一個while循環,直到insert成功再返回成功。
針對 非重入的?在數據庫表中加個字段,記錄當前獲得鎖的機器的主機信息和線程信息,那么下次再獲取鎖的時候先查詢數據庫,如果當前機器的主機信息和線程信息在數據庫可以查到的話,直接把鎖分配給他就可以了。
除了可以通過增刪操作數據表中的記錄以外,其實還可以借助數據中自帶的鎖來實現分布式的鎖。
我們還用剛剛創建的那張數據庫表。可以通過數據庫的排他鎖來實現分布式鎖。 基于MySql的InnoDB引擎,可以使用以下方法來實現加鎖操作:
public boolean lock(){
connection.setAutoCommit(false)
while(true){
try{
result = select * from methodLock where method_name=xxx
for update;
if(result==null){
return true;
}
}catch(Exception e){
}
sleep(1000);
}
return false;
}
在查詢語句后面增加for update,數據庫會在查詢過程中給數據庫表增加排他鎖。當某條記錄被加上排他鎖之后,其他線程無法再在該行記錄上增加排他鎖。
我們可以認為獲得排它鎖的線程即可獲得分布式鎖,當獲取到鎖之后,可以執行方法的業務邏輯,執行完方法之后,再通過以下方法解鎖:
public void unlock(){
connection.commit();
}
通過connection.commit()操作來釋放鎖。
這種方法可以有效的解決上面提到的無法釋放鎖和阻塞鎖的問題。
阻塞鎖? for update語句會在執行成功后立即返回,在執行失敗時一直處于阻塞狀態,直到成功。
鎖定之后 服務宕機,無法釋放?使用這種方式,服務宕機之后數據庫會自己把鎖釋放掉。
但是還是無法直接解決數據庫單點和可重入問題。
總結一下使用數據庫來實現分布式鎖的方式,這兩種方式都是依賴數據庫的一張表,一種是通過表中的記錄的存在情況確定當前是否有鎖存在,另外一種是通過數據庫的排他鎖來實現分布式鎖。
數據庫實現分布式鎖的 優點: 直接借助數據庫,容易理解。
數據庫實現分布式鎖的 缺點: 會有各種各樣的問題,在解決問題的過程中會使整個方案變得越來越復雜。
操作數據庫需要一定的開銷,性能問題需要考慮。
相比較于基于數據庫實現分布式鎖的方案來說,基于緩存來實現在性能方面會表現的更好一點。而且很多緩存是可以集群部署的,可以解決單點問題。
目前有很多成熟的緩存產品,包括Redis,memcached等。
在實現的時候要注意的幾個關鍵點:
鎖信息必須是會過期超時的,不能讓一個線程長期占有一個鎖而導致死鎖;
幾個要用到的redis命令:
setnx(key, value):“set if not exits”,若該key-value不存在,則成功加入緩存并且返回1,否則返回0。
get(key):獲得key對應的value值,若不存在則返回nil。
getset(key, value):先獲取key對應的value值,若不存在則返回nil,然后將舊的value更新為新的value。
expire(key, seconds):設置key-value的有效期為seconds秒。
看一下流程圖:
在這個流程下,不會導致死鎖。
我采用Jedis作為Redis客戶端的api,下面來看一下具體實現的代碼。
public class RedisPool {
private static JedisPool pool;//jedis連接池
private static int maxTotal = 20;//最大連接數
private static int maxIdle = 10;//最大空閑連接數
private static int minIdle = 5;//最小空閑連接數
private static boolean testOnBorrow = true;//在取連接時測試連接的可用性
private static boolean testOnReturn = false;//再還連接時不測試連接的可用性
static {
initPool();//初始化連接池
}
public static Jedis getJedis(){
return pool.getResource();
}
public static void close(Jedis jedis){
jedis.close();
}
private static void initPool(){
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(maxTotal);
config.setMaxIdle(maxIdle);
config.setMinIdle(minIdle);
config.setTestOnBorrow(testOnBorrow);
config.setTestOnReturn(testOnReturn);
config.setBlockWhenExhausted(true);
pool = new JedisPool(config, "127.0.0.1", 6379, 5000, "liqiyao");
}
}
public class RedisPoolUtil {
private RedisPoolUtil(){}
private static RedisPool redisPool;
public static String get(String key){
Jedis jedis = null;
String result = null;
try {
jedis = RedisPool.getJedis();
result = jedis.get(key);
} catch (Exception e){
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
return result;
}
}
public static Long setnx(String key, String value){
Jedis jedis = null;
Long result = null;
try {
jedis = RedisPool.getJedis();
result = jedis.setnx(key, value);
} catch (Exception e){
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
return result;
}
}
public static String getSet(String key, String value){
Jedis jedis = null;
String result = null;
try {
jedis = RedisPool.getJedis();
result = jedis.getSet(key, value);
} catch (Exception e){
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
return result;
}
}
public static Long expire(String key, int seconds){
Jedis jedis = null;
Long result = null;
try {
jedis = RedisPool.getJedis();
result = jedis.expire(key, seconds);
} catch (Exception e){
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
return result;
}
}
public static Long del(String key){
Jedis jedis = null;
Long result = null;
try {
jedis = RedisPool.getJedis();
result = jedis.del(key);
} catch (Exception e){
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
return result;
}
}
}
public class DistributedLockUtil {
private DistributedLockUtil(){
}
public static boolean lock(String lockName){//lockName可以為共享變量
名,也可以為方法名,主要是用于模擬鎖信息
System.out.println(Thread.currentThread() + "開始嘗試加鎖!");
Long result = RedisPoolUtil.setnx
(lockName, String.valueOf(System.currentTimeMillis() + 5000));
if (result != null && result.intValue() == 1){
System.out.println(Thread.currentThread() + "加鎖成功!");
RedisPoolUtil.expire(lockName, 5);
System.out.println(Thread.currentThread() + "執行業務邏輯!");
RedisPoolUtil.del(lockName);
return true;
} else {
String lockValueA = RedisPoolUtil.get(lockName);
if (lockValueA != null && Long.parseLong(lockValueA) >=
System.currentTimeMillis()){
String lockValueB = RedisPoolUtil.getSet(lockName,
String.valueOf(System.currentTimeMillis() + 5000));
if (lockValueB == null || lockValueB.equals(lockValueA)){
System.out.println(Thread.currentThread() + "加鎖成功!");
RedisPoolUtil.expire(lockName, 5);
System.out.println(Thread.currentThread() + "執行業務邏輯!");
RedisPoolUtil.del(lockName);
return true;
} else {
return false;
}
} else {
return false;
}
}
}
}
基于zookeeper臨時有序節點可以實現的分布式鎖。大致思想即為:每個客戶端對某個方法加鎖時,在zookeeper上的與該方法對應的指定節點的目錄下,生成一個唯一的
瞬時有序節點。 判斷是否獲取鎖的方式很簡單,只需要判斷有序節點中序號最小的一個。 當釋放鎖的時候,只需將這個瞬時節點刪除即可。同時,其可以避免服務宕機導致的鎖無法釋放,而產生的死鎖問題。
鎖無法釋放?
使用Zookeeper可以有效的解決鎖無法釋放的問題,因為在創建鎖的時候,客戶端會在ZK中創建一個臨時節點,一旦客戶端獲取到鎖之后突然掛掉(Session連接斷開),那么這個臨時節點就會自動刪除掉。其他客戶端就可以再次獲得鎖。
非阻塞鎖?
使用Zookeeper可以實現阻塞的鎖,客戶端可以通過在ZK中創建順序節點,并且在節點上綁定監聽器,一旦節點有變化,Zookeeper會通知客戶端,客戶端可以檢查自己創建的節點是不是當前所有節點中序號最小的,如果是,那么自己就獲取到鎖,便可以執行業務邏輯了。
不可重入?
使用Zookeeper也可以有效的解決不可重入的問題,客戶端在創建節點的時候,把當前客戶端的主機信息和線程信息直接寫入到節點中,下次想要獲取鎖的時候和當前最小的節點中的數據比對一下就可以了。如果和自己的信息一樣,那么自己直接獲取到鎖,如果不一樣就再創建一個臨時的順序節點,參與排隊。
單點問題?
使用Zookeeper可以有效的解決單點問題,ZK是集群部署的,只要集群中有半數以上的機器存活,就可以對外提供服務。
可以直接使用zookeeper第三方庫Curator客戶端,這個客戶端中封裝了一個可重入的鎖服務。
public boolean tryLock(long timeout, TimeUnit unit) throws
InterruptedException {
try {
return interProcessMutex.acquire(timeout, unit);
} catch (Exception e) {
e.printStackTrace();
}
return true;
}
public boolean unlock() {
try {
interProcessMutex.release();
} catch (Throwable e) {
log.error(e.getMessage(), e);
} finally {
executorService.schedule(new Cleaner(client, path),
delayTimeForClean, TimeUnit.MILLISECONDS);
}
return true;
}
Curator提供的InterProcessMutex是分布式鎖的實現。acquire方法用戶獲取鎖,release方法用于釋放鎖。
使用ZK實現的分布式鎖好像完全符合了本文開頭我們對一個分布式鎖的所有期望。但是,其實并不是,Zookeeper實現的分布式鎖其實存在一個缺點,那就是性能上可能并沒有緩存服務
那么高。因為每次在創建鎖和釋放鎖的過程中,都要動態創建、銷毀瞬時節點來實現鎖功能。ZK中創建和刪除節點只能通過Leader服務器來執行,然后將數據同不到所有的Follower機器上。
使用Zookeeper實現分布式鎖的優點: 有效的解決單點問題,不可重入問題,非阻塞問題以及鎖無法釋放的問題。實現起來較為簡單。
使用Zookeeper實現分布式鎖的缺點 : 性能上不如使用緩存實現分布式鎖。 需要對ZK的原理有所了解。
從理解的難易程度角度(從低到高): 數據庫 > 緩存 > Zookeeper
從實現的復雜性角度(從低到高): Zookeeper >= 緩存 > 數據庫
從性能角度(從高到低): 緩存 > Zookeeper >= 數據庫
從可靠性角度(從高到低): Zookeeper > 緩存 > 數據庫
因此我個人更加傾向于使用緩存來實現,后續的文章中會基于Redis封裝一個我們自己的分布式鎖實現。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。