Netty入门学习总结

Netty 概述

原生 NIO 存在的问题

  1. NIO 的类库与 API 繁杂,须要熟练掌握 Selector、ServerSocketChannel、SocketChannel、Bytebuffer 等。
  2. 要求熟悉 Java 多线程编程和网络编程。
  3. 开发工做量和难度大,例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等。

什么是 Netty

  • Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠的网络 I/O 程序。
  • Netty 能够快速、简单的开发一个网络应用,至关于简化和流程化了 NIO 的开发过程。
  • Netty 是目前最流行的 NIO 框架,在互联网领域、大数据分布式计算领域、游戏行业、通讯行业等得到了普遍的应用,Elasticsearch、Dubbo 框架内部都采用了 Netty。

Netty 做为业界最流行的 nio 框架之一,它的健壮性、功能、性能、可定制性、可扩展性都是数一数二的。html

优势java

  1. API 使用简单,开发门槛低。
  2. 功能强大,预置了多种编解码功能,支持多种主流协议。
  3. 定制能力强,经过 channelHandler 对通讯框架进行灵活扩展。
  4. 高性能。
  5. 成熟,稳定,修复了全部的 NIO BUG.
  6. 社区活跃。
  7. 经历了大规模的商业应用考验,质量获得验证。

线程模型介绍

目前存在的线程模型有:react

  • 传统阻塞 I/O 服务模型。
  • Reactor 模式。
  • 根据 Reactor 的数量和处理资源线程池的数量不一样,有三种不一样实现:web

    • 单 Reactor 单线程。
    • 单 Reactor 多线程。
    • 主从 Reactor 多线程。
  • Netty 线程模式主要基于主从 Reactor 多线程模型作了必定的改进,其中主从 Reactor 多线程模型有多个 Reactor。

传统阻塞 I/O 服务模型

传统阻塞I/O服务模型

模型特色编程

  1. 采用阻塞 I/O 获取输入的数据。
  2. 每一个链接都须要独立的线程完成数据的输入、业务处理、数据返回。

    问题分析bootstrap

  3. 当并发数很大时,会建立大量的线程,占用很大的系统资源。
  4. 链接建立后,若是当前线程暂时没有数据可读,该线程会阻塞在 Read 操做上,形成线程资源浪费。

    解决方案浏览器

  5. 基于I/O复用模型:多个链接共用一个阻塞对象,应用程序只须要在一个阻塞对象等待,无需阻塞全部链接,当某个链接有新的数据能够处理时,操做系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。
  6. 基于线程池复用线程资源:没必要为每个链接建立线程,将链接完成后的业务处理任务分配给线程进行处理,一个线程能够处理多个链接的业务。

Reactor 模式

Reactor模式

  1. Reactor 模式,经过一个或多个输入同时传递给服务器处理的模式(基于事件驱动)。
  2. 服务器端程序处理传入的多个请求,并将它们同步分派到相应的处理线程。
  3. Reactor 模式使用了 I/O 复用监听事件,受到事件后分发给某个线程(进程),网络服务高并发处理的关键。

核心组成缓存

  1. Reactor:在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序对 I/O 事件做出反应。
  2. Handlers:处理程序执行 I/O 事件要完成的实际事件。Reactor 经过调用适当的处理程序来响应 I/O 事件,处理程序非阻塞操做。

单 Reactor 单线程

单Reactor单线程

  • select 是 I/O 复用模型介绍的标准网络编程 API,能够实现应用程序经过一个阻塞对象监听多路链接请求。
  • Reactor 对象经过 Select 监控客户端请求事件,收到事件后经过 Dispatch 进行分发。
  • 若是创建链接请求事件,则由 Acceptor 经过 Accept 处理链接请求,而后建立一个 Handler 对象处理链接完成后的后续业务处理。
  • 若是不是创建链接事件,则 Reactor 会分发给调用链接对应的 Handler 来响应。
  • Handler 会完成 Read—>业务处理—>Send 的完整业务流程。

优缺点服务器

  • 优势:模型简单,无多线程、进程通讯、竞争的问题,所有由一个线程完成。
  • 缺点:性能问题,只有一个线程没法发挥出多核 CPU 的性能,Handler 在处理某链接业务时,整个进程没法处理其余链接事件,容易致使性能瓶颈。
  • 缺点:可靠性问题,线程意外停止,或者进入死循环,会致使整个系统通讯模块不可用,不能接收和处理外部信息,节点故障。
  • 使用场景:客户端数量有限,业务处理快捷(例如 Redis 在业务处理的时间复杂度为 O(1)的状况)。

单 Reactor 多线程

单Reactor多线程

  • Reactor 经过 select 监控客户端请求事件,收到事件后,经过 dispatch 进行分发。
  • 若是是创建链接的请求,则由 Acceptor 经过 accept 处理链接请求,同时建立一个 handler 处理完成链接后的后续请求。
  • 若是不是链接请求,则由 Reactor 分发调用链接对应的 handler 来处理。
  • Handler 只负责响应事件,不作具体的业务处理,经过 read 读取数据后,会分发给后面的 worker 线程池中的某个线程处理业务。
  • Worker 线程池会分配独立的线程处理真正的业务,并将结果返回给 Handler。
  • Handler 收到响应后,经过 send 方法将结果反馈给 Client。

优缺点网络

  • 优势:能够充分的利用多核 CPU 的处理能力。
  • 缺点:多线程数据共享、访问操做比较复杂,Reactor 处理全部的事件的监听和响应,由于 Reactor 在单线程中运行,在高并发场景容易出现性能瓶颈。

主从 Reactor 多线程

