jboss marshalling是jboss内部的一个序列化框架,速度也十分快,这里netty也提供了支持,使用十分方便。java
TCP在网络通信的时候,一般在解决TCP粘包、拆包问题的时候,通常会用如下几种方式:数组
一、 消息定长 例如每一个报文的大小固定为200个字节,若是不够,空位补空格;网络
二、 在消息尾部添加特殊字符进行分割,如添加回车;框架
三、 将消息分为消息体和消息头,在消息头里面包含表示消息长度的字段,而后进行业务逻辑的处理。ide
在Netty中咱们主要利用对象的序列化进行对象的传输,虽然Java自己的序列化也能完成,可是Java序列化有不少问题,如后字节码流太大,以及序列化程度过低等。Jboss的序列化有程度较高、序列化后码流较小。这里利用Jboss的Marshalling测试一个简单的对象序列化。工具
引入marshallingoop
<!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>5.0.0.Alpha2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling --> <dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling</artifactId> <version>2.0.0.CR1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial --> <dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling-serial</artifactId> <version>2.0.0.CR1</version> </dependency>
server:测试
public class Server { public static void main(String[] args) throws InterruptedException { //1.第一个线程组是用于接收Client端链接的 EventLoopGroup bossGroup = new NioEventLoopGroup(); //2.第二个线程组是用于实际的业务处理的 EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup);//绑定两个线程池 b.channel(NioServerSocketChannel.class);//指定NIO的模式,若是是客户端就是NioSocketChannel // b.option(ChannelOption.SO_BACKLOG, 1024);//TCP的缓冲区设置 // b.option(ChannelOption.SO_SNDBUF, 32*1024);//设置发送缓冲的大小 // b.option(ChannelOption.SO_RCVBUF, 32*1024);//设置接收缓冲区大小 // b.option(ChannelOption.SO_KEEPALIVE, true);//保持连续 b.childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { //设置Marshalling的编码和解码 ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); ch.pipeline().addLast(new ServerHandler()); } }); ChannelFuture future = b.bind(8765).sync();//绑定端口 future.channel().closeFuture().sync();//等待关闭(程序阻塞在这里等待客户端请求) bossGroup.shutdownGracefully();//关闭线程 workerGroup.shutdownGracefully();//关闭线程 } }
ServerHandlerui
public class ServerHandler extends ChannelHandlerAdapter{ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Send send = (Send) msg; System.out.println("client发送:"+send); Receive receive = new Receive(); receive.setId(send.getId()); receive.setMessage(send.getMessage()); receive.setName(send.getName()); ctx.writeAndFlush(receive); } }
clientthis
public class Client { public static void main(String[] args) throws InterruptedException { EventLoopGroup worker = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(worker) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes()); //sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf)); //sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture f=b.connect("127.0.0.1",8765).sync(); for(int i=1;i<=5;i++){ Send send = new Send(); send.setId(i); send.setMessage("message"+i); send.setName("name"+i); f.channel().writeAndFlush(send); } f.channel().closeFuture().sync(); worker.shutdownGracefully(); } }
clientHandler
public class ClientHandler extends ChannelHandlerAdapter{ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Receive receive = (Receive) msg; System.out.println("server反馈:"+receive); } }
send
public class Send implements Serializable { /** * serialVersionUID:TODO(用一句话描述这个变量表示什么) * * @since 1.0.0 */ private static final long serialVersionUID = 1L; private Integer id; private String name; private String message; public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } @Override public String toString() { return "Send [id=" + id + ", name=" + name + ", message=" + message + "]"; } }
receive
public class Receive implements Serializable{ /** * serialVersionUID:TODO(用一句话描述这个变量表示什么) * @since 1.0.0 */ private static final long serialVersionUID = 1L; private Integer id; private String name; private String message; private byte[] sss; public byte[] getSss() { return sss; } public void setSss(byte[] sss) { this.sss = sss; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } @Override public String toString() { return "Receive [id=" + id + ", name=" + name + ", message=" + message + ", sss=" + Arrays.toString(sss) + "]"; } }
marshalling工厂类
public final class MarshallingCodeCFactory { /** * 建立Jboss Marshalling解码器MarshallingDecoder * @return MarshallingDecoder */ public static MarshallingDecoder buildMarshallingDecoder() { //首先经过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识建立的是java序列化工厂对象。 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); //建立了MarshallingConfiguration对象,配置了版本号为5 final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); //根据marshallerFactory和configuration建立provider UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024); return decoder; } /** * 建立Jboss Marshalling编码器MarshallingEncoder * @return MarshallingEncoder */ public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组 MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } }
运行结果
server反馈:Receive [id=1, name=name1, message=message1, sss=null] server反馈:Receive [id=2, name=name2, message=message2, sss=null] server反馈:Receive [id=3, name=name3, message=message3, sss=null] server反馈:Receive [id=4, name=name4, message=message4, sss=null] server反馈:Receive [id=5, name=name5, message=message5, sss=null]