Netty整理

什么是阻塞/非阻塞,什么是同/异步html

简介:使用最通俗概念讲解 同步异步、堵塞和非堵塞
洗衣机洗衣服react

洗衣机洗衣服(不管阻塞式IO仍是非阻塞式IO,都是同步IO模型
同步阻塞:你把衣服丢到洗衣机洗,而后看着洗衣机洗完,洗好后再去晾衣服(你就干等,啥都不作,阻塞在那边)linux

同步非阻塞:你把衣服丢到洗衣机洗,而后会客厅作其余事情,定时去阳台看洗衣机是否是洗完了,洗好后再去晾衣服
(等待期间你能够作其余事情,玩游戏,看视频等等)。但即使如此,依然要时不时地去看一下洗衣机是否是把衣服洗完了,而后才能去晾衣服。nginx

        
异步阻塞: 你把衣服丢到洗衣机洗,而后看着洗衣机洗完,洗好后再去晾衣服(几乎没这个状况,几乎没这个说法,能够忽略,异步不可能阻塞
        
异步非阻塞:你把衣服丢到洗衣机洗,而后会客厅作其余事情,洗衣机洗好后会自动去晾衣服,晾完成后放个音乐告诉你洗好衣服并晾好了,这里你不须要晾衣服。git

同步有前后顺序,不管是否阻塞。异步没有前后顺序,各自完成各自的事情。github

linux网络编程中的IO模型web

网络IO,用户程序(进程,缓存)和内核的交互为基础算法

IO操做分两步:一、发起IO请求等待数据准备,二、实际IO操做编程

同步需要主动读写数据,在读写数据的过程当中仍是会阻塞(发起请求,等待内核准备数据,内核准备完毕,从内核中拿取数据到应用进程)bootstrap

异步仅仅需要I/O操做完毕的通知。并不主动读写数据,由操做系统内核完毕数据的读写(发起请求,内核准备数据,内核准备完毕,内核把数据放进应用进程,通知应用程序已经放好了)

五种IO的模型:阻塞IO、非阻塞IO、多路复用IO、信号驱动IO和异步IO
前四种都是同步IO,在内核数据copy到用户空间时都是阻塞的

1)阻塞式I/O;全程阻塞,主线程不把数据拿到进程空间,不干别的

2)非阻塞式I/O;主线程在内核数据准备阶段能够干别的,但会时不时去看看数据有没有准备好,该阶段是非阻塞的;当内核把数据准备好后,会把数据复制到进程空间,该阶段是阻塞的。

3)I/O复用(select,poll,epoll...);
                I/O多路复用是阻塞在select,epoll这样的系统调用,没有阻塞在真正的I/O系统调用如recvfrom
                进程受阻于select,等待可能多个套接口中的任一个变为可读(这个指的是第一阶段,数据准备阶段)

                IO多路复用使用两个系统调用(select和recvfrom)
                blocking IO只调用了一个系统调用(recvfrom)
                select/epoll 核心是能够同时处理多个connection,而不是更快,因此链接数不高的话,性能不必定比多线程+阻塞IO好
                多路复用模型中,每个socket,设置为non-blocking,
                阻塞是被select这个函数block,而不是被socket阻塞的

这里跟阻塞和非阻塞IO到区别是:阻塞和非阻塞IO只会发起一次系统请求让内核准备数据,而IO多路复用会发起不少次系统请求让系统同时准备不少份数据,同时监听哪个请求到数据,内核已经准备好了,内核准备好后,select指令会发生阻塞。但不管哪一种IO到了将数据从内核复制到进程的过程当中,都是阻塞的,干不了别打事情,即使在多路复用IO中内核又准备好了一份数据,也没法同时复制数据。

 

4)信号驱动式I/O(SIGIO);主线程发送一个请求,直接返回。内核准备好数据后,会给主线程一个信号,主线程开始复制数据到进程空间。

5)异步I/O(POSIX的aio_系列函数) Future-Listener机制;主线程发送一个请求,内核从准备数据到复制数据到进程空间所有本身处理,主线程不参与,内核处理完通知主线程已经完成。

IO操做分为两步
            1)发起IO请求,等待数据准备(Waiting for the data to be ready)
            2)实际的IO操做,将数据从内核拷贝到进程中(Copying the data from the kernel to the process)


            前四种IO模型都是同步IO操做,区别在于第一阶段,而他们的第二阶段是同样的:在数据从内核复制到应用缓冲区期间(用户空间),进程阻塞于recvfrom调用或者select()函数。 相反,异步I/O模型在这两个阶段都要处理。


            阻塞IO和非阻塞IO的区别在于第一步,发起IO请求是否会被阻塞,若是阻塞直到完成那么就是传统的阻塞IO,若是不阻塞,那么就是非阻塞IO。
            同步IO和异步IO的区别就在于第二个步骤是否阻塞,若是实际的IO读写阻塞请求进程,那么就是同步IO,所以阻塞IO、非阻塞IO、IO复用、信号驱动IO都是同步IO,若是不阻塞,而是操做系统帮你作完IO操做再将结果返回给你,那么就是异步IO。

阻塞非阻塞说的是线程的状态
同步和异步说的是消息的通知机制

同步须要主动读写数据,异步是不须要主动读写数据
同步IO和异步IO是针对用户应用程序和内核的交互

IO多路复用技术select、poll、epoll

什么是IO多路复用:
            I/O多路复用,I/O是指网络I/O, 多路指多个TCP链接(即socket或者channel),复用指复用一个或几个线程。
            简单来讲:就是使用一个或者几个线程处理多个TCP链接
            最大优点是减小系统开销小,没必要建立过多的进程/线程,也没必要维护这些进程/线程.(若是线程过多,则线程的开销较大,线程切换消耗资源,浪费时间)