主从Reactor多线程

  • Reactor 主线程 MainReactor 对象经过 select 监听链接事件,收到事件后,经过 Acceptor 处理链接事件。
  • 当 Acceptor 处理链接事件后,MainReactor 将建立好的链接分配给 SubReactor。
  • SubReactor 将链接加入到链接队列进行监听,并建立 Handler 进行各类事件处理。
  • 当有新事件发生时,SubReactor 调用对应的 Handler 进行处理。
  • Handler 经过 read 读取数据,分发给后面的 Worker 线程池处理。
  • Worker 线程池会分配独立的 Worker 线程进行业务处理,并将结果返回。
  • Handler 收到响应结果后,经过 send 方法将结果返回给 Client。

优缺点:

  • 优势:父线程和子线程的职责明确,父线程只须要接收新链接,子线程完成后续业务处理。
  • 优势:父线程与子线程的数据交互简单,Reactor 主线程是须要把新链接传给子线程,子线程无需返回数据。
  • 缺点:编程复杂度较高。

Reactor 模式小结

单 Reactor 单线程:前台接待员和服务员是同一我的,全程为顾客服务。

单 Reactor 多线程:一个前台接待员,多个服务员。

主从 Reactor 多线程:多个前台接待员,多个服务员。

  1. 响应快,虽然 Reactor 自己是同步的,但没必要为单个同步事件所阻塞。
  2. 最大程度的避免了复杂的多线程及同步问题,避免了多线程/进程的切换开销。
  3. 扩展性好,能够方便的经过增长 Reactor 势力个数充分利用 CPU 资源。
  4. 复用性好,Reactor 模型自己与具体事件处理逻辑无关,具备很高的复用性。

Netty 模型

Netty工做架构图

服务端端包含 1 个 Boss NioEventLoopGroup 和 1 个 Worker NioEventLoopGroup。

NioEventLoopGroup 至关于 1 个事件循环组,这个组里包含多个事件循环 NioEventLoop,每一个 NioEventLoop 包含 1 个 Selector 和 1 个事件循环线程。

每一个 Boss NioEventLoop 循环执行的任务包含 3 步:

  1. 轮训 Accept 事件。
  2. 处理 Accept I/O 事件,与 Client 创建链接,生成 NioSocketChannel,并将 NioSocketChannel 注册到某个 Worker NioEventLoop 的 Selector 上。
  3. 处理任务队列中的任务,runAllTasks。任务队列中的任务包括用户调用 eventloop.execute 或 schedule 执行的任务,或者其它线程提交到该 eventloop 的任务。

每一个 Worker NioEventLoop 循环执行的任务包含 3 步:

  1. 轮询 read、write 事件。
  2. 处理 I/O 事件,即 read、write 事件,在 NioSocketChannel 可读、可写事件发生时进行处理。
  3. 处理任务队列中的任务,runAllTasks。
  4. 每一个 Worker NioEventLoop 处理业务时,会使用 PipeLine(管道),pipeline 中包含了 channel,即经过 pipeline 能够获取对应通道,通道中维护了不少处理器。

Netty 简单通信代码案例

/**
 * @author jack
 */
public class SimpleServer {

    public static void main(String[] args) {
        //建立bossGroup , 只负责链接请求
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        //建立workerGroup , 负责客户端业务处理
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        //建立服务端启动对象,配置参数.
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        try {
            serverBootstrap.group(bossGroup, workerGroup)//设置线程组
                    .channel(NioServerSocketChannel.class)//使用NioSocketChannel做为服务端的通道实现
                    .option(ChannelOption.SO_BACKLOG, 128)//设置线程队列获得链接个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true)//设置保持活动链接状态
                    .childHandler(new ChannelInitializer<SocketChannel>() {//建立一个通道测试对象
                        //给pipeline设置处理器
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyServerHandler()); //自定义handler
                        }
                    });//workerGroup的EventLoop对应的管道设置处理器

            System.out.println("服务端准备就绪...");

            //绑定一个端口而且同步,生成了一个channelFuture对象
            ChannelFuture cf = serverBootstrap.bind(6667).sync();
            //对关闭通道进行监听
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

}
/**
 * 服务端自定义handler
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 读取实际数据(这里咱们能够读取客户端发送的消息)
     *
     * @param ctx 上下文对象,含有管道pipeline,通道channel ,地址
     * @param msg 客户端发送的内容
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送: " + buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址为:" + ctx.channel().remoteAddress());
    }

    /**
     * 读取完成后
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("你好,客户端", CharsetUtil.UTF_8));
    }


    /**
     * 处理异常,通常是关闭通道
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
/**
 * @author jack
 */
public class SimpleClient {

