ZooKeeper学习第五期--ZooKeeper管理分布式环境中的数据(转)

转载来源:https://www.cnblogs.com/sunddenly/p/4092654.html

引言

本节原本是要介绍ZooKeeper的实现原理,可是ZooKeeper的原理比较复杂,它涉及到了paxos算法、Zab协议、通讯协议等相关知识,理解起来比较抽象因此还须要借助一些应用场景,来帮咱们理解。因为内容比较多,一口气吃不成胖子,得慢慢来一步一个脚印,所以我对后期ZooKeeper的学习规划以下:html

第一阶段: java

|---理解ZooKeeper的应用 node

    |---ZooKeeper是什么web

    |---ZooKeeper能干什么算法

    |---ZooKeeper 怎么使用数据库

第二阶段: apache

|---理解ZooKeeper原理准备 服务器

    |---了解paxos网络

    |---理解 zab原理session

    |---理解选举/同步流程

第三阶段:

    |---深刻ZooKeeper原理

        |---分析源码

        |---尝试开发分布式应用

因为内容较多,并且理解较为复杂,因此每一个阶段分开来学习和介绍,那么本文主要介绍的的是第一阶段,该阶段通常应该放在前面介绍,但感受像一些ZooKeeper应用案例,若是没有必定的ZooKeeper基础,理解起来也比较抽象, 因此放在这介绍。你们能够对比一下前面的应用程序,来对比理解一下前面的那些应用到底用到ZooKeeper的那些功能,来进一步理解ZooKeeper的实现理念,因为网上关于这方面的介绍比较多,若是一些可爱的博友对该内容已经比较了解,那么您能够不用往下看了,继续下一步学习。

1、ZooKeeper产生背景

1.1 分布式的发展

分布式这个概念我想你们并不陌生,但真正实战开始还要从google提及,很早之前在实验室中分布式被人提出,但是说是计算机内入行较为复杂学习较为困难的技术,而且市场也并不成熟,所以大规模的商业应用一直未成出现,但从Google 发布了MapReduceDFS 以及Bigtable的论文以后,分布式在计算机界的格局就发生了变化,从架构上实现了分布式的难题,而且成熟的应用在了海量数据存储计算上,其集群的规模也是当前世界上最为庞大的。

以DFS 为基础的分布式计算框架keyvalue 数据高效的解决运算的瓶颈,并且开发人员不用再写复杂的分布式程序,只要底层框架完备开发人员只要用较少的代码就能够完成分布式程序的开发,这使得开发人员只须要关注业务逻辑的便可。Google 在业界技术上的领军地位,让业界可望不可即的技术实力,IT 所以也是对Google 所退出的技术十分推崇。在最近几年中分布式则是成为了海量数据存储以及计算、高并发、高可靠性、高可用性的解决方案。

1.2 ZooKeeper的产生

众所周知一般分布式架构都是中心化的设计,就是一个主控机链接多个处理节点。问题能够从这里考虑,当主控机失效时,整个系统则就没法访问了,因此保证系统的高可用性是很是关键之处,也就是要保证主控机的高可用性。分布式锁就是一个解决该问题的较好方案,多主控机抢一把锁。在这里咱们就涉及到了咱们的重点Zookeeper。

ZooKeeper是什么,chubby 我想你们都不会陌生的,chubby 是实现Google 的一个分布式锁的实现,运用到了paxos 算法解决的一个分布式事务管理的系统。Zookeeper 就是雅虎模仿强大的Google chubby 实现的一套分布式锁管理系统。同时,Zookeeper 分布式服务框架是Apache Hadoop的一个子项目,它是一个针对大型分布式系统的可靠协调系统,它主要是用来解决分布式应用中常常遇到的一些数据管理问题,能够高可靠的维护元数据。提供的功能包括:配置维护、名字服务、分布式同步、组服务等。ZooKeeper的设计目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。

1.3 ZooKeeper的使用

Zookeeper 做为一个分布式的服务框架,主要用来解决分布式集群中应用系统的一致性问题,它能提供基于相似于文件系统的目录节点树方式的数据存储可是 Zookeeper 并非用来专门存储数据的,它的做用主要是用来维护监控你存储的数据的状态变化。经过监控这些数据状态的变化,从而能够达到基于数据的集群管理,后面将 会详细介绍 Zookeeper 可以解决的一些典型问题。

注意一下这里的"数据"是有限制的:

(1) 从数据大小来看:咱们知道ZooKeeper的数据存储在一个叫ReplicatedDataBase 的数据库中,该数据是一个内存数据库,既然是在内存当中,我就应该知道该数据量就应该不会太大,这一点上就与hadoop的HDFS有了很大的区别,HDFS的数据主要存储在磁盘上,所以数据存储主要是HDFS的事,而ZooKeeper主要是协调功能,并非用来存储数据的。