select:同步机制
            基本原理:
                监视文件3类描述符: writefds、readfds、和exceptfds(Linux会将全部的资源视为文件系统fd)
                调用后select函数会阻塞住,等有数据 可读、可写、出异常 或者 超时 就会返回
                select函数正常返回后,经过遍历fdset整个数组才能发现哪些句柄发生了事件,来找到就绪的描述符fd,而后进行对应的IO操做
                几乎在全部的平台上支持,跨平台支持性好

            缺点:
                1)select采用轮询的方式扫描文件描述符,所有扫描,随着文件描述符FD数量增多而性能降低
                2)每次调用 select(),须要把 fd 集合从用户态拷贝到内核态,并进行遍历(消息传递都是从内核到用户空间)
                2)最大的缺陷就是单个进程打开的FD有限制,默认是1024   (可修改宏定义,可是效率仍然慢)  
                   相似于Java的 static final  int MAX_FD = 1024

poll:同步机制
            基本流程:
                select() 和 poll() 系统调用的大致同样,处理多个描述符也是使用轮询的方式,根据描述符的状态进行处理
                同样须要把 fd 集合从用户态拷贝到内核态,并进行遍历。

                最大区别是: poll没有最大文件描述符限制(使用链表的方式存储fd)

epoll:异步机制,nginx底层实现机制
            基本原理:
                在2.6内核中提出的,对比select和poll,epoll更加灵活,没有描述符限制,用户态拷贝到内核态只须要一次
                使用事件通知,经过epoll_ctl注册fd,一旦该fd就绪,内核就会采用callback的回调机制来激活对应的fd

            优势:
                1)没fd这个限制,所支持的FD上限是操做系统的最大文件句柄数,1G内存大概支持10万个句柄
                2)效率提升,使用回调通知而不是轮询的方式,不会随着FD数目的增长效率降低

                3)经过callback机制通知,内核和用户空间mmap同一块内存实现

            Linux内核核心函数
                1)epoll_create()  在Linux内核里面申请一个文件系统 B+树,返回epoll对象,也是一个fd
                2)epoll_ctl()   操做epoll对象,在这个对象里面修改添加删除对应的连接fd, 绑定一个callback函数
                3)epoll_wait()  判断并完成对应的IO操做

            缺点:
                编程模型比select/poll 复杂


            例子:100万个链接,里面有1万个链接是活跃,在 select、poll、epoll分别是怎样的表现
                
                select:不修改宏定义,则须要 1000个进程(每一个进程是分别去轮询遍历)才能够支持 100万链接,单机没法承受
                poll:100万个链接,遍历都响应不过来了,还有空间的拷贝消耗大量的资源
                epoll:使用callback机制,不须要遍历,不须要拷贝,直接完成,单机能够直接支持,性能应该在用户的开发代码上。

但若是100万个链接有大部分是活跃的,好比90万个活跃的,poll和epoll的区别不大,单机都没法承受。

Java的I/O演进历史
    一、jdk1.4以前是采用同步阻塞模型,也就是BIO
        大型服务通常采用C或者C++, 由于能够直接操做系统提供的异步IO,AIO

    二、jdk1.4推出NIO,支持非阻塞IO,jdk1.7升级,推出NIO2.0,提供AIO的功能,支持文件和网络套接字的异步IO

 

在讲解Netty以前,须要先说明一下什么是Reactor设计模式

设计模式——Reactor模式(反应器设计模式),是一种基于事件驱动的设计模式,在事件驱动的应用中,将 一个或多个客户的服务请求分离(demultiplex)和调度(dispatch)给应用程序。在事件驱动的应用中,同步 地、有序地处理同时接收的多个服务请求 通常出如今高并发系统中,好比NettyRedis

优势 1)响应快,不会由于单个同步而阻塞,虽然Reactor自己依然是同步的; 2)编程相对简单,最大程度 的避免复杂的多线程及同步问题,而且避免了多线程/进程的切换开销; 3)可扩展性,能够方便的经过增长 Reactor实例个数来充分利用CPU资源;

缺点 1)相比传统的简单模型,Reactor增长了必定的复杂性,于是有必定的门槛,而且不易于调试。 2) Reactor模式须要系统底层的的支持,好比Java中的Selector支持,操做系统的select系统调用支持

通俗理解:KTV例子 前台接待,服务人员带领去开机器 Reactor模式基于事件驱动,适合处理海量的I/O事件,属于同步非阻塞IO(NIO)

Reactor单线程模型(比较少用)

内容: 1)做为NIO服务端,接收客户端的TCP链接;做为NIO客户端,向服务端发起TCP链接; 2) 服务端读请求数据并响应;客户端写请求并读取响应
使用场景
: 对应小业务则适合,编码简单;对于高负载、大并发的应用场景不适合,一个NIO线程处理 太多请求,则负载太高,而且可能响应变慢,致使大量请求超时,并且万一线程挂了,则不可用了 ,NIO的说明能够参考传统IO与NIO比较

Reactor多线程模型

内容:一个Acceptor线程;一组NIO线程,通常是实用自带的线程池,包含一个任务队列和多个可用的线程 去处理接入链接和处理IO
使用场景:知足大多数场景 ,可是当Acceptor须要作复杂操做的时候,好比认证等耗时操做的时候,在高并发状况下则也会有性能问题,大量的请求被堆积到Acceptor,而没法分发到Worker线程池。

Reactor主从线程模型
内容:
1) Acceptor不在是一个线程,而是一组NIO线程;IO线程也是一组NIO线程,这样就是两个线程池去处理接入链接和处理IO

使用场景:知足目前的大部分场景,也是Netty推荐使用的线程模型

BossGroup
WorkGroup

为何Netty使用NIO而不是AIO,是同步非阻塞仍是异步非阻塞?
            
            答案:
            在Linux系统上,AIO的底层实现仍使用EPOLL,与NIO相同,所以在性能上没有明显的优点
            Netty总体架构是reactor模型,采用epoll机制,因此往深的说,仍是IO多路复用模式,因此也可说netty是同步非阻塞模型(看的层次不同)
            

            不少人说这是netty是基于Java NIO 类库实现的异步通信框架
            特色:异步非阻塞、基于事件驱动,性能高,高可靠性和高可定制性。

            参考资料:
            https://github.com/netty/netty/issues/2515

Netty Hello World,Echo服务

什么是Echo服务:就是一个应答服务(回显服务器),客户端发送什么数据,服务端就响应的对应的数据
            是一个很是有的用于调试和检测的服务