    public static void main(String[] args) {
        //客户端须要一个事件循环组
        NioEventLoopGroup clientLoopGroup = new NioEventLoopGroup();
        //建立客户端启动对象
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(clientLoopGroup)//设置线程组
                    .channel(NioSocketChannel.class)//设置客户端通道实现类
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyClientHandler());//加入自定义处理器
                        }
                    });

            System.out.println("客户端已准备就绪");
            //链接服务器
            ChannelFuture cf = bootstrap.connect("127.0.0.1", 6667).sync();
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            clientLoopGroup.shutdownGracefully();
        }
    }

}
/**
 * 客户端自定义handler
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 通道准备就绪时调用
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("你好,服务端!", CharsetUtil.UTF_8));
    }

    /**
     * 获取客户端回复
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("服务端回复: " + buf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 处理异常,通常是关闭通道
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

运行结果

服务端

客户端

任务队列中的 task 有 3 种使用场景

  1. 用户自定义的普通任务

    ctx.channel().eventLoop().execute(() -> System.out.println("任务逻辑"));
  2. 用户自定义的定时任务

    ctx.channel().eventLoop().schedule(() -> System.out.println("任务逻辑..."), 60, TimeUnit.SECONDS);
  3. 非当前 reactor 线程调用 channel 的各类方法

    例如在推送系统的业务线程里面,根据用户的标识,找到对应的 channel 引用,而后调用 write 类方法向该用户推送消息,就会进入到这种场景。最终的 write 会提交到任务队列中后被异步消费。

Netty 模型小结

  1. Netty 抽象出两组线程池:BossGroup 专门负责接收客户端的链接;WorkerGroup 专门负责网络的读写。
  2. NioEventLoop 表示一个不断循环的执行任务的线程,每一个 NioEventLoop 都有一个 selector,用于监听绑定在其上的 socket 的网络通道。
  3. NioEventLoop 内部采用串行化设计,从消息读取->处理->编码->发送始终由 I/O 线程 NioEventLoop 负责。
  4. NioEventLoopGroup 下包含多个 NioEventLoop。
  5. 每一个 NioEventLoop 中包含一个 Selector,一个 taskQueue。
  6. 每一个 NioEventLoop 的 Selector 能够注册监听多个 NioChannel。
  7. 每一个 NioChannel 只会绑定惟一的 NioEventLoop。
  8. 每一个 NioChannel 都会绑定一个本身的 ChannelPipeLine。

Netty 核心组件

BootStrap、ServerBootStrap

一个 Netty 应用一般由一个 BootStrap 开始,主要做用是配置整个 Netty 程序,串联各个组件,Netty 中的 BootStrap 类是客户端程序的启动引导类,ServerBootStrap 是服务端启动引导类。

经常使用方法:

方法 含义
public ServerBootstrap group(EventLoopGroup parentGroup , EventLoopGroup childGroup) 做用于服务器端,用来设置两个 EventLoop
public B group(EventLoopGroup group) 做用于客户端,用来设置一个 EventLoopGroup
public B channel(Class<? extends C> channelClass) 用来设置一个服务端的通道实现
public <T> B option(ChannelOption<T> option, T value) 用来给 ServerChannel 添加配置
public <T> ServerBootStrap childOption (ChannelOption<T> childOption, T value) 用来给接收到的通道添加配置
public ServerBootstrap childHandler (ChannelHandler childHandler) 用来设置业务处理类(自定义 handler)
public B handler(ChannelHandler handler) Handler 则在服务器端自己 bossGroup 中使用
public ChannelFuture bind(int inetPort) 用于服务端,设置占用的端口号
public ChannelFuture connect (String inetHost,int inetPort) 该方法用于客户端,用来链接服务器

Future、ChannelFuture

Netty 中全部操做都是异步的,不能当即得知消息是否被正确处理,但能够过一会等它执行完成或直接注册一个监听器,具体实现经过 Future 和 ChannelFuture,它们能够注册一个监听,当操做执行成功或失败时,监听会自动触发注册的监听事件。

经常使用方法:

方法 含义
Channel channel() 返回当前正在进行 I/O 操做的通道
ChannelFuture sync() 等待异步操做执行完毕

Channel

  1. Channel 是 Netty 网络通讯组件,可以用于执行网络 I/O 操做。
  2. 经过 Channel 可得到当前网络链接的通道状态、配置参数(好比缓冲区大小)。
  3. Channel 提供异步的网络 I/O 操做(创建链接,读写,绑定端口),异步调用意味着任何 I/O 调用都将当即返回,但不保证在调用结束时所请求的 I/O 操做已完成。
  4. 调用当即返回一个 ChannelFuture 实例,经过注册监听器,能够在 I/O 操做成功、失败或取消时回调通知调用方。
  5. 支持关联 I/O 操做与对应的处理程序。
  6. 不一样协议、不一样的阻塞类型的链接是不一样的,Channel 类型与之对应。

经常使用的 Channel 类型有:

方法 含义
NioSocketChannel 异步的客户端 TCP Socket 链接
NioServerSocketChannel 异步的服务端 TCP Socket 链接
NioDatagramChannel 异步的 UDP 链接
NioStcpChannel 异步的客户端 Sctp 链接
NioSctpServerChannel 异步的服务端 Sctp 链接

Selector

  • Netty 基于 Selector 对象实现 I/O 多路复用,经过 Selector 一个线程能够监听多个链接的 Channel 事件。
  • 当向一个 Selector 中注册 Channel 后,Selector 内部的机制就能够自动不断地查询(select)这些 Channel 中是否有就绪的 I/O 事件(可读、可写、完成网络链接等),这样程序就能够简单地使用一个线程高效地管理多个 Channel。

ChannelHandler

  • ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操做,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。
  • ChannelHandler 自己并无提供不少方法,由于这个接口有许多的方法须要实现,方便使用期间,能够继承他的子类。

相关子接口和实现类

  • ChannelInboundHandler : 用于处理 Channel 入站 I/O 事件。
  • ChannelOutBoundHandler:用于处理 Channel 出站 I/O 操做。

适配器:

  • ChannelInboundHandlerAdapter:用于处理出站 I/O 操做。
  • ChanneInboundHandlerAdapter:用于处理入站 I/O 操做。
  • ChannelDuplexHandler:用于处理入站和出站事件。
以客户端应用程序为例:若是事件运动方向是客户端服务器,咱们称之为“出站”,即客户端发送的数据会经过 pipeline 中的一系列 ChannelOutboundHandler,并被这些 Handler 处理,反之称为“入站”。

Pipeline、ChannelPipeline

ChannelPipeline 是一个重点:

  1. ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 Inbound 或者 outbound 的事件和操做。
  2. ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户能够彻底控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互。
  3. 在 Netty 中每一个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系以下:

ChannelPipeline

  1. 一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,而且每一个 ChannelHandlerContext 中又关联了一个 ChannelHandler。
  2. 入站事件和出站事件在一个双向链表中,入站事件会从链表 head 日后传递到最后一个入站的 Handeler,出站事件会从链表 tail 往前传递到最前一个出站的 Handler,两种类型的 Handler 互不干扰。

经常使用方法:

方法 含义
ChannelPipeline addFirst(ChannelHandler... handlers) 把一个业务处理类,放到链表中头结点的位置
ChannelPipeline addLast(ChannelHandler... handlers) 把一个业务处理类,放到链表中尾结点的位置

ChannelHandlerContext

  • 保存 Channel 相关的全部上下文信息,同时关联一个 ChannelHandler 对象。
  • 即 ChannelHandlerContext 中包含一个具体的事件处理器 ChannelHandler,同时 ChannelHandlerContext 中也绑定了对应的 pipeline 和 Channel 的信息,方便对 ChannelHandler 进行调用。

经常使用方法:

方法 含义
ChannelFuture close() 关闭通道
ChannelOutboundInvoker flush() 刷新
ChannelFuture writeAndFlush(Object msg) 将数据写入到 ChannelPipeline 中当前 ChannelHandler 的下一个 ChannelHandler 开始处理(出站)

ChannelOption

  1. Netty 在建立 Channel 实例后,通常须要经过 ChannelOption 参数来配置 channel 的相关属性。
  2. ChannelOption 参数以下:

    • ChannelOption.SO_BACKLOG:对应 TCP/IP 协议 listen 函数中的 backlog 参数,用来初始化服务器可链接队列大小。服务端处理客户端链接请求是顺序处理的,因此同一时间只能处理一个客户端链接,多个客户端来的时候,服务端将不能处理的客户端链接请求放在队列中等待处理,backlog 参数指定了队列的大小。
    • ChannelOption.SO_KEEPALIVE:一直保持链接活动状态。

EventLoopGroup、以及实现类 NioEventLoopGroup

  1. EventLoopGroup 本质上是一个接口(interface),继承了 EventExecutorGroup,经过继承关系分析,能够发现 EventLoopGroup 的实现子类是 MultithreadEventLoopGroup 下的 NioEventLoopGroup。
  2. EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,通常会有多个 EventLoop 同时工做,每一个 EventLoop 维护了一个 selector 实例。
  3. EventLoopGroup 提供next接口,能够从组里按照必定规则获取其中一个 EventLoop 来处理任务。在 Netty 服务器端编程中,咱们通常都须要提供两个 EventLoopGroup,例如 BossEventLoopGroup 和 WorkerEventLoopGroup。
  4. 一般一个服务端口(ServerSocketChannel)对应一个 Selector 和一个 EventLoop 线程。BossEventLoopGroup 负责接收客户端链接并将 SocketChannel 交给 WorkerEventLoopGroup 进行 I/O 处理。

  1. BossEventLoopGroup 一般是一个单线程的 EventLoop,EventLoop 维护了一个注册了 ServerSocketChannel 的 Selector 实例。BossEventLoopGroup 不断轮询 Selector 将链接事件分离出来。
  2. 一般是 OP_ACCEPT 事件,而后将接收的 SocketChannel 交给 WorkerEventLoopGroup。
  3. WorkerEventLoopGroup 会由 next 选择其中一个 EventLoop 将这个 SocketChannel 注册到其维护的 Selector 并对其后续的 I/O 事件进行处理。

Unpooled

  1. Unpolled 类是 Netty 提供的专门用于操做缓冲区(即 Netty 的数据容器)的工具类。
  2. 经过给定的数据和字符编码返回一个 ByteBuf 对象:经常使用方法:public static ByteBuf copierBuffer(CharSequence string, Charset charset)。
ByteBuf buffer = Unpooled.buffer(10);
ByteBuf buf =Unpooled.copiedBuffer("你好", CharsetUtil.UTF_8);

在 Netty 的 buffer 中,读取 buffer 中的数据不须要经过 flip()方法进行状态切换,其底层维护了 readerIndex 和 writerIndex

  • 0 ——> readerIndex :已读区域。
  • readerIndex ——> writerIndex:未读但可读区域。
  • writerIndex ——> capacity:可写区域。
  1. 每调用一次 byteBuf.readByte()读取数据,byteBuf 的 readerIndex 便减小 1;调用 byteBuf.getByte()则不会引发 readerIndex 的变化。
  2. public abstract CharSequence getCharSequence(int index, int length, Charset charset) :的做用是按照某一个范围进行数据的读取,index 表示起始位置,length 表示读取长度,charset 表示字符编码格式。

Netty 实现群聊系统

  1. 服务器端:检测用户上线、离线、转发客户端消息。
  2. 客户端:经过 channel 能够无阻塞发送消息给其余客户,同时能够接收其余客户端发送的消息(服务器转发获得)。

Server 端

public class Server {

    private static final int port = 6667;

    public static void main(String[] args) {
        run();
    }

    /**
     * 处理客户端请求
     */
    public static void run() {
        //建立两个线程组
        NioEventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        try {
            serverBootstrap.group(bossLoopGroup, workerLoopGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    //增长解码器
                                    .addLast("decoder", new StringDecoder())
                                    //增长编码器
                                    .addLast("encoder", new StringEncoder())
                                    //加入自定义业务处理器
                                    .addLast(new ServerHandler());

                        }
                    });
            ChannelFuture future = serverBootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerLoopGroup.shutdownGracefully();
            bossLoopGroup.shutdownGracefully();
        }

    }

}

