本文完整代碼,可以瀏覽:
https://github.com/hjj2017/xgame-code_server/blob/master/game_server/src/com/game/gameServer/framework/mina/MsgCumulativeFilter.java
我在網(wǎng)上查閱過的 MINA 黏包處理,一般都是放在 Decoder 中做的。也就是黏包處理和消息解碼放在一起做,顯得比較混亂不好打理。而以下這段代碼,我是把黏包處理放在 Filter 中了。在具體使用時(shí)可以這樣:
1 // 創(chuàng)建 IO 接收器 2 NioSocketAcceptor acceptor = new NioSocketAcceptor(); 3 4 // 獲取責(zé)任鏈 5 DefaultIoFilterChainBuilder chain = acceptor.getFilterChain(); 6 // 處理網(wǎng)絡(luò)粘包 7 chain.addLast("msgCumulative", new MsgCumulativeFilter()); 8 9 // 添加自定義編解碼器10 chain.addLast("msgCodec", new PRotocolCodecFilter(11 new XxxEncoder(),12 new XxxDecoder()13 ));14 15 // 獲取會(huì)話配置16 IosessionConfig cfg = acceptor.getSessionConfig();17 18 // 設(shè)置緩沖區(qū)大小19 cfg.setReadBufferSize(4096);20 // 設(shè)置 session 空閑時(shí)間21 cfg.setIdleTime(IdleStatus.BOTH_IDLE, 10);22 23 // 設(shè)置 IO 句柄24 acceptor.setHandler(new XxxHandler());25 acceptor.setReuseAddress(true);26 27 try {28 // 綁定端口29 acceptor.bind(new InetSocketAddress("127.0.0.1", 4400));30 } catch (Exception ex) {31 // 輸出錯(cuò)誤日志32 System.error.println(ex);33 }
目前 Netty 框架要比 MINA 流行的多,而且 Netty 對網(wǎng)絡(luò)黏包處理也做了很好的處理,不用開發(fā)者自己費(fèi)那么大勁。我也考慮過遷移到 Netty 框架上,不過目前還沒有找到特別充分的理由。閑話不多說了,以下就是黏包處理代碼:
1 package com.game.gameServer.framework.mina; 2 3 import java.util.concurrent.ConcurrentHashMap; 4 5 import org.apache.mina.core.buffer.IoBuffer; 6 import org.apache.mina.core.filterchain.IoFilterAdapter; 7 import org.apache.mina.core.session.IoSession; 8 9 import com.game.gameServer.framework.FrameworkLog; 10 import com.game.gameServer.msg.SpecialMsgSerialUId; 11 import com.game.part.msg.IoBuffUtil; 12 13 /** 14 * 消息粘包處理 15 * 16 * @author hjj2017 17 * @since 2014/3/17 18 * 19 */ 20 class MsgCumulativeFilter extends IoFilterAdapter { 21 /** 22 * 從客戶端接收的消息估計(jì)長度, 23 * {@value} 字節(jié), 24 * 對于從客戶端接收的數(shù)據(jù)來說, 都是簡單的命令! 25 * 很少超過 {@value}B 26 * 27 */ 28 private static final int DECODE_MSG_LEN = 64; 29 /** 容器 Buff 字典 */ 30 private static final ConcurrentHashMap<Long, IoBuffer> _containerBuffMap = new ConcurrentHashMap<>(); 31 32 @Override 33 public void sessionClosed(NextFilter nextFilter, IoSession sessionObj) throws Exception { 34 if (nextFilter == null || 35 sessionObj == null) { 36 // 如果參數(shù)對象為空, 37 // 則直接退出! 38 FrameworkLog.LOG.error("null nextFilter or sessionObj"); 39 return; 40 } 41 42 // 移除容器 Buff 43 removeContainerBuff(sessionObj); 44 // 向下傳遞 45 super.sessionClosed(nextFilter, sessionObj); 46 } 47 48 @Override 49 public void messageReceived( 50 NextFilter nextFilter, IoSession sessionObj, Object msgObj) throws Exception { 51 if (nextFilter == null || 52 sessionObj == null) { 53 // 如果參數(shù)對象為空, 54 // 則直接退出! 55 FrameworkLog.LOG.error("null nextFilter or sessionObj"); 56 return; 57 } 58 59 // 獲取會(huì)話 UId 60 long sessionUId = sessionObj.getId(); 61 62 if (!(msgObj instanceof IoBuffer)) { 63 // 如果消息對象不是 ByteBuff, 64 // 則直接向下傳遞! 65 FrameworkLog.LOG.warn("msgObj is not a IoBuff, sessionUId = " + sessionUId); 66 super.messageReceived(nextFilter, sessionObj, msgObj); 67 } 68 69 // 獲取輸入 Buff 70 IoBuffer inBuff = (IoBuffer)msgObj; 71 72 if (!inBuff.hasRemaining()) { 73 // 如果沒有剩余內(nèi)容, 74 // 則直接退出! 75 FrameworkLog.LOG.error("inBuff has not remaining, sessionUId = " + sessionUId); 76 return; 77 } else if (inBuff.remaining() <= 8) { 78 // 如果 <= 8 字節(jié), 79 // 那還是執(zhí)行粘包處理過程吧 ... 80 // 8 字節(jié) = 消息長度 ( Short ) + 消息類型 ( Short ) + 時(shí)間戳 ( Int ) 81 // 如果比這個(gè)長度都小, 82 // 那肯定不是一條完整消息 ... 83 this.msgRecv_0(nextFilter, sessionObj, inBuff); 84 return; 85 } 86 87 // 獲取消息長度 88 final int msgSize = inBuff.getShort(); 89 inBuff.position(0); 90 91 if (msgSize == inBuff.limit() && 92 containerBuffIsEmpty(sessionObj)) { 93 // 94 // 如果消息長度和極限值剛好相同, 95 // 并且容器 Buff 中沒有任何內(nèi)容 ( 即, 上一次消息沒有粘包 ), 96 // 那么直接向下傳遞! 97 // 98 super.messageReceived( 99 nextFilter, sessionObj, inBuff100 );101 } else {102 // 103 // 如果消息長度和極限值不同, 104 // 則說明是網(wǎng)絡(luò)粘包!105 // 這時(shí)候跳轉(zhuǎn)到粘包處理過程 ...106 // 107 this.msgRecv_0(nextFilter, sessionObj, inBuff);108 }109 }110 111 /**112 * 接收連包消息113 * 114 * @param nextFilter115 * @param sessionObj116 * @param inBuff117 * @throws Exception 118 * 119 */120 private void msgRecv_0(121 NextFilter nextFilter, IoSession sessionObj, IoBuffer inBuff) throws Exception {122 if (nextFilter == null || 123 sessionObj == null) {124 // 如果參數(shù)對象為空, 125 // 則直接退出!126 FrameworkLog.LOG.error("null nextFilter or sessionObj");127 return;128 }129 130 // 獲取會(huì)話 UId131 long sessionUId = sessionObj.getId();132 // 獲取容器 Buff133 IoBuffer containerBuff = getContainerBuff(sessionObj);134 135 // 添加新 Buff 到容器 Buff 的末尾136 IoBuffUtil.append(containerBuff, inBuff);137 // 令 position = 0138 containerBuff.position(0);139 140 // // 記錄調(diào)試信息141 // FrameworkLog.LOG.debug("/nin = [ " + inBuff.getHexDump() + " ]");142 143 for (int i = 0; ; i++) {144 // // 記錄調(diào)試信息145 // FrameworkLog.LOG.debug(146 // "i = " + i 147 // + "/nco = [ " + containerBuff.getHexDump() + " ]"148 // + "/nco.pos = " + containerBuff.position() 149 // + "/nco.lim = " + containerBuff.limit()150 // );151 152 if (containerBuff.remaining() < 4) {153 // 154 // 如果剩余字節(jié)數(shù) < 4, 155 // 這樣根本無法識(shí)別出消息類型 msgSerialUId ...156 // 直接退出!157 // 在退出前, 158 // 準(zhǔn)備好接收下一次消息!159 // 160 IoBuffUtil.readyToNext(containerBuff);161 return;162 }163 164 // 獲取原始位置165 final int oldPos = containerBuff.position();166 // 獲取消息長度和類型167 final int msgSize = containerBuff.getShort();168 final int msgSerialUId = containerBuff.getShort();169 170 // // 記錄調(diào)試信息171 // FrameworkLog.LOG.debug(172 // "i = " + i 173 // + "/nmsgSize = " + msgSize174 // + "/nmsgSerialUId = " + msgSerialUId175 // );176 177 // 還原原始位置178 containerBuff.position(oldPos);179 180 if (msgSerialUId == SpecialMsgSerialUId.CG_Flash_POLICY || 181 msgSerialUId == SpecialMsgSerialUId.CG_QQ_TGW) {182 // 183 // 如果是 Flash 安全策略消息, 184 // 或者是騰訊網(wǎng)關(guān)消息, 185 // 則嘗試找一下 0 字節(jié)的位置 ...186 // 187 int pos0 = IoBuffUtil.indexOf(containerBuff, (byte)0);188 189 if (pos0 <= -1) {190 // 如果找不到 0 字節(jié)的位置, 191 // 則說明消息還沒接收完, 192 // 準(zhǔn)備接受下次消息并直接退出!193 IoBuffUtil.readyToNext(containerBuff);194 return;195 }196 197 // 復(fù)制 Buff 內(nèi)容198 containerBuff.position(0);199 IoBuffer realBuff = IoBuffUtil.copy(containerBuff, pos0);200 201 // 更新 Buff 位置202 final int newPos = containerBuff.position() + pos0;203 containerBuff.position(newPos);204 // 壓縮容器 Buff205 IoBuffUtil.compact(containerBuff);206 207 // 向下傳遞208 super.messageReceived(209 nextFilter, sessionObj, realBuff210 );211 continue;212 }213 214 if (msgSize <= 0) {215 // 216 // 如果消息長度 <= 0, 217 // 則直接退出!218 // 這種情況可能是消息已經(jīng)亂套了 ...219 // 還是重新來過吧!220 // 221 FrameworkLog.LOG.error("i = " + i + ", msgSize = " + msgSize + ", sessionUId = " + sessionUId);222 // 將容器 Buff 內(nèi)容清空223 containerBuff.position(0);224 containerBuff.flip();225 // 壓縮容器 Buff226 IoBuffUtil.compact(containerBuff);227 return;228 }229 230 if (containerBuff.remaining() < msgSize) {231 // 232 // 如果消息長度不夠, 233 // 則可能是出現(xiàn)網(wǎng)絡(luò)粘包情況了 ...234 // 直接退出就可以了!235 // 236 FrameworkLog.LOG.warn(237 "i = " + i238 + ", msgSize = " + msgSize 239 + ", containerBuff.remaining = " + containerBuff.remaining()240 + ", sessionUId = " + sessionUId241 );242 243 // 準(zhǔn)備接受下一次消息244 IoBuffUtil.readyToNext(containerBuff);245 return;246 }247 248 // 創(chuàng)建新 Buff 并復(fù)制字節(jié)內(nèi)容249 IoBuffer realBuff = IoBuffUtil.copy(containerBuff, msgSize);250 251 if (realBuff == null) {252 // 253 // 如果真實(shí)的 Buff 為空, 254 // 則直接退出!255 // 這種情況可能也是消息亂套了 ...256 // 記錄一下錯(cuò)誤信息257 // 258 FrameworkLog.LOG.error("i = " + i + ", null realBuff, sessionUId = " + sessionUId);259 } else {260 // // 記錄調(diào)試信息261 // FrameworkLog.LOG.debug(262 // "i = " + i263 // + "/nreal = [ " + realBuff.getHexDump() + " ]"264 // + "/nreal.pos = " + realBuff.position()265 // + "/nreal.lim = " + realBuff.limit()266 // );267 268 // 向下傳遞269 super.messageReceived(270 nextFilter, sessionObj, realBuff271 );272 }273 274 // 更新位置275 containerBuff.position(containerBuff.position() + msgSize);276 // 壓縮容器 Buff277 IoBuffUtil.compact(containerBuff);278 }279 }280 281 /**282 * 獲取玩家的 Buff, 如果為空則新建一個(gè)!283 * 284 * @param sessionObj285 * @return 286 * 287 */288 private static IoBuffer getContainerBuff(IoSession sessionObj) {289 if (sessionObj == null) {290 // 如果參數(shù)對象為空, 291 // 則直接退出!292 return null;293 }294 295 // 獲取會(huì)話 UId296 long sessionUId = sessionObj.getId();297 // 獲取容器 Buff298 IoBuffer containerBuff = _containerBuffMap.get(sessionUId);299 300 if (containerBuff == null) {301 // 創(chuàng)建緩存 Buff302 containerBuff = IoBuffer.allocate(DECODE_MSG_LEN);303 containerBuff.setAutoExpand(true);304 containerBuff.setAutoShrink(true);305 containerBuff.position(0);306 containerBuff.flip();307 // 緩存 Buff 對象308 Object oldVal = _containerBuffMap.putIfAbsent(sessionUId, containerBuff);309 310 if (oldVal != null) {311 FrameworkLog.LOG.warn("exists oldVal");312 }313 }314 315 return containerBuff;316 }317 318 /**319 * 移除容器 Buff320 * 321 * @param sessionObj322 * 323 */324 private static void removeContainerBuff(IoSession sessionObj) {325 if (sessionObj == null) {326 // 如果參數(shù)對象為空, 327 // 則直接退出!328 return;329 }330 331 // 獲取會(huì)話 UId332 long sessionUId = sessionObj.getId();333 // 獲取容器 Buff334 IoBuffer containerBuff = _containerBuffMap.get(sessionUId);335 336 if (containerBuff != null) {337 // 是否所占資源338 containerBuff.clear();339 }340 341 // 移除玩家的 Buff 對象342 _containerBuffMap.remove(sessionUId);343 }344 345 /**346 * 容器 Buff 為空 ?347 * 348 * @param sessionObj349 * @return 350 * 351 */352 private static boolean containerBuffIsEmpty(IoSession sessionObj) {353 if (sessionObj == null) {354 // 如果參數(shù)對象為空, 355 // 則直接退出!356 return false;357 }358 359 // 獲取容器 Buff360 IoBuffer containerBuff = getContainerBuff(sessionObj);361 362 if (containerBuff == null) {363 // 如果容器為空, 364 // 則直接退出!365 FrameworkLog.LOG.error("null containerBuff, sessionUId = " + sessionObj.getId());366 return false;367 } else {368 // 如果當(dāng)前位置和極限值都為 0, 369 // 則判定為空!370 return (containerBuff.position() == 0 371 && containerBuff.limit() == 0);372 }373 }374 }
新聞熱點(diǎn)
疑難解答
圖片精選
網(wǎng)友關(guān)注