首先要实用Netty,须要添加maven依赖

<dependency>
   <groupId>io.netty</groupId>
   <artifactId>netty-all</artifactId>
   <version>4.1.23.Final</version>
</dependency>

Netty服务端

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //欢迎线程组(其实就是一个线程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工做线程组(其实就是一个线程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty启动对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //将两个线程组添加到启动对象中
            serverBootstrap.group(bossGroup,workGroup)
                    //给启动对象添加Socket管道(相似于NIO中的Channel或者BIO中的Socket,意思差很少)
                    .channel(NioServerSocketChannel.class)
                    //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个
                    //必需要实现的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //将咱们本身编写的事件处理器添加到客户端的链接管道中
                            //这里为socketChannel(客户端链接管道),有别于NioServerSocketChannel
                            //这里能够添加不少的事件处理器,其实Netty有不少内置的事件处理器可使用
                            //pipeline()能够理解成一个工厂生产流水线,而把事件处理器理解成在流水线上
                            //做业的工人,咱们能够往一条流水线上投放不少的工人
                            socketChannel.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            log.info("服务器启动中");
            //绑定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //优雅关闭线程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

这里有关优雅关闭线程池能够参考使用RunTime.getRunTime().addShutdownHook优雅关闭线程池

处理器

/**
 * 事件处理器
 */
@Slf4j
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 监听读取事件
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info(data.toString(CharsetUtil.UTF_8));
        ctx.writeAndFlush(data);
    }

    /**
     * 监听读取完毕事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("channelReadComplete");
    }

    /**
     * 监听异常事件
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

因为我这里实用的Springboot,在主函数中将其启动就能够了。

@SpringBootApplication
public class NettyechoApplication {

   public static void main(String[] args) throws InterruptedException {
      SpringApplication.run(NettyechoApplication.class, args);
      new EchoServer(10101).run();
   }

}

咱们能够用telnet链接,输入字符串,能够看到回显成功

admindeMacBook-Pro:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
sdfa
sdfa     (此处为服务端回显字符串)

服务端日志

2019-09-20 00:18:48.800  INFO 634 --- [ntLoopGroup-3-2] c.g.nettyecho.netty.EchoServerHandler    : sdfa

2019-09-20 00:18:48.801  INFO 634 --- [ntLoopGroup-3-2] c.g.nettyecho.netty.EchoServerHandler    : channelReadComplete

可见服务端事件处理器响应了读取事件和读取完毕事件。

如今咱们来添加一个客户端的程序

Netty客户端

@AllArgsConstructor
public class EchoClient {
    private String host;
    private int port;

    public void run() throws InterruptedException {
        //客户端处理线程组(其实就是一个线程池)
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            //客户端netty启动对象
            Bootstrap bootstrap = new Bootstrap();
            //将客户端线程组添加到启动对象中
            bootstrap.group(group)
                    //给启动对象添加Socket管道
                    .channel(NioSocketChannel.class)
                    //主动链接到远程服务器IP端口
                    .remoteAddress(new InetSocketAddress(host,port))
                    //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个
                    //必需要实现的抽象方法
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            //链接到服务端,connect是异步链接,在调用同步等待sync,等待链接成功
            ChannelFuture channelFuture = bootstrap.connect().sync();
            //阻塞直到客户端通道关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //优雅关闭线程池
            group.shutdownGracefully();
        }
    }
}

客户端处理器

/**
 * 客户端事件处理器
 */
@Slf4j
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    /**
     * 激活事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("active");
        //Unpooled.copiedBuffer()是一个缓冲资源申请的方法,之后在Netty的ByteBuf会作说明
        ctx.writeAndFlush(Unpooled.copiedBuffer("帅呆了",CharsetUtil.UTF_8));
    }

    /**
     * 读取完成事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("channelReadComplete");
    }

    /**
     * 异常处理事件
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * 读取事件
     * @param channelHandlerContext
     * @param byteBuf
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
        log.info("Client received:" + byteBuf.toString(CharsetUtil.UTF_8));
    }
}

Springboot主程序

@SpringBootApplication
public class EchoclientApplication {

   public static void main(String[] args) throws InterruptedException {
      SpringApplication.run(EchoclientApplication.class, args);
      new EchoClient("127.0.0.1",10101).run();
   }

}

启动客户端,日志

2019-09-21 01:33:17.360  INFO 654 --- [ntLoopGroup-2-1] c.g.echoclient.netty.EchoClientHandler   : active
2019-09-21 01:33:17.386  INFO 654 --- [ntLoopGroup-2-1] c.g.echoclient.netty.EchoClientHandler   : Client received:帅呆了
2019-09-21 01:33:17.386  INFO 654 --- [ntLoopGroup-2-1] c.g.echoclient.netty.EchoClientHandler   : channelReadComplete

此时服务端日志

2019-09-21 01:32:25.525  INFO 651 --- [           main] com.guanjian.nettyecho.netty.EchoServer  : 服务器启动中
2019-09-21 01:33:17.384  INFO 651 --- [ntLoopGroup-3-1] c.g.nettyecho.netty.EchoServerHandler    : 帅呆了
2019-09-21 01:33:17.386  INFO 651 --- [ntLoopGroup-3-1] c.g.nettyecho.netty.EchoServerHandler    : channelReadComplete

咱们能够看到客户端激活之后发送了一个"帅呆了"给服务端,服务端收到之后回写给客户端一个相同的"帅呆了",客户端收到之后打印"Client received:帅呆了",最后两边同时完成读取事件,打印"channelReadComplete"。

在客户端的事件处理器中,咱们能够看到,咱们继承的类跟服务端事件处理器不同,其实客户端继承的这个类SimpleChannelInboundHandler只是ChannelInboundHandlerAdapter的子类,并且SimpleChannelInboundHandler是一个抽象类,而它的父类ChannelInboundHandlerAdapter却并不是抽象类。

public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter

咱们单看它的channelRead0以及channelRead方法,channelRead0是一个抽象方法,必须由咱们本身去实现。而channelRead它是把msg的消息对象转成了泛型I,放到抽象方法channelRead0中去处理,实则起做用的仍是channelRead方法,这里只是用了一个模版方法的设计模式罢了。

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    boolean release = true;
    try {
        if (acceptInboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I imsg = (I) msg;
            channelRead0(ctx, imsg);
        } else {
            release = false;
            ctx.fireChannelRead(msg);
        }
    } finally {
        if (autoRelease && release) {
            ReferenceCountUtil.release(msg);
        }
    }
}

protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;

EventLoop和EventLoopGroup线程模型

1)高性能RPC框架的3个要素:IO模型(前面说的同步,异步,阻塞,非阻塞,多路复用)

                                          数据协议(http/protobuf/Thrift)

                                          线程模型(BIO一个线程服务于一个Socket,NIO一个线程服务于一组Socket/Channel)

2)EventLoop比如一个线程,1个EventLoop能够服务多个Channel,1个Channel只有一个EventLoop
     能够建立多个 EventLoop 来优化资源利用,也就是EventLoopGroup

3)EventLoopGroup 负责分配 EventLoop 到新建立的 Channel,里面包含多个EventLoop

EventLoopGroup -> 多个 EventLoop
EventLoop -> 维护一个 Selector

这里EventLoopGroup,EventLoop都是接口,NioEventLoopGroup,NioEventLoop是接口的实现类,意思是用NIO的方式来实现的。

这里EventLoopGroup既然是线程池,那它究竟建立了多少个线程数呢,其答案为Runtime.getRuntime().availableProcessors() * 2,这是源码里面有写的。如下为代码片断

public class NioEventLoopGroup extends MultithreadEventLoopGroup {

    /**
     * Create a new instance using the default number of threads, the default {@link ThreadFactory} and
     * the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
     */
    public NioEventLoopGroup() {
        this(0);
    }
