java并发编程学习之CyclicBarrier这边提到了单机应用的Barrier,在分布式系统中,curator也实现了分布式Barrier。curator提供了DistributedBarrier和DistributedDoubleBarrier两种方式。java
MyDistributedBarrienode
public class MyDistributedBarrier { private static final String path = "/barrier"; static DistributedBarrier distributedBarrier; public static void distributedBarrier() throws Exception { for (int i = 0; i < 5; i++) { new Thread(new Runnable() { @Override public void run() { CuratorFramework client = CuratorConnect.getCuratorClient2(); distributedBarrier = new DistributedBarrier(client,path); try { distributedBarrier.setBarrier(); System.out.println("准备中"); distributedBarrier.waitOnBarrier(); System.out.println("结束了"); } catch (Exception e) { e.printStackTrace(); } } }).start(); } TimeUnit.SECONDS.sleep(5); distributedBarrier.removeBarrier(); } public static void main(String[] args) throws Exception { MyDistributedBarrier.distributedBarrier(); } }
运行结果以下:编程
大体流程:segmentfault
setBarrier,新增节点并发
public synchronized void setBarrier() throws Exception { try { client.create().creatingParentContainersIfNeeded().forPath(barrierPath); } catch ( KeeperException.NodeExistsException ignore ) { // ignore } }
waitOnBarrier分布式
public synchronized void waitOnBarrier() throws Exception { waitOnBarrier(-1, null); } public synchronized boolean waitOnBarrier(long maxWait, TimeUnit unit) throws Exception { long startMs = System.currentTimeMillis(); boolean hasMaxWait = (unit != null); long maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX\_VALUE; boolean result; for(;;) { // 若是result为空,跳出循环,说明节点被删除了。没有为空,就继续监听 result = (client.checkExists().usingWatcher(watcher).forPath(barrierPath) == null); if ( result ) { break; } // 休眠,等待唤醒 if ( hasMaxWait ) { long elapsed = System.currentTimeMillis() - startMs; long thisWaitMs = maxWaitMs - elapsed; if ( thisWaitMs <= 0 ) { break; } wait(thisWaitMs); } else { wait(); } } return result; }
removeBarrier
删除节点,这个时候,会触发监听,唤醒waitOnBarrier的等待。ide
public synchronized void removeBarrier() throws Exception { try { client.delete().forPath(barrierPath); } catch ( KeeperException.NoNodeException ignore ) { // ignore } }
MyDistributedDoubleBarrier源码分析
public class MyDistributedDoubleBarrier { private static final String path = "double_barrier"; public static void distributedDoubleBarrier() throws Exception { for (int i = 0; i < 5; i++) { new Thread(new Runnable() { @Override public void run() { CuratorFramework client = CuratorConnect.getCuratorClient2(); DistributedDoubleBarrier doubleBarrier = new DistributedDoubleBarrier(client,path,5); try { System.out.println("准备中"); doubleBarrier.enter(); System.out.println("开始了"); doubleBarrier.leave(); System.out.println("结束了"); } catch (Exception e) { e.printStackTrace(); } } }).start(); } } public static void main(String[] args) throws Exception { MyDistributedDoubleBarrier.distributedDoubleBarrier(); } }
运行结果以下:学习
主要思路就是在一个节点下面,建立预约数量的子节点,当子节点数量不超过预约节点时就堵塞,直到知足。
大概流程:this
public void enter() throws Exception { enter(-1, null); } public boolean enter(long maxWait, TimeUnit unit) throws Exception { long startMs = System.currentTimeMillis(); boolean hasMaxWait = (unit != null); long maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE; // 判断ready节点是否存在 boolean readyPathExists = (client.checkExists().usingWatcher(watcher).forPath(readyPath) != null); // 建立临时节点 client.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(ourPath); // 若是ready节点存在,就返回true,不存在调用internalEnter boolean result = (readyPathExists || internalEnter(startMs, hasMaxWait, maxWaitMs)); if ( connectionLost.get() ) { throw new KeeperException.ConnectionLossException(); } return result; } private synchronized boolean internalEnter(long startMs, boolean hasMaxWait, long maxWaitMs) throws Exception { boolean result = true; do { // 获取子节点的信息 List<String> children = getChildrenForEntering(); // 判断是否已经建立够子节点数量了 int count = (children != null) ? children.size() : 0; if ( count >= memberQty ) { try { //建立够了,则建立ready节点 client.create().forPath(readyPath); } catch ( KeeperException.NodeExistsException ignore ) { // ignore } break; } // 没建立够,休眠等待唤醒 if ( hasMaxWait && !hasBeenNotified.get() ) { long elapsed = System.currentTimeMillis() - startMs; long thisWaitMs = maxWaitMs - elapsed; if ( thisWaitMs <= 0 ) { result = false; } else { wait(thisWaitMs); } if ( !hasBeenNotified.get() ) { result = false; } } else { wait(); } } while ( false ); return result; }
主要思路,就是子节点删除完以前堵塞,删除完了再删除ready节点。
大概流程:
public synchronized void leave() throws Exception { leave(-1, null); } public synchronized boolean leave(long maxWait, TimeUnit unit) throws Exception { long startMs = System.currentTimeMillis(); boolean hasMaxWait = (unit != null); long maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE; return internalLeave(startMs, hasMaxWait, maxWaitMs); } private boolean internalLeave(long startMs, boolean hasMaxWait, long maxWaitMs) throws Exception { String ourPathName = ZKPaths.getNodeFromPath(ourPath); boolean ourNodeShouldExist = true; boolean result = true; for(;;) { if ( connectionLost.get() ) { throw new KeeperException.ConnectionLossException(); } List<String> children; try { // 获取节点信息 children = client.getChildren().forPath(barrierPath); } catch ( KeeperException.NoNodeException dummy ) { children = Lists.newArrayList(); } // 过滤ready节点并排序 children = filterAndSortChildren(children); // 已经没有子节点了,则跳出循环 if ( (children == null) || (children.size() == 0) ) { break; } // 当前节点的位置 int ourIndex = children.indexOf(ourPathName); // 小于0,说明当前节点不在子节点里,抛异常 if ( (ourIndex < 0) && ourNodeShouldExist ) { if ( connectionLost.get() ) { break; // connection was lost but we've reconnected. However, our ephemeral node is gone } else { throw new IllegalStateException(String.format("Our path (%s) is missing", ourPathName)); } } // 节点数量为1,判断是不是当前节点,是的话则删除 if ( children.size() == 1 ) { if ( ourNodeShouldExist && !children.get(0).equals(ourPathName) ) { throw new IllegalStateException(String.format("Last path (%s) is not ours (%s)", children.get(0), ourPathName)); } checkDeleteOurPath(ourNodeShouldExist); break; } Stat stat; boolean IsLowestNode = (ourIndex == 0); // 当前节点是最小的,则监听最大节点是否被删除 if ( IsLowestNode ) { String highestNodePath = ZKPaths.makePath(barrierPath, children.get(children.size() - 1)); stat = client.checkExists().usingWatcher(watcher).forPath(highestNodePath); } else { // 不是最小的,则监听最小节点是否被删除 String lowestNodePath = ZKPaths.makePath(barrierPath, children.get(0)); stat = client.checkExists().usingWatcher(watcher).forPath(lowestNodePath); // 删除本身的节点 checkDeleteOurPath(ourNodeShouldExist); ourNodeShouldExist = false; } // 休眠等唤醒 if ( stat != null ) { if ( hasMaxWait ) { long elapsed = System.currentTimeMillis() - startMs; long thisWaitMs = maxWaitMs - elapsed; if ( thisWaitMs <= 0 ) { result = false; } else { wait(thisWaitMs); } } else { wait(); } } } try { client.delete().forPath(readyPath); } catch ( KeeperException.NoNodeException ignore ) { // ignore } return result; }