ServerHandler

public class ServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * 定义一个channel 组,管理全部的channel , GlobalEventExecutor.INSTANCE是全局事件执行器,单例模式
     */
    private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");


    /**
     * 链接创建调用,将当前channel加入channelGroup
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //提示其余客户端当前客户端已上线
        channels.writeAndFlush("[客户端]" + channel.remoteAddress() + "加入聊天!\n");

        channels.add(channel);
    }

    /**
     * 表示channel处于活动状态,提示上线
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + ":已上线!");
    }

    /**
     * 非活动状态提示 离线
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + ":已离线!");
    }

    /**
     * 断开链接
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //提示其余客户端当前客户端已断开链接
        channels.writeAndFlush("[客户端]" + channel.remoteAddress() + "断开链接!\n");
    }

    /**
     * 读取客户端消息并转发
     * @param channelHandlerContext
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        Channel channel = channelHandlerContext.channel();
        channels.forEach(ch -> {
            if (channel != ch) {
                ch.writeAndFlush("[客户]: " + channel.remoteAddress() + sdf.format(new Date()) +" 说:" + msg + "\n");
            } else {
                ch.writeAndFlush(sdf.format(new Date())+" 你说:" + msg + "\n");
            }
        });
    }

    /**
     * 异常关闭
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

Client 端

public class Client {

    private static final String HOST = "127.0.0.1";
    private static final int PORT = 6667;


    public static void main(String[] args) {
        run();
    }

    public static void run() {
        NioEventLoopGroup clientLoopGroup = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(clientLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    //增长解码器
                                    .addLast("decoder", new StringDecoder())
                                    //增长编码器
                                    .addLast("encoder", new StringEncoder())
                                    .addLast(new ClientHandler());
                        }
                    });

            ChannelFuture future = bootstrap.connect(HOST, PORT).sync();
            Channel channel = future.channel();
            System.out.println("客户端:" + channel.localAddress() + " 准备就绪");
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String msg = scanner.nextLine();
                //经过channel发送到服务器端
                channel.writeAndFlush(msg + "\r\n");
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            clientLoopGroup.shutdownGracefully();
        }
    }
}

ClientHandler

public class ClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        System.out.println(msg.trim());
    }
}

运行结果

Netty 心跳监测机制案例

客户端同用上面的便可。记得端口对应

Server 端

public class Server {

    public static void main(String[] args) {
        //建立两个线程组
        NioEventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossLoopGroup, workerLoopGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))//在bossLoopGroup 增长日志处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // 加入 IdleStateHandler
                            // 第一个参数 多长时间没读 就发送心跳监测包看是否链接
                            // 第二个参数 多长时间没写 就发送心跳监测包看是否链接
                            // 第三个参数 多长时间没有读写 就发送心跳监测包看是否链接
                            // 第四个参数 时间单位
                            //当 触发后 会传递给管道中的下一个handler来处理,调用下一个handler的userEventTriggered
                            pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS));
                            //加入空闲检测处理的handler
                            pipeline.addLast(new ServerHandler());
                        }
                    });
            ChannelFuture future = serverBootstrap.bind(7000).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerLoopGroup.shutdownGracefully();
            bossLoopGroup.shutdownGracefully();
        }
    }

}

ServerHandler

public class ServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * @param ctx 上下文
     * @param evt 事件
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            //将evt转型
            IdleStateEvent event = (IdleStateEvent) evt;
            SocketAddress socketAddress = ctx.channel().remoteAddress();
            switch (event.state()){
                case READER_IDLE:
                    System.out.println(socketAddress + "发生读空闲");
                    break;
                case WRITER_IDLE:
                    System.out.println(socketAddress + "发生写空闲");
                    break;
                case ALL_IDLE:
                    System.out.println(socketAddress + "发生读写空闲");
                    break;
            }

        }
    }
}

运行结果

WebSocket

Http 短链接和长链接

  • Http 短链接即 TCP 短链接,即客户端和服务器经过“三次握手”创建链接后,进行一次 HTTP 操做之后,便断开链接。所以,浏览器每打开一个 web 资源,便建立了一个新的 http 会话。
  • Http 长链接即 TCP 长链接,即客户端和服务器创建链接后保持必定的时间,即便用户在进行某次操做后将浏览器(或客户端)关闭,但只要在保持时间内又一次访问该服务器,则默认使用已经建立好的链接。
  • Http1.0 默认支持短链接,Http1.1 默认支持长链接。

Http 链接无状态

  • Http 协议无状态是指协议对于事务处理没有记忆性,即某一次打开一个服务器的网页和上一次打开这个服务器的网页之间没有关系。

WebSocket 简介

  • WebSocket 是一种能够在单个 TCP 链接上实现全双工通讯的通讯协议,HTTP 协议只能实现客户端请求,服务端响应的单向通讯,而 webSocket 则能够实现服务端主动向客户端推送消息。
  • WebSocket 复用了 HTTP 的握手通道,客户端和服务器的数据交换则遵守升级后的协议进行:WebSocket 相关的业务处理器能够将 HTTP 协议升级为 ws 协议,其核心功能之一为保持稳定的长链接。

代码案例

  • 实现基于 webSocket 的长链接全双工交互。
  • 改变 HTTP 协议屡次请求的约束,实现长链接,服务器能够发送消息给浏览器。
  • 客户端和服务器会相互感知。若服务器关闭,客户端会感知;一样客户端关闭,服务器也会感知。

Server 端

public class Server {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            ChannelPipeline pipeline = ch.pipeline();

                            //由于基于http协议,故使用http的编解码器
                            pipeline.addLast(new HttpServerCodec());
                            //过程当中以块的方式写,添加 ChunkedWriteHandler 处理器
                            pipeline.addLast(new ChunkedWriteHandler());
                            /**
                             * 说明
                             * 一、http数据在传输过程当中是分段的,HttpObjectAggregator 能够将多个数据段整合起来
                             * 二、所以,当浏览器发送大量数据时,就会发出屡次http请求
                             * */
                            pipeline.addLast(new HttpObjectAggregator(8192));
                            /**
                             * 说明
                             * 一、对于 WebSocket,它的数据以 帧(Frame)的形式传递
                             * 二、能够看到 WebSocketFrame 下面有6个子类
                             * 三、浏览器请求时 ws://localhost:7000/xxx 表示请求的uri
                             * 四、WebSocketServerProtocolHandler 会把 http 协议升级为ws协议
                             *      即保持长链接----------核心功能
                             * 五、如何升级——经过状态玛切换101
                             */
                            pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));

                            //自定义的 handler 处理业务逻辑
                            pipeline.addLast(new TextWebSocketFrameHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

}

