史诗级最强教科书式“NIO与Netty编程”

史诗级最强教科书式“NIO与Netty编程”java

 

1.1 概述编程

java.nio全称java non-blocking IO,是指JDK1.4开始提供的新API。从JDK1.4开始,Java提供了一系列改进的输入/输出的新特性,也被称为NIO(既New IO),新增了许多用于处理输入输出的类,这些类都被放在java.nio包及子包下,而且对原包中的不少类进行改写,新增类知足NIO的功能。
NIO和BIO有着相同的目的和做用,可是它们的实现方式彻底不一样,BIO以流的方式处理数据,而NIO以块的方式处理数据,块I/O的效率比流I/O高不少。另外,NIO是非阻塞式的,这一点跟BIO也很不相同,使用它能够提供非阻塞式的高伸缩性网络。
NIO主要有三大核心部分 :Channel(通道),Buffer(缓冲区),Selector(选择器)。传统的BIO基于字节流和字符流进行操做,而NIO基于Channel和Buffer(缓冲区)进行操做,数据老是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择区)用于监听多个通道的事件(好比 :链接打开,数据到达)。所以使用单个线程就能够监听多个数据管道。bootstrap

 

1.2 文件IO数组

1.2.1 概述和核心API服务器

缓冲区(Buffer):其实是一个容器,是一个特殊的数组,缓冲区对象内置流一些机制,可以跟踪和记录缓冲区的状态变化状况。Channel提供从文件、网络读取数据的渠道,可是读取或写入的数据都必须经由Buffer,以下图所示 :网络


在NIO中,Buffer是一个顶层父类,它是一个抽象类,经常使用的Buffer子类有:
ByteBuffer,存储字节数据到缓冲区
ShortBuffer,存储字符串数据到缓冲区
CharBuffer,存储字符数据到缓冲区
IntBuffer,存储整数数据到缓冲区
LongBuffer,存储长整型数据到缓冲区
DoubleBuffer,存储小数到缓冲区
FloatBuffer,存储小数到缓冲区
对于Java中的基本数据类型,都有一个具体Buffer类型与之相对应,最经常使用的天然是ByteBuffer类(二进制数据),该类的主要方法以下所示 :
ByteBuffer类(二进制数据),该类的主要方法以下所示 :
public abstract ByteBuffer put(byte[] b);存储字节数据到缓冲区
public abstract byte[] get();从缓冲区得到字节数据
public final byte[] array();把缓冲区数据转换成字节数组
public static ByteBuffer allocate(int capacity);设置缓冲区的初始容量
public static ByteBuffer wrap(byte[] array);把一个现成的数组放到缓冲区中使用
public final Buffer flip();翻转缓冲区,重置位置到初始位置
管道(Channel) :相似于BIO中的stream,例如FileInputStream对象,用来创建到目标(文件,网络套接字,硬件设备等)的一个链接,可是须要注意 :BIO中的stream是单向的,例如FileInputStream对象只能进行读取数据的操做,而NIO中的通道(Channel)是双向的,既能够用来进行读操做,也能够用来进行写操做。经常使用的Channel类有:FileChannel、DatagramChannel、ServerSocketChannel和SocketChannel。FileChannel用于文件的数据读写,DatagramChannel、ServerSocketChannel和SocketChannel。FileChannel用于文件的数据读写,DatagramChannel用于UDP的数据读写,ServerSocketChannel和SocketChannel用于TCP的数据读写。多线程


FileChannel类,该类主要用来对本地文件进行IO操做,主要方法以下所示 :
public int read(ByteBuffer dst),读取数据并放到缓冲区中
public int write(ByteBuffer src) , 把缓冲区的数据写到通道中
public long transferFrom(ReadableByteChannel src,long position,long count),从目标通道中复制数据
public long transferTo(long position,long count,WritableByteChannel target),把数据从当前通道复制给目标通道架构

1.2.2 案例并发

1.往本地文件中写数据框架

