在分布式系統中,各個進程(本文使用進程來描述分布式系統中的運行主體,它們可以在同一個物理節點上也可以在不同的物理節點上)相互之間通常是需要協調進行運作的,有時是不同進程所處理的數據有依賴關系,必須按照一定的次序進行處理,有時是在一些特定的時間需要某個進程處理某些事務等等,人們通常會使用分布式鎖、選舉算法等技術來協調各個進程之間的行為。因為分布式系統本身的復雜特性,以及對于容錯性的要求,這些技術通常是重量級的,比如 Paxos 算法,欺負選舉算法,ZooKeeper 等,側重于消息的通信而不是共享內存,通常也是出了名的復雜和難以理解,當在具體的實現和實施中遇到問題時都是一個挑戰。
Redis 經常被人們認為是一種 NoSQL 軟件,但其本質上是一種分布式的數據結構服務器軟件,提供了一個分布式的基于內存的數據結構存儲服務。在實現上,僅使用一個線程來處理具體的內存數據結構,保證它的數據操作命令的原子特性;它同時還支持基于 Lua 的腳本,每個 Redis 實例使用同一個 Lua 解釋器來解釋運行 Lua 腳本,從而 Lua 腳本也具備了原子特性,這種原子操作的特性使得基于共享內存模式的分布式系統的協調方式成了可能,而且具備了很大的吸引力,和復雜的基于消息的機制不同,基于共享內存的模式對于很多技術人員來說明顯容易理解的多,特別是那些已經了解多線程或多進程技術的人。在具體實踐中,也并不是所有的分布式系統都像分布式數據庫系統那樣需要嚴格的模型的,而所使用的技術也不一定全部需要有堅實的理論基礎和數學證明,這就使得基于 Redis 來實現分布式系統的協調技術具備了一定的實用價值,實際上,人們也已經進行了不少嘗試。本文就其中的一些協調技術進行介紹。
signal/wait 操作
在分布式系統中,有些進程需要等待其它進程的狀態的改變,或者通知其它進程自己的狀態的改變,比如,進程之間有操作上的依賴次序時,就有進程需要等待,有進程需要發射信號通知等待的進程進行后續的操作,這些工作可以通過 Redis 的 Pub/Sub 系列命令來完成,比如:
import random
single_cast_script="""
local channels = redis.call('pubsub', 'channels', ARGV[1]..'*');
if #channels == 0
then
return 0;
end;
local index= math.mod(math.floor(tonumber(ARGV[2])), #channels) + 1;
return redis.call( 'publish', channels[index], ARGV[3]); """
def wait_single( channel, myid):
return wait( channel + myid )
def signal_single( channel, data):
rand_num = int(random.random() * 65535)
return rc.eval( single_cast_script, 0, channel, str(rand_num), str(data) )
鎖的一個簡單直接的實現方法就是用 SET NX 命令設置一個設定了存活周期 TTL 的 Key 來獲取鎖,通過刪除 Key 來釋放鎖,通過存活周期來保證避免死鎖。不過這個方法存在單點故障風險,如果部署了 master/slave 節點,則在特定條件下可能會導致安全性方面的沖突,比如:
在 Redlock 算法中,通過類似于下面這樣的命令進行加鎖:
SET resource_name my_random_value NX PX 30000
if redis.call("get",KEYS[1]) == ARGV[1] then return
redis.call("del",KEYS[1])else return 0end
Redlock 算法不需要保證 Redis 節點之間的時鐘是同步的(不論是物理時鐘還是邏輯時鐘),這點和傳統的一些基于同步時鐘的分布式鎖算法有所不同。Redlock 算法的具體的細節可以參閱 Redis 的官方文檔,以及文檔中列出的多種語言版本的實現。
選舉算法
在分布式系統中,經常會有些事務是需要在某個時間段內由一個進程來完成,或者由一個進程作為 leader 來協調其它的進程,這個時候就需要用到選舉算法,傳統的選舉算法有欺負選舉算法(霸道選舉算法)、環選舉算法、Paxos 算法、Zab 算法 (ZooKeeper) 等,這些算法有些依賴于消息的可靠傳遞以及時鐘同步,有些過于復雜,難以實現和驗證。新的 Raft 算法相比較其它算法來說已經容易了很多,不過它仍然需要依賴心跳廣播和邏輯時鐘,leader 需要不斷地向 follower 廣播消息來維持從屬關系,節點擴展時也需要其它算法配合。
選舉算法和分布式鎖有點類似,任意時刻最多只能有一個 leader 資源。當然,我們也可以用前面描述的分布式鎖來實現,設置一個 leader 資源,獲得這個資源鎖的為 leader,鎖的生命周期過了之后,再重新競爭這個資源鎖。這是一種競爭性的算法,這個方法會導致有比較多的空檔期內沒有 leader 的情況,也不好實現 leader 的連任,而 leader 的連任是有比較大的好處的,比如 leader 執行任務可以比較準時一些,查看日志以及排查問題的時候也方便很多,如果我們需要一個算法實現 leader 可以連任,那么可以采用這樣的方法:
import redis
rc = redis.Redis()
local_selector = 0def master():
global local_selector
master_selector = rc.incr('master_selector')
if master_selector == 1:
# initial / restarted
local_selector = master_selector
else:
if local_selector > 0: # I'm the master before
if local_selector > master_selector: # lost, maybe the db is fail-overed.
local_selector = 0
else: # continue to be the master
local_selector = master_selector
if local_selector > 0: # I'm the current master
rc.expire('master_selector', 20) return local_selector > 0
這個算法鼓勵連任,只有當前的 leader 發生故障或者執行某個任務所耗時間超過了任期、或者 Redis 節點發生故障恢復之后才需要重新選舉出新的 leader。在 master/slave 模式下,如果 master 節點發生故障,某個 slave 節點提升為新的 master 節點,即使當時 master_selector 值尚未能同步成功,也不會導致出現兩個 leader 的情況。如果某個 leader 一直連任,則 master_selector 的值會一直遞增下去,考慮到 master_selector 是一個 64 位的整型類型,在可預見的時間內是不可能溢出的,加上每次進行 leader 更換的時候 master_selector 會重置為從 1 開始,這種遞增的方式是可以接受的,但是碰到 Redis 客戶端(比如 Node.js)不支持 64 位整型類型的時候就需要針對這種情況作處理。如果當前 leader 進程處理時間超過了任期,則其它進程可以重新生成新的 leader 進程,老的 leader 進程處理完畢事務后,如果新的 leader 的進程經歷的任期次數超過或等于老的 leader 進程的任期次數,則可能會出現兩個 leader 進程,為了避免這種情況,每個 leader 進程在處理完任期事務之后都應該檢查一下自己的處理時間是否超過了任期,如果超過了任期,則應當先設置 local_selector 為 0 之后再調用 master 檢查自己是否是 leader 進程。
消息隊列
消息隊列是分布式系統之間的通信基本設施,通過消息可以構造復雜的進程間的協調操作和互操作。Redis 也提供了構造消息隊列的原語,比如 Pub/Sub 系列命令,就提供了基于訂閱/發布模式的消息收發方法,但是 Pub/Sub 消息并不在 Redis 內保持,從而也就沒有進行持久化,適用于所傳輸的消息即使丟失了也沒有關系的場景。
如果要考慮到持久化,則可以考慮 list 系列操作命令,用 PUSH 系列命令(LPUSH, RPUSH 等)推送消息到某個 list,用 POP 系列命令(LPOP, RPOP,BLPOP,BRPOP 等)獲取某個 list 上的消息,通過不同的組合方式可以得到 FIFO,FILO,比如:
import redis
rc = redis.Redis()
def fifo_push(q, data):
rc.lpush(q, data)
def fifo_pop(q):
return rc.rpop(q)
def filo_push(q, data):
rc.lpush(q, data)
def filo_pop(q):
return rc.lpop(q)
def safe_fifo_push(q, data):
rc.lpush(q, data)
def safe_fifo_pop(q, cache):
msg = rc.rpoplpush(q, cache) # check and do something on msg
rc.lrem(cache, 1) # remove the msg in cache list. return msg
新聞熱點
疑難解答