Handler

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {


    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
        System.out.println("服务器收到消息:" + textWebSocketFrame.text());
        //回复消息
        channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame("服务器时间:" + LocalDateTime.now() + " " + textWebSocketFrame.text()));
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerAdded 被调用:" + ctx.channel().id().asLongText());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerRemoved 被调用:" + ctx.channel().id().asLongText());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("异常发生"+cause.getMessage());
        ctx.close();
    }
}

HTML

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <title>Title</title>
  </head>
  <body>
    <script>
      var socket;
      //判断当前浏览器是否支持webSocket编程
      if (window.WebSocket) {
        //go on
        socket = new WebSocket("ws://localhost:7000/hello");
        //至关于channelRead0,收到服务器端回送的消息
        socket.onmessage = function (ev) {
          var rt = document.getElementById("responseText");
          rt.value = rt.value + "\n" + ev.data;
        };
        //至关于链接开启
        socket.onopen = function (ev) {
          var rt = document.getElementById("responseText");
          rt.value = "链接开启";
        };

        socket.onclose = function (ev) {
          var rt = document.getElementById("responseText");
          rt.value = rt.value + "\n" + "链接关闭";
        };
      } else {
        alert("当前浏览器不支持webSocket");
      }
      //发送消息到服务器
      function send(message) {
        if (!window.socket) {
          //先判断socket是否建立好了
          return;
        }
        if (socket.readyState == WebSocket.OPEN) {
          //经过socket发送消息
          socket.send(message);
        } else {
          alert("链接没有开启");
        }
      }
    </script>
    <form onsubmit="return false">
      <textarea name="message" style="height: 300px; width: 300px"></textarea>
      <input
        type="button"
        value="发送消息"
        onclick="send(this.form.message.value)"
      />
      <textarea
        id="responseText"
        style="height: 300px; width: 300px"
      ></textarea>
      <input
        type="button"
        value="清空内容"
        onclick="document.getElementById('responseText').value=''"
      />
    </form>
  </body>
