超详细Netty入门,看这篇就够了!

思惟导图

前言

本文主要讲述Netty框架的一些特性以及重要组件,但愿看完以后能对Netty框架有一个比较直观的感觉,但愿能帮助读者快速入门Netty,减小一些弯路。html

1、Netty概述

官方的介绍:java

Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.linux

Netty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端程序员

2、为何使用Netty

从官网上介绍,Netty是一个网络应用程序框架,开发服务器和客户端。也就是用于网络编程的一个框架。既然是网络编程,Socket就不谈了,为何不用NIO呢?web

2.1 NIO的缺点

对于这个问题,以前我写了一篇文章《NIO入门》对NIO有比较详细的介绍,NIO的主要问题是:算法

  • NIO的类库和API繁杂,学习成本高,你须要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
  • 须要熟悉Java多线程编程。这是由于NIO编程涉及到Reactor模式,你必须对多线程和网络编程很是熟悉,才能写出高质量的NIO程序。
  • 臭名昭著的epoll bug。它会致使Selector空轮询,最终致使CPU 100%。直到JDK1.7版本依然没获得根本性的解决。

2.2 Netty的优势

相对地,Netty的优势有不少:编程

  • API使用简单,学习成本低。
  • 功能强大,内置了多种解码编码器,支持多种协议。
  • 性能高,对比其余主流的NIO框架,Netty的性能最优。
  • 社区活跃,发现BUG会及时修复,迭代版本周期短,不断加入新的功能。
  • Dubbo、Elasticsearch都采用了Netty,质量获得验证。

3、架构图

上面这张图就是在官网首页的架构图,咱们从上到下分析一下。bootstrap

绿色的部分Core核心模块,包括零拷贝、API库、可扩展的事件模型。api

橙色部分Protocol Support协议支持,包括Http协议、webSocket、SSL(安全套接字协议)、谷歌Protobuf协议、zlib/gzip压缩与解压缩、Large File Transfer大文件传输等等。安全

红色的部分Transport Services传输服务,包括Socket、Datagram、Http Tunnel等等。

以上可看出Netty的功能、协议、传输方式都比较全,比较强大。

4、永远的Hello Word

首先搭建一个HelloWord工程,先熟悉一下API,还有为后面的学习作铺垫。如下面这张图为依据:

4.1 引入Maven依赖

使用的版本是4.1.20,相对比较稳定的一个版本。

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.20.Final</version>
</dependency>
复制代码

4.2 建立服务端启动类

public class MyServer {
    public static void main(String[] args) throws Exception {
        //建立两个线程组 boosGroup、workerGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //建立服务端的启动对象,设置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            //设置两个线程组boosGroup和workerGroup
            bootstrap.group(bossGroup, workerGroup)
                //设置服务端通道实现类型 
                .channel(NioServerSocketChannel.class)
                //设置线程队列获得链接个数 
                .option(ChannelOption.SO_BACKLOG, 128)
                //设置保持活动链接状态 
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                //使用匿名内部类的形式初始化通道对象 
                .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //给pipeline管道设置处理器
                            socketChannel.pipeline().addLast(new MyServerHandler());
                        }
                    });//给workerGroup的EventLoop对应的管道设置处理器
            System.out.println("java技术爱好者的服务端已经准备就绪...");
            //绑定端口号,启动服务端
            ChannelFuture channelFuture = bootstrap.bind(6666).sync();
            //对关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
复制代码

4.3 建立服务端处理器

/** * 自定义的Handler须要继承Netty规定好的HandlerAdapter * 才能被Netty框架所关联,有点相似SpringMVC的适配器模式 **/
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //获取客户端发送过来的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //发送消息给客户端
        ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到消息,并给你发送一个问号?", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //发生异常,关闭通道
        ctx.close();
    }
}
复制代码

4.4 建立客户端启动类

public class MyClient {