/**
 * Create a new instance using the specified number of threads, {@link ThreadFactory} and the
 * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
 */
public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}

到这里须要注意的是,咱们这里赋给的线程数为0,Executor为JDK的线程池接口,这里赋为null.

public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
        int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

在MultithreadEventLoopGroup类(NioEventLoopGroup的父类)中,咱们能够看到若是初始化线程数为0,获取DEFAULT_EVENT_LOOP_THREADS

/**
 * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
 */
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

在NettyRuntime类中

/**
 * Get the configured number of available processors. The default is {@link Runtime#availableProcessors()}. This
 * can be overridden by setting the system property "io.netty.availableProcessors" or by invoking
 * {@link #setAvailableProcessors(int)} before any calls to this method.
 *
 * @return the configured number of available processors
 */
public static int availableProcessors() {
    return holder.availableProcessors();
}
/**
 * Get the configured number of available processors. The default is {@link Runtime#availableProcessors()}.
 * This can be overridden by setting the system property "io.netty.availableProcessors" or by invoking
 * {@link #setAvailableProcessors(int)} before any calls to this method.
 *
 * @return the configured number of available processors
 */
@SuppressForbidden(reason = "to obtain default number of available processors")
synchronized int availableProcessors() {
    if (this.availableProcessors == 0) {
        final int availableProcessors =
                SystemPropertyUtil.getInt(
                        "io.netty.availableProcessors",
                        Runtime.getRuntime().availableProcessors());
        setAvailableProcessors(availableProcessors);
    }
    return this.availableProcessors;
}

至此咱们能够看到NettyRuntime.availableProcessors()=Runtime.getRuntime().availableProcessors(),因此EventLoopGroup初始化的线程数为Runtime.getRuntime().availableProcessors() * 2,其实也就是CPU核数*2。

经过其继承图,咱们也能够看到NioEventLoopGroup是最上层其实就是JDK的线程池.

而ExecutorService也是咱们常常用到的线程池,例如

private ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

Netty启动引导类Bootstrap做用和tcp通道参数设置

1)服务器启动引导类ServerBootstrap,其实通过调整,netty能够跟以前写过的Reactor线程模型进行一个一一对应。
                1) group :设置线程组模型,Reactor线程模型对比EventLoopGroup
                         1)单线程

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //欢迎线程组(其实就是一个线程池,此处只有一个线程)
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        //工做线程组(其实就是一个线程池)
//        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty启动对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //将两个线程组添加到启动对象中
//            serverBootstrap.group(bossGroup,workGroup)
            serverBootstrap.group(bossGroup)
                    //给启动对象添加Socket管道(相似于NIO中的Channel或者BIO中的Socket,意思差很少)
                    .channel(NioServerSocketChannel.class)
                    //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个
                    //必需要实现的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //将咱们本身编写的事件处理器添加到客户端的链接管道中
                            //这里为socketChannel(客户端链接管道),有别于NioServerSocketChannel
                            //这里能够添加不少的事件处理器,其实Netty有不少内置的事件处理器可使用
                            //pipeline()能够理解成一个工厂生产流水线,而把事件处理器理解成在流水线上
                            //做业的工人,咱们能够往一条流水线上投放不少的工人
                            socketChannel.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            log.info("服务器启动中");
            //绑定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //优雅关闭线程池
//            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

通过调整,以上就是一个单线程模型,只有一个线程服务于全部的Channel.

                         2)多线程

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //欢迎线程组(其实就是一个线程池,此处只有一个线程)
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        //工做线程组(其实就是一个线程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty启动对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //将两个线程组添加到启动对象中
            serverBootstrap.group(bossGroup,workGroup)
                    //给启动对象添加Socket管道(相似于NIO中的Channel或者BIO中的Socket,意思差很少)
                    .channel(NioServerSocketChannel.class)
                    //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个
                    //必需要实现的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //将咱们本身编写的事件处理器添加到客户端的链接管道中
                            //这里为socketChannel(客户端链接管道),有别于NioServerSocketChannel
                            //这里能够添加不少的事件处理器,其实Netty有不少内置的事件处理器可使用
                            //pipeline()能够理解成一个工厂生产流水线,而把事件处理器理解成在流水线上
                            //做业的工人,咱们能够往一条流水线上投放不少的工人
                            socketChannel.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            log.info("服务器启动中");
            //绑定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //优雅关闭线程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