</html>

运行结果

html

服务端

编码和解码

  1. 数据在网络中是以二进制字节码的形式流动,而咱们在接收或发送的数据形式则各类各样(文本、图片、音视频等),所以须要在发送端对数据进行编码,在接收端对收到的数据解码;
  2. codec(编解码器)的组成部分——Encoder(编码器)负责将业务数据转换为二进制字节码;Decoder(解码器)负责将二进制字节码转换为业务数据。
  3. Netty 编码机制——StringEncoder / StringDecoder负责字符串数据对象的编解码;ObjectEncoder / ObjectDecoder负责 java 对象的编解码。
  4. Netty 自带的 ObjectEncoder 和 ObjectDecoder 能够用于实现 POJO 对象或其余业务对象的编解码,其底层使用的还是 java 的序列化技术,存在如下问题:

    • 没法实现客户端与服务器端的跨语言。
    • 序列化体积过大,是二进制字节码的 5 倍多。
    • 序列化性能相对较低。

ProtoBuf 概述

  • ProtoBuf 是 Google 发布的开源项目,全称 Google Protocol Buffers,ProtoBuf 是一种平台无关、语言无关的、可扩展且轻便高效的序列化数据结构的协议,适合用于数据存储和 RPC(远程过程调用)数据交换格式。
  • ProtoBuf 是以Message的方式来管理数据的。
  • 所谓“平台无关、语言无关”,即客户端和服务器可使用不一样的编程语言进行开发。
  • ProtoBuf 具备更高的性能和可靠性。
  • 使用 ProtoBuf 编译器能够自动生成代码,ProtoBuf 是把类的定义使用.proto文件描述出来,在经过 proto.exe 将.proto 文件编译为.java 文件。

protoBuf

ProtoBuf 使用

第一步:idea 加入插件 protoc

第二步:加入 maven 依赖

<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <version>3.6.1</version>
</dependency>

第三步:编写 proto 文件

syntax = "proto2";  //版本
option java_outer_classname = "StudentPOJO"; //生成的外部类名称,同时文件名
//protobuf以message的形式管理数据
message Student{ //会在 studentPOJO 外部类生成一个内部类 Student,它是真正发送的POJO对象
  required int32 id = 1; //表示 Student 类中有一个属性 名字为id,类型为 int32(protoType),1表示属性的序号
  required string name = 2;
}

根据网上教程安装 protobuf。生成 StudnetPOJO 文件,这里就不展现代码了,比较长。

Server 端

public class Server {

    public static void main(String[] args) throws InterruptedException {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            ChannelPipeline pipeline = ch.pipeline();
                            //在pipeline中加入ProtoBufferDecoder
                            //指定对哪种对象进行解码
                            pipeline.addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
                            pipeline.addLast(new ServerHandler());
                        }
                    });
            ChannelFuture cf = serverBootstrap.bind(6668).sync();
            //给 cf 添加监听器,监听感兴趣的事件
            cf.addListener((ChannelFutureListener) future -> {
                if (cf.isSuccess()) {
                    System.out.println("绑定端口 6668 成功");
                } else {
                    System.out.println(cf.cause());
                }
            });
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}

ServerHandler

public class ServerHandler extends SimpleChannelInboundHandler<StudentPOJO.Student> {

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, StudentPOJO.Student msg) throws Exception {
      System.out.println("客户端发送: id = " + msg.getId() + " 名字 = " + msg.getName());
    }

}

Client 端

public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        //发送一个 student 对象到服务器
        StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(1000).setName("Jack").build();
        ctx.writeAndFlush(student);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("服务器回送消息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务器端地址:" + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

ClientHandler

public class Client {

    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //在pipeline中加入ProtoBufferEncoder
                            ChannelPipeline pipeline = ch.pipeline();
                            //编码
                            pipeline.addLast("encoder", new ProtobufEncoder());
                            pipeline.addLast(new ClientHandler());

                        }
                    });
            System.out.println("客户端已准备就绪");
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventExecutors.shutdownGracefully();
        }
    }

}

