《一块儿学netty》

o文章摘自 netty 官网(netty.io)
 
netty 是一个异步的,事件驱动的网络应用通讯框架,可让咱们快速编写可靠,高性能,高可扩展的服务端和客户端
 
  • 样例一:discard server(丢弃任何消息的服务端)
package io.netty.example.discard;
 
import io.netty.buffer.ByteBuf;
 
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
 
/**
* Handles a server-side channel.
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
 
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
1.DiscardServerHandler扩展了ChannelInboundHandlerAdapter,它是ChannelInboundHandler的一
个实现。 ChannelInboundHandler提供了能够覆盖的各类事件处理程序方法。 目前,只需扩展
ChannelInboundHandlerAdapter而不是本身实现处理程序接口。
 
2.咱们在这里覆盖channelRead()事件处理程序方法。 每当从客户端接收到新数据时,都会使用收到的
消息调用此方法。 在此示例中,接收消息的类型是ByteBuf。
 
3.要实现DISCARD协议,处理程序必须忽略收到的消息。 ByteBuf是一个引用计数对象( ReferenceCounted ),
必须经过 release()方法显式释放。 请记住,处理程序有责任释放传递给处理程序的任何引用计数对象。 一般,
channelRead()处理程序方法的实现方式以下:
 
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        // Do something with msg
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
 
4.io,或者hadler在处理数据时,可能会致使netty抛出异常,这时会调用exceptionCaught()方法,在大
多数状况下,应该记录捕获的异常并在此处关闭其关联的通道,固然,根据具体状况,也可添加其余逻辑,
好比发送带有错误代码的响应消息。
 
  • 至此咱们已经实现了一半discardserver,接下来编写main方法来启动server
package io.netty.example.discard;
    
import io.netty.bootstrap.ServerBootstrap;
 
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
    
/**
* Discards any incoming data.
*/
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
 
        new DiscardServer(port).run();
    }
}
 
1.NioEventLoopGroup是一个处理I / O操做的多线程事件循环。 Netty为不一样类型的传输提供各类
EventLoopGroup实现。 咱们在此示例中实现了服务器端应用程序,所以将使用两个NioEventLoopGroup。 
第一个,一般称为“boss”,接受传入链接。 第二个,一般称为“worker”,一旦boss接受链接并将接受
的链接注册到worker,worker就处理被接受链接的io。 NioEventLoopGroup使用多少个线程以及它们如何
映射到建立的Channels取决于EventLoopGroup实现,也能够经过构造函数进行配置。
 
2.ServerBootstrap是一个设置服务器的辅助类。 
 
3.在这里,咱们指定使用NioServerSocketChannel类,该类用于实例化新Channel以接受传入链接。
 
4.此处指定的处理程序将始终由新接受的Channel评估(不太懂)。 ChannelInitializer是一个特殊的处理程
序,旨在帮助用户配置新的Channel。 能够将不一样的handler 添加到pipeline 中来完成对消息的复杂处理。
 
5.能够经过option()方法来给channel 来指定参数
 
6.你注意到option()和childOption()吗? option()用于接受传入链接的NioServerSocketChannel。
 childOption()用于父ServerChannel接受的Channels,在这种状况下是NioServerSocketChannel。
 
7.咱们如今准备好了。 剩下的就是绑定到端口并启动服务器。 在这里,咱们绑定到机器中全部NIC(网络
接口卡)的端口8080。 您如今能够根据须要屡次调用bind()方法(使用不一样的绑定地址。)
 
  • 以前是把收到的消息直接丢弃,如今咱们把收到的消息写出去
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg); // (1)
        ctx.flush(); // (2)
    }
 
1.ChannelHandlerContext对象提供各类操做,使您可以触发各类I / O事件和操做。 在这里,咱们调用
write(Object)来逐个写出接收到的消息。 请注意,咱们没有release收到的消息,这与咱们在DISCARD
示例中的操做不一样。 这是由于Netty在写入线路时会为您release
 
2. ctx.write(Object)不会将消息写入线路。 而是存在内部缓存,而后经过 ctx.flush()刷新到线路。 或
者能够将上述两步合二为一, 调用ctx.writeAndFlush(msg)便可
 
  • 接下来写一个time server
 
它与前面的示例的不一样之处在于,它发送包含32位整数的消息,而不接收任何请求,并在 发送消息后关闭链接
 在此示例中,您将学习如何构造和发送消息,以及在完成时关闭链接。
由于咱们将忽略任何接收的数据,可是一旦创建链接就发送消息,此次咱们不能使用channelRead()方法。 
相反,咱们应该覆盖channelActive()方法。 如下是实现:
 
package io.netty.example.time;
 
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
 
    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        
        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
 
