舒适提示
:在这里我再次提个小要求,但愿你们能习惯看
官方文档,文档虽然是英文但用词都比较简单,基本都能看懂文档表达的意思。
授之以鱼不如授之以渔的道理相信你们都明白,也但愿经过猿人谷的这个ZooKeeper系列,让你们入门、到熟悉,触类旁通后能精通ZooKeeper。
在前一篇咱们介绍了ZooKeeper单机版、伪集群和集群环境搭建,经过命令行的方式作了节点的建立、删除、更新、获取节点信息的测试。Zookeeper 的目的是为客户端构建复杂的协调功能提供简单、高效的核心 API,这一篇咱们用Java经过ZooKeeper提供的API接口来实现这些增删改查的功能。html
org.apache.zookeeper.Zookeeper
是ZooKeeper客户端的主类,在官方文档中已明确说明(This is the main class of ZooKeeper client library.)。java
This is the main class of ZooKeeper client library. To use a ZooKeeper service, an application must first instantiate an object of ZooKeeper class. All the iterations will be done by calling the methods of ZooKeeper class. The methods of this class are thread-safe unless otherwise noted.
Once a connection to a server is established, a session ID is assigned to the client. The client will send heart beats to the server periodically to keep the session valid.
建立一个ZooKeeper的实例来使用org.apache.zookeeper.Zookeeper
里的方法,官方文档已经指出没有特别声明的话,ZooKeeper类里的方法是线程安全
的。客户端链接到ZooKeeper服务的时候,会给客户端分配一个会话ID(session ID),客户端与服务端会经过心跳来保持会话有效。node
org.apache.zookeeper.Zookeeper
里的方法很是多,就不一一列举了,只列几个增删改查的。git
Method | Description |
---|---|
create(String path, byte[] data, List<ACL> acl, CreateMode createMode) | Create a node with the given path. (建立指定路径的节点) |
create(String path, byte[] data, List<ACL> acl, CreateMode createMode, AsyncCallback.Create2Callback cb, Object ctx) | The asynchronous version of create.(异步形式建立) |
create(String path, byte[] data, List<ACL> acl, CreateMode createMode, Stat stat) | Create a node with the given path and returns the Stat of that node.(按指定路径建立节点并返回节点状态信息) |
delete(String path, int version) | Delete the node with the given path.(删除指定路径的节点) |
delete(String path, int version, AsyncCallback.VoidCallback cb, Object ctx) | The asynchronous version of delete.(异步删除指定路径的节点) |
exists(String path, boolean watch) | Return the stat of the node of the given path.(返回指定路径的节点状态信息) |
getChildren(String path, boolean watch) | Return the list of the children of the node of the given path.(返回指定路径的全部子节点状态信息) |
getData(String path, boolean watch, Stat stat) | Return the data and the stat of the node of the given path.(返回指定路径的节点数据和状态信息) |
setData(String path, byte[] data, int version) | Set the data for the node of the given path if such a node exists and the given version matches the version of the node (if the given version is -1, it matches any node's versions).(给指定路径和版本的节点设置新值,如版本为-1,即给全部版本设置值) |
这里新建一个Spring Boot的项目来进行测试,新建Spring Boot项目的过程很简单,也不是这里的重点,就不作介绍了。shell
项目里会须要额外引入两个包来进行测试:apache
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.5</version> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>5.5.2</version> </dependency>
完整测试代码以下:segmentfault
/** * 简单测试示例 * @author 猿人谷 * @date 2019/12/16 */ public class ZooKeeperDemo { private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperDemo.class); private static final int SESSION_TIME_OUT = 10000; // ZooKeeper服务的地址,如为集群,多个地址用逗号分隔 private static final String CONNECT_STRING = "127.0.0.1:2181"; private static final String ZNODE_PATH = "/zk_demo"; private static final String ZNODE_PATH_PARENT = "/app1"; private static final String ZNODE_PATH_CHILDREN = "/app1/app1_1"; private ZooKeeper zk = null; @Before public void init() throws IOException { zk = new ZooKeeper(CONNECT_STRING, SESSION_TIME_OUT, new Watcher(){ @Override public void process(WatchedEvent event) { System.out.println("已经触发了" + event.getType() + "事件!"); } }); } @Test public void testCreate() throws KeeperException, InterruptedException { zk.create(ZNODE_PATH, "anna2019".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } @Test public void testCreateParentZnode() throws KeeperException, InterruptedException { zk.create(ZNODE_PATH_PARENT, "anna2019".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } @Test public void testCreateChildrenZnode() throws KeeperException, InterruptedException { zk.create(ZNODE_PATH_CHILDREN, "anna2020".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } @Test public void testGet() throws KeeperException, InterruptedException { byte[] data1 = zk.getData(ZNODE_PATH, false, null); byte[] data2 = zk.getData(ZNODE_PATH_PARENT, false, null); byte[] data3 = zk.getData(ZNODE_PATH_CHILDREN, false, null); LOGGER.info("{}的信息:{}", ZNODE_PATH, new String(data1) ); LOGGER.info("{}的信息:{}", ZNODE_PATH_PARENT, new String(data2) ); LOGGER.info("{}的信息:{}", ZNODE_PATH_CHILDREN, new String(data3) ); } /** * 删除 * @throws KeeperException * @throws InterruptedException */ @Test public void testDelete() throws KeeperException, InterruptedException { // 指定要删除的版本,-1表示删除全部版本 zk.delete(ZNODE_PATH, -1); } /** * 删除含有子节点 * @throws KeeperException * @throws InterruptedException */ @Test public void testDeleteHasChildrenZnode() throws KeeperException, InterruptedException { // 指定要删除的版本,-1表示删除全部版本 zk.delete(ZNODE_PATH_PARENT, -1); } @Test public void testSet() throws KeeperException, InterruptedException { Stat stat = zk.setData(ZNODE_PATH, "yuanrengu".getBytes(), -1); LOGGER.info(stat.toString()); } }
上面有用到@Before
,简单说明下:api
若是将SESSION_TIME_OUT设置的时间过短,会报API客户端异常:org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /zk_demo
。完整的报错信息以下:安全
09:33:52.139 [main-SendThread(106.12.111.172:2181)] DEBUG org.apache.zookeeper.ClientCnxnSocketNIO - Ignoring exception during shutdown input java.net.SocketException: Socket is not connected at sun.nio.ch.Net.translateToSocketException(Net.java:123) at sun.nio.ch.Net.translateException(Net.java:157) at sun.nio.ch.Net.translateException(Net.java:163) at sun.nio.ch.SocketAdaptor.shutdownInput(SocketAdaptor.java:401) at org.apache.zookeeper.ClientCnxnSocketNIO.cleanup(ClientCnxnSocketNIO.java:198) at org.apache.zookeeper.ClientCnxn$SendThread.cleanup(ClientCnxn.java:1338) at org.apache.zookeeper.ClientCnxn$SendThread.cleanAndNotifyState(ClientCnxn.java:1276) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1254) Caused by: java.nio.channels.NotYetConnectedException: null at sun.nio.ch.SocketChannelImpl.shutdownInput(SocketChannelImpl.java:782) at sun.nio.ch.SocketAdaptor.shutdownInput(SocketAdaptor.java:399) ... 4 common frames omitted 09:33:52.140 [main-SendThread(106.12.111.172:2181)] DEBUG org.apache.zookeeper.ClientCnxnSocketNIO - Ignoring exception during shutdown output java.net.SocketException: Socket is not connected at sun.nio.ch.Net.translateToSocketException(Net.java:123) at sun.nio.ch.Net.translateException(Net.java:157) at sun.nio.ch.Net.translateException(Net.java:163) at sun.nio.ch.SocketAdaptor.shutdownOutput(SocketAdaptor.java:409) at org.apache.zookeeper.ClientCnxnSocketNIO.cleanup(ClientCnxnSocketNIO.java:205) at org.apache.zookeeper.ClientCnxn$SendThread.cleanup(ClientCnxn.java:1338) at org.apache.zookeeper.ClientCnxn$SendThread.cleanAndNotifyState(ClientCnxn.java:1276) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1254) Caused by: java.nio.channels.NotYetConnectedException: null at sun.nio.ch.SocketChannelImpl.shutdownOutput(SocketChannelImpl.java:799) at sun.nio.ch.SocketAdaptor.shutdownOutput(SocketAdaptor.java:407) ... 4 common frames omitted org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /zk_demo at org.apache.zookeeper.KeeperException.create(KeeperException.java:102) at org.apache.zookeeper.KeeperException.create(KeeperException.java:54) at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:2131) at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:2160) at com.yuanrengu.demo.ZooKeeperDemo.testGet(ZooKeeperDemo.java:48) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) Disconnected from the target VM, address: '127.0.0.1:60454', transport: 'socket' Process finished with exit code -1
起初觉得是ZooKeeper服务部署有问题或服务没启动,经检查确认无误后,debug调试发现,是SESSION_TIME_OUT = 2000;设置的值过小,改成10000后,再也不报错。服务器
SESSION_TIME_OUT 是
会话超时时间
,也就是当一个zookeeper超过该时间没有心跳,则认为该节点故障。因此,若是此值小于zookeeper的建立时间,则当zookeeper还将来得及建立链接,会话时间已到,所以抛出异常认为该节点故障。
public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException Create a node with the given path. The node data will be the given data, and node acl will be the given acl. The flags argument specifies whether the created node will be ephemeral or not. An ephemeral node will be removed by the ZooKeeper automatically when the session associated with the creation of the node expires. The flags argument can also specify to create a sequential node. The actual path name of a sequential node will be the given path plus a suffix "i" where i is the current sequential number of the node. The sequence number is always fixed length of 10 digits, 0 padded. Once such a node is created, the sequential number will be incremented by one. If a node with the same actual path already exists in the ZooKeeper, a KeeperException with error code KeeperException.NodeExists will be thrown. Note that since a different actual path is used for each invocation of creating sequential node with the same path argument, the call will never throw "file exists" KeeperException. If the parent node does not exist in the ZooKeeper, a KeeperException with error code KeeperException.NoNode will be thrown. An ephemeral node cannot have children. If the parent node of the given path is ephemeral, a KeeperException with error code KeeperException.NoChildrenForEphemerals will be thrown. This operation, if successful, will trigger all the watches left on the node of the given path by exists and getData API calls, and the watches left on the parent node by getChildren API calls. If a node is created successfully, the ZooKeeper server will trigger the watches on the path left by exists calls, and the watches on the parent of the node by getChildren calls. The maximum allowable size of the data array is 1 MB (1,048,576 bytes). Arrays larger than this will cause a KeeperExecption to be thrown. Parameters: path - the path for the node data - the initial data for the node acl - the acl for the node createMode - specifying whether the node to be created is ephemeral and/or sequential Returns: the actual path of the created node Throws: KeeperException - if the server returns a non-zero error code KeeperException.InvalidACLException - if the ACL is invalid, null, or empty InterruptedException - if the transaction is interrupted IllegalArgumentException - if an invalid path is specified
Talk is cheap. Show me the code.这里咱们不瞎BB,直接上官方文档。官方文档是否是很容易看懂,并且解释的很是清楚(并且稍显啰嗦的感受)?
这里简单列下文档中的几个关键点:
这里要说下CreateMode
,你们可能都说ZooKeeper只有4种形式的节点(持久、临时、持久顺序、临时顺序),看文档的话,实际上是有7种
形式的。
public enum CreateMode { PERSISTENT(0, false, false, false, false), PERSISTENT_SEQUENTIAL(2, false, true, false, false), EPHEMERAL(1, true, false, false, false), EPHEMERAL_SEQUENTIAL(3, true, true, false, false), CONTAINER(4, false, false, true, false), PERSISTENT_WITH_TTL(5, false, false, false, true), PERSISTENT_SEQUENTIAL_WITH_TTL(6, false, true, false, true); }
CONTAINER
:容器节点,用于Leader、Lock等特殊用途,当容器节点不存在任何子节点时,容器将成为服务器在未来某个时候删除的候选节点。PERSISTENT_WITH_TTL
:带TTL(time-to-live,存活时间)的持久节点,节点在TTL时间以内没有获得更新而且没有子节点,就会被自动删除。PERSISTENT_SEQUENTIAL_WITH_TTL
:带TTL(time-to-live,存活时间)和单调递增序号的持久节点,节点在TTL时间以内没有获得更新而且没有子节点,就会被自动删除。临时节点不能有子节点
。若是给临时节点建立子节点会抛KeeperException异常。byte[] data容许的最大数据量为1MB(1,048,576 bytes)
。若是超过,会抛KeeperExecption。运行建立节点的代码:
@Test public void testCreate() throws KeeperException, InterruptedException { zk.create(ZNODE_PATH, "anna2019".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); }
能够经过日志信息获得节点建立成功:
DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x101402626bb000b, packet:: clientPath:null serverPath:null finished:false header:: 1,1 replyHeader:: 1,12884901937,0 request:: '/zk_demo,#616e6e6132303139,v{s{31,s{'world,'anyone}}},0 response:: '/zk_demo
在服务端查看,/zk_demo节点建立成功:
[zk: 127.0.0.1:2181(CONNECTED) 21] ls / [zookeeper, zk_demo] [zk: 127.0.0.1:2181(CONNECTED) 22] stat /zk_demo cZxid = 0x300000031 ctime = Tue Dec 17 12:52:50 CST 2019 mZxid = 0x300000031 mtime = Tue Dec 17 12:52:50 CST 2019 pZxid = 0x300000031 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 8 numChildren = 0
public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException Return the data and the stat of the node of the given path. If the watch is true and the call is successful (no exception is thrown), a watch will be left on the node with the given path. The watch will be triggered by a successful operation that sets data on the node, or deletes the node. A KeeperException with error code KeeperException.NoNode will be thrown if no node with the given path exists. Parameters: path - the given path watch - whether need to watch this node stat - the stat of the node Returns: the data of the node Throws: KeeperException - If the server signals an error with a non-zero error code InterruptedException - If the server transaction is interrupted.
指定路径的节点不存在时就抛KeeperException.NoNode异常。
运行:
@Test public void testGet() throws KeeperException, InterruptedException { byte[] data1 = zk.getData(ZNODE_PATH, false, null); byte[] data2 = zk.getData(ZNODE_PATH_PARENT, false, null); byte[] data3 = zk.getData(ZNODE_PATH_CHILDREN, false, null); LOGGER.info("{}的信息:{}", ZNODE_PATH, new String(data1) ); LOGGER.info("{}的信息:{}", ZNODE_PATH_PARENT, new String(data2) ); LOGGER.info("{}的信息:{}", ZNODE_PATH_CHILDREN, new String(data3) ); }
结果:
13:51:00.288 [main] INFO com.yuanrengu.demo.ZooKeeperDemo - /zk_demo的信息:anna2019 13:51:00.288 [main] INFO com.yuanrengu.demo.ZooKeeperDemo - /app1的信息:anna2019 13:51:00.289 [main] INFO com.yuanrengu.demo.ZooKeeperDemo - /app1/app1_1的信息:anna2020
public Stat setData(String path, byte[] data, int version) throws KeeperException, InterruptedException Set the data for the node of the given path if such a node exists and the given version matches the version of the node (if the given version is -1, it matches any node's versions). Return the stat of the node. This operation, if successful, will trigger all the watches on the node of the given path left by getData calls. A KeeperException with error code KeeperException.NoNode will be thrown if no node with the given path exists. A KeeperException with error code KeeperException.BadVersion will be thrown if the given version does not match the node's version. The maximum allowable size of the data array is 1 MB (1,048,576 bytes). Arrays larger than this will cause a KeeperException to be thrown. Parameters: path - the path of the node data - the data to set version - the expected matching version Returns: the state of the node Throws: InterruptedException - If the server transaction is interrupted. KeeperException - If the server signals an error with a non-zero error code. IllegalArgumentException - if an invalid path is specified
主要注意如下几点:
byte[] data容许的最大数据量为1MB(1,048,576 bytes)
。若是超过,会抛KeeperExecption。运行:
@Test public void testSet() throws KeeperException, InterruptedException { Stat stat = zk.setData(ZNODE_PATH, "yuanrengu".getBytes(), -1); byte[] data = zk.getData(ZNODE_PATH, false, null); LOGGER.info(new String(data)); }
能够看到数据已经更新:
15:46:16.472 [main] INFO com.yuanrengu.demo.ZooKeeperDemo - yuanrengu
public void delete(String path, int version) throws InterruptedException, KeeperException Delete the node with the given path. The call will succeed if such a node exists, and the given version matches the node's version (if the given version is -1, it matches any node's versions). A KeeperException with error code KeeperException.NoNode will be thrown if the nodes does not exist. A KeeperException with error code KeeperException.BadVersion will be thrown if the given version does not match the node's version. A KeeperException with error code KeeperException.NotEmpty will be thrown if the node has children. This operation, if successful, will trigger all the watches on the node of the given path left by exists API calls, and the watches on the parent node left by getChildren API calls. Parameters: path - the path of the node to be deleted. version - the expected node version. Throws: InterruptedException - IF the server transaction is interrupted KeeperException - If the server signals an error with a non-zero return code. IllegalArgumentException - if an invalid path is specified
节点可能含有子节点,删除节点的操做有几点须要特别注意:
若是节点含有子节点,删除父节点(parent node)时会抛KeeperException.NotEmpty异常。
/app1有子节点,咱们作下删除操做:
/** * 删除含有子节点的父节点 * @throws KeeperException * @throws InterruptedException */ @Test public void testDeleteHasChildrenZnode() throws KeeperException, InterruptedException { // 指定要删除的版本,-1表示删除全部版本 zk.delete(ZNODE_PATH_PARENT, -1); }
能够看到日志:
org.apache.zookeeper.KeeperException$NotEmptyException: KeeperErrorCode = Directory not empty for /app1 at org.apache.zookeeper.KeeperException.create(KeeperException.java:132) at org.apache.zookeeper.KeeperException.create(KeeperException.java:54) at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:1793) at com.yuanrengu.demo.ZooKeeperDemo.testDeleteHasChildrenZnode(ZooKeeperDemo.java:89)
上面咱们实现了节点的增、删、改、查的测试,后面的篇章会有更多好玩的用法,如实现分布式锁、配置中心等。
基于上面的分析,总结几个注意的点:
7种形式
:CONTAINER
:容器节点,用于Leader、Lock等特殊用途,当容器节点不存在任何子节点时,容器将成为服务器在未来某个时候删除的候选节点。PERSISTENT_WITH_TTL
:带TTL(time-to-live,存活时间)的持久节点,节点在TTL时间以内没有获得更新而且没有子节点,就会被自动删除。PERSISTENT_SEQUENTIAL_WITH_TTL
:带TTL(time-to-live,存活时间)和单调递增序号的持久节点,节点在TTL时间以内没有获得更新而且没有子节点,就会被自动删除。byte[] data容许的最大数据量为1MB(1,048,576 bytes)
。