    public static void main(String[] args) throws Exception {
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            //建立bootstrap对象,配置参数
            Bootstrap bootstrap = new Bootstrap();
            //设置线程组
            bootstrap.group(eventExecutors)
                //设置客户端的通道实现类型 
                .channel(NioSocketChannel.class)
                //使用匿名内部类初始化通道
                .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //添加客户端通道的处理器
                            ch.pipeline().addLast(new MyClientHandler());
                        }
                    });
            System.out.println("客户端准备就绪,随时能够起飞~");
            //链接服务端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
            //对通道关闭进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            //关闭线程组
            eventExecutors.shutdownGracefully();
        }
    }
}
复制代码

4.5 建立客户端处理器

public class MyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //发送消息到服务端
        ctx.writeAndFlush(Unpooled.copiedBuffer("歪比巴卜~茉莉~Are you good~马来西亚~", CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收服务端发送过来的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到服务端" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    }
}
复制代码

4.6 测试

先启动服务端,再启动客户端,就能够看到结果:

MyServer打印结果:

MyClient打印结果:

5、Netty的特性与重要组件

5.1 taskQueue任务队列

若是Handler处理器有一些长时间的业务处理,能够交给taskQueue异步处理。怎么用呢,请看代码演示:

public class MyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //获取到线程池eventLoop,添加线程,执行
        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    //长时间操做,不至于长时间的业务操做致使Handler阻塞
                    Thread.sleep(1000);
                    System.out.println("长时间的业务处理");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}
复制代码

咱们打一个debug调试,是能够看到添加进去的taskQueue有一个任务。

5.2 scheduleTaskQueue延时任务队列

延时任务队列和上面介绍的任务队列很是类似,只是多了一个可延迟必定时间再执行的设置,请看代码演示:

ctx.channel().eventLoop().schedule(new Runnable() {
    @Override
    public void run() {
        try {
            //长时间操做,不至于长时间的业务操做致使Handler阻塞
            Thread.sleep(1000);
            System.out.println("长时间的业务处理");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
},5, TimeUnit.SECONDS);//5秒后执行
复制代码

依然打开debug进行调试查看,咱们能够有一个scheduleTaskQueue任务待执行中

5.3 Future异步机制

在搭建HelloWord工程的时候,咱们看到有一行这样的代码:

ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666);
复制代码

不少操做都返回这个ChannelFuture对象,究竟这个ChannelFuture对象是用来作什么的呢?

ChannelFuture提供操做完成时一种异步通知的方式。通常在Socket编程中,等待响应结果都是同步阻塞的,而Netty则不会形成阻塞,由于ChannelFuture是采起相似观察者模式的形式进行获取结果。请看一段代码演示:

//添加监听器
channelFuture.addListener(new ChannelFutureListener() {
    //使用匿名内部类,ChannelFutureListener接口
    //重写operationComplete方法
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        //判断是否操做成功 
        if (future.isSuccess()) {
            System.out.println("链接成功");
        } else {
            System.out.println("链接失败");
        }
    }
});
复制代码

5.4 Bootstrap与ServerBootStrap

Bootstrap和ServerBootStrap是Netty提供的一个建立客户端和服务端启动器的工厂类,使用这个工厂类很是便利地建立启动类,根据上面的一些例子,其实也看得出来能大大地减小了开发的难度。首先看一个类图:

能够看出都是继承于AbstractBootStrap抽象类,因此大体上的配置方法都相同。

通常来讲,使用Bootstrap建立启动器的步骤可分为如下几步:

5.4.1 group()

在上一篇文章《Reactor模式》中,咱们就讲过服务端要使用两个线程组:

  • bossGroup 用于监听客户端链接,专门负责与客户端建立链接,并把链接注册到workerGroup的Selector中。
  • workerGroup用于处理每个链接发生的读写事件。

通常建立线程组直接使用如下new就完事了:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
复制代码

有点好奇的是,既然是线程组,那线程数默认是多少呢?深刻源码:

//使用一个常量保存
    private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        //NettyRuntime.availableProcessors() * 2,cpu核数的两倍赋值给常量
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
	
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        //若是不传入,则使用常量的值,也就是cpu核数的两倍
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
复制代码

经过源码能够看到,默认的线程数是cpu核数的两倍。假设想自定义线程数,可使用有参构造器:

