Netty(4-1)factorial~总结

本节大纲:java

一、Handler的执行顺序
二、自定义二进制协议(每条完整数据的组成),从而解决拆包和粘包。
三、经过为每一个channel建立新的handler,从而解决即便handler中使用全局变量,也能够避免竞态条件。并发

 

一、Handler的执行顺序。app

client中pipeline顺序: //first,add codec
pipeline.addLast(new BigIntegerDecoder()); pipeline.addLast(new NumberEncoder()); //then,add business logic
pipeline.addLast(new FactorialClientHandler()); 
server中pipeline顺序:
//first,codec pipeline.addLast(new BigIntegerDecoder()); pipeline.addLast(new NumberEncoder()); //then,business logic pipeline.addLast(new FactorialServerHandler());

 

总结: 写(outbound):自下而上,跳过inbound 读(inbound): 自上而下,跳过outbound Codec放在上边,业务逻辑handler放在下边。

 

二、自定义二进制协议(每条完整数据的组成),从而解决拆包和粘包。ide

每条完整数据的组成:'F'+4个字节的长度+数据 将传进来的number编码为二进制,在其前边加上'F'和4个字节的长度,做为前缀。 例如:42被编码为:'F',0,0,0,1,42 

客户端:oop

public class FactorialClientHandler extends SimpleChannelInboundHandler<BigInteger> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { StringBuilder sb = new StringBuilder(); for (int j = 0; j < 1024; j++) { sb.append(j); } BigInteger bigInt = new BigInteger(sb.toString()); ChannelFuture future = ctx.writeAndFlush(bigInt);//只发送1次 log.info("send:{}", bigInt); log.info("send字节数:{}", bigInt.toByteArray().length); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { log.info("发送成功"); } } }); }

 

 

 

/** * <pre> * 自定义二进制协议:F+4字节长度+具体数值 * 例如:'F',0,0,0,1,42 解码为new BigInteger("42") * </pre> */ @Slf4j public class BigIntegerDecoder extends ByteToMessageDecoder { private int splitCount = 0; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { log.info(">>>>>splitCount:{},可读字节数:{}",++splitCount,in.readableBytes()); //wait until the length prefix is available
        if (in.readableBytes() < 5) { return ; } in.markReaderIndex(); int magicNumber = in.readUnsignedByte(); if (magicNumber !='F') { throw new CorruptedFrameException("Invalid:"+magicNumber); } //wait until the whole data is available
        int dataLength = in.readInt(); if (in.readableBytes() < dataLength) { in.resetReaderIndex(); return ; } //convert the received data into a new BigInteger
        byte [] decoded = new byte[dataLength]; in.readBytes(decoded); out.add(new BigInteger(decoded)); } }

客户端发送了1240个字节的BigInteger,服务端接收:ui

18:15:13.121 [nioEventLoopGroup-3-1] >>>>>splitCount:1,可读字节数:1024
18:15:13.126 [nioEventLoopGroup-3-1] >>>>>splitCount:2,可读字节数:1245编码

 

虽然客户端只发送了1次,但服务端分2次接收在BigIntegerDecoder中都接收完后,才调用FactorialServerHandler的channelRead0方法url

注意,spa

in.markReaderIndex(); 。。。 if (in.readableBytes() < dataLength) { in.resetReaderIndex(); return ; }

以上的流程:.net

当第一次时,in.readableBytes()=1024,而dataLength=1245,因此进入该方法,将readerIndex复位到以前mark处,此例为0。舍弃该部分包数据。

当第二次时,in.readableBytes()=1245(说明,从0开始读的),读取到了完整的报文。

 

若是去掉以上代码,则会报错:

18:09:51.465 [nioEventLoopGroup-3-1] >>>>>splitCount:1,可读字节数:1024 io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(5) + length(1240) exceeds writerIndex(1024): PooledUnsafeDirectByteBuf(ridx: 5, widx: 1024, cap: 1024)

 

固然,服务端处理完并把原文发给客户端后,客户端也是分2次读取的:

18:15:13.119 [nioEventLoopGroup-2-1] send字节数:1240
18:15:13.153 [nioEventLoopGroup-2-1] >>>>>splitCount:1,可读字节数:1024
18:15:13.154 [nioEventLoopGroup-2-1] >>>>>splitCount:2,可读字节数:1245

 

 

三、经过为每一个channel建立新的handler,从而解决即便handler中使用全局变量,也能够避免竞态条件

并发发送数据包,且每一个数据包超过1024个字节,以下代码中的成员变量:

public class FactorialServerHandler extends SimpleChannelInboundHandler<BigInteger> { private BigInteger lastMultiplier = new BigInteger("1"); private BigInteger factorial = new BigInteger("1"); @Override protected void channelRead0(ChannelHandlerContext ctx, BigInteger msg) throws Exception { //计算阶乘并发送到客户端
        lastMultiplier = msg; factorial = factorial.multiply(msg); ctx.writeAndFlush(factorial); } 。。。

客户端调用:

public static void main(String[] args) throws Exception { Thread t1 = new Thread(new Runnable() { @Override public void run() { executor(2,Thread.currentThread().getName());// 2*3*4*5=120
 } }); t1.start(); Thread t2 = new Thread(new Runnable() { @Override public void run() { executor(3,Thread.currentThread().getName());// 3*4*5=60
 } }); t2.start(); Thread t3 = new Thread(new Runnable() { @Override public void run() { executor(4,Thread.currentThread().getName());// 4*5=20
 } }); t3.start(); } public static void executor(int next,String threadName) { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class).handler(new FactorialClientInitializer(next)); // make a new connection
            ChannelFuture f = b.connect(HOST, PORT).sync(); // get the handler instance to retrieve the answer.
            FactorialClientHandler handler = (FactorialClientHandler) f.channel().pipeline().last(); // print out the answer
            log.info("threadName:{},开始:{},结束:{},结果:{}", threadName,next,COUNT, handler.getFactorial()); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } }

结果:不乱,各自打印各自的。由于,每发送1条数据则建立1个Channel和handler ,因此不会乱。

 

四、codec是netty封装好了的handler,简化代码开发。

本例中涉及的是:

ByteToMessageDecoder(inbound):必须实现decode方法

MessageToByteEncoder<Number>(outbound):必须实现encode方法

 

最后,

以上的3参考代码:userguide-04-factorial。一、2参考代码:userguide-04-2-factorial

相关文章
相关标签/搜索