一、使用分布式鎖要滿足的幾個條件:
1、系統是一個分布式系統(關鍵是分布式,單機的可以使用ReentrantLock或者synchronized代碼塊來實現)
2、共享資源(各個系統訪問同一個資源,資源的載體可能是傳統關系型數據庫或者NoSQL)
3、同步訪問(即有很多個進程同事訪問同一個共享資源。沒有同步訪問,誰管你資源競爭不競爭)
二、應用的場景例子
管理后臺的部署架構(多臺tomcat服務器+redis【多臺tomcat服務器訪問一臺redis】+mysql【多臺tomcat服務器訪問一臺服務器上的mysql】)就滿足使用分布式鎖的條件。多臺服務器要訪問redis全局緩存的資源,如果不使用分布式鎖就會出現問題。?看如下偽代碼:
long?N=0L; //N從redis獲取值 if(N<p>上面的代碼主要實現的功能:</p><p>從redis獲取值N,對數值N進行邊界檢查,自加1,然后N寫回redis中。?這種應用場景很常見,像秒殺,全局遞增ID、IP訪問限制等。</p><p>以IP訪問限制來說,惡意攻擊者可能發起無限次訪問,并發量比較大,分布式環境下對N的邊界檢查就不可靠,因為從redis讀的N可能已經是臟數據。</p><p>傳統的加鎖的做法(如java的synchronized和Lock)也沒用,因為這是分布式環境,這個同步問題的救火隊員也束手無策。在這危急存亡之秋,分布式鎖終于有用武之地了。</p><p>分布式鎖可以基于很多種方式實現,比如zookeeper、redis...。不管哪種方式,他的基本原理是不變的:用一個狀態值表示鎖,對鎖的占用和釋放通過狀態值來標識。</p><p>這里主要講如何用redis實現分布式鎖。</p><p><strong>三、使用redis的setNX命令實現分布式鎖 </strong></p><p>1、實現的原理</p><p>Redis為單進程單線程模式,采用隊列模式將并發訪問變成串行訪問,且多客戶端對Redis的連接并不存在競爭關系。redis的SETNX命令可以方便的實現分布式鎖。</p><p>2、基本命令解析</p><p>1)setNX(SET?if?Not?eXists)</p><p>語法:</p><pre class="brush:js;toolbar:false">SETNX?key?value
將?key?的值設為?value?,當且僅當?key?不存在。
若給定的?key?已經存在,則?SETNX?不做任何動作。
SETNX?是『SET if Not eXists』(如果不存在,則 SET)的簡寫
返回值:
設置成功,返回?1?。
設置失敗,返回?0?。
?例子:
redis>?EXISTS?job????????????????#?job?不存在 (integer)?0 redis>?SETNX?job?"programmer"????#?job?設置成功 (integer)?1 redis>?SETNX?job?"code-farmer"???#?嘗試覆蓋?job?,失敗 (integer)?0 redis>?GET?job???????????????????#?沒有被覆蓋 "programmer"
所以我們使用執行下面的命令
SETNX?lock.foo?<current></current>
如返回1,則該客戶端獲得鎖,把lock.foo的鍵值設置為時間值表示該鍵已被鎖定,該客戶端最后可以通過DEL lock.foo來釋放該鎖。
如返回0,表明該鎖已被其他客戶端取得,這時我們可以先返回或進行重試等對方完成或等待鎖超時。
2)getSET
語法:
GETSET?key?value
將給定?key?的值設為?value?,并返回?key?的舊值(old value)。
當?key?存在但不是字符串類型時,返回一個錯誤。
返回值:
返回給定?key?的舊值。
當?key?沒有舊值時,也即是,?key?不存在時,返回?nil?。
3)get
語法:
GET?key
返回值:
當?key?不存在時,返回?nil?,否則,返回?key?的值。
如果?key?不是字符串類型,那么返回一個錯誤
四、解決死鎖
上面的鎖定邏輯有一個問題:如果一個持有鎖的客戶端失敗或崩潰了不能釋放鎖,該怎么解決?
我們可以通過鎖的鍵對應的時間戳來判斷這種情況是否發生了,如果當前的時間已經大于lock.foo的值,說明該鎖已失效,可以被重新使用。
發生這種情況時,可不能簡單的通過DEL來刪除鎖,然后再SETNX一次(講道理,刪除鎖的操作應該是鎖擁有這執行的,這里只需要等它超時即可),當多個客戶端檢測到鎖超時后都會嘗試去釋放它,這里就可能出現一個競態條件,讓我們模擬一下這個場景:?
C0操作超時了,但它還持有著鎖,C1和C2讀取lock.foo檢查時間戳,先后發現超時了。 C1 發送DEL lock.foo C1 發送SETNX lock.foo 并且成功了。 C2 發送DEL lock.foo C2 發送SETNX lock.foo 并且成功了。 這樣一來,C1,C2都拿到了鎖!問題大了!
幸好這種問題是可以避免的,讓我們來看看C3這個客戶端是怎樣做的:?
C3發送SETNX lock.foo 想要獲得鎖,由于C0還持有鎖,所以Redis返回給C3一個0?
C3發送GET lock.foo 以檢查鎖是否超時了,如果沒超時,則等待或重試。?
反之,如果已超時,C3通過下面的操作來嘗試獲得鎖:?
GETSET lock.foo
? 通過GETSET,C3拿到的時間戳如果仍然是超時的,那就說明,C3如愿以償拿到鎖了。?
如果在C3之前,有個叫C4的客戶端比C3快一步執行了上面的操作,那么C3拿到的時間戳是個未超時的值,這時,C3沒有如期獲得鎖,需要再次等待或重試。留意一下,盡管C3沒拿到鎖,但它改寫了C4設置的鎖的超時值,不過這一點非常微小的誤差帶來的影響可以忽略不計。
注意:為了讓分布式鎖的算法更穩鍵些,持有鎖的客戶端在解鎖之前應該再檢查一次自己的鎖是否已經超時,再去做DEL操作,因為可能客戶端因為某個耗時的操作而掛起,操作完的時候鎖因為超時已經被別人獲得,這時就不必解鎖了。??
五、代碼實現
expireMsecs 鎖持有超時,防止線程在入鎖以后,無限的執行下去,讓鎖無法釋放?
timeoutMsecs 鎖等待超時,防止線程饑餓,永遠沒有入鎖執行代碼的機會?
注意:項目里面需要先搭建好redis的相關配置
import?org.slf4j.Logger; import?org.slf4j.LoggerFactory; import?org.springframework.dao.DataAccessException; import?org.springframework.data.redis.connection.RedisConnection; import?org.springframework.data.redis.core.RedisCallback; import?org.springframework.data.redis.core.RedisTemplate; import?org.springframework.data.redis.serializer.StringRedisSerializer; /** ?*?Redis?distributed?lock?implementation. ?* ?*?@author?zhengcanrui ?*/ public?class?RedisLock?{ ????private?static?Logger?logger?=?LoggerFactory.getLogger(RedisLock.class); ????private?RedisTemplate?redisTemplate; ????private?static?final?int?DEFAULT_ACQUIRY_RESOLUTION_MILLIS?=?100; ????/** ?????*?Lock?key?path. ?????*/ ????private?String?lockKey; ????/** ?????*?鎖超時時間,防止線程在入鎖以后,無限的執行等待 ?????*/ ????private?int?expireMsecs?=?60?*?1000; ????/** ?????*?鎖等待時間,防止線程饑餓 ?????*/ ????private?int?timeoutMsecs?=?10?*?1000; ????private?volatile?boolean?locked?=?false; ????/** ?????*?Detailed?constructor?with?default?acquire?timeout?10000?msecs?and?lock?expiration?of?60000?msecs. ?????* ?????*?@param?lockKey?lock?key?(ex.?account:1,?...) ?????*/ ????public?RedisLock(RedisTemplate?redisTemplate,?String?lockKey)?{ ????????this.redisTemplate?=?redisTemplate; ????????this.lockKey?=?lockKey?+?"_lock"; ????} ????/** ?????*?Detailed?constructor?with?default?lock?expiration?of?60000?msecs. ?????* ?????*/ ????public?RedisLock(RedisTemplate?redisTemplate,?String?lockKey,?int?timeoutMsecs)?{ ????????this(redisTemplate,?lockKey); ????????this.timeoutMsecs?=?timeoutMsecs; ????} ????/** ?????*?Detailed?constructor. ?????* ?????*/ ????public?RedisLock(RedisTemplate?redisTemplate,?String?lockKey,?int?timeoutMsecs,?int?expireMsecs)?{ ????????this(redisTemplate,?lockKey,?timeoutMsecs); ????????this.expireMsecs?=?expireMsecs; ????} ????/** ?????*?@return?lock?key ?????*/ ????public?String?getLockKey()?{ ????????return?lockKey; ????} ????private?String?get(final?String?key)?{ ????????Object?obj?=?null; ????????try?{ ????????????obj?=?redisTemplate.execute(new?RedisCallback<object>()?{ ????????????????@Override ????????????????public?Object?doInRedis(RedisConnection?connection)?throws?DataAccessException?{ ????????????????????StringRedisSerializer?serializer?=?new?StringRedisSerializer(); ????????????????????byte[]?data?=?connection.get(serializer.serialize(key)); ????????????????????connection.close(); ????????????????????if?(data?==?null)?{ ????????????????????????return?null; ????????????????????} ????????????????????return?serializer.deserialize(data); ????????????????} ????????????}); ????????}?catch?(Exception?e)?{ ????????????logger.error("get?redis?error,?key?:?{}",?key); ????????} ????????return?obj?!=?null???obj.toString()?:?null; ????} ????private?boolean?setNX(final?String?key,?final?String?value)?{ ????????Object?obj?=?null; ????????try?{ ????????????obj?=?redisTemplate.execute(new?RedisCallback<object>()?{ ????????????????@Override ????????????????public?Object?doInRedis(RedisConnection?connection)?throws?DataAccessException?{ ????????????????????StringRedisSerializer?serializer?=?new?StringRedisSerializer(); ????????????????????Boolean?success?=?connection.setNX(serializer.serialize(key),?serializer.serialize(value)); ????????????????????connection.close(); ????????????????????return?success; ????????????????} ????????????}); ????????}?catch?(Exception?e)?{ ????????????logger.error("setNX?redis?error,?key?:?{}",?key); ????????} ????????return?obj?!=?null???(Boolean)?obj?:?false; ????} ????private?String?getSet(final?String?key,?final?String?value)?{ ????????Object?obj?=?null; ????????try?{ ????????????obj?=?redisTemplate.execute(new?RedisCallback<object>()?{ ????????????????@Override ????????????????public?Object?doInRedis(RedisConnection?connection)?throws?DataAccessException?{ ????????????????????StringRedisSerializer?serializer?=?new?StringRedisSerializer(); ????????????????????byte[]?ret?=?connection.getSet(serializer.serialize(key),?serializer.serialize(value)); ????????????????????connection.close(); ????????????????????return?serializer.deserialize(ret); ????????????????} ????????????}); ????????}?catch?(Exception?e)?{ ????????????logger.error("setNX?redis?error,?key?:?{}",?key); ????????} ????????return?obj?!=?null???(String)?obj?:?null; ????} ????/** ?????*?獲得?lock. ?????*?實現思路:?主要是使用了redis?的setnx命令,緩存了鎖. ?????*?reids緩存的key是鎖的key,所有的共享,?value是鎖的到期時間(注意:這里把過期時間放在value了,沒有時間上設置其超時時間) ?????*?執行過程: ?????*?1.通過setnx嘗試設置某個key的值,成功(當前沒有這個鎖)則返回,成功獲得鎖 ?????*?2.鎖已經存在則獲取鎖的到期時間,和當前時間比較,超時的話,則設置新的值 ?????* ?????*?@return?true?if?lock?is?acquired,?false?acquire?timeouted ?????*?@throws?InterruptedException?in?case?of?thread?interruption ?????*/ ????public?synchronized?boolean?lock()?throws?InterruptedException?{ ????????int?timeout?=?timeoutMsecs; ????????while?(timeout?>=?0)?{ ????????????long?expires?=?System.currentTimeMillis()?+?expireMsecs?+?1; ????????????String?expiresStr?=?String.valueOf(expires);?//鎖到期時間 ????????????if?(this.setNX(lockKey,?expiresStr))?{ ????????????????//?lock?acquired ????????????????locked?=?true; ????????????????return?true; ????????????} ????????????String?currentValueStr?=?this.get(lockKey);?//redis里的時間 ????????????if?(currentValueStr?!=?null?&&?Long.parseLong(currentValueStr)?<p>調用:</p> <pre class="brush:js;toolbar:false">RedisLock?lock?=?new?RedisLock(redisTemplate,?key,?10000,?20000); ?try?{ ????????????if(lock.lock())?{ ???????????????????//需要加鎖的代碼 ????????????????} ????????????} ????????}?catch?(InterruptedException?e)?{ ????????????e.printStackTrace(); ????????}finally?{ ????????????//為了讓分布式鎖的算法更穩鍵些,持有鎖的客戶端在解鎖之前應該再檢查一次自己的鎖是否已經超時,再去做DEL操作,因為可能客戶端因為某個耗時的操作而掛起, ????????????//操作完的時候鎖因為超時已經被別人獲得,這時就不必解鎖了。?————這里沒有做 ????????????lock.unlock(); ????????}
六、一些問題
1、為什么不直接使用expire設置超時時間,而將時間的毫秒數其作為value放在redis中?
如下面的方式,把超時的交給redis處理:
lock(key,?expireSec){ isSuccess?=?setnx?key if?(isSuccess) expire?key?expireSec }
這種方式貌似沒什么問題,但是假如在setnx后,redis崩潰了,expire就沒有執行,結果就是死鎖了。鎖永遠不會超時。
?2、為什么前面的鎖已經超時了,還要用getSet去設置新的時間戳的時間獲取舊的值,然后和外面的判斷超時時間的時間戳比較呢?
因為是分布式的環境下,可以在前一個鎖失效的時候,有兩個進程進入到鎖超時的判斷。如:
C0超時了,還持有鎖,C1/C2同時請求進入了方法里面
C1/C2獲取到了C0的超時時間
C1使用getSet方法
C2也執行了getSet方法
假如我們不加?oldValueStr.equals(currentValueStr) 的判斷,將會C1/C2都將獲得鎖,加了之后,能保證C1和C2只能一個能獲得鎖,一個只能繼續等待。
注意:這里可能導致超時時間不是其原本的超時時間,C1的超時時間可能被C2覆蓋了,但是他們相差的毫秒及其小,這里忽略了。
更多redis知識請關注redis入門教程欄目。