zookeeper源码 — 4、session创建

目录java

  • session创建的主要过程
  • 客户端发起链接
  • 服务端建立session

session创建的主要过程

用一张图来讲明session创建过程当中client和server的交互算法

主要流程apache

  • 服务端启动,客户端启动
  • 客户端发起socket链接
  • 服务端accept socket链接,socket链接创建
  • 客户端发送ConnectRequest给server
  • server收到后初始化ServerCnxn,表明一个和客户端的链接,即session,server发送ConnectResponse给client
  • client处理ConnectResponse,session创建完成

客户发起链接

和server创建socket链接

客户端要发起链接要先启动,不管是使用curator client仍是zkClient,都是初始化org.apache.zookeeper.ZooKeeper#ZooKeeper网络

Zookeeper初始化的主要工做是初始化本身的一些关键组件session

  • Watcher,外部构造好传入
  • 初始化StaticHostProvider,决定客户端选择链接哪个server
  • ClientCnxn,客户端网络通讯的组件,主要启动逻辑就是启动这个类

ClientCnxn包含两个线程框架

  • SendThread,负责client端消息的发送和接收
  • EventThread,负责处理event

ClientCnxn初始化的过程就是初始化启动这两个线程,客户端发起链接的主要逻辑在SendThread线程中异步