//设置bossGroup线程数为1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//设置workerGroup线程数为16
EventLoopGroup workerGroup = new NioEventLoopGroup(16);
复制代码

5.4.2 channel()

这个方法用于设置通道类型,当创建链接后,会根据这个设置建立对应的Channel实例。

使用debug模式能够看到

通道类型有如下:

NioSocketChannel: 异步非阻塞的客户端 TCP Socket 链接。

NioServerSocketChannel: 异步非阻塞的服务器端 TCP Socket 链接。

经常使用的就是这两个通道类型,由于是异步非阻塞的。因此是首选。

OioSocketChannel: 同步阻塞的客户端 TCP Socket 链接。

OioServerSocketChannel: 同步阻塞的服务器端 TCP Socket 链接。

稍微在本地调试过,用起来和Nio有一些不一样,是阻塞的,因此API调用也不同。由于是阻塞的IO,几乎没什么人会选择使用Oio,因此也很难找到例子。我稍微琢磨了一下,通过几回报错以后,总算调通了。代码以下:

//server端代码,跟上面几乎同样,只需改三个地方
//这个地方使用的是OioEventLoopGroup
EventLoopGroup bossGroup = new OioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup)//只须要设置一个线程组boosGroup
        .channel(OioServerSocketChannel.class)//设置服务端通道实现类型

//client端代码,只需改两个地方
//使用的是OioEventLoopGroup
EventLoopGroup eventExecutors = new OioEventLoopGroup();
//通道类型设置为OioSocketChannel
bootstrap.group(eventExecutors)//设置线程组
        .channel(OioSocketChannel.class)//设置客户端的通道实现类型
复制代码

NioSctpChannel: 异步的客户端 Sctp(Stream Control Transmission Protocol,流控制传输协议)链接。

NioSctpServerChannel: 异步的 Sctp 服务器端链接。

本地没启动成功,网上看了一些网友的评论,说是只能在linux环境下才能够启动。从报错信息看:SCTP not supported on this platform,不支持这个平台。由于我电脑是window系统,因此网友说的有点道理。

5.4.3 option()与childOption()

首先说一下这两个的区别。

option()设置的是服务端用于接收进来的链接,也就是boosGroup线程。

childOption()是提供给父管道接收到的链接,也就是workerGroup线程。

搞清楚了以后,咱们看一下经常使用的一些设置有哪些:

SocketChannel参数,也就是childOption()经常使用的参数:

SO_RCVBUF Socket参数,TCP数据接收缓冲区大小。 TCP_NODELAY TCP参数,当即发送数据,默认值为Ture。 SO_KEEPALIVE Socket参数,链接保活,默认值为False。启用该功能时,TCP会主动探测空闲链接的有效性。

ServerSocketChannel参数,也就是option()经常使用参数:

SO_BACKLOG Socket参数,服务端接受链接的队列长度,若是队列已满,客户端链接将被拒绝。默认值,Windows为200,其余为128。

因为篇幅限制,其余就不列举了,你们能够去网上找资料看看,了解一下。

5.4.4 设置流水线(重点)

ChannelPipeline是Netty处理请求的责任链,ChannelHandler则是具体处理请求的处理器。实际上每个channel都有一个处理器的流水线。

在Bootstrap中childHandler()方法须要初始化通道,实例化一个ChannelInitializer,这时候须要重写initChannel()初始化通道的方法,装配流水线就是在这个地方进行。代码演示以下:

//使用匿名内部类的形式初始化通道对象
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
	@Override
	protected void initChannel(SocketChannel socketChannel) throws Exception {
		//给pipeline管道设置自定义的处理器
		socketChannel.pipeline().addLast(new MyServerHandler());
	}
});
复制代码

处理器Handler主要分为两种:

ChannelInboundHandlerAdapter(入站处理器)、ChannelOutboundHandler(出站处理器)

入站指的是数据从底层java NIO Channel到Netty的Channel。

出站指的是经过Netty的Channel来操做底层的java NIO Channel。