1.如上所述,当创建链接并准备进行io时,将调用channelActive()方法。 让咱们写一个32位整数来表
示这个方法中的当前时间。
2.要发送新消息,咱们须要分配一个包含消息的新缓冲区。 咱们要写一个32位整数,所以咱们须要一个容
量至少为4个字节的ByteBuf。 经过ChannelHandlerContext.alloc()获取当前的ByteBufAllocator并分
配一个新的缓冲区。
3.flip在哪里?在NIO中发送消息以前,咱们不习惯调用java.nio.ByteBuffer.flip()吗? 
下面时java.nio.ByteBuffer.flip方法的英文注释:
Flips this buffer. The limit is set to the current position and then the position is set 
to zero. If the mark is defined then it is discarded.
个人理解是把limit 设置为当前下标位置,而后下标归零,若是定义了mark,则丢弃
 
netty ByteBuf没有这样的方法,由于它有两个指针;一个用于读操做,另外一个用于写操做。当您在读取器索
引未更改时向ByteBuf写入内容时,写入器索引会增长。 reader索引和writer索引分别表示消息的开始和结
束位置。
 
另外一点须要注意的是ChannelHandlerContext.write()(和writeAndFlush())方法返回一个
ChannelFuture。 ChannelFuture表示尚 未发生的I / O操做。 这意味着,任何请求的操做可能还没有执行,
由于全部操做在Netty中都是异步的。 例如,如下代码可能会在发送消息以前关闭链接:
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
所以,您须要在ChannelFuture完成以后调用close()方法,并在写入操做完成时通知其侦听器。 请注意,
close()也可能不会当即关闭链接,由于close 也返回ChannelFuture。
 
4.咱们怎么知道写请求何时完成,让后来关闭链接呢,能够添加一个ChannelFutureListener,让其监听
channelfuture,当channelfuture完成任务时关闭链接,上面的方法中咱们用了一个匿名ChannelFutureListener
更简单的替代方法是
f.addListener(ChannelFutureListener.CLOSE);
 
  • 接下来写一个time client 来接受server发回 的消息
server 和 client没多大不一样,只是使用了不一样的 channel 和 bootstrap
package io.netty.example.time;
 
public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)
 
            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
1.Bootstrap与ServerBootstrap相似,不一样之处在于它 适用于非服务器通道,例如客户端或无链接通道。
2.只指定一个EventLoopGroup,它将同时用做boss组和worker组。
3. NioSocketChannel用于建立客户端通道,而不是NioServerSocketChannel。
4.请注意,咱们不像在ServerBootstrap中那样使用childOption(),由于客户端S ocketChannel没有
父服务器
5.咱们应该调用connect()方法而不是bind()方法。
 
  • ChannelHandler实现怎么样? 它应该从服务器接收一个32位整数,将其转换为人类可读的格式,
打印翻译的时间,并关闭链接:
package io.netty.example.time;
 
import java.util.Date;
 
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
 
1.在TCP / IP中,Netty将从对等方发送的数据读入ByteBuf。
 
  • 上面的handler 有时候回发神经,抛出IndexOutOfBoundsException,缘由以下:
 
基于流的传输(例如TCP / IP)中,接收的 数据存储在套接字接收缓冲区中。可是套接字缓冲区中存储的
不是包队列,而是字节队列。 这意味着,即便您将两条消息做为两个独立的数据包发送,操做系统也不会将
它们视为两条消息,而只是一堆字节。 所以,没法保证您所阅读的内容正是您的远程peer所写的内容。 例如,
假设操做系统的TCP / IP堆栈已收到三个数据包:
 
abc  def   ghi
 
因为基于流的协议的这种通常属性,应用程序颇有可能如下面的碎片形式读取它们
 
ab cdef g hi
 
那么应该怎么解决这个问题呢?还记得以前在ChannelInitializer 中添加多个hadler吗?能够在pipeline中添加
一个hadler 来专门解决碎片化问题
package io.netty.example.time;
 
public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        
        out.add(in.readBytes( 4)); // (4)
    }
}
1.ByteToMessageDecoder是ChannelInboundHandler的一个实现,能够很容易地处理碎片问题。
2.每当收到新数据时,ByteToMessageDecoder都会使用内部维护的 累积缓冲区调用decode()方法。
3.若是累积缓冲区中没有足够数据,decode()不向out中添加内容。 当收到更多数据时,
ByteToMessageDecoder将再次调用decode()。
4.若是decode()将对象添加到out,则意味着解码器成功解码了一条消息。 ByteToMessageDecoder
丢弃累积缓冲区的 读取 部分。ByteToMessageDecoder将继续调用decode()方法,直到它不添加任
何内容。
 
下面来看看改版的ChannelInitializer
b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});
相关文章
相关标签/搜索