mina2中的session

简介

session类图java

Mina每创建一个链接同时会建立一个session对象,用于保存此次读写须要用到的全部信息。从抽象类AbstractIoSession中能够看出session具备以下功能:
一、从attributes成员能够看出session能够存放用户关心的键值对
二、注意到WriteRequestQueue,这是一个写请求队列,processor中调用flush或者flushNow方法时会将用户写入的数据包装成一个writeRequest对象,并加入这个队列中。
三、提供了大量的统计功能,好比接收到了多少消息、最后读取时间等
在代码中通常是这样使用session的
// 建立服务器监听  
IoAcceptor acceptor = new NioSocketAcceptor();  
// 设置buffer的长度  
acceptor.getSessionConfig().setReadBufferSize(2048);  
// 设置链接超时时间  
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);  

session作为一个链接的具体对象,缓存当前链接用户的一些信息。

linux

建立好acceptor或者connector以后,经过IoSessionConfig对session对行配置。
用得最多的是经过session写入数据,这是调用了IoSession的write方法
WriteFuture write(Object message);  
WriteFuture write(Object message, SocketAddress destination);  

下面着重分析建立过程以及session的状态缓存

建立与初始化

每创建一个链接,就会建立一个session,IoAcceptor的accept方法的返回值正是一个session。见 NioSocketAcceptor .accept( IoProcessor < NioSession > processor, ServerSocketChannel handle)方法:
 protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {

        SelectionKey key = handle.keyFor(selector);

        if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
            return null;
        }

        // accept the connection from the client
        SocketChannel ch = handle.accept();

        if (ch == null) {
            return null;
        }

        return new NioSocketSession(this, processor, ch);
    }
由以上代码可知, session包含了对众多对象的引用,好比processor,socketChannel,SelectionKey,IoFilter等。
session在建立好后,紧接着就会对其进行初始化。 AbstractIoService .initSession( IoSession session, IoFuture future, IoSessionInitializer sessionInitializer)方法以下:
    protected final void initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) {
        // Update lastIoTime if needed.
        if (stats.getLastReadTime() == 0) {
            stats.setLastReadTime(getActivationTime());
        }

        if (stats.getLastWriteTime() == 0) {
            stats.setLastWriteTime(getActivationTime());
        }

        // Every property but attributeMap should be set now.
        // Now initialize the attributeMap.  The reason why we initialize
        // the attributeMap at last is to make sure all session properties
        // such as remoteAddress are provided to IoSessionDataStructureFactory.
        try {
            ((AbstractIoSession) session).setAttributeMap(session.getService().getSessionDataStructureFactory()
                    .getAttributeMap(session));
        } catch (IoSessionInitializationException e) {
            throw e;
        } catch (Exception e) {
            throw new IoSessionInitializationException("Failed to initialize an attributeMap.", e);
        }

        try {
            ((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory()
                    .getWriteRequestQueue(session));
        } catch (IoSessionInitializationException e) {
            throw e;
        } catch (Exception e) {
            throw new IoSessionInitializationException("Failed to initialize a writeRequestQueue.", e);
        }

        if ((future != null) && (future instanceof ConnectFuture)) {
            // DefaultIoFilterChain will notify the future. (We support ConnectFuture only for now).
            session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future);
        }

        if (sessionInitializer != null) {
            sessionInitializer.initializeSession(session, future);
        }

        finishSessionInitialization0(session, future);
    }
设置上次读写时间,初始化属性map和写请求队列等。
session被初始化好以后会加入到processor中,processor中有一个队列专门存放session:
例如:AbstractPollingIoProcessor.java中有:
private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();

    /** A queue used to store the sessions to be removed */
    private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>();

    /** A queue used to store the sessions to be flushed */
    private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();

    /**
     * A queue used to store the sessions which have a trafficControl to be
     * updated
     */
    private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<S>();

    /** The processor thread : it handles the incoming messages */
    private final AtomicReference<Processor> processorRef = new AtomicReference<Processor>();
