最近特別忙,博客久未更新。
回顧了一下2010-2011年的一些.Net項目代碼,覺得對初學者可能有一定參考作用,這里share一下。
主要包括:
一般的Web反向代理大家很熟悉了,主要是通過在客戶端和服務端之間架設一層代理服務器,轉發客戶端的請求至服務端或數據庫,并將結果回復給客戶端。
其特點主要有:
1、緩存一些數據庫I/O過重、卻更新不頻繁的數據,或者靜態數據,如文件、圖片等。
2、隔離客戶端(公網)和服務端(windows服務、Web服務、文件服務),僅將反向代理服務器的ip、名稱、host和端口等暴露給公網。
3、基于第2點,其應該是輕量的、可隨時重啟的,這在服務端自身所在的服務器重啟代價較高或不能忍受重啟的條件下,極為有用。
比如服務端本身需要處理大量業務邏輯,可能涉及重計算(cpu和內存要求高)、重I/O(磁盤和網絡要求高)或者混合類型,那么服務端的機器成本就很高,因為需要更強力的cpu,更大容量的內存,和更快的磁盤和網絡,如果還需要考慮DDOS和CC防御能力,服務端的機器成本將急劇上升。
此時,可考慮反向代理技術,選擇帶硬防、配置比服務端低很多的廉價機器,來作為反向代理服務器(比如阿x云的云主機,帶5G硬防,但其非SSD的云磁盤I/O能力很差,此時不能作為業務服務端的宿主機器,但可以作為反向代理服務器),來組成反向代理分布集群。
DDOS攻擊,流量需要聚合到一個峰值,才會打死帶防機器,而根據DDOS攻擊者所具備的流量打壓機器和網絡條件的不同,這通常需要一段時間,通過反向代理分布集群,一臺反向代理被打死,死亡或黑洞窗口通常在半小時至數小時內,如果能保證有相對充裕的反向代理儲備,使得整個集群陣亡前,能有跳出黑洞復生的代理機重新加入集群為客戶端提供服務,那么就可以形成對抗。即使儲備不足,至少可以為受到攻擊時的決策贏得更多時間。
綜上所述,反向代理技術通過增加額外的網絡傳輸時間,卻獲得了很多客戶端與服務端直接連接所不具備的優勢。
通常的web應用服務器,如nginx都可提供反向代理能力。
但tcp級別的反向代理還是比較少的。
當時的項目倒逼出了這么一個需求,其中用到一些基礎組件如下。
如果客戶端使用tcp協議和反向代理服務器通訊,比如常見的桌面客戶端,那么可以考慮單個長連接 + 異步的方式連接至代理服務器。
而多臺代理服務器和真正的業務服務端之間,由于代理和服務端之間多為同步通訊,為了效率,可考慮使用多連接 + 池化的技術,讓連接介于長、短之間,綜合兩者的長處。
下面給出項目中真實使用過的連接池代碼,實現中參考了當時MongoDB的C#驅動部分:
/// <summary> /// 連接池 /// 特性及更新: /// 1:從單個移除不可用連接,變為批量移除 /// 2:移除連接不再防止,暴露重連風暴風險 /// 目的:盡快盡多發現不可用連接,防止請求失敗 /// 考慮:一般只開放200個連接,沒什么大問題. /// 5:增大排隊線程數和排隊超時時間,考慮:網絡抖動和業務層慢操作 /// 6:增大連接最大存活和空閑時間,考慮:網絡抖動和業務層慢操作 /// 7:盡最大可能負載請求并減輕 某一瞬間 傳遞給主力的請求和連接數目 /// </summary> public class sessionPool { PRivate object _poolLock = new object(); public int PoolSize { get; set; } public IList<SyncTcpSession> AvaliableSessions { get { return _avaliableSessions; } } public ILog Logger; private int _waitQueueSize; private bool _inMaintainPoolSize; private bool _inEnsureMinConnectionPoolSizeWorkItem; private IList<SyncTcpSession> _avaliableSessions = new List<SyncTcpSession>(); public int MaxWaitQueueSize { get; set; } public int MaxConnectionPoolSize { get; set; } public int MinConnectionPoolSize { get; set; } public TimeSpan WaitQueueTimeout { get; set; } /// <summary> /// 連接最大存活時間(分) /// </summary> public TimeSpan MaxConnectionLifeTime { get; set; } /// <summary> /// 連接最大空閑時間(秒) /// </summary> public TimeSpan MaxConnectionIdleTime { get; set; } public IPEndPoint RemoteAddr { get; set; } public SessionPool(ILog log) { Logger = log; } /// <summary> /// 獲取可用連接 /// </summary> /// <returns></returns> public SyncTcpSession GetAvaliableSession() { lock (_poolLock) { //等待獲取連接的線程發生了嚴重積壓 //說明連接數量不足以應付業務,或者業務層處理積壓 //考慮優化業務層和數據庫或者增加超時時間 if (_waitQueueSize >= MaxWaitQueueSize) { var ex = new Exception("等待獲取連接的線程數過多!"); Logger.Error(ex.Message, ex); return null; } _waitQueueSize += 1; try { DateTime timeoutAt = DateTime.Now + WaitQueueTimeout; while (true) { //有可用連接 if (_avaliableSessions.Count > 0) { //先嘗試找到已經打開過的連接 for (int i = _avaliableSessions.Count - 1; i >= 0; i--) { if (_avaliableSessions[i].State == SessionState.Open) { var connection = _avaliableSessions[i]; _avaliableSessions.RemoveAt(i); return connection; } } //否則去掉最近最少使用的連接,并返回新連接 AvaliableSessions[0].Close(); AvaliableSessions.RemoveAt(0); return new SyncTcpSession(this); } //無可用連接,新建連接 if (PoolSize < MaxConnectionPoolSize) { var connection = new SyncTcpSession(this); PoolSize += 1; return connection; } //不能創建新的連接也沒有可用連接,等待連接被回收. var timeRemaining = timeoutAt - DateTime.Now; if (timeRemaining > TimeSpan.Zero) { Monitor.Wait(_poolLock, timeRemaining); } else { //等待超時,說明連接數量不足以應付業務,或者業務層處理積壓,考慮優化業務層和數據庫或者增加超時時間 var ex = new TimeoutException("等待SyncTcpSession已超時."); Logger.Error(ex.Message, ex); } } } finally { _waitQueueSize -= 1; } } } /// <summary> /// 清空連接池 /// </summary> public void Clear() { lock (_poolLock) { foreach (var connection in AvaliableSessions) { connection.Close(); } AvaliableSessions.Clear(); PoolSize = 0; Monitor.Pulse(_poolLock); Logger.Info("連接池已清空."); } } /// <summary> /// 維護連接池的連接數量 /// </summary> public void MaintainPoolSize() { if (_inMaintainPoolSize) { return; } _inMaintainPoolSize = true; try { IList<SyncTcpSession> connectionsToRemove = new List<SyncTcpSession>(); lock (_poolLock) { var now = DateTime.Now; //已改為:移除全部不可用連接,暴露連接風暴風險,但考慮實際業務連接很少閑置,連接風暴風險較小 for (int i = AvaliableSessions.Count - 1; i >= 0; i--) { var connection = AvaliableSessions[i]; if (now > connection.CreatedAt + MaxConnectionLifeTime || now > connection.LastUsedAt + MaxConnectionIdleTime || connection.IsConnected() == false) { //超過最大生命、閑置時間或未連接則關閉 //加入刪除集合 connectionsToRemove.Add(connection); //從可用連接中移除 AvaliableSessions.RemoveAt(i); } } // } } //在鎖外移除 if (connectionsToRemove.Any()) { int i = 0; foreach (var connToRemove in connectionsToRemove) { i++; RemoveConnection(connToRemove); } Logger.InfoFormat("批量移除連接:數量{0}.", i); } if (PoolSize < MinConnectionPoolSize) { ThreadPool.QueueUserWorkItem(EnsureMinConnectionPoolSizeWorkItem,null); } } finally { _inMaintainPoolSize = false; } } private void EnsureMinConnectionPoolSizeWorkItem(object state) { if (_inEnsureMinConnectionPoolSizeWorkItem) { return; } _inEnsureMinConnectionPoolSizeWorkItem = true; try { while (true) { lock (_poolLock) { if (PoolSize >= MinConnectionPoolSize) { return; } } var connection = new SyncTcpSession(this); try { var added = false; lock (_poolLock) { if (PoolSize < MaxConnectionPoolSize) { AvaliableSessions.Add(connection); PoolSize++; added = true; Monitor.Pulse(_poolLock); } } if (!added) { connection.Close(); } } catch { Thread.Sleep(TimeSpan.FromSeconds(1)); } } } catch { } finally { _inEnsureMinConnectionPoolSizeWorkItem = false; } } /// <summary> /// 回收連接 /// </summary> /// <param name="connection"></param> public void ReleaseConnection(SyncTcpSession connection) { //每次都關閉連接,則退化為短連接 // RemoveConnection(connection); // return; if (connection == null) return; if (connection.SessionPool != this) { connection.Close(); Logger.Info("連接不屬于此連接池."); } if (connection.State != SessionState.Open) { RemoveConnection(connection); Logger.Info("移除連接:連接已關閉."); return; } if (DateTime.Now - connection.CreatedAt > MaxConnectionLifeTime) { RemoveConnection(connection); Logger.Info("移除連接:超過最大存活時間."); return; } lock (_poolLock) { connection.LastUsedAt = DateTime.Now; AvaliableSessions.Add(connection); Monitor.Pulse(_poolLock); } } /// <summary> /// 移除并關閉連接 /// </summary> /// <param name="connection"></param> private void RemoveConnection(SyncTcpSession connection) { lock (_poolLock) { AvaliableSessions.Remove(connection); PoolSize -= 1; Monitor.Pulse(_poolLock); } connection.Close(); }
上面的代碼中,使用到的同步連接SyncTcpSession,主要用于服務器間的同步通訊,其保證的特性有:
代碼如下:
/// <summary> /// 同步會話對象 /// </summary> public class SyncTcpSession { private readonly object _instanceLock = new object(); private readonly ILog _logger; private readonly IPEndPoint _remoteAddr; private readonly SessionPool _sessionPool; private Socket _socket; private SessionState _state = SessionState.Initial; public SessionPool SessionPool { get { return _sessionPool; } } public SyncTcpSession(SessionPool pool) { _logger = pool.Logger; _sessionPool = pool; _remoteAddr = _sessionPool.RemoteAddr; CreatedAt = DateTime.Now; _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp) {SendTimeout = 90000, ReceiveTimeout = 180000}; } public DateTime CreatedAt { get; set; } public DateTime LastUsedAt { get; set; } public SessionState State { get { return _state; } } public void Connect() { _socket.Connect(_remoteAddr); LastUsedAt = DateTime.Now; _state = SessionState.Open; } public void Close() { lock (_instanceLock) { try { if (_socket == null || _state == SessionState.Closed) return; if (_socket.Connected) { try { _socket.Shutdown(SocketShutdown.Both); } finally { _socket.Close(); _socket.Dispose(); _socket = null; } _state = SessionState.Closed; } } catch { } // _logger.Info("session is closed"); } } /// <summary> /// 獲取當前是否連接 /// </summary> /// <returns></returns> public bool IsConnected() { if(_state!=SessionState.Open) { return false; } byte[] tmp = new byte[1]; try { _socket.Blocking = false; _socket.Send(tmp, 0, 0); _socket.Blocking = true; return true; }catch(SocketException ex) { _logger.Error("[Not Connected]"); return false; } } public bool Send(byte[] sentBytes) { lock (_instanceLock) { LastUsedAt = DateTime.Now; if (_state == SessionState.Initial) { //如果沒有連接成功,返回失敗 if(!Open()) { _state = SessionState.Closed; return false; } } //如果當前沒有連接,返回失敗 if(!IsConnected()) { _state = SessionState.Closed; return false; } //此時連接可能已經關閉了 int allLen = sentBytes.Length; try { while (allLen > 0) { //發送數據到緩沖區,不保證立即在網絡中傳遞,可能會發送超時 int sent = _socket.Send(sentBytes, 0, sentBytes.Length, SocketFlags.None); allLen -= sent; } LastUsedAt = DateTime.Now; return true; } catch (SocketException ex) { //如果出現錯誤,返回失敗 _logger.Error(string.Format("[Send Failed] {0}", ex.Message), ex); _state = SessionState.Closed; return false; } } } public byte[] Receive(out bool successful) { successful = false; const int headerLen = 8; byte[] ret = null; bool foundHeader = false; LastUsedAt = DateTime.Now; if (_socket == null || !_socket.Connected) return null; lock (_instanceLock) { // 部署環境,內網比較穩定,簡化包解析 var buffer = new byte[16*1024]; int remaining = -1; using (var ms = new MemoryStream()) { try { while (remaining != 0) { int allLen = _socket.Receive(buffer, 0, buffer.Length, SocketFlags.None); if (!foundHeader) { if (allLen >= headerLen) { foundHeader = true; int bodyLen = (buffer[4] << 24) + (buffer[5] << 16) + (buffer[6] << 8) + buffer[7]; remaining = (int) (headerLen + bodyLen - ms.Length); } } ms.Write(buffer, 0, allLen); if (foundHeader) { remaining -= allLen; } } ret = new byte[ms.Length]; ms.Position = 0; ms.Read(ret, 0, ret.Length); LastUsedAt = DateTime.Now; successful = true; return ret; } catch (Exception ex) { successful = false; _state = SessionState.Closed; _logger.Error(string.Format("[Recv Failed] {0}", ex.Message), ex); } } } return ret; } public bool Open() { try { Connect(); return true; } catch (Exception ex) { _state = SessionState.Closed; _logger.Error(string.Format("[Open Failed] {0}", ex.Message), ex); return false; } } public bool SendRequest(byte[] package) { return Send(package); } public byte[] GetBody(byte[] data) { if (data == null || data.Length < 1) { return null; } if (data.Length < 8) { _logger.Error("接收到的數據包長度不足"); return null; } int bodyLen = (data[4] << 24) + (data[5] << 16) + (data[6] << 8) + data[7]; var body = new byte[bodyLen]; if (bodyLen + 8 != data.Length) { _logger.ErrorFormat("包長有誤:totalLen:({0}),bodyLen:{1}", data.Length, body.Length); return null; } Buffer.BlockCopy(data, 8, body, 0, bodyLen); return body; } }
為了提供分片發送數據的能力,同時避免粘包問題,必須要進行包解析,完備版的包解析代碼提供如下特性:
代碼如下:
/// <summary> /// 包解析器 /// </summary> public class PackageAnalyzer { public PackageAnalyzer(int headerLen) { _headerLen = headerLen; _header = new byte[_headerLen]; } /// <summary> /// 包頭長度 /// </summary> private readonly int _headerLen; /// <summary> /// 包頭緩沖 /// </summary> private readonly byte[] _header; /// <summary> /// 還差多少字節組成一個完整包 /// </summary> private int _requiredDataLength; /// <summary> /// 包頭協議標識字節數組中已收到的字節數 /// </summary> private int _receivedHeaderLength; /// <summary> /// 包頭獲取狀態 /// </summary> private Header _headerFlag; /// <summary> /// 包頭狀態 /// </summary> private enum Header { NotFound, Found, PartialFound } /// <summary> /// 包頭是否已保存 /// </summary> private bool _headerWritten; /// <summary> /// 包存儲器 /// </summary> public BufferWriter Writer { get; set; } /// <summary> /// 是否允許變長存儲 /// </summary> public bool EnabledVariant { get; set; } /// <summary> /// 包分析成功時的處理委托 /// </summary> /// <param name="requestInfo"></param> public delegate void OnAnalyzeSuccess(BinaryResponseInfo requestInfo); /// <summary> /// 從字節流分析包頭,并獲取完整包 /// </summary> /// <param name="data">字節流</param> /// <param name="offset">偏移</param> /// <param name="total">總字節數</param> /// <param name="onAnalyzeSuccessCallback">分包成功時回調</param> public void TryProcess(byte[] data, int offset, int total, OnAnalyzeSuccess onAnalyzeSuccessCallback) { while (total > 0) { //還沒有獲取頭部 if (_headerFlag == Header.NotFound) { //剩余 if (total >= _headerLen) { //獲取完整頭部 _headerFlag = Header.Found; Array.Copy(data, offset, _header, 0, _headerLen); offset += _headerLen; total -= _headerLen; //獲取數據長度 _requiredDataLength = (_header[4] << 24) + (_header[5] << 16) + (_header[6] << 8) + _header[7]; _receivedHeaderLength = 0; //繼續處理 } //不足 else { Array.Copy(data, offset, _header, 0, total); _receivedHeaderLength += total; _headerFlag = Header.PartialFound; break; } } //已獲取頭部 if (_headerFlag == Header.Found) { //可以獲取完整數據 if (total >= _requiredDataLength) { //保存數據 //還未寫入頭部 if (!_headerWritten) { Writer = new BufferWriter(System.Text.Encoding.UTF8); _headerWritten = true; Writer.Write(_header, 0, _headerLen); } Writer.Write(data, offset, _requiredDataLength); offset += _requiredDataLength; total -= _requiredDataLength; //重置全部狀態 _requiredDataLength = 0; _receivedHeaderLength = 0; _headerFlag = Header.NotFound; _headerWritten = false; //獲得了完整的包,開始讀取消息 ////////////////////////////////////////////////////// var reader = new BufferReader(System.Text.Encoding.UTF8, Writer) { EnabledVariant = EnabledVariant }; BinaryResponseInfo responseInfo; MessageRead(reader, out responseInfo); if (responseInfo != null) if (onAnalyzeSuccessCallback != null) onAnalyzeSuccessCallback(responseInfo); ////////////////////////////////////////////////////// } //不能獲取完整數據 else { //保存數據 //還未寫入頭部 if (!_headerWritten) { Writer = new BufferWriter(System.Text.Encoding.UTF8); _headerWritten = true; Writer.Write(_header, 0, _headerLen); } //寫入數據 Writer.Write(data, offset, total); _requiredDataLength -= total; break; } } //部分獲取頭部 if (_headerFlag == Header.PartialFound) { //不能獲取完整頭部 if (total + _receivedHeaderLength < _headerLen) { Array.Copy(data, offset, _header, _receivedHeaderLength, total); _receivedHeaderLength += total; break; } //可以獲取完整頭部 else { _headerFlag = Header.Found; var delta = _headerLen - _receivedHeaderLength; Array.Copy(data, offset, _header, _receivedHeaderLength, delta); total -= delta; offset += delta; //獲取數據長度 _requiredDataLength = (_header[4] << 24) + (_header[5] << 16) + (_header[6] << 8) + _header[7]; _receivedHeaderLength = 0; //繼續處理 } } } } /// <summary> /// 從包頭獲取數據長度 /// </summary> /// <param name="buffer">包頭</param> /// <param name="offset">偏移</param> /// <param name="length">長度</param> /// <returns></returns> public virtual int GetDataLengthFromHeader(byte[] buffer, int offset, int length) { //第5、6、7、8個字節標識長度 return (buffer[offset + 4] << 24) + (buffer[offset + 5] << 16) + (buffer[offset + 6] << 8) + (buffer[offset + 7]); } /// <summary> /// 獲取消息實體 /// </summary> /// <param name="reader">數據流讀取器</param> /// <param name="requestInfo">請求信息</param> private void MessageRead(BufferReader reader, out BinaryResponseInfo requestInfo) { var package = reader.ToBytes(); //服務標識(調用業務端的哪個服務) var serviceKey = System.Text.Encoding.UTF8.GetString(package, 0, 4); var bodyLen = (package[4] << 24) + (package[5] << 16) + (package[6] << 8) + (package[7]); // System.Diagnostics.Debug.Assert(bodyLen > 0, "包數據為空"); var body = new byte[bodyLen]; Buffer.BlockCopy(package, _headerLen, body, 0, bodyLen); requestInfo = new BinaryResponseInfo() { Key = serviceKey, Body = body }; } }
由于歷史原因,一些命名不是很恰當,代碼結構也不是很好,比如上面這個類,叫做Parser可能更恰當。
希望對初學網絡編程的朋友有所幫助。
新聞熱點
疑難解答