《Java 编写基于 Netty 的 RPC 框架》

一 简单概念java

RPC: ( Remote Procedure Call),远程调用过程,是经过网络调用远程计算机的进程中某个方法,从而获取到想要的数据,过程如同调用本地的方法同样.git

阻塞IO :当阻塞I/O在调用InputStream.read()方法是阻塞的,一直等到数据到来时才返回,一样ServerSocket.accept()方法时,也是阻塞,直到有客户端链接才返回,I/O通讯模式以下:github

缺点:当客户端多时,会建立大量的处理线程,而且为每个线程分配必定的资源;阻塞可能带来频繁切换上下文,这时引入NIO面试

NIO : jdk1.4引入的(NEW Input/Output),是基于经过和缓存区的I/O方式,(插入一段题外话,学的多忘得也多,以前有认真研究过NIO,后来用到的时候,忘得一干二净,因此学习一些东西,常常返回看看),NIO是一种非阻塞的IO模型,经过不断轮询IO事件是否就绪,非阻塞是指线程在等待IO的时候,能够作其余的任务,同步的核心是Selector,Selector代替线程本省的轮询IO事件,避免了阻塞同时减小了没必要要的线程消耗;非阻塞的核心是通道和缓存区,当IO事件的就绪时,能够将缓存区的数据写入通道sql

其工做原理:编程

1 由专门的线程来处理全部的IO事件,而且负责转发数组

2 事件驱动机制:事件到的时候才触发,而不是同步监视缓存

3 线程通信:线程之间通信经过wait,notify等方式通信,保证每次上下文切换都是有意义的,减小不必的线程切换bash

通道 : 是对原I/O包中流的模拟,全部数据必须经过Channel对象,常见的通道FileChannel,SocketChannel,ServerSocketChannel,DatagramChannel(UDP协议向网络链接的两端读写数据)服务器

Buffer缓存区 :其实是一个容器,一个连续的数组,任何读写的数据都通过Buffer

Netty :是由JBOSS提供的一个java开源框架,是一个高性能,异步事件驱动的NIO框架,基于JAVA NIO提供的API实现,他提供了TCP UDP和文件传输的支持,,全部操做都是异步非阻塞的.经过Futrue-Listener机制,本质就是Reactor模式的现实,Selector做为多路复用器,EventLoop做为转发器,并且,netty对NIO中buffer作优化,大大提升了性能

二 Netty 客户端和服务端的

Netty中Bootstrap和Channel的生命周期

Bootstrap简介

Bootstarp:引导程序,将ChannelPipeline,ChannelHandler,EventLoop进行总体关联

Bootstrap具体分为两个实现

ServerBootstrap:用于服务端,使用一个ServerChannel接收客户端的链接,并建立对应的子Channel

Bootstrap:用于客户端,只须要一个单独的Channel,配置整个Netty程序,串联起各个组件

两者的主要区别:

1 ServerBootstrap用于Server端,经过调用bind()绑定一个端口监听链接,Bootstrap用于Client端,须要调用connect()方法来链接服务器端,咱们也能够调用bind()方法接收返回ChannelFuture中Channel

2 客户端的Bootstrap通常用一个EventLoopGroup,而服务器的ServerBootstrap会用两个第一个EventLoopGroup专门负责绑定到端口监听链接事件,而第二个EventLoopGroup专门用来到处理每一个接收的链接,这样大大提升了并发量

