Netty概述:
一、netty是基于Java NIO的网络应用框架,client-server框架
二、Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,
做为一个异步NIO框架,Netty的全部IO操做都是异步非阻塞的,
经过Future-Listener机制,用户能够方便的主动获取或者经过通知机制得到IO操做结果。
三、做为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通讯行业等得到了普遍的应用,
一些业界著名的开源组件也基于Netty的NIO框架构建。
java
Netty建立步骤:web
NIO通信服务端步骤:
一、建立ServerSocketChannel,为它配置非阻塞模式
二、绑定监听,配置TCP参数,录入backlog大小等
三、建立一个独立的IO线程,用于轮询多路复用器Selector
四、建立Selector,将以前的ServerSocketChannel注册到Selector上,并设置监听标识位SelectionKey.ACCEPT
五、启动IO线程,在循环体中执行Selector.select()方法,轮询就绪的通道
六、当轮询处处于就绪的通道时,须要进行判断操做位,若是是ACCEPT状态,说明是新的客户端介入,则调用accept方法接受新的客户端。
七、设置新接入客户端的一些参数,并将其通道继续注册到Selector之中。设置监听标识等
八、若是轮询的通道操做位是READ,则进行读取,构造Buffer对象等
九、更细节的还有数据没发送完成继续发送的问题
Netty实现通信的步骤:
一、建立两个NIO线程组,一个专门用来网络事件处理(接受客户端链接),另外一个则进行网络通信读写
二、建立一个ServerBootstrap对象,配置Netty的一系列参数,例如接受传入数据的缓存大小等。
三、建立一个实际处理数据的类ChannelInitializer,进行初始化的准备工做,好比设置传入数据的字符集,格式,实现实际处理数据的接口。
四、绑定端口,执行同步阻塞方法等待服务器启动便可。
当对于NIO模型,netty简单、健壮、性能稳定,并且这几步都是模板式开发,之后能够直接用,开发只需专一实际处理数据类的实现。
Netty最佳实践(数据通信、心跳检测)
netty服务最好能够单独做为一个项目,固然也能够与web项目集成在一块儿发布到tomcat,
这样好处是能够用到web项目中的service方法,可是web项目8080关闭,netty监听的端口号也关闭了
因此netty能够打成jar包运行,固然若是要用到service层的代码,也能够将service层的代码打成jar包
给netty业务类使用。
netty通信的方式:
①使用长链接通道不断开的形式进行通讯,也就是服务器和客户端的通道一直处于开启状态,若是服务器的
性能比较好,并且客户端的数量也很少的状况下,能够考虑这种方式
②一次性批量提交数据,采用短链接的方式,也就是咱们把数据保存在本地临时缓冲区或者临时表中,
当达到临界值时进行一次性批量提交,又或者根据定时任务轮询提交,这种状况下弊端是作不到
实时性传输,在实时性要求不高的程序中能够采用
③采用一种特殊的长链接,在指定某一段时间以内,服务端和某台客户端没有任何通信,则断开链接,
数据库
下次若是客户端要向服务端发送数据时,再次创建链接。json
但有两个因素要考虑:bootstrap
一、如何在超时(即服务端和客户端没有任何通讯)后关闭通道?关闭后如何再次链接?数组
二、客户端宕机,无需考虑,下次客户端重启后能够与服务端创建链接,可是服务器宕机怎么办?缓存
服务端代码Server:tomcat
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; public class Server { public static void main(String[] args) throws Exception{ EventLoopGroup pGroup = new NioEventLoopGroup(); //线程组:用来处理网络事件处理(接受客户端链接) EventLoopGroup cGroup = new NioEventLoopGroup(); //线程组:用来进行网络通信读写 //Bootstrap用来配置参数 ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) //注册服务端channel /** * BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时, * 用于临时存放已完成三次握手的请求的队列的最大长度。若是未设置或所设置的值小于1,将使用默认值50。 * 服务端处理客户端链接请求是顺序处理的,因此同一时间只能处理一个客户端链接,多个客户端来的时候, * 服务端将不能处理的客户端链接请求放在队列中等待处理,backlog参数指定了队列的大小 */ .option(ChannelOption.SO_BACKLOG, 1024) //设置日志 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { //marshaliing的编解码操做,要传输对象,必须编解码 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); //5s没有交互,就会关闭channel sc.pipeline().addLast(new ReadTimeoutHandler(5)); sc.pipeline().addLast(new ServerHandler()); //服务端业务处理类 } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
客户端代码:服务器
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import java.util.concurrent.TimeUnit; public class Client { private static class SingletonHolder { static final Client instance = new Client(); } public static Client getInstance(){ return SingletonHolder.instance; } private EventLoopGroup group; private Bootstrap b; private ChannelFuture cf ; private Client(){ group = new NioEventLoopGroup(); b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); //超时handler(当服务器端与客户端在指定时间以上没有任何进行通讯,则会关闭响应的通道,主要为减少服务端资源占用) sc.pipeline().addLast(new ReadTimeoutHandler(5)); sc.pipeline().addLast(new ClientHandler()); //客户端业务处理类 } }); } public void connect(){ try { this.cf = b.connect("127.0.0.1", 8765).sync(); System.out.println("远程服务器已经链接, 能够进行数据交换.."); } catch (Exception e) { e.printStackTrace(); } } public ChannelFuture getChannelFuture(){ //若是管道没有被开启或者被关闭了,那么重连 if(this.cf == null){ this.connect(); } if(!this.cf.channel().isActive()){ this.connect(); } return this.cf; } public static void main(String[] args) throws Exception{ final Client c = Client.getInstance(); ChannelFuture cf = c.getChannelFuture(); for(int i = 1; i <= 3; i++ ){ //客户端发送的数据 UserParam request = new UserParam(); request.setId("" + i); request.setName("pro" + i); request.setRequestMessage("数据信息" + i); cf.channel().writeAndFlush(request); TimeUnit.SECONDS.sleep(4); } //当5s没有交互,就会异步关闭channel cf.channel().closeFuture().sync(); //再模拟一次传输 new Thread(new Runnable() { @Override public void run() { try { ChannelFuture cf = c.getChannelFuture(); //System.out.println(cf.channel().isActive()); //System.out.println(cf.channel().isOpen()); //再次发送数据 UserParam request = new UserParam(); request.setId("" + 4); request.setName("pro" + 4); request.setRequestMessage("数据信息" + 4); cf.channel().writeAndFlush(request); cf.channel().closeFuture().sync(); System.out.println("子线程结束."); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); System.out.println("断开链接,主线程结束.."); } }
服务端处理类:网络
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class ServerHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //接受客户端对象 UserParam user = (UserParam)msg; System.out.println("客户端发来的消息 : " + user.getId() + ", " + user.getName() + ", " + user.getRequestMessage()); //给客户端返回对象 UserData response = new UserData(); response.setId(user.getId()); response.setName("response" + user.getId()); response.setResponseMessage("响应内容" + user.getId()); ctx.writeAndFlush(response); //处理完毕,关闭服务端 //ctx.addListener(ChannelFutureListener.CLOSE); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
客户端处理类:
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class ClientHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { UserData user = (UserData)msg; System.out.println("服务器返回的消息 : " + user.getId() + ", " + user.getName() + ", " + user.getResponseMessage()); } finally { ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
客户端传输的参数对象UserParam -- > id name requestMessage
服务端传输的参数对象UserData -- > id name responseMessage
心跳检测:
Server代码,Client代码是模板代码,基本都同样,不一样是业务处理的方法。
Server业务处理类ServerHeartBeatHandler:
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import java.util.HashMap; public class ServerHeartBeatHandler extends ChannelHandlerAdapter { /** * key:ip value:auth ** * 拥有的客户端列表,从数据库中或者配置文件中读取 */ private static HashMap<String, String> AUTH_IP_MAP = new HashMap<String, String>(); //模拟受权的key private static final String SUCCESS_KEY = "auth_success_key"; static { AUTH_IP_MAP.put("192.168.1.200", "1234"); } private boolean auth(ChannelHandlerContext ctx, Object msg){ //System.out.println(msg); String [] ret = ((String) msg).split(","); String auth = AUTH_IP_MAP.get(ret[0]); if(auth != null && auth.equals(ret[1])){ ctx.writeAndFlush(SUCCESS_KEY); return true; } else { ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE); return false; } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof String){ auth(ctx, msg); } else if (msg instanceof RequestInfo) { //接受客户端发来的他机器的性能参数 RequestInfo info = (RequestInfo) msg; System.out.println("--------------------------------------------"); System.out.println("当前主机ip为: " + info.getIp()); System.out.println("当前主机cpu状况: "); HashMap<String, Object> cpu = info.getCpuPercMap(); System.out.println("总使用率: " + cpu.get("combined")); System.out.println("用户使用率: " + cpu.get("user")); System.out.println("系统使用率: " + cpu.get("sys")); System.out.println("等待率: " + cpu.get("wait")); System.out.println("空闲率: " + cpu.get("idle")); System.out.println("当前主机memory状况: "); HashMap<String, Object> memory = info.getMemoryMap(); System.out.println("内存总量: " + memory.get("total")); System.out.println("当前内存使用量: " + memory.get("used")); System.out.println("当前内存剩余量: " + memory.get("free")); System.out.println("--------------------------------------------"); //通知客户端消息已收到 ctx.writeAndFlush("info received!"); }else { ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE); } } }
Client业务处理类ClienHeartBeattHandler:
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; import java.net.InetAddress; import java.util.HashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.hyperic.sigar.CpuPerc; import org.hyperic.sigar.Mem; import org.hyperic.sigar.Sigar; public class ClienHeartBeattHandler extends ChannelHandlerAdapter { private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private ScheduledFuture<?> heartBeat; //主动向服务器发送认证信息 private InetAddress addr ; private static final String SUCCESS_KEY = "auth_success_key"; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { addr = InetAddress.getLocalHost(); String ip = addr.getHostAddress(); String key = "1234"; //证书 String auth = ip + "," + key; ctx.writeAndFlush(auth); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { if(msg instanceof String){ String ret = (String)msg; if(SUCCESS_KEY.equals(ret)){ // 握手成功,主动发送心跳消息 this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 2, TimeUnit.SECONDS); System.out.println(msg); } else { System.out.println(msg); } } } finally { ReferenceCountUtil.release(msg); } } private class HeartBeatTask implements Runnable { private final ChannelHandlerContext ctx; public HeartBeatTask(final ChannelHandlerContext ctx) { this.ctx = ctx; } @Override public void run() { try { RequestInfo info = new RequestInfo(); //ip info.setIp(addr.getHostAddress()); Sigar sigar = new Sigar(); //cpu prec CpuPerc cpuPerc = sigar.getCpuPerc(); HashMap<String, Object> cpuPercMap = new HashMap<String, Object>(); cpuPercMap.put("combined", cpuPerc.getCombined()); cpuPercMap.put("user", cpuPerc.getUser()); cpuPercMap.put("sys", cpuPerc.getSys()); cpuPercMap.put("wait", cpuPerc.getWait()); cpuPercMap.put("idle", cpuPerc.getIdle()); // memory Mem mem = sigar.getMem(); HashMap<String, Object> memoryMap = new HashMap<String, Object>(); memoryMap.put("total", mem.getTotal() / 1024L); memoryMap.put("used", mem.getUsed() / 1024L); memoryMap.put("free", mem.getFree() / 1024L); info.setCpuPercMap(cpuPercMap); info.setMemoryMap(memoryMap); ctx.writeAndFlush(info); } catch (Exception e) { e.printStackTrace(); } } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); if (heartBeat != null) { heartBeat.cancel(true); heartBeat = null; } ctx.fireExceptionCaught(cause); } } }
netty编解码技术:
java序列化技术,序列化目的:
①网络传输(网络协议是基于二进制的,内存中的参数的值要序列化成二进制的形式)
②对象持久化(对象必须在JVM中存活,不可能超过JVM的生命周期)
虽然咱们可使用java进行对象序列化,netty去传输,可是java序列化的硬伤太多:
1.没法跨语言。这应该是java序列化最致命的问题了。
因为java序列化是java内部私有的协议,其余语言不支持,致使别的语言没法反序列化,这严重阻碍了它的应用。
关于跨语言问题,也就是对象传输,通常都采用json字符串。
2.序列后的码流太大。java序列化的大小是二进制编码的5倍多!
3.序列化性能过低。java序列化的性能只有二进制编码的6.17倍,可见java序列化性能实在太差了。
咱们判断一个编码框架的优劣主要从如下几个方面:
1.是否支持跨语言,支持语种是否丰富
2.编码后的码流
3.编解码的性能
4.类库是否小巧,API使用是否方便
5.使用者开发的工做量和难度。
java序列化前3条变现太差,致使在远程服务调用中不多用它
主流的编解码框架:
①JBoss的Marshalling包:
对jdk默认的序列化进行了优化,又保持跟java.io.Serializable接口的兼容,同时增长了一些可调的参数和附加特性,
而且这些参数和特性可经过工厂类的配置
1.可拔插的类解析器,提供更加便捷的类加载定制策略,经过一个接口便可实现定制。
2.可拔插的对象替换技术,不须要经过继承的方式。
3.可拔插的预约义类缓存表,能够减小序列化的字节数组长度,提高经常使用类型的对象序列化性能。
4.无须实现java.io.Serializable接口
5.经过缓存技术提高对象的序列化性能。
6.使用很是简单
②google的Protobuf
③基于Protobuf的Kyro
④MessagePack框架
Marshalling工具类:
import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; import io.netty.handler.codec.marshalling.MarshallerProvider; import io.netty.handler.codec.marshalling.MarshallingDecoder; import io.netty.handler.codec.marshalling.MarshallingEncoder; import io.netty.handler.codec.marshalling.UnmarshallerProvider; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration; public final class MarshallingCodeCFactory { /** * 建立Jboss Marshalling解码器MarshallingDecoder * @return MarshallingDecoder */ public static MarshallingDecoder buildMarshallingDecoder() { //首先经过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识建立的是java序列化工厂对象。 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); //建立了MarshallingConfiguration对象,配置了版本号为5 final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); //根据marshallerFactory和configuration建立provider UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024); return decoder; } /** * 建立Jboss Marshalling编码器MarshallingEncoder * @return MarshallingEncoder */ public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组 MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } }
RPC(Remote Procedure Call):
RPC是指远程过程调用,也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/方法,
因为不在一个内存空间,不能直接调用,须要经过网络来表达调用的语义和传达调用的数据。
好比远程调用方法:Employee getEmployeeByName(String fullName)
一、要解决通信的问题,主要是经过在客户端和服务器之间创建TCP链接,远程过程调用的全部交换的数据都在这个链接里传输。
链接能够是按需链接,调用结束后就断掉,也能够是长链接,多个远程过程调用共享同一个链接。
二、要解决寻址的问题,也就是说,A服务器上的应用怎么告诉底层的RPC框架,
如何链接到B服务器(如主机或IP地址)以及特定的端口,方法的名称名称是什么,这样才能完成调用。
好比基于Web服务协议栈的RPC,就要提供一个endpoint URI,或者是从UDDI服务上查找。
若是是RMI调用的话,还须要一个RMI Registry来注册服务的地址。
三、要解决编码的问题,当A服务器上的应用发起远程过程调用时,方法的参数须要经过底层的网络协议如TCP传递到B服务器,
因为网络协议是基于二进制的,内存中的参数的值要序列化成二进制的形式,也就是序列化(Serialize)或编组(marshal),
经过寻址和传输将序列化的二进制发送给B服务器。
四、要解决解码的问题,B服务器收到请求后,须要对参数进行反序列化(序列化的逆操做),恢复为内存中的表达方式,
而后找到对应的方法(寻址的一部分)进行本地调用,而后获得返回值。
五、返回值还要发送回服务器A上的应用,也要通过序列化的方式发送,服务器A接到后,再反序列化,
恢复为内存中的表达方式,交给A服务器上的应用
为何RPC呢?就是没法在一个进程内,甚至一个计算机内经过本地调用的方式完成的需求,好比不一样的系统间的通信,甚至不一样的组织间的通信。因为计算能力须要横向扩展,须要在多台机器组成的集群上部署应用。而Netty框架不局限于RPC,更多的是做为一种网络协议的实现框架,因为RPC须要高效的网络通讯,就可能选择以Netty做为基础 --------------------- 本文来自 wive 的CSDN 博客 ,全文地址请点击:https://blog.csdn.net/javadhh/article/details/66477423?utm_source=copy