@Test public void contextLoads() throws Exception { String str = "hello,nio,我是谷"; // 建立输出流  FileOutputStream fileOutputStream = new FileOutputStream("basic.txt"); // 从流中获得一个通道  FileChannel fileChannel = fileOutputStream.getChannel(); // 提供一个缓冲区  ByteBuffer allocate = ByteBuffer.allocate(1024); // 往缓冲区中存入数据  allocate.put(str.getBytes()); // 当数据写入到缓冲区中时,指针指向数据最后一行,那么缓冲区写入通道中输出时,是从最后一行数据开始写入,  // 这样就会致使写入1024的剩余没有数据的空缓冲区。因此须要翻转缓冲区,重置位置到初始位置  allocate.flip(); // 把缓冲区写到通道中,通道负责把数据写入到文件中  fileChannel.write(allocate); // 关闭输出流,由于通道是输出流建立的,因此会一块儿关闭  fileOutputStream.close(); }

NIO中通道是从输出流对象里经过getChannel方法获取到的,该通道是双向的,既能够读,又能够写。在往通道里写数据以前,必须经过put方法把数据存到ByteBuffer中,而后经过通道write能够写数据。在write以前,须要调用flip方法翻转缓冲区,把内部定位到初始位置,这样在接下来写数据时才能把全部数据写到通道里。运行效果以下图 :

@Test  // 从本地文件中读取数据
public void test2() throws Exception {
	File file = new File("basic.text");
	// 1. 建立输入流
	FileInputStream fis = new FileInputStream(file);
	// 2. 获得一个通道
	FileChannel fc = fis.getChannel();
	// 3. 准备一个缓冲区
	ByteBuffer buffer = ByteBuffer.allocate((int)file.length());
	// 4. 从通道里读取数据并存到缓冲区中
	fc.read(buffer);
	System.out.println(new String(buffer.array));
	// 5.关闭
	fis.close();
}
@Test // 使用NIO实现文件复制
public void test3() throws Exception {
	//1. 建立两个流
	FileInputStream fis = new FileInputStream("basic.text");
	FileOutputStream fos = new FileOutputStream("c:\\test\\basic.text");
	// 2. 获得两个通道
	FileChannel sourceFc = fis.getChannel();
	FileChannel destFc = fos.getChannel();
	//3. 复制
	destFc.transferFrom(sourceFc,0,sourceFc.size());
	//4.关闭
	fis.close();
	fos.close();
}

1.3 网络IO

1.3.1 概述和核心API

前面在进行文件IO时用到的FileChannel并不支持非阻塞操做,学习NIO主要就是进行网络IO,Java NIO中的网络通道是非阻塞IO的实现,基于事件驱动,很是适用于服务器须要维持大量链接,可是数据交换量不大的状况,例如一些即时通信的服务等待。
在Java中编写Socket服务器,一般有如下几种模式 :
一个客户端链接用一个线程,优势 :程序编写简单;缺点 :若是链接很是多,分配的线程也会很是多,服务器可能会由于资源耗尽而崩溃。
把每个客户端链接交给一个拥有固定数量线程的链接池,优势 : 程序编写相对简单,能够处理大量的链接。缺点 :线程的开销很是大,链接若是很是多,排到现象会比较严重。
使用Java的NIO,用非阻塞的IO方式处理。这种模式能够用一个线程,处理大量的客户端链接。
1。Selector,选择器,可以检测多个注册的通道上是否有事件发生,便获取事件而后针对每一个事件进行相应的响应处理。这样就能够只用一个单线程去管理多个通道,也就是管理多个链接。这样使得只用在链接真正有读写事件发生时,才会调用函数来进行读写,就大大地减小了系统开销,而且没必要为每一个链接都建立一个线程,不用去维护多个线程,而且避免了多线程之间的上下文切换致使的开销。
该类的经常使用方法以下所示 :
public static Selector open(),获得一个选择器对象
public int select(long timeout),监控全部注册的channel,当其中有注册的IO操做能够进行时,将对应的SelectionKey加入到内部集团中并返回,参数用来设置超时时间
public Set selectedKeys(),从内部集合中获得全部的SelectionKey
2. SelectionKey,表明了Selector和serverSocketChannel的注册关系,一共四种 :
int OP_ACCEPT :有新的网络链接能够accept,值为16
int OP_CONNECT : 表明链接已经创建,值为8
int OP_READ和int OP_WRITE : 表明了读、写操做,值为1和4
该类的经常使用方法以下所示 :
public abstract Selector selector(),获得与之关联的Selector对象
public abstract SelectorChannel channel(),获得与之关联的通道
public final Object attachment(),获得与之关联的共享数据
public abstract SelectionKey interestOps(int ops),设置或改变监听事件
public final boolean isAcceptable(),是否能够accept
public final boolean isReadable(),是否能够读
public final boolean isWritable(),是否能够写
3. ServerSocketChannel,用来在服务器端监听新的客户端Socket链接,经常使用方法以下所示 :
public static ServerSocketChannel open(),获得一个ServerSocketChannel通道
public final ServerSocketChannel bind(SocketAddress local),设置服务器端端口号
public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模式,取值false表示采用非阻塞模式
public SocketChannel accept(),接受一个链接,返回表明这个链接的通道对象
public final SelectionKey register(Selector sel,int ops),注册一个选择器并设置监听事件
4. SocketChannel,网络IO通道,具体负责进行读写操做。NIO老是把缓冲区的数据写入通道,或者把通道里的数据读出到缓冲区(buffer)。经常使用方法以下所示 :
public static SocketChannel open(),获得一个SocketChannel通道
public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模式,取值false表示采用非阻塞模式
public boolean connect(SocketAddress remote),链接服务器
public boolean finishConnect(),若是上面的方法链接失败,接下来就要经过该方法完成链接操做
public int write(ByteBuffer src),往通道里写数据
public int read(ByteBuffer dst),从通道里读数据
public final SelectionKey register(Selector sel,int ops,Object att),注册一个选择器并设置监听事件,最后一个参数能够设置共享数据
public final void close(),关闭通道

3.4 AIO编程

JDK1.7引入了Asynchronous I/O,既AIO。再进行I/O编程中,经常使用到两种模式 :Reactor和Proactor。Java的NIO就是Reactor,当有事件触发时,服务器端获得通知,进行相应的处理。
AIO即NIO2.0,叫作异步不阻塞的IO。AIO引入异步通道的概念,采用 了Proactor模式,简化了程序编写,一个有效的请求才启动一个线程,它的特色是先有操做系统完成后才通知服务端程序启动线程去处理,通常适用于链接数较多且链接时间长的应用。

3.5 IO对比总结

IO的方式一般分为几种,同步阻塞的BIO、同步非阻塞的NIO、异步非阻塞的AIO。
BIO方式适用于链接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4之前的惟一选择,但程序直观简单易理解。
NIO方式适用于链接数据多且链接比较短(轻操做)的架构,好比聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。
AIO方式适用于链接数目多且链接比较长(重操做)的架构,好比相册服务器,充分调用OS参与并发操做,编程比较复杂,JDK7开始支持。

4.1 概述

Netty是由JBOSS提供的一个Java开源框架。Netty提供异步的、基于事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络IO程序。
Netty是一个基于NIO的网络编程框架,使用Netty能够帮助你快速、简单的开发出一个网络应用,至关于简化和流程化了NIO的开发过程。做为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通讯行业等得到了普遍的应用,知名的Elasticsearch、Dubbo框架内部都采用了Netty。

 

4.2 Netty总体设计

4.2.1 线程模型

1.单线程模型


服务器端用一个线程经过多路复用搞定全部的IO操做(包括链接,读、写等),编码简单,清洗明了,可是若是客户端链接数量较多,将没法支撑,咋们前面的NIO案例就属于这种模型。
2. 线程池模型
服务器端采用一个线程专门处理客户端链接请求,采用一个线程组负责IO操做。在绝大数场景下,该模型都能知足使用。


3.Netty模型


Netty抽象出两组线程池,BossGroup专门负责接收客户端链接,WorkderGroup专门负责网络读写操做。NioEventLoop表示一个不断循环执行处理任务的线程,每一个NioEventLoop都有一个selector,用于监听绑定在其上的socket网络通道。NioEventLoop内部采用串行化设计,从消息的读取-》解码-》处理-》编码-》发送,始终由IO线程NioEventLoop负责。
一个NioEventLoopGroup下包含多个NioEventLoop
每一个NioEventLoop中包含有一个Selector,一个taskQueue
每一个NioChannel只会绑定在惟一的NioEventLoop上
每一个NioChannel都绑定有一个本身的ChannelPipeline

4.2.2 异步模型

FUTURE、CALLBACK和HANDLER
Netty的异步模型是创建在future和callback的之上的。Future的核心思想是 :假设一个方法fun,计算过程可能很是耗时,等待fun返回显然不适合。那么能够在调用fun的时候,立马返回一个Future,后续能够经过Future去监控方法fun的处理过程。
在使用Netty进行编程时,拦截操做和转换出入站数据只须要提供callback或利用future便可。这使得链式操做简单、高效,并有利于编写可重用的、通用的代码。Netty框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来、解脱出来。

 

4.3 核心API

 

  • ChannelHandler及其实现类
    ChannelHandler接口定义了许多事件处理的方法,咱们能够经过重写这些方法区实现具体的业务逻辑。API关系以下图所示:

 


自定义一个Handler类去继承ChannelInboundHandlerAdapter,而后经过重写相应方法实现业务逻辑,通常都须要重写如下方法:
public void channelActive(ChannelHandlerContext ctx),通道就绪事件
public void channelRead(ChannelHandlerContext ctx,Object msg),通道读取数据事件
public void channelReadComplete(ChannelHandlerContext ctx),数据读取完毕事件
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause),通道发生异常事件

  • Pipeline和ChannelPipline
    ChannelPipeline是一个Handler的集合,它负责处理和拦截inbound或者outbound的事件和操做,至关于一个贯穿Netty的链。


