Netty框架

 

学习Netty框架,三连问:

  什么是Netty框架?html

  为何要用Netty框架?java

  怎么用Netty框架?spring

 

什么是Netty框架?

  Netty 是一个广受欢迎的异步事件驱动的Java开源网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。apache

  Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序。编程

  Netty 是一个基于 NIO 的网络编程框架,使用 Netty 能够帮助你快速、简单的开发出一个网络应用,至关于简化和流程化了 NIO 的开发过程。bootstrap

  做为当前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、通讯行业等得到了普遍的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。api

 

为何要用Netty框架?

由于Netty 对 JDK 自带的 NIO 的 API 进行了封装,解决了JDK 原生 NIO 程序的问题。缓存

  JDK 原生 NIO 程序的问题:安全

    JDK 原生也有一套网络应用程序 API,可是存在一系列问题,主要以下:服务器

      1)NIO 的类库和 API 繁杂,使用麻烦:你须要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。

      2)须要具有其余的额外技能作铺垫:例如熟悉 Java 多线程编程,由于 NIO 编程涉及到 Reactor 模式,你必须对多线程和网路编程很是熟悉,才能编写出高质量的 NIO 程序。

      3)可靠性能力补齐,开发工做量和难度都很是大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等。NIO 编程的特色是功能开发相对容易,可是可靠性能力补齐工做量和难度都很是大。

      4)JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它会致使 Selector 空轮询,最终致使 CPU 100%。官方声称在 JDK 1.6 版本的 update 18 修复了该问题,可是直到 JDK 1.7 版本该问题仍旧存在,只不过该 Bug 发生几率下降了一些而已,它并无被根本解决。

  Netty的主要特色有:

    1)设计优雅:适用于各类传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,能够清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池;真正的无链接数据报套接字支持(自 3.1 起)。

    2)使用方便:详细记录的 Javadoc,用户指南和示例;没有其余依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。

    3)高性能、吞吐量更高:延迟更低;减小资源消耗;最小化没必要要的内存复制。

    4)安全:完整的 SSL/TLS 和 StartTLS 支持。

    5)社区活跃、不断更新:社区活跃,版本迭代周期短,发现的 Bug 能够被及时修复,同时,更多的新功能会被加入。

Netty 常见的使用场景以下:

  1)互联网行业:在分布式系统中,各个节点之间须要远程服务调用,高性能的 RPC 框架必不可少,Netty 做为异步高性能的通讯框架,每每做为基础通讯组件被这些 RPC 框架使用。典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通讯,Dubbo 协议默认使用 Netty 做为基础通讯组件,用于实现各进程节点之间的内部通讯。

  2)游戏行业:不管是手游服务端仍是大型的网络游戏,Java 语言获得了愈来愈普遍的应用。Netty 做为高性能的基础通讯组件,它自己提供了 TCP/UDP 和 HTTP 协议栈。

很是方便定制和开发私有协议栈,帐号登陆服务器,地图服务器之间能够方便的经过 Netty 进行高性能的通讯。

  3)大数据领域:经典的 Hadoop 的高性能通讯和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通讯,它的 Netty Service 基于 Netty 框架二次封装实现。

有兴趣的读者能够了解一下目前有哪些开源项目使用了 Netty的Related Projects

 

怎么用?(简单入门)

  可参考学习   netty 官方API: http://netty.io/4.1/api/index.html

 

 配置 pom.xml

1         <dependency>
2             <groupId>io.netty</groupId>
3             <artifactId>netty-all</artifactId>
4             <version>4.1.31.Final</version>
5         </dependency>

 

 配置 ServerConnection.java 

package com.example.demo.net;

import java.net.InetSocketAddress;

import org.apache.log4j.Logger;