多线程模型跟咱们经常使用的主从模型的不一样,就在于接待线程组只有一个线程,而工做线程组线程却不少。

                         3)主从线程,最先的样例即是主从线程模型。

            2)channel:设置channel通道类型NioServerSocketChannel、OioServerSocketChannel(也能够设为Oio,但如今没人用Oio),IO模型就是跟前面说的跟操做系统底层相关的东西,什么同步IO,异步IO,阻塞非阻塞。

咱们来看一下它的源码

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    //返回一个通道工厂
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
//它的参数是一个接口
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
    return channelFactory((ChannelFactory<C>) channelFactory);
}
//上面的参数
public interface ChannelFactory<T extends Channel> extends io.netty.bootstrap.ChannelFactory<T> {
    /**
     * Creates a new channel.
     */
    @Override
    T newChannel();
}
private volatile ChannelFactory<? extends C> channelFactory; //ChannelFactory是一个接口,此处是一个桥接模式

ChannelFactory<? extends C> channelFactory该参数赋进去的值为改接口的实现类——new ReflectiveChannelFactory<C>(channelClass)

@Deprecated
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    if (channelFactory == null) {
        throw new NullPointerException("channelFactory");
    }
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }

    this.channelFactory = channelFactory;
    return self();
}

如下泛型T为Channel的实现类,Channel为一个接口,它有不少的实现类

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        try {
            return clazz.getConstructor().newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

    @Override
    public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
    }
}

因此咱们这里必须传入Channel接口的实现类,而NioServerSocketChannel为Channel接口的实现类,其继承图以下

OioServerSocketChannel也是Channel接口的实现类。

            3) option: 做用于每一个新创建的channel,设置TCP链接中的一些参数,以下
                            ChannelOption.SO_BACKLOG: 存放已完成三次握手的请求的等待队列的最大长度;

三次握手中,linux内核会维护两个队列--syn queue(半链接队列),accept queue(全链接队列),第一次握手,会将请求放入syn queue中,第三次握手,会将请求从syn queue转移到accept queue中。
                            Linux服务器TCP链接底层知识:
                                syn queue:半链接队列,洪水攻击,tcp_max_syn_backlog
                                accept queue:全链接队列, net.core.somaxconn

系统默认的somaxconn参数要足够大 ,若是backlog比somaxconn大,则会优先用后者

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //欢迎线程组(其实就是一个线程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工做线程组(其实就是一个线程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty启动对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //将两个线程组添加到启动对象中
            serverBootstrap.group(bossGroup,workGroup)
                    //给启动对象添加Socket管道(相似于NIO中的Channel或者BIO中的Socket,意思差很少)
                    .channel(NioServerSocketChannel.class)
                    //存放已完成三次握手的请求的等待队列的最大长度;
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个
                    //必需要实现的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //将咱们本身编写的事件处理器添加到客户端的链接管道中
                            //这里为socketChannel(客户端链接管道),有别于NioServerSocketChannel
                            //这里能够添加不少的事件处理器,其实Netty有不少内置的事件处理器可使用
                            //pipeline()能够理解成一个工厂生产流水线,而把事件处理器理解成在流水线上
                            //做业的工人,咱们能够往一条流水线上投放不少的工人
                            socketChannel.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            log.info("服务器启动中");
            //绑定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //优雅关闭线程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

在NetUtil(Netty源码)中

/**
 * The SOMAXCONN value of the current machine.  If failed to get the value,  {@code 200}  is used as a
 * default value for Windows or {@code 128} for others.
 * 当前机器的最大链接数
 */
public static final int SOMAXCONN;
SOMAXCONN = AccessController.doPrivileged(new PrivilegedAction<Integer>() {
        @Override
        public Integer run() {
            // Determine the default somaxconn (server socket backlog) value of the platform.
            // The known defaults:
            // - Windows NT Server 4.0+: 200
            // - Linux and Mac OS X: 128
            //若是平台为Windows系统
            int somaxconn = PlatformDependent.isWindows() ? 200 : 128;
            //若是平台为linux系统,从/proc/sys/net/core/somaxconn文件中获取
            File file = new File("/proc/sys/net/core/somaxconn");
            BufferedReader in = null;
            try {
                // file.exists() may throw a SecurityException if a SecurityManager is used, so execute it in the
                // try / catch block.
                // See https://github.com/netty/netty/issues/4936
                if (file.exists()) {
                    in = new BufferedReader(new FileReader(file));
                    somaxconn = Integer.parseInt(in.readLine());
                    if (logger.isDebugEnabled()) {
                        logger.debug("{}: {}", file, somaxconn);
                    }
                } else {
                    // Try to get from sysctl
                    Integer tmp = null;
                    if (SystemPropertyUtil.getBoolean("io.netty.net.somaxconn.trySysctl", false)) {
                        tmp = sysctlGetInt("kern.ipc.somaxconn");
                        if (tmp == null) {
                            tmp = sysctlGetInt("kern.ipc.soacceptqueue");
                            if (tmp != null) {
                                somaxconn = tmp;
                            }
                        } else {
                            somaxconn = tmp;
                        }
                    }

                    if (tmp == null) {
                        logger.debug("Failed to get SOMAXCONN from sysctl and file {}. Default: {}", file,
                                     somaxconn);
                    }
                }
            } catch (Exception e) {
                logger.debug("Failed to get SOMAXCONN from sysctl and file {}. Default: {}", file, somaxconn, e);
            } finally {
                if (in != null) {
                    try {
                        in.close();
                    } catch (Exception e) {
                        // Ignored.
                    }
                }
            }
            return somaxconn;
        }
    });
}

从以上代码可知,咱们在linux环境中,somaxconn的取值为