ChannelPipeline addFirst(ChannelHandler…handlers),把一个业务处理类(handler)添加到链中的第一个位置
ChannelPipeline addLast(ChannelHandler…handlers),把一个业务处理类(handler)添加到链中的最后一个位置

  • ChannelHandlerContext
    这是事件处理器上下文对象,Pipeline链中的实际处理节点。每一个处理节点ChannelHandlerContext中包含一个具体的事件处理器ChannelHandler,同时ChannelHandlerContext中也绑定了对应的pipeline和Channel的信息,方便对ChannelHandler进行调用。
    经常使用方法以下所示 :
    ChannelFuture close(),关闭通道
    ChannelOutboundInvoker flush(),刷新
    ChannelFuture writeAndFlush(Object msg),将数据写到ChannelPipeline中当前ChannelHandler的下一个ChanelHandler开始处理(出站)
  • ChannelOption
    Netty在建立Channel实例后,通常都须要设置ChannelOption参数。ChannelOption是Socket的标准参数,而非Netty首创的。经常使用的参数配置有 :

 

 

  1. ChannelOption.SO_BACKLOG
    对应TCP/IP协议listen函数中的backlog参数,用来初始化服务器可链接队列大小。服务端处理客户端链接请求是顺序处理的,因此同一时间只能处理一个客户端链接。多个客户端来的时候,服务端将不能处理的客户端链接请求放在队列中等待处理,backlog参数指定了队列d大小。
  2. ChannelOption.SO_KEEPALIVE
    一直保持链接活动状态。

 

 

  • ChannelFuture
    表示Channel中异步I/O操做的结果,在Netty中全部的I/O操做都是异步的,I/O的调用会直接返回,调用者并不能马上得到结果,可是能够经过ChannelFuture来获取I/O操做的处理状态。经常使用方法以下所示 :
    Channel channel(),返回当前正在进行IO操做的通道
    ChannelFuture sync(),等待异步操做执行完毕
  • EventLoopGroup和其实现类NioEventLoopGroup
    EventLoopGroup是一组EventLoop的抽象,Netty为了更好的利用多核CPU资源,通常会有多个EventLoop同时工做,每一个EventLoop维护着一个Selector实例。
    EventLoopGroup提供next接口,能够从组里面按照必定规则获取其中一个EventLoop来处理任务。在Netty服务器端编程中,咱们通常都须要提供两个EventLoopGroup,例如:BossEventLoopGroup和WorkderEventLoopGroup。
    一般一个服务端口即一个ServerSocketChannel对应一个Selector和一个EventLoop线程。BossEventLoop负责接收客户端的链接并将SocketChannel交给WorkerEventLoopGroup来进行IO处理,以下图所示 :


