接上文《架构设计:系统间通讯(6)——IO通讯模型和Netty 上篇》java
上篇文章咱们讨论了Netty的基本原理,重要概念,并使用java代码描述了Netty的基本使用。固然Netty的技术涵盖点远远不是那一篇基础代码就能够所有归纳的,可是至少能够给读者一个切入点。让你们去思考一个咱们一直在讨论的问题:为何有了JAVA NIO框架后咱们还须要有Netty这样的框架对底层再次进行封装?python
在前文中咱们已经提到了,几种典型的IO模型(参见系统间通讯三、四、5这三篇文章中的介绍,这里再进行一次总结):linux
阻塞和非阻塞:这个概念是针对应用程序而言,是指应用程序中的线程在向操做系统发送IO请求后,是否一直等待操做系统的IO响应。若是是,那么就是阻塞式的;若是不是,那么应用程序通常会以轮询的方式以必定周期询问操做系统,直到某次得到了IO响应为止(轮序间隔应用程序线程能够作一些其余工做)。c++
同步和异步:IO操做都是由操做系统进行的(这里的IO操做是个普遍概念了:磁盘IO、网络IO都算),不一样的操做系统对不一样设备的IO操做都有不一样的模式。同步和异步这两个概念都指代的操做系统级别,同步IO是指操做系统和设备进行交互时,必须等待一次完整的请求-响应完成,才能进行下一次操做(固然操做系统和设备自己也有不少技术加快这个反应过程,例如“磁盘预读”技术、数据缓存技术);异步IO是指操做系统和设备进行交互时,没必要等待本次获得响应,就能够直接进行下一次操做请求。设备处理完某次请求后,会主动给操做系统相应的响应通知。web
多路复用IO:多路复用IO,从本质上看仍是一种同步IO,由于它没有100%消除IO_WAIT,操做系统也没有为它提供“主动通知”机制。可是多路复用IO的处理速度已经至关快了,利用设备执行IO操做的时间,操做系统能够继续执行IO请求。并一样采用周期性轮询的方式,获取一批IO操做请求的执行响应。操做系统支持的多路复用IO技术主要有select、poll、epoll、kqueue。shell
阻塞式同步IO模型:这个从字面上就很好理解了,应用程序请求IO操做,并一直等待处理结果;操做系统同时也进行IO操做,并等待设备的处理结果;能够看出,应用程序的请求线程和操做系统的内核线程都是等待状态。apache
非阻塞式同步IO模型:应用程序请求IO,而且不用一直等待返回结果就去作其余事情。隔必定的周期,再去询问操做系统上次IO操做有没有结果,直到某一次询问从操做系统拿到IO结果;操做系统内核线程在进行IO操做时,仍是处理一直等待设备返回操做结果的状态。bootstrap
非阻塞式多路复用IO模型:应用程序请求IO的工做采用非阻塞方式进行;操做系统采用多路复用模式工做。windows
非阻塞式异步IO模型:应用程序请求IO的工做采用非阻塞方式进行,可是不须要轮询了,由于操做系统异步IO其中一个主要特性就是:能够在有IO响应结果的时候,主动进行通知。缓存
以上这些IO工做模型,在JAVA中都可以找到对应的支持:传统的JAVA Socket套接字支持阻塞/非阻塞模式下的同步IO(有的技术资料里面也称为OIO或者BIO);JAVA NIO框架在不一样操做系统下支持不一样种类的多路复用IO技术(windows下的select模型、Linux下的poll/epoll模型);JAVA AIO框架支持异步IO(windows下的IOCP和Linux使用epoll的模拟AIO)
实际上Netty是对JAVA BIO 、JAVA NIO框架的再次封装。让咱们再也不纠结于选用哪一种底层实现。您能够理解成Netty/MINA 框架是一个面向上层业务实现进行封装的“业务层”框架。而JAVA Socket框架、JAVA NIO框架、JAVA AIO框架更偏向于对下层技术实现的封装,是面向“技术层”的框架。
“技术层”框架自己只对IO模型技术实现进行了封装,并不关心IO模型中流淌的数据格式;“业务层”框架对数据格式也进行了处理,让咱们能够抽出精力关注业务自己。
Protobuf数据协议的集成:Netty利用自身的Channelpipeline的设计(在《架构设计:系统间通讯(6)——IO通讯模型和Netty 上篇》中讲过),对Protobuf数据协议进行了无缝结合。
JBoss Marshalling数据协议的集成:JBoss Marshalling 是一个Java对象的序列化API包,修正了JDK自带的序列化包的不少问题,又保持跟 java.io.Serializable 接口的兼容。Netty经过封装这个协议,能够帮助咱们在客户端-服务端简便的进行对象系列化和反序列化。
HTTP Request / HTTP Response 协议的集成:在Netty中,能够方便的接受和发送Http协议。也就是说,咱们可使用Netty搭建本身的WEB服务器,固然您还能够根据本身的业务要求,方便的设计相似于Struts、Spring MVC这样的WEB框架。
下面是一个使用Netty的Http编码/解码处理器,设计的一个简单的WEB服务器:
package testNetty;
import java.net.InetSocketAddress;
import java.nio.channels.spi.SelectorProvider;
import java.util.concurrent.ThreadFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;
public class TestHTTPNetty {
static {
BasicConfigurator.configure();
}
public static void main(String[] args) throws Exception {
//这就是主要的服务启动器
ServerBootstrap serverBootstrap = new ServerBootstrap();
//=======================下面咱们设置线程池(代码已经详细讲解过,就再也不赘述了)
EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
ThreadFactory threadFactory = new DefaultThreadFactory("work thread pool");
int processorsNumber = Runtime.getRuntime().availableProcessors();
EventLoopGroup workLoogGroup = new NioEventLoopGroup(processorsNumber * 2, threadFactory, SelectorProvider.provider());
serverBootstrap.group(bossLoopGroup , workLoogGroup);
//========================下面咱们设置咱们服务的通道类型(代码已经详细讲解过,就再也不赘述了)
serverBootstrap.channel(NioServerSocketChannel.class);
//========================设置处理器
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
/* (non-Javadoc) * @see io.netty.channel.ChannelInitializer#initChannel(io.netty.channel.Channel) */
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//咱们在socket channel pipeline中加入http的编码和解码器
ch.pipeline().addLast(new HttpResponseEncoder());
ch.pipeline().addLast(new HttpRequestDecoder());
ch.pipeline().addLast(new HTTPServerHandler());
}
});
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
serverBootstrap.bind(new InetSocketAddress("0.0.0.0", 83));
}
}
/** * @author yinwenjie */
@Sharable
class HTTPServerHandler extends ChannelInboundHandlerAdapter {
/** * 日志 */
private static Log LOGGER = LogFactory.getLog(HTTPServerHandler.class);
/** * 因为一次httpcontent可能没有传输彻底部的请求信息。因此这里要作一个连续的记录 * 而后在channelReadComplete方法中(执行了这个方法说明此次全部的http内容都传输完了)进行处理 */
private static AttributeKey<StringBuffer> CONNTENT = AttributeKey.valueOf("content");
/* (non-Javadoc) * @see io.netty.channel.ChannelInboundHandlerAdapter#channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object) */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
/* * 在测试中,咱们首先取出客户端传来的参数、URL信息,而且返回给一个确认信息。 * 要使用HTTP服务,咱们首先要了解Netty中http的格式,以下: * ---------------------------------------------- * | http request | http content | http content | * ---------------------------------------------- * * 因此经过HttpRequestDecoder channel handler解码后的msg多是两种类型: * HttpRquest:里面包含了请求head、请求的url等信息 * HttpContent:请求的主体内容 * */
if(msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest)msg;
HttpMethod method = request.getMethod();
String methodName = method.name();
String url = request.getUri();
HTTPServerHandler.LOGGER.info("methodName = " + methodName + " && url = " + url);
}
//若是条件成立,则在这个代码段实现http请求内容的累加
if(msg instanceof HttpContent) {
StringBuffer content = ctx.attr(HTTPServerHandler.CONNTENT).get();
if(content == null) {
content = new StringBuffer();
ctx.attr(HTTPServerHandler.CONNTENT).set(content);
}
HttpContent httpContent = (HttpContent)msg;
ByteBuf contentBuf = httpContent.content();
String preContent = contentBuf.toString(io.netty.util.CharsetUtil.UTF_8);
content.append(preContent);
}
}
/* (non-Javadoc) * @see io.netty.channel.ChannelInboundHandlerAdapter#channelReadComplete(io.netty.channel.ChannelHandlerContext) */
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
HTTPServerHandler.LOGGER.info("super.channelReadComplete(ChannelHandlerContext ctx)");
/* * 一旦本次http请求传输完成,则能够进行业务处理了。 * 而且返回响应 * */
StringBuffer content = ctx.attr(HTTPServerHandler.CONNTENT).get();
HTTPServerHandler.LOGGER.info("http客户端传来的信息为:" + content);
//开始返回信息了
String returnValue = "return response";
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
HttpHeaders httpHeaders = response.headers();
//这些就是http response 的head信息咯,参见http规范。另外您还能够设置本身的head属性
httpHeaders.add("param", "value");
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
//必定要设置长度,不然http客户端会一直等待(由于返回的信息长度客户端不知道)
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, returnValue.length());
ByteBuf responseContent = response.content();
responseContent.writeBytes(returnValue.getBytes("UTF-8"));
//开始返回
ctx.writeAndFlush(response);
}
}
因为上篇文章中已经介绍了Netty的基本使用方法,因此以上的代码将其余没必要要的注释、方法都去掉了,只作了实现web服务器的最简代码。可是这段代码本省是能够运行的。下面是运行效果:
经过阅读Netty框架的代码,咱们知道了Netty框架至少解决了JAVA NIO框架中的一些Bug:
sun.nio.ch.Util contains code which is not thread safe and can throw a NullPointerException:
private static String bugLevel = null;
static boolean atBugLevel(String bl) { // package-private
if (bugLevel == null) {
if (!sun.misc.VM.isBooted())
return false;
java.security.PrivilegedAction pa =
new GetPropertyAction("sun.nio.ch.bugLevel");
// the next line can reset bugLevel to null
bugLevel = (String)AccessController.doPrivileged(pa);
if (bugLevel == null)
bugLevel = "";
}
return (bugLevel != null) && bugLevel.equals(bl);
}
Suppose that two threads enter the "if (buglevel == null)" body at the same time. The first one runs until the return line and gets scheduled out right after the (buglevel != null) check. The second one then runs until right after the doPrivileged() call, sets bugLevel to null and gets scheduled out. The first one continues and hits a NullPointerException while calling bugLevel.equals() with bugLevel being null.
这个问题在Netty框架中,负责进行JAVA NIO Selector的NioEventLoop类中获得了解决。
A DESCRIPTION OF THE PROBLEM :
Trying to get all bindings from the transient nameserver brings orbd into a state where it consumes 100% CPU. Its interesting to note that the problem only occurs if the client is programmed in c++. I was not able to reproduce the problem with a client programmed in Java.
STEPS TO FOLLOW TO REPRODUCE THE PROBLEM :
1) Get omniORB 4.1 from http://omniorb.sourceforge.net
2) Compile omniORB (requires python devel package installed)
cd /tmp
mkdir omni_local
tar xvzf omniORB-4.1.0.tar.gz
cd omniORB-4.1.0
./configure --prefix=/tmp/omni_local
make
make install
3) Compile the test program (binding_browser, source attached to this report)
g++ -I/tmp/omni_local/include -L/tmp/omni_local/lib -lomniORB4 -lomniDynamic4 -lomnithread -lpthread -lrt BindingBrowser.cc -o binding_browser
4) Start orbd
<JAVA_HOME>/bin/orbd -ORBInitialPort 12345
5) Start binding_browser (from another shell)
5a) export LD_LIBRARY_PATH=/tmp/omni_local/lib:$LD_LIBRARY_PATH
5b) ./binding_browser -ORBInitRef NameService=corbaloc::1.2@localhost:12345/TNameService
Repeat step 5b until orbd consumes 100% cpu.
这个问题从官方的Bug Database中的描述看,是在JDK7的版本中被解决的。Netty框架在JDK 6+的环境下在JAVA NIO框架封装之上解决了这个Bug。
咱们考虑一下这样的状况:咱们编写了一个机器人控制程序,经过一个遥控器(客户端)向机器人(服务器)创建了一个长链接,并经过这个链接接二连三的从遥控器发送控制指令给机器人。因为是连续控制指令,因此指令与指令之间没有间隔(实际上您还能够想一想不少相似场景,例如:开发的Online对战游戏)。
咱们使用JSON格式做为指令数据的承载格式。那么发送方和接收方的数据发送-接受过程可能以下图所示。
经过上图咱们看到了接收方为了接受这两条连贯的指令,一共作了三次接受,第二次接收的时候,收到了一部分message1的内容和一部分message2的内容。这里要说明几个注意事项:
MSS:MSS属性是TCP链接双方在三次握手时所确认的每个TCP报文段中数据字段的最大长度。注意,一是链接双方协商出来的;二是只是数据段的最大长度,不包括IP协议头和TCP协议头的最大长度。
半包是指接收方应用程序在接收信息时,没有接收到一个完成的信息格式块;粘包是指,接收方应用程序在接受信息时,除了接收到发送方应用程序发送的某一个完整数据信息描述外,还接受到了一下发送方应用程序发送的下一个数据信息的一部分。
半包和粘包是针对应用程序来讲的,这个问题只会发生在TCP一些进行连续发送数据时(TCP长链接)。UDP不会出现这个问题,由于UDP都是有边界的数据报;TCP短链接也不会出现,由于发送完一个指令信息后链接就断开了,不会发送第二个指令数据。
半包和粘包问题产生的根本是由于TCP本质上没有“数据块”的概念,而是一连串的数据流。在应用程序层面上咱们所定义的“数据块”在TCP层面上并不被协议承认。
半包/粘包是一个应用层问题。要解决半包/粘包问题,就是在应用程序层面创建协商一致的信息还原依据。常见的有两种方式:一是消息定长,即保证每个完整的信息描述的长度都是必定的,这样不管TCP/IP协议如何进行分片,数据接收方均可以按照固定长度进行消息的还原。二是在完整的一块数据结束后增长协商一致的分隔符(例如增长一个回车符)。
在JAVA NIO技术框架中,半包和粘包问题咱们须要本身解决,若是使用Netty框架,它其中提供了多种解码器的封装帮助咱们解决半包和粘包问题。甚至针对不一样的数据格式,Netty都提供了半包和粘包问题的现成解决方式,例如以前咱们提到的ProtobufVarint32FrameDecoder解码器,就是专门解决Protobuf数据格式在TCP长链接传输时的半包问题的。
下文中咱们会介绍FixedLengthFrameDecoder、DelimiterBasedFrameDecoder、LineBasedFrameDecoder来解决半包/粘包的问题。
因为上文中咱们已经经过完整的代码演示了Netty的基本使用,因此下面的示例代码中为了节约篇幅,我只会列出重要的代码片断。
FixedLengthFrameDecoder解码处理器将TCP/IP的数据按照指定的长度进行从新拆分,若是接收到的数据不知足设置的固定长度,Netty将等待新的数据到达:
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ByteArrayEncoder());
ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
ch.pipeline().addLast(new TCPServerHandler());
ch.pipeline().addLast(new ByteArrayDecoder());
}
});
Netty上层的channelRead事件方法将在Channel接收到20个字符的状况下被触发;而若是剩余的内容不到20个字符,channelRead方法将不会被触发(但注意channelReadComplete方法会触发的啦)。
LineBasedFrameDecoder解码器,基于最简单的“换行符”进行接收到的信息的再组织。windows和linux两个操做系统中的“换行符”是不同的,LineBasedFrameDecoder解码器都支持。固然这个解码器没有咱们后面介绍的DelimiterBasedFrameDecoder解码器灵活。
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ByteArrayEncoder());
ch.pipeline().addLast(new LineBasedFrameDecoder(100));
ch.pipeline().addLast(new TCPServerHandler());
ch.pipeline().addLast(new ByteArrayDecoder());
}
});
那么若是客户端发送的数据是:
this is 0 client \r\n request 1 \r\n”
那么接收方从新经过“换行符”从新组织后,将分两次接受到数据:
this is 0 client
request 1
DelimiterBasedFrameDecoder是按照“自定义”分隔符(也能够是“回车符”或者“空字符”注意windows系统中和linux系统中“回车符”的表示是不同的)进行信息的从新拆分。
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ByteArrayEncoder());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1500, false, Delimiters.lineDelimiter()));
ch.pipeline().addLast(new TCPServerHandler());
ch.pipeline().addLast(new ByteArrayDecoder());
}
});
DelimiterBasedFrameDecoder有三个参数,这里介绍一下:
DelimiterBasedFrameDecoder(int maxFrameLength, boolean stripDelimiter, ByteBuf... delimiters)
maxFrameLength:最大分割长度,若是接收方在一段长度 大于maxFrameLength的数据段中,没有找到指定的分隔符,那么这个处理器会抛出TooLongFrameException异常。
stripDelimiter:这个是一个布尔型参数,指代是否保留指定的分隔符。
delimiters:设置的分隔符。通常使用Delimiters.lineDelimiter()或者Delimiters.nulDelimiter()。固然您也能够自定义分隔符,定义成bytebuf的类型就好了。
Netty框架的特性,使咱们不须要关心下层所工做的IO模型,利用Netty提供的面向事件驱动的方法结构,使咱们更能集中精力关注应用层业务。
在这5篇文章中,咱们重点介绍了几种典型的IO模型,而且介绍了JAVA语言对这几种IO模型的实现,最后咱们简单介绍了一下Netty框架,而且比较了JAVA NIO框架和Netty框架侧重点。实际上这几篇文章咱们讲述的问题只有一个“信息如何进行传递”。
从下一篇文章开始,咱们开始介绍JAVA RMI技术,并从JAVA RMI技术引出一项系统间通讯重要的技术RPC,咱们还会降到RPC的重要实现 TaoBao-Dubbo框架,最后咱们讲解ESB技术和几个典型的ESB实现。这些技术要解决的问题是“传递过程若是进行统筹管理”。