ZooKeeper系列之(十三):Watcher跟随者工做模式

客户端如何用Watcher在以前已经说过了,这里主要说说服务端的实现机制。node

一、服务端this

ZooKeeperServer接收到getData请求后,首先processPacket方法会被调用,咱们先看看该方法中和Watcher机制相关的代码逻辑。spa

public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer)  {
   submitRequest(si);
}
调用了submitRequest方法处理getData这个Request。再往下:
public void submitRequest(Request si)  {
   firstProcessor.processRequest(si);
}

Request进入到服务端的调用链,咱们假设此次是Leader做为服务端,则进入LeaderRequestProcessor并沿着调用链最终走到FinalRequestProcessor。这里咱们直接找到和Watcher相关的代码逻辑。线程

最后咱们发如今FinalRequestProcessor中和Watcher相关的代码:code

public void processRequest(Request request) {
   case OpCode.getData: {
        lastOp = "GETD";
        GetDataRequest getDataRequest = new GetDataRequest();
        ByteBufferInputStream.byteBuffer2Record(request.request,
                        getDataRequest);
        DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
        if (n == null) {
             throw new KeeperException.NoNodeException();
        }
        Stat stat = new Stat();
        byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                        getDataRequest.getWatch() ? cnxn : null);
        rsp = new GetDataResponse(b, stat);
        break;
   }
}

调用到ZKDatabase的getData来处理客户端请求,在参数中也指明了是否设置了Watcher。若是客户端设置了Watcher则把cnxn做为Watcher传给ZKDatabase。Cnzx的定义为:server

ServerCnxn cnxn = request.cnxn;接口

咱们等一会再来看看这个ServerCnxn是怎么做为Watcher机制的,先看看ZKDatabase的getData方法:队列

public byte[] getData(String path, Stat stat, Watcher watcher){
      return dataTree.getData(path, stat, watcher);
}

走到了DataTree的getData方法:事件

public byte[] getData(String path, Stat stat, Watcher watcher)
            throws KeeperException.NoNodeException {
     DataNode n = nodes.get(path);
     if (n == null) {
          throw new KeeperException.NoNodeException();
     }
     synchronized (n) {
          n.copyStat(stat);
          if (watcher != null) {
              dataWatches.addWatch(path, watcher);
          }
          return n.data;
     }
}

在这里咱们看到将Watcher添加到了dataWatches列表中,该列表定义以下:get

private final WatchManager dataWatches = new WatchManager();

二、WatchManager

WatchManager经过addWatch方法添加新的Watcher接口都监视列表中,代码:

synchronized void addWatch(String path, Watcher watcher) {
    HashSet<Watcher> list = watchTable.get(path);
    if (list == null) {
        list = new HashSet<Watcher>(4);
        watchTable.put(path, list);
    }
    list.add(watcher);
    HashSet<String> paths = watch2Paths.get(watcher);
    if (paths == null) {
        // cnxns typically have many watches, so use default cap here
        paths = new HashSet<String>();
        watch2Paths.put(watcher, paths);
    }
    paths.add(path);
}

将Watcher记录到本地列表中,Watcher和path关联起来了,那么这些Watcher在何时被触发呢?

Watcher是经过triggerWatch方法被触发的。

Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
   WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
   HashSet<Watcher> watchers;
   for (Watcher w : watchers) {
       if (supress != null && supress.contains(w)) {
           continue;
       }
       w.process(e);
   }
   return watchers;
}

具体在这里就是回调NettyServerCnxn的process方法,这里再提醒一下,NettyServerCnxn是实现了Watcher接口的。

Public Interface Watcher{
   public abstract void process(WatchedEvent event);
}

那么triggerWatch在哪里被触发呢?在getData这个例子里,triggerWatch在setData中被触发。setData方法是设置数据的,设置数据时触发getData的Watcher。

public Stat setData(String path, byte data[], int version, long zxid,
            long time) throws KeeperException.NoNodeException {
     Stat s = new Stat();
     DataNode n = nodes.get(path);
     if (n == null) {
         throw new KeeperException.NoNodeException();
     }
     byte lastdata[] = null;
     synchronized (n) {
         lastdata = n.data;
         n.data = data;
         n.stat.setMtime(time);
         n.stat.setMzxid(zxid);
         n.stat.setVersion(version);
         n.copyStat(s);
      }
      String lastPrefix = getMaxPrefixWithQuota(path);
      if(lastPrefix != null) {
          this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
              - (lastdata == null ? 0 : lastdata.length));
      }
      dataWatches.triggerWatch(path, EventType.NodeDataChanged);
      return s;
 }

 

三、ClientCnxn

客户端收到WatchedEvent事件,在ClientCnxn的SendThread中,咱们又回到readResponse方法。

void readResponse(ByteBuffer incomingBuffer) throws IOException {
if (replyHdr.getXid() == -1) {
       // -1 means notification               
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response");
        if (chrootPath != null) {
           String serverPath = event.getPath();
           if(serverPath.compareTo(chrootPath)==0)
               event.setPath("/");
           else if (serverPath.length() > chrootPath.length())
               event.setPath(serverPath.substring(chrootPath.length()));
           else {
            	  LOG.warn("Got server path " + event.getPath());
           }
       }
      WatchedEvent we = new WatchedEvent(event);
       eventThread.queueEvent( we );
       return
   }
}

将接收到的WatchedEvent事件放入eventThread线程的等待队列中,等待处理。

EventThread的主方法:

private void processEvent(Object event) {
if (event instanceof WatcherSetEventPair) {
         // each watcher will process the event
        WatcherSetEventPair pair = (WatcherSetEventPair) event;
        for (Watcher watcher : pair.watchers) {
             try {
                  watcher.process(pair.event);
             } catch (Throwable t) {
                  LOG.error("Error while calling watcher ", t);
             }
         }
    }
}

至此客户端在调用getData方法时绑定的Watcher被调用,process方法被执行。

相关文章
相关标签/搜索