原创: 老刘 码农翻身 2017-11-20html
随着移动互联网的爆发性增加,小明公司的电子商务系统访问量愈来愈大,因为现有系统是个单体的巨型应用,已经没法知足海量的并发请求,拆分势在必行。java
在微服务的大潮之中, 架构师小明把系统拆分红了多个服务,根据须要部署在多个机器上,这些服务很是灵活,能够随着访问量弹性扩展。linux
世界上没有免费的午饭, 拆分红多个“微服务”之后虽然增长了弹性,但也带来了一个巨大的挑战:服务之间互相调用的开销。git
好比说:原来用户下一个订单须要登陆,浏览产品详情,加入购物车,支付,扣库存等一系列操做,在单体应用的时候它们都在一台机器的同一个进程中,说白了就是模块之间的函数调用,效率超级高。 程序员
如今好了,服务被安置到了不一样的服务器上,一个订单流程,几乎每一个操做都要越网络,都是远程过程调用(RPC), 那执行时间、执行效率可远远比不上之前了。github
远程过程调用的初版实现使用了HTTP协议,也就是说各个服务对外提供HTTP接口。 小明发现,HTTP协议虽然简单明了,可是废话太多,仅仅是给服务器发个简单的消息都会附带一大堆无用信息:面试
GET /orders/1 HTTP/1.1 算法
Host: order.myshop.com数据库
User-Agent: Mozilla/5.0 (Windows NT 6.1; )编程
Accept: text/html;
Accept-Language: en-US,en;
Accept-Encoding: gzip
Connection: keep-alive
......
看看那User-Agent,Accept-Language ,这个协议明显是为浏览器而生的!可是我这里是程序之间的调用,用这个HTTP有点亏。
能不能自定义一个精简的协议? 在这个协议中我只须要把要调用方法名和参数发给服务器便可,根本不用这么多乱七八糟的额外信息。
可是自定义协议客户端和服务器端就得直接使用“低级”的Socket了,尤为是服务器端,得可以处理高并发的访问请求才行。
小明复习了一下服务器端的socket编程,最先的Java是所谓的阻塞IO(Blocking IO), 想处理多个socket的链接的话须要建立多个线程, 一个线程对应一个。
这种方式写起来却是挺简单的,可是链接(socket)多了就受不了了,若是真的有成千上万个线程同时处理成千上万个socket,占用大量的空间不说,光是线程之间的切换就是一个巨大的开销。
更重要的是,虽然有大量的socket,可是真正须要处理的(能够读写数据的socket)却很少,大量的线程处于等待数据状态(这也是为何叫作阻塞的缘由),资源浪费得让人心疼。
后来Java为了解决这个问题,又搞了一个非阻塞IO(NIO:Non-Blocking IO,有人也叫作New IO), 改变了一下思路:经过多路复用的方式让一个线程去处理多个Socket。
这样一来,只须要使用少许的线程就能够搞定多个socket了,线程只须要经过Selector去查一下它所管理的socket集合,哪一个Socket的数据准备好了,就去处理哪一个Socket,一点儿都不浪费。
好了,就是Java NIO了!
小明先定义了一套精简的RPC的协议,里边规定了如何去调用一个服务,方法名和参数该如何传递,返回值用什么格式......等等。而后雄心勃勃地要把这个协议用Java NIO给实现了。
但是美好的理想很快被无情的现实给击碎, 小明努力了一周就意识到本身陷入了一个大坑之中,Java NIO虽然看起来简单,可是API仍是太“低级”了,有太多的复杂性,没有强悍的、一流的编程能力根本没法驾驭,根本作不到高并发状况下的可靠和高效。
小明不死心,继续向领导要人要资源,必定要把这个坑给填上,挣扎了6个月之后,终于实现了一个本身的NIO框架,能够执行高并发的RPC调用了。
而后又是长达6个月的修修补补,小明常常半夜被叫醒:生产环境的RPC调用没法返回了! 这样的Bug不知道改了多少个。
在那些不眠之夜中,小明常常仰天长叹:我用NIO作个高并发的RPC框架怎么这么难呐!
一年以后,自研的框架终于稳定,但是小明也从张大胖那里听到了一个让他崩溃的消息: 小明你知道吗?有个叫Netty的开源框架,能够快速地开发高性能的面向协议的服务器和客户端。 易用、健壮、安全、高效,你能够在Netty上轻松实现各类自定义的协议!我们也试试?
小明赶忙研究,看完后不禁得“泪流满面”:这东西怎么不早点出来啊!
好了,这个故事我快编不下去了,要烂尾了。
说说Netty究竟是何方神圣, 要解决什么问题吧。
像上面小明的例子,想使用Java NIO来实现一个高性能的RPC框架,调用协议,数据的格式和次序都是本身定义的,现有的HTTP根本玩不转,那使用Netty就是绝佳的选择。
其实游戏领域是个更好的例子,长链接,自定义协议,高并发,Netty就是绝配。
由于Netty自己就是一个基于NIO的网络框架, 封装了Java NIO那些复杂的底层细节,给你提供简单好用的抽象概念来编程。
注意几个关键词,首先它是个框架,是个“半成品”,不能开箱即用,你必须得拿过来作点定制,利用它开发出本身的应用程序,而后才能运行(就像使用Spring那样)。
一个更加知名的例子就是阿里巴巴的Dubbo了,这个RPC框架的底层用的就是Netty。
另一个关键词是高性能,若是你的应用根本没有高并发的压力,那就不必定要用Netty了。
netty是基于NIO实现的异步事件驱动的网络编程框架,学完NIO之后,应该看看netty的实现,netty框架涉及的内容特别多,这里只介绍netty的基本使用和实现原理,更多扩展的内容将在之后推出。
基于NIO的网络编程框架Netty
转自https://sylvanassun.github.io/2017/11/30/2017-11-30-netty_introduction/
Netty是一个基于异步与事件驱动的网络应用程序框架,它支持快速与简单地开发可维护的高性能的服务器与客户端。
所谓事件驱动就是由经过各类事件响应来决定程序的流程,在Netty中处处都充满了异步与事件驱动,这种特色使得应用程序能够以任意的顺序响应在任意的时间点产生的事件,它带来了很是高的可伸缩性,让你的应用能够在须要处理的工做不断增加时,经过某种可行的方式或者扩大它的处理能力来适应这种增加。
Netty提供了高性能与易用性,它具备如下特色:
拥有设计良好且统一的API,支持NIO与OIO(阻塞IO)等多种传输类型,支持真正的无链接UDP Socket。
简单而强大的线程模型,可高度定制线程(池)。
良好的模块化与解耦,支持可扩展和灵活的事件模型,能够很轻松地分离关注点以复用逻辑组件(可插拔的)。
性能高效,拥有比Java核心API更高的吞吐量,经过zero-copy功能以实现最少的内存复制消耗。
内置了许多经常使用的协议编解码器,如HTTP、SSL、WebScoket等常见协议能够经过Netty作到开箱即用。用户也能够利用Netty简单方便地实现本身的应用层协议。
大多数人使用Netty主要仍是为了提升应用的性能,而高性能则离不开非阻塞IO。Netty的非阻塞IO是基于Java NIO的,而且对其进行了封装(直接使用Java NIO API在高复杂度下的应用中是一项很是繁琐且容易出错的操做,而Netty帮你封装了这些复杂操做)。
Netty简介
读完这一章,咱们基本上能够了解到Netty全部重要的组件,对Netty有一个全面的认识,这对下一步深刻学习Netty是十分重要的,而学完这一章,咱们其实已经能够用Netty解决一些常规的问题了。
1、先纵览一下Netty,看看Netty都有哪些组件?
为了更好的理解和进一步深刻Netty,咱们先整体认识一下Netty用到的组件及它们在整个Netty架构中是怎么协调工做的。Netty应用中必不可少的组件:
Bootstrap,一个Netty应用一般由一个Bootstrap开始,它主要做用是配置整个Netty程序,串联起各个组件。
Handler,为了支持各类协议和处理数据的方式,便诞生了Handler组件。Handler主要用来处理各类事件,这里的事件很普遍,好比能够是链接、数据接收、异常、数据转换等。
ChannelInboundHandler,一个最经常使用的Handler。这个Handler的做用就是处理接收到数据时的事件,也就是说,咱们的业务逻辑通常就是写在这个Handler里面的,ChannelInboundHandler就是用来处理咱们的核心业务逻辑。
ChannelInitializer,当一个连接创建时,咱们须要知道怎么来接收或者发送数据,固然,咱们有各类各样的Handler实现来处理它,那么ChannelInitializer即是用来配置这些Handler,它会提供一个ChannelPipeline,并把Handler加入到ChannelPipeline。
ChannelPipeline,一个Netty应用基于ChannelPipeline机制,这种机制须要依赖于EventLoop和EventLoopGroup,由于它们三个都和事件或者事件处理相关。
EventLoops的目的是为Channel处理IO操做,一个EventLoop能够为多个Channel服务。
EventLoopGroup会包含多个EventLoop。
Channel表明了一个Socket连接,或者其它和IO操做相关的组件,它和EventLoop一块儿用来参与IO处理。
Future,在Netty中全部的IO操做都是异步的,所以,你不能马上得知消息是否被正确处理,可是咱们能够过一会等它执行完成或者直接注册一个监听,具体的实现就是经过Future和ChannelFutures,他们能够注册一个监听,当操做执行成功或失败时监听会自动触发。总之,全部的操做都会返回一个ChannelFuture。
2、Netty是如何处理链接请求和业务逻辑的呢?-- Channels、Events 和 IO
Netty是一个非阻塞的、事件驱动的、网络编程框架。固然,咱们很容易理解Netty会用线程来处理IO事件,对于熟悉多线程编程的人来讲,你或许会想到如何同步你的代码,可是Netty不须要咱们考虑这些,具体是这样:
一个Channel会对应一个EventLoop,而一个EventLoop会对应着一个线程,也就是说,仅有一个线程在负责一个Channel的IO操做。
关于这些名词之间的关系,能够见下图:
如图所示:当一个链接到达,Netty会注册一个channel,而后EventLoopGroup会分配一个EventLoop绑定到这个channel,在这个channel的整个生命周期过程当中,都会由绑定的这个EventLoop来为它服务,而这个EventLoop就是一个线程。
说到这里,那么EventLoops和EventLoopGroups关系是如何的呢?咱们前面说过一个EventLoopGroup包含多个Eventloop,可是咱们看一下下面这幅图,这幅图是一个继承树,从这幅图中咱们能够看出,EventLoop其实继承自EventloopGroup,也就是说,在某些状况下,咱们能够把一个EventLoopGroup当作一个EventLoop来用。
3、咱们来看看如何配置一个Netty应用?-- BootsStrapping
咱们利用BootsStrapping来配置netty 应用,它有两种类型,一种用于Client端:BootsStrap,另外一种用于Server端:ServerBootstrap,要想区别如何使用它们,你仅须要记住一个用在Client端,一个用在Server端。下面咱们来详细介绍一下这两种类型的区别:
1.第一个最明显的区别是,ServerBootstrap用于Server端,经过调用bind()方法来绑定到一个端口监听链接;Bootstrap用于Client端,须要调用connect()方法来链接服务器端,但咱们也能够经过调用bind()方法返回的ChannelFuture中获取Channel去connect服务器端。
2.客户端的Bootstrap通常用一个EventLoopGroup,而服务器端的ServerBootstrap会用到两个(这两个也能够是同一个实例)。为什么服务器端要用到两个EventLoopGroup呢?这么设计有明显的好处,若是一个ServerBootstrap有两个EventLoopGroup,那么就能够把第一个EventLoopGroup用来专门负责绑定到端口监听链接事件,而把第二个EventLoopGroup用来处理每一个接收到的链接,下面咱们用一幅图来展示一下这种模式:
PS: 若是仅由一个EventLoopGroup处理全部请求和链接的话,在并发量很大的状况下,这个EventLoopGroup有可能会忙于处理已经接收到的链接而不能及时处理新的链接请求,用两个的话,会有专门的线程来处理链接请求,不会致使请求超时的状况,大大提升了并发处理能力。
咱们知道一个Channel须要由一个EventLoop来绑定,并且二者一旦绑定就不会再改变。通常状况下一个EventLoopGroup中的EventLoop数量会少于Channel数量,那么就颇有可能出现一个多个Channel公用一个EventLoop的状况,这就意味着若是一个Channel中的EventLoop很忙的话,会影响到这个Eventloop对其它Channel的处理,这也就是为何咱们不能阻塞EventLoop的缘由。
固然,咱们的Server也能够只用一个EventLoopGroup,由一个实例来处理链接请求和IO事件,请看下面这幅图:
4、咱们看看Netty是如何处理数据的?-- Netty核心ChannelHandler
下面咱们来看一下netty中是怎样处理数据的,回想一下咱们前面讲到的Handler,对了,就是它。说到Handler咱们就不得不提ChannelPipeline,ChannelPipeline负责安排Handler的顺序及其执行,下面咱们就来详细介绍一下他们:
ChannelPipeline and handlers
咱们的应用程序中用到的最多的应该就是ChannelHandler,咱们能够这么想象,数据在一个ChannelPipeline中流动,而ChannelHandler即是其中的一个个的小阀门,这些数据都会通过每个ChannelHandler而且被它处理。这里有一个公共接口ChannelHandler:
从上图中咱们能够看到,ChannelHandler有两个子类ChannelInboundHandler和ChannelOutboundHandler,这两个类对应了两个数据流向,若是数据是从外部流入咱们的应用程序,咱们就看作是inbound,相反即是outbound。其实ChannelHandler和Servlet有些相似,一个ChannelHandler处理完接收到的数据会传给下一个Handler,或者什么不处理,直接传递给下一个。下面咱们看一下ChannelPipeline是如何安排ChannelHandler的:
从上图中咱们能够看到,一个ChannelPipeline能够把两种Handler(ChannelInboundHandler和ChannelOutboundHandler)混合在一块儿,当一个数据流进入ChannelPipeline时,它会从ChannelPipeline头部开始传给第一个ChannelInboundHandler,当第一个处理完后再传给下一个,一直传递到管道的尾部。与之相对应的是,当数据被写出时,它会从管道的尾部开始,先通过管道尾部的“最后”一个ChannelOutboundHandler,当它处理完成后会传递给前一个ChannelOutboundHandler。
数据在各个Handler之间传递,这须要调用方法中传递的ChanneHandlerContext来操做, 在netty的API中提供了两个基类分ChannelOutboundHandlerAdapter和ChannelOutboundHandlerAdapter,他们仅仅实现了调用ChanneHandlerContext来把消息传递给下一个Handler,由于咱们只关心处理数据,所以咱们的程序中能够继承这两个基类来帮助咱们作这些,而咱们仅需实现处理数据的部分便可。
咱们知道InboundHandler和OutboundHandler在ChannelPipeline中是混合在一块儿的,那么它们如何区分彼此呢?其实很容易,由于它们各自实现的是不一样的接口,对于inbound event,Netty会自动跳过OutboundHandler,相反如果outbound event,ChannelInboundHandler会被忽略掉。
当一个ChannelHandler被加入到ChannelPipeline中时,它便会得到一个ChannelHandlerContext的引用,而ChannelHandlerContext能够用来读写Netty中的数据流。所以,如今能够有两种方式来发送数据,一种是把数据直接写入Channel,一种是把数据写入ChannelHandlerContext,它们的区别是写入Channel的话,数据流会从Channel的头开始传递,而若是写入ChannelHandlerContext的话,数据流会流入管道中的下一个Handler。
5、咱们最关心的部分,如何处理咱们的业务逻辑? -- Encoders, Decoders and Domain Logic
Netty中会有不少Handler,具体是哪一种Handler还要看它们继承的是InboundAdapter仍是OutboundAdapter。固然,Netty中还提供了一些列的Adapter来帮助咱们简化开发,咱们知道在Channelpipeline中每个Handler都负责把Event传递给下一个Handler,若是有了这些辅助Adapter,这些额外的工做均可自动完成,咱们只需覆盖实现咱们真正关心的部分便可。此外,还有一些Adapter会提供一些额外的功能,好比编码和解码。那么下面咱们就来看一下其中的三种经常使用的ChannelHandler:
Encoders和Decoders
由于咱们在网络传输时只能传输字节流,所以,才发送数据以前,咱们必须把咱们的message型转换为bytes,与之对应,咱们在接收数据后,必须把接收到的bytes再转换成message。咱们把bytes to message这个过程称做Decode(解码成咱们能够理解的),把message to bytes这个过程成为Encode。
Netty中提供了不少现成的编码/解码器,咱们通常从他们的名字中即可知道他们的用途,如ByteToMessageDecoder、MessageToByteEncoder,如专门用来处理Google Protobuf协议的ProtobufEncoder、 ProtobufDecoder。
咱们前面说过,具体是哪一种Handler就要看它们继承的是InboundAdapter仍是OutboundAdapter,对于Decoders,很容易即可以知道它是继承自ChannelInboundHandlerAdapter或 ChannelInboundHandler,由于解码的意思是把ChannelPipeline传入的bytes解码成咱们能够理解的message(即Java Object),而ChannelInboundHandler正是处理Inbound Event,而Inbound Event中传入的正是字节流。Decoder会覆盖其中的“ChannelRead()”方法,在这个方法中来调用具体的decode方法解码传递过来的字节流,而后经过调用ChannelHandlerContext.fireChannelRead(decodedMessage)方法把编码好的Message传递给下一个Handler。与之相似,Encoder就没必要多少了。
Domain Logic
其实咱们最最关心的事情就是如何处理接收到的解码后的数据,咱们真正的业务逻辑即是处理接收到的数据。Netty提供了一个最经常使用的基类SimpleChannelInboundHandler<T>,其中T就是这个Handler处理的数据的类型(上一个Handler已经替咱们解码好了),消息到达这个Handler时,Netty会自动调用这个Handler中的channelRead0(ChannelHandlerContext,T)方法,T是传递过来的数据对象,在这个方法中咱们即可以任意写咱们的业务逻辑了。
Netty源码剖析
Netty从某方面来讲就是一套NIO框架,在Java NIO基础上作了封装,因此要想学好Netty我建议先理解好Java NIO,
NIO能够称为New IO也能够称为Non-blocking IO,它比Java旧的阻塞IO在性能上要高效许多(若是让每个链接中的IO操做都单首创建一个线程,那么阻塞IO并不会比NIO在性能上落后,但不可能建立无限多的线程,在链接数很是多的状况下会很糟糕)。
ByteBuffer:NIO的数据传输是基于缓冲区的,ByteBuffer正是NIO数据传输中所使用的缓冲区抽象。ByteBuffer支持在堆外分配内存,而且尝试避免在执行I/O操做中的多余复制。通常的I/O操做都须要进行系统调用,这样会先切换到内核态,内核态要先从文件读取数据到它的缓冲区,只有等数据准备完毕后,才会从内核态把数据写到用户态,所谓的阻塞IO其实就是说的在等待数据准备好的这段时间内进行阻塞。若是想要避免这个额外的内核操做,能够经过使用mmap(虚拟内存映射)的方式来让用户态直接操做文件。
Channel:它相似于文件描述符,简单地来讲它表明了一个实体(如一个硬件设备、文件、Socket或者一个可以执行一个或多个不一样的I/O操做的程序组件)。你能够从一个Channel中读取数据到缓冲区,也能够将一个缓冲区中的数据写入到Channel。
Selector:选择器是NIO实现的关键,NIO采用的是I/O多路复用的方式来实现非阻塞,Selector经过在一个线程中监听每一个Channel的IO事件来肯定有哪些已经准备好进行IO操做的Channel,所以能够在任什么时候间检查任意的读操做或写操做的完成状态。这种方式避免了等待IO操做准备数据时的阻塞,使用较少的线程即可以处理许多链接,减小了线程切换与维护的开销。
了解了NIO的实现思想以后,我以为还颇有必要了解一下Unix中的I/O模型,Unix中拥有如下5种I/O模型:
阻塞I/O(Blocking I/O)
非阻塞I/O(Non-blocking I/O)
I/O多路复用(I/O multiplexing (select and poll))
信号驱动I/O(signal driven I/O (SIGIO))
异步I/O(asynchronous I/O (the POSIX aio_functions))
阻塞I/O模型是最多见的I/O模型,一般咱们使用的InputStream/OutputStream都是基于阻塞I/O模型。在上图中,咱们使用UDP做为例子,recvfrom()函数是UDP协议用于接收数据的函数,它须要使用系统调用并一直阻塞到内核将数据准备好,以后再由内核缓冲区复制数据到用户态(便是recvfrom()接收到数据),所谓阻塞就是在等待内核准备数据的这段时间内什么也不干。
举个生活中的例子,阻塞I/O就像是你去餐厅吃饭,在等待饭作好的时间段中,你只能在餐厅中坐着干等(若是你在玩手机那么这就是非阻塞I/O了)。
在非阻塞I/O模型中,内核在数据还没有准备好的状况下回返回一个错误码EWOULDBLOCK
,而recvfrom并无在失败的状况下选择阻塞休眠,而是不断地向内核询问是否已经准备完毕,在上图中,前三次内核都返回了EWOULDBLOCK
,直到第四次询问时,内核数据准备完毕,而后开始将内核中缓存的数据复制到用户态。这种不断询问内核以查看某种状态是否完成的方式被称为polling(轮询)
。
非阻塞I/O就像是你在点外卖,只不过你很是心急,每隔一段时间就要打电话问外卖小哥有没有到。
I/O多路复用的思想跟非阻塞I/O是同样的,只不过在非阻塞I/O中,是在recvfrom的用户态(或一个线程)中去轮询内核,这种方式会消耗大量的CPU时间。而I/O多路复用则是经过select()或poll()系统调用来负责进行轮询,以实现监听I/O读写事件的状态。如上图中,select监听到一个datagram可读时,就交由recvfrom去发送系统调用将内核中的数据复制到用户态。
这种方式的优势很明显,经过I/O多路复用能够监听多个文件描述符,且在内核中完成监控的任务。但缺点是至少须要两个系统调用(select()与recvfrom())。
I/O多路复用一样适用于点外卖这个例子,只不过你在等外卖的期间彻底能够作本身的事情,当外卖到的时候会经过外卖APP或者由外卖小哥打电话来通知你。
Unix中提供了两种I/O多路复用函数,select()和poll()。select()的兼容性更好,但它在单个进程中所能监控的文件描述符是有限的,这个值与FD_SETSIZE
相关,32位系统中默认为1024,64位系统中为2048。select()还有一个缺点就是他轮询的方式,它采起了线性扫描的轮询方式,每次都要遍历FD_SETSIZE个文件描述符,无论它们是否活不活跃的。poll()本质上与select()的实现没有区别,不过在数据结构上区别很大,用户必须分配一个pollfd结构数组,该数组维护在内核态中,正因如此,poll()并不像select()那样拥有大小上限的限制,但缺点一样也很明显,大量的fd数组会在用户态与内核态之间不断复制,无论这样的复制是否有意义。
还有一种比select()与poll()更加高效的实现叫作epoll(),它是由Linux内核2.6推出的可伸缩的I/O多路复用实现,目的是为了替代select()与poll()。epoll()一样没有文件描述符上限的限制,它使用一个文件描述符来管理多个文件描述符,并使用一个红黑树来做为存储结构。同时它还支持边缘触发(edge-triggered)与水平触发(level-triggered)两种模式(poll()只支持水平触发),在边缘触发模式下,epoll_wait
仅会在新的事件对象首次被加入到epoll时返回,而在水平触发模式下,epoll_wait
会在事件状态未变动前不断地触发。也就是说,边缘触发模式只会在文件描述符变为就绪状态时通知一次,水平触发模式会不断地通知该文件描述符直到被处理。
关于epoll_wait
请参考以下epoll API。
12345678910 | // 建立一个epoll对象并返回它的文件描述符。// 参数flags容许修改epoll的行为,它只有一个有效值EPOLL_CLOEXEC。int epoll_create1(int flags);// 配置对象,该对象负责描述监控哪些文件描述符和哪些事件。int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);// 等待与epoll_ctl注册的任何事件,直至事件发生一次或超时。// 返回在events中发生的事件,最多同时返回maxevents个。int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); |
---|
epoll另外一亮点是采用了事件驱动的方式而不是轮询,在epoll_ctl
中注册的文件描述符在事件触发的时候会经过一个回调机制来激活该文件描述符,epoll_wait
即可以收到通知。这样效率就不会与文件描述符的数量成正比。
在Java NIO2(从JDK1.7开始引入)中,只要Linux内核版本在2.6以上,就会采用epoll,以下源码所示(DefaultSelectorProvider.java)。
| 123456789101112131415161718192021222324252627 | public static SelectorProvider create() {String osname = AccessController.doPrivileged(new GetPropertyAction("os.name"));if ("SunOS".equals(osname)) {return new sun.nio.ch.DevPollSelectorProvider();}// use EPollSelectorProvider for Linux kernels >= 2.6if ("Linux".equals(osname)) {String osversion = AccessController.doPrivileged(new GetPropertyAction("os.version"));String[] vers = osversion.split("\.", 0);if (vers.length >= 2) {try {int major = Integer.parseInt(vers[0]);int minor = Integer.parseInt(vers[1]);if (major > 2 || (major == 2 && minor >= 6)) {return new sun.nio.ch.EPollSelectorProvider();}} catch (NumberFormatException x) {// format not recognized}}}return new sun.nio.ch.PollSelectorProvider();} | | --- | --- |
信号驱动I/O模型使用到了信号,内核在数据准备就绪时会经过信号来进行通知。咱们首先开启了一个信号驱动I/O套接字,并使用sigaction系统调用来安装信号处理程序,内核直接返回,不会阻塞用户态。当datagram准备好时,内核会发送SIGIO信号,recvfrom接收到信号后会发送系统调用开始进行I/O操做。
这种模型的优势是主进程(线程)不会被阻塞,当数据准备就绪时,经过信号处理程序来通知主进程(线程)准备进行I/O操做与对数据的处理。
咱们以前讨论的各类I/O模型不管是阻塞仍是非阻塞,它们所说的阻塞都是指的数据准备阶段。异步I/O模型一样依赖于信号处理程序来进行通知,但与以上I/O模型都不相同的是,异步I/O模型通知的是I/O操做已经完成,而不是数据准备完成。
能够说异步I/O模型才是真正的非阻塞,主进程只管作本身的事情,而后在I/O操做完成时调用回调函数来完成一些对数据的处理操做便可。
闲扯了这么多,想必你们已经对I/O模型有了一个深入的认识。以后,咱们将会结合部分源码(Netty4.X)来探讨Netty中的各大核心组件,以及如何使用Netty,你会发现实现一个Netty程序是多么简单(并且还伴随了高性能与可维护性)。
本文做者为SylvanasSun(sylvanas.sun@gmail.com),首发于SylvanasSun’s Blog。 原文连接:https://sylvanassun.github.io/2017/11/30/2017-11-30-netty_introduction/ (转载请务必保留本段声明,而且保留超连接。)
网络传输的基本单位是字节,在Java NIO中提供了ByteBuffer做为字节缓冲区容器,但该类的API使用起来不太方便,因此Netty实现了ByteBuf做为其替代品,下面是使用ByteBuf的优势:
相比ByteBuffer使用起来更加简单。
经过内置的复合缓冲区类型实现了透明的zero-copy。
容量能够按需增加。
读和写使用了不一样的索引指针。
支持链式调用。
支持引用计数与池化。
能够被用户自定义的缓冲区类型扩展。
在讨论ByteBuf以前,咱们先须要了解一下ByteBuffer的实现,这样才能比较深入地明白它们之间的区别。
ByteBuffer继承于abstract class Buffer
(因此还有LongBuffer、IntBuffer等其余类型的实现),本质上它只是一个有限的线性的元素序列,包含了三个重要的属性。
Capacity:缓冲区中元素的容量大小,你只能将capacity个数量的元素写入缓冲区,一旦缓冲区已满就须要清理缓冲区才能继续写数据。
Position:指向下一个写入数据位置的索引指针,初始位置为0,最大为capacity-1。当写模式转换为读模式时,position须要被重置为0。
Limit:在写模式中,limit是能够写入缓冲区的最大索引,也就是说它在写模式中等价于缓冲区的容量。在读模式中,limit表示能够读取数据的最大索引。
因为Buffer中只维护了position一个索引指针,因此它在读写模式之间的切换须要调用一个flip()方法来重置指针。使用Buffer的流程通常以下:
写入数据到缓冲区。
调用flip()方法。
从缓冲区中读取数据
调用buffer.clear()或者buffer.compact()清理缓冲区,以便下次写入数据。
12345678910111213141516171819 | RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");FileChannel inChannel = aFile.getChannel();// 分配一个48字节大小的缓冲区ByteBuffer buf = ByteBuffer.allocate(48);int bytesRead = inChannel.read(buf); // 读取数据到缓冲区while (bytesRead != -1) {buf.flip(); // 将position重置为0while(buf.hasRemaining()){System.out.print((char) buf.get()); // 读取数据并输出到控制台}buf.clear(); // 清理缓冲区bytesRead = inChannel.read(buf);}aFile.close(); |
---|
Buffer中核心方法的实现也很是简单,主要就是在操做指针position。
12345678910111213141516171819202122232425262728293031323334353637383940 | /*** Sets this buffer's mark at its position.** @return This buffer*/public final Buffer mark() {mark = position; // mark属性是用来标记当前索引位置的return this;}// 将当前索引位置重置为mark所标记的位置public final Buffer reset() {int m = mark;if (m < 0)throw new InvalidMarkException();position = m;return this;}// 翻转这个Buffer,将limit设置为当前索引位置,而后再把position重置为0public final Buffer flip() {limit = position;position = 0;mark = -1;return this;}// 清理缓冲区// 说是清理,也只是把postion与limit进行重置,以后再写入数据就会覆盖以前的数据了public final Buffer clear() {position = 0;limit = capacity;mark = -1;return this;}// 返回剩余空间public final int remaining() {return limit - position;} |
---|
Java NIO中的Buffer API操做的麻烦之处就在于读写转换须要手动重置指针。而ByteBuf没有这种繁琐性,它维护了两个不一样的索引,一个用于读取,一个用于写入。当你从ByteBuf读取数据时,它的readerIndex将会被递增已经被读取的字节数,一样的,当你写入数据时,writerIndex则会递增。readerIndex的最大范围在writerIndex的所在位置,若是试图移动readerIndex超过该值则会触发异常。
ByteBuf中名称以read或write开头的方法将会递增它们其对应的索引,而名称以get或set开头的方法则不会。ByteBuf一样能够指定一个最大容量,试图移动writerIndex超过该值则会触发异常。
1234567891011121314151617181920212223242526272829303132333435363738394041424344 | public byte readByte() {this.checkReadableBytes0(1); // 检查readerIndex是否已越界int i = this.readerIndex;byte b = this._getByte(i);this.readerIndex = i + 1; // 递增readerIndexreturn b;}private void checkReadableBytes0(int minimumReadableBytes) {this.ensureAccessible();if(this.readerIndex > this.writerIndex - minimumReadableBytes) {throw new IndexOutOfBoundsException(String.format("readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s", new Object[]{Integer.valueOf(this.readerIndex), Integer.valueOf(minimumReadableBytes), Integer.valueOf(this.writerIndex), this}));}}public ByteBuf writeByte(int value) {this.ensureAccessible();this.ensureWritable0(1); // 检查writerIndex是否会越过capacitythis._setByte(this.writerIndex++, value);return this;}private void ensureWritable0(int minWritableBytes) {if(minWritableBytes > this.writableBytes()) {if(minWritableBytes > this.maxCapacity - this.writerIndex) {throw new IndexOutOfBoundsException(String.format("writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", new Object[]{Integer.valueOf(this.writerIndex), Integer.valueOf(minWritableBytes), Integer.valueOf(this.maxCapacity), this}));} else {int newCapacity = this.alloc().calculateNewCapacity(this.writerIndex + minWritableBytes, this.maxCapacity);this.capacity(newCapacity);}}}// get与set只对传入的索引进行了检查,而后对其位置进行get或setpublic byte getByte(int index) {this.checkIndex(index);return this._getByte(index);}public ByteBuf setByte(int index, int value) {this.checkIndex(index);this._setByte(index, value);return this;} |
---|
ByteBuf一样支持在堆内和堆外进行分配。在堆内分配也被称为支撑数组模式,它能在没有使用池化的状况下提供快速的分配和释放。
12345678 | ByteBuf heapBuf = Unpooled.copiedBuffer(bytes);if (heapBuf.hasArray()) { // 判断是否有一个支撑数组byte[] array = heapBuf.array();// 计算第一个字节的偏移量int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();int length = heapBuf.readableBytes(); // 得到可读字节handleArray(array,offset,length); // 调用你的处理方法} |
---|
另外一种模式为堆外分配,Java NIO ByteBuffer类在JDK1.4时就已经容许JVM实现经过JNI调用来在堆外分配内存(调用malloc()函数在JVM堆外分配内存),这主要是为了不额外的缓冲区复制操做。
12345678 | ByteBuf directBuf = Unpooled.directBuffer(capacity);if (!directBuf.hasArray()) {int length = directBuf.readableBytes();byte[] array = new byte[length];// 将字节复制到数组中directBuf.getBytes(directBuf.readerIndex(),array);handleArray(array,0,length);} |
---|
ByteBuf还支持第三种模式,它被称为复合缓冲区,为多个ByteBuf提供了一个聚合视图。在这个视图中,你能够根据须要添加或者删除ByteBuf实例,ByteBuf的子类CompositeByteBuf实现了该模式。
一个适合使用复合缓冲区的场景是HTTP协议,经过HTTP协议传输的消息都会被分红两部分——头部和主体,若是这两部分由应用程序的不一样模块产生,将在消息发送时进行组装,而且该应用程序还会为多个消息复用相同的消息主体,这样对于每一个消息都将会建立一个新的头部,产生了不少没必要要的内存操做。使用CompositeByteBuf是一个很好的选择,它消除了这些额外的复制,以帮助你复用这些消息。
1234567 | CompositeByteBuf messageBuf = Unpooled.compositeBuffer();ByteBuf headerBuf = ....;ByteBuf bodyBuf = ....;messageBuf.addComponents(headerBuf,bodyBuf);for (ByteBuf buf : messageBuf) {System.out.println(buf.toString());} |
---|
CompositeByteBuf透明的实现了zero-copy,zero-copy其实就是避免数据在两个内存区域中来回的复制。从操做系统层面上来说,zero-copy指的是避免在内核态与用户态之间的数据缓冲区复制(经过mmap避免),而Netty中的zero-copy更偏向于在用户态中的数据操做的优化,就像使用CompositeByteBuf来复用多个ByteBuf以免额外的复制,也能够使用wrap()方法来将一个字节数组包装成ByteBuf,又或者使用ByteBuf的slice()方法把它分割为多个共享同一内存区域的ByteBuf,这些都是为了优化内存的使用率。
那么如何建立ByteBuf呢?在上面的代码中使用到了Unpooled,它是Netty提供的一个用于建立与分配ByteBuf的工具类,建议都使用这个工具类来建立你的缓冲区,不要本身去调用构造函数。常用的是wrappedBuffer()与copiedBuffer(),它们一个是用于将一个字节数组或ByteBuffer包装为一个ByteBuf,一个是根据传入的字节数组与ByteBuffer/ByteBuf来复制出一个新的ByteBuf。
12345678910111213141516 | // 经过array.clone()来复制一个数组进行包装public static ByteBuf copiedBuffer(byte[] array) {return array.length == 0?EMPTY_BUFFER:wrappedBuffer((byte[])array.clone());}// 默认是堆内分配public static ByteBuf wrappedBuffer(byte[] array) {return (ByteBuf)(array.length == 0?EMPTY_BUFFER:new UnpooledHeapByteBuf(ALLOC, array, array.length));}// 也提供了堆外分配的方法private static final ByteBufAllocator ALLOC;public static ByteBuf directBuffer(int initialCapacity) {return ALLOC.directBuffer(initialCapacity);} |
---|
相对底层的分配方法是使用ByteBufAllocator,Netty实现了PooledByteBufAllocator和UnpooledByteBufAllocator,前者使用了jemalloc(一种malloc()的实现)来分配内存,而且实现了对ByteBuf的池化以提升性能。后者分配的是未池化的ByteBuf,其分配方式与以前讲的一致。
1234 | Channel channel = ...;ByteBufAllocator allocator = channel.alloc();ByteBuf buffer = allocator.directBuffer();do something....... |
---|
为了优化内存使用率,Netty提供了一套手动的方式来追踪不活跃对象,像UnpooledHeapByteBuf这种分配在堆内的对象得益于JVM的GC管理,无需额外操心,而UnpooledDirectByteBuf是在堆外分配的,它的内部基于DirectByteBuffer,DirectByteBuffer会先向Bits类申请一个额度(Bits还拥有一个全局变量totalCapacity,记录了全部DirectByteBuffer总大小),每次申请前都会查看是否已经超过-XX:MaxDirectMemorySize所设置的上限,若是超限就会尝试调用Sytem.gc(),以试图回收一部份内存,而后休眠100毫秒,若是内存仍是不足,则只能抛出OOM异常。堆外内存的回收虽然有了这么一层保障,但为了提升性能与使用率,主动回收也是颇有必要的。因为Netty还实现了ByteBuf的池化,像PooledHeapByteBuf和PooledDirectByteBuf就必须依赖于手动的方式来进行回收(放回池中)。
Netty使用了引用计数器的方式来追踪那些不活跃的对象。引用计数的接口为ReferenceCounted,它的思想很简单,只要ByteBuf对象的引用计数大于0,就保证该对象不会被释放回收,能够经过手动调用release()与retain()方法来操做该对象的引用计数值递减或递增。用户也能够经过自定义一个ReferenceCounted的实现类,以知足自定义的规则。
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 | package io.netty.buffer;public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {// 因为ByteBuf的实例对象会很是多,因此这里没有将refCnt包装为AtomicInteger// 而是使用一个全局的AtomicIntegerFieldUpdater来负责操做refCntprivate static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");// 每一个ByteBuf的初始引用值都为1private volatile int refCnt = 1;public int refCnt() {return this.refCnt;}protected final void setRefCnt(int refCnt) {this.refCnt = refCnt;}public ByteBuf retain() {return this.retain0(1);}// 引用计数值递增increment,increment必须大于0public ByteBuf retain(int increment) {return this.retain0(ObjectUtil.checkPositive(increment, "increment"));}public static int checkPositive(int i, String name) {if(i <= 0) {throw new IllegalArgumentException(name + ": " + i + " (expected: > 0)");} else {return i;}}// 使用CAS操做不断尝试更新值private ByteBuf retain0(int increment) {int refCnt;int nextCnt;do {refCnt = this.refCnt;nextCnt = refCnt + increment;if(nextCnt <= increment) {throw new IllegalReferenceCountException(refCnt, increment);}} while(!refCntUpdater.compareAndSet(this, refCnt, nextCnt));return this;}public boolean release() {return this.release0(1);}public boolean release(int decrement) {return this.release0(ObjectUtil.checkPositive(decrement, "decrement"));}private boolean release0(int decrement) {int refCnt;do {refCnt = this.refCnt;if(refCnt < decrement) {throw new IllegalReferenceCountException(refCnt, -decrement);}} while(!refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement));if(refCnt == decrement) {this.deallocate();return true;} else {return false;}}protected abstract void deallocate();} |
---|
Netty中的Channel与Java NIO的概念同样,都是对一个实体或链接的抽象,但Netty提供了一套更加通用的API。就以网络套接字为例,在Java中OIO与NIO是大相径庭的两套API,假设你以前使用的是OIO而又想更改成NIO实现,那么几乎须要重写全部代码。而在Netty中,只须要更改短短几行代码(更改Channel与EventLoop的实现类,如把OioServerSocketChannel替换为NioServerSocketChannel),就能够完成OIO与NIO(或其余)之间的转换。
每一个Channel最终都会被分配一个ChannelPipeline和ChannelConfig,前者持有全部负责处理入站与出站数据以及事件的ChannelHandler,后者包含了该Channel的全部配置设置,而且支持热更新,因为不一样的传输类型可能具备其特别的配置,因此该类可能会实现为ChannelConfig的不一样子类。
Channel是线程安全的(与以后要讲的线程模型有关),所以你彻底能够在多个线程中复用同一个Channel,就像以下代码所示。
12345678910111213 | final Channel channel = ...final ByteBuf buffer = Unpooled.copiedBuffer("Hello,World!", CharsetUtil.UTF_8).retain();Runnable writer = new Runnable() {@Overridepublic void run() {channel.writeAndFlush(buffer.duplicate());}};Executor executor = Executors.newCachedThreadPool();executor.execute(writer);executor.execute(writer);....... |
---|
Netty除了支持常见的NIO与OIO,还内置了其余的传输类型。
Nmae | Package | Description |
---|---|---|
NIO | io.netty.channel.socket.nio | 以Java NIO为基础实现 |
OIO | io.netty.channel.socket.oio | 以java.net为基础实现,使用阻塞I/O模型 |
Epoll | io.netty.channel.epoll | 由JNI驱动epoll()实现的更高性能的非阻塞I/O,它只能使用在Linux |
Local | io.netty.channel.local | 本地传输,在JVM内部经过管道进行通讯 |
Embedded | io.netty.channel.embedded | 容许在不须要真实网络传输的环境下使用ChannelHandler,主要用于对ChannelHandler进行测试 |
NIO、OIO、Epoll咱们应该已经很熟悉了,下面主要说说Local与Embedded。
Local传输用于在同一个JVM中运行的客户端和服务器程序之间的异步通讯,与服务器Channel相关联的SocketAddress并无绑定真正的物理网络地址,它会被存储在注册表中,并在Channel关闭时注销。所以Local传输不会接受真正的网络流量,也就是说它不能与其余传输实现进行互操做。
Embedded传输主要用于对ChannelHandler进行单元测试,ChannelHandler是用于处理消息的逻辑组件,Netty经过将入站消息与出站消息都写入到EmbeddedChannel中的方式(提供了write/readInbound()与write/readOutbound()来读写入站与出站消息)来实现对ChannelHandler的单元测试。
ChannelHandler充当了处理入站和出站数据的应用程序逻辑的容器,该类是基于事件驱动的,它会响应相关的事件而后去调用其关联的回调函数,例如当一个新的链接被创建时,ChannelHandler的channelActive()方法将会被调用。
关于入站消息和出站消息的数据流向定义,若是以客户端为主视角来讲的话,那么从客户端流向服务器的数据被称为出站,反之为入站。
入站事件是可能被入站数据或者相关的状态更改而触发的事件,包括:链接已被激活、链接失活、读取入站数据、用户事件、发生异常等。
出站事件是将来将会触发的某个动做的结果的事件,这些动做包括:打开或关闭远程节点的链接、将数据写(或冲刷)到套接字。
ChannelHandler的主要用途包括:
对入站与出站数据的业务逻辑处理
记录日志
将数据从一种格式转换为另外一种格式,实现编解码器。以一次HTTP协议(或者其余应用层协议)的流程为例,数据在网络传输时的单位为字节,当客户端发送请求到服务器时,服务器须要经过解码器(处理入站消息)将字节解码为协议的消息内容,服务器在发送响应的时候(处理出站消息),还须要经过编码器将消息内容编码为字节。
捕获异常
提供Channel生命周期内的通知,如Channel活动时与非活动时
Netty中处处都充满了异步与事件驱动,而回调函数正是用于响应事件以后的操做。因为异步会直接返回一个结果,因此Netty提供了ChannelFuture(实现了java.util.concurrent.Future)来做为异步调用返回的占位符,真正的结果会在将来的某个时刻完成,到时候就能够经过ChannelFuture对其进行访问,每一个Netty的出站I/O操做都将会返回一个ChannelFuture。
Netty还提供了ChannelFutureListener接口来监听ChannelFuture是否成功,并采起对应的操做。
12345678910111213141516 | Channel channel = ...ChannelFuture future = channel.connect(new InetSocketAddress("192.168.0.1",6666));// 注册一个监听器future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) {if (future.isSuccess()) {// do something....} else {// 输出错误信息Throwable cause = future.cause();cause.printStackTrace();// do something....}}}); |
---|
ChannelFutureListener接口中还提供了几个简单的默认实现,方便咱们使用。
12345678910111213141516171819202122232425262728293031 | package io.netty.channel;import io.netty.channel.ChannelFuture;import io.netty.util.concurrent.GenericFutureListener;public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> {// 在Future完成时关闭ChannelFutureListener CLOSE = new ChannelFutureListener() {public void operationComplete(ChannelFuture future) {future.channel().close();}};// 若是失败则关闭ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() {public void operationComplete(ChannelFuture future) {if(!future.isSuccess()) {future.channel().close();}}};// 将异常信息传递给下一个ChannelHandlerChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() {public void operationComplete(ChannelFuture future) {if(!future.isSuccess()) {future.channel().pipeline().fireExceptionCaught(future.cause());}}};} |
---|
ChannelHandler接口定义了对它生命周期进行监听的回调函数,在ChannelHandler被添加到ChannelPipeline或者被移除时都会调用这些函数。
12345678910111213141516171819 | package io.netty.channel;public interface ChannelHandler {void handlerAdded(ChannelHandlerContext var1) throws Exception;void handlerRemoved(ChannelHandlerContext var1) throws Exception;/** @deprecated */@Deprecatedvoid exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;// 该注解代表这个ChannelHandler可被其余线程复用@Inherited@Documented@Target({ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)public @interface Sharable {}} |
---|
入站消息与出站消息由其对应的接口ChannelInboundHandler与ChannelOutboundHandler负责,这两个接口定义了监听Channel的生命周期的状态改变事件的回调函数。
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 | package io.netty.channel;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;public interface ChannelInboundHandler extends ChannelHandler {// 当channel被注册到EventLoop时被调用void channelRegistered(ChannelHandlerContext var1) throws Exception;// 当channel已经被建立,但还未注册到EventLoop(或者从EventLoop中注销)被调用void channelUnregistered(ChannelHandlerContext var1) throws Exception;// 当channel处于活动状态(链接到远程节点)被调用void channelActive(ChannelHandlerContext var1) throws Exception;// 当channel处于非活动状态(没有链接到远程节点)被调用void channelInactive(ChannelHandlerContext var1) throws Exception;// 当从channel读取数据时被调用void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;// 当channel的上一个读操做完成时被调用void channelReadComplete(ChannelHandlerContext var1) throws Exception;// 当ChannelInboundHandler.fireUserEventTriggered()方法被调用时被调用void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;// 当channel的可写状态发生改变时被调用void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;// 当处理过程当中发生异常时被调用void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;}package io.netty.channel;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelPromise;import java.net.SocketAddress;public interface ChannelOutboundHandler extends ChannelHandler {// 当请求将Channel绑定到一个地址时被调用// ChannelPromise是ChannelFuture的一个子接口,定义了如setSuccess(),setFailure()等方法void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;// 当请求将Channel链接到远程节点时被调用void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;// 当请求将Channel从远程节点断开时被调用void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;// 当请求关闭Channel时被调用void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;// 当请求将Channel从它的EventLoop中注销时被调用void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;// 当请求从Channel读取数据时被调用void read(ChannelHandlerContext var1) throws Exception;// 当请求经过Channel将数据写到远程节点时被调用void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;// 当请求经过Channel将缓冲中的数据冲刷到远程节点时被调用void flush(ChannelHandlerContext var1) throws Exception;} |
---|
经过实现ChannelInboundHandler或者ChannelOutboundHandler就能够完成用户自定义的应用逻辑处理程序,不过Netty已经帮你实现了一些基本操做,用户只须要继承并扩展ChannelInboundHandlerAdapter或ChannelOutboundHandlerAdapter来做为自定义实现的起始点。
ChannelInboundHandlerAdapter与ChannelOutboundHandlerAdapter都继承于ChannelHandlerAdapter,该抽象类简单实现了ChannelHandler接口。
123456789101112131415161718192021222324252627282930313233343536 | public abstract class ChannelHandlerAdapter implements ChannelHandler {boolean added;public ChannelHandlerAdapter() {}// 该方法不容许将此ChannelHandler共享复用protected void ensureNotSharable() {if(this.isSharable()) {throw new IllegalStateException("ChannelHandler " + this.getClass().getName() + " is not allowed to be shared");}}// 使用反射判断实现类有没有@Sharable注解,以确认该类是否为可共享复用的public boolean isSharable() {Class clazz = this.getClass();Map cache = InternalThreadLocalMap.get().handlerSharableCache();Boolean sharable = (Boolean)cache.get(clazz);if(sharable == null) {sharable = Boolean.valueOf(clazz.isAnnotationPresent(Sharable.class));cache.put(clazz, sharable);}return sharable.booleanValue();}public void handlerAdded(ChannelHandlerContext ctx) throws Exception {}public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {}public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.fireExceptionCaught(cause);}} |
---|
ChannelInboundHandlerAdapter与ChannelOutboundHandlerAdapter默认只是简单地将请求传递给ChannelPipeline中的下一个ChannelHandler,源码以下:
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 | public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {public ChannelInboundHandlerAdapter() {}public void channelRegistered(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelRegistered();}public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelUnregistered();}public void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelActive();}public void channelInactive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelInactive();}public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ctx.fireChannelRead(msg);}public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelReadComplete();}public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {ctx.fireUserEventTriggered(evt);}public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelWritabilityChanged();}public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.fireExceptionCaught(cause);}}public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {public ChannelOutboundHandlerAdapter() {}public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {ctx.bind(localAddress, promise);}public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {ctx.connect(remoteAddress, localAddress, promise);}public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {ctx.disconnect(promise);}public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {ctx.close(promise);}public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {ctx.deregister(promise);}public void read(ChannelHandlerContext ctx) throws Exception {ctx.read();}public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {ctx.write(msg, promise);}public void flush(ChannelHandlerContext ctx) throws Exception {ctx.flush();}} |
---|
对于处理入站消息,另一种选择是继承SimpleChannelInboundHandler,它是Netty的一个继承于ChannelInboundHandlerAdapter的抽象类,并在其之上实现了自动释放资源的功能。
咱们在了解ByteBuf时就已经知道了Netty使用了一套本身实现的引用计数算法来主动释放资源,假设你的ChannelHandler继承于ChannelInboundHandlerAdapter或ChannelOutboundHandlerAdapter,那么你就有责任去管理你所分配的ByteBuf,通常来讲,一个消息对象(ByteBuf)已经被消费(或丢弃)了,而且不会传递给ChannelHandler链中的下一个处理器(若是该消息到达了实际的传输层,那么当它被写入或Channel关闭时,都会被自动释放),那么你就须要去手动释放它。经过一个简单的工具类ReferenceCountUtil的release方法,就能够作到这一点。
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354 | // 这个泛型为消息对象的类型public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {private final TypeParameterMatcher matcher;private final boolean autoRelease;protected SimpleChannelInboundHandler() {this(true);}protected SimpleChannelInboundHandler(boolean autoRelease) {this.matcher = TypeParameterMatcher.find(this, SimpleChannelInboundHandler.class, "I");this.autoRelease = autoRelease;}protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType) {this(inboundMessageType, true);}protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType, boolean autoRelease) {this.matcher = TypeParameterMatcher.get(inboundMessageType);this.autoRelease = autoRelease;}public boolean acceptInboundMessage(Object msg) throws Exception {return this.matcher.match(msg);}// SimpleChannelInboundHandler只是替你作了ReferenceCountUtil.release()public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {boolean release = true;try {if(this.acceptInboundMessage(msg)) {this.channelRead0(ctx, msg);} else {release = false;ctx.fireChannelRead(msg);}} finally {if(this.autoRelease && release) {ReferenceCountUtil.release(msg);}}}// 这个方法才是咱们须要实现的方法protected abstract void channelRead0(ChannelHandlerContext var1, I var2) throws Exception;}// ReferenceCountUtil中的源码,release方法对消息对象的类型进行判断而后调用它的release()方法public static boolean release(Object msg) {return msg instanceof ReferenceCounted?((ReferenceCounted)msg).release():false;} |
---|
为了模块化与解耦合,不可能由一个ChannelHandler来完成全部应用逻辑,因此Netty采用了拦截器链的设计。ChannelPipeline就是用来管理ChannelHandler实例链的容器,它的职责就是保证明例链的流动。
每个新建立的Channel都将会被分配一个新的ChannelPipeline,这种关联关系是永久性的,一个Channel一辈子只能对应一个ChannelPipeline。
一个入站事件被触发时,它会先从ChannelPipeline的最左端(头部)开始一直传播到ChannelPipeline的最右端(尾部),而出站事件正好与入站事件顺序相反(从最右端一直传播到最左端)。这个顺序是定死的,Netty老是将ChannelPipeline的入站口做为头部,而将出站口做为尾部。在事件传播的过程当中,ChannelPipeline会判断下一个ChannelHandler的类型是否和事件的运动方向相匹配,若是不匹配,就跳过该ChannelHandler并继续检查下一个(保证入站事件只会被ChannelInboundHandler处理),一个ChannelHandler也能够同时实现ChannelInboundHandler与ChannelOutboundHandler,它在入站事件与出站事件中都会被调用。
在阅读ChannelHandler的源码时,发现不少方法须要一个ChannelHandlerContext类型的参数,该接口是ChannelPipeline与ChannelHandler之间相关联的关键。ChannelHandlerContext能够通知ChannelPipeline中的当前ChannelHandler的下一个ChannelHandler,还能够动态地改变当前ChannelHandler在ChannelPipeline中的位置(经过调用ChannelPipeline中的各类方法来修改)。
ChannelHandlerContext负责了在同一个ChannelPipeline中的ChannelHandler与其余ChannelHandler之间的交互,每一个ChannelHandlerContext都对应了一个ChannelHandler。在DefaultChannelPipeline的源码中,已经表现的很明显了。
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556 | public class DefaultChannelPipeline implements ChannelPipeline {.........// 头部节点和尾部节点的引用变量// ChannelHandlerContext在ChannelPipeline中是以链表的形式组织的final AbstractChannelHandlerContext head;final AbstractChannelHandlerContext tail;.........// 添加一个ChannelHandler到链表尾部public final ChannelPipeline addLast(String name, ChannelHandler handler) {return this.addLast((EventExecutorGroup)null, name, handler);}public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized(this) {// 检查ChannelHandler是否为一个共享对象(@Sharable)// 若是该ChannelHandler没有@Sharable注解,而且是已被添加过的那么就抛出异常checkMultiplicity(handler);// 返回一个DefaultChannelHandlerContext,注意该对象持有了传入的ChannelHandlernewCtx = this.newContext(group, this.filterName(name, handler), handler);this.addLast0(newCtx);// 若是当前ChannelPipeline没有被注册,那么就先加到未决链表中if(!this.registered) {newCtx.setAddPending();this.callHandlerCallbackLater(newCtx, true);return this;}// 不然就调用ChannelHandler中的handlerAdded()EventExecutor executor = newCtx.executor();if(!executor.inEventLoop()) {newCtx.setAddPending();executor.execute(new Runnable() {public void run() {DefaultChannelPipeline.this.callHandlerAdded0(newCtx);}});return this;}}this.callHandlerAdded0(newCtx);return this;}// 将新的ChannelHandlerContext插入到尾部与尾部以前的节点之间private void addLast0(AbstractChannelHandlerContext newCtx) {AbstractChannelHandlerContext prev = this.tail.prev;newCtx.prev = prev;newCtx.next = this.tail;prev.next = newCtx;this.tail.prev = newCtx;}.....} |
---|
ChannelHandlerContext还定义了许多与Channel和ChannelPipeline重合的方法(像read()、write()、connect()这些用于出站的方法或者如fireChannelXXXX()这样用于入站的方法),不一样之处在于调用Channel或者ChannelPipeline上的这些方法,它们将会从头沿着整个ChannelHandler实例链进行传播,而调用位于ChannelHandlerContext上的相同方法,则会从当前所关联的ChannelHandler开始,且只会传播给实例链中的下一个ChannelHandler。并且,事件之间的移动(从一个ChannelHandler到下一个ChannelHandler)也是经过ChannelHandlerContext中的方法调用完成的。
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 | public class DefaultChannelPipeline implements ChannelPipeline {public final ChannelPipeline fireChannelRead(Object msg) {// 注意这里将头节点传入了进去AbstractChannelHandlerContext.invokeChannelRead(this.head, msg);return this;}}abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);EventExecutor executor = next.executor();if(executor.inEventLoop()) {next.invokeChannelRead(m);} else {executor.execute(new Runnable() {public void run() {next.invokeChannelRead(m);}});}}private void invokeChannelRead(Object msg) {if(this.invokeHandler()) {try {((ChannelInboundHandler)this.handler()).channelRead(this, msg);} catch (Throwable var3) {this.notifyHandlerException(var3);}} else {// 寻找下一个ChannelHandlerthis.fireChannelRead(msg);}}public ChannelHandlerContext fireChannelRead(Object msg) {invokeChannelRead(this.findContextInbound(), msg);return this;}private AbstractChannelHandlerContext findContextInbound() {AbstractChannelHandlerContext ctx = this;do {ctx = ctx.next;} while(!ctx.inbound); // 直到找到一个ChannelInboundHandlerreturn ctx;}} |
---|
为了最大限度地提供高性能和可维护性,Netty设计了一套强大又易用的线程模型。在一个网络框架中,最重要的能力是可以快速高效地处理在链接的生命周期内发生的各类事件,与之相匹配的程序构造被称为事件循环,Netty定义了接口EventLoop来负责这项工做。
若是是常常用Java进行多线程开发的童鞋想必常常会使用到线程池,也就是Executor这套API。Netty就是从Executor(java.util.concurrent)之上扩展了本身的EventExecutorGroup(io.netty.util.concurrent),同时为了与Channel的事件进行交互,还扩展了EventLoopGroup接口(io.netty.channel)。在io.netty.util.concurrent包下的EventExecutorXXX负责实现线程并发相关的工做,而在io.netty.channel包下的EventLoopXXX负责实现网络编程相关的工做(处理Channel中的事件)。
在Netty的线程模型中,一个EventLoop将由一个永远不会改变的Thread驱动,而一个Channel一辈子只会使用一个EventLoop(可是一个EventLoop可能会被指派用于服务多个Channel),在Channel中的全部I/O操做和事件都由EventLoop中的线程处理,也就是说一个Channel的一辈子之中都只会使用到一个线程。不过在Netty3,只有入站事件会被EventLoop处理,全部出站事件都会由调用线程处理,这种设计致使了ChannelHandler的线程安全问题。Netty4简化了线程模型,经过在同一个线程处理全部事件,既解决了这个问题,还提供了一个更加简单的架构。
| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 | package io.netty.channel;public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", 2147483647));private final Queue<Runnable> tailTasks;protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());}protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());}protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);this.tailTasks = this.newTaskQueue(maxPendingTasks);}protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);this.tailTasks = this.newTaskQueue(maxPendingTasks);}// 返回它所在的EventLoopGrouppublic EventLoopGroup parent() {return (EventLoopGroup)super.parent();}public EventLoop next() {return (EventLoop)super.next();}// 注册Channel,这里ChannelPromise和Channel关联到了一块儿public ChannelFuture register(Channel channel) {return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));}public ChannelFuture register(ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");promise.channel().unsafe().register(this, promise);return promise;}// 剩下这些函数都是用于调度任务public final void executeAfterEventLoopIteration(Runnable task) {ObjectUtil.checkNotNull(task, "task");if(this.isShutdown()) {reject();}if(!this.tailTasks.offer(task)) {this.reject(task);}if(this.wakesUpForTask(task)) {this.wakeup(this.inEventLoop());}}final boolean removeAfterEventLoopIterationTask(Runnable task) {return this.tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));}protected boolean wakesUpForTask(Runnable task) {return !(task instanceof SingleThreadEventLoop.NonWakeupRunnable);}protected void afterRunningAllTasks() {this.runAllTasksFrom(this.tailTasks);}protected boolean hasTasks() {return super.hasTasks() || !this.tailTasks.isEmpty();}public int pendingTasks() {return super.pendingTasks() + this.tailTasks.size();}interface NonWakeupRunnable extends Runnable {}} | | --- | --- |
为了确保一个Channel的整个生命周期中的I/O事件会被一个EventLoop负责,Netty经过inEventLoop()方法来判断当前执行的线程的身份,肯定它是不是分配给当前Channel以及它的EventLoop的那一个线程。若是当前(调用)线程正是EventLoop中的线程,那么所提交的任务将会被直接执行,不然,EventLoop将调度该任务以便稍后执行,并将它放入内部的任务队列(每一个EventLoop都有它本身的任务队列,从SingleThreadEventLoop的源码就能发现不少用于调度内部任务队列的方法),在下次处理它的事件时,将会执行队列中的那些任务。这种设计可让任何线程与Channel直接交互,而无需在ChannelHandler中进行额外的同步。
从性能上来考虑,千万不要将一个须要长时间来运行的任务放入到任务队列中,它会影响到该队列中的其余任务的执行。解决方案是使用一个专门的EventExecutor来执行它(ChannelPipeline提供了带有EventExecutorGroup参数的addXXX()方法,该方法能够将传入的ChannelHandler绑定到你传入的EventExecutor之中),这样它就会在另外一条线程中执行,与其余任务隔离。
12345678910111213141516171819202122232425262728293031 | public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {.....public void execute(Runnable task) {if(task == null) {throw new NullPointerException("task");} else {boolean inEventLoop = this.inEventLoop();if(inEventLoop) {this.addTask(task);} else {this.startThread();this.addTask(task);if(this.isShutdown() && this.removeTask(task)) {reject();}}if(!this.addTaskWakesUp && this.wakesUpForTask(task)) {this.wakeup(inEventLoop);}}}public boolean inEventLoop(Thread thread) {return thread == this.thread;}.....} |
---|
EventLoopGroup负责管理和分配EventLoop(建立EventLoop和为每一个新建立的Channel分配EventLoop),根据不一样的传输类型,EventLoop的建立和分配方式也不一样。例如,使用NIO传输类型,EventLoopGroup就会只使用较少的EventLoop(一个EventLoop服务于多个Channel),这是由于NIO基于I/O多路复用,一个线程能够处理多个链接,而若是使用的是OIO,那么新建立一个Channel(链接)就须要分配一个EventLoop(线程)。
在深刻了解地Netty的核心组件以后,发现它们的设计都很模块化,若是想要实现你本身的应用程序,就须要将这些组件组装到一块儿。Netty经过Bootstrap类,以对一个Netty应用程序进行配置(组装各个组件),并最终使它运行起来。对于客户端程序和服务器程序所使用到的Bootstrap类是不一样的,后者须要使用ServerBootstrap,这样设计是由于,在如TCP这样有链接的协议中,服务器程序每每须要一个以上的Channel,经过父Channel来接受来自客户端的链接,而后建立子Channel用于它们之间的通讯,而像UDP这样无链接的协议,它不须要每一个链接都建立子Channel,只须要一个Channel便可。
一个比较明显的差别就是Bootstrap与ServerBootstrap的group()方法,后者提供了一个接收2个EventLoopGroup的版本。
12345678910111213141516171819202122232425262728 | // 该方法在Bootstrap的父类AbstractBootstrap中,泛型B为它当前子类的类型(为了链式调用)public B group(EventLoopGroup group) {if(group == null) {throw new NullPointerException("group");} else if(this.group != null) {throw new IllegalStateException("group set already");} else {this.group = group;return this;}}// ServerBootstrap中的实现,它也支持只用一个EventLoopGrouppublic ServerBootstrap group(EventLoopGroup group) {return this.group(group, group);}public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {super.group(parentGroup);if(childGroup == null) {throw new NullPointerException("childGroup");} else if(this.childGroup != null) {throw new IllegalStateException("childGroup set already");} else {this.childGroup = childGroup;return this;}} |
---|
Bootstrap其实没有什么能够好说的,它就只是一个装配工,将各个组件拼装组合到一块儿,而后进行一些配置,有关它的详细API请参考Netty JavaDoc。下面咱们将经过一个经典的Echo客户端与服务器的例子,来梳理一遍建立Netty应用的流程。
首先实现的是服务器,咱们先实现一个EchoServerInboundHandler,处理入站消息。
1234567891011121314151617181920212223242526 | public class EchoServerInboundHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf in = (ByteBuf) msg;System.out.printf("Server received: %s \n", in.toString(CharsetUtil.UTF_8));// 因为读事件不是一次性就能把完整消息发送过来的,这里并无调用writeAndFlushctx.write(in); // 直接把消息写回给客户端(会被出站消息处理器处理,不过咱们的应用没有实现任何出站消息处理器)}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {// 等读事件已经完成时,冲刷以前写数据的缓冲区// 而后添加了一个监听器,它会在Future完成时进行关闭该Channel.ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);}// 处理异常,输出异常信息,而后关闭Channel@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}} |
---|
服务器的应用逻辑只有这么多,剩下就是用ServerBootstrap进行配置了。
12345678910111213141516171819202122232425262728293031323334353637383940414243444546 | public class EchoServer {private final int port;public EchoServer(int port) {this.port = port;}public void start() throws Exception {final EchoServerInboundHandler serverHandler = new EchoServerInboundHandler();EventLoopGroup group = new NioEventLoopGroup(); // 传输类型使用NIOtry {ServerBootstrap b = new ServerBootstrap();b.group(group) // 配置EventLoopGroup.channel(NioServerSocketChannel.class) // 配置Channel的类型.localAddress(new InetSocketAddress(port)) // 配置端口号.childHandler(new ChannelInitializer<SocketChannel>() {// 实现一个ChannelInitializer,它能够方便地添加多个ChannelHandler@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(serverHandler);}});// i绑定地址,同步等待它完成ChannelFuture f = b.bind().sync();// 关闭这个Futuref.channel().closeFuture().sync();} finally {// 关闭应用程序,通常来讲Netty应用只须要调用这个方法就够了group.shutdownGracefully().sync();}}public static void main(String[] args) throws Exception {if (args.length != 1) {System.err.printf("Usage: %s <port> \n",EchoServer.class.getSimpleName());return;}int port = Integer.parseInt(args[0]);new EchoServer(port).start();}} |
---|
接下来实现客户端,一样须要先实现一个入站消息处理器。
1234567891011121314151617181920212223 | public class EchoClientInboundHandler extends SimpleChannelInboundHandler<ByteBuf> {/*** 咱们在Channel链接到远程节点直接发送一条消息给服务器*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Netty!", CharsetUtil.UTF_8));}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {// 输出从服务器Echo的消息System.out.printf("Client received: %s \n", byteBuf.toString(CharsetUtil.UTF_8));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}} |
---|
而后配置客户端。
123456789101112131415161718192021222324252627282930313233343536373839404142 | public class EchoClient {private final String host;private final int port;public EchoClient(String host, int port) {this.host = host;this.port = port;}public void start() throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(host, port)) // 服务器的地址.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new EchoClientInboundHandler());}});ChannelFuture f = b.connect().sync(); // 链接到服务器f.channel().closeFuture().sync();} finally {group.shutdownGracefully().sync();}}public static void main(String[] args) throws Exception {if (args.length != 2) {System.err.printf("Usage: %s <host> <port> \n", EchoClient.class.getSimpleName());return;}String host = args[0];int port = Integer.parseInt(args[1]);new EchoClient(host, port).start();}} |
---|
实现一个Netty应用程序就是如此简单,用户大多数都是在编写各类应用逻辑的ChannelHandler(或者使用Netty内置的各类实用ChannelHandler),而后只须要将它们所有添加到ChannelPipeline便可。
微信公众号【黄小斜】做者是蚂蚁金服 JAVA 工程师,目前在蚂蚁财富负责后端开发工做,专一于 JAVA 后端技术栈,同时也懂点投资理财,坚持学习和写做,用大厂程序员的视角解读技术与互联网,个人世界里不仅有 coding!关注公众号后回复”架构师“便可领取 Java基础、进阶、项目和架构师等免费学习资料,更有数据库、分布式、微服务等热门技术学习视频,内容丰富,兼顾原理和实践,另外也将赠送做者原创的Java学习指南、Java程序员面试指南等干货资源