zookeeper实现屏障_Barrier

zookeeper实现屏障_Barrierjava

Distributed systems use barriers to block processing of a set of nodes until a condition is met at which time all the nodes are allowed to proceed. Barriers are implemented in ZooKeeper by designating a barrier node. The barrier is in place if the barrier node exists. Here's the pseudo code:node

  1. Client calls the ZooKeeper API's exists() function on the barrier node, with watch set to true.shell

  2. If exists() returns false, the barrier is gone and the client proceedsapache

  3. Else, if exists() returns true, the clients wait for a watch event from ZooKeeper for the barrier node.服务器

  4. When the watch event is triggered, the client reissues the exists( ) call, again waiting until the barrier node is removed.dom


根据官网的demo,本身的理解,加了相应的注释,这里把代码贴出来,以下,分布式

Barrieride

package com.usfot;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.List;

/**
 * 继承watcher,实现分布式环境中不一样任务之间的同步处理(利用了Watcher机制的反向推送)。
 * 针对事件的触发使线程作出相应的处理,从而避免无谓的while(true),致使cpu空转。
 */
public class Barrier implements Watcher {

    private static final String addr = "127.0.0.1:2181";
    private ZooKeeper zk = null;
    private Integer mutex;
    private int size = 0;
    private String root;

    public Barrier(String root, int size) {
        this.root = root;
        this.size = size;

        try {
            zk = new ZooKeeper(addr, 10 * 1000, this);
            mutex = new Integer(-1);
            Stat s = zk.exists(root, false);
            if (s == null) {
                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    /**
     * 当触发事件后,唤醒在mutex上等待的线程
     * 只要是zk服务器上节点的数据发生改变(无论哪一个zk client改变了数据),
     * 这里都会接收到相应的事件,从而唤醒相应的线程,作出相应的处理
     *
     * @param event
     */
    public synchronized void process(WatchedEvent event) {
        synchronized (mutex) {
            mutex.notify();
        }
    }

    /**
     * 当新建znode时,首先持有mutex监视器才能进入同步代码块。
     * 当znode发生事件后,会触发process,从而唤醒在mutex上等待的线程。
     * 经过while循环判断建立的节点个数,当节点个数大于设定的值时,这个enter方法才执行完成。
     *
     * @param name
     * @return
     * @throws Exception
     */
    public boolean enter(String name) throws Exception {
        zk.create(root + "/" + name, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);
                if (list.size() < size) {
                    mutex.wait();
                } else {
                    return true;
                }
            }
        }
    }

    /**
     * 同理。对于leave方法,当delete znode时,触发事件,从而唤醒mutex上等待的线程,经过while循环
     * 判断节点的个数,当节点所有删除后,leave方法结束。
     * 从而使整个添加删除znode的线程结束
     *
     * @param name
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    public boolean leave(String name) throws KeeperException, InterruptedException {
        zk.delete(root + "/" + name, 0);
        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);
                if (list.size() > 0) {
                    mutex.wait();
                } else {
                    return true;
                }
            }
        }
    }
}

BarrierTestthis

package com.usfot;

import java.util.Random;

public class BarrierTest {

    /**
     * 启动三个线程,也就对应着三个zk客户端
     *
     * @param args
     * @throws Exception
     */
    public static void main(String args[]) throws Exception {
        for (int i = 0; i < 3; i++) {
            Process p = new Process("Thread-" + i, new Barrier("/test_node", 3));
            p.start();
        }
    }
}

class Process extends Thread {

    private String name;
    private Barrier barrier;

    public Process(String name, Barrier barrier) {
        this.name = name;
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            barrier.enter(name);
            System.out.println(name + " enter");
            Thread.sleep(1000 + new Random().nextInt(2000));
            barrier.leave(name);
            System.out.println(name + " leave");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

执行这段程序,以下,spa

Thread-1 enter
Thread-2 enter
Thread-0 enter
Thread-0 leave
Thread-1 leave
Thread-2 leave

Process finished with exit code 0

打开zk的client,以下,

[zk: localhost:2181(CONNECTED) 8] ls /
[testRootPath, test_node, mynode, zookeeper, zk_test0000000005, zk_test]
[zk: localhost:2181(CONNECTED) 9] get /test_node

cZxid = 0x800000051
ctime = Tue Mar 17 19:08:49 CST 2015
mZxid = 0x800000051
mtime = Tue Mar 17 19:08:49 CST 2015
pZxid = 0x800000062
cversion = 12
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 0
[zk: localhost:2181(CONNECTED) 10] ls /test_node
[]
[zk: localhost:2181(CONNECTED) 11]

=====================END=====================

相关文章
相关标签/搜索