[root@ecs-c222-0002 ~]# cd /proc/sys/net/core/
[root@ecs-c222-0002 core]# ll
total 0
-rw-r--r-- 1 root root 0 Sep 23 23:45 bpf_jit_enable
-rw------- 1 root root 0 Sep 23 23:45 bpf_jit_harden
-rw-r--r-- 1 root root 0 Sep 23 23:45 busy_poll
-rw-r--r-- 1 root root 0 Sep 23 23:45 busy_read
-rw-r--r-- 1 root root 0 Sep 23 23:45 default_qdisc
-rw-r--r-- 1 root root 0 Sep 23 23:45 dev_weight
-rw-r--r-- 1 root root 0 Sep 23 23:45 dev_weight_rx_bias
-rw-r--r-- 1 root root 0 Sep 23 23:45 dev_weight_tx_bias
-rw-r--r-- 1 root root 0 Sep 23 23:45 message_burst
-rw-r--r-- 1 root root 0 Sep 23 23:45 message_cost
-rw-r--r-- 1 root root 0 Sep 23 23:45 netdev_budget
-rw-r--r-- 1 root root 0 Sep 23 23:45 netdev_max_backlog
-r--r--r-- 1 root root 0 Sep 23 23:45 netdev_rss_key
-rw-r--r-- 1 root root 0 Sep 23 23:45 netdev_tstamp_prequeue
-rw-r--r-- 1 root root 0 Sep 23 23:45 optmem_max
-rw-r--r-- 1 root root 0 Sep 23 23:45 rmem_default
-rw-r--r-- 1 root root 0 Sep 23 23:45 rmem_max
-rw-r--r-- 1 root root 0 Sep 23 23:45 rps_sock_flow_entries
-rw-r--r-- 1 root root 0 Aug 28 17:23 somaxconn
-rw-r--r-- 1 root root 0 Sep 23 23:45 warnings
-rw-r--r-- 1 root root 0 Sep 23 23:45 wmem_default
-rw-r--r-- 1 root root 0 Sep 23 23:45 wmem_max
-rw-r--r-- 1 root root 0 Sep 23 23:45 xfrm_acq_expires
-rw-r--r-- 1 root root 0 Sep 23 23:45 xfrm_aevent_etime
-rw-r--r-- 1 root root 0 Sep 23 23:45 xfrm_aevent_rseqth
-rw-r--r-- 1 root root 0 Sep 23 23:45 xfrm_larval_drop
[root@ecs-c222-0002 core]# cat somaxconn
1024

那这个东西是干吗用的呢?假设咱们系统能够接受的最大吞吐量为7000,如今来了10000个并发请求,有7000个被正常处理,还剩3000个,有1024个被放入了accept queue中,剩下的1976个就被直接拒绝了,而.option(ChannelOption.SO_BACKLOG,1024)就是干这个用的,这个1024是由linux操做系统来决定的,设高了没用。

ChannelOption.TCP_NODELAY: 为了解决Nagle的算法问题,默认是false, 要求高实时性,有数据时立刻发送,就将该选项设置为true关闭Nagle算法;若是要减小发送次数,就设置为false,会累积必定大小后再发送;

TCP/IP协议中,不管发送多少数据,老是要在数据前面加上协议头,同时,对方接收到数据,也须要发送ACK表示确认。为了尽量的利用网络带宽,TCP老是但愿尽量的发送足够大的数据。(一个链接会设置MSS参数,所以,TCP/IP但愿每次都可以以MSS尺寸的数据块来发送数据)。Nagle算法就是为了尽量发送大块数据,避免网络中充斥着许多小数据块。但咱们这里通常会把Nagle算法关闭。

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //欢迎线程组(其实就是一个线程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工做线程组(其实就是一个线程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty启动对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //将两个线程组添加到启动对象中
            serverBootstrap.group(bossGroup,workGroup)
                    //给启动对象添加Socket管道(相似于NIO中的Channel或者BIO中的Socket,意思差很少)
                    .channel(NioServerSocketChannel.class)
                    //存放已完成三次握手的请求的等待队列的最大长度;
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //不要延迟(NODELAY),要求高实时性,有数据时立刻发送,就将该选项设置为true关闭Nagle算法;
                    //若是要减小发送次数,就设置为false,会累积必定大小后再发送;
                    //此处应该使用.childOption为为工做线程组设置
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    //添加事件处理器,这里ChannelInitializer为一个抽象类,initChannel是一个
                    //必需要实现的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //将咱们本身编写的事件处理器添加到客户端的链接管道中
                            //这里为socketChannel(客户端链接管道),有别于NioServerSocketChannel
                            //这里能够添加不少的事件处理器,其实Netty有不少内置的事件处理器可使用
                            //pipeline()能够理解成一个工厂生产流水线,而把事件处理器理解成在流水线上
                            //做业的工人,咱们能够往一条流水线上投放不少的工人
                            socketChannel.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            log.info("服务器启动中");
            //绑定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //优雅关闭线程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

以前咱们看到半链接中syn queue这里容易出现洪水攻击。(如下内容来自互联网)

攻击原理:

SYN Flood是当前最流行的DoS(拒绝服务攻击)与DDoS(分布式拒绝服务攻击)的方式之一,这是一种利用TCP协议缺陷,发送大量伪造的TCP链接请求,经常使用假冒的IP或IP号段发来海量的请求链接的第一个握手包(SYN包),被攻击服务器回应第二个握手包(SYN+ACK包),由于对方是假冒IP,对方永远收不到包且不会回应第三个握手包。致使被攻击服务器保持大量SYN_RECV状态的“半链接”,而且会重试默认5次回应第二个握手包,塞满TCP等待链接队列,资源耗尽(CPU满负荷或内存不足),让正常的业务请求链接不进来。

 诊断:

咱们看到业务曲线大跌时,检查机器和DNS,发现只是对外的web机响应慢、CPU负载高、ssh登录慢甚至有些机器登录不上,检查系统syslog:

# tail -f /var/log/messages
Apr 18 11:21:56 web5 kernel: possible SYN flooding on port 80. Sending cookies.

检查链接数增多,而且SYN_RECV 链接特别多:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# netstat -n | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'
TIME_WAIT 16855
CLOSE_WAIT 21
SYN_SENT 99
FIN_WAIT1 229
FIN_WAIT2 113
ESTABLISHED 8358
SYN_RECV 48965
CLOSING 3
LAST_ACK 313
根据经验,正常时检查链接数以下:
# netstat -n | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'
TIME_WAIT 42349
CLOSE_WAIT 1
SYN_SENT 4
FIN_WAIT1 298
FIN_WAIT2 33
ESTABLISHED 12775
SYN_RECV 259
CLOSING 6
LAST_ACK 432

