Netty 是一个异步的,事件驱动的网络通讯框架,用于高性能的基于协议的客户端和服务端的开发。javascript
异步指的是会当即返回,并不知道到底发送过去没有,成功没有,通常都会使用监听器来监听返回。java
事件驱动是指开发者只须要关注事件对应的回调方法便可,好比 channel active,inactive,read 等等。web
网络通讯框架就不用解释了,不少你很是熟悉的组件都使用了 netty,好比 spark,dubbo 等等。编程
第一个简单的例子,使用 Netty 实现一个 http 服务器,客户端调用一个没有参数的方法,服务端返回一个 hello world。bootstrap
Netty 里面大量的代码都是对线程的处理和 IO 的异步的操做。浏览器
package com.paul; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; public class Server { public static void main(String[] args) throws InterruptedException { //定义两个线程组,事件循环组,能够类比与 Tomcat 就是死循环,不断接收客户端的链接 // boss 线程组不断从客户端接受链接,但不处理,由 worker 线程组对链接进行真正的处理 // 一个线程组其实也能完成,推荐使用两个 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 服务端启动器,能够轻松的启动服务端的 channel ServerBootstrap serverBootstrap = new ServerBootstrap(); //group 方法有两个,一个接收一个参数,另外一个接收两个参数 // childhandler 是咱们本身写的请求处理器 serverBootstrap.group(bossGroup, workerGroup).channel(NioSocketChannel.class) .childHandler(new ServerInitializer()); //绑定端口 ChannelFuture future = serverBootstrap.bind(8011).sync(); //channel 关闭的监听 future.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
package com.paul; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpServerCodec; public class ServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //管道,管道里面能够有不少 handler,一层层过滤的柑橘 ChannelPipeline pipeline = socketChannel.pipeline(); //HttpServerCodec 是 HttpRequestDecoder 和 HttpReponseEncoder 的组合,编码和解码的 h handler pipeline.addLast("httpServerCodec", new HttpServerCodec()); pipeline.addLast("handler", new ServerHandler()); } }
package com.paul; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.*; import io.netty.util.CharsetUtil; public class ServerHandler extends SimpleChannelInboundHandler<HttpObject> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception { if(httpObject instanceof HttpRequest) { ByteBuf content = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain"); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); //单纯的调用 write 只会放到缓存区,不会真的发送 channelHandlerContext.writeAndFlush(response); } } }
咱们在 SimpleChannelInboundHandler 里分析一下,先看它继承的 ChannelInboundHandlerAdapter 里面的事件回调方法,包括通道注册,解除注册,Active,InActive等等。缓存
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelRegistered(); } public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); } public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); } public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); } public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); } public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); } public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ctx.fireUserEventTriggered(evt); } public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelWritabilityChanged(); } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); }
执行顺序为 handler added->channel registered->channel active->channelRead0->channel inactive->channel unregistered。服务器
Netty 自己并非遵循 servlet 规范的。Http 是基于请求和响应的无状态协议。Http 1.1 是有 keep-alived 参数的,若是3秒没有返回,则服务端主动关闭了解,Http 1.0 则是请求完成直接返回。websocket
Netty 的链接会被一直保持,咱们须要本身去处理这个功能。网络
在服务端发送完毕数据后,能够在服务端关闭 Channel。
ctx.channel.close();
对于 Http 编程来讲,咱们实现了服务端就能够了,客户端彻底可使用浏览器或者 CURL 工具来充当。可是对于 Socket 编程来讲,客户端也得咱们本身实现。
服务器端:
Server 类于上面 Http 服务器那个同样,在 ServerInitoalizer 有一些变化
public class ServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //管道,管道里面能够有不少 handler,一层层过滤的柑橘 ChannelPipeline pipeline = socketChannel.pipeline(); // TCP 粘包 拆包 pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4)); pipeline.addLast(new LengthFieldPrepender(4)); // 字符串编码,解码 pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new ServerHandler()); } }
public class ServerHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(ctx.channel().remoteAddress()+","+msg); ctx.channel().writeAndFlush("from server:" + UUID.randomUUID()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
客户端:
public class Client { public static void main(String[] args) throws InterruptedException { //客户端不须要两个 group,只须要一个就够了,直接链接服务端发送数据就能够了 EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try{ Bootstrap bootstrap = new Bootstrap(); //服务器端既可使用 handler 也可使用 childhandler, 客户端通常使用 handler //对于 服务端,handler 是针对 bossgroup的,childhandler 是针对 workergorup 的 bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) .handler(new ClientInitializer()); ChannelFuture channelFuture = bootstrap.connect("localhost",8899).sync(); channelFuture.channel().closeFuture().sync(); }finally { eventLoopGroup.shutdownGracefully(); } } }
public class ClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //管道,管道里面能够有不少 handler,一层层过滤的柑橘 ChannelPipeline pipeline = socketChannel.pipeline(); // TCP 粘包 拆包 pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4)); pipeline.addLast(new LengthFieldPrepender(4)); // 字符串编码,解码 pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new ClientHandler()); } }
public class ClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(ctx.channel().remoteAddress()+","+msg); System.out.println("client output:"+ msg); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.channel().writeAndFlush("23123"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
Server 端:
public class ServerHandler extends SimpleChannelInboundHandler<String> { //定义 channel group 来管理全部 channel private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[服务器]-" + channel.remoteAddress() + "加入\n"); channelGroup.add(channel); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[服务器]-" + channel.remoteAddress() + "离开\n"); //这个 channel 会被自动从 channelGroup 里移除 } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println(channel.remoteAddress() + "上线"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println(channel.remoteAddress() + "离开"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
Client 端:
BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); for(;;){ channel.writeAndFlush(br.readLine() + "\r\n"); }
集群之间各个节点的通讯,主从节点之间须要进行数据同步,每当主节点的数据发生变化时,经过异步的方式将数据同步到从节点,同步方式能够用日志等等,所以主从节点之间不是实时一致性而是最终一致性。
节点与节点之间如何进行通讯那?这种主从模式是须要互相之间有长链接的,这样来肯定对方还活着,实现方式是互相之间定时发送心跳数据包。若是发送几回后对方仍是没有响应的话,就能够认为对方已经挂掉了。
回到客户端与服务端的模式,有人可能会想,客户端断开链接后服务端的 handlerRemoved 等方法不是能感知吗?还要心跳干什么哪?
真实状况其实很是复杂,好比手机客户端和服务端进行一个长链接,客户端没有退出应用,客户端开了飞行模型,或者强制关机,此时双方是感知不到链接已经断掉了,或者说须要很是长的时间才能感知到,这是咱们不想看到的,这时就须要心跳了。
来看一个示例:
其余的代码仍是和上面的同样,咱们就不列出来了,直接进入主题,看不一样的地方:
服务端
// Netty 为了支持心跳的 IdleStateHandler,空闲状态监测处理器。 pipeline.addLast(new IdleStateHandler(5,7,10,TimeUnit.SECONDS));
来看看 IdleStateHandler 的说明
/* * Triggers an IdleStateEvent when a Channel has not performed read, write, or both * operation for a while * 当一个 channel 一断时间没有进行 read,write 就触发一个 IdleStateEvent */ public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) { this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS); //三个参数分别为多长时间没进行读,写或者读写操做则触发 event。 }
触发 event 后咱们编写这个 event 对应的处理器。
public class MyHandler extends ChannelInboundHandlerAdapter{ //触发某个事件后这个方法就会被调用 //一个 channelhandlerContext 上下文对象,另外一个是事件 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{ if(evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt; String eventType = null; switch(event.state()){ case READER_IDLE: eventType = "读空闲"; case WRITER_IDLE: eventType = "写空闲"; case ALL_IDLE: eventType = "读写空闲"; } }else{ //继续将事件向下一个 handler 传递 ctx. } } }
WebSocket 是一种规范,是 HTML5 规范的一部分,主要是解决 Http 协议自己存在的问题。能够实现浏览器和服务端的长链接,链接头信息只在创建链接时发送一次。是在 Http 协议之上构建的,好比请求链接实际上是一个 Http 请求,只不过里面加了一些 WebSocket 信息。也能够用在非浏览器场合,好比 app 上。
Http 是一种无状态的基于请求和响应的协议,意思是必定是客户端想服务端发送一个请求,服务端给客户端一个响应。Http 1.0 在服务端给客户端响应后链接就断了。Http 1.1 增长可 keep-alive,服务端能够和客户端在短期以内保持一个链接,某个事件以内服务端和客户端能够复用这个连接。在这种状况下,网页聊天就是实现不了的,服务端的数据推送是没法实现的。
之前有一些假的长链接技术,好比轮询,缺点和明显,这里就不细说了。
Http 2.0 实现了长链接,可是这不在咱们讨论范围以内。
针对服务端,Tomcat 新版本,Spring,和 Netty 都实现了对 Websocket 的支持。
使用 Netty 对 WebSocket 的支持来实现长链接
其余的部分仍是同样的,先来看服务端的 WebSocketChannelInitializer。
public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel>{ //须要支持 websocket,咱们在 initChannel 是作一点改动 @Override protected void initChannel(SocketChannel ch) throws Exception{ ChannelPipeline pipeline = ch.pipeline(); //由于 websocket 是基于 http 的,因此要加入 http 相应的编解码器 pipeline.addLast(new HttpServerCodec()); //以块的方式进行写的处理器 pipeline.addLast(new ChunkedWriteHandler()); // 进行 http 聚合的处理器,将 HttpMessage 和 HttpContent 聚合到 FullHttpRequest 或者 // FullHttpResponse //HttpObjectAggregator 在基于 netty 的 http 编程使用的很是多,粘包拆包。 pipeline.addLast(new HttpObjectAggregator(8192)); // 针对 websocket 的类,完成 websocket 构建的全部繁重工做,负责握手,以及心跳(close,ping, // pong)的处理, websocket 经过 frame 帧来传递数据。 // BinaryWebSocketFrame,CloseWebSocketFrame,ContinuationWebSocketFrame, // PingWebSocketFrame,PongWebSocketFrame,TextWebSocketFrame。 // /ws 是 context_path,websocket 协议标准,ws://server:port/context_path pipeline.addLast(new WebSocketServerProcotolHandler("/ws")); pipeline.addLast(new TextWebSocketFrameHandler()); } }
// websocket 协议须要用帧来传递参数 public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception{ System.out.println("收到消息:"+ msg.text()); ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器返回")); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception{ System.out.println("handlerAdded" + ctx.channel().id.asLongText()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception{ System.out.println("handlerRemoved" + ctx.channel().id.asLongText()); } }
客户端咱们直接经过浏览器的原声 JS 来写
<script type="text/javascript"> var socket; if(window.WebSocket){ socket = new WebSocket("ws://localhost:8899/ws"); socket.onmessage = function(event){ alert(event.data); } socket.onopen = function(event){ alert("链接开启"); } socket.onclose = function(event){ alert("链接关闭"); } }else{ alert("浏览器不支持 WebSocket"); } function send(message){ if(!window.WebSocket){ return; } if(socket.readyState == WebSocket.OPEN){ socket.send(message); } } </script>
咱们在浏览器中经过 F12 看看 Http 协议升级为 WebSocket 协议的过程。