Master选举是分布式系统中非常常见的一种应用场景：在集群中，有且只有一个master负责主要工作，其他slave作为备份；当master挂掉的时候，其他slave再竞争成为新的master。这样的场景有两个特点：1. 任意时刻有且只有一个master；2. master挂掉后，其他slave要重新竞争出新的master。
ZooKeeper的以下几个特点，可以满足master选举的需求：
MasterSelect示例：
public class MasterSelect { CuratorFramework client = CuratorConnect.getCuratorClient2(); private static final String path = "/master"; public void masterSelect() { LeaderSelector leaderSelector = new LeaderSelector(client, path, new LeaderSelectorListenerAdapter() { @Override public void takeLeadership(CuratorFramework curatorFramework) throws Exception { System.out.println("Get master"); TimeUnit.SECONDS.sleep(3); System.out.println("Release master"); } }); // 自动加入队列 leaderSelector.autoRequeue(); leaderSelector.start(); try { TimeUnit.SECONDS.sleep(Integer.MAX_VALUE); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { new MasterSelect().masterSelect(); } }
运行结果：
/**
 * Starts this LeaderSelector.
 *
 * <p>It moves the state from LATENT to STARTED, registers the connection-state listener with
 * the client, and queues the first attempt to acquire leadership.
 */
public void start() {
    // CAS from LATENT to STARTED; if the state was not LATENT, start() was already called, so fail.
    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
    // The executor service must not already be shut down.
    Preconditions.checkState(!executorService.isShutdown(), "Already started");
    // Must not currently hold leadership.
    Preconditions.checkState(!hasLeadership, "Already has leadership");
    // Listen for connection state changes.
    client.getConnectionStateListenable().addListener(listener);
    // Queue a leadership attempt.
    requeue();
}

/**
 * Queues another attempt to acquire leadership.
 *
 * @return true if a new attempt was submitted; false if one was already queued
 */
public boolean requeue() {
    // Only allowed while in the STARTED state.
    Preconditions.checkState(state.get() == State.STARTED, "close() has already been called");
    // Queue the attempt.
    return internalRequeue();
}

/**
 * Submits doWorkLoop() to the executor unless an attempt is already queued or the selector is no
 * longer STARTED.
 *
 * <p>When the submitted task finishes, it clears the queued flag. If autoRequeue is set, it then
 * calls this method again, so the instance keeps competing for leadership.
 *
 * @return true if a task was submitted, false otherwise
 */
private synchronized boolean internalRequeue() {
    // Only queue when not already queued and still STARTED.
    if ( !isQueued && (state.get() == State.STARTED) ) {
        // Mark as queued before submitting; together with `synchronized`, this prevents a second submission.
        isQueued = true;
        Future<Void> task = executorService.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                try {
                    doWorkLoop();
                } finally {
                    // Leave the queue.
                    clearIsQueued();
                    // Automatically queue again if autoRequeue is enabled.
                    if ( autoRequeue.get() ) {
                        internalRequeue();
                    }
                }
                return null;
            }
        });
        ourTask.set(task);
        return true;
    }
    return false;
}
/**
 * Runs one leadership attempt via doWork().
 *
 * <p>ConnectionLossException and SessionExpiredException are rethrown only when autoRequeue is
 * disabled. With autoRequeue they are dropped, so the requeue logic simply tries again. An
 * InterruptedException restores the thread's interrupt flag, and the method then returns normally.
 */
private void doWorkLoop() throws Exception {
    KeeperException exception = null;
    try {
        doWork();
    } catch ( KeeperException.ConnectionLossException e ) {
        exception = e;
    } catch ( KeeperException.SessionExpiredException e ) {
        exception = e;
    } catch ( InterruptedException ignore ) {
        // Interrupted: restore the interrupt flag and otherwise ignore.
        Thread.currentThread().interrupt();
    }
    if ( (exception != null) && !autoRequeue.get() ) // autoRequeue should ignore connection loss or session expired and just keep trying
    {
        throw exception;
    }
}

