咱们知道,基于JDK默认的序列化机制能够避免操做底层的字节数组,从而提高了开发效率。也即是java序列化机制的用武之地。java
那么这和编解码有什么关系呢?因为java序列化的目的有两个:一是对象持久化,另外一个是网络传输。而编解码技术是贯穿在网络传输之中的。在进行远程跨进程服务调用时,须要把被传输的Java对象编码为字节数组或者ByteBuffer对象。当读取到ByteBuffer对象或字节数组时,须要将其编码为发送时的Java对象。这被称为Java对象编解码技术。数组
因为Java序列化的种种缺陷,衍生出了多种编解码技术和框架,其中MessagePack、JBoss Marshalling和Google Protobuf是比较流行的,本文只着重介绍Protobuf的原理和使用。缓存
1、Java序列化的缺点网络
1 没法跨语言框架
这也是Java序列化最致命的问题。对于跨进程的服务调用,当咱们须要和异构语言进程交互时,Java序列化就难以胜任。而且经Java序列化后的字节数组,别的语言没法进行反序列化,就严重阻碍了它的应用。异步
2 码流太大ide
看下列代码工具
public class UserInfo implements Serializable { /** * */ private static final long serialVersionUID = 1L; private String userName; public final String getUserName() { return userName; } public final void setUserName(String userName) { this.userName = userName; } public final int getUserID() { return userID; } public final void setUserID(int userID) { this.userID = userID; } private int userID; public UserInfo buildUserName(String userName) { this.userName = userName; return this; } public UserInfo buildUserID(int userID) { this.userID = userID; return this; } public byte[] codeC() { ByteBuffer buffer = ByteBuffer.allocate(1024); byte[] value = this.userName.getBytes(); buffer.putInt(value.length); buffer.put(value); buffer.putInt(this.userID); buffer.flip(); value = null; byte[] result = new byte[buffer.remaining()]; buffer.get(result); return result; } } public class TestUserInfo { /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { UserInfo info = new UserInfo(); info.buildUserID(100).buildUserName("Welcome to Netty"); ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream os = new ObjectOutputStream(bos); os.writeObject(info); os.flush(); os.close(); byte[] b = bos.toByteArray(); System.out.println("The jdk serializable length is : " + b.length); bos.close(); System.out.println("-------------------------------------"); System.out.println("The byte array serializable length is : " + info.codeC().length); } }
执行TestUserInfo,结果以下:oop
这代表,采用JDK序列化机制编码后的二进制数组大小居然是二进制编码的4.75倍!性能
3 序列化性能过低
将以前的代码稍做修改,改形成性能测试版本
public byte[] codeC(ByteBuffer buffer) { buffer.clear(); byte[] value = this.userName.getBytes(); buffer.putInt(value.length); buffer.put(value); buffer.putInt(this.userID); buffer.flip(); value = null; byte[] result = new byte[buffer.remaining()]; buffer.get(result); return result; }
对UserInfo进行改造,新增以上所示方法,并建立一个性能测试版本的UserInfo测试程序。以下:
public class PerformTestUserInfo { /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { UserInfo info = new UserInfo(); info.buildUserID(100).buildUserName("Welcome to Netty"); int loop = 1000000; ByteArrayOutputStream bos = null; ObjectOutputStream os = null; long startTime = System.currentTimeMillis(); for(int i=0;i<loop;i++) { bos = new ByteArrayOutputStream(); os = new ObjectOutputStream(bos); os.writeObject(info); os.flush(); os.close(); byte[] b = bos.toByteArray(); bos.close(); } long endTime = System.currentTimeMillis(); System.out.println("The jdk serializable cost time is : " + (endTime - startTime) + " ms"); System.out.println("----------------------------------------------"); ByteBuffer buffer = ByteBuffer.allocate(1024); startTime = System.currentTimeMillis(); for(int i=0;i<loop;i++) { byte[] b = info.codeC(buffer); } endTime = System.currentTimeMillis(); System.out.println("The byte array serializable cost time is : " + (endTime - startTime) + " ms"); } }
结果为:
可见,Java序列化的性能只有二进制编码的51.4%左右。
2、业界主流的编解码框架
Google Protobuf介绍
特色:
1 结构化数据存储格式
2 高效的编解码性能
3 语言无关、平台无关、扩展性好
4 官方支持Java、C++和Python三种语言
Facebook Thrift介绍
Thrift适用于搭建大型数据交换及存储的通用工具,对于大型系统中的内部数据传输,相对于JSON和XML在性能和传输大小上都有明显的优点。
组件:语言系统及IDL编译器,TProtocol,TTransport,TProcessor以及TServer
咱们重点关注的是编解码框架,与之对应的就是TProtocol。因为一般使用Thrift的时候都会采起RPC框架的方式。可是,它的TProtocol编解码框架仍是能够以类库的方式独立使用。
Thrift支持三种比较典型的编解码方式
1 通用的二进制编解码
2 压缩二进制编解码
3 优化的可选字段压缩编解码
JBoss Marshalling介绍
优势:
1 可插拔的类解析器
2 可插拔的对象替换技术
3 可插拔的预约义类缓存表
4 无须实现java.io.Serializable接口,便可实现Java序列化
5 经过缓存技术提高对象的序列化性能
然而JBoss Marshalling更可能是在JBoss内部使用,应用范围有限。
3、Google Protobuf编解码
这里经过一个简单的例程来学习介绍怎样使用Protobuf对POJO对象进行编解码,而后讲解如何在Netty中对POJO对象进行Protobuf编解码,并在两个进程之间进行通讯和数据交换。
此处略过Protobuf环境搭建过程,直接看经过.proto文件编译后的java代码来进行Protobuf的使用。
测试代码以下:
public class TestSubscribeReqProto { private static byte[] encode(SubscribeReqProto.SubscribeReq req) { return req.toByteArray(); } private static SubscribeReqProto.SubscribeReq decode(byte[] body) throws InvalidProtocolBufferException { return SubscribeReqProto.SubscribeReq.parseFrom(body); } private static SubscribeReqProto.SubscribeReq createSubscribeReq() { SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder(); builder.setSubReqID(1); builder.setUserName("Lilinfeng"); builder.setProductName("Netty Book"); List<String> address = new ArrayList<>(); address.add("NanJing YuHuaTai"); address.add("BeiJing LiuLiChang"); address.add("ShenZhen HongShuLin"); builder.setAddress(address.toString()); return builder.build(); } /** * @param args * @throws InvalidProtocolBufferException */ public static void main(String[] args) throws InvalidProtocolBufferException { SubscribeReqProto.SubscribeReq req = createSubscribeReq(); System.out.println("Before encode : " + req.toString()); SubscribeReqProto.SubscribeReq req2 = decode(encode(req)); System.out.println("After decode : " + req.toString()); System.out.println("Assert equal : --> " + req2.equals(req)); } }
运行后,输出结果:
运行结果代表,通过Protobuf编码后,生成的SubscribeReqProto.SubscribeReq与编码前原始的SubscribeReqProto.SubscribeReq等价。下面,使用Netty的Protobuf编解码框架试试看。
服务端代码:
public class SubReqServer { public void bind(int port) throws Exception { //配置服务端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance())); ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new SubReqServerHandler()); } }); //绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); //等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { //优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if(args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { } } new SubReqServer().bind(port); } } @Sharable public class SubReqServerHandler extends ChannelHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { SubscribeReqProto.SubscribeReq req = (SubscribeReq) msg; if("Lilinfeng".equalsIgnoreCase(req.getUserName())) { System.out.println("Service accept client subscribe req : [" + req + "]"); ctx.writeAndFlush(resp(req.getSubReqID())); } } private SubscribeRespProto.SubscribeResp resp(int subReqID) { SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder(); builder.setSubReqID(subReqID); builder.setRespCode(0); builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address"); return builder.build(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
因为使用了ProtobufEncoder,因此不须要对SubscribeRespProto.SubscribeResp进行手工编码。另外,ProtobufDecoder已经对消息进行了自动解码,所以接收到的请求消息能够直接使用。
客户端代码以下:
public class SubReqClient { public void connect(int port, String host) throws Exception { //配置客户端NIO线程组 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance())); ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new SubReqClientHandler()); } }); //发起异步链接操做 ChannelFuture f =b.connect(host, port).sync(); //等待客户端链路关闭 f.channel().closeFuture().sync(); } finally { //优雅退出,释放NIO线程组 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args!=null && args.length > 0) { try{ port = Integer.valueOf(args[0]); } catch(NumberFormatException e) { //use default value } } new SubReqClient().connect(port, "127.0.0.1"); } } public class SubReqClientHandler extends ChannelHandlerAdapter { public SubReqClientHandler() { super(); // TODO Auto-generated constructor stub } public void channelActive(ChannelHandlerContext ctx) { for(int i=0;i<10;i++) { ctx.write(subReq(i)); } ctx.flush(); } private SubscribeReqProto.SubscribeReq subReq(int i) { SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder(); builder.setSubReqID(i); builder.setUserName("Lilinfeng"); builder.setProductName("Netty Book For Protobuf"); List<String> address = new ArrayList<>(); address.add("NanJing YuHuaTai"); address.add("BeiJing LiuLiChang"); address.add("ShenZhen HongShuLin"); builder.setAddress(address.toString()); return builder.build(); } public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Receive server response : [" + msg + "]"); } public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
客户端接收到服务端的应答消息以后会直接打印,按照统计,应该打印10次。下面就测试下Protobuf的服务端和客户端,看它是否能正常运行。
能够看出利用Netty提供的Protobuf编解码能力,咱们再不须要了解Protobuf实现和使用细节的状况下就能轻松支持Protobuf编解码,能够方便地实现跨语言的远程服务调用和与周边的异构系统进行通讯对接。
本节完结。