运行结果

服务端

客户端

handler 调用机制

  • ChannelHandler 充当了处理入站和出站数据的应用程序逻辑的容器。例如:实现 ChannelInboundHandler 接口(或 ChannelInboundHandlerAdapter),能够接收入站事件和数据,这些数据将被业务逻辑处理;当给客户端回送响应时,也能够经过 ChannelInboundHandler 冲刷数据。业务逻辑一般写在一个或多个 ChannelInboundHandler 中。
  • ChannelOutboundHandler 与之相似,只不过是用来处理出站数据的。
  • ChannelPipeline 提供了 ChannelHandler 链的容器(pipeline.addLast()能够将一系列的 handler 以链表的形式添加),以客户端应用程序为例,若是事件运动方向为客户端->服务器,称之为“出站”,即客户端发送给服务器的数据经过 pipeline 中的一系列 ChannelOutboundHandler,并被这些 handler 处理。反之则称为“入站”。

编码解码器

  1. 当 Netty 发送或者接受一个消息的时候,就将会发生一次数据转换。入站消息会被解码:从字节码转换到另外一种格式(好比 Java)。若是是出站消息,它会被编码成字节。
  2. Netty 提供一系列使用的编解码器,它们都实现了 CHannelInboundHandler 或者 ChannelOutboundHandler 接口。在这些类中,channelRead 方法已经被重写。

    以入站为例,对于每一个从入站 Channel 读取的消息,这个方法会被调用。随后,他将调用由解码器所提供的 decode()方法进行解码,并将已经解码的字节转发给 ChannelPipeline 中的下一个 ChannelInboundHandler。

消息入站后,会通过 ChannelPipeline 中的一系列 ChannelHandler 处理,这些 handler 中有 Netty 已经实现的,也有咱们从新实现的自定义 handler,但它们都须要实现 ChannelInboundHandler 接口;即消息入站后所通过的 handler 链是由一系列 ChannelInboundHandler 组成的,其中第一个通过的 handler 就是解码器 Decoder;消息出站与入站相似,但消息出站须要通过一系列 ChannelOutboundHandler 的实现类,最后一个通过的 handler 是编码器 Encoder。

解码器 — ByteToMessageDecoder

关系继承图

因为不知道远程节点是否会发送一个完整的信息,TCP 可能出现粘包和拆包的问题。ByteToMessageDecoder 的做用就是对入站的数据进行缓冲,直至数据准备好被处理。

ByteToMessageDecoder 示例分析:

public class ToIntgerDecoder extends ByteToMessageDecoder{
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception{
                if (in.readableBytes() >= 4) {
                         out.add(in.readint());
                }
        }
}

在此实例中,假设经过 Socket 发送了 8 字节数据,每次入站从 ByteBuf 中读取个 4 字节,将其解码为一个 int,并加入一个 List 中。当没有更多的元素能够被添加到该 List 中时,表明这次发送的数据已发送完成,List 中的全部内容会被发送给下一个 ChannelInboundHandler。Int 在被添加到 List 中时,会被自动装箱为 Intger,调用 readInt()方法前必须验证所输入的 ByteBuf 是否有足够的数据。

代码示例:

  • 使用自定义的编码解码器
  • 客户端能够发送一个 Long 类型的数据给服务器。

Server 端

public class Server {

    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();

        try {
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ServerInitializer()); //自定义初始化类
            ChannelFuture future = serverBootstrap.bind(7000).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

ServerInitializer 自定义初始化类

public class ServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        //入站的handler解码
        pipeline.addLast(new ByteToLongDecoder()).addLast(new ServerInboundHandler());
    }
}

ByteToLongDecoder 自定义解码器

public class ByteToLongDecoder extends ByteToMessageDecoder {

