精通并发与 Netty (一)如何使用

精通并发与 Netty

Netty 是一个异步的,事件驱动的网络通讯框架,用于高性能的基于协议的客户端和服务端的开发。javascript

异步指的是会当即返回,并不知道到底发送过去没有,成功没有,通常都会使用监听器来监听返回。java

事件驱动是指开发者只须要关注事件对应的回调方法便可,好比 channel active,inactive,read 等等。web

网络通讯框架就不用解释了,不少你很是熟悉的组件都使用了 netty,好比 spark,dubbo 等等。编程

初步了解 Netty

第一个简单的例子,使用 Netty 实现一个 http 服务器,客户端调用一个没有参数的方法,服务端返回一个 hello world。bootstrap

Netty 里面大量的代码都是对线程的处理和 IO 的异步的操做。浏览器

package com.paul;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Server {

    public static void main(String[] args) throws InterruptedException {
        //定义两个线程组,事件循环组,能够类比与 Tomcat 就是死循环,不断接收客户端的链接
        // boss 线程组不断从客户端接受链接,但不处理,由 worker 线程组对链接进行真正的处理
        // 一个线程组其实也能完成,推荐使用两个
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 服务端启动器,能够轻松的启动服务端的 channel
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //group 方法有两个,一个接收一个参数,另外一个接收两个参数
            // childhandler 是咱们本身写的请求处理器
            serverBootstrap.group(bossGroup, workerGroup).channel(NioSocketChannel.class)
                    .childHandler(new ServerInitializer());
            //绑定端口
            ChannelFuture future = serverBootstrap.bind(8011).sync();
            //channel 关闭的监听
            future.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}
package com.paul;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;

public class ServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //管道,管道里面能够有不少 handler,一层层过滤的柑橘
        ChannelPipeline pipeline = socketChannel.pipeline();
        //HttpServerCodec 是 HttpRequestDecoder 和 HttpReponseEncoder 的组合,编码和解码的 h      handler
        pipeline.addLast("httpServerCodec", new HttpServerCodec());
        pipeline.addLast("handler", new ServerHandler());
    }
}
package com.paul;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

public class ServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
        if(httpObject instanceof HttpRequest) {
            ByteBuf content = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
            //单纯的调用 write 只会放到缓存区,不会真的发送
            channelHandlerContext.writeAndFlush(response);
        }
    }
}

咱们在 SimpleChannelInboundHandler 里分析一下,先看它继承的 ChannelInboundHandlerAdapter 里面的事件回调方法,包括通道注册,解除注册,Active,InActive等等。缓存

public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  ctx.fireChannelRegistered();
}

public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
  ctx.fireChannelUnregistered();
}

public void channelActive(ChannelHandlerContext ctx) throws Exception {
  ctx.fireChannelActive();
}

public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  ctx.fireChannelInactive();
}

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  ctx.fireChannelRead(msg);
}

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  ctx.fireChannelReadComplete();
}

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  ctx.fireUserEventTriggered(evt);
}

public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
  ctx.fireChannelWritabilityChanged();
}

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  ctx.fireExceptionCaught(cause);
}

执行顺序为 handler added->channel registered->channel active->channelRead0->channel inactive->channel unregistered。服务器

Netty 自己并非遵循 servlet 规范的。Http 是基于请求和响应的无状态协议。Http 1.1 是有 keep-alived 参数的,若是3秒没有返回,则服务端主动关闭了解,Http 1.0 则是请求完成直接返回。websocket

Netty 的链接会被一直保持,咱们须要本身去处理这个功能。网络

在服务端发送完毕数据后,能够在服务端关闭 Channel。

ctx.channel.close();

Netty 能作什么

  1. 能够看成一个 http 服务器,可是他并无实现 servelt 规范。虽然 Tomcat 底层自己也使用 NIO,可是 Netty 自己的特色决定了它比 Tomcat 的吞吐量更高。相比于 SpringMVC 等框架,Netty 没提供路由等功能,这也契合和 Netty 的设计思路,它更贴近底层。
  2. Socket 开发,也是应用最为普遍的领域,底层传输的最基础框架,RPC 框架底层多数采用 Netty。直接采用 Http 固然也能够,可是效率就低了不少了。
  3. 支持长链接的开发,消息推送,聊天,服务端向客户端推送等等都会采用 WebSocket 协议,就是长链接。

