ZooKeeper版本:3.4.10。java
ZooKeeper类是ZooKeeper客户端的实现,用来发送命令给ZooKeeper服务器。服务器
ZooKeeper中能够设置Watcher,每一个Watcher在节点状态发生变化的时候被通知,执行预先注册的Watcher动做。网络
ZooKeeper有三种Watcher列表:session
(1)DataWatcher异步
(2)ExistWatcheride
(3)ChildWatcher.this
protected final ClientCnxn cnxn;// 成员变量cnxn,链接服务器,经过cnxn发送命令给服务端 public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException { ......// 打印log watchManager.defaultWatcher = watcher; ConnectStringParser connectStringParser = new ConnectStringParser( connectString);// 从传入的服务器地址字符串中解析出服务器地址 HostProvider hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses());// 提供服务器地址,当服务器发生故障没法链接时,会自动链接其它的服务器 cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly);// 构建和服务器通讯的对象cnxn cnxn.start(); }
ClientCnxn是客户端和服务端通讯的底层接口,和ClientCnxnSocket一块儿工做提供网络通讯服务。spa
调用create在ZooKeeper中建立一个Node,返回值是成功建立的路径名称:线程
首先看看 create 方法:code
public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException { final String clientPath = path; PathUtils.validatePath(clientPath, createMode.isSequential()); final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.create);// 设置操做代码为create CreateRequest request = new CreateRequest(); CreateResponse response = new CreateResponse(); request.setData(data);// 使用输入参数构造CreateRequest请求 request.setFlags(createMode.toFlag()); request.setPath(serverPath); if (acl != null && acl.size() == 0) { throw new KeeperException.InvalidACLException(); } request.setAcl(acl); ReplyHeader r = cnxn.submitRequest(h, request, response, null);// 将请求提交发送给服务器 if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } if (cnxn.chrootPath == null) { return response.getPath();// 从返回的CreateResponse中获取建立成功后的路径 } else { return response.getPath().substring(cnxn.chrootPath.length()); } }
在 create 中经过 submitRequest 来提交请求:
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException { ReplyHeader r = new ReplyHeader(); Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration);// 将CreateRequest转换成Packet包 synchronized (packet) { while (!packet.finished) { packet.wait(); } } return r; }
queuePacket 将CreateRequest转换成Packet包:
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) { Packet packet = null; // Note that we do not generate the Xid for the packet yet. It is // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(), // where the packet is actually sent. synchronized (outgoingQueue) { packet = new Packet(h, r, request, response, watchRegistration);// 将CreateRequest转换成Packet包 packet.cb = cb; packet.ctx = ctx; packet.clientPath = clientPath; packet.serverPath = serverPath; if (!state.isAlive() || closing) { conLossPacket(packet); } else { // If the client is asking to close the session then // mark as closing if (h.getType() == OpCode.closeSession) { closing = true; } outgoingQueue.add(packet);// 将发送包放入队列,等待发送线程发送给服务器 } } sendThread.getClientCnxnSocket().wakeupCnxn(); return packet; }
删除节点操做,提供同步和异步两种接口方式:
public void delete(final String path, int version, VoidCallback cb, Object ctx) { final String clientPath = path; PathUtils.validatePath(clientPath);// 校验传入的路径是否合法 final String serverPath; // maintain semantics even in chroot case // specifically - root cannot be deleted // I think this makes sense even in chroot case. if (clientPath.equals("/")) { // a bit of a hack, but delete(/) will never succeed and ensures // that the same semantics are maintained serverPath = clientPath; } else { serverPath = prependChroot(clientPath); } RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.delete);// 设置操做代码为delete DeleteRequest request = new DeleteRequest(); request.setPath(serverPath);// 使用输入参数构造DeleteRequest请求 request.setVersion(version); cnxn.queuePacket(h, new ReplyHeader(), request, null, cb, clientPath, serverPath, ctx, null);// 和create操做同样,调用queuePacket方法,将DeleteRequest转换成Packet包 }
exists:判断节点是否存在,异步方式。构造ExistsRequest请求对象,设置操做码ZooDefs.OpCode.exists;
getData:获取节点关联数据。构造GetDataRequest请求对象,设置操做码ZooDefs.OpCode.getData;
setData:设置节点关联数据。构造SetDataRequest请求对象,设置操做码ZooDefs.OpCode.setData;
getChildren:获取子节点路径列表。构造GetChildrenRequest请求对象,设置操做码ZooDefs.OpCode.getChildren。