应急处理

根据netstat查看到的对方IP特征:
# netstat -na |grep SYN_RECV|more

利用iptables临时封掉最大嫌疑攻击的IP或IP号段,例如对方假冒173.*.*.*号段来攻击,短时间禁用173.*.*.*这个大号段(要确认当心不要封掉本身的本地IP了!)
# iptables -A INPUT -s  173.0.0.0/8  -p tcp  –dport 80 -j DROP

再分析刚才保留的罪证,分析业务,用iptables解封正常173.*.*.*号段内正常的ip和子网段。这样应急处理很容易误伤,甚至可能由于封错了致使ssh登录不了服务器,并非理想方式。

tcp_synack_retries = 0是关键,表示回应第二个握手包(SYN+ACK包)给客户端IP后,若是收不到第三次握手包(ACK包)后,不进行重试,加快回收“半链接”,不要耗光资源。

不修改这个参数,模拟攻击,10秒后被攻击的80端口即没法服务,机器难以ssh登陆; 用命令netstat -na |grep SYN_RECV检测“半链接”hold住180秒;

修改这个参数为0,再模拟攻击,持续10分钟后被攻击的80端口均可以服务,响应稍慢些而已,只是ssh有时也登陆不上;检测“半链接”只hold住3秒即释放掉。

修改这个参数为0的反作用:网络情况不好时,若是对方没收到第二个握手包,可能链接服务器失败,但对于通常网站,用户刷新一次页面便可。这些能够在高峰期或网络情况很差时tcpdump抓包验证下。

根据之前的抓包经验,这种状况不多,但为了保险起见,能够只在被tcp洪水攻击时临时启用这个参数。

tcp_synack_retries默认为5,表示重发5次,每次等待30~40秒,即“半链接”默认hold住大约180秒。详细解释:

The tcp_synack_retries setting tells the kernel how many times to retransmit the SYN,ACK reply to
an SYN request. In other words, this tells the system how many times to try to establish a passive
TCP connection that was started by another host.
This variable takes an integer value, but should under no circumstances be larger than 255 for the
same reasons as for the tcp_syn_retries variable. Each retransmission will take aproximately 30-40
seconds. The default value of the tcp_synack_retries variable is 5, and hence the default timeout
of passive TCP connections is aproximately 180 seconds.

 

之因此能够把tcp_synack_retries改成0,由于客户端还有tcp_syn_retries参数,默认是5,即便服务器端没有重发SYN+ACK包,客户端也会重发SYN握手包。详细解释:

The tcp_syn_retries variable tells the kernel how many times to try to retransmit the initial SYN
packet for an active TCP connection attempt.
This variable takes an integer value, but should not be set higher than 255 since each
retransmission will consume huge amounts of time as well as some amounts of bandwidth. Each
connection retransmission takes aproximately 30-40 seconds. The default setting is 5, which
would lead to an aproximate of 180 seconds delay before the connection times out.

第二个参数net.ipv4.tcp_max_syn_backlog = 200000也重要,具体多少数值受限于内存。

如下配置,第一段参数是最重要的,第二段参数是辅助的,其他参数是其余做用的:

# vi /etc/sysctl.conf

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
net.ipv4.tcp_synack_retries=0
#半链接队列长度
net.ipv4.tcp_max_syn_backlog=200000
#系统容许的文件句柄的最大数目,由于链接须要占用文件句柄
fs. file -max=819200
#用来应对突发的大并发connect 请求
net.core.somaxconn=65536
#最大的TCP 数据接收缓冲(字节)
net.core.rmem_max=1024123000
#最大的TCP 数据发送缓冲(字节)
net.core.wmem_max=16777216
#网络设备接收数据包的速率比内核处理这些包的速率快时,容许送到队列的数据包的最大数目
net.core.netdev_max_backlog=165536
#本机主动链接其余机器时的端口分配范围
net.ipv4.ip_local_port_range=1000065535
# ……省略其它……
使配置生效:

# sysctl -p

注意,如下参数面对外网时,不要打开。由于反作用很明显,具体缘由请google,若是已打开请显式改成0,而后执行sysctl -p关闭。由于通过试验,大量TIME_WAIT状态的链接对系统没太大影响:

?

1
2
3
4
5
6
7
8
#当出现 半链接 队列溢出时向对方发送syncookies,调大 半链接 队列后不必
net.ipv4.tcp_syncookies=0
#TIME_WAIT状态的链接重用功能
net.ipv4.tcp_tw_reuse=0
#时间戳选项,与前面net.ipv4.tcp_tw_reuse参数配合
net.ipv4.tcp_timestamps=0
#TIME_WAIT状态的链接回收功能
net.ipv4.tcp_tw_recycle=0

 

                4)childOption: 做用于被accept以后的链接

                5) childHandler: 用于对每一个通道里面的数据处理(这里之因此使用childHandler,而不是像客户端同样使用handler,是由于这里使用的是主从线程模型,而客户端使用的是单线程池)

Channel做用

        什么是Channel: 客户端和服务端创建的一个链接通道
        什么是ChannelHandler: 负责Channel的逻辑处理
        什么是ChannelPipeline: 负责管理ChannelHandler的有序容器

    他们是什么关系:
                一个Channel包含一个ChannelPipeline,全部ChannelHandler都会顺序加入到ChannelPipeline中
                建立Channel时会自动建立一个ChannelPipeline,每一个Channel都有一个管理它的pipeline,这关联是永久性的


        Channel当状态出现变化,就会触发对应的事件

        状态:
            (1)channelRegistered: channel注册到一个EventLoop(EventLoop是一个线程,它里面维护了一个Selector即多路复用器,channel会注册到Selector上面)

            (2)channelUnregistered: channel已经建立,可是未注册到一个EventLoop里面,也就是没有和Selector绑定
            (3)channelActive: 变为活跃状态(链接到了远程主机),能够接受和发送数据
            (4)channelInactive: channel处于非活跃状态,没有链接到远程主机