import com.example.demo.handler.ServerMsgHandler;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class ServerConnection {
    
    Logger logger = Logger.getLogger(ServerConnection.class);
    private final int port ;
    private EventLoopGroup bossGroup ;
    private EventLoopGroup workerGroup ;
    
    public ServerConnection(int port) {
        this.port = port ;
    }
    
    /***
     * NioEventLoopGroup 是用来处理I/O操做的多线程事件循环器,
     * Netty提供了许多不一样的EventLoopGroup的实现用来处理不一样传输协议。 在这个例子中咱们实现了一个服务端的应用,
     * 所以会有2个NioEventLoopGroup会被使用。 第一个常常被叫作‘boss’,用来接收进来的链接。
     * 第二个常常被叫作‘worker’,用来处理已经被接收的链接, 一旦‘boss’接收到链接,就会把链接信息注册到‘worker’上。
     * 如何知道多少个线程已经被使用,如何映射到已经建立的Channels上都须要依赖于EventLoopGroup的实现,
     * 而且能够经过构造函数来配置他们的关系。
     */
    public void run() {
        System.out.println("启动服务端Netty链接");
        bossGroup = new NioEventLoopGroup() ;
        workerGroup = new NioEventLoopGroup() ;
        /**
         * ServerBootstrap 是一个服务端启动NIO服务的辅助启动类 , 能够在这个服务中直接使用Channel
         */
        ServerBootstrap bootstrap = new ServerBootstrap() ;
        /**
         * 这一步是必须的,若是没有设置group将会报java.lang.IllegalStateException: group not set异常
         */
        bootstrap = bootstrap.group(bossGroup, workerGroup) ;
        /***
         * ServerSocketChannel以NIO的selector为基础进行实现的,用来接收新的链接
         * 这里告诉Channel如何获取新的链接.
         */
        bootstrap = bootstrap.channel(NioServerSocketChannel.class) ;
        /***
         * 绑定端口
         */
        bootstrap = bootstrap.localAddress(new InetSocketAddress(port)) ;
        /***
         * 你能够设置这里指定的通道实现的配置参数。 咱们正在写一个TCP/IP的服务端,
         * 所以咱们被容许设置socket的参数选项好比tcpNoDelay和keepAlive。
         * 请参考ChannelOption和详细的ChannelConfig实现的接口文档以此能够对ChannelOptions的有一个大概的认识。
         */
        bootstrap = bootstrap.option(ChannelOption.SO_BACKLOG, 128) ;
        /***
         * option()是提供给NioServerSocketChannel用来接收进来的链接。
         * childOption()是提供给由父管道ServerChannel接收到的链接,
         * 在这个例子中也是NioServerSocketChannel。
         */
        bootstrap = bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true) ;
        /***
         * 这里的事件处理类常常会被用来处理一个最近的已经接收的Channel。 ChannelInitializer是一个特殊的处理类,
         * 目的是帮助使用者配置一个新的Channel。
         * 也许你想经过增长一些处理类好比NettyServerHandler来配置一个新的Channel
         * 或者其对应的ChannelPipeline来实现你的网络程序。 当你的程序变的复杂时,可能你会增长更多的处理类到pipline上,
         * 而后提取这些匿名类到最顶层的类上。
         */
        bootstrap = bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline() ;
                    pipeline.addLast("decoder", new StringDecoder()) ;
                    pipeline.addLast("encoder", new StringEncoder()) ;
                    pipeline.addLast("handler", new ServerMsgHandler()) ;
                }
            }) ;
        bootstrap.bind().addListener(new ChannelFutureListener() {            
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if ( future.isSuccess() ) {
                    System.out.println("服务端开始监听") ;
//                    logger.info("服务端开始监听") ;
                }else {
                    logger.error("服务端没法使用监听端口",future.cause()) ;
                }
            }
        }) ;
    }
    
    public void shutdown() {
        logger.info("关闭 Server 端口");
        bossGroup.shutdownGracefully() ;
        workerGroup.shutdownGracefully() ;
    }

}
View Code

 

配置 ServerMsgHandler.java 

package com.example.demo.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ServerMsgHandler extends ChannelInboundHandlerAdapter {

    /**
     * 这里咱们覆盖了chanelRead()事件处理方法。 每当从客户端收到新的数据时, 这个方法会在收到消息时被调用,
     * 这个例子中,收到的消息的类型是ByteBuf
     * 
     * @param ctx
     *            通道处理的上下文信息
     * @param msg
     *            接收的消息
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        
        System.out.println("服务端接收的消息:"+msg.toString()) ;
        //向客户端发送消息
        String str = msg.toString() ;
        if ( "高性能NIO框架——Netty".equals(str) ) {
            ctx.writeAndFlush( "客户端 , 你好!") ;
        }
//        ctx.writeAndFlush(msg.toString()+"你好!") ;
    }
    
    /***
     * 这个方法会在发生异常时触发
     * 
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        /**
         * exceptionCaught() 事件处理方法是当出现 Throwable 对象才会被调用,即当 Netty 因为 IO
         * 错误或者处理器在处理事件时抛出的异常时。在大部分状况下,捕获的异常应该被记录下来 而且把关联的 channel
         * 给关闭掉。然而这个方法的处理方式会在遇到不一样异常的状况下有不 同的实现,好比你可能想在关闭链接以前发送一个错误码的响应消息。
         */
        // 出现异常就关闭
        cause.printStackTrace() ;
        ctx.close() ;
    }
    
}
View Code

 

 

配置 ClientConnection.java

package com.example.demo.net;

import java.net.InetSocketAddress;

