编解码器的做用是将原始字节数据与自定义的消息对象进行互转。编码器负责处理“出站”数据。java
解码器web
解码器负责解码“入站”数据从一种格式到另外一种格式,解码器处理入站数据是抽象ChannelInboundHandler的实现。安全
解码器有三种类型:解码字节到消息;解码消息到消息以及解码消息到字节。服务器
ByteToMessageDecoderwebsocket
decode(ChannelHandlerContext, ByteBuf, List<Object>):这个方法须要本身实现的抽象方法,做用是将ByteBuf数据解码成其余形式的数据并发
decodeLast(ChannelHandlerContext, ByteBuf, List<Object>)app
ReplayingDecodersocket
ReplayingDecoder是ByteToMessageDecoder的一种特殊的抽象基类,读取缓冲区的数据以前须要检查缓冲区是否有足够的字节,使用ReplayingDecoder就无需本身检查,若ByteBuf中有足够的字节,则正常读取,不然就是中止解码。不是全部的操做都被ByteBuf支持,如有一个不支持的就会抛出DecoderException;ByteBuf.readableBytes()大部分不会返回指望值ide
MessageToMessageDecoder大数据
decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Excetopn;
编码器
编码器有两种类型:消息对象编码成消息对象和消息对象编码成字节码
Netty也提供了两个抽象类:MessageToByteEncoder和MessageToMessageEncoder。须要重写encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception;
byte-to-byte编解码器
Netty提供了ByteArrayEncoder和ByteArrayDecoder两个类。
public class ByteArrayDecoder extends MessageToMessageDecoder<ByteBuf>{ protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception{ byte[] array = new byte[msg.readableBytes()]; msg.getBytes(0, array); out.add(array); } } @Sharable public class ByteArrayEncoder extends MessageToMessageEncoder<byte[]>{ protected void encode(ChannelHandlerContext ctx, byte[] msg, List<Object> out) throws Exception{ out.add(Unpooled.wrappedBuffer(msg)); } }
ByteToMessageCodec
ByteToMessageCodec用来处理byte-to-message和message-to-byte。ByteToMessageCodec是一种组合,等同于ByteToMessageDecoder和MessageToByteEncoder的组合。MessageToByteEncoder有两个抽象方法:
encode(ChannelHandlerContext, I, ByteBuf) //编码
decode(ChannelHandlerContext, ByteBuf, List<Object>) //解码
MessageToMessageCodec
MessageToMessageCodec用于message-to-message的编码和解码。能够当作是MessageToMessageDecoder和MessageToMessageEncoder的组合体。MessageToMessageCodec有两个抽象方法:
encode(ChannelHandlerContext, OUTBOUNG_IN, List<Object>)
decode(ChannelHandlerContext, INBOUND_IN, List<Object>)
CombinedChannelDuplexHandler
自定义编码器和解码器
public class CharCodec extends CombinedChannelDuplexHandler<Decoder, Encoder>{ public CharCodec(){ super(new Decoder(), new Encoder()); } }
使用SSL/TLS建立安全的netty程序
java提供了SslContext和SslEngine支持SSL/TLS。Netty提供了SslHandler,它扩展了Java的SslEngine。
public class SslChannelInitializer extends ChannelInitializer<Channel>{ private final SSLContext context; private final boolean client; private final boolean startTls; public SslChannelInitializer(SSLContext context, boolean client, boolean startTls) { this.context = context; this.client = client; this.startTls = startTls; } protected void initChannel(Channel ch) throws Exception{ SSLEngine engine = context.createSSLEngine(); engine.setUserClientMode(client); ch.pipeline().addFirst("ssl", new SslHander(engine, startTls)); } }
SslHandler必需要添加到ChannelPipeline的第一个位置。
SSL/TLS的方法:
setHandshakeTimeout(long handshakeTimeout, TimeUnit unit):设置握手超时时间,ChannelFuture将获得通知
setHandshakeTimeoutMillis(long handshakeTimeoutMillis):设置握手超时时间,ChannelFuture将会获得通知
getHandshakeTimeoutMillis():获取握手超时时间值
setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit):设置关闭通知超时事件,若超时,ChannelFuture会关闭失败
setHandshakeTimeoutMillis(long handshakeTimeoutMillis):设置关闭通知超时时间,若超时,ChannelFuture会关闭失败
getCloseNotifyTimeoutMillis():获取关闭通知超时时间
handshakeFuture():返回完成握手后的ChannelFuture
close():发送关闭通知请求关闭和销毁
一个HTTP请求/响应消息可能不止一个,但最终都会包含LastHttpContent消息。FullHttpRequest和FullHttpResponse是Netty提供的两个接口,分别用来完成http请求和响应。
Netty提供了HTTP请求和响应的编码器和解码器:
HttpRequestEncoder:将HttpRequest或HttpContent编码成ByteBuf
HttpRequestDecoder:将ByteBuf解码成HttpRequest和HttpContent
HttpResponseEncoder:将HttpResponse或HttpContent编码成ByteBuf
HttpResponseDecoder:将ByteBuf解码成HttpResponse和HttpContent
HTTP消息聚合
处理HTTP时可能接受HTTP消息片断,Netty须要缓冲直到接受完整个消息。要处理HTTP消息,Netty提供了HttpObjectAggregator。经过HttpObjectAggregator,Netty能够聚合HTPP消息,使用FullHttpResponse和FullHttpRequest到ChannelPipeline中的下一个ChannelHandler,这样就消除了断裂消息,保证了消息的完整性。
HTTP压缩
Netty支持“gzip”和“deflate”。
protected void intiChannel(Channel ch) throws Exception{ ChannelPipeline pipeline = ch.pipeline(); if(client){ pipeline.addLast("codec", new HttpClientCodec()); pipeline.addLast("decompressor", new HttpContentDecompressor()); }else{ pipeline.addLast("codec", new HttpServerCodec()); pipeline.addLast("decompressor", new HttpContentDecompressor()); } pipeline.addLast("aggegator", new HttpObjectAggregator(512 * 1024)); }
使用HTTPS
public class HttpsCodecInitializer extends ChannelInitializer<Channel>{ private final SSLContext context; private final boolean client; public HttpsCodecInitializer(SSLContext context, boolean client){ this.context = context; this.client = client; } protected void initChannel(Channel ch) throws Exception{ SSLEngine engine = context.createSSLEngine(); engine.setUseClientMode(client); ChannelPipeline pipeline = ch.pipeline(); pipeline.addFirst("ssl", new SslHandler(engine)); if(client) pipeline.addLast("codec", new HttpClientCodec()); else pipeline.addLast("codec", new HttpServerCodec()); } }
WebSocket
Netty经过ChannelHandler对WebSocket进行支持。Netty支持以下WebSocket:
BinaryWebSocketFrame:包含二进制数据
TextWebSocketFrame:包含文本数据
ContinuationWebSocketFrame:包含二进制数据或文本数据
CloseWebSocketFrame:表明一个关闭请求,包含关闭状态码和短语
PingWebSocketFrame:WebSocketFrame要求PongWebSocketFrame发送数据
PongWebSocketFrame:WebSocketFrame要求PingWebSocketFrame响应
public class WebSocketServerInitializer extends ChannelInitializer<Channel>{ protected void initChannel(Channel ch) throws Exception{ ch.pipeline().addLast(new HttpServerCodec(), new HttpObjectAggregator(65536), new WebSocketServerProtocolHandler("/websocket"), new TextFrameHandler(), new BinaryFrameHandler(), new ContinuationFrameHandler()); } public static final class TextFrameHanlder extends SimpleChannelInboundHandler<TextWebSocketFrame>{ protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception{ } } public static final class BinaryFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame>{ protected void channelRead0(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception{ } } public static final class ContinuationFrameHandler extends SimpleChannelInboundHandler<ContinuationWebSocketFrame>{ protected void channelRead0(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception{ } } }
SPDY
SPDY是Google开发的基于TCP的应用层协议。SPDY的定位:将页面加载时间减小50%;最大限度减小部署的复杂性;避免网站开发者改动内容
SPDY技术实现:单个TCP链接支持并发的HTTP请求;压缩包头和去掉没必要要的头部来减小当前HTTP使用的带宽;定义一个易实现,在服务器端高效的协议。经过减小边缘状况,定义易解析的消息格式来减小HTTP的复杂性;强制使用SSL;容许服务器在须要时发起对客户端的链接并推送数据。
Netty用三种不一样的ChannelHanlder处理闲置和超时链接:
IdeStateHandler:当一个通道没有进行读写或运行了一段时间后发出IdleStateEvent
ReadTimeoutHandler:在指定时间内没有接收到任何数据将抛出ReadTimeoutException
WriteTimeoutHandler:在指定时间内写入数据将抛出WriteTimeoutException
public class IdleStateHandlerInitializer extends ChannelInitialzier<Channel>{ protected void initChannel(Channel ch) throws Exception{ ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS)); pipeline.addLast(new HeartbeatHandler()); } public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter{ private static final ByteBuf HEARTBEAT_SEQUEUE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8)); public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{ if(evt instanceof IdleStateEvent) ctx.writeAndFlush(HEARTBEAT_SEQUEUE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); else super.userEventTriggered(ctx, evt); } } }
解码分隔符和基于长度的协议
Netty提供两个类用于提取序列分割:
DelimiterBasedFrameDecoder:接收ByteBuf由一个或多个分隔符拆分
LineBasedFrameDecoder:接收ByteBuf以分隔线结束,如\n, \r\n
public class LineBaseHandlerInitializer extends ChannelInitializer<Channel>{ protected void initChannel(Channel ch) throw Exception{ ch.pipeline().addLast(new LineBasedFrameDecoder(65 * 1024), new FrameHandler()); } public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf>{ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception{ } } }
Netty提供了两个以长度为单位的解码器:FixedLengthFrameDecoder和LengthFieldBasedFrameDecoder。
public class LengthBasedInitializer extends ChannelInitializer<Channel>{ protected void intiChannel(Channel ch) throws Exception{ ch.pipeline().addLast(new LengthFileBasedFrameDecoder(65 * 1024, 0, 8)).addLast(new FrameHandler()); } public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf>{ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception{ } } }
写大数据
Netty使用零拷贝写文件内容时经过DefaultFileRegion,ChannelHandlerContext和ChannelPipeline。
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{ File file = new File("test.txt"); FileInputStream fis = new FileInputStream(file); FileRegion region = new DefaultFileRegion(fis.getChannel(), 0, file.length()); Channel channel = ctx.channel(); channel.writeAndFlush(region).addListener(new ChannelFutureListener(){ protected void operationComplelte(ChannelFuture future) throws Exception{ if(!future.isSuccess()) Throwable cause = future.cause(); } }); }
Netty提供了ChunkedWriteHandler,容许经过处理ChunkedInput来写大的数据块。ChunkedFile,ChunkedNioFile,ChunkedStream和ChunkedNioStream实现了ChunkedInput。
public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel>{ private final File file; public ChunkedWriteHandlerInitializer(File file){ this.file = file; } protected void initChannel(Channel ch) throws Exception{ ch.pipeline().addLast(new ChunkedWriteHandler()).addLast(new WriteStreamHandler()); } public final class WriteStreamHandler extends ChannelInboundHandlerAdapter{ public void channelActive(ChannelHandlerContext ctx) throws Exception{ super.channelActive(ctx); ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file))); } } }
序列化
Java提供了ObjectInputStream和ObjectOutputStream等序列化接口。
io.netty.handler.codec.serialization提供了以下接口:CompatibleObjectEncoder,CompactObjectInputStream,CompactObjectOutputStream,ObjectEncoder,ObjectDecoder,ObjectEncoderOutputStream和ObjectDecoderInputStream。
JBoss Marshalling序列化的速度是JDK的3倍且序列化的结构更紧凑。
io.netty.handler.codec.marshalling提供以下接口:CompatibleMarshallingEncoder,CompatibleMarshallingDecoder,MarshallingEncoder和MarshallingDecoder。
pubilc class MarshallingInitializer extends ChannelInitializer<Channel>{ private final MarshallerProvider marshallerProvider; private final UnmarshallerProvider unmarshallerProvider; public MarshallingInitializer(MarshallerProvider marshallerProvider, UnmarshallerProvider unmarshallerProvider){ this.marshallerProvider = marshallerProvider; this.unmarshallerProvider = unmarshallerProvider; } protected void initChannel(Channel ch) throws Exception{ ch.pipeline().addLast(new MarshallingDecoder(unmarshallerProvider)).addLast(new MarshallingEncoder(marshallerProvider)).addLast(new ObjectHandler()); } public final class ObjectHandler extends SimpleChannelInboundHander<Serializable>{ protected void channelRead0(ChannelHandlerContext ctx, Serializable msg) throws Exception{ } } }
ProtoBuf是Google开源的一种编码和解码技术,做用是使序列化数据更高效。
io.netty.handler.codec.protobuf提供了:ProtoBufDecoder,ProtobufEncoder,ProtobufVarint32FrameDecoder和ProtibufVarint32LengthFieldPrepender
public class ProtoBufInitializer extends ChannelInitializer<Channel>{ private final MessageLite lite; public ProtoBufInitializer(MessageLite lite){ this.lite = lite; } protected void initChannel(Channel ch) throws Exception{ ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()).addLast(new ProtobufEncoder()).addLast(new ProtobufDecoder(lite)).addLast(new ObjectHandler()); } public final class ObjectHandler extends SimpleChannelInboundHandler<Serializable>{ protected void channelRead0(ChannelHandlerContext ctx, Serializable msg) throws Exception{ } } }