麻豆小视频在线观看_中文黄色一级片_久久久成人精品_成片免费观看视频大全_午夜精品久久久久久久99热浪潮_成人一区二区三区四区

首頁 > 學(xué)院 > 開發(fā)設(shè)計 > 正文

Mina、Netty、Twisted一起學(xué)(五):整合protobuf

2019-11-14 21:01:54
字體:
供稿:網(wǎng)友
Mina、Netty、Twisted一起學(xué)(五):整合PRotobuf

protobuf是谷歌的Protocol Buffers的簡稱,用于結(jié)構(gòu)化數(shù)據(jù)和字節(jié)碼之間互相轉(zhuǎn)換(序列化、反序列化),一般應(yīng)用于網(wǎng)絡(luò)傳輸,可支持多種編程語言。

protobuf如何使用這里不再介紹,本文主要介紹在MINA、Netty、Twisted中如何使用protobuf,不了解protobuf的同學(xué)可以去參考我的另一篇博文。

在前面的一篇博文中,有介紹到一種用一個固定為4字節(jié)的前綴Header來指定Body的字節(jié)數(shù)的一種消息分割方式,在這里同樣要使用到。只是其中Body的內(nèi)容不再是字符串,而是protobuf字節(jié)碼。

在處理業(yè)務(wù)邏輯時,肯定不希望還要對數(shù)據(jù)進(jìn)行序列化和反序列化,而是希望直接操作一個對象,那么就需要有相應(yīng)的編碼器和解碼器,將序列化和反序列化的邏輯寫在編碼器和解碼器中。有關(guān)編碼器和解碼器的實(shí)現(xiàn),上一篇博文中有介紹。

Netty包中已經(jīng)自帶針對protobuf的編碼器和解碼器,那么就不用再自己去實(shí)現(xiàn)了。而MINA、Twisted還需要自己去實(shí)現(xiàn)protobuf的編碼器和解碼器。

這里定義一個protobuf數(shù)據(jù)結(jié)構(gòu),用于描述一個學(xué)生的信息,保存為StudentMsg.proto文件:

message Student {      // ID      required int32 id = 1;          // 姓名      required string name = 2;        // email      optional string email = 3;        // 朋友      repeated string friends = 4;  }  

用StudentMsg.proto分別生成java和Python代碼,將代碼加入到相應(yīng)的項目中。生成的代碼就不再貼上來了。

下面分別介紹在Netty、MINA、Twisted如何使用protobuf來傳輸Student信息。

Netty:

Netty自帶protobuf的編碼器和解碼器,分別是ProtobufEncoder和ProtobufDecoder。需要注意的是,ProtobufEncoder和ProtobufDecoder只負(fù)責(zé)protobuf的序列化和反序列化,而處理消息Header前綴和消息分割的還需要LengthFieldBasedFrameDecoder和LengthFieldPrepender。LengthFieldBasedFrameDecoder即用于解析消息Header前綴,根據(jù)Header中指定的Body字節(jié)數(shù)截取Body,LengthFieldPrepender用于在wirte消息時在消息前面添加一個Header前綴來指定Body字節(jié)數(shù)。

public class TcpServer {        public static void main(String[] args) throws InterruptedException {          EventLoopGroup bossGroup = new NioEventLoopGroup();          EventLoopGroup workerGroup = new NioEventLoopGroup();          try {              ServerBootstrap b = new ServerBootstrap();              b.group(bossGroup, workerGroup)                      .channel(NioServerSocketChannel.class)                      .childHandler(new ChannelInitializer<SocketChannel>() {                          @Override                          public void initChannel(SocketChannel ch)                                  throws Exception {                              ChannelPipeline pipeline = ch.pipeline();                                  // 負(fù)責(zé)通過4字節(jié)Header指定的Body長度將消息切割                              pipeline.addLast("frameDecoder",                                       new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));                                                            // 負(fù)責(zé)將frameDecoder處理后的完整的一條消息的protobuf字節(jié)碼轉(zhuǎn)成Student對象                              pipeline.addLast("protobufDecoder",                                      new ProtobufDecoder(StudentMsg.Student.getDefaultInstance()));                                // 負(fù)責(zé)將寫入的字節(jié)碼加上4字節(jié)Header前綴來指定Body長度                              pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));                                                            // 負(fù)責(zé)將Student對象轉(zhuǎn)成protobuf字節(jié)碼                              pipeline.addLast("protobufEncoder", new ProtobufEncoder());                                pipeline.addLast(new TcpServerHandler());                          }                      });              ChannelFuture f = b.bind(8080).sync();              f.channel().closeFuture().sync();          } finally {              workerGroup.shutdownGracefully();              bossGroup.shutdownGracefully();          }      }  } 

處理事件時,接收和發(fā)送的參數(shù)直接就是Student對象:

public class TcpServerHandler extends ChannelInboundHandlerAdapter {        @Override      public void channelRead(ChannelHandlerContext ctx, Object msg) {                    // 讀取客戶端傳過來的Student對象          StudentMsg.Student student = (StudentMsg.Student) msg;          System.out.println("ID:" + student.getId());          System.out.println("Name:" + student.getName());          System.out.println("Email:" + student.getEmail());          System.out.println("Friends:");          List<String> friends = student.getFriendsList();          for(String friend : friends) {              System.out.println(friend);          }            // 新建一個Student對象傳到客戶端          StudentMsg.Student.Builder builder = StudentMsg.Student.newBuilder();          builder.setId(9);          builder.setName("服務(wù)器");          builder.setEmail("[email protected]");          builder.addFriends("X");          builder.addFriends("Y");          StudentMsg.Student student2 = builder.build();          ctx.writeAndFlush(student2);      }        @Override      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {          cause.printStackTrace();          ctx.close();      }  }  

MINA:

在MINA中沒有針對protobuf的編碼器和解碼器,但是可以自己實(shí)現(xiàn)一個功能和Netty一樣的編碼器和解碼器。

編碼器:

public class MinaProtobufEncoder extends ProtocolEncoderAdapter {        @Override      public void encode(Iosession session, Object message,              ProtocolEncoderOutput out) throws Exception {            StudentMsg.Student student = (StudentMsg.Student) message;          byte[] bytes = student.toByteArray(); // Student對象轉(zhuǎn)為protobuf字節(jié)碼          int length = bytes.length;                    IoBuffer buffer = IoBuffer.allocate(length + 4);          buffer.putInt(length); // write header          buffer.put(bytes); // write body          buffer.flip();          out.write(buffer);      }  }  

解碼器:

public class MinaProtobufDecoder extends CumulativeProtocolDecoder {        @Override      protected boolean doDecode(IoSession session, IoBuffer in,              ProtocolDecoderOutput out) throws Exception {            // 如果沒有接收完Header部分(4字節(jié)),直接返回false          if (in.remaining() < 4) {              return false;          } else {                // 標(biāo)記開始位置,如果一條消息沒傳輸完成則返回到這個位置              in.mark();                // 讀取header部分,獲取body長度              int bodyLength = in.getInt();                // 如果body沒有接收完整,直接返回false              if (in.remaining() < bodyLength) {                  in.reset(); // IoBuffer position回到原來標(biāo)記的地方                  return false;              } else {                  byte[] bodyBytes = new byte[bodyLength];                  in.get(bodyBytes); // 讀取body部分                  StudentMsg.Student student = StudentMsg.Student.parseFrom(bodyBytes); // 將body中protobuf字節(jié)碼轉(zhuǎn)成Student對象                  out.write(student); // 解析出一條消息                  return true;              }          }      }  }  

MINA服務(wù)器加入protobuf的編碼器和解碼器:

public class TcpServer {        public static void main(String[] args) throws IOException {          IoAcceptor acceptor = new NioSocketAcceptor();            // 指定protobuf的編碼器和解碼器          acceptor.getFilterChain().addLast("codec",                  new ProtocolCodecFilter(new MinaProtobufEncoder(), new MinaProtobufDecoder()));            acceptor.setHandler(new TcpServerHandle());          acceptor.bind(new InetSocketAddress(8080));      }  }  

這樣,在處理業(yè)務(wù)邏輯時,就和Netty一樣了:

public class TcpServerHandle extends IoHandlerAdapter {        @Override      public void exceptionCaught(IoSession session, Throwable cause)              throws Exception {          cause.printStackTrace();      }            @Override      public void messageReceived(IoSession session, Object message)              throws Exception {            // 讀取客戶端傳過來的Student對象          StudentMsg.Student student = (StudentMsg.Student) message;          System.out.println("ID:" + student.getId());          System.out.println("Name:" + student.getName());          System.out.println("Email:" + student.getEmail());          System.out.println("Friends:");          List<String> friends = student.getFriendsList();          for(String friend : friends) {              System.out.println(friend);          }            // 新建一個Student對象傳到客戶端          StudentMsg.Student.Builder builder = StudentMsg.Student.newBuilder();          builder.setId(9);          builder.setName("服務(wù)器");          builder.setEmail("[email protected]");          builder.addFriends("X");          builder.addFriends("Y");          StudentMsg.Student student2 = builder.build();          session.write(student2);      }  }  

Twisted:

在Twisted中,首先定義一個ProtobufProtocol類,繼承Protocol類,充當(dāng)編碼器和解碼器。處理業(yè)務(wù)邏輯的TcpServerHandle類再繼承ProtobufProtocol類,調(diào)用或重寫ProtobufProtocol提供的方法。

# -*- coding:utf-8 –*-    from struct import pack, unpack  from twisted.internet.protocol import Factory  from twisted.internet.protocol import Protocol  from twisted.internet import reactor  import StudentMsg_pb2    # protobuf編碼、解碼器  class ProtobufProtocol(Protocol):        # 用于暫時存放接收到的數(shù)據(jù)      _buffer = b""        def dataReceived(self, data):          # 上次未處理的數(shù)據(jù)加上本次接收到的數(shù)據(jù)          self._buffer = self._buffer + data          # 一直循環(huán)直到新的消息沒有接收完整          while True:              # 如果header接收完整              if len(self._buffer) >= 4:                  # header部分,按大字節(jié)序轉(zhuǎn)int,獲取body長度                  length, = unpack(">I", self._buffer[0:4])                  # 如果body接收完整                  if len(self._buffer) >= 4 + length:                      # body部分,protobuf字節(jié)碼                      packet = self._buffer[4:4 + length]                                            # protobuf字節(jié)碼轉(zhuǎn)成Student對象                      student = StudentMsg_pb2.Student()                      student.ParseFromString(packet)                                            # 調(diào)用protobufReceived傳入Student對象                      self.protobufReceived(student)                                            # 去掉_buffer中已經(jīng)處理的消息部分                      self._buffer = self._buffer[4 + length:]                  else:                      break;              else:                  break;        def protobufReceived(self, student):          raise NotImplementedError        def sendProtobuf(self, student):          # Student對象轉(zhuǎn)為protobuf字節(jié)碼          data = student.SerializeToString()          # 添加Header前綴指定protobuf字節(jié)碼長度          self.transport.write(pack(">I", len(data)) + data)    # 邏輯代碼  class TcpServerHandle(ProtobufProtocol):        # 實(shí)現(xiàn)ProtobufProtocol提供的protobufReceived      def protobufReceived(self, student):            # 將接收到的Student輸出          print 'ID:' + str(student.id)          print 'Name:' + student.name          print 'Email:' + student.email          print 'Friends:'          for friend in student.friends:              print friend            # 創(chuàng)建一個Student并發(fā)送給客戶端          student2 = StudentMsg_pb2.Student()          student2.id = 9          student2.name = '服務(wù)器'.decode('UTF-8') # 中文需要轉(zhuǎn)成UTF-8字符串          student2.email = '[email protected]'          student2.friends.append('X')          student2.friends.append('Y')          self.sendProtobuf(student2)    factory = Factory()  factory.protocol = TcpServerHandle  reactor.listenTCP(8080, factory)  reactor.run()  

下面是Java編寫的一個客戶端測試程序:

public class TcpClient {        public static void main(String[] args) throws IOException {            Socket socket = null;          DataOutputStream out = null;          DataInputStream in = null;                    try {                socket = new Socket("localhost", 8080);              out = new DataOutputStream(socket.getOutputStream());              in = new DataInputStream(socket.getInputStream());                            // 創(chuàng)建一個Student傳給服務(wù)器              StudentMsg.Student.Builder builder = StudentMsg.Student.newBuilder();              builder.setId(1);              builder.setName("客戶端");              builder.setEmail("[email protected]");              builder.addFriends("A");              builder.addFriends("B");              StudentMsg.Student student = builder.build();              byte[] outputBytes = student.toByteArray(); // Student轉(zhuǎn)成字節(jié)碼              out.writeInt(outputBytes.length); // write header              out.write(outputBytes); // write body              out.flush();                            // 獲取服務(wù)器傳過來的Student              int bodyLength = in.readInt();  // read header              byte[] bodyBytes = new byte[bodyLength];              in.readFully(bodyBytes);  // read body              StudentMsg.Student student2 = StudentMsg.Student.parseFrom(bodyBytes); // body字節(jié)碼解析成Student              System.out.println("Header:" + bodyLength);              System.out.println("Body:");              System.out.println("ID:" + student2.getId());              System.out.println("Name:" + student2.getName());              System.out.println("Email:" + student2.getEmail());              System.out.println("Friends:");              List<String> friends = student2.getFriendsList();              for(String friend : friends) {                  System.out.println(friend);              }            } finally {              // 關(guān)閉連接              in.close();              out.close();              socket.close();          }      }  }  

用客戶端分別測試上面三個TCP服務(wù)器:

服務(wù)器輸出:

ID:1Name:客戶端Email:[email protected]:AB

客戶端輸出:

Header:32Body:ID:9Name:服務(wù)器Email:[email protected]:XY


發(fā)表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發(fā)表
主站蜘蛛池模板: 九一免费版在线观看 | 午夜丰满少妇高清毛片1000部 | 成人在线视频网 | 大学生一级毛片在线视频 | 九九热精品视频在线 | 国产精品自在线拍 | 嫩呦国产一区二区三区av | 国产精品热| 精品99在线视频 | 黄色网页在线观看 | 李宗瑞国产福利视频一区 | 91午夜免费视频 | 黄色一级片免费观看 | 99riav国产在线观看 | 日韩视频一二三 | 宅男噜噜噜66国产在线观看 | 日日摸夜夜添夜夜添牛牛 | 亚洲第一成人在线视频 | 国产精品久久久久久久四虎电影 | 久久久一区二区三区四区 | 亚洲精品午夜国产va久久成人 | 久啪视频 | 免费毛片小视频 | 91色爱 | 看免费一级毛片 | 国产女厕一区二区三区在线视 | 黄色免费在线网站 | 91精品久久久久久久 | 日本看片一区二区三区高清 | 欧美成人一区二区三区电影 | 国产18成人免费视频 | 国产成人精品免高潮在线观看 | 国产流白浆高潮在线观看 | 国产成视频在线观看 | 最近免费观看高清韩国日本大全 | 国产宾馆3p国语对白 | 91精品国产乱码久久桃 | 国产91九色视频 | 日日草夜夜草 | 欧美a在线 | 娇喘在线|