加入队列以后,processor就会从队列中取出session,如下是processor的run方法关键代码:
  1. private class Processor implements Runnable {  
         public void run() {  
             for (;;) {  
                 long t0 = System.currentTimeMillis();  
                 int selected = select(SELECT_TIMEOUT);  
                 long t1 = System.currentTimeMillis();  
                 long delta = (t1 - t0);  
      
                 if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {  
                     if (isBrokenConnection()) {  
                         wakeupCalled.getAndSet(false);  
                         continue;  
                     } else {  
                         registerNewSelector();  
                     }  
                     wakeupCalled.getAndSet(false);  
                     continue;  
                 }  
      
                 nSessions += handleNewSessions();  
      
                 updateTrafficMask();  
      
                 if (selected > 0) {  
                     process();  
                 }  
               
                 nSessions -= removeSessions();  
                              
             }  
         }  
     }  
一、不断地调用select方法来检查是否有session准备就绪,若是没有或者间隔时间小于100ms则检查selector是否可用,若是不可用从新建一个selector(这里linux下的epoll的问题可能致使selector不可用。)
二、从newSessions队列中取出这些session,并将其负责的通道注册到selector上
三、处理准备就绪的session
AbstractPollingIoProcessor.java
private void process(S session) {  
    // Process Reads  
    if (isReadable(session) && !session.isReadSuspended()) {  
        read(session);  
    }  
  
    // Process writes  
    if (isWritable(session) && !session.isWriteSuspended()) {  
        // add the session to the queue, if it's not already there  
        if (session.setScheduledForFlush(true)) {  
            flushingSessions.add(session);  
        }  
    }  
}
总结一下建立与初始化过程:链接到来建立一个session,初始化好以后加入到processor负责的一个队列中。processor线程会把队列中的session对应的通道都注册到它本身的selector上,而后这个selector轮询这些通道是否准备就绪,一旦准备就绪就调用对应方法进行处理(read or flushNow)。

状态

Mina中的session具备状态,且状态之间是能够相互转化的
Connected:session被建立时处于这种状态
Idle:没有请求能够处理(可配置)
Closing:正处于关闭状态(可能正在作一些清理工做)
Closed:关闭状态
下图是这几种状态之间的转化图:
IoFilter与IoHandler就是在这些状态上面加以干预,下面重点看一下IDLE状态,它分三种:
Idle for read:在规定时间内没有数据可读
Idle for write:在规定时间内没有数据可写
Idle for both:在规定时间内没有数据可读和可写
这三种状态分别对应IdleStatus类的三个常量:READER_IDLE、WRITER_IDLE、BOTH_IDLE
前面session的用法中有以下设置:
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); 
acceptor的run方法中调用了notifyIdleSessions
private void notifyIdleSessions( long currentTime )  
    {  
        // process idle sessions  
        if ( currentTime - lastIdleCheckTime >= 1000 )  
        {  
            lastIdleCheckTime = currentTime;  
            AbstractIoSession.notifyIdleness( getListeners().getManagedSessions().values().iterator(), currentTime );  
        }  
 } 
每隔一秒一检查是否到达了设置的空闲时间
  1. private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status,  
            long lastIoTime) {  
        if ((idleTime > 0) && (lastIoTime != 0) && (currentTime - lastIoTime >= idleTime)) {  
            session.getFilterChain().fireSessionIdle(status);  
        }  
    }  
若是当前时间减去上一次IDLE事件触发的时间大于用户设置的idleTime,则触发一次sessionIdle事件。
public void fireSessionIdle(IdleStatus status) {  
    session.increaseIdleCount(status, System.currentTimeMillis());  
    Entry head = this.head;  
    callNextSessionIdle(head, session, status);  
}  
increaseIdleCount这个方法会更新lastToTime的值为当前时间,紧接着穿透过滤器链(固然在过滤器的sessionIdle中可能会作一些操做)到达IoHandler的sessionIdle方法,若是须要在session空闲的时候作一些操做,就能够在这个方法里面作。
相关文章
相关标签/搜索