BossEventLoopGroup一般是一个单线程的EventLoop,EventLoop维护着一个注册了ServerSocketChannel的Selector实例,BossEventLoop不断轮询Selector将链接事件分离出来,一般是OP_ACCEPT事件,而后将接收到的SocketChannel交给WorkderEventLoopGroup,WorkderEventLoopGroup会由next选择其中一个EventLoopGroup来将这个SocketChannel注册到其维护的Selector并对其后续的IO事件进行处理。
经常使用方法以下所示 :
public NioEventLoopGroup(),构造方法
public Future<?> shutdownGracefully(),断开链接,关闭线程

  • ServerBootstrap和Bootstrap
    ServerBootStrap是Netty中的服务端端启动助手,经过它能够完成服务器端的各类配置;Bootstrap是Netty中的客户端启动助手,经过它能够完成客户端的各类配置。经常使用方法以下所示 :
    public ServerBootstrap group(EventLoopGroup parentGroup,EventLoopGroup childGroup),该方法用于服务器端,用来设置两个EventLoop
    public B group(EventLoopGroup group),该方法用于客户端,用来设置一个EventLoop
    public B channel(Class<? extends C> channelClass),该方法用来设置一个服务器端的通道实现
    public B option(ChannelOption option,T value),用来给ServerChannel添加配置
    public ServerBootstrap childOption(ChannelOption childOption,T value),用来给接收到的通道添加配置
    public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务处理类(自定义的handler)
    public ChannelFuture bind(int inetPort),该方法用于服务器端,用来设置占用的端口号
    public ChannelFuture connect(String inetHost,int inetPort),该方法用于客户端,用来链接服务器端
  • Unpooled类
    这是Netty提供的一个专门用来操做缓冲区的工具类,经常使用方法以下所示 :
    public static ByteBuf copiedBuffer(CharSequence string, Charset charset),经过给定的数据和字符编码返回一个ByteBuf对象(相似于Nio中的ByteBuffer对象)

 

 

