-----------------破镜重圆,坚持不懈!html
Zookeeper是Hadoop的一个子项目,它是分布式系统中的协调系统,可提供的服务主要有:配置服务、名字服务、分布式同步、组服务等。java
它有以下的一些特色:node
Zookeeper的核心是一个精简的文件系统,它支持一些简单的操做和一些抽象操做,例如,排序和通知。apache
Zookeeper的原语操做是很丰富的,可实现一些协调数据结构和协议。例如,分布式队列、分布式锁和一组同级别节点中的“领导者选举”。api
Zookeeper支持集群模式,能够很容易的解决单点故障问题。tomcat
不一样进程间的交互不须要了解彼此,甚至能够没必要同时存在,某进程在zookeeper中留下消息后,该进程结束后其它进程还能够读这条消息。服务器
Zookeeper实现了一个关于通用协调模式的开源共享存储库,能使开发者免于编写这类通用协议。数据结构
Zookeeper的运行环境是须要java的,建议安装oracle的java6.oracle
可去官网下载一个稳定的版本,而后进行安装:http://zookeeper.apache.org/异步
解压后在zookeeper的conf目录下建立配置文件zoo.cfg,里面的配置信息可参考统计目录下的zoo_sample.cfg文件,咱们这里配置为:
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/opt/zookeeper-data/ clientPort=2181
tickTime:指定了ZooKeeper的基本时间单位(以毫秒为单位);
initLimit:指定了启动zookeeper时,zookeeper实例中的随从实例同步到领导实例的初始化链接时间限制,超出时间限制则链接失败(以tickTime为时间单位);
syncLimit:指定了zookeeper正常运行时,主从节点之间同步数据的时间限制,若超过这个时间限制,那么随从实例将会被丢弃;
dataDir:zookeeper存放数据的目录;
clientPort:用于链接客户端的端口。
% zkServer.sh start
检查ZooKeeper是否正在运行
echo ruok | nc localhost 2181
如果正常运行的话会打印“imok”。
默认状况下,zookeeper是支持本地的jmx监控的。若须要远程监控zookeeper,则须要进行进行以下配置。
默认的配置有这么一行:
ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"
我们在$JMXLOCALONLY后边添加jmx的相关参数配置:
ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY -Djava.rmi.server.hostname=192.168.1.8 -Dcom.sun.management.jmxremote.port=1911 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false org.apache.zookeeper.server.quorum.QuorumPeerMain"
这样就能够远程监控了,能够用jconsole.exe或jvisualvm.exe等工具对其进行监控。
这里没有配置验证信息,若是须要请参见个人博文jvisualvm远程监控tomcat:http://www.cnblogs.com/leocook/p/jvisualvmandtomcat.html
Zookeeper的数据存储采用的是结构化存储,结构化存储是没有文件和目录的概念,里边的目录和文件被抽象成了节点(node),zookeeper里能够称为znode。Znode的层次结构以下图:
最上边的是根目录,下边分别是不一样级别的子目录。
可以使用./zkCli.sh -server localhost来链接到Zookeeper服务上。
使用ls /可查看根节点下有哪些子节点,能够双击Tab键查看更多命令。
可建立org.apache.zookeeper.ZooKeeper对象来做为zk的客户端,注意,java api里建立zk客户端是异步的,为防止在客户端还未完成建立就被使用的状况,这里可使用同步计时器,确保zk对象建立完成再被使用。
可使用zhandle_t指针来表示zk客户端,可用zookeeper_init方法来建立。可在ZK_HOME\src\c\src\ cli.c查看部分示例代码。
Znode有两种类型:短暂的和持久的。短暂的znode在建立的客户端与服务器端断开(不管是明确的断开仍是故障断开)链接时,该znode都会被删除;相反,持久的znode则不会。
public class CreateGroup implements Watcher{ private static final int SESSION_TIMEOUT = 1000;//会话延时 private ZooKeeper zk = null; private CountDownLatch countDownLatch = new CountDownLatch(1);//同步计数器 public void process(WatchedEvent event) { if(event.getState() == KeeperState.SyncConnected){ countDownLatch.countDown();//计数器减一 } } /** * 建立zk对象 * 当客户端链接上zookeeper时会执行process(event)里的countDownLatch.countDown(),计数器的值变为0,则countDownLatch.await()方法返回。 * @param hosts * @throws IOException * @throws InterruptedException */ public void connect(String hosts) throws IOException, InterruptedException { zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this); countDownLatch.await();//阻塞程序继续执行 } /** * 建立group * * @param groupName 组名 * @throws KeeperException * @throws InterruptedException */ public void create(String groupName) throws KeeperException, InterruptedException { String path = "/" + groupName; String createPath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE/*容许任何客户端对该znode进行读写*/, CreateMode.PERSISTENT/*持久化的znode*/); System.out.println("Created " + createPath); } /** * 关闭zk * @throws InterruptedException */ public void close() throws InterruptedException { if(zk != null){ try { zk.close(); } catch (InterruptedException e) { throw e; }finally{ zk = null; System.gc(); } } } }
这里咱们使用了同步计数器CountDownLatch,在connect方法中建立执行了zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);以后,下边接着调用了CountDownLatch对象的await方法阻塞,由于这是zk客户端不必定已经完成了与服务端的链接,在客户端链接到服务端时会触发观察者调用process()方法,咱们在方法里边判断一下触发事件的类型,完成链接后计数器减一,connect方法中解除阻塞。
还有两个地方须要注意:这里建立的znode的访问权限是open的,且该znode是持久化存储的。
测试类以下:
public class CreateGroupTest { private static String hosts = "192.168.1.8"; private static String groupName = "zoo"; private CreateGroup createGroup = null; /** * init * @throws InterruptedException * @throws KeeperException * @throws IOException */ @Before public void init() throws KeeperException, InterruptedException, IOException { createGroup = new CreateGroup(); createGroup.connect(hosts); } @Test public void testCreateGroup() throws KeeperException, InterruptedException { createGroup.create(groupName); } /** * 销毁资源 */ @After public void destroy() { try { createGroup.close(); createGroup = null; System.gc(); } catch (InterruptedException e) { e.printStackTrace(); } } }
因为zk对象的建立和销毁代码是能够复用的,因此这里咱们把它分装成了接口:
/** * 链接的观察者,封装了zk的建立等 * @author leo * */ public class ConnectionWatcher implements Watcher { private static final int SESSION_TIMEOUT = 5000; protected ZooKeeper zk = null; private CountDownLatch countDownLatch = new CountDownLatch(1); public void process(WatchedEvent event) { KeeperState state = event.getState(); if(state == KeeperState.SyncConnected){ countDownLatch.countDown(); } } /** * 链接资源 * @param hosts * @throws IOException * @throws InterruptedException */ public void connection(String hosts) throws IOException, InterruptedException { zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this); countDownLatch.await(); } /** * 释放资源 * @throws InterruptedException */ public void close() throws InterruptedException { if (null != zk) { try { zk.close(); } catch (InterruptedException e) { throw e; }finally{ zk = null; System.gc(); } } } }
/** * 删除分组 * @author leo * */ public class DeleteGroup extends ConnectionWatcher { public void delete(String groupName) { String path = "/" + groupName; try { List<String> children = zk.getChildren(path, false); for(String child : children){ zk.delete(path + "/" + child, -1); } zk.delete(path, -1);//版本号为-1, } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
zk.delete(path,version)方法的第二个参数是znode版本号,若是提供的版本号和znode版本号一致才会删除这个znode,这样能够检测出对znode的修改冲突。经过将版本号设置为-1,能够绕过这个版本检测机制,不管znode的版本号是什么,都会直接将其删除。
测试类:
public class DeleteGroupTest { private static final String HOSTS = "192.168.1.137"; private static final String groupName = "zoo"; private DeleteGroup deleteGroup = null; @Before public void init() throws IOException, InterruptedException { deleteGroup = new DeleteGroup(); deleteGroup.connection(HOSTS); } @Test public void testDelete() throws IOException, InterruptedException, KeeperException { deleteGroup.delete(groupName); } @After public void destroy() throws InterruptedException { if(null != deleteGroup){ try { deleteGroup.close(); } catch (InterruptedException e) { throw e; }finally{ deleteGroup = null; System.gc(); } } } }
ZooKeeper中共有9中操做:
create:建立一个znode
delete:删除一个znode
exists:测试一个znode
getACL,setACL:获取/设置一个znode的ACL(权限控制)
getChildren:获取一个znode的子节点
getData,setData:获取/设置一个znode所保存的数据
sync:将客户端的znode视图与ZooKeeper同步
这里更新数据是必需要提供znode的版本号(也可使用-1强制更新,这里能够执行前经过exists方法拿到znode的元数据Stat对象,而后从Stat对象中拿到对应的版本号信息),若是版本号不匹配,则更新会失败。所以一个更新失败的客户端能够尝试是否重试或执行其它操做。
ZooKeeper的api支持多种语言,在操做时能够选择使用同步api仍是异步api。同步api通常是直接返回结果,异步api通常是经过回调来传送执行结果的,通常方法中有某参数是类AsyncCallback的内部接口,那么该方法应该就是异步调用,回调方法名为processResult。
能够对客户端和服务器端之间的链接设置观察触发器(后边称之为zookeeper的状态观察触发器),也能够对znode设置观察触发器。
zk的整个生命周期以下:
可在建立zk对象时传入一个观察器,在完成CONNECTING状态到CONNECTED状态时,观察器会触发一个事件,该触发的事件类型为NONE,经过event.getState()方法拿到事件状态为SyncConnected。有一点须要注意的就是,在zk调用close方法时不会触发任何事件,由于这类的显示调用是开发者主动执行的,属于可控的,不用使用事件通知来告知程序。这一块在下篇博文还会详细解说。
能够在读操做exists、getChildren和getData上设置观察,在执行写操做create、delete和setData将会触发观察事件,固然,在执行写的操做时,也能够选择是否触发znode上设置的观察器,具体可查看相关的api。
当观察的znode被建立、删除或其数据被更新时,设置在exists上的观察将会被触发;
当观察的znode被删除或数据被更新时,设置在getData上的观察将会被触发;
当观察的znode的子节点被建立、删除或znode自身被删除时,设置在getChildren上的观察将会被触发,可经过观察事件的类型来判断被删除的是znode仍是它的子节点。
对于NodeCreated和NodeDeleted根据路径就能发现是哪一个znode被写;对于NodeChildrenChanged可根据getChildren来获取新的子节点列表。
注意:在收到收到触发事件到执行读操做之间,znode的状态可能会发生状态,这点须要牢记。
至此,编写简单的zookeeper应该是能够的了,下篇博文我们来深刻探讨zookeeper的相关知识