客户端如何用Watcher在以前已经说过了,这里主要说说服务端的实现机制。node
一、服务端this
ZooKeeperServer接收到getData请求后,首先processPacket方法会被调用,咱们先看看该方法中和Watcher机制相关的代码逻辑。spa
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) { submitRequest(si); } 调用了submitRequest方法处理getData这个Request。再往下: public void submitRequest(Request si) { firstProcessor.processRequest(si); }
Request进入到服务端的调用链,咱们假设此次是Leader做为服务端,则进入LeaderRequestProcessor并沿着调用链最终走到FinalRequestProcessor。这里咱们直接找到和Watcher相关的代码逻辑。线程
最后咱们发如今FinalRequestProcessor中和Watcher相关的代码:code
public void processRequest(Request request) { case OpCode.getData: { lastOp = "GETD"; GetDataRequest getDataRequest = new GetDataRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest); DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath()); if (n == null) { throw new KeeperException.NoNodeException(); } Stat stat = new Stat(); byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null); rsp = new GetDataResponse(b, stat); break; } }
调用到ZKDatabase的getData来处理客户端请求,在参数中也指明了是否设置了Watcher。若是客户端设置了Watcher则把cnxn做为Watcher传给ZKDatabase。Cnzx的定义为:server
ServerCnxn cnxn = request.cnxn;接口
咱们等一会再来看看这个ServerCnxn是怎么做为Watcher机制的,先看看ZKDatabase的getData方法:队列
public byte[] getData(String path, Stat stat, Watcher watcher){ return dataTree.getData(path, stat, watcher); }
走到了DataTree的getData方法:事件
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException { DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } synchronized (n) { n.copyStat(stat); if (watcher != null) { dataWatches.addWatch(path, watcher); } return n.data; } }
在这里咱们看到将Watcher添加到了dataWatches列表中,该列表定义以下:get
private final WatchManager dataWatches = new WatchManager();
二、WatchManager
WatchManager经过addWatch方法添加新的Watcher接口都监视列表中,代码:
synchronized void addWatch(String path, Watcher watcher) { HashSet<Watcher> list = watchTable.get(path); if (list == null) { list = new HashSet<Watcher>(4); watchTable.put(path, list); } list.add(watcher); HashSet<String> paths = watch2Paths.get(watcher); if (paths == null) { // cnxns typically have many watches, so use default cap here paths = new HashSet<String>(); watch2Paths.put(watcher, paths); } paths.add(path); }
将Watcher记录到本地列表中,Watcher和path关联起来了,那么这些Watcher在何时被触发呢?
Watcher是经过triggerWatch方法被触发的。
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); HashSet<Watcher> watchers; for (Watcher w : watchers) { if (supress != null && supress.contains(w)) { continue; } w.process(e); } return watchers; }
具体在这里就是回调NettyServerCnxn的process方法,这里再提醒一下,NettyServerCnxn是实现了Watcher接口的。
Public Interface Watcher{ public abstract void process(WatchedEvent event); }
那么triggerWatch在哪里被触发呢?在getData这个例子里,triggerWatch在setData中被触发。setData方法是设置数据的,设置数据时触发getData的Watcher。
public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException { Stat s = new Stat(); DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } byte lastdata[] = null; synchronized (n) { lastdata = n.data; n.data = data; n.stat.setMtime(time); n.stat.setMzxid(zxid); n.stat.setVersion(version); n.copyStat(s); } String lastPrefix = getMaxPrefixWithQuota(path); if(lastPrefix != null) { this.updateBytes(lastPrefix, (data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length)); } dataWatches.triggerWatch(path, EventType.NodeDataChanged); return s; }
三、ClientCnxn
客户端收到WatchedEvent事件,在ClientCnxn的SendThread中,咱们又回到readResponse方法。
void readResponse(ByteBuffer incomingBuffer) throws IOException { if (replyHdr.getXid() == -1) { // -1 means notification WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); if (chrootPath != null) { String serverPath = event.getPath(); if(serverPath.compareTo(chrootPath)==0) event.setPath("/"); else if (serverPath.length() > chrootPath.length()) event.setPath(serverPath.substring(chrootPath.length())); else { LOG.warn("Got server path " + event.getPath()); } } WatchedEvent we = new WatchedEvent(event); eventThread.queueEvent( we ); return } }
将接收到的WatchedEvent事件放入eventThread线程的等待队列中,等待处理。
EventThread的主方法:
private void processEvent(Object event) { if (event instanceof WatcherSetEventPair) { // each watcher will process the event WatcherSetEventPair pair = (WatcherSetEventPair) event; for (Watcher watcher : pair.watchers) { try { watcher.process(pair.event); } catch (Throwable t) { LOG.error("Error while calling watcher ", t); } } } }
至此客户端在调用getData方法时绑定的Watcher被调用,process方法被执行。