分布式鎖
經(jīng)常用于在解決分布式環(huán)境下的業(yè)務(wù)一致性和協(xié)調(diào)分布式環(huán)境。
實(shí)際業(yè)務(wù)場景中,比如說解決并發(fā)一瞬間的重復(fù)下單,重復(fù)確認(rèn)收貨,重復(fù)發(fā)現(xiàn)金券等。
使用分布式鎖的場景一般不能太多。
開源地址:http://git.oschina.net/chejiangyi/XXF.BaseService.DistributedLock
開源相關(guān)群: .net 開源基礎(chǔ)服務(wù) 238543768
這里整理了C#.net關(guān)于redis分布式鎖和zookeeper分布式鎖的實(shí)現(xiàn),僅用于研究。(可能有bug)
采用ServiceStack.Redis實(shí)現(xiàn)Redis分布式鎖
/* * Redis分布式鎖 * 采用ServiceStack.Redis實(shí)現(xiàn)的Redis分布式鎖 * 詳情可閱讀其開源代碼 * 備注:不同版本的 ServiceStack.Redis 實(shí)現(xiàn)reidslock機(jī)制不同 xxf里面默認(rèn)使用2.2版本 */ public class RedisDistributedLock : BaseRedisDistributedLock { PRivate ServiceStack.Redis.RedisLock _lock; private RedisClient _client; public RedisDistributedLock(string redisserver, string key) : base(redisserver, key) { } public override LockResult TryGetDistributedLock(TimeSpan? getlockTimeOut, TimeSpan? taskrunTimeOut) { if (lockresult == LockResult.Success) throw new DistributedLockException("檢測到當(dāng)前鎖已獲取"); _client = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient(); /* * 閱讀源碼發(fā)現(xiàn)當(dāng)其獲取鎖后,redis連接資源會一直占用,知道獲取鎖的資源釋放后,連接才會跳出,可能會導(dǎo)致連接池資源的浪費(fèi)。 */
try { this._lock = new ServiceStack.Redis.RedisLock(_client, key, getlockTimeOut); lockresult = LockResult.Success; } catch (Exception exp) { XXF.Log.ErrorLog.Write(string.Format("redis分布式嘗試鎖系統(tǒng)級別嚴(yán)重異常,redisserver:{0}", redisserver.NullToEmpty()), exp); lockresult = LockResult.LockSystemExceptionFailure; } return lockresult; } public override void Dispose() { try { if (this._lock != null) this._lock.Dispose(); if (_client != null) this._client.Dispose(); } catch (Exception exp) { XXF.Log.ErrorLog.Write(string.Format("redis分布式嘗試鎖釋放嚴(yán)重異常,redisserver:{0}", redisserver.NullToEmpty()), exp); } } }
來自網(wǎng)絡(luò)的java實(shí)現(xiàn)Redis分布式鎖(C#版)
/* * Redis分布式鎖 * 采用網(wǎng)絡(luò)上java實(shí)現(xiàn)的Redis分布式鎖 * 參考 http://www.blogjava.net/hello-yun/archive/2014/01/15/408988.html * 詳情可閱讀其開源代碼 */ public class RedisDistributedLockFromJava : BaseRedisDistributedLock { public RedisDistributedLockFromJava(string redisserver, string key) : base(redisserver, key) { } public override LockResult TryGetDistributedLock(TimeSpan? getlockTimeOut, TimeSpan? taskrunTimeOut) { if (lockresult == LockResult.Success) throw new DistributedLockException("檢測到當(dāng)前鎖已獲取"); try { // 1. 通過SETNX試圖獲取一個lock
string @lock = key; long taskexpiredMilliseconds = (taskrunTimeOut != null ? (long)taskrunTimeOut.Value.TotalMilliseconds : (long)DistributedLockConfig.MaxLockTaskRunTime); long getlockexpiredMilliseconds = (getlockTimeOut != null ? (long)getlockTimeOut.Value.TotalMilliseconds : 0); long hassleepMilliseconds = 0; while (true) { using (var redisclient = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient()) { long value = CurrentUnixTimeMillis() + taskexpiredMilliseconds + 1; /*Java以前版本都是用SetNX,但是這種是無法設(shè)置超時時間的,不是很理解為什么,
* 可能是因?yàn)樵瓉淼膔edis命令比較少導(dǎo)致的?現(xiàn)在用Add不知道效果如何.
因?qū)edis細(xì)節(jié)不了解,但個人懷疑若異常未釋放鎖經(jīng)常發(fā)生,可能會導(dǎo)致內(nèi)存逐步溢出*/
bool acquired = redisclient.Add<long>(@lock, value, TimeSpan.FromMilliseconds(taskexpiredMilliseconds + DistributedLockConfig.TaskLockDelayCleepUpTime)); //SETNX成功,則成功獲取一個鎖 if (acquired == true) { lockresult = LockResult.Success; } //SETNX失敗,說明鎖仍然被其他對象保持,檢查其是否已經(jīng)超時
else
{ var oldValueBytes = redisclient.Get(@lock); //超時 if (oldValueBytes != null && BitConverter.ToInt64(oldValueBytes, 0) < CurrentUnixTimeMillis()) { /*此處雖然重設(shè)并獲取鎖,但是超時時間可能被覆蓋,故重設(shè)超時時間;若有進(jìn)程一直在嘗試獲取鎖,那么鎖存活時間應(yīng)該被延遲*/
var getValueBytes = redisclient.GetSet(@lock, BitConverter.GetBytes(value)); var o1 = redisclient.ExpireEntryIn(@lock, TimeSpan.FromMilliseconds(taskexpiredMilliseconds + DistributedLockConfig.TaskLockDelayCleepUpTime));//這里如果程序異常終止,依然會有部分鎖未釋放的情況。 // 獲取鎖成功 if (getValueBytes == oldValueBytes) { lockresult = LockResult.Success; } // 已被其他進(jìn)程捷足先登了 else { lockresult = LockResult.GetLockTimeOutFailure; } } //未超時,則直接返回失敗 else { lockresult = LockResult.GetLockTimeOutFailure; } } } //成功拿到鎖 if (lockresult == LockResult.Success) break; //獲取鎖超時 if (hassleepMilliseconds >= getlockexpiredMilliseconds) { lockresult = LockResult.GetLockTimeOutFailure; break; } //繼續(xù)等待 System.Threading.Thread.Sleep(DistributedLockConfig.GetLockFailSleepTime); hassleepMilliseconds += DistributedLockConfig.GetLockFailSleepTime; } } catch (Exception exp) { XXF.Log.ErrorLog.Write(string.Format("redis分布式嘗試鎖系統(tǒng)級別嚴(yán)重異常,redisserver:{0}", redisserver.NullToEmpty()), exp); lockresult = LockResult.LockSystemExceptionFailure; } return lockresult; } private long CurrentUnixTimeMillis() { return (long)(System.DateTime.UtcNow - new System.DateTime(1970, 1, 1, 0, 0, 0, System.DateTimeKind.Utc)).TotalMilliseconds; } public override void Dispose() { if (lockresult == LockResult.Success || lockresult == LockResult.LockSystemExceptionFailure) { try { long current = CurrentUnixTimeMillis(); using (var redisclient = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient()) { var v = redisclient.Get(key); if (v != null) { // 避免刪除非自己獲取得到的鎖 if (current < BitConverter.ToInt64(v, 0)) { redisclient.Del(key); } } } } catch (Exception exp) { XXF.Log.ErrorLog.Write(string.Format("redis分布式嘗試鎖釋放嚴(yán)重異常,redisserver:{0}", redisserver.NullToEmpty()), exp); } } } }
ServiceStack.Redis內(nèi)部實(shí)現(xiàn)版本(較舊)
/* * Redis分布式鎖 * 采用ServiceStack.Redis實(shí)現(xiàn)的Redis分布式鎖 * 詳情可閱讀其開源代碼 * 備注:不同版本的 ServiceStack.Redis 實(shí)現(xiàn)reidslock機(jī)制不同 * 拷貝自網(wǎng)絡(luò)開源代碼 較舊的實(shí)現(xiàn)版本 */ public class RedisDistributedLockFromServiceStack : BaseRedisDistributedLock { public RedisDistributedLockFromServiceStack(string redisserver, string key) : base(redisserver, key) { } public override LockResult TryGetDistributedLock(TimeSpan? getlockTimeOut, TimeSpan? taskrunTimeOut) { if (lockresult == LockResult.Success) throw new DistributedLockException("檢測到當(dāng)前鎖已獲取"); try { using (var redisClient = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient()) { ExecExtensions.RetryUntilTrue( () => { //This pattern is taken from the redis command for SETNX http://redis.io/commands/setnx //Calculate a unix time for when the lock should expire
TimeSpan realSpan = taskrunTimeOut ?? TimeSpan.FromMilliseconds(DistributedLockConfig.MaxLockTaskRunTime); //new TimeSpan(365, 0, 0, 0); //if nothing is passed in the timeout hold for a year DateTime expireTime = DateTime.UtcNow.Add(realSpan); string lockString = (expireTime.ToUnixTimeMs() + 1).ToString(); //Try to set the lock, if it does not exist this will succeed and the lock is obtained
var nx = redisClient.SetEntryIfNotExists(key, lockString); if (nx) { lockresult = LockResult.Success; return true; } //If we've gotten here then a key for the lock is present. This could be because the lock is
//correctly acquired or it could be because a client that had acquired the lock crashed (or didn't release it properly).
//Therefore we need to get the value of the lock to see when it should expire redisClient.Watch(key); string lockExpireString = redisClient.Get<string>(key); long lockExpireTime; if (!long.TryParse(lockExpireString, out lockExpireTime)) { redisClient.UnWatch(); // since the client is scoped externally
lockresult = LockResult.GetLockTimeOutFailure; return false; } //If the expire time is greater than the current time then we can't let the lock go yet
if (lockExpireTime > DateTime.UtcNow.ToUnixTimeMs()) { redisClient.UnWatch(); // since the client is scoped externally
lockresult = LockResult.GetLockTimeOutFailure; return false; } //If the expire time is less than the current time then it wasn't released properly and we can attempt to //acquire the lock. The above call to Watch(_lockKey) enrolled the key in monitoring, so if it changes //before we call Commit() below, the Commit will fail and return false, which means that another thread //was able to acquire the lock before we finished processing. using (var trans = redisClient.CreateTransaction()) // we started the "Watch" above; this tx will succeed if the value has not moved { trans.QueueCommand(r => r.Set(key, lockString)); //return trans.Commit(); //returns false if Transaction failed var t = trans.Commit(); if (t == false) lockresult = LockResult.GetLockTimeOutFailure; else lockresult = LockResult.Success; return t; } }, getlockTimeOut ); } } catch (Exception exp) { XXF.Log.ErrorLog.Write(string.Format("redis分布式嘗試鎖系統(tǒng)級別嚴(yán)重異常,redisserver:{0}", redisserver.NullToEmpty()), exp); lockresult = LockResult.LockSystemExceptionFailure; } return lockresult; } public override void Dispose() { try { using (var redisClient = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient()) { redisClient.Remove(key); } } catch (Exception exp) { XXF.Log.ErrorLog.Write(string.Format("redis分布式嘗試鎖釋放嚴(yán)重異常,redisserver:{0}", redisserver.NullToEmpty()), exp); } } }
Zookeeper 版本實(shí)現(xiàn)分布式鎖
/* * 來源java網(wǎng)絡(luò)源碼的zookeeper分布式鎖實(shí)現(xiàn)(目前僅翻譯并簡單測試ok,未來集成入sdk) * 備注: 共享鎖在同一個進(jìn)程中很容易實(shí)現(xiàn),但是在跨進(jìn)程或者在不同 Server 之間就不好實(shí)現(xiàn)了。Zookeeper 卻很容易實(shí)現(xiàn)這個功能,實(shí)現(xiàn)方式也是需要獲得鎖的 Server 創(chuàng)建一個 EPHEMERAL_SEQUENTIAL 目錄節(jié)點(diǎn),
然后調(diào)用 getChildren方法獲取當(dāng)前的目錄節(jié)點(diǎn)列表中最小的目錄節(jié)點(diǎn)是不是就是自己創(chuàng)建的目錄節(jié)點(diǎn),如果正是自己創(chuàng)建的,那么它就獲得了這個鎖,
如果不是那么它就調(diào)用 exists(String path, boolean watch) 方法并監(jiān)控 Zookeeper 上目錄節(jié)點(diǎn)列表的變化,一直到自己創(chuàng)建的節(jié)點(diǎn)是列表中最小編號的目錄節(jié)點(diǎn),
從而獲得鎖,釋放鎖很簡單,只要刪除前面它自己所創(chuàng)建的目錄節(jié)點(diǎn)就行了。 */ public class ZooKeeprDistributedLockFromJava : IWatcher { private ZooKeeper zk; private string root = "/locks"; //根 private string lockName; //競爭資源的標(biāo)志 private string waitNode; //等待前一個鎖 private string myZnode; //當(dāng)前鎖 //private CountDownLatch latch; //計數(shù)器 private AutoResetEvent autoevent; private TimeSpan sessionTimeout = TimeSpan.FromMilliseconds(30000); private IList<Exception> exception = new List<Exception>(); /// <summary> /// 創(chuàng)建分布式鎖,使用前請確認(rèn)config配置的zookeeper服務(wù)可用 </summary> /// <param name="config"> 127.0.0.1:2181 </param> /// <param name="lockName"> 競爭資源標(biāo)志,lockName中不能包含單詞lock </param> public ZooKeeprDistributedLockFromJava(string config, string lockName) { this.lockName = lockName; // 創(chuàng)建一個與服務(wù)器的連接 try
{ zk = new ZooKeeper(config, sessionTimeout, this); var stat = zk.Exists(root, false); if (stat == null) { // 創(chuàng)建根節(jié)點(diǎn) zk.Create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.Persistent); } } catch (KeeperException e) { throw e; } } /// <summary> /// zookeeper節(jié)點(diǎn)的監(jiān)視器 /// </summary> public virtual void Process(WatchedEvent @event) { if (this.autoevent != null) { this.autoevent.Set(); } } public virtual bool tryLock() { try { string splitStr = "_lock_"; if (lockName.Contains(splitStr)) { //throw new LockException("lockName can not contains //u000B");
} //創(chuàng)建臨時子節(jié)點(diǎn) myZnode = zk.Create(root + "/" + lockName + splitStr, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EphemeralSequential); Console.WriteLine(myZnode + " is created "); //取出所有子節(jié)點(diǎn) IList<string> subNodes = zk.GetChildren(root, false); //取出所有l(wèi)ockName的鎖 IList<string> lockObjNodes = new List<string>(); foreach (string node in subNodes) { if (node.StartsWith(lockName)) { lockObjNodes.Add(node); } } Array alockObjNodes = lockObjNodes.ToArray(); Array.Sort(alockObjNodes); Console.WriteLine(myZnode + "==" + lockObjNodes[0]); if (myZnode.Equals(root + "/" + lockObjNodes[0])) { //如果是最小的節(jié)點(diǎn),則表示取得鎖 return true; } //如果不是最小的節(jié)點(diǎn),找到比自己小1的節(jié)點(diǎn) string subMyZnode = myZnode.Substring(myZnode.LastIndexOf("/", StringComparison.Ordinal) + 1); waitNode = lockObjNodes[Array.BinarySearch(alockObjNodes, subMyZnode) - 1]; } catch (KeeperException e) { throw e; } return false; } public virtual bool tryLock(TimeSpan time) { try { if (this.tryLock()) { return true; } return waitForLock(waitNode, time); } catch (KeeperException e) { throw e; } return false; } private bool waitForLock(string lower, TimeSpan waitTime) { var stat = zk.Exists(root + "/" + lower, true); //判斷比自己小一個數(shù)的節(jié)點(diǎn)是否存在,如果不存在則無需等待鎖,同時注冊監(jiān)聽 if (stat != null) { Console.WriteLine("Thread " + System.Threading.Thread.CurrentThread.Name + " waiting for " + root + "/" + lower); autoevent = new AutoResetEvent(false); bool r = autoevent.WaitOne(waitTime); autoevent.Dispose(); autoevent = null; return r; } else return true; } public virtual void unlock() { try { Console.WriteLine("unlock " + myZnode); zk.Delete(myZnode, -1); myZnode = null; zk.Dispose(); } catch (KeeperException e) { throw e; } } }
以上代碼僅做參考,未壓測。
代碼粘貼有些問題,詳細(xì)請下載開源包運(yùn)行研究。
新聞熱點(diǎn)
疑難解答