入门案例 :

<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.25.Final</version>
        </dependency>
package com.example.testdemo.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {

    public static void main(String[] args) throws InterruptedException {
        // 1. 建立一个线程组,接收客户端链接
        EventLoopGroup bossGroup = new NioEventLoopGroup();

        // 2. 建立一个线程组,处理网络操做
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        // 3. 建立服务器端启动助手来配置参数
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // 4. 设置两个线程组
        serverBootstrap.group(bossGroup, workerGroup)
                // 5。 使用NioServerSocketChannel做为服务器端通道的实现
                .channel(NioServerSocketChannel.class)
                // 6. 设置线程队列中等待链接的个数
                .option(ChannelOption.SO_BACKLOG, 128)
                // 7. 保持活跃链接状态
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // 8。 建立一个通道初始化对象
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    // 9。往pipline链中添加自定义的handler类
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        channel.pipeline().addLast(new NettyServerHandler());
                    }
                });
        System.out.println("...Server ready....");
        // 10. 绑定端口 ,bind方法是异步的,sync同步阻塞
        ChannelFuture sync = serverBootstrap.bind(9999).sync();
        System.out.println("...server start");
        // 11。 关闭通道,关闭线程组
        sync.channel().closeFuture().sync();
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
package com.example.testdemo.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 读取数据事件
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Server : " + ctx);
        ByteBuf byteBuf = (ByteBuf)msg;
        System.out.println("客户端发来的消息 :" + byteBuf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 数据读取完毕事件
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("就是没钱", CharsetUtil.UTF_8));
    }
}
package com.example.testdemo.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {

    public static void main(String[] args) throws InterruptedException {
        // 1. 建立一个线程组
        EventLoopGroup group = new NioEventLoopGroup();
        // 2。建立客户端的启动助手,完成香港配置
        Bootstrap bootstrap = new Bootstrap();
        // 3.设置线程组
        bootstrap.group(group)
                // 4. 设置客户端通道的实现类
                .channel(NioSocketChannel.class)
                // 5. 建立一个通道初始化对象
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 6. 往pipline链中添加自定义handler
                        socketChannel.pipeline().addLast(new NettyClientHandler());
                    }
                });
        // 7. 启动客户端去链接服务器 异步非阻塞,connect是异步的,它会立马返回一个future对象,sync是同步阻塞的用于等待主线程
        System.out.println("...Client is ready ...");
        ChannelFuture sync = bootstrap.connect("127.0.0.1", 9999).sync();
        // 8. 关闭链接 异步非阻塞
        sync.channel().closeFuture().sync();
    }
}
package com.example.testdemo.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * 客户端业务处理类
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf)msg;
        System.out.println("服务器端发来的消息 : " + byteBuf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 通道就绪事件
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client : " + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("老板,还钱吧", CharsetUtil.UTF_8));
    }
}

