本章介绍java
EventLoopbootstrap
从EventLoop注册和取消注册网络
在Netty中使用旧的Socket和Channel框架
Netty提供了一个简单的方法来链接Socket/Channel,这是在Netty以外建立并转移他们的责任到Netty。这容许你将遗留的集成框架以无缝方式一步一步迁移到Netty;Netty还容许取消注册的通道来中止处理IO,这能够暂停程序处理并释放资源。异步
这些功能在某些状况或某种程度上可能不是很是有用,但使用这些特性能够解决一些困难的问题。举个例子,有一个很是受欢迎的社交网络,其用户增加很是快,系统程序须要处理每秒几千个交互或消息,若是用户持续增加,系统将会处理每秒数以万计的交互;这很使人兴奋,但随着用户数的增加,系统将消耗大量的内存和CPU而致使性能低下;此时最须要作的就是改进他们,而且不要花太多的钱在硬件设备上。这种状况下,系统必须保持功能正常能处理日益增加的数据量,此时,注册/注销事件循环就派上用场了。ide
经过容许外部Socket/Channel来注册和注销,Netty可以以这样的方式改进旧系统的缺陷,全部的Netty程序均可以经过一种有效精巧的方式整合到现有系统,本章将重点讲解Netty是如何整合。oop
16.1 注册和取消注册的Channel和Socket性能
前面章节讲过,每一个通道须要注册到一个EventLoop来处理IO或事件,这是在引导过程当中自动完成。下图显示了他们的关系:this
上图只是显示了他们关系的一部分,通道关闭时,还须要将注册到EventLoop中的Socket/Channel注销以释放资源。netty
有时不得不处理java.nio.channels.SocketChannel或其余java.nio.channes.Channel实现,这多是遗留程序或框架的一些缘由所致。咱们可使用Netty来包装预先建立的java.nio.channels.Channel,而后再注册到EventLoop。咱们可使用Netty的全部特性,同时还能重用现有的东西。下面代码显示了此功能:
//nio java.nio.channels.SocketChannel mySocket = java.nio.channels.SocketChannel.open(); //netty SocketChannel ch = new NioSocketChannel(mySocket); EventLoopGroup group = new NioEventLoopGroup(); //register channel ChannelFuture registerFuture = group.register(ch); //de-register channel ChannelFuture deregisterFuture = ch.deregister();
Netty也适用于包装OIO,看下面代码:
//oio Socket mySocket = new Socket("www.baidu.com", 80); //netty SocketChannel ch = new OioSocketChannel(mySocket); EventLoopGroup group = new OioEventLoopGroup(); //register channel ChannelFuture registerFuture = group.register(ch); //de-register channel ChannelFuture deregisterFuture = ch.deregister();
只有2个重点以下:
使用Netty包装已建立的Socket或Channel必须使用与之对应的实现,如Socket是OIO,则使用Netty的OioSocketChannel;SocketChannel是NIO,则使用NioSocketChannel。
EventLoop.register(...)和Channel.deregister(...)都是非阻塞异步的,也就是说它们可能不会理解执行完成,可能稍后完成。它们返回ChannelFuture,咱们在须要进一步操做或确认完成操做时能够添加一个ChannelFutureLister或在ChannelFuture上同步等待至完成;选择哪种方式看实际需求,通常建议使用ChannelFutureLister,应避免阻塞。
16.2 挂起IO处理
在一些状况下可能须要中止一个指定通道的处理操做,好比程序耗尽内存、崩溃、失去一些消息,此时,咱们能够中止处理事件的通道来清理系统资源,以保持程序稳定继续处理后续消息。若这样作,最好的方式就是从EventLoop取消注册的通道,这能够有效阻止通道再处理任何事件。若须要被取消的通道再次处理事件,则只须要将该通道从新注册到EventLooop便可。看下图:
看下面代码:
EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class) .handler(new SimpleChannelInboundHandler<ByteBuf>() { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { //remove this ChannelHandler and de-register ctx.pipeline().remove(this); ctx.deregister(); } }); ChannelFuture future = bootstrap.connect( new InetSocketAddress("www.baidu.com", 80)).sync(); //.... Channel channel = future.channel(); //re-register channel and add ChannelFutureLister group.register(channel).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if(future.isSuccess()){ System.out.println("Channel registered"); }else{ System.out.println("register channel on EventLoop fail"); future.cause().printStackTrace(); } } });
16.3 迁移通道到另外一个事件循环
另外一个取消注册和注册一个Channel的用例是将一个活跃的Channel移到另外一个EventLoop,有下面一些缘由可能致使须要这么作:
当前EventLoop太忙碌,须要将Channel移到一个不是很忙碌的EventLoop;
终止EventLoop释放资源同时保持活跃Channel能够继续使用;
迁移Channel到一个执行级别较低的非关键业务的EventLoop中。
下图显示迁移Channel到另外一个EventLoop:
看下面代码:
EventLoopGroup group = new NioEventLoopGroup(); final EventLoopGroup group2 = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .handler(new SimpleChannelInboundHandler<ByteBuf>() { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { // remove this channel handler and de-register ctx.pipeline().remove(this); ChannelFuture f = ctx.deregister(); // add ChannelFutureListener f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { // migrate this handler register to group2 group2.register(future.channel()); } }); } }); ChannelFuture future = b.connect("www.baidu.com", 80); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { System.out.println("connection established"); } else { System.out.println("connection attempt failed"); future.cause().printStackTrace(); } } });