Netty 做为业界最流行的 nio 框架之一,它的健壮性、功能、性能、可定制性、可扩展性都是数一数二的。html
优势:java
目前存在的线程模型有:react
根据 Reactor 的数量和处理资源线程池的数量不一样,有三种不一样实现:web
主从 Reactor 多线程模型
作了必定的改进,其中主从 Reactor 多线程模型有多个 Reactor。模型特色编程
问题分析bootstrap
解决方案浏览器
基于I/O复用模型
:多个链接共用一个阻塞对象,应用程序只须要在一个阻塞对象等待,无需阻塞全部链接,当某个链接有新的数据能够处理时,操做系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。基于线程池复用线程资源
:没必要为每个链接建立线程,将链接完成后的业务处理任务分配给线程进行处理,一个线程能够处理多个链接的业务。核心组成缓存
Reactor
:在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序对 I/O 事件做出反应。Handlers
:处理程序执行 I/O 事件要完成的实际事件。Reactor 经过调用适当的处理程序来响应 I/O 事件,处理程序非阻塞操做。优缺点:服务器
优势
:模型简单,无多线程、进程通讯、竞争的问题,所有由一个线程完成。缺点
:性能问题,只有一个线程没法发挥出多核 CPU 的性能,Handler 在处理某链接业务时,整个进程没法处理其余链接事件,容易致使性能瓶颈。缺点
:可靠性问题,线程意外停止,或者进入死循环,会致使整个系统通讯模块不可用,不能接收和处理外部信息,节点故障。使用场景
:客户端数量有限,业务处理快捷(例如 Redis 在业务处理的时间复杂度为 O(1)的状况)。优缺点:网络
优势
:能够充分的利用多核 CPU 的处理能力。缺点
:多线程数据共享、访问操做比较复杂,Reactor 处理全部的事件的监听和响应,由于 Reactor 在单线程中运行,在高并发场景容易出现性能瓶颈。优缺点:
优势
:父线程和子线程的职责明确,父线程只须要接收新链接,子线程完成后续业务处理。优势
:父线程与子线程的数据交互简单,Reactor 主线程是须要把新链接传给子线程,子线程无需返回数据。缺点
:编程复杂度较高。单 Reactor 单线程:前台接待员和服务员是同一我的,全程为顾客服务。
单 Reactor 多线程:一个前台接待员,多个服务员。
主从 Reactor 多线程:多个前台接待员,多个服务员。
服务端端包含 1 个 Boss NioEventLoopGroup 和 1 个 Worker NioEventLoopGroup。
NioEventLoopGroup 至关于 1 个事件循环组,这个组里包含多个事件循环 NioEventLoop,每一个 NioEventLoop 包含 1 个 Selector 和 1 个事件循环线程。
每一个 Boss NioEventLoop 循环执行的任务包含 3 步:
每一个 Worker NioEventLoop 循环执行的任务包含 3 步:
/** * @author jack */ public class SimpleServer { public static void main(String[] args) { //建立bossGroup , 只负责链接请求 NioEventLoopGroup bossGroup = new NioEventLoopGroup(); //建立workerGroup , 负责客户端业务处理 NioEventLoopGroup workerGroup = new NioEventLoopGroup(); //建立服务端启动对象,配置参数. ServerBootstrap serverBootstrap = new ServerBootstrap(); try { serverBootstrap.group(bossGroup, workerGroup)//设置线程组 .channel(NioServerSocketChannel.class)//使用NioSocketChannel做为服务端的通道实现 .option(ChannelOption.SO_BACKLOG, 128)//设置线程队列获得链接个数 .childOption(ChannelOption.SO_KEEPALIVE, true)//设置保持活动链接状态 .childHandler(new ChannelInitializer<SocketChannel>() {//建立一个通道测试对象 //给pipeline设置处理器 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyServerHandler()); //自定义handler } });//workerGroup的EventLoop对应的管道设置处理器 System.out.println("服务端准备就绪..."); //绑定一个端口而且同步,生成了一个channelFuture对象 ChannelFuture cf = serverBootstrap.bind(6667).sync(); //对关闭通道进行监听 cf.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
/** * 服务端自定义handler */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** * 读取实际数据(这里咱们能够读取客户端发送的消息) * * @param ctx 上下文对象,含有管道pipeline,通道channel ,地址 * @param msg 客户端发送的内容 * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("客户端发送: " + buf.toString(CharsetUtil.UTF_8)); System.out.println("客户端地址为:" + ctx.channel().remoteAddress()); } /** * 读取完成后 * * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("你好,客户端", CharsetUtil.UTF_8)); } /** * 处理异常,通常是关闭通道 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
/** * @author jack */ public class SimpleClient { public static void main(String[] args) { //客户端须要一个事件循环组 NioEventLoopGroup clientLoopGroup = new NioEventLoopGroup(); //建立客户端启动对象 Bootstrap bootstrap = new Bootstrap(); try { bootstrap.group(clientLoopGroup)//设置线程组 .channel(NioSocketChannel.class)//设置客户端通道实现类 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyClientHandler());//加入自定义处理器 } }); System.out.println("客户端已准备就绪"); //链接服务器 ChannelFuture cf = bootstrap.connect("127.0.0.1", 6667).sync(); cf.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { clientLoopGroup.shutdownGracefully(); } } }
/** * 客户端自定义handler */ public class NettyClientHandler extends ChannelInboundHandlerAdapter { /** * 通道准备就绪时调用 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("你好,服务端!", CharsetUtil.UTF_8)); } /** * 获取客户端回复 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("服务端回复: " + buf.toString(CharsetUtil.UTF_8)); } /** * 处理异常,通常是关闭通道 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
任务队列中的 task 有 3 种使用场景
用户自定义的普通任务
ctx.channel().eventLoop().execute(() -> System.out.println("任务逻辑"));
用户自定义的定时任务
ctx.channel().eventLoop().schedule(() -> System.out.println("任务逻辑..."), 60, TimeUnit.SECONDS);
例如在推送系统的业务线程里面,根据用户的标识,找到对应的 channel 引用,而后调用 write 类方法向该用户推送消息,就会进入到这种场景。最终的 write 会提交到任务队列中后被异步消费。
一个 Netty 应用一般由一个 BootStrap 开始,主要做用是配置整个 Netty 程序,串联各个组件,Netty 中的 BootStrap 类是客户端程序的启动引导类,ServerBootStrap 是服务端启动引导类。
经常使用方法:
方法 | 含义 |
---|---|
public ServerBootstrap group(EventLoopGroup parentGroup , EventLoopGroup childGroup) | 做用于服务器端,用来设置两个 EventLoop |
public B group(EventLoopGroup group) | 做用于客户端,用来设置一个 EventLoopGroup |
public B channel(Class<? extends C> channelClass) | 用来设置一个服务端的通道实现 |
public <T> B option(ChannelOption<T> option, T value) | 用来给 ServerChannel 添加配置 |
public <T> ServerBootStrap childOption (ChannelOption<T> childOption, T value) | 用来给接收到的通道添加配置 |
public ServerBootstrap childHandler (ChannelHandler childHandler) | 用来设置业务处理类(自定义 handler) |
public B handler(ChannelHandler handler) | Handler 则在服务器端自己 bossGroup 中使用 |
public ChannelFuture bind(int inetPort) | 用于服务端,设置占用的端口号 |
public ChannelFuture connect (String inetHost,int inetPort) | 该方法用于客户端,用来链接服务器 |
Netty 中全部操做都是异步的,不能当即得知消息是否被正确处理,但能够过一会等它执行完成或直接注册一个监听器,具体实现经过 Future 和 ChannelFuture,它们能够注册一个监听,当操做执行成功或失败时,监听会自动触发注册的监听事件。
经常使用方法:
方法 | 含义 |
---|---|
Channel channel() | 返回当前正在进行 I/O 操做的通道 |
ChannelFuture sync() | 等待异步操做执行完毕 |
经常使用的 Channel 类型有:
方法 | 含义 |
---|---|
NioSocketChannel | 异步的客户端 TCP Socket 链接 |
NioServerSocketChannel | 异步的服务端 TCP Socket 链接 |
NioDatagramChannel | 异步的 UDP 链接 |
NioStcpChannel | 异步的客户端 Sctp 链接 |
NioSctpServerChannel | 异步的服务端 Sctp 链接 |
ChannelInboundHandler
: 用于处理 Channel 入站 I/O 事件。ChannelOutBoundHandler
:用于处理 Channel 出站 I/O 操做。适配器:
ChannelInboundHandlerAdapter
:用于处理出站 I/O 操做。ChanneInboundHandlerAdapter
:用于处理入站 I/O 操做。ChannelDuplexHandler
:用于处理入站和出站事件。以客户端应用程序为例:若是事件运动方向是客户端服务器,咱们称之为“出站”,即客户端发送的数据会经过 pipeline 中的一系列 ChannelOutboundHandler,并被这些 Handler 处理,反之称为“入站”。
ChannelPipeline 是一个重点:
经常使用方法:
方法 | 含义 |
---|---|
ChannelPipeline addFirst(ChannelHandler... handlers) | 把一个业务处理类,放到链表中头结点的位置 |
ChannelPipeline addLast(ChannelHandler... handlers) | 把一个业务处理类,放到链表中尾结点的位置 |
经常使用方法:
方法 | 含义 |
---|---|
ChannelFuture close() | 关闭通道 |
ChannelOutboundInvoker flush() | 刷新 |
ChannelFuture writeAndFlush(Object msg) | 将数据写入到 ChannelPipeline 中当前 ChannelHandler 的下一个 ChannelHandler 开始处理(出站) |
ChannelOption 参数以下:
ChannelOption.SO_BACKLOG
:对应 TCP/IP 协议 listen 函数中的 backlog 参数,用来初始化服务器可链接队列大小。服务端处理客户端链接请求是顺序处理的,因此同一时间只能处理一个客户端链接,多个客户端来的时候,服务端将不能处理的客户端链接请求放在队列中等待处理,backlog 参数指定了队列的大小。ChannelOption.SO_KEEPALIVE
:一直保持链接活动状态。ByteBuf buffer = Unpooled.buffer(10); ByteBuf buf =Unpooled.copiedBuffer("你好", CharsetUtil.UTF_8);
在 Netty 的 buffer 中,读取 buffer 中的数据不须要经过 flip()方法进行状态切换,其底层维护了 readerIndex 和 writerIndex
0 ——> readerIndex
:已读区域。readerIndex ——> writerIndex
:未读但可读区域。writerIndex ——> capacity
:可写区域。public abstract CharSequence getCharSequence(int index, int length, Charset charset)
:的做用是按照某一个范围进行数据的读取,index 表示起始位置,length 表示读取长度,charset 表示字符编码格式。Server 端
public class Server { private static final int port = 6667; public static void main(String[] args) { run(); } /** * 处理客户端请求 */ public static void run() { //建立两个线程组 NioEventLoopGroup bossLoopGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerLoopGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); try { serverBootstrap.group(bossLoopGroup, workerLoopGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() //增长解码器 .addLast("decoder", new StringDecoder()) //增长编码器 .addLast("encoder", new StringEncoder()) //加入自定义业务处理器 .addLast(new ServerHandler()); } }); ChannelFuture future = serverBootstrap.bind(port).sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workerLoopGroup.shutdownGracefully(); bossLoopGroup.shutdownGracefully(); } } }
ServerHandler
public class ServerHandler extends SimpleChannelInboundHandler<String> { /** * 定义一个channel 组,管理全部的channel , GlobalEventExecutor.INSTANCE是全局事件执行器,单例模式 */ private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); /** * 链接创建调用,将当前channel加入channelGroup * * @param ctx * @throws Exception */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //提示其余客户端当前客户端已上线 channels.writeAndFlush("[客户端]" + channel.remoteAddress() + "加入聊天!\n"); channels.add(channel); } /** * 表示channel处于活动状态,提示上线 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress() + ":已上线!"); } /** * 非活动状态提示 离线 * * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress() + ":已离线!"); } /** * 断开链接 * * @param ctx * @throws Exception */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //提示其余客户端当前客户端已断开链接 channels.writeAndFlush("[客户端]" + channel.remoteAddress() + "断开链接!\n"); } /** * 读取客户端消息并转发 * @param channelHandlerContext * @param msg * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception { Channel channel = channelHandlerContext.channel(); channels.forEach(ch -> { if (channel != ch) { ch.writeAndFlush("[客户]: " + channel.remoteAddress() + sdf.format(new Date()) +" 说:" + msg + "\n"); } else { ch.writeAndFlush(sdf.format(new Date())+" 你说:" + msg + "\n"); } }); } /** * 异常关闭 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
Client 端
public class Client { private static final String HOST = "127.0.0.1"; private static final int PORT = 6667; public static void main(String[] args) { run(); } public static void run() { NioEventLoopGroup clientLoopGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); try { bootstrap.group(clientLoopGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() //增长解码器 .addLast("decoder", new StringDecoder()) //增长编码器 .addLast("encoder", new StringEncoder()) .addLast(new ClientHandler()); } }); ChannelFuture future = bootstrap.connect(HOST, PORT).sync(); Channel channel = future.channel(); System.out.println("客户端:" + channel.localAddress() + " 准备就绪"); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String msg = scanner.nextLine(); //经过channel发送到服务器端 channel.writeAndFlush(msg + "\r\n"); } } catch (InterruptedException e) { e.printStackTrace(); } finally { clientLoopGroup.shutdownGracefully(); } } }
ClientHandler
public class ClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception { System.out.println(msg.trim()); } }
客户端同用上面的便可。记得端口对应
Server 端
public class Server { public static void main(String[] args) { //建立两个线程组 NioEventLoopGroup bossLoopGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerLoopGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossLoopGroup, workerLoopGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO))//在bossLoopGroup 增长日志处理器 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 加入 IdleStateHandler // 第一个参数 多长时间没读 就发送心跳监测包看是否链接 // 第二个参数 多长时间没写 就发送心跳监测包看是否链接 // 第三个参数 多长时间没有读写 就发送心跳监测包看是否链接 // 第四个参数 时间单位 //当 触发后 会传递给管道中的下一个handler来处理,调用下一个handler的userEventTriggered pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS)); //加入空闲检测处理的handler pipeline.addLast(new ServerHandler()); } }); ChannelFuture future = serverBootstrap.bind(7000).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { workerLoopGroup.shutdownGracefully(); bossLoopGroup.shutdownGracefully(); } } }
ServerHandler
public class ServerHandler extends ChannelInboundHandlerAdapter { /** * @param ctx 上下文 * @param evt 事件 * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent){ //将evt转型 IdleStateEvent event = (IdleStateEvent) evt; SocketAddress socketAddress = ctx.channel().remoteAddress(); switch (event.state()){ case READER_IDLE: System.out.println(socketAddress + "发生读空闲"); break; case WRITER_IDLE: System.out.println(socketAddress + "发生写空闲"); break; case ALL_IDLE: System.out.println(socketAddress + "发生读写空闲"); break; } } } }
Http 短链接和长链接
Http 链接无状态
WebSocket 简介
代码案例
Server 端
public class Server { public static void main(String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //由于基于http协议,故使用http的编解码器 pipeline.addLast(new HttpServerCodec()); //过程当中以块的方式写,添加 ChunkedWriteHandler 处理器 pipeline.addLast(new ChunkedWriteHandler()); /** * 说明 * 一、http数据在传输过程当中是分段的,HttpObjectAggregator 能够将多个数据段整合起来 * 二、所以,当浏览器发送大量数据时,就会发出屡次http请求 * */ pipeline.addLast(new HttpObjectAggregator(8192)); /** * 说明 * 一、对于 WebSocket,它的数据以 帧(Frame)的形式传递 * 二、能够看到 WebSocketFrame 下面有6个子类 * 三、浏览器请求时 ws://localhost:7000/xxx 表示请求的uri * 四、WebSocketServerProtocolHandler 会把 http 协议升级为ws协议 * 即保持长链接----------核心功能 * 五、如何升级——经过状态玛切换101 */ pipeline.addLast(new WebSocketServerProtocolHandler("/hello")); //自定义的 handler 处理业务逻辑 pipeline.addLast(new TextWebSocketFrameHandler()); } }); ChannelFuture channelFuture = serverBootstrap.bind(7000).sync(); channelFuture.channel().closeFuture().sync(); }finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
Handler
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception { System.out.println("服务器收到消息:" + textWebSocketFrame.text()); //回复消息 channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame("服务器时间:" + LocalDateTime.now() + " " + textWebSocketFrame.text())); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerAdded 被调用:" + ctx.channel().id().asLongText()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerRemoved 被调用:" + ctx.channel().id().asLongText()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("异常发生"+cause.getMessage()); ctx.close(); } }
HTML
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8" /> <title>Title</title> </head> <body> <script> var socket; //判断当前浏览器是否支持webSocket编程 if (window.WebSocket) { //go on socket = new WebSocket("ws://localhost:7000/hello"); //至关于channelRead0,收到服务器端回送的消息 socket.onmessage = function (ev) { var rt = document.getElementById("responseText"); rt.value = rt.value + "\n" + ev.data; }; //至关于链接开启 socket.onopen = function (ev) { var rt = document.getElementById("responseText"); rt.value = "链接开启"; }; socket.onclose = function (ev) { var rt = document.getElementById("responseText"); rt.value = rt.value + "\n" + "链接关闭"; }; } else { alert("当前浏览器不支持webSocket"); } //发送消息到服务器 function send(message) { if (!window.socket) { //先判断socket是否建立好了 return; } if (socket.readyState == WebSocket.OPEN) { //经过socket发送消息 socket.send(message); } else { alert("链接没有开启"); } } </script> <form onsubmit="return false"> <textarea name="message" style="height: 300px; width: 300px"></textarea> <input type="button" value="发送消息" onclick="send(this.form.message.value)" /> <textarea id="responseText" style="height: 300px; width: 300px" ></textarea> <input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''" /> </form> </body> </html>
Netty 自带的 ObjectEncoder 和 ObjectDecoder 能够用于实现 POJO 对象或其余业务对象的编解码,其底层使用的还是 java 的序列化技术,存在如下问题:
第一步:idea 加入插件 protoc
第二步:加入 maven 依赖
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.6.1</version> </dependency>
第三步:编写 proto 文件
syntax = "proto2"; //版本 option java_outer_classname = "StudentPOJO"; //生成的外部类名称,同时文件名 //protobuf以message的形式管理数据 message Student{ //会在 studentPOJO 外部类生成一个内部类 Student,它是真正发送的POJO对象 required int32 id = 1; //表示 Student 类中有一个属性 名字为id,类型为 int32(protoType),1表示属性的序号 required string name = 2; }
根据网上教程安装 protobuf。生成 StudnetPOJO 文件,这里就不展现代码了,比较长。
Server 端
public class Server { public static void main(String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //在pipeline中加入ProtoBufferDecoder //指定对哪种对象进行解码 pipeline.addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance())); pipeline.addLast(new ServerHandler()); } }); ChannelFuture cf = serverBootstrap.bind(6668).sync(); //给 cf 添加监听器,监听感兴趣的事件 cf.addListener((ChannelFutureListener) future -> { if (cf.isSuccess()) { System.out.println("绑定端口 6668 成功"); } else { System.out.println(cf.cause()); } }); cf.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
ServerHandler
public class ServerHandler extends SimpleChannelInboundHandler<StudentPOJO.Student> { @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~", CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, StudentPOJO.Student msg) throws Exception { System.out.println("客户端发送: id = " + msg.getId() + " 名字 = " + msg.getName()); } }
Client 端
public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //发送一个 student 对象到服务器 StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(1000).setName("Jack").build(); ctx.writeAndFlush(student); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("服务器回送消息:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("服务器端地址:" + ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
ClientHandler
public class Client { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup eventExecutors = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventExecutors) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //在pipeline中加入ProtoBufferEncoder ChannelPipeline pipeline = ch.pipeline(); //编码 pipeline.addLast("encoder", new ProtobufEncoder()); pipeline.addLast(new ClientHandler()); } }); System.out.println("客户端已准备就绪"); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync(); channelFuture.channel().closeFuture().sync(); } finally { eventExecutors.shutdownGracefully(); } } }
以入站为例,对于每一个从入站 Channel 读取的消息,这个方法会被调用。随后,他将调用由解码器所提供的 decode()方法进行解码,并将已经解码的字节转发给 ChannelPipeline 中的下一个 ChannelInboundHandler。
消息入站后,会通过 ChannelPipeline 中的一系列 ChannelHandler 处理,这些 handler 中有 Netty 已经实现的,也有咱们从新实现的自定义 handler,但它们都须要实现 ChannelInboundHandler 接口;即消息入站后所通过的 handler 链是由一系列 ChannelInboundHandler 组成的,其中第一个通过的 handler 就是解码器 Decoder;消息出站与入站相似,但消息出站须要通过一系列 ChannelOutboundHandler 的实现类,最后一个通过的 handler 是编码器 Encoder。
因为不知道远程节点是否会发送一个完整的信息,TCP 可能出现粘包和拆包的问题。ByteToMessageDecoder 的做用就是对入站的数据进行缓冲,直至数据准备好被处理。
ByteToMessageDecoder 示例分析:
public class ToIntgerDecoder extends ByteToMessageDecoder{ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception{ if (in.readableBytes() >= 4) { out.add(in.readint()); } } }
在此实例中,假设经过 Socket 发送了 8 字节数据,每次入站从 ByteBuf 中读取个 4 字节,将其解码为一个 int,并加入一个 List 中。当没有更多的元素能够被添加到该 List 中时,表明这次发送的数据已发送完成,List 中的全部内容会被发送给下一个 ChannelInboundHandler。Int 在被添加到 List 中时,会被自动装箱为 Intger,调用 readInt()方法前必须验证所输入的 ByteBuf 是否有足够的数据。
代码示例:
Server 端
public class Server { public static void main(String[] args) { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); try { serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ServerInitializer()); //自定义初始化类 ChannelFuture future = serverBootstrap.bind(7000).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
ServerInitializer 自定义初始化类
public class ServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //入站的handler解码 pipeline.addLast(new ByteToLongDecoder()).addLast(new ServerInboundHandler()); } }
ByteToLongDecoder 自定义解码器
public class ByteToLongDecoder extends ByteToMessageDecoder { /** * @param channelHandlerContext 上下文对象 * @param byteBuf 入站的ByteBuf * @param list List集合,将解码后的数据传给下一个Handler * @throws Exception */ @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { // Long 大于 8个字节 if (byteBuf.readableBytes() >= 8) { list.add(byteBuf.readLong()); } } }
ServerInboundHandler 自定义 handler,处理业务
public class ServerInboundHandler extends SimpleChannelInboundHandler<Long> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Long aLong) throws Exception { System.out.println("从客户端读取:" + aLong); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
Client 端
public class Client { public static void main(String[] args) { NioEventLoopGroup clientLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(clientLoopGroup) .channel(NioSocketChannel.class) .handler(new ClientInitializer());//自定义初始化类 ChannelFuture future = bootstrap.connect("127.0.0.1", 7000).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { clientLoopGroup.shutdownGracefully(); } } }
ClientInitializer 客户端自定义初始化类
public class ClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //出站,数据进行编码 pipeline.addLast(new LongToByteEncoder()).addLast(new ClientHandler()); } }
LongToByteEncoder 编码器
public class LongToByteEncoder extends MessageToByteEncoder<Long> { @Override protected void encode(ChannelHandlerContext channelHandlerContext, Long aLong, ByteBuf byteBuf) throws Exception { System.out.println("开始编码,msg = " + aLong); byteBuf.writeLong(aLong); } }
ClientHandler 自定义 handler,处理逻辑
public class ClientHandler extends SimpleChannelInboundHandler<Long> { @Override protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception { System.out.println("服务器的ip : " + ctx.channel().remoteAddress()); System.out.println("收到服务器数据 : " + msg); } /** * 发送数据 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client发送数据"); ctx.writeAndFlush(12345678L); } }
LineBasedFrameDecoder
:它使用行尾控制字符(\n 或\r\n)做为分割符来解析数据;DelimiterBasedFrameDecoder
:使用自定义的特殊字符做为分隔符;HttpObjectDecoder
:一个 HTTP 数据的解码器;LengthFieldBasedFrameDecoder
:经过指定长度来标识整包信息,这样就能够自动的处理粘包和半包信息Server 端
public class Server { public static void main(String[] args) { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); try { serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ServerInitializer()); //自定义初始化类 ChannelFuture future = serverBootstrap.bind(7000).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
ServerInitializer
public class ServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new ServerHandler()); } }
ServerHandler
public class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> { private int count; @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception { byte[] buffer = new byte[buf.readableBytes()]; buf.readBytes(buffer); //将buffer转换成字符串 String str = new String(buffer, CharsetUtil.UTF_8); System.out.println("服务端接收到数据:" + str); System.out.println("服务端接收次数:" + ++count); ByteBuf byteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString(), CharsetUtil.UTF_8); ctx.writeAndFlush(byteBuf); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
Client 端
public class Client { public static void main(String[] args) { NioEventLoopGroup clientLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(clientLoopGroup) .channel(NioSocketChannel.class) .handler(new ClientInitializer());//自定义初始化类 ChannelFuture future = bootstrap.connect("127.0.0.1", 7000).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { clientLoopGroup.shutdownGracefully(); } } }
ClientInitializer
public class ClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new ClientHandler()); } }
ClientHandler
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private int count; @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); String str = new String(bytes, CharsetUtil.UTF_8); System.out.println("客户端接收到数据: " + str); System.out.println("客户端接收次数:" + ++count); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //发送十条数据 for (int i = 0; i < 10; i++) { ByteBuf byteBuf = Unpooled.copiedBuffer("hello,server" + i, CharsetUtil.UTF_8); ctx.writeAndFlush(byteBuf); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
能够看到在第一个客户端没有发生问题,启动第二个客户端后就发生了拆包问题。
使用自定义协议+编解码器实现具体功能:
具体代码
客户端与服务器主程序与以前相同
MessageProtocol 自定义协议
public class MessageProtocol { private int length; //关键 private byte[] context; public int getLength() { return length; } public byte[] getContext() { return context; } public void setLength(int length) { this.length = length; } public void setContext(byte[] context) { this.context = context; }
MessageEncoder 自定义编码器
public class MessageEncoder extends MessageToByteEncoder<MessageProtocol> { @Override protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception { System.out.println("MessageEncoder encode方法被调用"); out.writeInt(msg.getLength()); out.writeBytes(msg.getContext()); } }
MessageDecoder.自定义解码器
public class MessageDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("MessageDecoder decode方法被调用"); //将获得的二进制字节码转换为 MessageProtocol 数据包 int length = in.readInt(); byte[] content = new byte[length]; in.readBytes(content); //封装成MessageProtocol对象,放入out中交给下一个handler处理 MessageProtocol messageProtocol = new MessageProtocol(); messageProtocol.setLength(length); messageProtocol.setContext(content); out.add(messageProtocol); } }
在 ServerInitializer 和 ClientInitializer 中增长 addList()编解码器
ServerHandler
public class ServerHandler extends SimpleChannelInboundHandler<MessageProtocol> { private int count; @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception { //接收数据并处理 int len = msg.getLength(); byte[] context = msg.getContext(); System.out.println("服务端接收到信息以下"); System.out.println("数据长度:"+len); System.out.println("内容:"+new String(context, CharsetUtil.UTF_8)); System.out.println("服务器接收到协议包数量 = "+(++this.count)); //回复消息 String response = UUID.randomUUID().toString(); int responseLen = response.getBytes("utf-8").length; byte[] responseBytes = response.getBytes("utf-8"); //构建一个协议包 MessageProtocol messageProtocol = new MessageProtocol(); messageProtocol.setLength(responseLen); messageProtocol.setContext(responseBytes); ctx.writeAndFlush(messageProtocol); } }
ClientHandler
public class ClientHandler extends SimpleChannelInboundHandler<MessageProtocol> { private int count; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //使用客户端循环发送10条数据 for (int i=0;i<5;i++){ String mes = "今天下雨,出门带伞"; byte[] content = mes.getBytes(Charset.forName("utf-8")); int length = mes.getBytes(Charset.forName("utf-8")).length; //建立协议包 MessageProtocol messageProtocol = new MessageProtocol(); messageProtocol.setLength(length); messageProtocol.setContext(content); ctx.writeAndFlush(messageProtocol); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("异常消息 = "+cause.getMessage()); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception { int len = msg.getLength(); byte[] msgContext = msg.getContext(); System.out.println("客户端接收的消息以下:"); System.out.println("消息长度 = "+len); System.out.println("消息内容 = "+new String(msgContext, CharsetUtil.UTF_8)); System.out.println("客户端接收消息的数量 = "+(++this.count)); } }