ChannelInboundHandlerAdapter处理器经常使用的事件有

  1. 注册事件 fireChannelRegistered。

  2. 链接创建事件 fireChannelActive。

  3. 读事件和读完成事件 fireChannelRead、fireChannelReadComplete。

  4. 异常通知事件 fireExceptionCaught。

  5. 用户自定义事件 fireUserEventTriggered。

  6. Channel 可写状态变化事件 fireChannelWritabilityChanged。

  7. 链接关闭事件 fireChannelInactive。

ChannelOutboundHandler处理器经常使用的事件有

  1. 端口绑定 bind。

  2. 链接服务端 connect。

  3. 写事件 write。

  4. 刷新时间 flush。

  5. 读事件 read。

  6. 主动断开链接 disconnect。

  7. 关闭 channel 事件 close。

还有一个相似的handler(),主要用于装配parent通道,也就是bossGroup线程。通常状况下,都用不上这个方法。

5.4.5 bind()

提供用于服务端或者客户端绑定服务器地址和端口号,默认是异步启动。若是加上sync()方法则是同步。

有五个同名的重载方法,做用都是用于绑定地址端口号。不一一介绍了。

5.4.6 优雅地关闭EventLoopGroup

//释放掉全部的资源,包括建立的线程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
复制代码

会关闭全部的child Channel。关闭以后,释放掉底层的资源。

5.5 Channel

Channel是什么?不妨看一下官方文档的说明:

A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind

翻译大意:一种链接到网络套接字或能进行读、写、链接和绑定等I/O操做的组件。

若是上面这段说明比较抽象,下面还有一段说明:

A channel provides a user:

the current state of the channel (e.g. is it open? is it connected?), the configuration parameters of the channel (e.g. receive buffer size), the I/O operations that the channel supports (e.g. read, write, connect, and bind), and the ChannelPipeline which handles all I/O events and requests associated with the channel.

翻译大意:

channel为用户提供:

  1. 通道当前的状态(例如它是打开?仍是已链接?)

  2. channel的配置参数(例如接收缓冲区的大小)

  3. channel支持的IO操做(例如读、写、链接和绑定),以及处理与channel相关联的全部IO事件和请求的ChannelPipeline。

5.5.1 获取channel的状态

boolean isOpen(); //若是通道打开,则返回true
boolean isRegistered();//若是通道注册到EventLoop,则返回true
boolean isActive();//若是通道处于活动状态而且已链接,则返回true
boolean isWritable();//当且仅当I/O线程将当即执行请求的写入操做时,返回true。
复制代码

以上就是获取channel的四种状态的方法。

5.5.2 获取channel的配置参数

获取单条配置信息,使用getOption(),代码演示:

ChannelConfig config = channel.config();//获取配置参数
//获取ChannelOption.SO_BACKLOG参数,
Integer soBackLogConfig = config.getOption(ChannelOption.SO_BACKLOG);
//由于我启动器配置的是128,因此我这里获取的soBackLogConfig=128
复制代码

获取多条配置信息,使用getOptions(),代码演示:

ChannelConfig config = channel.config();
Map<ChannelOption<?>, Object> options = config.getOptions();
for (Map.Entry<ChannelOption<?>, Object> entry : options.entrySet()) {
	System.out.println(entry.getKey() + " : " + entry.getValue());
}
/** SO_REUSEADDR : false WRITE_BUFFER_LOW_WATER_MARK : 32768 WRITE_BUFFER_WATER_MARK : WriteBufferWaterMark(low: 32768, high: 65536) SO_BACKLOG : 128 如下省略... */
复制代码

5.5.3 channel支持的IO操做

写操做,这里演示从服务端写消息发送到客户端:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
	ctx.channel().writeAndFlush(Unpooled.copiedBuffer("这波啊,这波是肉蛋葱鸡~", CharsetUtil.UTF_8));
}
复制代码

客户端控制台:

//收到服务端/127.0.0.1:6666的消息:这波啊,这波是肉蛋葱鸡~
复制代码

链接操做,代码演示:

