poublic void process(WatchedEvent event);node
WatchedEvent 数据结构:缓存
KeeperState(会话状态): Disconnected; SyncConnected; AuthFailed; ConnectedReadOnly; SaslAuthenticated; Expired。 EventType(事件类型): NodeCreated; NodeDeleted; NodeDataChanged; NodeChildrenChanged None。 若是事件类型不是None时,返回一个znode路径。
设置监视点:安全
exists的异步调用的示例代码:服务器
zk.exists("/myZnode", myWatcher, existsCallback, null); Watcher myWatcher = new Watcher() { public void process(WatchedEvent e) { // Process the watch event } } StatCallback existsCallback = new StatCallback() { public void processResult(int rc, String path, Object ctx, Stat stat) { // Process the result of the exists call } };
任务列表,一个组件须要等待处理的变化状况:数据结构
一、管理权变化dom
StringCallback masterCreateCallback = new StringCallback() { @Override public void processResult(int rc, String path, Object ctx, String name) { switch (Code.get(rc)) { case CONNECTIONLOSS: checkMaster(); return; case OK: isLeader = true; break; case NODEEXISTS: masterExists(); break; default: isLeader = false; break; } System.out.println("I'm " + (isLeader ? "" : "not ") + "the leader"); } }; void masterExists() { zk.exists("/master", masterExistsWatcher, masterExistsCallback, null); } Watcher masterExistsWatcher = new Watcher() { @Override public void process(WatchedEvent event) { if(event.getType() == EventType.NodeDeleted) { assert "/master".equals(event.getPath()); try { runForMaster(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }; StatCallback masterExistsCallback = new StatCallback() { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { switch (Code.get(rc)) { case CONNECTIONLOSS: masterExists(); break; case OK: if (stat == null) { //state = MasterStates.RUNNING; try { runForMaster(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } break; default: checkMaster(); break; } } };
图4-1:主节点竞选中可能的交错操做异步
二、主节点等待从节点列表的变化
【新的从节点加入进来,或旧的从节点退役】ide
经过在ZooKeeper中的/workers下添加子节点来注册新的从节点。当一个从节点崩溃或从系统中被移除,如会话过时等状况,须要自动将对应的znode节点删除。优雅实现的从节点会显式地关闭其会话,而不须要ZooKeeper等待会话过时。
获取列表并监视变化的示例代码:工具
/** workersChangeWatcher为从节点列表的监视点对象 */ Watcher workersChangeWatcher = new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == EventType.NodeChildrenChanged) { assert "/workers".equals(event.getPath()); getWorkers(); } } }; private void getWorkers() { zk.getChildren("/workers", workersChangeWatcher, workersGetChildrenCallback, null); } ChildrenCallback workersGetChildrenCallback = new ChildrenCallback() { @Override public void processResult(int rc, String path, Object ctx, List<String> children) { switch (Code.get(rc)) { /** 当CONNECTIONLOSS事件发生时,须要从新获取子节点并设置监视点的操做 */ case CONNECTIONLOSS: getWokerList(); break; case OK: LOG.info("Successfully got a list of workers :" + children.size() + " workers"); /** 从新分配崩溃从节点的任务,并从新设置新的从节点列表 */ reassignAndSet(children); break; default: LOG.error("getChildren failed", KeeperException.create(Code.get(rc), path)); } } private void getWokerList() { // TODO Auto-generated method stub } }; /** 用于保存上次得到的从节点列表的本地缓存 */ ChildrenCache workersCache; void reassignAndSet(List<String> children) { List<String> toProcess; if (workersCache == null) { /** 若是第一次使用本地缓存这个变量,那么初始化该变量 */ workersCache = new ChildrenCache(children); /** 第一次得到全部从节点时,不须要作什么其余事 */ toProcess = null; } else { LOG.info("Removing and setting"); /** 若是不是第一次,那么须要检查是否有从节点已经被移除了 */ toProcess = workersCache.removedAndSet(children); } if (toProcess != null) { for (String worker : toProcess) { /** 若是有从节点被移除了,须要从新分配任务 */ getAbsentWorkerTasks(worker); } } }
三、主节点等待新任务进行分配
assignTasks方法为任务分配的实现:oop
/** 在任务列表变化时,处理通知的监视点实现 */ Watcher tasksChangeWatcher = new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == EventType.NodeChildrenChanged) { assert "/tasks".equals(event.getPath()); getTasks(); } } }; /** 得到任务列表 */ void getTasks() { zk.getChildren("/tasks", tasksChangeWatcher, tasksGetChildrenCallback, null); } ChildrenCallback tasksGetChildrenCallback = new ChildrenCallback() { @Override public void processResult(int rc, String path, Object ctx, List<String> children) { switch (Code.get(rc)) { case CONNECTIONLOSS: /** 当收到子节点变化的通知后,得到子节点的列表 */ getTasks(); break; case OK: if (children != null) { /** 分配列表中的任务 */ assignTasks(children); break; default: LOG.error("getChildren failed.", KeeperException.create(Code.get(rc), path)); break; } } }; void assignTasks(List<String> tasks) { for (String task : tasks) { getTaskData(task); } } void getTaskData(String task) { /** 得到任务信息 */ zk.getData("/tasks/" + task, false, taskDataCallback, task); } DataCallback taskDataCallback = new DataCallback() { @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { switch (Code.get(rc)) { case CONNECTIONLOSS: getTaskData((String) ctx); break; case OK: /* * Choose worker at random */ int worker = rand.nextInt(workerList.size()); String designatedWorker = workerList.get(worker); /* * Assign task to randomly chosen worker. */ String assignmentPath = "/assign/" + designatedWorker + "/" + (String) ctx; /** 随机选择一个从节点,分配任务给这个从节 */ createAssignment(assignmentPath, data); break; default: LOG.error("Error when trying to get task data.", KeeperException.create(Code.get(rc), path)); break; } } }; void createAssignment(String path, byte[] data) { /** 建立分配节点,路径形式为/assign/worker-id/task-num */ zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, assignTaskCallback, data); } StringCallback assignTaskCallback = new StringCallback() { @Override public void processResult(int rc, String path, Object ctx, String name) { switch (Code.get(rc)) { case CONNECTIONLOSS: createAssignment(path, (byte[]) ctx); break; case OK: LOG.info("Task assigned correctly:" + name); /** 删除/tasks下对应的任务节点 */ deleteTask(name.substring(name.lastIndexOf("/") + 1)); case NODEEXISTS: LOG.warn("Task already assigned"); break; default: LOG.error("Error when trying to assign task.", KeeperException.create(Code.get(rc), path)); break; } } private void deleteTask(String substring) { // TODO Auto-generated method stub } };
四、从节点等待分配新任务
StringCallback createWorkerCallback = new StringCallback() { @Override public void processResult(int rc, String path, Object ctx, String name) { switch (Code.get(rc)) { /** 重试,注意再次注册不会有问题,由于若是znode节点已经存在,会收到NODEEXISTS事件 */ case CONNECTIONLOSS: register(); break; case OK: LOG.info("Registered successfully:" + serverId); break; case NODEEXISTS: LOG.warn("Already registered:" + serverId); break; default: LOG.error("Something went wrong:" + KeeperException.create(Code.get(rc), path)); break; } } }; /** 经过建立一个znode节点来注册从节点 */ void register() { zk.create("/workers/worker-" + serverId, "Idle".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, createWorkerCallback, null); } 一旦有任务列表分配给从节点,从节点就会从/assign/worker-id获取任务信息并执行任务。从节点从本地列表中获取每一个任务的信息并验证任务是否还在待执行的队列中,从节点保存一个本地待执行任务的列表就是为了这个目的。 注意,为了释放回调方法的线程,咱们在单独的线程对从节点的已分配任务进行循环,不然,会阻塞其余的回调方法的执行。
示例中,使用了Java的ThreadPoolExecutor类分配一个线程,该线程进行任务的循环操做:
/** 当收到子节点变化的通知后,得到子节点的列表 */ Watcher newTaskWatcher = new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == EventType.NodeChildrenChanged) { assert new String("/assign/worker-" + serverId).equals(event.getPath()); getTasks(); } } }; void getTasks() { zk.getChildren("/assign/worker-" + serverId, newTaskWatcher, tasksGetChildrenCallback, null); } ChildrenCallback tasksGetChildrenCallback = new ChildrenCallback() { @Override public void processResult(int rc, String path, Object ctx, List<String> children) { switch (Code.get(rc)) { case CONNECTIONLOSS: /** 当收到子节点变化的通知后,得到子节点的列表 */ getTasks(); break; case OK: if (children != null) { /** 分配列表中的任务 */ //assignTasks(children); /** 单独线程中执行 */ executor.execute(new Runnable() { List<String> children; DataCallback cb; private ArrayList<String> noGoingtasks; public Runnable init(List<String> children, DataCallback cb) { this.children = children; this.cb = cb; return this; } @Override public void run() { LOG.info("Looping into tasks"); synchronized (noGoingtasks) { /** 循环子节点列表 */ for (String task : children) { if(!noGoingtasks.contains(task)) { LOG.trace("New task:{}", task); /** 得到任务信息并执行任务 */ zk.getData("assign/worker-" + serverId, false, cb, task); /** 将正在执行的任务添加到执行中列表,防止屡次执行 */ noGoingtasks.add(task); } } } } }.init(children, taskDataCallback)); } break; default: LOG.error("getChildren failed.", KeeperException.create(Code.get(rc), path)); break; } } };
五、客户端等待任务的执行结果
void submitTask(String task, TaskObject taskCtx) { taskCtx.setTask(task); /** 与以前的ZooKeeper调用不一样,传递了一个上下文对象,该对象为实现的Task类的实例 */ zk.create("/tasks/task-", task.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL, createTaskCallback, taskCtx); } StringCallback createTaskCallback = new StringCallback() { @Override public void processResult(int rc, String path, Object ctx, String name) { switch (Code.get(rc)) { /** 链接丢失时,再次提交任务,注意从新提交任务可能会致使任务重复。 */ case CONNECTIONLOSS: submitTask(((TaskObject) ctx).getTask(), (TaskObject) ctx); break; case OK: LOG.info("My created task name: + name"); ((TaskObject) ctx).setTaskName(name); /** 为这个任务的znode节点设置一个监视点 */ watchStatus("/status/" + name.replace("/tasks/", ""), ctx); break; default: LOG.error("Something went wrong" + KeeperException.create(Code.get(rc), path)); break; } } };
检查状态节点是否已经存在(也许任务很快处理完成),并设置监视点。
提供了一个收到znode节点建立的通知时进行处理的监视点的实现和一个exists方法的回调实现:
ConcurrentHashMap<String, Object> ctxMap = new ConcurrentHashMap<String, Object>(); private void watchStatus(String path, Object ctx) { ctxMap.put(path, ctx); /** 客户端经过该方法传递上下对象,当收到状态节点的通知时,就能够修改这个表示任务的对象(TaskObject) */ zk.exists(path, statusWatcher, existsCallback, ctx); } Watcher statusWatcher = new Watcher() { @Override public void process(WatchedEvent event) { if(event.getType() == EventType.NodeCreated) { assert event.getPath().contains("/status/task-"); zk.getData(event.getPath(), false, getDataCallback, ctxMap.get(event.getPath())); } } }; StatCallback existsCallback = new StatCallback() { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { switch (Code.get(rc)) { case CONNECTIONLOSS: watchStatus(path, ctx); break; case OK: /** 状态节点已经存在,所以客户端获取这个节点信息 */ if(stat != null) { zk.getData(path, false, getDataCallback, null); } break; /** 若是状态节点不存在,这是常见状况,客户端不进行任何操做 */ case NONODE: break; default: LOG.error("Something went wrong when " + "checking if the status node exists:" + KeeperException.create(Code.get(rc), path)); break; } } };
Multiop能够原子性地执行多个ZooKeeper的操做,执行过程为原子性,即在multiop代码块中的全部操做要不所有成功,要不所有失败。
使用multiop特性:
/** 示例 */ /** ①为delete方法建立Op对象 */ Op deleteZnode(String z) { /** ②经过对应的Op方法返回对象。 */ return Op.delete(z, -1); } ... /** ③以列表方式传入每一个delete操做的元素执行multi方法 */ List<OpResult> results = zk.multi(Arrays.asList(deleteZnode("/a/b"), deleteZnode("/a"));
调用multi方法返回一个OpResult对象的列表,每一个对象对应每一个操做。例如,对于delete操做,咱们使用DeleteResult类,该类继承自OpResult,经过每种操做类型对应的结果对象暴露方法和数据。DeleteResult对象仅提供了equals和hashCode方法,而CreateResult对象暴露出操做的路径(path)和Stat对象。对于错误处理,ZooKeeper返回一个包含错误码的ErrorResult类的实例。
multi方法一样也有异步版本,如下为同步方法和异步方法的定义:
public List<OpResult> multi(Iterator<Op> ops) throws InterruptedException, KeeperException; public void multi(Iterator<Op> ops, MultiCallback cb, Object ctx);
【Transaction】封装了multi方法,提供了简单的接口。咱们能够建立Transaction对象的实例,添加操做,提交事务。
使用Transaction重写上一示例的代码以下:
Transaction t = new Transaction(); t.delete("/a/b", -1); t.delete("/a", -1); List<OpResult> results = t.commit();
【commit】方法一样也有一个异步版本的方法,该方法以MultiCallback对象和上下文对象为输入:
public void commit(MultiCallback cb, Object ctx);
multiop能够简化不止一处的主从模式的实现,当分配一个任务,在以前的例子中,主节点会建立任务分配节点,而后删除/tasks下对应的任务节点。若是在删除/tasks下的节点时,主节点崩溃,就会致使一个已分配的任务还在/tasks下。使用multiop,能够原子化建立任务分配节点和删除/tasks下对应的任务节点这两个操做。使用这个方式,能够保证没有已分配的任务还在/tasks节点下,若是备份节点接管了主节点角色,就不用再区分/tasks下的任务是否是没有分配的。multiop提供的另外一个功能是检查一个znode节点的版本,经过multiop能够同时读取的多个节点的ZooKeeper状态并回写数据——如回写某些读取到的数据信息。当被检查的znode版本号没有变化时,就能够经过multiop调用来检查没有被修改的znode节点的版本号,这个功能很是有用,如在检查一个或多个znode节点的版本号取决于另一个znode节点的版本号时。在咱们的主从模式的示例中,主节点须要让客户端在主节点指定的路径下添加新任务,例如,主节点要求客户端在/task-mid的子节点中添加新任务节点,其中mid为主节点的标识符,主节点在/master-path节点中保存这个路径的数据,客户端在添加新任务前,须要先读取/master-path的数据,并经过Stat获取这个节点的版本号信息,而后,客户端经过multiop的部分调用方式在/task-mid节点下添加新任务节点,同时会检查/master-path的版本号是否与以前读取的相匹配。
check方法的定义与setData方法类似,只是没有data参数:
public static Op check(String path, int version);
若是输入的path的znode节点的版本号不匹配,multi调用会失败。
经过如下简单的示例代码,来讲明如何实现上面所讨论的场景:
/** ①获取/master节点的数据。 */ byte[] masterData = zk.getData("/master-path", false, stat); /** ②从/master节点得到路径信息。*/ String parent = new String(masterData); ... zk.multi(Arrays.asList(Op.check("/master-path", stat.getVersion()), /** ③两个操做的multi调用。 */ Op.create(, modify(z1Data),-1),
从应用的角度来看,客户端每次都是经过访问ZooKeeper来获取给定znode节点的数据、一个znode节点的子节点列表或其余相关的ZooKeeper状态,这种方式并不可取。
更高效的方式为客户端本地缓存数据,并在须要时使用这些数据,一旦这些数据发生变化,你让
ZooKeeper通知客户端,客户端就能够更新缓存的数据。
另外一种方式,客户端透明地缓存客户端访问的全部ZooKeeper状态,并在更新缓存数据时将这些数据置为无效。实现这种缓存一致性的方案代价很是大。
一、写操做的顺序
ZooKeeper状态会在全部服务端所组成的所有安装中进行复制。
服务端对状态变化的顺序达成一致,并使用相同的顺序执行状态的更新。
例如,若是一个ZooKeeper的服务端执行了先创建一个/z节点的状态变化以后再删除/z节点的状态变化这个顺序的操做,全部的在集合中的服务端均需以相同的顺序执行这些变化。
二、读操做的顺序
ZooKeeper客户端老是会观察到相同的更新顺序,即便它们链接到不一样的服务端上。可是客户端多是在不一样时间观察到了更新,若是他们还在ZooKeeper之外通讯,这种差别就会更加明显。
图4-2:隐藏通道问题的例子
为了不读取到过去的数据,建议应用程序使用ZooKeeper进行全部涉及ZooKeeper状态的通讯。
例如,为了不刚刚描述的场景,c 2 能够在/z节点设置监视点来代替从c 1 直接接收消息,经过监视点,c 2就能够知道/z节点的变化,从而消除隐藏通道的问题。
三、通知的顺序
ZooKeeper对通知的排序涉及其余通知和异步响应,以及对系统状态更新的顺序。如ZooKeeper对两个状态更新进行排序,u和u',u'紧随u以后,若是u和u'分别修改了/a节点和/b节点,其中客户端c在/a节点设置了监视点,c只能观察到u'的更新,即接收到u所对应通知后读取/b节点。这种顺序可使应用经过监视点实现安全的参数配置。假设一个znode节点/z被建立或删除表示在ZooKeeper中保存的一些配置信息变为无效的。在对这个配置进行任何实际更新以前,将建立或删除的通知发给客户端,这一保障很是重要,能够确保客户端不会读取到任何无效配置。
更具体一些,假如咱们有一个znode节点/config,其子节点包含应用配置元数据:/config/m1,/config/m2,,/config/m_n。目的只是为了说明这个例子,无论这些znode节点的实际内容是什么。假如主节点应用进程经过setData更新每一个znode节点,且不能让客户端只读取到部分更新,一个解决方案就是在开始更新这些配置前主节点先建立一个/config/invalid节点,其余须要读取这一状态的客户端会监视/config/invalid节点,若是该节点存在就不会读取配置状态,当该节点被删除,就意味着有一个新的有效的配置节点集合可用,客户端能够进行读取该集合的操做。
对于这个具体的例子,咱们还可使用multiop来对/config/m[1-n]这些节点原子地执行全部setData操做,而不是使用一个znode节点来标识部分修改的状态。在例子中的原子性问题,咱们可使用multiop代替对额外znode节点或通知的依赖,不过通知机制很是通用,并且并未约束为原子性的。
由于ZooKeeper根据触发通知的状态更新对通知消息进行排序,客户端就能够经过这些通知感知到真正的状态变化的顺序。
注意:活性与安全性
在本章中,因活性普遍使用了通知机制。活性(liveness)会确保系统最终取得进展。新任务和新的从节点的通知只是关于活性的事件的例子。若是主节点没有对新任务进行通知,这个任务就永远不会被执行,至少从提交任务的客户端的视角来看,已提交的任务没有执行会致使活性缺失。原子更新一组配置节点的例子中,状况不太同样:这个例子涉及安全性,而不是活性。在更新中读取znode节点可能会致使客户端到非一致性配置信息,而invalid节点能够确保只有当合法配置信息有效时,客户端才读取正确状态。
在咱们看到的关于活性的例子中,通知的传送顺序并非特别重要,只要最终客户端最终获知这些事件就能够继续取得进展。不过为了安全性,不按顺序接收通知也许会致使不正确的行为。
避免在一个特定节点设置大量的监视点,最好是每次在特定的znode节点上,只有少许的客户端设置监视点,理想状况下最多只设置一个。
这样,每一个节点上设置的监视点只有最多一个客户端
根据YourKit( http://www.yourkit.com/ )的分析工具所分析,设置一个监视点会使服务端的监视点管理器的内存消耗上增长大约250到300个字节,设置很是多的监视点意味着监视点管理器会消耗大量的服务器内存