复制代码
public class Server { public static void main(String[] args) throws Exception { // 1 建立线两个事件循环组 // 一个是用于处理服务器端接收客户端链接的 // 一个是进行网络通讯的(网络读写的) EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); // 2 建立辅助工具类ServerBootstrap,用于服务器通道的一系列配置 ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) // 绑定俩个线程组 .channel(NioServerSocketChannel.class) // 指定NIO的模式.NioServerSocketChannel对应TCP, NioDatagramChannel对应UDP .option(ChannelOption.SO_BACKLOG, 1024) // 设置TCP缓冲区 .option(ChannelOption.SO_SNDBUF, 32 * 1024) // 设置发送缓冲大小 .option(ChannelOption.SO_RCVBUF, 32 * 1024) // 这是接收缓冲大小 .option(ChannelOption.SO_KEEPALIVE, true) // 保持链接 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //SocketChannel创建链接后的管道 // 3 在这里配置 通讯数据的处理逻辑, 能够addLast多个... sc.pipeline().addLast(new ServerHandler()); } }); // 4 绑定端口, bind返回future(异步), 加上sync阻塞在获取链接处 ChannelFuture cf1 = b.bind(8765).sync(); //ChannelFuture cf2 = b.bind(8764).sync(); //能够绑定多个端口 // 5 等待关闭, 加上sync阻塞在关闭请求处 cf1.channel().closeFuture().sync(); //cf2.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } } public class ServerHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("server channel active... "); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "utf-8"); System.out.println("Server :" + body ); String response = "返回给客户端的响应:" + body ; ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes())); // future完成后触发监听器, 此处是写完即关闭(短链接). 所以须要关闭链接时, 要经过server端关闭. 直接关闭用方法ctx[.channel()].close() //.addListener(ChannelFutureListener.CLOSE); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("读完了"); ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception { ctx.close(); } } public class Client { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync(); //ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync(); //可使用多个端口 //发送消息, Buffer类型. write须要flush才发送, 可用writeFlush代替 cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes())); cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes())); Thread.sleep(2000); cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes())); //cf2.channel().writeAndFlush(Unpooled.copiedBuffer("999".getBytes())); cf1.channel().closeFuture().sync(); //cf2.channel().closeFuture().sync(); group.shutdownGracefully(); } } public class ClientHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "utf-8"); System.out.println("Client :" + body ); } finally { // 记得释放xxxHandler里面的方法的msg参数: 写(write)数据, msg引用将被自动释放不用手动处理; 但只读数据时,!必须手动释放引用数 ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } 复制代码

其余组件:

Handle: 为了支持各类协议和处理数据的方式,能够是链接,数据接收,异常,数据格式转换等

ChannelHandler

ChannelInboundHandler :最经常使用的Handler,做用是处理接收数据的事件,来处理咱们的核心业务逻辑。

ChannelInitializer :,当一个连接创建时,咱们须要知道怎么来接收或者发送数据,固然,咱们有各类各样的Handler实现来处理它,那么ChannelInitializer即是用来配置这些Handler,它会提供一个ChannelPipeline,并把Handler加入到ChannelPipeline。

ChannelPipeline :一个Netty应用基于ChannelPipeline机制,这种机制依赖EventLoop和EventLoopGroup,这三个都和事件或者事件处理相关

EventLoop : 为Channel处理IO操做,一个EventLoop能够为多个Channel服务

EventLoopGroup :包含多个EventLoop

Channel :表明一个Socket链接

Future :在Netty中全部的IO操做都是异步的,,所以咱们不知道,过来的请求是否被处理了,因此咱们注册一个监听,当操做执行成功或者失败时监听自动触发,全部操做都会返回一个ChannelFutrue

ChannelFuture

Netty 是一个非阻塞的,事件驱动的,网络编程框架,咱们经过一张图理解一下,Channel,EventLoop以及EventLoopGroup之间的关系

解释一下,当一个链接过来,Netty首先会注册一个channel,而后EventLoopGroup会分配一个EventLoop绑定到这个channel,在这个channel的整个生命周期过程当中,这个EventLoop一直为他服务,这个玩意就是一个线程

这下聊一下Netty如何处理数据?

前面有讲到,handler数据处理核心,,而ChannelPipeline负责安排Handler的顺序和执行,咱们能够这样理解,数据在ChannelPipeline中流动,其中ChannelHandler就是一个个阀门,这些数据都会通过每个ChannelHandler而且被他处理,其中ChannelHandler的两个子类ChannelOutboundHandler和ChannelInboundHandler,根据不一样的流向,选择不一样的Handler

由图能够看出,一个数据流进入ChannelPipeline时,一个一个handler挨着执行,各个handler的数据传递,这须要调用方法中ChannelHandlerContext来操做,而这个ChannelHandlerContext能够用来读写Netty中的数据流

三 Netty中的业务处理

netty中会有不少Handler.具体哪种Handler还要看继承是InboundAdapter仍是OutboundAdapter,Netty中提供一系列的Adapter来帮助咱们简化开发,在ChannelPipeline中的每个handler都负责把Event传递个洗下一个handler,有这些adapter,这些工做能够自动完成,,咱们只需覆盖咱们真正实现的部分便可,接下来比较经常使用的三种ChannelHandler

Encoders和Decodeers

咱们在网络传输只能传输字节流,在发送数据时,把咱们的message转换成bytes这个过程叫Encode(编码),相反,接收数据,须要把byte转换成message,这个过程叫Decode(解码)

Domain Logic

咱们真正关心的如何处理解码之后的数据,咱们真正的业务逻辑即是接收处理的数据,Netty提供一个经常使用的基类就是SimpleChannelInboundHandler<T>,其中T就是Handler处理的数据类型,消息到达这个Handler,会自动调用这个Handler中的channelRead0(ChannelHandlerContext,T)方法,T就是传过来的数据对象

四 基于netty实现的Rpc的例子

这是个人github上项目的位置

https://github.com/developerxiaofeng/rpcByNetty

项目目录结构以下

详细的项目细节看类中的注释,很详细哦。

获取资料:

最后给你们分享一些学习资料,里面包括:(BATJ面试资料、高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)和Java进阶学习路线图。

领取方式:加微信号 weixin99ting 备注 :(资料) 便可获取。

最后,祝你们早日学有所成!

相关文章
相关标签/搜索