/**
 * Blocks until the mutex is acquired, then calls listener.takeLeadership(client).
 *
 * <p>Leadership lasts until takeLeadership returns or throws. The finally block then clears
 * hasLeadership and releases the mutex. A Throwable other than InterruptedException thrown by
 * takeLeadership is not rethrown here; its catch block only calls ThreadUtils.checkInterrupted(e).
 */
void doWork() throws Exception {
    // Not the leader yet.
    hasLeadership = false;
    try {
        // Block until the lock is acquired.
        mutex.acquire();
        // Lock acquired: this instance is now the leader.
        hasLeadership = true;
        try {
            // NOTE(review): presumably debug/test hooks; no-ops when the latches are null.
            if ( debugLeadershipLatch != null ) {
                debugLeadershipLatch.countDown();
            }
            if ( debugLeadershipWaitLatch != null ) {
                debugLeadershipWaitLatch.await();
            }
            // Call the user's takeLeadership callback; leadership is held until it returns.
            listener.takeLeadership(client);
        } catch ( InterruptedException e ) {
            Thread.currentThread().interrupt();
            throw e;
        } catch ( Throwable e ) {
            ThreadUtils.checkInterrupted(e);
        } finally {
            // Leave the queue.
            clearIsQueued();
        }
    } catch ( InterruptedException e ) {
        Thread.currentThread().interrupt();
        throw e;
    } finally {
        // Only release if leadership was actually obtained.
        if ( hasLeadership ) {
            // No longer the leader.
            hasLeadership = false;
            // Record whether the thread was interrupted; Thread.interrupted() also clears the flag.
            boolean wasInterrupted = Thread.interrupted(); // clear any interrupted status so that mutex.release() works immediately
            try {
                // Release the lock.
                mutex.release();
            } catch ( Exception e ) {
                if ( failedMutexReleaseCount != null ) {
                    failedMutexReleaseCount.incrementAndGet();
                }
                ThreadUtils.checkInterrupted(e);
                log.error("The leader threw an exception", e);
                // ignore errors - this is just a safety
            } finally {
                // Restore the interrupt flag cleared above.
                if ( wasInterrupted ) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
/**
 * Acquires the mutex, blocking with no time limit.
 *
 * @throws IOException if internalLock returns false (the message attributes this to a lost connection)
 */
public void acquire() throws Exception {
    if ( !internalLock(-1, null) ) {
        throw new IOException("Lost connection while trying to acquire lock: " + basePath);
    }
}

/**
 * Acquires the lock for the current thread.
 *
 * <p>The lock is reentrant: if this thread already holds it, only the hold count is incremented.
 *
 * @param time maximum time to wait; unused when unit is null
 * @param unit unit of {@code time}, or null to wait with no time limit
 * @return true if the current thread holds the lock on return, false otherwise
 */
private boolean internalLock(long time, TimeUnit unit) throws Exception {
    /*
       Note on concurrency: a given lockData instance
       can be only acted on by a single thread so locking isn't necessary
    */

    Thread currentThread = Thread.currentThread();

    LockData lockData = threadData.get(currentThread);
    if ( lockData != null ) {
        // re-entering
        lockData.lockCount.incrementAndGet();
        return true;
    }

    // Create our lock node and wait until it holds the lock (or the attempt gives up).
    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null ) {
        // Cache the lock path for this thread so re-entry and release() can find it.
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }

    return false;
}

/**
 * Creates this client's lock node and waits until it holds the lock or the wait times out.
 *
 * <p>If a KeeperException.NoNodeException occurs, the whole attempt is repeated for as long as
 * the client's retry policy allows.
 *
 * @return our lock node path if the lock was acquired, otherwise null
 * @throws KeeperException.NoNodeException if it recurs after the retry policy refuses another retry
 */
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
    final long startMillis = System.currentTimeMillis();
    // null means "wait with no time limit"
    final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
    final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
    int retryCount = 0;

    String ourPath = null;
    boolean hasTheLock = false;
    boolean isDone = false;
    while ( !isDone ) {
        isDone = true;

        try {
            // Create our lock node (the original author describes it as an ephemeral node; the mode is
            // chosen inside driver.createsTheLock). If this fails, the catch below may retry.
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        } catch ( KeeperException.NoNodeException e ) {
            // gets thrown by StandardLockInternalsDriver when it can't find the lock node
            // this can happen when the session expires, etc. So, if the retry allows, just try it all again
            if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) {
                isDone = false;
            } else {
                throw e;
            }
        }
    }

    if ( hasTheLock ) {
        return ourPath;
    }

    return null;
}

/**
 * Loops until ourPath holds the lock, the wait times out, or the client leaves the STARTED state.
 *
 * <p>While waiting, it sets a watcher on the path chosen by driver.getsTheLock and blocks in
 * wait(). If the watched node is already gone (NoNodeException), it re-checks immediately. Our
 * lock node is deleted on timeout and whenever an exception escapes the loop.
 *
 * @return true if the lock was acquired
 */
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
    boolean haveTheLock = false;
    boolean doDelete = false;
    try {
        if ( revocable.get() != null ) {
            client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
        }

        while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) {
            // Children of the lock path, sorted.
            List<String> children = getSortedChildren();
            // Our own node name: the path with the base path stripped.
            String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

            // PredicateResults carries two values: whether we hold the lock, and which path to watch if not.
            PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if ( predicateResults.getsTheLock() ) {
                // Acquired.
                haveTheLock = true;
            } else {
                // Not acquired: watch the node ahead of ours.
                String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                synchronized(this) {
                    try {
                        // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                        // Set the watcher, then wait to be woken. Per the original author, the watcher calls notifyAll.
                        client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                        if ( millisToWait != null ) {
                            // Deduct the time spent so far from the remaining budget.
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if ( millisToWait <= 0 ) {
                                doDelete = true;    // timed out - delete our node
                                break;
                            }

                            wait(millisToWait);
                        } else {
                            wait();
                        }
                    } catch ( KeeperException.NoNodeException e ) {
                        // it has been deleted (i.e. lock released). Try to acquire again
                    }
                }
            }
        }
    } catch ( Exception e ) {
        ThreadUtils.checkInterrupted(e);
        doDelete = true;
        throw e;
    } finally {
        if ( doDelete ) {
            deleteOurPath(ourPath);
        }
    }
    return haveTheLock;
}