// org.apache.zookeeper.ClientCnxn.SendThread#run
@Override
public void run() {
    clientCnxnSocket.introduce(this,sessionId);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    long lastPingRwServer = System.currentTimeMillis();
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
    while (state.isAlive()) {
        try {
            // client是否链接到server,若是没有链接到则链接server
            if (!clientCnxnSocket.isConnected()) {
                if(!isFirstConnect){
                    try {
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
                // don't re-establish connection if we are closing
                if (closing || !state.isAlive()) {
                    break;
                }
                // 这个里面去链接server
                startConnect();
                clientCnxnSocket.updateLastSendAndHeard();
            }

            // 省略中间代码...
            clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
            // 省略中间代码...
}

SendThread#run是一个while循环,只要client没有被关闭会一直循环,每次循环判断当前client是否链接到server,若是没有则发起链接,发起链接调用了startConnectsocket

private void startConnect() throws IOException {
    state = States.CONNECTING;

    InetSocketAddress addr;
    if (rwServerAddress != null) {
        addr = rwServerAddress;
        rwServerAddress = null;
    } else {
        // 经过hostProvider来获取一个server地址
        addr = hostProvider.next(1000);
    }
    // 省略中间代码...

    // 创建client与server的链接
    clientCnxnSocket.connect(addr);
}

到这里client发起了socket链接,server监听的端口收到client的链接请求后和client创建链接。ide

经过一个request来创建session链接

socket链接创建后,client会向server发送一个ConnectRequest来创建session链接。两种状况会发送ConnectRequestthis

  • 在上面的connect方法中会判断是否socket已经创建成功,若是创建成功就会发送ConnectRequest
  • 若是socket没有当即创建成功(socket链接创建是异步的),则发送这个packet要延后到doTransport中

发送ConnectRequest是在下面的方法中

// org.apache.zookeeper.ClientCnxn.SendThread#primeConnection
void primeConnection() throws IOException {
    LOG.info("Socket connection established to "
             + clientCnxnSocket.getRemoteSocketAddress()
             + ", initiating session");
    isFirstConnect = false;
    long sessId = (seenRwServerBefore) ? sessionId : 0;
    ConnectRequest conReq = new ConnectRequest(0, lastZxid,
                                               sessionTimeout, sessId, sessionPasswd);
        // 省略中间代码...
        // 将conReq封装为packet放入outgoingQueue等待发送
        outgoingQueue.addFirst(new Packet(null, null, conReq,
                                          null, null, readOnly));
    }
    clientCnxnSocket.enableReadWriteOnly();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Session establishment request sent on "
                  + clientCnxnSocket.getRemoteSocketAddress());
    }
}

请求中带的参数

  • lastZxid:上一个事务的id
  • sessionTimeout:client端配置的sessionTimeout
  • sessId:sessionId,若是以前创建过链接取的是上一次链接的sessionId
  • sessionPasswd:session的密码

服务端建立session

和client创建socket链接

在server启动的过程当中除了会启动用于选举的网络组件还会启动用于处理client请求的网络组件

org.apache.zookeeper.server.NIOServerCnxnFactory

主要启动了三个线程:

  • AcceptThread:用于接收client的链接请求,创建链接后交给SelectorThread线程处理
  • SelectorThread:用于处理读写请求
  • ConnectionExpirerThread:检查session链接是否过时

client发起socket链接的时候,server监听了该端口,接收到client的链接请求,而后把创建练级的SocketChannel放入队列里面,交给SelectorThread处理

// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#addAcceptedConnection
public boolean addAcceptedConnection(SocketChannel accepted) {
    if (stopped || !acceptedQueue.offer(accepted)) {
        return false;
    }
    wakeupSelector();
    return true;
}

创建session链接

SelectorThread是一个不断循环的线程,每次循环都会处理刚刚创建的socket链接

// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#run
while (!stopped) {
    try {
        select();
        //  处理对立中的socket
        processAcceptedConnections();
        processInterestOpsUpdateRequests();
    } catch (RuntimeException e) {
        LOG.warn("Ignoring unexpected runtime exception", e);
    } catch (Exception e) {
        LOG.warn("Ignoring unexpected exception", e);
    }
}

// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#processAcceptedConnections
private void processAcceptedConnections() {
    SocketChannel accepted;
    while (!stopped && (accepted = acceptedQueue.poll()) != null) {
        SelectionKey key = null;
        try {
            // 向该socket注册读事件
            key = accepted.register(selector, SelectionKey.OP_READ);
            // 建立一个NIOServerCnxn维护session
            NIOServerCnxn cnxn = createConnection(accepted, key, this);
            key.attach(cnxn);
            addCnxn(cnxn);
        // 省略中间代码...
    }
}

说了这么久,咱们说的session到底是什么尚未解释,session中文翻译是会话,在这里就是zk的server和client维护的一个具备一些特别属性的网络链接,网络链接这里就是socket链接,一些特别的属性包括

  • sessionId:惟一标示一个会话
  • sessionTimeout:这个链接的超时时间,超过这个时间server就会把链接断开

因此session创建的两步就是

  • 创建socket链接
  • client发起创建session请求,server创建一个实例来维护这个链接

server收到ConnectRequest以后,按照正常处理io的方式处理这个request,server端的主要操做是

  • 反序列化为ConnectRequest
  • 根据request中的sessionId来判断是新的session链接仍是session重连
    • 若是是新链接
      • 生成sessionId
      • 建立新的SessionImpl并放入org.apache.zookeeper.server.SessionTrackerImpl#sessionExpiryQueue
      • 封装该请求为新的request在processorChain中传递,最后交给FinalRequestProcessor处理
    • 若是是重连
      • 关闭sessionId对应的原来的session
        • 关闭原来的socket链接
        • sessionImp会在sessionExpiryQueue中因为过时被清理
      • 从新打开一个session
        • 将原来的sessionId设置到当前的NIOServerCnxn实例中,做为新的链接的sessionId
        • 校验密码是否正确密码错误的时候直接返回给客户端,不可用的session
        • 密码正确的话,新建SessionImpl
        • 返回给客户端sessionId

整体流程是

其中有一个session生成算法咱们来看下

public static long initializeNextSession(long id) {
    // sessionId是long类型,共8个字节,64位
    long nextSid;
    // 取时间戳的的低40位做为初始化sessionId的第16-55位,这里使用的是无符号右移,不会出现负数
    nextSid = (Time.currentElapsedTime() << 24) >>> 8;
    // 使用serverId(配置文件中指定的myid)做为高8位
    nextSid =  nextSid | (id <<56);
    // nextSid为long的最小值,这中状况不可能出现,这里只是做为一个case列在这里
    if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
        ++nextSid;  // this is an unlikely edge case, but check it just in case
    }
    return nextSid;
}

初始化sessionId的组成

myid(1字节)+截取的时间戳低40位(5个字节)+2个字节(初始化都是0)

每一个server再基于这个id不断自增,这样的算法就保证了每一个server的sessionId是全局惟一的。

总结

session在zk框架中是一个重要概念,不少功能都依赖于session,好比临时节点,session关闭后就自动删除了。本文主要介绍了session的创建过程当中client和server各自的处理方式。

相关文章
相关标签/搜索