对于 RPC 协议来讲,最重要的就是对象的发送与接收,这就要用到序列化与反序列化,也称为编码和解码,序列化与反序列化和网络传输通常都在对应的 RPC 框架中完成。java
序列化与反序列化的流程以下:git
JavaBean-> stub(client) <->skeleton(server)->JavaBean,简单点说就是编码和解码。apache
相比于 RMI 远程方法调用,不少 RPC 远程过程调用的跨语言的,这就须要序列化于反序列化协议也支持跨语言。Google Procobuf 就是这样一种跨语言的序列化于反序列化协议,效率很是高(怎么作到比其余协议效率高那?比其余协议压缩生成的对象小)。网络
Netty 对于 ProtoBuf 提供了很好的支持。多线程
先看如何单独使用 Google ProtoBuf架构
新建 .proto 结构描述文件框架
syntax = "proto2"; package com.paul.protobuf; //加快解析速度 option optimize_for = SPEED; option java_package = "com.paul.protobuf"; option java_outer_classname = "DataInfo"; message Student{ reuqired string name = 1; option int32 = 2; option string address = 3; }
使用对应的编译文件生成对应的 Java 类socket
Proton —java_out src/main/java src/protobuf/Student.protoasync
这时在咱们代码的 src/main/java 文件夹下生成了一个新的 pkg com.paul.protobuf,里面生成了 DataInfo 类。对象会有对应的 builder 方法让咱们来构建。maven
测试序列化方法
// 构建对象->字节->对象 public class ProtoBufTest{ public static void main(String[] args) throws Exception{ DataInfo.Student student = DataInfo.Student.newBuilder().setName("张三").setAge(20).setAddress("abc").build(); byte[] student2ByteArray = student.toByteArray(); DataInfo.Student student2 = DataInfo.Student.parseFrom(student2ByteArray); System.out.println(studdent2); } }
在来看 Netty 对 Google ProtoBuf 的支持
仍是只给出不同的部分(服务单和客户端的这部分是同样的):
@Override protected void initChannel(SocketChannel ch) throws Exception{ ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ProtobufVarint32FrameDecoder()); //解码器 pipeline.addLast(new ProtobufDecoder(DataInfo.Student.getDefaultInstance())); pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); //编码器 pipeline.addLast(new ProtobufEncoder()); pipeline.addLast(new MyServerHandler()); }
测试方法就是在客户端组装一个 DataInfo.Student 而后发送给服务端,这里就不演示了。
你们可能会发现上面的代码存在一个问题,就是上面的程序只能对 DataInfo.Student 进行编解码,若是传递消息的类型有多种怎么办那?
解决方案一:定义义协议,须要本身实现解码器,经过前两位来标识具体的 JavaBean 类型。
解决方案二:定义一个最外层的类,经过枚举的方式来肯定传递的 JavaBean 类型。
好比咱们有两个 JavaBean
message MyMessage{ enum DataType{ PersonType = 1; DogType = 2; CatType = 3; } required Datatype data_type = 1; //oneof 在同一时刻只有一个字段会被设置,字段之间会共享内存,后面设置会自动清空前面的。 oneof dataBody{ Person person = 2; Dog dog = 3; Cat cat = 4; } } message Person{ option string name = 1; option int32 age = 2; option string address = 3; } message Dog{ option string name = 1; option int32 age = 2; } message Cat{ option string name = 1; option int32 city = 2; }
Pipeline 的改动(客户端和服务端):
pipeline.addLast(new ProtobufDecoder(DataInfo.MyMessage.getDefaultInstance()));
咱们本身的 handler 的改动:
@Overrode public void channelActive(ChannelHandlerContext ctx) throws Exception{ MyDataInfo.MyMessage myMessage = MyDataInfo.MyMessage.newBuilder(). setDataType(DataType.PersonType.PersonType). setPerson(MyDataInfo.Person.newBuilder(). setName("张三").setAge(20). setAddress("111").build()). build(); ctx.channel().writeAndFlush(myMessage); }
服务端 handler 根据 enum 的类型分别进行解析。
在实际的应用环境中,咱们客户端和服务端大几率是两个分开的应用程序,此时咱们使用 Google ProtoBuf 时 .proto 文件和对应的 class 文件是否是须要在两边都保存一份,若是有修改会很是麻烦。下面咱们介绍一种最佳实践。
最佳实践是使用 git 做为版本控制系统为前提的:
不那么好的方案:git submodule,就至关于 maven 的子模块,客户端和服务端都依赖这个模块。
比较好的方案:git subtree,将公共的代码合并到 server 和 client 端,至关于向 server 和 client 提交代码。
Apache Thrift 和 Google ProtoBuf 总体很是类似,适用于可伸缩的跨语言的服务开发。Thrift 至关于 Netty + Google ProtoBuf,是一个高性能 RPC 框架。Thrift 底层是 socket + RPC 的模式。
Thrift 是一个典型的 CS 结构,客户端和服务端可使用不一样的语言开发,既然客户端和服务端能使用不一样的语言开发,那么必定有一种中间语言来关联服务端和客户端,这就是 IDL(Interface Description Language)。
Thrift 如何实现多语言之间的通讯?
数据传输使用 socket (多种语言均支持),数据再以特定的格式(String 等)发送,接收方语言进行解析。
如何使用?
定义 thrift 的文件,由 thrift 文件(IDL) 生成双方语言的接口,model,在生成的 model 以及接口中会有解码编码的代码。
Thrift 中的服务
Thrift 定义服务至关于 Java 中建立 Interface 同样,建立的 service 通过代码生成命令以后就会生成客户端和服务端的框架代码,定义形式以下:
service HelloWorldService{ //service 中定义的函数,至关于 java interface 中定义的方法 string doAction(1:string name, 2:i32 age); }
.thrift 文件的定义
// java 中的包名 namespace jave thrift.generate // 定义别名 typedef i16 short typedef i32 int typedef i64 long typedef bool boolean typedef string String struct Person{ 1: optional String username, 2: optional int age, 3: optional boolean married } exception DataException{ 1: optional String message, 2: optional String callStack, 3: optional String date } service PersonService{ Person getPersonByName(1: required String username) throws (1: DataException dataException), void savePerson(1:requried Person person) throws (1:DataException dataException) }
编译 thrift 文件
thrift --gen java src/thrift/data.thrift
生成的文件
Person.java 里面包含了编解码的方法,PersonService 里面包含了 getPersonByName 和 savePerson 的方法。
测试方法:
服务端服务的具体实现方法
public class PersonServiceImpl implements PersonService.Iface{ @Override public Person getPersonByName(String username) throws DataException,TException{ Person p = new Person(); p.setUserName("paul"); p.setAge(25); p.setMarried(true); return p; } @Override public void savePerson(Person person) throws DataException,TException{ System.out.println(person.getUserName()); } }
Thrift 的服务端:
public class ThriftServer{ public static void main(String[] args){ //非阻塞的 socket server TNonblockingServerSocket socket = new TNonblockingServerSocket(8899); // 高可用的 server THsHaServer.Args arg = new THsHaServer.Args(socket).minWorkerThreads(2).maxWorkerThreads(4); PersonService.Processor<PersonServiceImpl> processor = new PersonService.Processor<>(new PersonServiceImpl()); arg.protocolFactory(new TCompactPrococol.Factory()); arg.transportFactory(new TFramedTransport.Facotry()); arg.processorFactory(new TProcessorFactory(processor)); TServer server = new THsHaServer(arg); System.out.println("Thrift Server Started"); //死循环 server.serve(); } }
Thrift 的客户端:
public class ThriftClient{ publiuc static void main(String[] args){ TTransport transport = new TFramedTransport(new TSocket ("localhost",8899),600); TProcotol procotol = new TComapctProcotol(transport); PersonService.Client client = new PersonService.Client(procotol); try{ //打开 socket transport.open(); //好像调用本地方法同样 Person person = client.getPersonByName("paul"); System.out.println(person.getAge()); }catch(Exception ex){ throw ex; }finally{ transport.close(); } } }
Thrift 的架构:
Thrift 的传输格式,协议:
TBinaryProtocol-二进制格式
TCompactProtocol-压缩格式
TJSONProtocol-JSON 格式
TSimpleJSONProtocol-提供 JSON 只写协议,生成的文件很容易经过脚本语言解析。不多使用,缺乏元数据信息,接收方不能读取出来。
TDebugProtocol-使用易懂的可读的文本格式,以便于 debug。
Thrift 数据传输方式,transport:
TSocket-阻塞式 socket。
TFramedTransport-以 frame 为单位进行传输,非阻塞式服务中使用。
TFileTransport-以文件形式进行传输。
TMemoryTransport-将内存用于 I/O,Java 实现时内部实际使用了简单的 ByteArrayOutputStream。
支持的服务模型,server:
TSimpleServer-简单的单线程服务模型,经常使用于测试。
TThreadPoolServer-多线程服务模型,标准的阻塞式 IO。
TNonboockingServer-多线程服务模型,使用非阻塞式 IO(须要使用 TFramedTransport 数据传输方式)。
THsHaServer-THsHa 引入了线程池去处理,其模型把读写任务放到线程池处理。Half-sync/Half-async 的处理模式,Half-sync 是在处理 IO 事件上,Half-async 用于 handler 对 rpc 的同步处理。