笔者为何要讲ZooKeeper的源码,对于程序员来讲,光知道用是成为不了优秀的行家的,还要知道因此然,只有知道了内部实现机制,才能开拓眼界提升自我。而笔者认为ZooKeeper是最好的入门分布式系统的敲门砖。node
好很少说,咱们这里先看看客户端是怎么运转的。程序员
一、概述算法
ZooKeeper客户端是链接到服务端集群,获取zk节点数据,监听zk节点数据变化的。zk节点就是znode,它是相似文件路径的东西,每一个znode能够设置它的文本内容,当znode的文本内容被其余客户端修改后,全部监听该znode的客户端都会实时被通知到,这样的方式实现了分布式一致性存储。缓存
在客户端里有一个ZooKeeper类,注意这里特指类名称。客户端经过Zookeeper类来发送命令给Zookeeper服务器。服务器
ZooKeeper类中还能够设置Watcher,这就是znode监听者。Watcher能够指定监听哪一个znode,当Zookeeper集群的znode节点状态发生变化时,服务端会发送通知消息给客户端的Watcher。网络
Watcher又能够细分为3种Watcher子类:DataWatcher,ExistWatcher和ChildWatcher。根据字面意思就能猜得出来,DataWatcher是znode的数据变化时触发,ExistWatcher是znode的建立删除时触发,ChildWatcher是在znode下建立子目录(也是znode)时触发。实际生产环境中用的最多的仍是DataWatcher。session
下面咱们先分析分析ZooKeeper类的实现,至于Watcher的实现后面会有专门介绍。异步
二、通讯机制分布式
客户端与服务端交互的数据流大体以下:ide
三、ZooKeeper类
客户端在构造函数阶段建立ClientCnxn 与服务端链接,后续命令都经过ClientCntx发送给服务端。ClientCnxn是客户端与服务端通讯的底层接口,它和ClientCnxnSocketNetty一块儿工做提供网络通讯服务。
服务端是ZooKeeperServer类,收到ClientCnxn的Request请求包后调用相应的处理逻辑,返回结果再经过ClientCnxc发送给客户端。
ClientCntx链接时能够同时指定多台服务器地址,根据必定的算法挑选某一个服务器地址进行链接,当某个服务器发生故障没法链接时,会自动链接其余的服务器。实现这一机制的是HostProdiver接口,实现用StaticHostProvider类。
ZooKeeper类的构造函数以下:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException { if (clientConfig == null) { clientConfig = new ZKClientConfig(); } this.clientConfig = clientConfig; watchManager = defaultWatchManager(); watchManager.defaultWatcher = watcher; ConnectStringParser connectStringParser = new ConnectStringParser(connectString); hostProvider = aHostProvider; cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly); cnxn.start(); }
这里的connectString是链接字符串,aHostProvider是管理服务端列表的。watcher是监听器。
为何有aHostProvider?客户端能够配置多个服务端地址,这样当某个服务端挂掉的时候,客户端会自动尝试链接其余的服务端,实现分布式可靠性。
建立了ZooKeeper对象后就能够调用具体的读写数据的方法了,下面列举常见方法的实现机制。
create方法根据输入参数构造出CreateRequest包,而后经过submitRequest方法传递给服务端,submitRequest方法将CreateRequest转换成Packet包并调用sendPacket方法将发送包放入队列,等待发送线程发送给服务端。
服务端响应完成后会将返回结果填充到CreateResponse实体中返回给客户端。
四、发送命令
咱们选取getData方法,来看看客户端的内部机制,其余命令的处理过程是相似的,不一样的只是命令类型不一样而已。
getData方法从服务端读取znode的数据,入参同时包括watcher,这样在znode数据被其余客户端修改后,会实时回调watcher来使得全部客户端同步本次变化。
先给出getData的代码:
public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx) { final String clientPath = path; PathUtils.validatePath(clientPath); WatchRegistration wcb = null; if (watcher != null) { wcb = new DataWatchRegistration(watcher, clientPath); } final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getData); GetDataRequest request = new GetDataRequest(); request.setPath(serverPath); request.setWatch(watcher != null); GetDataResponse response = new GetDataResponse(); cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, clientPath, serverPath, ctx, wcb); }
这里干了几件事情呢?主要干了两件事。
(1)注册watcher,这个很好理解,至于watcher的细节会在其余文章里专栏叙说。
(2)构建完整的znode的路径名,从根目录开始。而后将znode的路径名和GetDataRequest类型打包放到ClientCnxn的发送队列里,等待排队发往服务端。
其余命令的处理过程是相似的,不一样的只是命令类型不一样而已,对应到代码里是不一样的Request对象。getData命令对应GetDataRequest类;Exists方法对应ExistsRequest类。他们的父类倒是同一个。ZooKeeper支持的Request类主要有如下这些:
对于create命令来讲,和GetData有一点不一样。不一样点在于如下两点:
(1)create命令是当即返回结果的,而getData等其余命令是异步返回结果的。getData入参里的DataCallback参数就是异步回调处理方法。
(2)create是调用ClientCnxn的submitRequest方法启动发送命令过程,而getData等其余方法是调用ClientCnxn的queuePacket方法将请求命令缓存在队列里,等待发送线程异步发送。
五、ClientCnxn
前面咱们看到ZooKeeper类的命令发送都是经过ClientCnxn类实现的。这里就谈谈ClientCnxn类干了哪些活。
Clientcnxn将客户端请求加入发送队列,等待sendThread发送。eventThread负责处理Server返回的WatchedEvent,以及回调注册的客户端事件接口处理函数。
5.1 queuePacket
这是ClientCnxn里最重要的一个方法,它将请求包放入发送队列outgoingQueue中,等待发送线程发送给服务端。
public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration atchRegistration, WatchDeregistration watchDeregistration) { Packet packet = null; packet = new Packet(h, r, request, response, watchRegistration); packet.cb = cb; packet.ctx = ctx; packet.clientPath = clientPath; packet.serverPath = serverPath; packet.watchDeregistration = watchDeregistration; synchronized (state) { if (!state.isAlive() || closing) { conLossPacket(packet); } else { if (h.getType() == OpCode.closeSession) { closing = true; } outgoingQueue.add(packet); } } sendThread.getClientCnxnSocket().packetAdded(); return packet; }
最后告诉SendThread数据已经放好了,至于什么时候发送就等SendThread本身来决定了。
5.2 submitRequest
提交客户端请求到服务端,这是当即返回的方法,若是请求包没处理完则一直等待下去。submitRequest方法主要用在create命令。
ReplyHeader r = new ReplyHeader(); Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration, watchDeregistration); synchronized (packet) { while (!packet.finished) { packet.wait(); } } return r;
5.3 sendPacket
sendPacket构建Packet,而后调用发送线程SendThread里的同名sendPacket方法来发送数据到服务端。
public void sendPacket(Record request, Record response, AsyncCallback cb, int opCode) throws IOException { int xid = getXid(); RequestHeader h = new RequestHeader(); h.setXid(xid); h.setType(opCode); ReplyHeader r = new ReplyHeader(); r.setXid(xid); Packet p = new Packet(h, r, request, response, null, false); p.cb = cb; sendThread.sendPacket(p); }
5.4 finishPacket
该方法在接收到服务端的响应时,唤醒等待响应的客户端线程,经过调用Packet的notifyAll方法来唤醒wait在该Packet上的线程。
若是客户端请求指定了Watcher,则同时生成WatchedEvent事件并放入事件队列,等待eventThread线程处理。
代码片断:
private void finishPacket(Packet p) { if (p.cb == null) { synchronized (p) { p.finished = true; p.notifyAll(); } } else { p.finished = true; eventThread.queuePacket(p); } }
5.5 readResponse
ClientCnxnSocketNetty收到服务端响应后触发ZKClientHandler的messageReceived事件,在该事件处理逻辑中调用sendThread的readResponse方法获取服务端响应。
若是服务端响应的是WatchedEvent事件,则将事件放入eventThread中等候调度执行事件方法。
若是服务端响应的是客户端命令结果,则将Packet从发送队列删除,最后调用CientCnxn的finishPacket方法完成最后的处理,finishPacket方法在前面已经讲过了。
readResponse的主要代码以下:
void readResponse(ByteBuffer incomingBuffer) throws IOException { ByteBufferInputStream bbis = new ByteBufferInputStream( incomingBuffer); BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); ReplyHeader replyHdr = new ReplyHeader(); replyHdr.deserialize(bbia, "header"); if (replyHdr.getXid() == -1) { // -1 means notification WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); // convert from a server path to a client path if (chrootPath != null) { String serverPath = event.getPath(); if(serverPath.compareTo(chrootPath)==0) event.setPath("/"); else if (serverPath.length() > chrootPath.length()) event.setPath(serverPath.substring(chrootPath.length())); } WatchedEvent we = new WatchedEvent(event); eventThread.queueEvent( we ); return; } Packet packet; try { packet.replyHeader.setXid(replyHdr.getXid()); packet.replyHeader.setErr(replyHdr.getErr()); packet.replyHeader.setZxid(replyHdr.getZxid()); if (replyHdr.getZxid() > 0) { lastZxid = replyHdr.getZxid(); } } finally { finishPacket(packet); } }