这里咱们就要分析下netty4 中 hanlder从注册到执行用户消息事件的流程。java
在server端或是client端,都须要注册handler,才能使用。经过以下方式为channel设置相应的handler。linux
channel.pipeline().addLast(new RpcDecoder(MomResponse.class)); 或者 ChannelPipeline cp = socketChannel.pipeline(); cp.addLast(new RpcEncoder(RpcRequest.class));等
通常是要经过channel获取pipeline,由于Channel的构造函数中会 new DefaultChannelPipeline(this);git
而这个pipeline内部又维护了一个 双向链表,github
public DefaultChannelPipeline(AbstractChannel channel) { if (channel == null) { throw new NullPointerException("channel"); } this.channel = channel; tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
而在addLast的过程当中,以下 ,会将handler 加到内部链表的尾部。可是在add以前 ,会将其封装到一个DefaultChannelHandlerContext中,而这个context就 作为链表中的一个节点。经过链表表实现每一个handler的顺序执行。 数组
//DefaultChannelPipeline @Override public ChannelPipeline addLast(String name, ChannelHandler handler) { return addLast(null, name, handler); } @Override public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) { synchronized (this) { checkDuplicateName(name); AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler); addLast0(name, newCtx); } return this; } private void addLast0(final String name, AbstractChannelHandlerContext newCtx) { checkMultiplicity(newCtx); AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; name2ctx.put(name, newCtx); callHandlerAdded(newCtx); }
netty中使用nio 中 selector也就是linux的epoll系统调用,来实现IO的多路复用。缓存
channel监听 SelectionKey.OP_READ | SelectionKey.OP_ACCEPT 事件,然后会调用channel中构造的内部类 nioUafe 的read方法 。unsafe.read(); 然后经过channel中pipeline串联整个msg的消息处理。核心是Context中的 fireChannelRead,由于每一个handler都封装到一个Context中,经过以下的方法安全
//DefaultChannelHandlerContext @Override public ChannelHandlerContext fireChannelRead(final Object msg) { if (msg == null) { throw new NullPointerException("msg"); } final AbstractChannelHandlerContext next = findContextInbound();//找到链表中下一个处理读的handler EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(msg); } else { executor.execute(new OneTimeTask() { @Override public void run() { next.invokeChannelRead(msg); } }); } return this; } private void invokeChannelRead(Object msg) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); //执行handler的逻辑。 } catch (Throwable t) { notifyHandlerException(t); } }
能够完成整个handler链的执行。app
而在handler的执行流中,通常在中间的handler中,在执行channelRead的业务逻辑中,会将 context自身传到方法里,并会经过调用该context的 fireChannelRead 将处理后的msg 经过 查找下一个handler(由于context是一个双向链表),找到相应的handler(In or Out)中的业务逻辑,直到最后一个hanlder不在调用 ctx.fireChannelRead 而整个handler链能够分为几类,对于In 也就是收到消息的处理handlers来讲,主要是分隔handler,Decoderhandler,业务逻辑handler。这三大类,是将收到的字节符按照设定的协议,执行完结束。socket
除了用户定义的业务逻辑的handler以外,netty也为咱们提供了不少十分有用的handler。咱们下面是以in类型的为主进行介绍,out逻辑。经常使用的有ByteToMessageDecoder 、SimpleChannelInboundHandler、ChannelInboundHandlerAdapter、SslHandler、DelimiterBasedFrameDecoder、FixedLengthFrameDecoder等,这些handler之间有继承的关系,在使用中咱们能够直接用,有些也能够经过 继承 来扩展达到咱们的业务功能。从基类开始介绍tcp
相对来讲比较底层的handler,能够直接继承,一般用在处理handler的register,unregister等事件,最为核心的就是继承 channelRead,经过以前的handler对msg的处理,直接能够转换为java类,执行业务。
继承自 ChannelInboundHandlerAdapter ,为咱们作了一次转换,将msg先转换为java类,而咱们能够经过继承,直接调用channelRead0,参数就是转换好的java类。使用十分简单。前提是前面的handler要先解码。一般放在最后一个handler。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") I imsg = (I) msg; channelRead0(ctx, imsg); // 实现业务逻辑 } else { release = false; ctx.fireChannelRead(msg); } } finally { if (autoRelease && release) { ReferenceCountUtil.release(msg); } } }
这个也是比较重要的handler,用户解码的基类handler,从名字也可猜出,其核心将接收的byte转换为用户定义的mssage, 用户须要 实现 decode方法,完成具体的转换。
以下是其channelRead源码
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { RecyclableArrayList out = RecyclableArrayList.newInstance(); try { ByteBuf data = (ByteBuf) msg; first = cumulation == null; //上一次是否有累积的字节 if (first) { cumulation = data; } else { if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes() || cumulation.refCnt() > 1) { // Expand cumulation (by replace it) when either there is not more room in the buffer // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or // duplicate().retain(). // // See: // - https://github.com/netty/netty/issues/2327 // - https://github.com/netty/netty/issues/1764 expandCumulation(ctx, data.readableBytes());//剩余的空间不足写下本次的数据,扩充累积区, } cumulation.writeBytes(data);//将本次读取的data附加到上一次剩余的数据中。 data.release(); } callDecode(ctx, cumulation, out); //解码过程 } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { if (cumulation != null && !cumulation.isReadable()) { cumulation.release(); //当累积区已经不可读了,释放。 cumulation = null; } int size = out.size(); decodeWasNull = size == 0; for (int i = 0; i < size; i ++) { ctx.fireChannelRead(out.get(i)); //执行下一个handler。 } out.recycle(); } } else { ctx.fireChannelRead(msg); } }
主要完成两个事件
1)解码(byte 转成object)
过程相对简单,经过定义了一个 cumulation 的累积缓存区,用以保存本次没有处理完的buf,并等待下一个tcp包的到来。一块儿传递到decode方法解决执行,如此反复。解决了粘包的问题。不过注意一点,这个handler是非线程安全的,一个channle对应一个该handler。因此一般咱们在加入到piepelie中都是从新new的。
而对于转换的逻辑来讲,就须要根据逻辑,转换成相应的对象了。经过callDecode,将从新组合后的cumulation,进行解码。将解码后的信息加到out中,该方法会经过循环,每次解码后的out大小与解码前大小是否一致,以此来决定是否结束本次解码过程。由于一次callDecode可能会 携带多个msg。
2) 下一个handler
将转换后的信息传递到下一个handler, 经过ctx.fireChannelRead。上面已经分析handler执行链的过程。
4. DelimiterBasedFrameDecoder
继承自ByteToMessageDecoder,只需完成解码的工做,一样从名字看出,起到分隔的做用,就是将收到的字节以特殊的字符分隔。通常要指定最大长度,以及分隔符。超过最大长度抛异常。
通常用在以下,以换行符结尾,转化为string。
5. FixedLengthFrameDecoder
同上,解码成固定长度的字节。
protected Object decode( @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception { if (in.readableBytes() < frameLength) { return null; } else { return in.readSlice(frameLength).retain(); } }
接下来咱们给出几组经常使用的handler组合
1) 定义协议,decode为java对象,
channel.pipeline().addLast(new RpcDecoder(Response.class)); channel.pipeline().addLast(new RpcEncoder(Request.class)); channel.pipeline().addLast(handle);
这种在Decoder中须要用户去实现协议,最简单的过程以下,信息头部指定有效字节数,先读取头部长度。然后在读取相应长度的字节,反序列化。而复杂的可能设定magic、type、length等。
@Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int HEAD_LENGTH=4; if (in.readableBytes() < HEAD_LENGTH) { return; } in.markReaderIndex(); int dataLength = in.readInt(); if (dataLength < 0) { ctx.close(); } if (in.readableBytes() < dataLength) { in.resetReaderIndex(); return; } byte[] body = new byte[dataLength]; in.readBytes(body); Object obj=SerializeUtils.Deserialize(body);//经过定义序列化工具将字节数组转换为指定类的实例 out.add(obj); }
2) 以换行符结尾分隔,转化为string。
// 以("\n")为结尾分割的 解码器 pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); // 字符串解码 和 编码 pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); // 本身的逻辑Handler pipeline.addLast("handler", new SelfHandler());
http://blog.csdn.net/langzi7758521/article/details/52712159