Netty 对 Socket 的实现

对于 Http 编程来讲,咱们实现了服务端就能够了,客户端彻底可使用浏览器或者 CURL 工具来充当。可是对于 Socket 编程来讲,客户端也得咱们本身实现。

服务器端:

Server 类于上面 Http 服务器那个同样,在 ServerInitoalizer 有一些变化

public class ServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //管道,管道里面能够有不少 handler,一层层过滤的柑橘
        ChannelPipeline pipeline = socketChannel.pipeline();
        // TCP 粘包 拆包
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
        pipeline.addLast(new LengthFieldPrepender(4));
        // 字符串编码,解码
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new ServerHandler());

    }
}
public class ServerHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress()+","+msg);
        ctx.channel().writeAndFlush("from server:" + UUID.randomUUID());

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端:

public class Client {

    public static void main(String[] args) throws InterruptedException {
        //客户端不须要两个 group,只须要一个就够了,直接链接服务端发送数据就能够了
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try{
            Bootstrap bootstrap = new Bootstrap();
            //服务器端既可使用 handler 也可使用 childhandler, 客户端通常使用 handler
            //对于 服务端,handler 是针对 bossgroup的,childhandler 是针对 workergorup 的
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .handler(new ClientInitializer());

            ChannelFuture channelFuture = bootstrap.connect("localhost",8899).sync();
            channelFuture.channel().closeFuture().sync();

        }finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}
public class ClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //管道,管道里面能够有不少 handler,一层层过滤的柑橘
        ChannelPipeline pipeline = socketChannel.pipeline();
        // TCP 粘包 拆包
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
        pipeline.addLast(new LengthFieldPrepender(4));
        // 字符串编码,解码
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new ClientHandler());

    }
}
public class ClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress()+","+msg);
        System.out.println("client output:"+ msg);

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().writeAndFlush("23123");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Netty 长链接实现一个聊天室

Server 端:

public class ServerHandler extends SimpleChannelInboundHandler<String> {

    //定义 channel group 来管理全部 channel
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {


    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[服务器]-" + channel.remoteAddress() + "加入\n");
        channelGroup.add(channel);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[服务器]-" + channel.remoteAddress() + "离开\n");
        //这个 channel 会被自动从 channelGroup 里移除

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "上线");

    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "离开");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Client 端:

BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
for(;;){
  channel.writeAndFlush(br.readLine() + "\r\n");
}

Netty 心跳

集群之间各个节点的通讯,主从节点之间须要进行数据同步,每当主节点的数据发生变化时,经过异步的方式将数据同步到从节点,同步方式能够用日志等等,所以主从节点之间不是实时一致性而是最终一致性。

节点与节点之间如何进行通讯那?这种主从模式是须要互相之间有长链接的,这样来肯定对方还活着,实现方式是互相之间定时发送心跳数据包。若是发送几回后对方仍是没有响应的话,就能够认为对方已经挂掉了。

回到客户端与服务端的模式,有人可能会想,客户端断开链接后服务端的 handlerRemoved 等方法不是能感知吗?还要心跳干什么哪?

真实状况其实很是复杂,好比手机客户端和服务端进行一个长链接,客户端没有退出应用,客户端开了飞行模型,或者强制关机,此时双方是感知不到链接已经断掉了,或者说须要很是长的时间才能感知到,这是咱们不想看到的,这时就须要心跳了。

来看一个示例:

其余的代码仍是和上面的同样,咱们就不列出来了,直接进入主题,看不一样的地方:

服务端

// Netty 为了支持心跳的 IdleStateHandler,空闲状态监测处理器。
     pipeline.addLast(new IdleStateHandler(5,7,10,TimeUnit.SECONDS));

来看看 IdleStateHandler 的说明

/*
 * Triggers an IdleStateEvent when a Channel has not performed read, write, or both    
 * operation for a while
 * 当一个 channel 一断时间没有进行 read,write 就触发一个 IdleStateEvent
 */
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
  this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
  //三个参数分别为多长时间没进行读,写或者读写操做则触发 event。
}

触发 event 后咱们编写这个 event 对应的处理器。