(2) 从数据类型来看:正如前面所说的,ZooKeeper的数据在内存中,因为内存空间的限制,那么咱们就不能在上面为所欲为的存储数据,因此ZooKeeper存储的数据都是咱们所关心的数据并且数据量还不能太大,并且还会根据咱们要以实现的功能来选择相应的数据。简单来讲,干什么事存什么数据,ZooKeeper所实现的一切功能,都是由ZK节点的性质该节点所关联的数据实现的,至于关联什么数据那就要看你干什么事了。

例如:

  ① 集群管理:利用临时节点特性,节点关联的是机器的主机名、IP地址等相关信息,集群单点故障也属于该范畴。

  ② 统一命名:主要利用节点的惟一性和目录节点树结构。

  ③ 配置管理:节点关联的是配置信息。

  ④ 分布式锁:节点关联的是要竞争的资源。

2、ZooKeeper应用场景

ZooKeeper是一个高可用的分布式数据管理与系统协调框架。基于对Paxos算法的实现,使该框架保证了分布式环境中数据的强一致性,也正是基于这样的特性,使得zookeeper可以应用于不少场景。须要注意的是,ZK并非生来就为这些场景设计,都是后来众多开发者根据框架的特性,摸索出来的典型使用方法。所以,咱们也能够根据本身的须要来设计相应的场景实现。正如前文所提到的,ZooKeeper 实现的任何功能都离不开ZooKeeper的数据结构,任何功能的实现都是利用"Znode结构特性+节点关联的数据"来实现的,好吧那么咱们就看一下ZooKeeper数据结构有哪些特性。ZooKeeper数据结构以下图所示:

图2.1 ZooKeeper数据结构

Zookeeper 这种数据结构有以下这些特色:

每一个子目录项如 NameService 都被称做为 znode,这个 znode 是被它所在的路径惟一标识,如 Server1 这个 znode 的标识为 /NameService/Server1

znode 能够有子节点目录,而且每一个 znode 能够存储数据,注意 EPHEMERAL 类型的目录节点不能有子节点目录;

znode 是有版本的,每一个 znode 中存储的数据能够有多个版本,也就是一个访问路径中能够存储多份数据

znode 能够是临时节点,一旦建立这个 znode 的客户端与服务器失去联系,这个 znode 也将自动删除,Zookeeper 的客户端和服务器通讯采用长链接方式,每一个客户端和服务器经过心跳来保持链接,这个链接状态称为 session,若是 znode 是临时节点,这个 session 失效,znode 也就删除了;

znode 的目录名能够自动编号,如 App1 已经存在,再建立的话,将会自动命名为 App2;

znode 能够被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化能够通知设置监控的客户端,这个是 Zookeeper 的核心特性,Zookeeper 的不少功能都是基于这个特性实现的。

2.1数据发布与订阅

(1) 典型场景描述

发布与订阅即所谓的配置管理,顾名思义就是将数据发布到ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。例如全局的配置信息地址列表等就很是适合使用。集中式的配置管理在应用集群中是很是常见的,通常商业公司内部都会实现一套集中的配置管理中心,应对不一样的应用集群对于共享各自配置的需求,而且在配置变动时可以通知到集群中的每个机器。

(2) 应用

索引信息和集群中机器节点状态存放在ZK的一些指定节点,供各个客户端订阅使用。

系统日志(通过处理后的)存储,这些日志一般2-3天后被清除。

应用中用到的一些配置信息集中管理,在应用启动的时候主动来获取一次,而且在节点上注册一个Watcher,之后每次配置有更新,实时通知到应用,获取最新配置信息。

业务逻辑中须要用到的一些全局变量,好比一些消息中间件的消息队列一般有个offset,这个offset存放在zk上,这样集群中每一个发送者都能知道当前的发送进度

系统中有些信息须要动态获取,而且还会存在人工手动去修改这个信息。之前一般是暴露出接口,例如JMX接口,有了ZK后,只要将这些信息存放到ZK节点上便可。

(3) 应用举例

例如:同一个应用系统须要多台 PC Server 运行,可是它们运行的应用系统的某些配置项是相同的,若是要修改这些相同的配置项,那么就必须同时修改每台运行这个应用系统的 PC Server,这样很是麻烦并且容易出错。将配置信息保存在 Zookeeper 的某个目录节点中,而后将全部须要修改的应用机器监控配置信息的状态,一旦配置信息发生变化,每台应用机器就会收到 Zookeeper 的通知,而后从 Zookeeper 获取新的配置信息应用到系统中。ZooKeeper配置管理服务以下图所示:

图2.2 配置管理结构图

Zookeeper很容易实现这种集中式的配置管理,好比将所须要的配置信息放到/Configuration 节点上,集群中全部机器一启动就会经过Client/Configuration这个节点进行监控【zk.exist("/Configuration″,true)】,而且实现Watcher回调方法process(),那么在zookeeper上/Configuration节点下数据发生变化的时候,每一个机器都会收到通知,Watcher回调方法将会被执行,那么应用再取下数据便可【zk.getData("/Configuration″,false,null)】。

