初学 Zookeeper 会发现客户端有两种回调方式: Watcher 和 AsyncCallback,而 Zookeeper 的使用是离不开这两种方式的,搞清楚它们之间的区别与实现显得尤其重要。本文将围绕下面几个方面展开服务器
咱们先经过一个例子来感觉一下:session
zooKeeper.getData(root, new Watcher() { public void process(WatchedEvent event) { } }, new AsyncCallback.DataCallback() { public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { } }, null);异步
1函数 2spa 3线程 4code 5接口 6队列 7事件 8 9 |
zooKeeper.getData(root, new Watcher() { public void process(WatchedEvent event) {
} }, new AsyncCallback.DataCallback() { public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
} }, null); |
能够看到,getData
方法能够同时设置两个回调:Watcher 和 AsyncCallback,一样是回调,它们的区别是什么呢?要解决这个问题,咱们就得从这两个接口的功能入手。
Watcher
:Watcher
是用于监听节点,session 状态的,好比getData
对数据节点a
设置了watcher
,那么当a
的数据内容发生改变时,客户端会收到NodeDataChanged
通知,而后进行watcher
的回调。AsyncCallback
:AsyncCallback
是在以异步方式使用 ZooKeeper API 时,用于处理返回结果的。例如:getData
同步调用的版本是:byte[] getData(String path, boolean watch,Stat stat)
,异步调用的版本是:void getData(String path,Watcher watcher,AsyncCallback.DataCallback cb,Object ctx)
,能够看到,前者是直接返回获取的结果,后者是经过AsyncCallback
回调处理结果的。Watcher 主要是经过ClientWatchManager
进行管理的。
ClientWatchManager
中有四种Watcher
defaultWatcher
:建立Zookeeper
链接时传入的Watcher
,用于监听 session 状态dataWatches
:存放getData
传入的Watcher
existWatches
:存放exists
传入的Watcher
,若是节点已存在,则Watcher
会被添加到dataWatches
childWatches
:存放getChildren
传入的Watcher
从代码上能够发现,监听器是存在HashMap
中的,key
是节点名称path
,value是
Set<Watcher>
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>(); private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>(); private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>(); private volatile Watcher defaultWatcher;
1 2 3 4 5 6 7 8 |
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>(); private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>(); private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
private volatile Watcher defaultWatcher; |
在Watcher
接口中,已经定义了全部的状态类型和事件类型
号dataVersion
。所以,即便使用相同的数据内容来更新,仍是会收到这个事件通知的。不管如何,调用了更新接口,就必定会更新dataVersion
的。ClientWatchManager
只有一个方法,那就是materialize
,它根据事件类型type
和path
返回监听该节点的特定类型的Watcher
。
public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path);
1 2 |
public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path); |
核心逻辑以下:
type == None
:返回全部Watcher
,也就是说全部的Watcher
都会被触发。若是disableAutoWatchReset == true
且当前state != SyncConnected
,那么还会清空Watcher
,意味着移除全部在节点上的Watcher
。type == NodeDataChanged | NodeCreated
:返回监听path
节点的dataWatches & existWatches
type == NodeChildrenChanged
:返回监听path
节点的childWatches
type == NodeDeleted
:返回监听path
节点的dataWatches | childWatches
每次返回都会从HashMap
中移除节点对应的Watcher
,例如:addTo(dataWatches.remove(clientPath), result);
,这就是为何Watcher
是一次性的缘由(defaultWatcher
除外)。值得注意的是,因为使用的是HashSet
存储Watcher
,重复添加同一个实例的Watcher
也只会被触发一次。
Zookeeper 的exists
,getData
,getChildren
方法都有异步的版本,它们与同步方法的区别仅仅在因而否等待响应,底层发送都是经过sendThread
异步发送的。下面咱们用一幅图来讲明:
上面的图展现了同步/异步调用getData
的流程,其余方法也是相似的。
Zookeeper 客户端会启动两个常驻线程
SendThread
:负责 IO 操做,包括发送,接受响应,发送 ping 等。EventThread
:负责处理事件,执行回调函数。readResponse
是SendThread
处理响应的核心函数,核心逻辑以下:
ReplyHeader
: 有一个单独的线程SendThread
,负责接收服务器端的响应。假设接受到的服务器传递过来的字节流是incomingBuffer
,那么就将这个incomingBuffer
反序列化为ReplyHeader
。ReplyHeader
是Watcher
响应仍是AsyncCallback
响应:ReplyHeader.getXid()
存储了响应类型。
Watcher
类型响应:从ReplyHeader
中建立WatchedEvent
,WatchedEvent
里面存储了节点的路径,而后去WatcherManager
中找到和这个节点相关联的全部Watcher
,将他们写入到EventThread
的waitingEvents
中。AsyncCallback
类型响应:从ReplyHeader
中读取response
,这个response
描述了是Exists,setData,getData,getChildren,create.....
中的哪个异步回调。从pendingQueue
中拿到Packet
,Packet
中的cb
存储了AsyncCallback
,也就是异步 API 的结果回调。最后将Packet
写入到EventThread
的waitingEvents
中。processEvent
是EventThread
处理事件核心函数,核心逻辑以下:
event instanceof WatcherSetEventPair
,取出pair
中的Watchers
,逐个调用watcher.process(pair.event)
event
为AsyncCallback
,根据p.response
判断为哪一种响应类型,执行响应的回调processResult
。可见,Watcher
和AsyncCallback
都是由EventThread
处理的,经过processEvent
进行区分处理。
Zookeeper 客户端中Watcher
和AsyncCallback
都是异步回调的方式,但它们回调的时机是不同的,前者是由服务器发送事件触发客户端回调,后者是在执行了请求后获得响应后客户端主动触发的。它们的共同点在于都须要在获取了服务器响应以后,由SendThread
写入EventThread
的waitingEvents
中,而后由EventThread
逐个从事件队列中获取并处理。