netty用户指南

Netty用户指南

1、前言

1.问题

当今世界咱们须要使用通用的软件或库与其余组件进行通讯,例如使用HTTP客户端从服务器中获取信息,或经过网络服务调用一个远程的方法。然而通用的协议及其实现一般不具有较好的伸缩性。因此问题看起来是咱们怎么不使用通用的HTTP服务器去传输大文件、e-mail、实事数据、多媒体数据等。咱们须要的是针对特定问题而进行优化的协议实现。例如咱们可能须要从新实现一个HTTP服务器来与AJAX的客户端进行通讯。另一种状况是须要处理历史遗留的协议保证与旧的系统兼容。这些例子的关键在于怎样快速的实现协议而不损失目标系统的稳定性和性能。java

2.解决方案

Netty是一个异步事件驱动的网络应用框架,能够用来快速开发可维护的、高性能、可扩展的协议服务器和客户端。编程

换句话说,Netty是一个基于NIO的客户端和服务器框架,能够简单快速的开发网络应用程序,如协议的客户端和服务器。它极大的简化了TCP、UDP服务器之类的网络编程。bootstrap

2、开始

1.编写DiscardServer

最简单的协议并非“hello world”,而是丢弃。丢弃协议会丢弃任何接受到的数据不作任何的响应。服务器

要实现丢弃协议,须要作的就是丢弃任何接收到的数据。首先从handler的实现开始,handler会处理由Netty产生的I/O事件。网络

package io.netty.example.discard;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
  1. DiscardServerHandler继承了ChannelInboundHandlerAdapter,而他又实现了ChannelInboundHandlerChannelInboundHandler提供了不一样的事件处理方法,你能够根据须要去覆写相应的方法。ChannelInboundHandlerAdapter提供了一些默认的实现,因此在这个例子中只须要去继承它就能够了。
  2. 覆写了channelRead方法,Netty从客户端收到数据时就会调用该方法。消息的类型是ByteBuf
  3. ByteBuf是一个引用计数对象,须要进行手动的释放。须要注意的是,handler须要释听任何传递给他的引用计数对象。一般状况下channelRead()方法一般的实现方式以下:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        // Do something with msg
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
  1. 因为IO错误Netty抛出异常或handle处理事件抛出异常,都会使exceptionCaught()方法被调用。在大多数状况下,都须要对异常记日志,而且关闭相关连的channel

到目前为止实现了DISCARD服务的通常,接下来须要实现main()方法来启动服务。数据结构

package io.netty.example.discard;
    
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
    
/**
 * Discards any incoming data.
 */
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}
  1. NioEventLoopGroup 是一个多线程的事件循环,用来处理I/O操做。Netty为不一样的通讯方式提供了多种EventLoopGroup实现。在本例中,咱们只须要实现服务器端的应用,因此须要两个NioEventLoopGroup 。第一个一般称为boss,用来接收客户端的连接请求。第二个称为worker,用来处理boss已接收链接的I/O请求和把接收的链接注册到worker
  2. ServerBootstrap是用来建立服务器的辅助类。
  3. 使用NioServerSocketChannel类来实例化channel,用来接收链接请求。
  4. 在这里设置的handler会被每个新channel调用,ChannelInitializer是一个特殊的handler用来配置一个新的channel。在本例中,咱们将DiscardServerHandler添加到新channel 的管道中。随着应用程序的复杂度增长,可能会向管道中加入更多的handler。
  5. 能够经过option()方法给channel设置一些参数。
  6. option()方法是用来设置NioServerSocketChannel参数的,而childOption()是给接收的链接设置参数的。
  7. 剩下的就是绑定端口而后启动服务了。

2. 测试DiscardServer是否成功

最简单的方法是使用telnet命令。例如输入telnet localhost 8080。DiscarServer丢弃了任何接受的数据,咱们能够把DiscardServer的接收的数据打印出来。多线程

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf in = (ByteBuf) msg;
    try {
        while (in.isReadable()) { // (1)
            System.out.print((char) in.readByte());
            System.out.flush();
        }
    } finally {
        ReferenceCountUtil.release(msg); // (2)
    }
}
  1. 循环能够等价于System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
  2. 等价于in.release()

3.写一个Echo Server

一个服务器一般须要对请求做出响应,而一个Echo服务仅仅须要作的是把请求的内容返回给客户端。app

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ctx.write(msg); // (1)
    ctx.flush(); // (2)
}
  1. ChannelHandlerContext对象提供了各类出发IO时间的操做。经过调用write(Object)方法把数据发给客户端。在这里没有手动的释放msg,这是由于当把msg写入时Netty会自动的释放它。
  2. ctx.write(Object)并不会把数据写到外部,而是在内部的缓冲区中,经过调用ctx.flush()把数据刷出到外部。能够简洁的调用ctx.wirteAndFlush(msg)达到一样的效果。