聊天案例 :

package com.example.testdemo.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * 聊天程序服务器端
 */
public class ChatServer {

    /**
     * 服务器端端口号
     */
    private int port;

    public ChatServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    // 往pipeline链中添加一个解码器
                                    .addLast("decoder", new StringDecoder())
                                    // 往pipeline链中添加一个编码器
                                    .addLast("encoder", new StringEncoder())
                                    // 往pipline链中添加自定义的handler(业务处理类)
                                    .addLast(new ChatServerHandler());
                        }
                    });
            System.out.println("Netty chat Server启动。。。");
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
            System.out.println("Netty chat Server关闭。。。");
        }
    }

    public static void main(String[] args) throws Exception {
        new ChatServer(9999).run();
    }

}
package com.example.testdemo.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.ArrayList;
import java.util.List;

/**
 * 自定义一个服务器端业务处理类
 */
public class ChatServerHandler extends SimpleChannelInboundHandler<String> {

    private static List<Channel> channels = new ArrayList<>();

    /**
     * 读取数据
     *
     * @param channelHandlerContext
     * @param s
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        Channel inChannel = channelHandlerContext.channel();
        channels.forEach(channel -> {
            if (channel != inChannel) {
                channel.writeAndFlush("[" + inChannel.remoteAddress().toString().substring(1) + "]" + "说 :" + s + "\n");
            }
        });
    }

    /**
     * 通道就绪
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channels.add(channel);
        System.out.println("[Server] : " + channel.remoteAddress().toString().substring(1) + "上线");
    }

    /**
     * 通道未就绪
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channels.remove(channel);
        System.out.println("[Server] : " + channel.remoteAddress().toString().substring(1) + "离线");
    }
}
package com.example.testdemo.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

/**
 * 聊天程序客户端
 */
public class ChatClient {
    /**
     * 服务器端IP地址
     */
    private final String host;

    /**
     * 服务器端端口和
     */
    private final int port;

    public ChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void run() {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // 往pipeline链中添加一个解码器
                            pipeline.addLast("decoder", new StringDecoder());
                            // 往pipeline链中添加一个编码器
                            pipeline.addLast("encoder", new StringEncoder());
                            // 往pipeline链中添加自定义的handler(业务处理类)
                            pipeline.addLast(new ChatClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            Channel channel = channelFuture.channel();
            System.out.println("----" + channel.localAddress().toString().substring(1) + "----");
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String nextLine = scanner.nextLine();
                channel.writeAndFlush(nextLine  + "\r\n");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new ChatClient("127.0.0.1", 9999).run();
    }
}
package com.example.testdemo.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * 自定义一个客户端业务处理类
 */
public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(s.trim());
    }
}

4.6 编码和解码

4.6.1 概述

