#0 系列目录#node
Zookeeper系列服务器
Zookeeper源码.net
Zookeeper应用
ZooKeeper提供了Java和C的binding. 本文关注Java相关的API.
#1 准备工做# 拷贝ZooKeeper安装目录下的zookeeper.x.x.x.jar文件到项目的classpath路径下.
#2 建立链接和回调接口# 首先须要建立ZooKeeper对象, 后续的一切操做都是基于该对象进行的.
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException
如下为各个参数的详细说明:
connectString
:zookeeper server列表, 以逗号隔开. ZooKeeper对象初始化后, 将从server列表中选择一个server, 并尝试与其创建链接
. 若是链接创建失败, 则会从列表的剩余项中选择一个server, 并再次尝试创建链接
.
sessionTimeout
:指定链接的超时时间.
watcher
:事件回调接口.
注意, 建立ZooKeeper对象时, 只要对象完成初始化便马上返回. 创建链接是以异步的形式进行的
, 当链接成功创建后, 会回调watcher的process方法. 若是想要同步创建与server的链接
, 须要本身进一步封装.
public class ZKConnection { /** * server列表, 以逗号分割 */ protected String hosts = "localhost:4180,localhost:4181,localhost:4182"; /** * 链接的超时时间, 毫秒 */ private static final int SESSION_TIMEOUT = 5000; private CountDownLatch connectedSignal = new CountDownLatch(1); protected ZooKeeper zk; /** * 链接zookeeper server */ public void connect() throws Exception { zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new ConnWatcher()); // 等待链接完成 connectedSignal.await(); } public class ConnWatcher implements Watcher { public void process(WatchedEvent event) { // 链接创建, 回调process接口时, 其event.getState()为KeeperState.SyncConnected if (event.getState() == KeeperState.SyncConnected) { // 放开闸门, wait在connect方法上的线程将被唤醒 connectedSignal.countDown(); } } } }
#3 建立znode# ZooKeeper对象的create方法用于建立znode.
String create(String path, byte[] data, List acl, CreateMode createMode);
如下为各个参数的详细说明:
path
:znode的路径.data
:与znode关联的数据.acl
:指定权限信息, 若是不想指定权限, 能够传入Ids.OPEN_ACL_UNSAFE.createMode
:指定znode类型. CreateMode是一个枚举类, 从中选择一个成员传入便可;/** * 建立临时节点 */ public void create(String nodePath, byte[] data) throws Exception { zk.create(nodePath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); }
#4 获取子node列表# ZooKeeper对象的getChildren方法用于获取子node列表.
List getChildren(String path, boolean watch);
watch参数用于指定是否监听path node的子node的增长和删除事件
, 以及path node自己的删除事件
.
#5 判断znode是否存在# ZooKeeper对象的exists方法用于判断指定znode是否存在.
Stat exists(String path, boolean watch);
watch参数用于指定是否监听path node的建立, 删除事件, 以及数据更新事件
. 若是该node存在, 则返回该node的状态信息, 不然返回null.
#6 获取node中关联的数据# ZooKeeper对象的getData方法用于获取node关联的数据.
byte[] getData(String path, boolean watch, Stat stat);
是否监听path node的删除事件, 以及数据更新事件
, 注意, 不监听path node的建立事件
, 由于若是path node不存在, 该方法将抛出KeeperException.NoNodeException异常.getData方法会将path node的状态信息设置到该参数中
.#7 更新node中关联的数据# ZooKeeper对象的setData方法用于更新node关联的数据.
Stat setData(final String path, byte data[], int version);
若是version和真实的版本不一样, 更新操做将失败. 指定version为-1则忽略版本检查
.#8 删除znode# ZooKeeper对象的delete方法用于删除znode.
void delete(final String path, int version);
#9 其他接口# 请查看ZooKeeper对象的API文档.
#10 代码实例#
public class JavaApiSample implements Watcher { private static final int SESSION_TIMEOUT = 10000; // private static final String CONNECTION_STRING = "test.zookeeper.connection_string:2181"; private static final String CONNECTION_STRING = "127.0.0.1:2180,127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"; private static final String ZK_PATH = "/nileader"; private ZooKeeper zk = null; private CountDownLatch connectedSemaphore = new CountDownLatch(1); /** * 建立ZK链接 * * @param connectString ZK服务器地址列表 * @param sessionTimeout Session超时时间 */ public void createConnection(String connectString, int sessionTimeout) { this.releaseConnection(); try { zk = new ZooKeeper(connectString, sessionTimeout, this); connectedSemaphore.await(); } catch (InterruptedException e) { System.out.println("链接建立失败,发生 InterruptedException"); e.printStackTrace(); } catch (IOException e) { System.out.println("链接建立失败,发生 IOException"); e.printStackTrace(); } } /** * 关闭ZK链接 */ public void releaseConnection() { if (null != this.zk) { try { this.zk.close(); } catch (InterruptedException e) { // ignore e.printStackTrace(); } } } /** * 建立节点 * * @param path 节点path * @param data 初始数据内容 * @return */ public boolean createPath(String path, String data) { try { System.out.println("节点建立成功, Path: " + this.zk.create(path, // data.getBytes(), // ZooDefs.Ids.OPEN_ACL_UNSAFE, // CreateMode.EPHEMERAL) + ", content: " + data); } catch (KeeperException e) { System.out.println("节点建立失败,发生KeeperException"); e.printStackTrace(); } catch (InterruptedException e) { System.out.println("节点建立失败,发生 InterruptedException"); e.printStackTrace(); } return true; } /** * 读取指定节点数据内容 * * @param path 节点path * @return */ public String readData(String path) { try { System.out.println("获取数据成功,path:" + path); return new String(this.zk.getData(path, false, null)); } catch (KeeperException e) { System.out.println("读取数据失败,发生KeeperException,path: " + path); e.printStackTrace(); return ""; } catch (InterruptedException e) { System.out.println("读取数据失败,发生 InterruptedException,path: " + path); e.printStackTrace(); return ""; } } /** * 更新指定节点数据内容 * * @param path 节点path * @param data 数据内容 * @return */ public boolean writeData(String path, String data) { try { System.out.println("更新数据成功,path:" + path + ", stat: " + this.zk.setData(path, data.getBytes(), -1)); } catch (KeeperException e) { System.out.println("更新数据失败,发生KeeperException,path: " + path); e.printStackTrace(); } catch (InterruptedException e) { System.out.println("更新数据失败,发生 InterruptedException,path: " + path); e.printStackTrace(); } return false; } /** * 删除指定节点 * * @param path 节点path */ public void deleteNode(String path) { try { this.zk.delete(path, -1); System.out.println("删除节点成功,path:" + path); } catch (KeeperException e) { System.out.println("删除节点失败,发生KeeperException,path: " + path); e.printStackTrace(); } catch (InterruptedException e) { System.out.println("删除节点失败,发生 InterruptedException,path: " + path); e.printStackTrace(); } } public static void main(String[] args) { JavaApiSample sample = new JavaApiSample(); sample.createConnection(CONNECTION_STRING, SESSION_TIMEOUT); if (sample.createPath(ZK_PATH, "我是节点初始内容")) { System.out.println(); System.out.println("数据内容: " + sample.readData(ZK_PATH) + "\n"); sample.writeData(ZK_PATH, "更新后的数据"); System.out.println("数据内容: " + sample.readData(ZK_PATH) + "\n"); sample.deleteNode(ZK_PATH); } sample.releaseConnection(); } /** * 收到来自Server的Watcher通知后的处理。 */ @Override public void process(WatchedEvent event) { System.out.println("收到事件通知:" + event.getState() + "\n"); if (Event.KeeperState.SyncConnected == event.getState()) { connectedSemaphore.countDown(); } } }
#11 须要注意的几个地方#
不能超过1M
. zookeeper的使命是分布式协做, 而不是数据存储
.是否监听相应的事件
. 而create, delete, setData方法则会触发相应的事件的发生
.大多存在其异步的重载方法
, 具体请查看API说明.