2.2统一命名服务(Name Service)

(1) 场景描述

分布式应用中,一般须要有一套完整的命名规则,既可以产生惟一的名称又便于人识别和记住,一般状况下用树形的名称结构是一个理想的选择,树形的名称结构是一个有层次的目录结构,既对人友好又不会重复。说到这里你可能想到了 JNDI,没错 Zookeeper 的 Name Service 与 JNDI 可以完成的功能是差很少的,它们都是将有层次的目录结构关联到必定资源上,可是Zookeeper的Name Service 更加是普遍意义上的关联,也许你并不须要将名称关联到特定资源上,你可能只须要一个不会重复名称,就像数据库中产生一个惟一的数字主键同样。

(2) 应用

在分布式系统中,经过使用命名服务,客户端应用可以根据指定的名字来获取资源服务的地址提供者等信息。被命名的实体一般能够是集群中的机器,提供的服务地址进程对象等等,这些咱们均可以统称他们为名字(Name)。其中较为常见的就是一些分布式服务框架中的服务地址列表。经过调用ZK提供的建立节点的API,可以很容易建立一个全局惟一的path,这个path就能够做为一个名称。Name Service 已是Zookeeper 内置的功能,你只要调用 Zookeeper 的 API 就能实现。如调用 create 接口就能够很容易建立一个目录节点。

(3) 应用举例

阿里开源的分布式服务框架Dubbo中使用ZooKeeper来做为其命名服务,维护全局的服务地址列表。在Dubbo实现中: 服务提供者在启动的时候,向ZK上的指定节点/dubbo/${serviceName}/providers目录下写入本身的URL地址,这个操做就完成了服务的发布。 服务消费者启动的时候,订阅/dubbo/${serviceName}/providers目录下的提供者URL地址, 并向/dubbo/${serviceName} /consumers目录下写入本身的URL地址。 注意,全部向ZK上注册的地址都是临时节点,这样就可以保证服务提供者和消费者可以自动感应资源的变化。 另外,Dubbo还有针对服务粒度的监控,方法是订阅/dubbo/${serviceName}目录下全部提供者和消费者的信息。

2.3分布通知/协调(Distribution of notification/coordination)

(1) 典型场景描述

ZooKeeper中特有watcher注册与异步通知机制,可以很好的实现分布式环境下不一样系统之间的通知与协调,实现对数据变动的实时处理。使用方法一般是不一样系统都对ZK上同一个znode进行注册,监听znode的变化(包括znode自己内容及子节点的),其中一个系统update了znode,那么另外一个系统可以收到通知,并做出相应处理。

(2) 应用

另外一种心跳检测机制检测系统被检测系统之间并不直接关联起来,而是经过ZK上某个节点关联,大大减小系统耦合。

另外一种系统调度模式:某系统由控制台推送系统两部分组成,控制台的职责是控制推送系统进行相应的推送工做。管理人员在控制台做的一些操做,其实是修改了ZK上某些节点的状态,而ZK就把这些变化通知给他们注册Watcher的客户端,即推送系统,因而,做出相应的推送任务。

另外一种工做汇报模式:一些相似于任务分发系统子任务启动后,到ZK来注册一个临时节点,而且定时将本身的进度进行汇报(将进度写回这个临时节点),这样任务管理者就可以实时知道任务进度。

总之,使用zookeeper来进行分布式通知和协调可以大大下降系统之间的耦合。

2.4分布式锁(Distribute Lock)

(1) 场景描述

分布式锁,这个主要得益于ZooKeeper为咱们保证了数据的强一致性,即用户只要彻底相信每时每刻,zk集群中任意节点(一个zk server)上的相同znode的数据是必定是相同的。锁服务能够分为两类,一个是保持独占,另外一个是控制时序。

保持独占,就是全部试图来获取这个锁的客户端,最终只有一个能够成功得到这把锁。一般的作法是把ZK上的一个znode看做是一把锁,经过create znode的方式来实现。全部客户端都去建立 /distribute_lock 节点,最终成功建立的那个客户端也即拥有了这把锁。

控制时序,就是全部试图来获取这个锁的客户端,最终都是会被安排执行,只是有个全局时序了。作法和上面基本相似,只是这里 /distribute_lock 已经预先存在,客户端在它下面建立临时有序节点。Zk的父节点(/distribute_lock)维持一份sequence,保证子节点建立的时序性,从而也造成了每一个客户端的全局时序。

(2) 应用

共享锁在同一个进程中很容易实现,可是在跨进程或者在不一样 Server 之间就很差实现了。Zookeeper 却很容易实现这个功能,实现方式也是须要得到锁的 Server 建立一个 EPHEMERAL_SEQUENTIAL 目录节点,而后调用 getChildren方法获取当前的目录节点列表中最小的目录节点是否是就是本身建立的目录节点,若是正是本身建立的,那么它就得到了这个锁,若是不是那么它就调用 exists(String path, boolean watch) 方法并监控 Zookeeper 上目录节点列表的变化,一直到本身建立的节点是列表中最小编号的目录节点,从而得到锁,释放锁很简单,只要删除前面它本身所建立的目录节点就好了。