在编写网络应用程序的时候须要注意codec(编解码器),由于数据在网络中传输的都是二进制字节吗数据,而咱们拿到的目标数据每每不是字节吗数据。所以在发送数据时就须要编码,接收数据时须要解码。
codec的组成部分有两个 :decoder(解码器)和encoder(编码器)。encoder负责把业务数据转换成字节码数据,decoder负责把字节码数据转换成业务数据。


其实java的序列化技术就能够做为codec去使用,可是它的硬伤太多 :

 

  1. 没法跨语言,这应该是Java序列化最致命的问题了。
  2. 序列化后体积太大,是二进制编码的5倍多。
  3. 序列化性能过低。
    因为Java序列化硬伤太多,所以Netty自身提供了一些codec,以下所示 :
    Netty提供的解码器 :
  4. StringDecoder,对字符串数据解码
  5. ObjectDecoder,对Java对象进行解码
    Netty提供的解码器:
  6. StringEncoder,对字符串数据进行编码
  7. ObjectEncoder,对Java对象进行编码
    Netty自带的ObjectDecoder和ObjectEncoder能够用来实现POJP对象或各类业务对象的编码和解码,但其内部使用的还是Java序列化技术,因此不建议使用。

 

4.6.2 Google的Protobuf

Protobuf是Google发布的开源项目,全称Google Protocol Buffers,特定以下 :

 

  • 支持跨平台、多语言(支持目前绝大多数语言,例如C++、C#、Java、Python等)
  • 高性能,高可靠性
  • 使用protobuf编译器能自动生成代码,Protpbuf是将类的定义使用.protp文件进行描述,而后经过protoc.exe编译器根据.proto自动生成.java文件
    目前在使用Netty开发时,常常会结合Protobuf做为codec(编解码器)去使用,具体用法以下所示 :

 

<!--  -->

<dependency>

<groupId>com.google.protobuf</groupId>

<artifactId>protobuf-java</artifactId>

<version>3.9.1</version>

</dependency>

 


经过protoc.exe根据该文件生成java类,以下操做所示 :


在protoc.exe根目录运行命令 :protoc --java_out=. Book.pro

5.1 自定义RPC

概述:
RPC(Remote Procedure Call),即远程过程调用,它是一种经过网络从远程计算机程序上请求服务,而不须要了解底层网络实现的技术。常见的RPC框架有 :Dubbo,Grpc。

 

 

  1. 服务消费房(client)以本地调用方式调用服务
  2. client stub 接收到调用后负责将方法、参数等封装成可以进行传输的消息体
  3. client stub 将消息进行编码并发送到服务端
  4. server stub 收到消息后进行解码
  5. server stub 根据解码结果调用本地的服务
  6. 本地服务执行并将结果返回给server stub
  7. server stub将返回导入结果进行编码并发送至消费方
  8. client stub接收到消息并进行解码
  9. 服务消费方(client)获得结果
    RPC的目标就是将2-8这些步骤都封装起来,用户无需关系这些细节,能够像调用本地方法同样便可完成远程服务调用。

 

5.2 设计与实现

5.2.1 结果设计

 

 

 

  • Client(服务的调用方) :两个接口 + 一个包含main方法的测试类
  • Client Stub :一个客户端代理类 + 一个客户端业务处理类
  • Server(服务的提供方) :两个接口 + 两个实现类
  • Server Stub :一个网络处理服务器 + 一个服务器业务处理类
    注意 :服务调用方的接口必须跟服务提供方的接口保持一致(包路径能够不一致)
    最终要实现的目标是 :在TestNettyRPC中远程调用HelloRPCImpl或HelloNettyImpl中的方法
package com.example.testdemo.rpc;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class NettyRPCServer {
    private int port;
    public NettyRPCServer(int port) {
        this.port = port;
    }

    public void start() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .localAddress(port)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 编码器
                            pipeline.addLast("encoder", new ObjectEncoder());
                            // 解码器
                            pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                            // 服务器端业务处理类
                            pipeline.addLast(new InvokerHandler());
                        }
                    });
            ChannelFuture future = serverBootstrap.bind(port).sync();
            System.out.println("...Server is ready...");
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new NettyRPCServer(9999).start();
    }
}
package com.example.testdemo.rpc;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class NettyRPCProxy {

    /**
     * 根据接口建立代理对象
     *
     * @param target
     * @return
     */
    public static Object create(Class target) {
        return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 封装classinfo
                ClassInfo classInfo = new ClassInfo();
                classInfo.setClassName(getClass().getName());
                classInfo.setMethodName(method.getName());
                classInfo.setObjects(args);
                classInfo.setTypes(method.getParameterTypes());

                // 开始用Netty发送数据
                EventLoopGroup group = new NioEventLoopGroup();
                ResultHandler resultHandler = new ResultHandler();
                try {
                    Bootstrap bootstrap = new Bootstrap();
                    bootstrap.group(group)
                            .channel(NioSocketChannel.class)
                            .handler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                protected void initChannel(SocketChannel ch) throws Exception {
                                    ChannelPipeline pipeline = ch.pipeline();
                                    // 编码器
                                    pipeline.addLast("encoder", new ObjectEncoder());
                                    // 解码器
                                    pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                                    // 客户端业务处理类
                                    pipeline.addLast("handler", resultHandler);
                                }
                            });
                    ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
                    future.channel().writeAndFlush(classInfo).sync();
                    future.channel().closeFuture().sync();
                } finally {
                    group.shutdownGracefully();
                }

                return resultHandler.getResponse();
            }
        });
    }

}
package com.example.testdemo.rpc;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.reflections.Reflections;