ChannelFuture connect = channelFuture.channel().connect(new InetSocketAddress("127.0.0.1", 6666));//通常使用启动器,这种方式不经常使用
复制代码

经过channel获取ChannelPipeline,并作相关的处理:

//获取ChannelPipeline对象
ChannelPipeline pipeline = ctx.channel().pipeline();
//往pipeline中添加ChannelHandler处理器,装配流水线
pipeline.addLast(new MyServerHandler());
复制代码

5.6 Selector

在NioEventLoop中,有一个成员变量selector,这是nio包的Selector,在以前《NIO入门》中,我已经讲过Selector了。

Netty中的Selector也和NIO的Selector是同样的,就是用于监听事件,管理注册到Selector中的channel,实现多路复用器。

5.7 PiPeline与ChannelPipeline

在前面介绍Channel时,咱们知道能够在channel中装配ChannelHandler流水线处理器,那一个channel不可能只有一个channelHandler处理器,确定是有不少的,既然是不少channelHandler在一个流水线工做,确定是有顺序的。

因而pipeline就出现了,pipeline至关于处理器的容器。初始化channel时,把channelHandler按顺序装在pipeline中,就能够实现按序执行channelHandler了。

在一个Channel中,只有一个ChannelPipeline。该pipeline在Channel被建立的时候建立。ChannelPipeline包含了一个ChannelHander造成的列表,且全部ChannelHandler都会注册到ChannelPipeline中。

5.8 ChannelHandlerContext

在Netty中,Handler处理器是有咱们定义的,上面讲过经过集成入站处理器或者出站处理器实现。这时若是咱们想在Handler中获取pipeline对象,或者channel对象,怎么获取呢。

因而Netty设计了这个ChannelHandlerContext上下文对象,就能够拿到channel、pipeline等对象,就能够进行读写等操做。

经过类图,ChannelHandlerContext是一个接口,下面有三个实现类。

实际上ChannelHandlerContext在pipeline中是一个链表的形式。看一段源码就明白了:

//ChannelPipeline实现类DefaultChannelPipeline的构造器方法
protected DefaultChannelPipeline(Channel channel) {
	this.channel = ObjectUtil.checkNotNull(channel, "channel");
	succeededFuture = new SucceededChannelFuture(channel, null);
	voidPromise =  new VoidChannelPromise(channel, true);
	//设置头结点head,尾结点tail
	tail = new TailContext(this);
	head = new HeadContext(this);
	
	head.next = tail;
	tail.prev = head;
}
复制代码

下面我用一张图来表示,会更加清晰一点:

5.9 EventLoopGroup

咱们先看一下EventLoopGroup的类图:

其中包括了经常使用的实现类NioEventLoopGroup。OioEventLoopGroup在前面的例子中也有使用过。

从Netty的架构图中,能够知道服务器是须要两个线程组进行配合工做的,而这个线程组的接口就是EventLoopGroup。

每一个EventLoopGroup里包括一个或多个EventLoop,每一个EventLoop中维护一个Selector实例。

5.9.1 轮询机制的实现原理

咱们不妨看一段DefaultEventExecutorChooserFactory的源码:

private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;

@Override
public EventExecutor next() {
    //idx.getAndIncrement()至关于idx++,而后对任务长度取模
    return executors[idx.getAndIncrement() & executors.length - 1];
}
复制代码

这段代码能够肯定执行的方式是轮询机制,接下来debug调试一下:

它这里还有一个判断,若是线程数不是2的N次方,则采用取模算法实现。

@Override
public EventExecutor next() {
    return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
复制代码

写在最后

参考Netty官网文档:API文档

创做不易,以为有用就点个赞吧。

我不要下次必定,但愿此次必定素质三连,感谢!

想第一时间看到我更新的文章,能够微信搜索公众号「java技术爱好者」,拒绝作一条咸鱼,我是一个努力让你们记住的程序员。咱们下期再见!!!

在这里插入图片描述

能力有限,若是有什么错误或者不当之处,请你们批评指正,一块儿学习交流!

相关文章
相关标签/搜索