import org.apache.log4j.Logger;

import com.example.demo.handler.ClientMsgHandler;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class ClientConnection {

    Logger logger = Logger.getLogger(ClientConnection.class);
    private final int port ;
    private EventLoopGroup bossGroup ;
    private EventLoopGroup workerGroup ;
    
    private Channel channel ;
    
    public ClientConnection(int port) {
        this.port = port ;
    }    
    
    public Channel getChannel() {
        return this.channel ;
    }
    
    /***
     * NioEventLoopGroup 是用来处理I/O操做的多线程事件循环器,
     * Netty提供了许多不一样的EventLoopGroup的实现用来处理不一样传输协议。 在这个例子中咱们实现了一个服务端的应用,
     * 所以会有2个NioEventLoopGroup会被使用。
     * 第一个常常被叫作‘boss’,用来接收进来的链接。
     * 第二个常常被叫作‘worker’,用来处理已经被接收的链接, 一旦‘boss’接收到链接,就会把链接信息注册到‘worker’上。
     * 如何知道多少个线程已经被使用,如何映射到已经建立的Channels上都须要依赖于EventLoopGroup的实现,
     * 而且能够经过构造函数来配置他们的关系。
     * @throws InterruptedException 
     */
    public void run() {
        System.out.println("启动客户端Netty链接");
        bossGroup = new NioEventLoopGroup() ;
        workerGroup = new NioEventLoopGroup() ;
        /**
         * Bootstrap 是客户端一个启动NIO服务的辅助启动类 , 能够在这个服务中直接使用Channel
         */
        Bootstrap bootstrap = new Bootstrap() ;
//        ServerBootstrap bootstrap = new ServerBootstrap() ;
        /**
         * 这一步是必须的,若是没有设置group将会报java.lang.IllegalStateException: group not set异常
         */
//        bootstrap.group(bossGroup, workerGroup)
        bootstrap.group(bossGroup)
        /***
         * ServerSocketChannel以NIO的selector为基础进行实现的,用来接收新的链接
         * 这里告诉Channel如何获取新的链接.
         */
//            .channel(NioServerSocketChannel.class)
            .channel(NioSocketChannel.class)
            /***
             * 绑定端口,等价于bootstrap.bind("127.0.0.1", port) ,若下面用了,就要把这个注释掉,否则会报错
             */
//            .localAddress(new InetSocketAddress(port))
            .remoteAddress("127.0.0.1", port)
            /***
             * 你能够设置这里指定的通道实现的配置参数。 咱们正在写一个TCP/IP的服务端,
             * 所以咱们被容许设置socket的参数选项好比tcpNoDelay和keepAlive。
             * 请参考ChannelOption和详细的ChannelConfig实现的接口文档以此能够对ChannelOptions的有一个大概的认识。
             */
//            .option(ChannelOption.SO_BACKLOG, 128)
            /***
             * option()是提供给NioServerSocketChannel用来接收进来的链接。
             * childOption()是提供给由父管道ServerChannel接收到的链接,
             * 在这个例子中也是NioServerSocketChannel。
             */
//            .childOption(ChannelOption.SO_KEEPALIVE, true)
            /***
             * 这里的事件处理类常常会被用来处理一个最近的已经接收的Channel。 ChannelInitializer是一个特殊的处理类,
             * 目的是帮助使用者配置一个新的Channel。
             * 也许你想经过增长一些处理类好比NettyServerHandler来配置一个新的Channel
             * 或者其对应的ChannelPipeline来实现你的网络程序。 当你的程序变的复杂时,可能你会增长更多的处理类到pipline上,
             * 而后提取这些匿名类到最顶层的类上。
             */
//            .childHandler(new ChannelInitializer<SocketChannel>() {
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline() ;
                    pipeline.addLast("decoder", new StringDecoder()) ;
                    pipeline.addLast("encoder", new StringEncoder()) ;
//                    pipeline.addLast("handler", new ClientMsgHandler()) ;
                    pipeline.addLast(new ClientMsgHandler()) ;
                }
            }) ;
/*        bootstrap.bind().addListener(new ChannelFutureListener() {            
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if ( future.isSuccess() ) {
                    System.out.println("客户端开始监听") ;
                    logger.info("客户端开始监听") ;
                }else {
                    logger.error("客户端没法使用监听端口",future.cause()) ;
                }
            }
        }) ;    */
        