图 2.3 ZooKeeper实现Locks的流程图

代码清单1 TestMainClient 代码

  1. package org.zk.leader.election;
  2.  
  3. import org.apache.log4j.xml.DOMConfigurator;
  4. import org.apache.zookeeper.WatchedEvent;
  5. import org.apache.zookeeper.Watcher;
  6. import org.apache.zookeeper.ZooKeeper;
  7.  
  8. import java.io.IOException;
  9.  
  10. /**
  11.  * TestMainClient
  12.  * <p/>
  13.  * Author By: sunddenly工做室
  14.  * Created Date: 2014-11-13
  15.  */
  16. public class TestMainClient implements Watcher {
  17.     protected static ZooKeeper zk = null;
  18.     protected static Integer mutex;
  19.     int sessionTimeout = 10000;
  20.     protected String root;
  21.     public TestMainClient(String connectString) {
  22.         if(zk == null){
  23.             try {
  24.  
  25.                 String configFile = this.getClass().getResource("/").getPath()+"org/zk/leader/election/log4j.xml";
  26.                 DOMConfigurator.configure(configFile);
  27.                 System.out.println(" 建立一个新的链接: ");
  28.                 zk = new ZooKeeper(connectString, sessionTimeout, this);
  29.                 mutex = new Integer(-1);
  30.             } catch (IOException e) {
  31.                 zk = null;
  32.             }
  33.         }
  34.     }
  35.    synchronized public void process(WatchedEvent event) {
  36.         synchronized (mutex) {
  37.             mutex.notify();
  38.         }
  39.     }
  40. }