/**
 * Decides whether sequenceNodeName holds the lock, given the sorted children.
 *
 * @return PredicateResults whose getsTheLock is true when our index is below maxLeases. Otherwise
 *         pathToWatch is the child maxLeases positions before ours (the immediate predecessor when
 *         maxLeases is 1).
 */
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
    // Position of our node among the sorted children.
    int ourIndex = children.indexOf(sequenceNodeName);
    // NOTE(review): presumably rejects ourIndex < 0 (our node not found); implementation not shown here.
    validateOurIndex(sequenceNodeName, ourIndex);

    // With maxLeases == 1 (a mutex), only the node at index 0 holds the lock.
    boolean getsTheLock = ourIndex < maxLeases;
    String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);

    return new PredicateResults(pathToWatch, getsTheLock);
}
public void release() throws Exception { /* Note on concurrency: a given lockData instance can be only acted on by a single thread so locking isn't necessary */ Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData == null ) { throw new IllegalMonitorStateException("You do not own the lock: " + basePath); } int newLockCount = lockData.lockCount.decrementAndGet(); if ( newLockCount > 0 ) { return; } if ( newLockCount < 0 ) { throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath); } try { internals.releaseLock(lockData.lockPath); } finally { threadData.remove(currentThread); } } final void releaseLock(String lockPath) throws Exception { // 移除监听 client.removeWatchers(); revocable.set(null); // 删除节点 deleteOurPath(lockPath); }