ZooKeeper: Master Election

Master election

Master election is a very common scenario in distributed systems: in a cluster there is exactly one master responsible for the main work, the other slaves act as backups, and when the master goes down the remaining slaves compete to become the new master. Such a scenario has two characteristics:

  • Only one server can become the master
  • When the master goes down, the other slaves must be able to detect it and compete for mastership

Several features of ZooKeeper satisfy these requirements for master election (see the sketch after the list below):

  • Strong consistency: guarantees that only one server can become the master
  • Ephemeral nodes: when the master goes down, its ephemeral node disappears as the session closes
  • Watcher: watches for node changes, so the deletion of the master node is noticed by the other servers
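
As a rough illustration of how these three features fit together, here is a minimal sketch that goes straight through the Curator API rather than the LeaderSelector recipe analyzed below; the connect string, the /master-demo path and the server name are assumptions made only for this example:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

public class EphemeralMasterSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        try {
            // Strong consistency: only one create() on the same path can succeed
            client.create()
                  .withMode(CreateMode.EPHEMERAL) // ephemeral: deleted when our session closes
                  .forPath("/master-demo", "server-1".getBytes());
            System.out.println("I am the master");
        } catch (KeeperException.NodeExistsException e) {
            // Someone else is the master: watch the node so we can compete again once it is deleted
            client.getData().usingWatcher(new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.NodeDeleted) {
                        System.out.println("Master is gone, compete for master again");
                    }
                }
            }).forPath("/master-demo");
        }
        Thread.sleep(Long.MAX_VALUE); // keep the session (and the ephemeral node) alive
    }
}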

Curator flow diagram

[flow diagram image omitted]

  1. Create a node
  2. If creation succeeds, check whether our node is the first one; the first node is the leader
  3. If we are not the leader, watch the previous node and keep retrying according to the retry policy (see the sketch after this list)
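
A rough sketch of this flow using raw ephemeral sequential nodes; it assumes a started CuratorFramework client and a /master-demo parent path, both of which are assumptions for illustration (the LeaderSelector recipe analyzed below implements the same idea more robustly):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

public class SequentialElectionSketch {

    static void electOnce(CuratorFramework client) throws Exception {
        // 1. Create an ephemeral sequential node under the election path
        String ourPath = client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath("/master-demo/candidate-");

        // 2. Check whether our node has the lowest sequence number
        List<String> children = new ArrayList<>(client.getChildren().forPath("/master-demo"));
        Collections.sort(children);
        int ourIndex = children.indexOf(ourPath.substring("/master-demo/".length()));

        if (ourIndex == 0) {
            System.out.println("We are the leader");
        } else {
            // 3. Not the leader: watch only the node just before ours (avoids a herd effect)
            String previous = "/master-demo/" + children.get(ourIndex - 1);
            client.getData().usingWatcher(new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("Previous node changed, check again whether we are now the leader");
                }
            }).forPath(previous);
        }
    }
}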

Curator source code

MasterSelect server

import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;

public class MasterSelect {
    // CuratorConnect is the author's connection helper (not shown in this post)
    CuratorFramework client = CuratorConnect.getCuratorClient2();
    private static final String path = "/master";

    public void masterSelect() {
        LeaderSelector leaderSelector = new LeaderSelector(client, path, new LeaderSelectorListenerAdapter() {
            @Override
            public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                // Leadership is held only while this method runs; returning relinquishes it
                System.out.println("Get master");
                TimeUnit.SECONDS.sleep(3);
                System.out.println("Release master");
            }
        });
        // Automatically re-enter the election queue after leadership is relinquished
        leaderSelector.autoRequeue();
        leaderSelector.start();
        try {
            TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new MasterSelect().masterSelect();
    }
}
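
CuratorConnect.getCuratorClient2() is the author's own helper and is not shown; a minimal sketch of what it might look like, with the connect string, session timeout and retry policy all being assumptions:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorConnect {

    public static CuratorFramework getCuratorClient2() {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")                 // assumed ZooKeeper address
                .sessionTimeoutMs(60_000)                        // assumed session timeout
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        client.start();
        return client;
    }
}

Because autoRequeue() is enabled, the instance re-enters the election every time takeLeadership returns, so the two messages are printed in turn for as long as the program runs.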

Run result:
[run output screenshot omitted]

LeaderSelector source code analysis

start

public void start()
{
    // CAS the state from LATENT to STARTED; if it has already been started, throw
    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
    // the executor service must not already be shut down
    Preconditions.checkState(!executorService.isShutdown(), "Already started");
    // must not already hold leadership
    Preconditions.checkState(!hasLeadership, "Already has leadership");

    // add the connection state listener
    client.getConnectionStateListenable().addListener(listener);
    // enqueue this instance for the election
    requeue();
}

public boolean requeue()
{
    // the state must be STARTED (close() must not have been called)
    Preconditions.checkState(state.get() == State.STARTED, "close() has already been called");
    // enqueue
    return internalRequeue();
}

private synchronized boolean internalRequeue()
{
    // only if not already queued and still in STARTED state
    if ( !isQueued && (state.get() == State.STARTED) )
    {
        // mark as queued
        isQueued = true;
        Future<Void> task = executorService.submit(new Callable<Void>()
        {
            @Override
            public Void call() throws Exception
            {
                try
                {
                    doWorkLoop();
                }
                finally
                {
                    // no longer queued
                    clearIsQueued();
                    // re-enqueue automatically if autoRequeue() was called
                    if ( autoRequeue.get() )
                    {
                        internalRequeue();
                    }
                }
                return null;
            }
        });
        ourTask.set(task);

        return true;
    }
    return false;
}

doWorkLoop

private void doWorkLoop() throws Exception
{
    KeeperException exception = null;
    try
    {
        doWork();
    }
    catch ( KeeperException.ConnectionLossException e )
    {
        exception = e;
    }
    catch ( KeeperException.SessionExpiredException e )
    {
        exception = e;
    }
    catch ( InterruptedException ignore )
    {
        // thread was interrupted: restore the interrupt flag and fall through
        Thread.currentThread().interrupt();
    }
    if ( (exception != null) && !autoRequeue.get() )   // autoRequeue should ignore connection loss or session expired and just keep trying
    {
        throw exception;
    }
}

void doWork() throws Exception
{
    // not the leader yet
    hasLeadership = false;
    try
    {
        // acquire the distributed lock (blocks until acquired)
        mutex.acquire();
        // the lock is held: we are now the leader
        hasLeadership = true;
        try
        {
            if ( debugLeadershipLatch != null )
            {
                debugLeadershipLatch.countDown();
            }
            if ( debugLeadershipWaitLatch != null )
            {
                debugLeadershipWaitLatch.await();
            }
            // invoke the user's takeLeadership callback
            listener.takeLeadership(client);
        }
        catch ( InterruptedException e )
        {
            Thread.currentThread().interrupt();
            throw e;
        }
        catch ( Throwable e )
        {
            ThreadUtils.checkInterrupted(e);
        }
        finally
        {
            // no longer queued
            clearIsQueued();
        }
    }
    catch ( InterruptedException e )
    {
        Thread.currentThread().interrupt();
        throw e;
    }
    finally
    {
        // if we obtained leadership
        if ( hasLeadership )
        {
            // mark ourselves as no longer the leader
            hasLeadership = false;
            // clear any interrupted status so that mutex.release() works immediately
            boolean wasInterrupted = Thread.interrupted();
            try
            {
                // release the lock
                mutex.release();
            }
            catch ( Exception e )
            {
                if ( failedMutexReleaseCount != null )
                {
                    failedMutexReleaseCount.incrementAndGet();
                }

                ThreadUtils.checkInterrupted(e);
                log.error("The leader threw an exception", e);
                // ignore errors - this is just a safety
            }
            finally
            {
                if ( wasInterrupted )
                {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

mutex.acquire

public void acquire() throws Exception
{
    if ( !internalLock(-1, null) )
    {
        throw new IOException("Lost connection while trying to acquire lock: " + basePath);
    }
}

private boolean internalLock(long time, TimeUnit unit) throws Exception
{
    /*
       Note on concurrency: a given lockData instance
       can be only acted on by a single thread so locking isn't necessary
    */

    Thread currentThread = Thread.currentThread();

    LockData lockData = threadData.get(currentThread);
    if ( lockData != null )
    {
        // re-entering
        lockData.lockCount.incrementAndGet();
        return true;
    }
    // attempt to create the lock node
    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null )
    {
        // cache the lock data for the current thread
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }

    return false;
}

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
    final long      startMillis = System.currentTimeMillis();
    final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
    final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
    int             retryCount = 0;

    String          ourPath = null;
    boolean         hasTheLock = false;
    boolean         isDone = false;
    while ( !isDone )
    {
        isDone = true;

        try
        {
            // create the ephemeral sequential lock node; if this fails with NoNodeException, the catch block below retries via the retry policy
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            // gets thrown by StandardLockInternalsDriver when it can't find the lock node
            // this can happen when the session expires, etc. So, if the retry allows, just try it all again
            if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
            {
                isDone = false;
            }
            else
            {
                throw e;
            }
        }
    }

    if ( hasTheLock )
    {
        return ourPath;
    }

    return null;
}

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
    boolean     haveTheLock = false;
    boolean     doDelete = false;
    try
    {
        if ( revocable.get() != null )
        {
            client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
        }

        while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
        {
            // get the sorted children of the lock path
            List<String>        children = getSortedChildren();
            // our node's name relative to the base path
            String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
            // predicateResults carries two things: whether we got the lock, and which node to watch if not
            PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            // we got the lock
            if ( predicateResults.getsTheLock() )
            {
                haveTheLock = true;
            }
            else
            {
                // did not get the lock: watch the previous node
                String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                synchronized(this)
                {
                    try
                    {
                        // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                        // set the watch and wait until woken up; the watcher calls notifyAll (see the sketch after this method)
                        client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                        if ( millisToWait != null )
                        {
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if ( millisToWait <= 0 )
                            {
                                doDelete = true;    // timed out - delete our node
                                break;
                            }

                            wait(millisToWait);
                        }
                        else
                        {
                            wait();
                        }
                    }
                    catch ( KeeperException.NoNodeException e )
                    {
                        // it has been deleted (i.e. lock released). Try to acquire again
                    }
                }
            }
        }
    }
    catch ( Exception e )
    {
        ThreadUtils.checkInterrupted(e);
        doDelete = true;
        throw e;
    }
    finally
    {
        if ( doDelete )
        {
            deleteOurPath(ourPath);
        }
    }
    return haveTheLock;
}
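
For reference, the watcher used in the wait/notify handshake above is a field of LockInternals. In the older Curator versions this walkthrough appears to be based on, it looks roughly like the sketch below; newer releases route the wake-up through a helper on the client instead:

private final Watcher watcher = new Watcher()
{
    @Override
    public void process(WatchedEvent event)
    {
        // wake up the thread blocked in wait() inside internalLockLoop()
        notifyFromWatcher();
    }
};

private synchronized void notifyFromWatcher()
{
    notifyAll();
}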

public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{   
    // position of our node among the sorted children
    int             ourIndex = children.indexOf(sequenceNodeName);
    validateOurIndex(sequenceNodeName, ourIndex);
    // maxLeases is 1, so only the node in the first position gets the lock
    boolean         getsTheLock = ourIndex < maxLeases;
    String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);

    return new PredicateResults(pathToWatch, getsTheLock);
}
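
A small worked example of this predicate with maxLeases = 1, using hypothetical lock-node names:

import java.util.Arrays;
import java.util.List;

public class GetsTheLockExample {
    public static void main(String[] args) {
        // Hypothetical sorted children of the lock path after three clients have joined
        List<String> children = Arrays.asList(
                "_c_aaa-lock-0000000001",   // index 0 -> holds the lock (the leader)
                "_c_bbb-lock-0000000002",   // index 1 -> watches index 0
                "_c_ccc-lock-0000000003");  // index 2 -> watches index 1

        int maxLeases = 1;
        int ourIndex = children.indexOf("_c_ccc-lock-0000000003");     // 2
        boolean getsTheLock = ourIndex < maxLeases;                    // false
        String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);

        // pathToWatch == "_c_bbb-lock-0000000002": each client watches only its predecessor,
        // so when the leader releases the lock only the next client in line is woken up.
        System.out.println(getsTheLock + " " + pathToWatch);
    }
}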

mutex.release

public void release() throws Exception
{
    /*
        Note on concurrency: a given lockData instance
        can be only acted on by a single thread so locking isn't necessary
     */

    Thread currentThread = Thread.currentThread();
    LockData lockData = threadData.get(currentThread);
    if ( lockData == null )
    {
        throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
    }

    int newLockCount = lockData.lockCount.decrementAndGet();
    if ( newLockCount > 0 )
    {
        return;
    }
    if ( newLockCount < 0 )
    {
        throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
    }
    try
    {
        internals.releaseLock(lockData.lockPath);
    }
    finally
    {
        threadData.remove(currentThread);
    }
}

final void releaseLock(String lockPath) throws Exception
{
    // remove our watchers
    client.removeWatchers();
    revocable.set(null);
    // delete our lock node
    deleteOurPath(lockPath);
}