開源QQ群: .net 開源基礎(chǔ)服務(wù) 238543768
開源地址: http://git.oschina.net/chejiangyi/Dyd.BusinessMQ
## 業(yè)務(wù)消息隊列 ##
業(yè)務(wù)消息隊列是應(yīng)用于業(yè)務(wù)的解耦和分離,應(yīng)具備分布式,高可靠性,高性能,高實時性,高穩(wěn)定性,高擴展性等特性。
## 優(yōu)點: ##
- 大量的業(yè)務(wù)消息堆積能力
- 無單點故障及故障監(jiān)控,異常提醒
- 生產(chǎn)者端負載均衡,故障轉(zhuǎn)移,故障自動恢復(fù),并行消息插入。
- 消費者端負載均衡,故障保持,故障自動恢復(fù),并行消息消費。
- 消息高可靠性持久化,較高性能,較高實時性,高穩(wěn)定性,高擴展性。
- 支持99*99個消息分區(qū),單個消息分區(qū)單天支持近1億的消息存儲。
- 消費者拉方式獲取消息,在高并發(fā),大量消息涌入的情況下,只要消費能力足夠,不會有消息延遲,消息越多性能越好。
## 缺點: ##
- 能保證消息順序插入,保證相同分區(qū)的消息是順序的(排除網(wǎng)絡(luò)延遲),但是多個分區(qū)之間的可能是亂序的。
- 消息并行消費或者多個分區(qū)并行消費或者負載均衡情況下的,消息消費順序是亂序。
## 缺點原因: ##
- 消息的負載均衡是基于消息的分區(qū)存儲,故多個分區(qū)之間的消息是亂序的,但是相同分區(qū)的消息是順序的。
- 消息的消費者負載均衡也是基于消息的分區(qū)進行均衡的,同時單個消費者訂閱多個分區(qū)的情況下,也可并行進行消費。意味著不同分區(qū)的消息的消費是亂序的,但是相同分區(qū)的消息消費是順序的。
## 缺點解決方案: ##
- 生產(chǎn)者自定義負載均衡算法,按照業(yè)務(wù)維度(用戶,商戶)等進行分區(qū)(多個用戶之間可以消息亂序,單個用戶的消息必須是順序的),不同維度可以指向不同的分區(qū),但是單個維度的消息是可以保證順序的。
- 本解決方案在故障的情況下,故障會移除某些故障節(jié)點,意味著故障節(jié)點會立即報錯(當然也可自己指定故障節(jié)點進行轉(zhuǎn)移,但是轉(zhuǎn)移的節(jié)點消息會被提前消費,故障的消息會在恢復(fù)故障后重新消費,這樣也會出現(xiàn)故障程度上的消息亂序消費)。
- 本解決方案在線上無縫擴容和擴展性能方面也會有限制,看要具體的負載均衡算法,但是一般情況下,如果要擴容還是會進行部分消息遷移的情況。
## 問答: ##
### *1.大量的業(yè)務(wù)消息堆積能力,如何實現(xiàn)?* ###
每個分區(qū)表支持約1億的消息存儲,可以通過增加分區(qū)表進行擴容。消費者進行消息消費,內(nèi)部僅保留某個分區(qū)上一次消費的指針,所以不會影響消費者。
消息持久化到磁盤,不會在內(nèi)存駐留,理論上不影響內(nèi)存。
### *2.無單點故障及故障監(jiān)控,異常提醒?* ###
故障一般會發(fā)生在redis,數(shù)據(jù)節(jié)點,管理中心,日志中心。
redis節(jié)點故障會影響消費者的消息消費響應(yīng)及時度,一般延遲5s以內(nèi)。不會影響消息消費速度和消息消費QPS
數(shù)據(jù)節(jié)點故障會影響生產(chǎn)者和消費者的消息,并造成消息暫時丟失(但是都是可恢復(fù)的,具體的看數(shù)據(jù)庫的高可用做到什么程度)。
生產(chǎn)者端會無縫的進行節(jié)點移除,但是會默認1分鐘重新嘗試重連。消費者會持續(xù)報錯至日志中,但是不會影響其他分區(qū)消費。
管理中心故障會影響生產(chǎn)者和消費者的心跳檢測和新注冊的生產(chǎn)者,消費者,但不會影響生產(chǎn)者和消費者具體的消息存儲和發(fā)送接收。
日志中心故障不會影響生產(chǎn)者和消費者,但是影響日志的打印,日志中心故障會通知公司內(nèi)部監(jiān)控平臺。
雖然故障不會影響線上已有的消息運行,但是還是會在高并發(fā)情況下出現(xiàn)性能問題,和系統(tǒng)穩(wěn)定性,所以一旦發(fā)現(xiàn)要重視和及時處理。
### *3.生產(chǎn)者端負載均衡,故障轉(zhuǎn)移,故障自動恢復(fù),并行消息插入?* ###
默認負載均衡采用多個分區(qū)順序輪詢插入,在并發(fā)情況下輪詢插入是并行插入到不同分區(qū)的;某個數(shù)據(jù)節(jié)點出現(xiàn)故障,會移除相關(guān)數(shù)據(jù)節(jié)點的所有分區(qū);
默認1分鐘會重新載入故障分區(qū)進行重試。
### *4.消費者端負載均衡,故障保持,故障自動恢復(fù),并行消息消費。* ###
默認消費者端負載均衡是根據(jù)消費者訂閱的分區(qū)進行的(一個消費者可以訂閱多個分區(qū),多個相同業(yè)務(wù)的消費者可以訂閱多個不同分區(qū)進行負載)。
一個消費者訂閱多個分區(qū),這個消費者可以開啟并行進行多分區(qū)消費。并行度=分區(qū)數(shù),效果理論上最佳。
分區(qū)節(jié)點出現(xiàn)故障等,單個分區(qū)或者數(shù)據(jù)節(jié)點就會暫停消費,并通知日志中心打印錯誤日志。當故障恢復(fù)后,消費繼續(xù)進行。
### *5.消息高可靠性持久化,較高性能,較高實時性,高穩(wěn)定性,高穩(wěn)定性。* ###
消息傳遞到消息中心后,立即持久化到磁盤,故不會丟失消息。生產(chǎn)者可以采用多個分區(qū)進行并行插入,消費者可以采用并行進行消息消費,故理論上性能是可擴展無限量的。
消息是通過拉取的方式獲取的,發(fā)送消息會由redis進行即時通知消費者拉取(即時消息默認會合并在500ms內(nèi)redis通知消息),一般在20ms內(nèi)消息會被消費掉。
批量拉消息的方式相對push的消息推送方式在高并發(fā)和大量消息處理的情況下,消息發(fā)送性能應(yīng)該是更優(yōu)的。
穩(wěn)定性是基于數(shù)據(jù)庫的穩(wěn)定性和故障轉(zhuǎn)移層面來確保的,擴展性體現(xiàn)在線上無縫的遷移和擴容。
### *6.支持9999個消息分區(qū),單個消息分區(qū)單天支持近1億的消息存儲。* ###
數(shù)據(jù)節(jié)點是01~99個,節(jié)點里面的表分區(qū)是01~99個,所以可以支持近1萬個分區(qū)節(jié)點。單表的mqid最大應(yīng)該是(1億-1)條,應(yīng)該滿足一般的業(yè)務(wù)需求,
若不能滿足,可以通過多個分區(qū)的方式擴容。
### *7.消費者拉方式獲取消息,在高并發(fā),大量消息涌入的情況下,只要消費能力足夠,不會有消息延遲,消息越多性能越好。* ###
push推消息的模式能保證更高的實時性,但是在大量消息的情況下,消息堆積的情況更嚴重,性能會有所影響。
pull拉消息的模式在保證消息實時性方面會略差,但是在大量消息涌入的情況下,批量拉消息效率更加。而且會將消息分發(fā)的負載轉(zhuǎn)移到多個消費者端上。
## 未來改進: ##
1. 未來采用leveldb重寫存儲。
1. 剝離broker服務(wù)用于支持相對可靠的消息服務(wù)。
1. 消息完成標記本地緩存/持久化(或者存儲redis),每秒提交更新至數(shù)據(jù)庫,消除頻繁消費導(dǎo)致的瓶頸。
## 架構(gòu)示意圖 ##
## 使用demo示例 ##
/// <summary> /// 發(fā)送消息 /// </summary> /// <param name="msg"></param> public void SendMessageDemo(string msg) { //發(fā)送字符串示例 var p = PRoducterPoolHelper.GetPool(new BusinessMQConfig() { ManageConnectString = "server=192.168.17.201;Initial Catalog=dyd_bs_MQ_manage;User ID=sa;PassWord=Xx~!@#;" },//管理中心數(shù)據(jù)庫 "dyd.mytest3");//隊列路徑 .分隔,類似類的namespace,是隊列的唯一標識,要提前告知運維在消息中心注冊,方可使用。 p.SendMessage(@"1"); //發(fā)送對象示例 /* var obj = new message2 { text = "文字", num = 1 }; var p = ProducterPoolHelper.GetPool(new BusinessMQConfig() { ManageConnectString = "server=192.168.17.237;Initial Catalog=dyd_bs_MQ_manage;User ID=sa;Password=Xx~!@#;" },//管理中心數(shù)據(jù)庫 "test.diayadian.obj");//隊列路徑 .分隔,類似類的namespace,是隊列的唯一標識,要提前告知運維在消息中心注冊,方可使用。 p.SendMessage<message>(obj); */ } private ConsumerProvider Consumer; /// <summary> /// 接收消息 /// </summary> /// <param name="action"></param> public void ReceiveMessageDemo(Action<string> action) { if (Consumer == null) { Consumer = new ConsumerProvider(); Consumer.Client = "dyd.mytest3.customer1";//clientid,接收消息的(消費者)唯一標示,一旦注冊以后,不能更改,業(yè)務(wù)下線廢棄后必須要告知運維,刪除消費者注冊。 Consumer.ClientName = "客戶端名稱";//這個相對隨意些,主要是用來自己識別的,要簡短 Consumer.Config = new BusinessMQConfig() { ManageConnectString = "server=192.168.17.201;Initial Catalog=dyd_bs_MQ_manage;User ID=sa;Password=Xx~!@#;" }; Consumer.MaxReceiveMQThread = 1;//并行處理的線程數(shù),一般為1足夠,若消息處理慢,又想并行消費,則考慮 正在使用的分區(qū)=并行處理線程數(shù) 為并行效率極端最優(yōu),但cpu消耗應(yīng)該不小。 Consumer.MQPath = "dyd.mytest3";//接收的隊列要正確 Consumer.PartitionIndexs = new List<int>() { 1, 2, 3,4, 5, 6, 7, 8 };//消費者訂閱的分區(qū)順序號,從1開始 Consumer.RegisterReceiveMQListener<string>((r) => { /* * 這些編寫業(yè)務(wù)代碼 * 編寫的時候要注意考慮,業(yè)務(wù)處理失敗的情況。 * 1.重試失敗n次。 * 2.重試還不行,則標記消息已被處理。然后跳過該消息處理,自己另外文檔記錄這種情況。 * 消息被消費完畢,一定要調(diào)用MarkFinished,標記消息被消費完畢。 */ action.Invoke(r.ObjMsg); r.MarkFinished(); }); } } /// <summary> /// 關(guān)閉消息訂閱連接 /// </summary> public void CloseReceiveMessage() { //注冊消費者消息,消費者務(wù)必要在程序關(guān)閉后關(guān)掉(dispose)。否則導(dǎo)致異常終止,要人工等待連接超時后,方可重新注冊。 if (Consumer != null) { Consumer.Dispose(); Consumer = null; } } }
部分截圖
備注:.net開源的消息隊列很少,特別是針對業(yè)務(wù)的高可靠性的消息隊列;希望這個開源的消息隊列,能夠為.net領(lǐng)域帶來更多解決方案,更多的思路和架構(gòu)設(shè)計;同時也希望了解消息隊列的人能夠給于這個解決方案更多的建議和完善意見。
作者:車江毅
新聞熱點
疑難解答