目标:介绍基于Mina的来实现的远程通讯、介绍dubbo-remoting-mina内的源码解析。java
Apache MINA是一个网络应用程序框架,可帮助用户轻松开发高性能和高可扩展性的网络应用程序。它经过Java NIO在各类传输(如TCP / IP和UDP / IP)上提供抽象的事件驱动异步API。它一般被称为NIO框架库、客户端服务器框架库或者网络套接字库。那么本问就要讲解在dubbo项目中,基于mina的API实现服务端和客户端来完成远程通信这件事情。git
下面是mina实现的包结构:github
该类继承了AbstractChannel,是基于mina实现的通道。segmentfault
private static final Logger logger = LoggerFactory.getLogger(MinaChannel.class);
/** * 通道的key */
private static final String CHANNEL_KEY = MinaChannel.class.getName() + ".CHANNEL";
/** * mina中的一个句柄,表示两个端点之间的链接,与传输类型无关 */
private final IoSession session;
复制代码
该类的属性除了封装了一个CHANNEL_KEY之外,还用到了mina中的IoSession,它封装着一个链接所须要的方法,好比得到远程地址等。缓存
static MinaChannel getOrAddChannel(IoSession session, URL url, ChannelHandler handler) {
// 若是链接session为空,则返回空
if (session == null) {
return null;
}
// 得到MinaChannel实例
MinaChannel ret = (MinaChannel) session.getAttribute(CHANNEL_KEY);
// 若是不存在,则建立
if (ret == null) {
// 建立一个MinaChannel实例
ret = new MinaChannel(session, url, handler);
// 若是两个端点链接
if (session.isConnected()) {
// 把新建立的MinaChannel添加到session 中
MinaChannel old = (MinaChannel) session.setAttribute(CHANNEL_KEY, ret);
// 若是属性的旧值不为空,则从新设置旧值
if (old != null) {
session.setAttribute(CHANNEL_KEY, old);
ret = old;
}
}
}
return ret;
}
复制代码
该方法是一个得到MinaChannel对象的方法,其中每个MinaChannel都会被放在session的属性值中。服务器
static void removeChannelIfDisconnected(IoSession session) {
if (session != null && !session.isConnected()) {
session.removeAttribute(CHANNEL_KEY);
}
}
复制代码
该方法是当没有链接时移除该通道,比较简单。网络
@Override
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
// 发送消息,返回future
WriteFuture future = session.write(message);
// 若是已经发送过了
if (sent) {
// 得到延迟时间
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 等待timeout的链接时间后查看是否发送成功
success = future.join(timeout);
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
复制代码
该方法是最关键的发送消息,其中调用到了session的write方法,就是mina封装的发送消息。而且根据返回的WriteFuture对象来判断是否发送成功。session
该类继承了IoHandlerAdapter,是通道处理器实现类,其中就是mina项目中IoHandler接口的几个 方法。app
/** * url对象 */
private final URL url;
/** * 通道处理器对象 */
private final ChannelHandler handler;
复制代码
该类有两个属性,上述提到的实现IoHandler接口方法都是调用了handler来实现的,我就举例讲一个,其余的都同样的写法:框架
@Override
public void sessionOpened(IoSession session) throws Exception {
// 得到MinaChannel对象
MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler);
try {
// 调用接连该通道
handler.connected(channel);
} finally {
// 若是没有链接则移除通道
MinaChannel.removeChannelIfDisconnected(session);
}
}
复制代码
该方法在IoHandler中叫作sessionOpened,其实就是链接方法,因此调用的是handler.connected。其余方法也同样,请自行查看。
该类继承了AbstractClient类,是基于mina实现的客户端类。
/** * 套接字链接集合 */
private static final Map<String, SocketConnector> connectors = new ConcurrentHashMap<String, SocketConnector>();
/** * 链接的key */
private String connectorKey;
/** * 套接字链接者 */
private SocketConnector connector;
/** * 一个句柄 */
private volatile IoSession session; // volatile, please copy reference to use
复制代码
该类中的属性都跟mina项目中封装类有关系。
@Override
protected void doOpen() throws Throwable {
// 用url来做为key
connectorKey = getUrl().toFullString();
// 先从集合中取套接字链接
SocketConnector c = connectors.get(connectorKey);
if (c != null) {
connector = c;
// 若是为空
} else {
// set thread pool. 设置线程池
connector = new SocketConnector(Constants.DEFAULT_IO_THREADS,
Executors.newCachedThreadPool(new NamedThreadFactory("MinaClientWorker", true)));
// config 得到套接字链接配置
SocketConnectorConfig cfg = (SocketConnectorConfig) connector.getDefaultConfig();
cfg.setThreadModel(ThreadModel.MANUAL);
// 启用TCP_NODELAY
cfg.getSessionConfig().setTcpNoDelay(true);
// 启用SO_KEEPALIVE
cfg.getSessionConfig().setKeepAlive(true);
int timeout = getConnectTimeout();
// 设置链接超时时间
cfg.setConnectTimeout(timeout < 1000 ? 1 : timeout / 1000);
// set codec.
// 设置编解码器
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MinaCodecAdapter(getCodec(), getUrl(), this)));
// 加入集合
connectors.put(connectorKey, connector);
}
}
复制代码
该方法是打开客户端,在mina中用套接字链接者connector来表示。其中的操做就是新建一个connector,而且设置相应的属性,而后加入集合。
@Override
protected void doConnect() throws Throwable {
// 链接服务器
ConnectFuture future = connector.connect(getConnectAddress(), new MinaHandler(getUrl(), this));
long start = System.currentTimeMillis();
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
// 用于对线程的阻塞和唤醒
final CountDownLatch finish = new CountDownLatch(1); // resolve future.awaitUninterruptibly() dead lock
// 加入监听器
future.addListener(new IoFutureListener() {
@Override
public void operationComplete(IoFuture future) {
try {
// 若是已经读完了
if (future.isReady()) {
// 建立得到该链接的IoSession实例
IoSession newSession = future.getSession();
try {
// Close old channel 关闭旧的session
IoSession oldSession = MinaClient.this.session; // copy reference
if (oldSession != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old mina channel " + oldSession + " on create new mina channel " + newSession);
}
// 关闭链接
oldSession.close();
} finally {
// 移除通道
MinaChannel.removeChannelIfDisconnected(oldSession);
}
}
} finally {
// 若是MinaClient关闭了
if (MinaClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new mina channel " + newSession + ", because the client closed.");
}
// 关闭session
newSession.close();
} finally {
MinaClient.this.session = null;
MinaChannel.removeChannelIfDisconnected(newSession);
}
} else {
// 设置新的session
MinaClient.this.session = newSession;
}
}
}
} catch (Exception e) {
exception.set(e);
} finally {
// 减小数量,释放全部等待的线程
finish.countDown();
}
}
});
try {
// 当前线程等待,直到锁存器倒计数到零,除非线程被中断,或者指定的等待时间过去
finish.await(getConnectTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start)
+ "ms) from netty client " + NetUtils.getLocalHost() + " using dubbo version "
+ Version.getVersion() + ", cause: " + e.getMessage(), e);
}
Throwable e = exception.get();
if (e != null) {
throw e;
}
}
复制代码
该方法是客户端链接服务器的实现方法。其中用到了CountDownLatch来表明完成完成事件,它来作一个线程等待,直到1个线程完成上述的动做,也就是链接完成结束,才释放等待的线程。保证每次只有一条线程去链接,解决future.awaitUninterruptibly()死锁问题。
其余方法请自行查看我写的注释。
该类继承了AbstractServer,是基于mina实现的服务端实现类。
private static final Logger logger = LoggerFactory.getLogger(MinaServer.class);
/** * 套接字接收者对象 */
private SocketAcceptor acceptor;
复制代码
@Override
protected void doOpen() throws Throwable {
// set thread pool.
// 建立套接字接收者对象
acceptor = new SocketAcceptor(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
Executors.newCachedThreadPool(new NamedThreadFactory("MinaServerWorker",
true)));
// config
// 设置配置
SocketAcceptorConfig cfg = (SocketAcceptorConfig) acceptor.getDefaultConfig();
cfg.setThreadModel(ThreadModel.MANUAL);
// set codec. 设置编解码器
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MinaCodecAdapter(getCodec(), getUrl(), this)));
// 开启服务器
acceptor.bind(getBindAddress(), new MinaHandler(getUrl(), this));
}
复制代码
该方法是建立服务器,而且打开服务器。关键就是调用了acceptor的方法。
@Override
protected void doClose() throws Throwable {
try {
if (acceptor != null) {
// 取消绑定,也就是关闭服务器
acceptor.unbind(getBindAddress());
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
复制代码
该方法是关闭服务器,就是调用了acceptor.unbind方法。
@Override
public Collection<Channel> getChannels() {
// 得到链接到该服务器到全部链接句柄
Set<IoSession> sessions = acceptor.getManagedSessions(getBindAddress());
Collection<Channel> channels = new HashSet<Channel>();
for (IoSession session : sessions) {
if (session.isConnected()) {
// 每次都用一个链接句柄建立一个通道
channels.add(MinaChannel.getOrAddChannel(session, getUrl(), this));
}
}
return channels;
}
复制代码
该方法是得到全部链接该服务器的通道。
@Override
public Channel getChannel(InetSocketAddress remoteAddress) {
// 得到链接到该服务器到全部链接句柄
Set<IoSession> sessions = acceptor.getManagedSessions(getBindAddress());
// 遍历全部句柄,找到要找的通道
for (IoSession session : sessions) {
if (session.getRemoteAddress().equals(remoteAddress)) {
return MinaChannel.getOrAddChannel(session, getUrl(), this);
}
}
return null;
}
复制代码
该方法是得到地址对应的单个通道。
public class MinaTransporter implements Transporter {
public static final String NAME = "mina";
@Override
public Server bind(URL url, ChannelHandler handler) throws RemotingException {
// 建立MinaServer实例
return new MinaServer(url, handler);
}
@Override
public Client connect(URL url, ChannelHandler handler) throws RemotingException {
// 建立MinaClient实例
return new MinaClient(url, handler);
}
}
复制代码
该类实现了Transporter接口,是基于mina的传输层实现。能够看到,bind和connect方法分别就是建立了MinaServer和MinaClient实例。这里我建议查看一下《dubbo源码解析(九)远程通讯——Transport层》。
该类是基于mina实现的编解码类,实现了ProtocolCodecFactory。
/** * 编码对象 */
private final ProtocolEncoder encoder = new InternalEncoder();
/** * 解码对象 */
private final ProtocolDecoder decoder = new InternalDecoder();
/** * 编解码器 */
private final Codec2 codec;
/** * url对象 */
private final URL url;
/** * 通道处理器对象 */
private final ChannelHandler handler;
/** * 缓冲区大小 */
private final int bufferSize;
复制代码
属性比较好理解,该编解码器用到了ProtocolEncoder和ProtocolDecoder,而InternalEncoder和InternalDecoder两个类是该类的内部类,它们实现了ProtocolEncoder和ProtocolDecoder,关键的编解码逻辑在这两个类中实现。
public MinaCodecAdapter(Codec2 codec, URL url, ChannelHandler handler) {
this.codec = codec;
this.url = url;
this.handler = handler;
int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE);
// 若是缓存区大小在16字节之内,则设置配置大小,若是不是,则设置8字节的缓冲区大小
this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;
}
复制代码
private class InternalEncoder implements ProtocolEncoder {
@Override
public void dispose(IoSession session) throws Exception {
}
@Override
public void encode(IoSession session, Object msg, ProtocolEncoderOutput out) throws Exception {
// 动态分配一个1k的缓冲区
ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(1024);
// 得到通道
MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler);
try {
// 编码
codec.encode(channel, buffer, msg);
} finally {
// 检测是否断开链接,若是断开,则移除
MinaChannel.removeChannelIfDisconnected(session);
}
// 写数据到out中
out.write(ByteBuffer.wrap(buffer.toByteBuffer()));
out.flush();
}
}
复制代码
该内部类是编码类,其中的encode方法中写到了编码核心调用的是codec.encode。
private class InternalDecoder implements ProtocolDecoder {
private ChannelBuffer buffer = ChannelBuffers.EMPTY_BUFFER;
@Override
public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception {
int readable = in.limit();
if (readable <= 0) return;
ChannelBuffer frame;
// 若是缓冲区还有可读字节数
if (buffer.readable()) {
// 若是缓冲区是DynamicChannelBuffer类型的
if (buffer instanceof DynamicChannelBuffer) {
// 往buffer中写入数据
buffer.writeBytes(in.buf());
frame = buffer;
} else {
// 缓冲区大小
int size = buffer.readableBytes() + in.remaining();
// 动态分配一个缓冲区
frame = ChannelBuffers.dynamicBuffer(size > bufferSize ? size : bufferSize);
// buffer的数据把写到frame
frame.writeBytes(buffer, buffer.readableBytes());
// 把流中的数据写到frame
frame.writeBytes(in.buf());
}
} else {
// 不然是基于Java NIO的ByteBuffer生成的缓冲区
frame = ChannelBuffers.wrappedBuffer(in.buf());
}
// 得到通道
Channel channel = MinaChannel.getOrAddChannel(session, url, handler);
Object msg;
int savedReadIndex;
try {
do {
// 得到读索引
savedReadIndex = frame.readerIndex();
try {
// 解码
msg = codec.decode(channel, frame);
} catch (Exception e) {
buffer = ChannelBuffers.EMPTY_BUFFER;
throw e;
}
// 拆包
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
frame.readerIndex(savedReadIndex);
break;
} else {
if (savedReadIndex == frame.readerIndex()) {
buffer = ChannelBuffers.EMPTY_BUFFER;
throw new Exception("Decode without read data.");
}
if (msg != null) {
// 把数据写到输出流里面
out.write(msg);
}
}
} while (frame.readable());
} finally {
// 若是frame还有可读数据
if (frame.readable()) {
//丢弃可读数据
frame.discardReadBytes();
buffer = frame;
} else {
buffer = ChannelBuffers.EMPTY_BUFFER;
}
MinaChannel.removeChannelIfDisconnected(session);
}
}
@Override
public void dispose(IoSession session) throws Exception {
}
@Override
public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
}
}
复制代码
该内部类是解码类,其中decode方法中关键的是调用了codec.decode,其他的操做是利用缓冲区对数据的冲刷流转。
该部分相关的源码解析地址:github.com/CrazyHZM/in…
该文章讲解了基于mina的来实现的远程通讯、介绍dubbo-remoting-mina内的源码解析,关键须要对mina有所了解。下一篇我会讲解基于netty3实现远程通讯部分。