public class MyHandler extends ChannelInboundHandlerAdapter{
  //触发某个事件后这个方法就会被调用
  //一个 channelhandlerContext 上下文对象,另外一个是事件
  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
        if(evt instanceof IdleStateEvent){
        IdleStateEvent event = (IdleStateEvent)evt;
        String eventType = null;
        switch(event.state()){
          case READER_IDLE:
            eventType = "读空闲";
          case WRITER_IDLE:
            eventType = "写空闲";
          case ALL_IDLE:
            eventType = "读写空闲";
        }
      }else{
        //继续将事件向下一个 handler 传递
        ctx.
      }
  }
}

WebSocket 实现与原理分析

WebSocket 是一种规范,是 HTML5 规范的一部分,主要是解决 Http 协议自己存在的问题。能够实现浏览器和服务端的长链接,链接头信息只在创建链接时发送一次。是在 Http 协议之上构建的,好比请求链接实际上是一个 Http 请求,只不过里面加了一些 WebSocket 信息。也能够用在非浏览器场合,好比 app 上。

Http 是一种无状态的基于请求和响应的协议,意思是必定是客户端想服务端发送一个请求,服务端给客户端一个响应。Http 1.0 在服务端给客户端响应后链接就断了。Http 1.1 增长可 keep-alive,服务端能够和客户端在短期以内保持一个链接,某个事件以内服务端和客户端能够复用这个连接。在这种状况下,网页聊天就是实现不了的,服务端的数据推送是没法实现的。

之前有一些假的长链接技术,好比轮询,缺点和明显,这里就不细说了。

Http 2.0 实现了长链接,可是这不在咱们讨论范围以内。

针对服务端,Tomcat 新版本,Spring,和 Netty 都实现了对 Websocket 的支持。

使用 Netty 对 WebSocket 的支持来实现长链接

其余的部分仍是同样的,先来看服务端的 WebSocketChannelInitializer。

public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel>{
   //须要支持 websocket,咱们在 initChannel 是作一点改动
   @Override
   protected void initChannel(SocketChannel ch) throws Exception{
      ChannelPipeline pipeline = ch.pipeline();
      //由于 websocket 是基于 http 的,因此要加入 http 相应的编解码器
      pipeline.addLast(new HttpServerCodec());
      //以块的方式进行写的处理器
      pipeline.addLast(new ChunkedWriteHandler());
      // 进行 http 聚合的处理器,将 HttpMessage 和 HttpContent 聚合到 FullHttpRequest 或者 
      // FullHttpResponse
      //HttpObjectAggregator 在基于 netty 的 http 编程使用的很是多,粘包拆包。
      pipeline.addLast(new HttpObjectAggregator(8192));
      // 针对 websocket 的类,完成 websocket 构建的全部繁重工做,负责握手,以及心跳(close,ping, 
      // pong)的处理, websocket 经过 frame 帧来传递数据。
      // BinaryWebSocketFrame,CloseWebSocketFrame,ContinuationWebSocketFrame,
      // PingWebSocketFrame,PongWebSocketFrame,TextWebSocketFrame。
      // /ws 是 context_path,websocket 协议标准,ws://server:port/context_path
      pipeline.addLast(new WebSocketServerProcotolHandler("/ws"));
      pipeline.addLast(new TextWebSocketFrameHandler());
   }
}
// websocket 协议须要用帧来传递参数
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{
   @Override
   protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception{
     System.out.println("收到消息:"+ msg.text());
     ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器返回"));
   }
   
   @Override
   public void handlerAdded(ChannelHandlerContext ctx) throws Exception{
     System.out.println("handlerAdded" + ctx.channel().id.asLongText());
   }
  
   @Override
   public void handlerRemoved(ChannelHandlerContext ctx) throws Exception{
     System.out.println("handlerRemoved" + ctx.channel().id.asLongText());
   }
  
}

客户端咱们直接经过浏览器的原声 JS 来写

<script type="text/javascript">
   var socket;
   if(window.WebSocket){
     socket = new WebSocket("ws://localhost:8899/ws");
     socket.onmessage = function(event){
       alert(event.data);
     }
     socket.onopen = function(event){
       alert("链接开启");
     }
     socket.onclose = function(event){
       alert("链接关闭");
     }
   }else{
     alert("浏览器不支持 WebSocket");
   }

   function send(message){
     if(!window.WebSocket){
       return;
     }
     if(socket.readyState == WebSocket.OPEN){
       socket.send(message);
     }
   }
</script>

咱们在浏览器中经过 F12 看看 Http 协议升级为 WebSocket 协议的过程。

相关文章
相关标签/搜索