Channel的生命周期为:(1)channelRegistered->(3)channelActive->(4)channelInactive->(2)channelUnregistered

ChannelHandler和ChannelPipeline核心做用和生命周期
        方法:
            handlerAdded : 当 ChannelHandler 添加到 ChannelPipeline 调用
            handlerRemoved : 当 ChannelHandler 从 ChannelPipeline 移除时调用
            exceptionCaught : 执行抛出异常时调用

从咱们本身写的EchoServerHandler往上追踪,能够看到它们都是顶层接口ChannelHandler的实现类。

public interface ChannelHandler {

    /**
     * 当 ChannelHandler 添加到 ChannelPipeline 调用
     */
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;

    /**
     * 当 ChannelHandler 从 ChannelPipeline 移除时调用
     */
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

    /**
     * 执行抛出异常时调用
     */
    @Deprecated
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

    @Inherited
    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Sharable {
        // no value
    }
}

ChannelHandler下主要是两个子接口
                ChannelInboundHandler:(入站)监遵从外部进入时的事件,好比当有数据到来时,channel被激活时或者不可用时
                    处理输入数据和Channel状态类型改变
                    适配器 ChannelInboundHandlerAdapter(适配器设计模式此类是为了不咱们直接实现ChannelInboundHandler接口,要写过多的方法而设计的适配类

相似于

/**
 * 事件处理器
 */
@Slf4j
public class EchoServerHandler implements ChannelInboundHandler {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

    }

    /**
     * 监听读取事件
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info(data.toString(CharsetUtil.UTF_8));
        ctx.writeAndFlush(data);
    }

    /**
     * 监听读取完毕事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("channelReadComplete");
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

    }

    /**
     * 监听异常事件
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

而该适配器类ChannelInboundHandlerAdapter已经帮咱们适配了全部的接口方法,当咱们继承于这个类的时候,只须要挑选咱们须要的方法便可。

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {

    /**
     * Calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelUnregistered()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelActive()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelInactive()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelRead(Object)} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelReadComplete()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireUserEventTriggered(Object)} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelWritabilityChanged()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} to forward
     * to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}


                    经常使用的:SimpleChannelInboundHandler(继承于ChannelInboundHandlerAdapter,在处理channelRead0方法的时候,无需主动释放消息缓冲资源,若是咱们在ChannelInboundHandlerAdapter的channelRead方法中,不使用ctx.writeAndFlush回写数据,则必须主动释放缓冲资源,而SimpleChannelInboundHandler的channelRead0方法则不须要,由于它已经在channelRead方法中释放了)

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    boolean release = true;
    try {
        if (acceptInboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I imsg = (I) msg;
            channelRead0(ctx, imsg);
        } else {
            release = false;
            ctx.fireChannelRead(msg);
        }
    } finally {
        if (autoRelease && release) {
            //释放缓冲资源
            ReferenceCountUtil.release(msg);
        }
    }
}

protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;

                ChannelOutboundHandler:(出站)监听本身的IO操做,好比connect,bind等。通常用的很少,看你需不须要监听这些事件。
                处理输出数据,适配器 ChannelOutboundHandlerAdapter,一样为适配器模式,同ChannelInboundHandlerAdapter.

ChannelOutboundHandler接口一样是继承于ChannelHandler接口,但一样也添加了出站特有的方法。

public interface ChannelOutboundHandler extends ChannelHandler {
    /**
     * 服务端执行bind时,会进入到这里,咱们能够在bind前及bind后作一些操做
     */
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /**
     * 客户端执行connect链接服务端时进入(主动链接时)
     */
    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /**
     * 客户端与服务端断开时(主动断开)
     */
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Called once a close operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Called once a deregister operation is made from the current registered {@link EventLoop}.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Intercepts {@link ChannelHandlerContext#read()}.
     */
    void read(ChannelHandlerContext ctx) throws Exception;

    /**
    * Called once a write operation is made. The write operation will write the messages through the
     * {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once
     * {@link Channel#flush()} is called
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the write operation is made
     * @param msg               the message to write
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    /**
     * Called once a flush operation is made. The flush operation will try to flush out all previous written messages
     * that are pending.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the flush operation is made
     * @throws Exception        thrown if an error occurs
     */
    void flush(ChannelHandlerContext ctx) throws Exception;
}

通常咱们能够理解为Inbound为被动接收的时候,Outbound为主动链接的时候。

如今咱们来看一下channel的生命周期。

咱们将EchoServerHandler修改以下,增长所有的监听事件,并打印事件方法名称。

/**
 * 事件处理器
 */
@Slf4j
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 监听读取事件
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info(data.toString(CharsetUtil.UTF_8));
        ctx.writeAndFlush(data);
    }

    /**
     * 监听读取完毕事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("channelReadComplete");
    }

    /**
     * 监听异常事件
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * 将channel注册到EventLoop的Selector多路复用器中
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelRegistered");
    }

    /**
     * channel未注册到EventLoop中
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelUnregistered");
    }

    /**
     * 有链接,变为活跃状态
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelActive");
    }

    /**
     * 没有链接,非活跃状态
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelInactive");
    }
}

启动EchoServer,打开telnet链接到端口,咱们能够看到

admindeMacBook-Pro:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
sdfs
sdfs
^]
telnet> quit
Connection closed.

整个过程为链接,发送字符串sdfs,退出链接

服务端日志为

2019-10-01 05:33:36.960  INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelRegistered
2019-10-01 05:33:36.960  INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelActive
2019-10-01 05:33:54.439  INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : sdfs

2019-10-01 05:33:54.442  INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelReadComplete
2019-10-01 05:34:22.527  INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelReadComplete
2019-10-01 05:34:22.529  INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelInactive
2019-10-01 05:34:22.529  INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelUnregistered

整个生命周期正如前面写到同样

Channel的生命周期为:(1)channelRegistered->(3)channelActive->(4)channelInactive->(2)channelUnregistered