import java.lang.reflect.Method;
import java.util.Set;

/**
 * 服务器端业务处理类
 *
 */
public class InvokerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 获得某接口下某个实现类的名字
     *
     * @param classInfo
     * @return
     * @throws Exception
     */
    private String getImplClassName(ClassInfo classInfo) throws Exception {
        // 服务方接口和实现类所在的包路径
        String interfacePath = "com.example.testdemo.rpc";
        int lastDot = classInfo.getClassName().lastIndexOf(".");
        String interfaceName = classInfo.getClassName().substring(lastDot);
        Class supperClass = Class.forName(interfacePath + interfaceName);
        Reflections reflection = new Reflections(interfacePath);
        // 获得某接口下的全部实现类
        Set<Class> implClassSet = reflection.getSubTypesOf(supperClass);
        if (implClassSet.size() == 0) {
            System.out.println("未找到实现类");
            return null;
        } else if (implClassSet.size() > 1) {
            System.out.println("找个多个实现类,未明确使用哪个");
            return null;
        } else {
            // 把集合转换为数组
            Class[] classes = implClassSet.toArray(new Class[0]);
            // 获得实现类的名字
            return classes[0].getName();
        }
    }

    /**
     * 读取客户端发来的数据并经过反射调用实现类的方法
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ClassInfo classInfo = (ClassInfo)msg;
        Object clazz = Class.forName(getImplClassName(classInfo)).newInstance();
        Method method = classInfo.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());
        // 经过反射调用实现类的方法
        Object result = method.invoke(clazz, classInfo.getObjects());
        ctx.writeAndFlush(result);
    }
}
package com.example.testdemo.rpc;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * 客户端业务处理类
 */
public class ResultHandler extends ChannelInboundHandlerAdapter {

    private Object response;

    public Object getResponse() {
        return response;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        response = msg;
        ctx.close();
    }
}
package com.example.testdemo.rpc;

import java.io.Serializable;

public class ClassInfo implements Serializable {

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class<?>[] getTypes() {
        return types;
    }

    public void setTypes(Class<?>[] types) {
        this.types = types;
    }

    public Object[] getObjects() {
        return objects;
    }

    public void setObjects(Object[] objects) {
        this.objects = objects;
    }

    private static final long serialVersionUID = 1L;

    /**
     * 类名
     */
    private String className;

    /**
     * 方法名
     */
    private String methodName;

    /**
     * 参数类型
     */
    private Class<?>[] types;

    /**
     * 参数列表
     */
    private Object[] objects;


}
相关文章
相关标签/搜索