/*        //绑定端口,开始接收进来的链接
        ChannelFuture cFuture;
        try {
//            cFuture = bootstrap.connect(host, port).sync();            
//            cFuture = bootstrap.bind("127.0.0.1", port).sync();
            cFuture = bootstrap.bind().sync() ;
            //在这里拿到这个channel,是为了 等下 测试消息发送 用的
            channel = cFuture.channel();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }        */
                    
        ChannelFuture cf;
        try {
            cf = bootstrap.connect().sync();
            channel = cf.channel();
            channel.writeAndFlush("ClientConnection客户端已成功启动!");
            
            cf.addListener(new ChannelFutureListener() {            
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if ( future.isSuccess() ) {
                        System.out.println("客户端开始监听") ;
//                        logger.info("客户端开始监听") ;
                    }else {
//                        logger.error("客户端没法使用监听端口",future.cause()) ;
                    }
                }
            }) ;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
    }
    
    public void shutdown() {
        System.out.println("关闭 Client 端口");
        logger.info("关闭 Client 端口");
        bossGroup.shutdownGracefully() ;
        workerGroup.shutdownGracefully() ;
    }
    
}
View Code

 

配置 ClientMsgHandler.java

package com.example.demo.handler;

import org.apache.log4j.Logger;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ClientMsgHandler extends ChannelInboundHandlerAdapter {

    Logger logger = Logger.getLogger(ClientMsgHandler.class) ;
    
    /**
     * 这里咱们覆盖了chanelRead()事件处理方法。 每当从客户端收到新的数据时, 这个方法会在收到消息时被调用,
     * 这个例子中,收到的消息的类型是ByteBuf
     * 
     * @param ctx
     *            通道处理的上下文信息
     * @param msg
     *            接收的消息
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        
//        super.channelRead(ctx, msg);
        System.out.println("客户端接收的消息:"+msg.toString()) ;
        //向服务端发送消息
        ctx.writeAndFlush("服务端 , 你好!") ;
    }
    
    /***
     * 这个方法会在发生异常时触发
     * 
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        /**
         * exceptionCaught() 事件处理方法是当出现 Throwable 对象才会被调用,即当 Netty 因为 IO
         * 错误或者处理器在处理事件时抛出的异常时。在大部分状况下,捕获的异常应该被记录下来 而且把关联的 channel
         * 给关闭掉。然而这个方法的处理方式会在遇到不一样异常的状况下有不 同的实现,好比你可能想在关闭链接以前发送一个错误码的响应消息。
         */
        // 出现异常就关闭
        cause.printStackTrace() ;
        ctx.close() ;
    }
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
//        super.channelActive(ctx);
//        logger.info("client channel active");
        System.out.println("client channel active");
    }
    
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client channel inactive");
        ctx.close() ;
    }
    
}
View Code

 

 

编写测试类 DemoTest.java 

 1 package com.example.demo.netty;
 2 
 3 import org.junit.runner.RunWith;
 4 import org.springframework.boot.test.context.SpringBootTest;
 5 import org.springframework.test.context.ActiveProfiles;
 6 import org.springframework.test.context.junit4.SpringRunner;
 7 
 8 import com.example.demo.DemoApplicationTests;
 9 import com.example.demo.net.ClientConnection;
10 import com.example.demo.net.ServerConnection;
11 
12 import io.netty.channel.Channel;
13 
14 @RunWith(SpringRunner.class)
15 @SpringBootTest(classes = DemoApplicationTests.class)
16 @ActiveProfiles("test")
17 public class DemoTest {
18 
19     public static void main(String[] args) {
20         int port = 2222 ;
21         Thread serverThread = new Thread( new Runnable() {            
22             @Override
23             public void run() {
24                 new ServerConnection(port).run() ;
25             }
26         } ) ;
27         serverThread.start() ;
28         ClientConnection clientConnection = new ClientConnection(port) ;
29 //        Thread clientThread = new Thread( new Runnable() {            
30 //            @Override
31 //            public void run() {
32 //                new ClientConnection(port).run() ;
33 //            }
34 //        } ) ;
35 //        clientThread.start() ;
36         
37         clientConnection.run();
38         Channel channel = clientConnection.getChannel() ;
39         channel.writeAndFlush("高性能NIO框架——Netty");
40         
41 //        new Thread( ()->{
42 //            new ServerConnection(port) ;
43 //        } ).start() ;        
44 //        new Thread( ()->{
45 //            new ClientConnection(port) ;
46 //        } ).start() ;
47         
48     }
49 }
View Code

 

服务端与客户端的区别:

  1. 在客户端只建立了一个NioEventLoopGroup实例,由于客户端并不须要使用I/O多路复用模型,须要有一个Reactor来接受请求。只须要单纯的读写数据便可

  2. 在客户端只须要建立一个Bootstrap对象,它是客户端辅助启动类,功能相似于ServerBootstrap。

 

 

共同窗习,共同进步,如有补充,欢迎指出,谢谢!