清单 2 Locks 代码

  1. package org.zk.locks;
  2.  
  3. import org.apache.log4j.Logger;
  4. import org.apache.zookeeper.CreateMode;
  5. import org.apache.zookeeper.KeeperException;
  6. import org.apache.zookeeper.WatchedEvent;
  7. import org.apache.zookeeper.ZooDefs;
  8. import org.apache.zookeeper.data.Stat;
  9. import org.zk.leader.election.TestMainClient;
  10.  
  11. import java.util.Arrays;
  12. import java.util.List;
  13.  
  14. /**
  15.  * locks
  16.  * <p/>
  17.  * Author By: sunddenly工做室
  18.  * Created Date: 2014-11-13 16:49:40
  19.  */
  20. public class Locks extends TestMainClient {
  21.     public static final Logger logger = Logger.getLogger(Locks.class);
  22.     String myZnode;
  23.  
  24.     public Locks(String connectString, String root) {
  25.         super(connectString);
  26.         this.root = root;
  27.         if (zk != null) {
  28.             try {
  29.                 Stat s = zk.exists(root, false);
  30.                 if (s == null) {
  31.                     zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  32.                 }
  33.             } catch (KeeperException e) {
  34.                 logger.error(e);
  35.             } catch (InterruptedException e) {
  36.                 logger.error(e);
  37.             }
  38.         }
  39.     }
  40.     void getLock() throws KeeperException, InterruptedException{
  41.         List<String> list = zk.getChildren(root, false);
  42.         String[] nodes = list.toArray(new String[list.size()]);
  43.         Arrays.sort(nodes);
  44.         if(myZnode.equals(root+"/"+nodes[0])){
  45.             doAction();
  46.         }
  47.         else{
  48.             waitForLock(nodes[0]);
  49.         }
  50.     }
  51.     void check() throws InterruptedException, KeeperException {
  52.         myZnode = zk.create(root + "/lock_" , new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
  53.         getLock();
  54.     }
  55.     void waitForLock(String lower) throws InterruptedException, KeeperException {
  56.         Stat stat = zk.exists(root + "/" + lower,true);
  57.         if(stat != null){
  58.             mutex.wait();
  59.         }
  60.         else{
  61.             getLock();
  62.         }
  63.     }
  64.     @Override
  65.     public void process(WatchedEvent event) {
  66.         if(event.getType() == Event.EventType.NodeDeleted){
  67.             System.out.println(" 获得通知 ");
  68.             super.process(event);
  69.             doAction();
  70.         }
  71.     }
  72.     /**
  73.      * 执行其余任务
  74.      */
  75.     private void doAction(){
  76.         System.out.println(" 同步队列已经获得同步,能够开始执行后面的任务了 ");
  77.     }
  78.  
  79.     public static void main(String[] args) {
  80.         String connectString = "localhost:2181";
  81.  
  82.         Locks lk = new Locks(connectString, "/locks");
  83.         try {
  84.             lk.check();
  85.         } catch (InterruptedException e) {
  86.             logger.error(e);
  87.         } catch (KeeperException e) {
  88.             logger.error(e);
  89.         }
  90.     }
  91. }

2.5 集群管理(Cluster Management)

(1) 典型场景描述

集群机器监控

这一般用于那种对集群中机器状态,机器在线率有较高要求的场景,可以快速对集群中机器变化做出响应。这样的场景中,每每有一个监控系统,实时检测集群机器是否存活。过去的作法一般是:监控系统经过某种手段(好比ping)定时检测每一个机器,或者每一个机器本身定时向监控系统汇报"我还活着"。 这种作法可行,可是存在两个比较明显的问题:

集群中机器有变更的时候,牵连修改的东西比较多。

有必定的延时。

利用ZooKeeper中两个特性,就能够实施另外一种集群机器存活性监控系统:

客户端在节点 x 上注册一个Watcher,那么若是 x 的子节点变化了,会通知该客户端。

建立EPHEMERAL类型的节点,一旦客户端和服务器的会话结束或过时,那么该节点就会消失。

Master选举:

Master选举则是zookeeper中最为经典的使用场景了,在分布式环境中,相同的业务应用分布在不一样的机器上,有些业务逻辑,例如一些耗时的计算,网络I/O处,每每只须要让整个集群中的某一台机器进行执行,其他机器能够共享这个结果,这样能够大大减小重复劳动,提升性能,因而这个master选举即是这种场景下的碰到的主要问题。

利用ZooKeeper中两个特性,就能够实施另外一种集群中Master选举:

利用ZooKeeper的强一致性,可以保证在分布式高并发状况下节点建立的全局惟一性,即:同时有多个客户端请求建立 /Master 节点,最终必定只有一个客户端请求可以建立成功。利用这个特性,就能很轻易的在分布式环境中进行集群选举了。

另外,这种场景演化一下,就是动态Master选举。这就要用到 EPHEMERAL_SEQUENTIAL类型节点的特性了,这样每一个节点会自动被编号。容许全部请求都可以建立成功,可是得有个建立顺序,每次选取序列号最小的那个机器做为Master 。

(2) 应用

在搜索系统中,若是集群中每一个机器都生成一份全量索引,不只耗时,并且不能保证彼此间索引数据一致。所以让集群中的Master来迚行全量索引的生成,而后同步到集群中其它机器。另外,Master选丼的容灾措施是,能够随时迚行手动挃定master,就是说应用在zk在没法获取master信息时,能够经过好比http方式,向一个地方获取master。  在Hbase中,也是使用ZooKeeper来实现动态HMaster的选举。在Hbase实现中,会在ZK上存储一些ROOT表的地址和HMaster的地址,HRegionServer也会把本身以临时节点(Ephemeral)的方式注册到Zookeeper中,使得HMaster能够随时感知到各个HRegionServer的存活状态,同时,一旦HMaster出现问题,会从新选丼出一个HMaster来运行,从而避免了HMaster的单点问题的存活状态,同时,一旦HMaster出现问题,会从新选丼出一个HMaster来运行,从而避免了HMaster的单点问题。

(3) 应用举例

集群监控:

应用集群中,咱们经常须要让每个机器知道集群中或依赖的其余某一个集群中哪些机器是活着的,而且在集群机器由于宕机,网络断链等缘由可以不在人工介入的状况下迅速通知到每个机器,Zookeeper 可以很容易的实现集群管理的功能,若有多台 Server 组成一个服务集群,那么必需要一个"总管"知道当前集群中每台机器的服务状态,一旦有机器不能提供服务,集群中其它集群必须知道,从而作出调整从新分配服务策略。一样当增长集群的服务能力时,就会增长一台或多台 Server,一样也必须让"总管"知道,这就是ZooKeeper的集群监控功能。

图2.4 集群管理结构图

好比我在zookeeper服务器端有一个znode叫/Configuration,那么集群中每个机器启动的时候都去这个节点下建立一个EPHEMERAL类型的节点,好比server1建立/Configuration /Server1,server2建立/Configuration /Server1,而后Server1和Server2都watch /Configuration 这个父节点,那么也就是这个父节点下数据或者子节点变化都会通知对该节点进行watch的客户端。由于EPHEMERAL类型节点有一个很重要的特性,就是客户端和服务器端链接断掉或者session过时就会使节点消失,那么在某一个机器挂掉或者断链的时候,其对应的节点就会消 失,而后集群中全部对/Configuration进行watch的客户端都会收到通知,而后取得最新列表便可。

Master选举:

Zookeeper 不只可以维护当前的集群中机器的服务状态,并且可以选出一个"总管",让这个总管来管理集群,这就是 Zookeeper 的另外一个功能 Leader Election。Zookeeper 如何实现 Leader Election,也就是选出一个 Master Server。和前面的同样每台 Server 建立一个 EPHEMERAL 目录节点,不一样的是它仍是一个 SEQUENTIAL 目录节点,因此它是个 EPHEMERAL_SEQUENTIAL 目录节点。之因此它是 EPHEMERAL_SEQUENTIAL 目录节点,是由于咱们能够给每台 Server 编号,咱们能够选择当前是最小编号的 Server 为 Master,假如这个最小编号的 Server 死去,因为是 EPHEMERAL 节点,死去的 Server 对应的节点也被删除,因此当前的节点列表中又出现一个最小编号的节点,咱们就选择这个节点为当前 Master。这样就实现了动态选择 Master,避免了传统意义上单 Master 容易出现单点故障的问题

清单 3 Leader Election代码

  1. package org.zk.leader.election;
  2.  
  3. import org.apache.log4j.Logger;
  4. import org.apache.zookeeper.CreateMode;
  5. import org.apache.zookeeper.KeeperException;
  6. import org.apache.zookeeper.WatchedEvent;
  7. import org.apache.zookeeper.ZooDefs;
  8. import org.apache.zookeeper.data.Stat;
  9.  
  10. import java.net.InetAddress;
  11. import java.net.UnknownHostException;
  12.  
  13. /**
  14.  * LeaderElection
  15.  * <p/>
  16.  * Author By: sunddenly工做室
  17.  * Created Date: 2014-11-13
  18.  */
  19. public class LeaderElection extends TestMainClient {
  20.     public static final Logger logger = Logger.getLogger(LeaderElection.class);
  21.  
  22.     public LeaderElection(String connectString, String root) {
  23.         super(connectString);
  24.         this.root = root;
  25.         if (zk != null) {
  26.             try {
  27.                 Stat s = zk.exists(root, false);
  28.                 if (s == null) {
  29.                     zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  30.                 }
  31.             } catch (KeeperException e) {
  32.                 logger.error(e);
  33.             } catch (InterruptedException e) {
  34.                 logger.error(e);
  35.             }
  36.         }
  37.     }
  38.  
  39.     void findLeader() throws InterruptedException, UnknownHostException, KeeperException {
  40.         byte[] leader = null;
  41.         try {
  42.             leader = zk.getData(root + "/leader", true, null);
  43.         } catch (KeeperException e) {
  44.             if (e instanceof KeeperException.NoNodeException) {
  45.                 logger.error(e);
  46.             } else {
  47.                 throw e;
  48.             }
  49.         }
  50.         if (leader != null) {
  51.             following();
  52.         } else {
  53.             String newLeader = null;
  54.             byte[] localhost = InetAddress.getLocalHost().getAddress();
  55.             try {
  56.                 newLeader = zk.create(root + "/leader", localhost, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
  57.             } catch (KeeperException e) {
  58.                 if (e instanceof KeeperException.NodeExistsException) {
  59.                     logger.error(e);
  60.                 } else {
  61.                     throw e;
  62.                 }
  63.             }
  64.             if (newLeader != null) {
  65.                 leading();
  66.             } else {
  67.                 mutex.wait();
  68.             }
  69.         }
  70.     }
  71.  
  72.     @Override
  73.     public void process(WatchedEvent event) {
  74.         if (event.getPath().equals(root + "/leader") && event.getType() == Event.EventType.NodeCreated) {
  75.             System.out.println(" 获得通知 ");
  76.             super.process(event);
  77.             following();
  78.         }
  79.     }
  80.  
  81.     void leading() {
  82.         System.out.println(" 成为领导者 ");
  83.     }
  84.  
  85.     void following() {
  86.         System.out.println(" 成为组成员 ");
  87.     }
  88.  
  89.     public static void main(String[] args) {
  90.         String connectString = "localhost:2181";
  91.  
  92.         LeaderElection le = new LeaderElection(connectString, "/GroupMembers");
  93.         try {
  94.             le.findLeader();
  95.         } catch (Exception e) {
  96.             logger.error(e);
  97.         }
  98.     }
  99. }

2.6 队列管理

Zookeeper 能够处理两种类型的队列:

当一个队列的成员都聚齐时,这个队列才可用,不然一直等待全部成员到达,这种是同步队列

队列按照 FIFO 方式进行入队和出队操做,例如实现生产者消费者模型

(1) 同步队列用 Zookeeper 实现的实现思路以下:

建立一个父目录 /synchronizing,每一个成员都监控标志(Set Watch)位目录 /synchronizing/start 是否存在,而后每一个成员都加入这个队列,加入队列的方式就是建立 /synchronizing/member_i 的临时目录节点,而后每一个成员获取 / synchronizing 目录的全部目录节点,也就是 member_i。判断 i 的值是否已是成员的个数,若是小于成员个数等待 /synchronizing/start 的出现,若是已经相等就建立 /synchronizing/start。

用下面的流程图更容易理解:

图 2.5 同步队列流程图

 

清单 4 Synchronizing 代码

  1. package org.zk.queue;
  2.  
  3. import java.net.InetAddress;
  4. import java.net.UnknownHostException;
  5. import java.util.List;
  6.  
  7. import org.apache.log4j.Logger;
  8. import org.apache.zookeeper.CreateMode;
  9. import org.apache.zookeeper.KeeperException;
  10. import org.apache.zookeeper.WatchedEvent;
  11. import org.apache.zookeeper.Watcher;
  12. import org.apache.zookeeper.ZooKeeper;
  13. import org.apache.zookeeper.ZooDefs.Ids;
  14. import org.apache.zookeeper.data.Stat;
  15. import org.zk.leader.election.TestMainClient;
  16.  
  17. /**
  18.  * Synchronizing
  19.  * <p/>
  20.  * Author By: sunddenly工做室
  21.  * Created Date: 2014-11-13
  22.  */
  23. public class Synchronizing extends TestMainClient {
  24.     int size;
  25.     String name;
  26.     public static final Logger logger = Logger.getLogger(Synchronizing.class);
  27.  
  28.     /**
  29.      * 构造函数
  30.      *
  31.      * @param connectString 服务器链接
  32.      * @param root 根目录
  33.      * @param size 队列大小
  34.      */
  35.     Synchronizing(String connectString, String root, int size) {
  36.         super(connectString);
  37.         this.root = root;
  38.         this.size = size;
  39.  
  40.         if (zk != null) {
  41.             try {
  42.                 Stat s = zk.exists(root, false);
  43.                 if (s == null) {
  44.                     zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
  45.                 }
  46.             } catch (KeeperException e) {
  47.                 logger.error(e);
  48.             } catch (InterruptedException e) {
  49.                 logger.error(e);
  50.             }
  51.         }
  52.         try {
  53.             name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
  54.         } catch (UnknownHostException e) {
  55.             logger.error(e);
  56.         }
  57.  
  58.     }
  59.  
  60.     /**
  61.      * 加入队列
  62.      *
  63.      * @return
  64.      * @throws KeeperException
  65.      * @throws InterruptedException
  66.      */
  67.  
  68.     void addQueue() throws KeeperException, InterruptedException{
  69.         zk.exists(root + "/start",true);
  70.         zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
  71.         synchronized (mutex) {
  72.             List<String> list = zk.getChildren(root, false);
  73.             if (list.size() < size) {
  74.                 mutex.wait();
  75.             } else {
  76.                 zk.create(root + "/start", new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
  77.             }
  78.         }
  79.     }
  80.  
  81.     @Override
  82.     public void process(WatchedEvent event) {
  83.         if(event.getPath().equals(root + "/start") && event.getType() == Event.EventType.NodeCreated){
  84.             System.out.println(" 获得通知 ");
  85.             super.process(event);
  86.             doAction();
  87.         }
  88.     }
  89.  
  90.     /**
  91.      * 执行其余任务
  92.      */
  93.     private void doAction(){
  94.         System.out.println(" 同步队列已经获得同步,能够开始执行后面的任务了 ");
  95.     }
  96.  
  97.     public static void main(String args[]) {
  98.         // 启动 Server
  99.         String connectString = "localhost:2181";
  100.         int size = 1;
  101.         Synchronizing b = new Synchronizing(connectString, "/synchronizing", size);
  102.         try{
  103.             b.addQueue();
  104.         } catch (KeeperException e){
  105.             logger.error(e);
  106.         } catch (InterruptedException e){
  107.             logger.error(e);
  108.         }
  109.     }
  110. }

(2) FIFO 队列用 Zookeeper 实现思路以下:

实现的思路也很是简单,就是在特定的目录下建立 SEQUENTIAL 类型的子目录 /queue_i,这样就能保证全部成员加入队列时都是有编号的,出队列时经过 getChildren( ) 方法能够返回当前全部的队列中的元素,而后消费其中最小的一个,这样就能保证 FIFO。

下面是生产者和消费者这种队列形式的示例代码

清单 5 FIFOQueue 代码

  1. import org.apache.log4j.Logger;
  2. import org.apache.zookeeper.CreateMode;
  3. import org.apache.zookeeper.KeeperException;
  4. import org.apache.zookeeper.WatchedEvent;
  5. import org.apache.zookeeper.ZooDefs;
  6. import org.apache.zookeeper.data.Stat;
  7.  
  8. import java.nio.ByteBuffer;
  9. import java.util.List;
  10.  
  11. /**
  12.  * FIFOQueue
  13.  * <p/>
  14.  * Author By: sunddenly工做室
  15.  * Created Date: 2014-11-13
  16.  */
  17. public class FIFOQueue extends TestMainClient{
  18.     public static final Logger logger = Logger.getLogger(FIFOQueue.class);
  19.  
  20.     /**
  21.      * Constructor
  22.      *
  23.      * @param connectString
  24.      * @param root
  25.      */
  26.     FIFOQueue(String connectString, String root) {
  27.         super(connectString);
  28.         this.root = root;
  29.         if (zk != null) {
  30.             try {
  31.                 Stat s = zk.exists(root, false);
  32.                 if (s == null) {
  33.                     zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
  34.                 }
  35.             } catch (KeeperException e) {
  36.                 logger.error(e);
  37.             } catch (InterruptedException e) {
  38.                 logger.error(e);
  39.             }
  40.         }
  41.     }
  42.     /**
  43.      * 生产者
  44.      *
  45.      * @param i
  46.      * @return
  47.      */
  48.  
  49.     boolean produce(int i) throws KeeperException, InterruptedException{
  50.         ByteBuffer b = ByteBuffer.allocate(4);
  51.         byte[] value;
  52.         b.putInt(i);
  53.         value = b.array();
  54.         zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
  55.                     CreateMode.PERSISTENT_SEQUENTIAL);
  56.         return true;
  57.     }
  58.  
  59.  
  60.     /**
  61.      * 消费者
  62.      *
  63.      * @return
  64.      * @throws KeeperException
  65.      * @throws InterruptedException
  66.      */
  67.     int consume() throws KeeperException, InterruptedException{
  68.         int retvalue = -1;
  69.         Stat stat = null;
  70.         while (true) {
  71.             synchronized (mutex) {
  72.                 List<String> list = zk.getChildren(root, true);
  73.                 if (list.size() == 0) {
  74.                     mutex.wait();
  75.                 } else {
  76.                     Integer min = new Integer(list.get(0).substring(7));
  77.                     for(String s : list){
  78.                         Integer tempValue = new Integer(s.substring(7));
  79.                         if(tempValue < min) min = tempValue;
  80.                     }
  81.                     byte[] b = zk.getData(root + "/element" + min,false, stat);
  82.                     zk.delete(root + "/element" + min, 0);
  83.                     ByteBuffer buffer = ByteBuffer.wrap(b);
  84.                     retvalue = buffer.getInt();
  85.                     return retvalue;
  86.                 }
  87.             }
  88.         }
  89.     }
  90.  
  91.     @Override
  92.     public void process(WatchedEvent event) {
  93.         super.process(event);
  94.     }
  95.  
  96.     public static void main(String args[]) {
  97.         // 启动 Server
  98.         TestMainServer.start();
  99.         String connectString = "localhost:"+TestMainServer.CLIENT_PORT;
  100.  
  101.         FIFOQueue q = new FIFOQueue(connectString, "/app1");
  102.         int i;
  103.         Integer max = new Integer(5);
  104.  
  105.         System.out.println("Producer");
  106.         for (i = 0; i < max; i++)
  107.             try{
  108.                 q.produce(10 + i);
  109.             } catch (KeeperException e){
  110.                 logger.error(e);
  111.             } catch (InterruptedException e){
  112.                 logger.error(e);
  113.             }
  114.  
  115.         for (i = 0; i < max; i++) {
  116.             try{
  117.                 int r = q.consume();
  118.                 System.out.println("Item: " + r);
  119.             } catch (KeeperException e){
  120.                 i--;
  121.                 logger.error(e);
  122.             } catch (InterruptedException e){
  123.                 logger.error(e);
  124.             }
  125.         }
  126.  
  127.     }
  128. }

3、ZooKeeper实际应用

假设咱们的集群有:

(1) 20个搜索引擎的服务器:每一个负责总索引中的一部分的搜索任务。

搜索引擎的服务器中的15个服务器如今提供搜索服务

5个服务器正在生成索引

这20个搜索引擎的服务器,常常要让正在提供搜索服务的服务器中止提供服务开始生成索引,或生成索引的服务器已经把索引生成完成能够搜索提供服务了。

(2) 一个总服务器:负责向这20个搜索引擎的服务器发出搜索请求并合并结果集。

(3) 一个备用的总服务器:负责当总服务器宕机时替换总服务器。

(4) 一个web的cgi:向总服务器发出搜索请求。

使用Zookeeper能够保证:

(1) 总服务器:自动感知有多少提供搜索引擎的服务器,并向这些服务器发出搜索请求。

(2) 备用的总服务器:宕机时自动启用备用的总服务器。

(3) web的cgi:可以自动地获知总服务器的网络地址变化

(4) 实现以下:

提供搜索引擎的服务器都在Zookeeper中建立znode,zk.create("/search/nodes/node1", "hostname".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateFlags.EPHEMERAL);

② 总服务器能够从Zookeeper中获取一个znode的子节点的列表,zk.getChildren("/search/nodes", true);

总服务器遍历这些子节点,并获取子节点的数据生成提供搜索引擎的服务器列表

当总服务器接收到子节点改变的事件信息,从新返回第二步;

总服务器在Zookeeper中建立节点,zk.create("/search/master", "hostname".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateFlags.EPHEMERAL);

⑥ 备用的总服务器监控Zookeeper中的"/search/master"节点。当这个znode的节点数据改变时,把本身启动变成总服务器,并把本身的网络地址数据放进这个节点。

web的cgi从Zookeeper中"/search/master"节点获取总服务器的网络地址数据,并向其发送搜索请求。

web的cgi监控Zookeeper中的"/search/master"节点,当这个znode的节点数据改变时,从这个节点获取总服务器的网络地址数据,并改变当前的总服务器的网络地址。

相关文章
相关标签/搜索