    /**
     * @param channelHandlerContext 上下文对象
     * @param byteBuf               入站的ByteBuf
     * @param list                  List集合,将解码后的数据传给下一个Handler
     * @throws Exception
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // Long 大于 8个字节
        if (byteBuf.readableBytes() >= 8) {
            list.add(byteBuf.readLong());
        }
    }
}

ServerInboundHandler 自定义 handler,处理业务

public class ServerInboundHandler extends SimpleChannelInboundHandler<Long> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Long aLong) throws Exception {

        System.out.println("从客户端读取:" + aLong);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Client 端

public class Client {

    public static void main(String[] args) {
        NioEventLoopGroup clientLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(clientLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ClientInitializer());//自定义初始化类
            ChannelFuture future = bootstrap.connect("127.0.0.1", 7000).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            clientLoopGroup.shutdownGracefully();
        }
    }
}

ClientInitializer 客户端自定义初始化类

public class ClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        //出站,数据进行编码
        pipeline.addLast(new LongToByteEncoder()).addLast(new ClientHandler());
    }
}

LongToByteEncoder 编码器

public class LongToByteEncoder extends MessageToByteEncoder<Long> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Long aLong, ByteBuf byteBuf) throws Exception {
        System.out.println("开始编码,msg = " + aLong);
        byteBuf.writeLong(aLong);
    }
}

ClientHandler 自定义 handler,处理逻辑

public class ClientHandler extends SimpleChannelInboundHandler<Long> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
        System.out.println("服务器的ip : " + ctx.channel().remoteAddress());
        System.out.println("收到服务器数据 : " + msg);
    }

    /**
     * 发送数据
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client发送数据");
        ctx.writeAndFlush(12345678L);
    }

}

运行结果

客户端

服务端

其余解码器

  1. LineBasedFrameDecoder:它使用行尾控制字符(\n 或\r\n)做为分割符来解析数据;
  2. DelimiterBasedFrameDecoder:使用自定义的特殊字符做为分隔符;
  3. HttpObjectDecoder:一个 HTTP 数据的解码器;
  4. LengthFieldBasedFrameDecoder:经过指定长度来标识整包信息,这样就能够自动的处理粘包和半包信息

TCP 粘包和拆包基本介绍

  • TCP 是面向链接,面向流,提供高可靠性服务。在消息收发过程当中,须要在发送端和接收端创建对应的 Socket,发送端不会一有数据就进行发送,而是将屡次间隔较小的,数据量较小的数据合并成必定长度的数据包总体发送。这样能够提升效率,但会给接收方分辨单个数据消息增长难度,由于面向流的通讯是没有消息保护边界的。
  • TCP 粘包与拆包,是指发送端在发送多个数据消息时出现的不一样情形。因为数据在发送前须要先转换为二进制字节码,当多个数据消息的字节码被合并成一个数据包发送时,称为粘包;当某个数据消息的字节码被划分到几个数据包内发送时,称为拆包粘包拆包可能使接收端解码数据包时出现错误。
  • TCP 粘包和拆包的解决方案:使用自定义协议+编解码器解决,只要接收端可以知道每次读取数据的长度,就能够按位读取,避免出现读取错误。咱们须要作的就是使接收端知道每次读取数据的长度。

TCP粘包、拆包图解

TCP 粘包拆包代码演示

Server 端

public class Server {

    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();

        try {
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ServerInitializer()); //自定义初始化类
            ChannelFuture future = serverBootstrap.bind(7000).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

ServerInitializer

public class ServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new ServerHandler());
    }
}

ServerHandler

public class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private int count;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
        byte[] buffer = new byte[buf.readableBytes()];
        buf.readBytes(buffer);

        //将buffer转换成字符串
        String str = new String(buffer, CharsetUtil.UTF_8);
        System.out.println("服务端接收到数据:" + str);
        System.out.println("服务端接收次数:" + ++count);

        ByteBuf byteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString(), CharsetUtil.UTF_8);
        ctx.writeAndFlush(byteBuf);

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Client 端

public class Client {

    public static void main(String[] args) {
        NioEventLoopGroup clientLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(clientLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ClientInitializer());//自定义初始化类
            ChannelFuture future = bootstrap.connect("127.0.0.1", 7000).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            clientLoopGroup.shutdownGracefully();
        }
    }
}

ClientInitializer

public class ClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new ClientHandler());
    }
}

ClientHandler

public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private int count;

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);
        String str = new String(bytes, CharsetUtil.UTF_8);
        System.out.println("客户端接收到数据: " + str);
        System.out.println("客户端接收次数:" + ++count);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //发送十条数据
        for (int i = 0; i < 10; i++) {
            ByteBuf byteBuf = Unpooled.copiedBuffer("hello,server" + i, CharsetUtil.UTF_8);
            ctx.writeAndFlush(byteBuf);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

运行结果

能够看到在第一个客户端没有发生问题,启动第二个客户端后就发生了拆包问题。

服务端

自定义协议解决粘包拆包

  1. 要求客户端发送 5 个 message 对象,客户端每次发送一个 message 对象。
  2. 服务器端每次接收一个 message,分 5 次进行解码,每读取一个 message,会回送一个 message 对象给客户端。

使用自定义协议+编解码器实现具体功能:

具体代码

客户端与服务器主程序与以前相同

MessageProtocol 自定义协议

public class MessageProtocol {

    private int length;  //关键
    private byte[] context;

    public int getLength() {
        return length;
    }

    public byte[] getContext() {
        return context;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public void setContext(byte[] context) {
        this.context = context;
    }

MessageEncoder 自定义编码器

public class MessageEncoder extends MessageToByteEncoder<MessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MessageEncoder encode方法被调用");
        out.writeInt(msg.getLength());
        out.writeBytes(msg.getContext());

    }
}

MessageDecoder.自定义解码器

public class MessageDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MessageDecoder decode方法被调用");
        //将获得的二进制字节码转换为 MessageProtocol 数据包
        int length = in.readInt();
        byte[] content = new byte[length];

        in.readBytes(content);

        //封装成MessageProtocol对象,放入out中交给下一个handler处理
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLength(length);
        messageProtocol.setContext(content);

        out.add(messageProtocol);
    }
}

在 ServerInitializer 和 ClientInitializer 中增长 addList()编解码器

ServerHandler

public class ServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    private int count;


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {

        //接收数据并处理
        int len = msg.getLength();
        byte[] context = msg.getContext();

        System.out.println("服务端接收到信息以下");
        System.out.println("数据长度:"+len);
        System.out.println("内容:"+new String(context, CharsetUtil.UTF_8));

        System.out.println("服务器接收到协议包数量 = "+(++this.count));

        //回复消息
        String response = UUID.randomUUID().toString();
        int responseLen = response.getBytes("utf-8").length;
        byte[] responseBytes = response.getBytes("utf-8");
        //构建一个协议包
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLength(responseLen);
        messageProtocol.setContext(responseBytes);

        ctx.writeAndFlush(messageProtocol);
    }
}

ClientHandler

public class ClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    private int count;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //使用客户端循环发送10条数据

        for (int i=0;i<5;i++){
            String mes = "今天下雨,出门带伞";
            byte[] content = mes.getBytes(Charset.forName("utf-8"));

            int length = mes.getBytes(Charset.forName("utf-8")).length;

            //建立协议包
            MessageProtocol messageProtocol = new MessageProtocol();
            messageProtocol.setLength(length);
            messageProtocol.setContext(content);

            ctx.writeAndFlush(messageProtocol);
        }
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("异常消息 = "+cause.getMessage());
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
        int len = msg.getLength();
        byte[] msgContext = msg.getContext();

        System.out.println("客户端接收的消息以下:");
        System.out.println("消息长度 = "+len);
        System.out.println("消息内容 = "+new String(msgContext, CharsetUtil.UTF_8));

        System.out.println("客户端接收消息的数量 = "+(++this.count));
    }
}
相关文章
相关标签/搜索