待完善java
Channel
、EventLoop
和ChannelFuture
这些类组合在一块儿,能够被认为是Netty网络抽象的表明:bootstrap
Channel
—Socket;EventLoop
—控制流、多线程处理、并发;ChannelFuture
—异步通知暂略promise
EventLoop
定义了Netty的核心抽象,用于处理链接的生命周期中所发生的事件。缓存
一个EventLoopGroup
包含一个或者多个EventLoop
;安全
一个EventLoop
在它的生命周期内只和一个Thread
绑定;服务器
全部由EventLoop
处理的I/O事件都将在它专有的Thread
上被处理;网络
一个Channel
在它的生命周期内只注册于一个EventLoop
;多线程
一个EventLoop
可能会被分配给一个或多个Channel
。并发
注意,在这种设计中,必定程度上消除了对于同步的须要。app
暂略
Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in its ChannelPipeline
.
ChannelHandler
public interface ChannelHandler {
// ChannelHandler添加到ChannelPipeline中时被调用
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
// ChannelHandler从ChannelPipeline中移除时被调用
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
// 处理过程当中在ChannelPipeline中有错误产生时被调用
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
复制代码
ChannelHandler
层次结构以下图所示:
ChannelHandler
itself does not provide many methods, but you usually have to implement one of its subtypes:
ChannelInboundHandler
to handle inbound I/O events, andChannelOutboundHandler
to handle outbound I/O operations.Alternatively, the following adapter classes are provided for your convenience:
ChannelInboundHandlerAdapter
to handle inbound I/O events,ChannelOutboundHandlerAdapter
to handle outbound I/O operations, andChannelDuplexHandler
to handle both inbound and outbound eventsA ChannelHandler often needs to store some stateful information?
ChannelInboundHandler
public interface ChannelInboundHandler extends ChannelHandler {
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
void channelActive(ChannelHandlerContext ctx) throws Exception;
void channelInactive(ChannelHandlerContext ctx) throws Exception;
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
复制代码
ChannelInboundHandlerAdapter
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
复制代码
ChannelInboundHandlerAdapter
的channelRead
方法处理完消息后不会自动释放消息,若想自动释放收到的消息,可使用SimpleChannelInboundHandler
。
Usually, channelRead()
handler method is implemented like the following:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
// Do something with msg
} finally {
ReferenceCountUtil.release(msg);
}
}
复制代码
ChannelOutboundHandler
public interface ChannelOutboundHandler extends ChannelHandler {
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception;
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void read(ChannelHandlerContext ctx) throws Exception;
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
void flush(ChannelHandlerContext ctx) throws Exception;
}
复制代码
ChannelOutboundHandlerAdapter
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.bind(localAddress, promise);
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.disconnect(promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
复制代码
使ChannelHandler
可以与其ChannelPipeline
和其余处理程序进行交互。除其余事项外,处理程序能够通知ChannelPipeline
中的下一个ChannelHandler
,也能够动态修改它所属的ChannelPipeline
。
Notify
You can notify the closest handler in the same ChannelPipeline by calling one of the various methods provided here.
Modifying a pipeline
You can get the ChannelPipeline
your handler belongs to by calling pipeline()
. A non-trivial application could insert, remove, or replace handlers in the pipeline dynamically at runtime.
Retrieving for later use
You can keep the ChannelHandlerContext for later use, such as triggering an event outside the handler methods, even from a different thread.
public class MyHandler extends ChannelDuplexHandler {
private ChannelHandlerContext ctx;
public void beforeAdd(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
public void login(String username, password) {
ctx.write(new LoginMessage(username, password));
}
...
}
复制代码
Storing stateful information
attr(AttributeKey)
allow you to store and access stateful information that is related with a handler and its context. Please refer to ChannelHandler
to learn various recommended ways to manage stateful information.
A handler can have more than one context
Please note that a ChannelHandler
instance can be added to more than one ChannelPipeline
. It means a single ChannelHandler
instance can have more than one ChannelHandlerContext
and therefore the single instance can be invoked with different ChannelHandlerContexts
if it is added to one or more ChannelPipelines
more than once.
For example, the following handler will have as many independent AttributeKeys as how many times it is added to pipelines, regardless if it is added to the same pipeline multiple times or added to different pipelines multiple times:
public class FactorialHandler extends ChannelInboundHandlerAdapter {
private final AttributeKey<Integer> counter = AttributeKey.valueOf("counter");
// This handler will receive a sequence of increasing integers starting
// from 1.
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Integer a = ctx.attr(counter).get();
if (a == null) {
a = 1;
}
attr.set(a * (Integer) msg);
}
}
// Different context objects are given to "f1", "f2", "f3", and "f4" even if
// they refer to the same handler instance. Because the FactorialHandler
// stores its state in a context object (using an AttributeKey), the factorial is
// calculated correctly 4 times once the two pipelines (p1 and p2) are active.
FactorialHandler fh = new FactorialHandler();
ChannelPipeline p1 = Channels.pipeline();
p1.addLast("f1", fh);
p1.addLast("f2", fh);
ChannelPipeline p2 = Channels.pipeline();
p2.addLast("f3", fh);
p2.addLast("f4", fh);
复制代码
ChannelPipeline
提供了ChannelHandler
链的容器,并定义了用于在该链上传播入站和出站事件流的API。当Channel
被建立时,它会被自动地分配到它专属的ChannelPipeline
。
Creation of a pipeline
==Each channel has its own pipeline and it is created automatically== when a new channel is created.
How an event flows in a pipeline
The following diagram describes how I/O events are processed by ChannelHandler
s in a ChannelPipeline
typically. An I/O event is handled by either a ChannelInboundHandler
or a ChannelOutboundHandler
and be forwarded to its closest handler by calling the event propagation methods defined in ChannelHandlerContext
, such as ChannelHandlerContext.fireChannelRead(Object)
and ChannelHandlerContext.write(Object)
.
I/O Request via Channel or
ChannelHandlerContext
+---------------------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +---------------------+ +-----------+----------+ |
| | Inbound Handler N | | Outbound Handler 1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler N-1 | | Outbound Handler 2 | |
| +----------+----------+ +-----------+----------+ |
| /|\ . |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
| [ method call] [method call] |
| . . |
| . \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 2 | | Outbound Handler M-1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 1 | | Outbound Handler M | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
+---------------+-----------------------------------+---------------+
| \|/
+---------------+-----------------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) | +-------------------------------------------------------------------+ 复制代码
An inbound event is handled by the inbound handlers in the ==bottom-up direction== as shown on the left side of the diagram. An inbound handler usually handles the inbound data generated by the I/O thread on the bottom of the diagram. The inbound data is often read from a remote peer via the actual input operation such as SocketChannel.read(ByteBuffer)
. If an inbound event goes beyond the top inbound handler, it is discarded silently, or logged if it needs your attention.
An outbound event is handled by the outbound handler in the ==top-down direction== as shown on the right side of the diagram. An outbound handler usually generates or transforms the outbound traffic such as write requests. If an outbound event goes beyond the bottom outbound handler, it is handled by an I/O thread associated with the Channel. The I/O thread often performs the actual output operation such as SocketChannel.write(ByteBuffer)
.
Forwarding an event to the next handler
a handler has to invoke the event propagation methods in ChannelHandlerContext
to forward an event to its next handler. Those methods include:
Inbound event propagation methods:
ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()
Outbound event propagation methods:
ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)
ChannelHandlerContext.deregister(ChannelPromise)
Building a pipeline
A user is supposed to have one or more ChannelHandler
s in a pipeline to receive I/O events (e.g. read) and to request I/O operations (e.g. write and close). For example, a typical server will have the following handlers in each channel's pipeline, but your mileage may vary depending on the complexity and characteristics of the protocol and business logic:
ByteBuf
) into a Java object.and it could be represented as shown in the following example:
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
...
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new MyProtocolDecoder());
pipeline.addLast("encoder", new MyProtocolEncoder());
// Tell the pipeline to run MyBusinessLogicHandler's event handler methods
// in a different thread than an I/O thread so that the I/O thread is not blocked by
// a time-consuming task.
// If your business logic is fully asynchronous or finished very quickly, you don't
// need to specify a group.
pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
复制代码
Thread safety
A ChannelHandler
can be added or removed at any time because a ChannelPipeline
is thread safe. For example, you can insert an encryption handler when sensitive information is about to be exchanged, and remove it after the exchange.
在深刻地学习了ChannelPipeline
、ChannelHandler
和EventLoop
以后,你接下来 的问题多是:“如何将这些部分组织起来,成为一个可实际运行的应用程序呢?”
答案是?==“引导”(Bootstrapping)==。简单来讲,引导一个应用程序是指对它进行配置,并使它运行起来的过程—尽管该过程的具体细节可能并不如它的定义那样简单,尤为是对于一个网络应用程序来讲。
引导类层次结构
ServerBootstrap
和Bootstrap
分别做用于==服务器==和==客户端==。ServerBootstrap
致力于使用一个==父Channel==来接受来自客户端的链接,并建立==子Channel==以用于它们之间的通讯;而客户端只须要==一个单独的、没有父Channel的Channel==来用于全部的网络交互(这也适用于无链接的传输协议,如UDP,由于它们并非每一个链接都须要一个单独的Channel)。
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable 复制代码
子类型B
是其父类型的一个类型参数,所以能够返回到运行时实例的引用以 支持方法的链式调用(也就是所谓的流式语法)
为何引导类是
Cloneable
?
你有时可能会须要建立多个具备相似配置或者彻底相同配置的 Channel 。为了支持这种模式而又不 需 要 为 每 个 Channel 都 创 建 并 配 置 一 个 新 的 引 导 类 实 例 , AbstractBootstrap 被 标 记 为 了 Cloneable 。在一个已经配置完成的引导类实例上调用 clone() 方法将返回另外一个能够当即使用的引 导类实例。
注意,这种方式只会建立引导类实例的 EventLoopGroup 的一个浅拷贝,因此,后者 将在全部克 隆的 Channel 实例之间共享。这是能够接受的,由于一般这些克隆的 Channel 的生命周期都很短暂,一 个典型的场景是——建立一个 Channel 以进行一次HTTP请求。
Bootstrap
类被用于客户端或者使用了无链接协议的应用程序中。Bootstrap
类的API以下:
Bootstrap group(EventLoopGroup)
设置用于处理Channel全部事件的EventLoopGroup
Bootstrap channel( Class<? extends C>)
Bootstrap channelFactory(ChannelFactory<? extends C>)
channel()
方法指定了Channel的实现类。若是该实现类没提供默认的构造函数 , 能够经过调用channelFactory()
方法来指定一个工厂类,它将会被bind()
方法调用。
Bootstrap localAddress(SocketAddress)
指定Channel应该绑定到的本地地址。若是没有指定,则将由操做系统建立一个随机的地址。或者,也能够经过bind()
或者connect()
方法指定localAddress。
<T> Bootstrap option(ChannelOption<T> option, T value)
设置ChannelOption
, 其将被应用到每一个新建立的Channel的ChannelConfig。 这些选项将会经过bind()
或者connect()
方法设置到Channel ,无论哪一个先被调用。这个方法在Channel已经被建立后再调用将不会有任何的效果。支持的ChannelOption取决于使用的 Channel类型。
<T> Bootstrap attr( Attribute<T> key, T value)
指定新建立的Channel的属性值。这些属性值是经过bind()
或者connect()
方法设置到Channel的,具体取决于谁最早被调用。这个方法在Channel被建立后将不会有任何的效果。
Bootstrap handler(ChannelHandler)
设置将被添加到ChannelPipeline
以接收事件通知的ChannelHandler
。
Bootstrap clone()
建立一个当前Bootstrap的克隆,其具备和原始的Bootstrap相同的设置信息。
Bootstrap remoteAddress(SocketAddress)
设置远程地址。或者,也能够经过connect()
方法来指定它。
ChannelFuture connect()
链接到远程节点并返回一个ChannelFuture,其将会在链接操做完成后接收到通知。
ChannelFuture bind()
绑定Channel并返回一个ChannelFuture,其将会在绑定操做完成后接收到通知,在那以后必须调用Channel.connect()
方法来创建链接。
Bootstrap
类负责为客户端和使用无链接协议的应用程序建立Channel,如图所示:
在引导的过程当中,在调用
bind()
或者connect()
方法以前,必须调用如下方法来设置所需的组件:
- group();
- channel()或者channelFactory();
- handler()
若是不这样作,则将会致使IllegalStateException 。对handler()方法的调用尤为重要,由于它须要配置好ChannelPipeline 。
ServerBootstrap
的API以下:
group
设置ServerBootstrap要用的EventLoopGroup。这个EventLoopGroup将用于ServerChannel和被接受的子Channel的I/O处理。
channel
设置将要被实例化的ServerChannel类。
channelFactory
若是不能经过默认的构造函数建立Channel,那么能够提供一个ChannelFactory。
localAddress
指定ServerChannel应该绑定到的本地地址。若是没有指定,则将由操做系统使用一个随机地址。或者,能够经过bind()
方法来指定该localAddress。
option
指定要应用到新建立的ServerChannel的ChannelConfig的ChannelOption
。这些选项将会经过bind()
方法设置到Channel。在bind()
方法被调用以后,设置或者改变 ChannelOption
都不会有任何的效果。所支持的ChannelOption取决于所使用的Channel类型。
childOption
指定当子Channel
被接受时,应用到==子Channel的ChannelConfig==的ChannelOption
。所支持的ChannelOption取决于所使用的Channel的类型。
attr
指定ServerChannel
上的属性,属性将会经过bind()
方法设置给Channel。在调用 bind()
方法以后改变它们将不会有任何的效果
childAttr
将属性设置给已经被接受的子Channel
。
handler
设置被添加到ServerChannel的ChannelPipeline中的ChannelHandler。
childHandler
设置将被添加到已被接受的子Channel的ChannelPipeline中的ChannelHandler。 handler()
方法和childHandler()
方法之间的区别是:前者所添加的ChannelHandler由接受子Channel的ServerChannel处理,而childHandler()方法所添加ChannelHandler将由已被接受的子Channel处理,其表明一个绑定到远程节点的套接字。
clone
克隆一个设置和原始的ServerBootstrap相同的ServerBootstrap。
bind
绑定ServerChannel而且返回一个ChannelFuture成后收到通知(带着成功或者失败的结果)
ServerBootstrap在bind()
方法被调用时建立了一个ServerChannel
,而且该ServerChannel
管理了多个子Channel
。
从Channel引导客户端?
在引导的过程当中调用了handler()
或者childHandler()
方法来添加单个的ChannelHandler
。对于简单的应用程序来讲可能已经足够了,可是它不能知足更加复杂的需求。
能够经过在ChannelPipeline
中将它们连接在一块儿来部署尽量多的ChannelHandler
,Netty提供了一个特殊的ChannelInitializer
类。
A special
ChannelInboundHandler
which offers an easy way to initialize aChannel
once it was registered to itsEventLoop
. Implementations are most often used in the context ofBootstrap.handler(ChannelHandler)
,ServerBootstrap.handler(ChannelHandler)
andServerBootstrap.childHandler(ChannelHandler)
to setup theChannelPipeline
of aChannel
.
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter 复制代码
它定义了下面的方法:
protected abstract void initChannel(C ch) throws Exception;
复制代码
这个方法提供了一种将多个ChannelHandler
添加到一个ChannelPipeline
中的简便方法。只须要简单地向Bootstrap
或ServerBootstrap
的实例提供你的ChannelInitializer
实现便可,而且一旦Channel
被注册到了它的EventLoop
以后,就会调用你的initChannel
方法。在该方法返回以后,ChannelInitializer
的实例将会从ChannelPipeline
中移除它本身。
示例代码以下:
public class MyChannelInitializer extends ChannelInitializer {
public void initChannel(Channel channel) {
channel.pipeline().addLast("myHandler", new MyHandler());
}
}
ServerBootstrap bootstrap = ...;
...
bootstrap.childHandler(new MyChannelInitializer());
...
复制代码
使用option()
方法能够将ChannelOption
应用到引导,你所提供的值将会被自动应用到引导所建立的全部Channel
(这样就能够不用在每一个Channel建立时都手动配置它。)。可用的ChannelOption
包括了底层链接的详细信息,如keep-alive或者超时属性以及缓存区设置。
Netty应用程序一般与组织的专有软件集成在一块儿,而像Channel
这样的组件可能甚至会在正常的Netty生命周期以外被使用。 在某些经常使用的属性和数据不可用时, Netty提供了 AttributeMap
抽象(一个由Channel
和引导类提供的集合)以及AttributeKey<T>
(一个用于插入和获取属性值的泛型类)。使用这些工具,即可以安全地将任何类型的数据项与客户端和服务器Channel
(包含ServerChannel的子Channel)相关联了。
有点重要?
优雅是指干净地释放资源。关闭Netty应用程序并无太多的魔法,最重要的是须要关闭EventLoopGroup
,它将处理任何挂起的事件和任务,而且随后释放全部活动的线程。
// 建立处理 I/O 的 EventLoopGroup
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
// 建立一个 Bootstrap 类的实例并配置它
bootstrap.group(group)
.channel(NioSocketChannel.class);
...
Future<?> future = group.shutdownGracefully();
// block until the group has shutdown
future.syncUninterruptibly();
复制代码
每当经过调用 ChannelInboundHandler.channelRead()或者 ChannelOutbound- Handler.write()方法来处理数据时,你都须要确保没有任何的资源泄漏。
为了帮助你诊断潜在的(资源泄漏)问题,Netty提供了class ResourceLeakDetector1, 它将对你应用程序的缓冲区分配作大约 1%的采样来检测内存泄露。