4. 写一个Timer Server

TIME协议与前面的例子不一样之处在于,它发送一个32位的整数,不接收任何请求,而且只要消息发送了就马上关闭链接。框架

由于咱们不须要接收任何数据,并且在链接创建时就发送数据,因此不能使用channelRead()方法。须要覆写channelActive()方法异步

package io.netty.example.time;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        
        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 当一个链接创建时,activeChannel()方法会被调用,而后写一个32位的整数。

  2. 为了发送一个新的信息,须要分配一个缓冲区。经过调用ctx.alloc()获取ByteBufAllocator来分配缓冲区。

  3. 在Netty中的Buffer不须要像Java NIO同样调用flip(),这是由于Netty中的Buffer具备两个指针,分别用于读写操做。当进行写操做时写指针在移动而读指针不移动,读写指针分别表明数据的开始和结束。

    另外须要指出的是,ctx.write()返回一个ChannelFuture对象,该对象表明着一个还未发生的IO操做。这意味着,任何一个请求操做可能都未发生,这是由于在Netty中,全部操做都是异步的。例以下面的代码可能在发送信息前关闭链接:

    Channel ch = ...;
    ch.writeAndFlush(message);
    ch.close();

    因此要在ChannelFuture完成前调用close(),当操做完成时,ChannelFuture会通知他的监听器。close()可能也不会当即关闭链接。

  4. 本例中添加一个匿名内部类做为监听器,来关闭链接。也可使用预约义的监听器:

    f.addListener(ChannelFutureListener.CLOSE);

5.Time Client

不一样于DISCARD和ECHO,TIME协议须要一个客户端将32位的整数转为一个日期。Netty中的客户端和服务器最大的不一样在于使用了不一样的BootStrapChannel现实。

package io.netty.example.time;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)
            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  1. BootStapServerBootStrap很类似,但它是用于客户端的。
  2. 只需指定一个EventLoopGroup,在客户端中不须要boss。
  3. 使用NioSocketChannel而不是NioServerSocketChannel
  4. 不须要childOption()
  5. 使用connect()方法而不是bind()

TimeClientHandler中,将整数翻译成日期格式的类型。

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

6.处理基于流的传输问题。

TCP/IP协议接收数据并储存到Socket缓冲区中,可是缓冲区不是数据包的队列,而是字节的队列,这意味着你发送了两条消息,但操做系统会并不认为是两条消息而是一组字节。因此在读数据时并不能肯定读到了对方发过来的数据。

在TIME协议中,在调用m.readUnsignedInt()时缓冲区中须要有四个字节,若是缓冲区中还未接收到四个字节时就会抛出异常。

解决方法是,再加一个ChannelHandleChannelPipeline。该handler专门处理编码问题。

package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        out.add(in.readBytes(4)); // (4)
    }
}
  1. ByteToMessageDecoderChannelInboundHandler的一个实现,专门用于编码问题。
  2. 当新的数据到达时,Netty会调用decode方法,而且其内部维护着一个累加Buffer。
  3. 当累加Buffer中没有足够的数据时,能够不在out中添加任何数据。当新数据到达后Netty又会调用decode方法。
  4. 若是decode()添加一个对象到out中,意味着编码信息成功了。Netty会丢弃Buffer中已读取的部分数据。

TimeDecoder添加到ChannelPipeline中:

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

另一种更简单的方式是使用ReplayingDecoder

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

当调用in.readBytes(4)抛出异常时,ReplayingDecoder会捕捉异常并重复执行decode()

7.使用POJO代替ByteBuf

在以前的TIME服务中,都是直接使用ByteBuf做为协议的数据结构。在Handler中使用POJO对象,能够把从ByteBuf抽取POJO的代码分离开。

首先定义UnixTime类:

package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final long value;
    
    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }
    
    public UnixTime(long value) {
        this.value = value;
    }
        
    public long value() {
        return value;
    }
        
    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

TimeDecoder中解码产生UnixTime对象

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
        return;
    }
    out.add(new UnixTime(in.readUnsignedInt()));
}

TimeClientHandler中再也不须要使用ByteBuf了。

在服务器端,首先更改TimeServerHandler

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}

还须要建立一个编码器,将UnixTime转为ByteBuf以便网络传输

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int)msg.value());
    }
}
相关文章
相关标签/搜索