分布式任务&分布式锁

  目前系统中存在批量审批、批量受权等各个操做,批量操做中可能由于处理机器、线程不一样,形成刷新缓存丢失受权等信息,如批量审批同一用户权限多个权限申请后,流程平台并发的发送多个http请求到acl不一样服务器,a机器处理了受权a,b机器同时处理了受权b,而后刷新用户缓存。由于在事务里彼此看不见对方提交的数据,刷新时又彻底从db中读取要刷新的数据,就形成了互相丢失对方的数据。所以,须要一个分布式锁工具,来协调各个机器、线上的工做同步问题。html

  分布式锁千万不能用ReentrantLock,由于它的lock和unlock要求在同一线程中处理。而咱们的锁是分布式的,不必定在同一个机器上执行。java

  Zookeeper做为一个分布式的服务框架,主要用来解决分布式集群中应用系统的一致性问题,它能提供基于相似于文件系统的目录节点树方式的数据存储,可是 Zookeeper并非用来专门存储数据的,它的做用主要是用来维护和监控你存储的数据的状态变化。经过监控这些数据状态的变化,从而能够达到基于数据的集群管理,后面将会详细介绍 Zookeeper 可以解决的一些典型问题,这里先介绍一下,Zookeeper 的操做接口和简单使用示例。node

1、经常使用接口列表数据库

  客户端要链接Zookeeper服务器能够经过建立org.apache.zookeeper.ZooKeeper的一个实例对象,而后调用这个类提供的接口来和服务器交互。前面说了ZooKeeper主要是用来维护和监控一个目录节点树中存储的数据的状态,全部咱们可以操做 ZooKeeper 的也和操做目录节点树大致同样,如建立一个目录节点,给某个目录节点设置数据,获取某个目录节点的全部子目录节点,给某个目录节点设置权限和监控这个目录节点的状态变化。apache

  org.apache.zookeeper.ZooKeeper方法列表以下,方法名及方法功能描述:设计模式

String create(String path, byte[] data, List<ACL> acl,CreateMode createMode) 建立一个给定的目录节点 path, 并给它设置数据,CreateMode 标识有四种形式的目录节点,分别是 PERSISTENT:持久化目录节点,这个目录节点存储的数据不会丢失;PERSISTENT_SEQUENTIAL:顺序自动编号的目录节点,这种目录节点会根据当前已近存在的节点数自动加 1,而后返回给客户端已经成功建立的目录节点名;EPHEMERAL:临时目录节点,一旦建立这个节点的客户端与服务器端口也就是 session 超时,这种节点会被自动删除;EPHEMERAL_SEQUENTIAL:临时自动编号节点
Stat exists(String path, boolean watch) 判断某个 path 是否存在,并设置是否监控这个目录节点,这里的 watcher 是在建立 ZooKeeper 实例时指定的 watcher,exists方法还有一个重载方法,能够指定特定的watcher
Stat exists(String path,Watcher watcher) 重载方法,这里给某个目录节点设置特定的 watcher,Watcher 在 ZooKeeper 是一个核心功能,Watcher 能够监控目录节点的数据变化以及子目录的变化,一旦这些状态发生变化,服务器就会通知全部设置在这个目录节点上的 Watcher,从而每一个客户端都很快知道它所关注的目录节点的状态发生变化,而作出相应的反应
void delete(String path, int version) 删除 path 对应的目录节点,version 为 -1 能够匹配任何版本,也就删除了这个目录节点全部数据
List<String>getChildren(String path, boolean watch) 获取指定 path 下的全部子目录节点,一样 getChildren方法也有一个重载方法能够设置特定的 watcher 监控子节点的状态
Stat setData(String path, byte[] data, int version) 给 path 设置数据,能够指定这个数据的版本号,若是 version 为 -1 怎能够匹配任何版本
byte[] getData(String path, boolean watch, Stat stat) 获取这个 path 对应的目录节点存储的数据,数据的版本等信息能够经过 stat 来指定,同时还能够设置是否监控这个目录节点数据的状态
voidaddAuthInfo(String scheme, byte[] auth) 客户端将本身的受权信息提交给服务器,服务器将根据这个受权信息验证客户端的访问权限。
Stat setACL(String path,List<ACL> acl, int version) 给某个目录节点从新设置访问权限,须要注意的是 Zookeeper 中的目录节点权限不具备传递性,父目录节点的权限不能传递给子目录节点。目录节点 ACL 由两部分组成:perms 和 id。
Perms 有 ALL、READ、WRITE、CREATE、DELETE、ADMIN 几种 
而 id 标识了访问目录节点的身份列表,默认状况下有如下两种:
ANYONE_ID_UNSAFE = new Id("world", "anyone") 和 AUTH_IDS = new Id("auth", "") 分别表示任何人均可以访问和建立者拥有访问权限。
List<ACL>getACL(String path,Stat stat) 获取某个目录节点的访问权限列表

 除以上这些上表中列出的方法以外还有一些重载方法,如都提供了一个回调类的重载方法以及能够设置特定 Watcher 的重载方法,具体的方法能够参考 org.apache.zookeeper.ZooKeeper 类的 API 说明。api

2、基本操做缓存

  下面给出基本的操做ZooKeeper的示例代码,这样你就能对ZooKeeper有直观的认识了。下面的清单包括了建立与 ZooKeeper 服务器的链接以及最基本的数据操做:服务器

//建立一个与服务器的链接
ZooKeeper zk = new ZooKeeper("localhost:" + CLIENT_PORT, 
     ClientBase.CONNECTION_TIMEOUT, new Watcher() { 
         // 监控全部被触发的事件
         public void process(WatchedEvent event) { 
             System.out.println("已经触发了" + event.getType() + "事件!"); 
         } 
     }); 
// 建立一个目录节点
zk.create("/testRootPath", "testRootData".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT); 
// 建立一个子目录节点
zk.create("/testRootPath/testChildPathOne", "testChildDataOne".getBytes(),
Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); 
System.out.println(new String(zk.getData("/testRootPath",false,null))); 
// 取出子目录节点列表
System.out.println(zk.getChildren("/testRootPath",true)); 
// 修改子目录节点数据
zk.setData("/testRootPath/testChildPathOne","modifyChildDataOne".getBytes(),-1); 
System.out.println("目录节点状态:["+zk.exists("/testRootPath",true)+"]"); 
// 建立另一个子目录节点
zk.create("/testRootPath/testChildPathTwo", "testChildDataTwo".getBytes(), 
Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); 
System.out.println(new String(zk.getData("/testRootPath/testChildPathTwo",true,null))); 
// 删除子目录节点
zk.delete("/testRootPath/testChildPathTwo",-1); 
zk.delete("/testRootPath/testChildPathOne",-1); 
// 删除父目录节点
zk.delete("/testRootPath",-1); 
// 关闭链接
zk.close(); 

// #输出的结果以下:
// 已经触发了 None 事件!
// testRootData
// [testChildPathOne]
// 目录节点状态:[5,5,1281804532336,1281804532336,0,1,0,0,12,1,6]
// 已经触发了 NodeChildrenChanged 事件!
// testChildDataTwo
// 已经触发了 NodeDeleted 事件!
// 已经触发了 NodeDeleted 事件!session

当对目录节点监控状态打开时,一旦目录节点的状态发生变化,Watcher 对象的 process 方法就会被调用。

3、Zookeeper的应用场景

  Zookeeper从设计模式角度来看,是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理你们都关心的数据,而后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在 Zookeeper上注册的那些观察者作出相应的反应,从而实现集群中相似 Master/Slave 管理模式,关于 Zookeeper的详细架构等内部细节能够阅读 Zookeeper的源码。下面详细介绍这些典型的应用场景,也就是 Zookeeper 到底能帮咱们解决那些问题?下面将给出答案。

  统一命名服务(Name Service),分布式应用中,一般须要有一套完整的命名规则,既可以产生惟一的名称又便于人识别和记住,一般状况下用树形的名称结构是一个理想的选择,树形的名称结构是一个有层次的目录结构,既对人友好又不会重复。说到这里你可能想到了 JNDI,没错 Zookeeper 的 Name Service 与 JNDI 可以完成的功能是差很少的,它们都是将有层次的目录结构关联到必定资源上,可是 Zookeeper 的 Name Service 更加是普遍意义上的关联,也许你并不须要将名称关联到特定资源上,你可能只须要一个不会重复名称,就像数据库中产生一个惟一的数字主键同样。Name Service已是 Zookeeper 内置的功能,你只要调用 Zookeeper 的 API 就能实现。如调用 create 接口就能够很容易建立一个目录节点。

  配置管理,配置的管理在分布式应用环境中很常见,例如同一个应用系统须要多台 PC Server 运行,可是它们运行的应用系统的某些配置项是相同的,若是要修改这些相同的配置项,那么就必须同时修改每台运行这个应用系统的 PC Server,这样很是麻烦并且容易出错。像这样的配置信息彻底能够交给 Zookeeper 来管理,将配置信息保存在 Zookeeper 的某个目录节点中,而后将全部须要修改的应用机器监控配置信息的状态,一旦配置信息发生变化,每台应用机器就会收到 Zookeeper 的通知,而后从 Zookeeper 获取新的配置信息应用到系统中。配置管理结构图以下:

  集群管理,Zookeeper 可以很容易的实现集群管理的功能,若有多台 Server 组成一个服务集群,那么必需要一个“总管”知道当前集群中每台机器的服务状态,一旦有机器不能提供服务,集群中其它集群必须知道,从而作出调整从新分配服务策略。一样当增长集群的服务能力时,就会增长一台或多台 Server,一样也必须让“总管”知道。Zookeeper 不只可以帮你维护当前的集群中机器的服务状态,并且可以帮你选出一个“总管”,让这个总管来管理集群,这就是 Zookeeper 的另外一个功能 Leader Election。它们的实现方式都是在 Zookeeper 上建立一个 EPHEMERAL 类型的目录节点,而后每一个 Server 在它们建立目录节点的父目录节点上调用 getChildren(String path, boolean watch) 方法并设置 watch 为 true,因为是 EPHEMERAL 目录节点,当建立它的 Server 死去,这个目录节点也随之被删除,因此 Children 将会变化,这时 getChildren上的 Watch 将会被调用,因此其它 Server 就知道已经有某台 Server 死去了。新增 Server 也是一样的原理。

  Zookeeper如何实现 Leader Election,也就是选出一个 Master Server。和前面的同样每台 Server 建立一个 EPHEMERAL 目录节点,不一样的是它仍是一个 SEQUENTIAL 目录节点,因此它是个 EPHEMERAL_SEQUENTIAL 目录节点。之因此它是 EPHEMERAL_SEQUENTIAL 目录节点,是由于咱们能够给每台 Server 编号,咱们能够选择当前是最小编号的 Server 为 Master,假如这个最小编号的 Server 死去,因为是 EPHEMERAL 节点,死去的 Server 对应的节点也被删除,因此当前的节点列表中又出现一个最小编号的节点,咱们就选择这个节点为当前 Master。这样就实现了动态选择 Master,避免了传统意义上单 Master 容易出现单点故障的问题。下图是集群管理结构图:

  这部分的示例代码以下:

 void findLeader() throws InterruptedException { 
        byte[] leader = null; 
        try { 
            leader = zk.getData(root + "/leader", true, null); 
        } catch (Exception e) { 
            logger.error(e); 
        } 
        if (leader != null) { 
            following(); 
        } else { 
            String newLeader = null; 
            try { 
                byte[] localhost = InetAddress.getLocalHost().getAddress(); 
                newLeader = zk.create(root + "/leader", localhost, 
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); 
            } catch (Exception e) { 
                logger.error(e); 
            } 
            if (newLeader != null) { 
                leading(); 
            } else { 
                mutex.wait(); 
            } 
        } 
    } 

  共享锁,共享锁在同一个进程中很容易实现,可是在跨进程或者在不一样 Server 之间就很差实现了。Zookeeper 却很容易实现这个功能,实现方式也是须要得到锁的 Server 建立一个 EPHEMERAL_SEQUENTIAL 目录节点,而后调用 getChildren方法获取当前的目录节点列表中最小的目录节点是否是就是本身建立的目录节点,若是正是本身建立的,那么它就得到了这个锁,若是不是那么它就调用exists(String path, boolean watch) 方法并监控 Zookeeper 上目录节点列表的变化,一直到本身建立的节点是列表中最小编号的目录节点,从而得到锁,释放锁很简单,只要删除前面它本身所建立的目录节点就好了。Zookeeper实现Locks的流程图。

  同步锁的关键代码以下:

 void getLock() throws KeeperException, InterruptedException{ 
        List<String> list = zk.getChildren(root, false); 
        String[] nodes = list.toArray(new String[list.size()]); 
        Arrays.sort(nodes); 
        if(myZnode.equals(root+"/"+nodes[0])){ 
            doAction(); 
        } 
        else{ 
            waitForLock(nodes[0]); 
        } 
    } 
    void waitForLock(String lower) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(root + "/" + lower,true); 
        if(stat != null){ 
            mutex.wait(); 
        } 
        else{ 
            getLock(); 
        } 
    }

  同步锁的关键思路以下:

//#加锁:
//ZooKeeper 将按照以下方式实现加锁的操做:
//1 ) ZooKeeper 调用 create ()方法来建立一个路径格式为“ _locknode_/lock- ”的节点,此节点类型为sequence (连续)和 ephemeral (临时)。
    也就是说,建立的节点为临时节点,而且全部的节点连续编号,即“ lock-i ”的格式。
//2 )在建立的锁节点上调用 getChildren ()方法,来获取锁目录下的最小编号节点,而且不设置 watch 。 //3 )步骤 2 中获取的节点刚好是步骤 1 中客户端建立的节点,那么此客户端得到此种类型的锁,而后退出操做。 //4 )客户端在锁目录上调用 exists ()方法,而且设置 watch 来监视锁目录下比本身小一个的连续临时节点的状态。 //5 )若是监视节点状态发生变化,则跳转到第 2 步,继续进行后续的操做,直到退出锁竞争。 // //#解锁: //ZooKeeper 解锁操做很是简单,客户端只须要将加锁操做步骤 1 中建立的临时节点删除便可。

  队列管理,Zookeeper 能够处理两种类型的队列:当一个队列的成员都聚齐时,这个队列才可用,不然一直等待全部成员到达,这种是同步队列;队列按照 FIFO 方式进行入队和出队操做,例如实现生产者和消费者模型。

  同步队列用 Zookeeper 实现的实现思路以下:建立一个父目录 /synchronizing,每一个成员都监控标志(Set Watch)位目录 /synchronizing/start 是否存在,而后每一个成员都加入这个队列,加入队列的方式就是建立 /synchronizing/member_i 的临时目录节点。而后每一个成员获取 / synchronizing 目录的全部目录节点,也就是 member_i。判断 i 的值是否已是成员的个数,若是小于成员个数等待 /synchronizing/start 的出现,若是已经相等就建立 /synchronizing/start。同步队列流程图以下:

  同步队列的关键代码:

 void addQueue() throws KeeperException, InterruptedException{ 
        zk.exists(root + "/start",true); 
        zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, 
        CreateMode.EPHEMERAL_SEQUENTIAL); 
        synchronized (mutex) { 
            List<String> list = zk.getChildren(root, false); 
            if (list.size() < size) { 
                mutex.wait(); 
            } else { 
                zk.create(root + "/start", new byte[0], Ids.OPEN_ACL_UNSAFE,
                 CreateMode.PERSISTENT); 
            } 
        } 
 }

  当队列没尽是进入 wait(),而后会一直等待 Watch 的通知,Watch 的代码以下:

 public void process(WatchedEvent event) { 
        if(event.getPath().equals(root + "/start") &&
         event.getType() == Event.EventType.NodeCreated){ 
            System.out.println("获得通知"); 
            super.process(event); 
            doAction(); 
        } 
    } 

  FIFO 队列用 Zookeeper 实现思路以下:实现的思路也很是简单,就是在特定的目录下建立 SEQUENTIAL 类型的子目录 /queue_i,这样就能保证全部成员加入队列时都是有编号的,出队列时经过 getChildren()方法能够返回当前全部的队列中的元素,而后消费其中最小的一个,这样就能保证 FIFO。下面是生产者和消费者这种队列形式的示例代码:

#生产者代码
boolean
produce(int i) throws KeeperException, InterruptedException{ ByteBuffer b = ByteBuffer.allocate(4); byte[] value; b.putInt(i); value = b.array(); zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); return true; }

 

 
#消费者代码
int consume() throws KeeperException, InterruptedException{ int retvalue = -1; Stat stat = null; while (true) { synchronized (mutex) { List<String> list = zk.getChildren(root, true); if (list.size() == 0) { mutex.wait(); } else { Integer min = new Integer(list.get(0).substring(7)); for(String s : list){ Integer tempValue = new Integer(s.substring(7)); if(tempValue < min) min = tempValue; } byte[] b = zk.getData(root + "/element" + min,false, stat); zk.delete(root + "/element" + min, 0); ByteBuffer buffer = ByteBuffer.wrap(b); retvalue = buffer.getInt(); return retvalue; } } } }

4、总结

  Zookeeper 做为 Hadoop 项目中的一个子项目,是 Hadoop 集群管理的一个必不可少的模块,它主要用来控制集群中的数据,如它管理 Hadoop 集群中的 NameNode,还有 Hbase 中 Master Election、Server 之间状态同步等。本文介绍的 Zookeeper 的基本知识,以及介绍了几个典型的应用场景。这些都是 Zookeeper 的基本功能,最重要的是 Zoopkeeper 提供了一套很好的分布式集群管理的机制,就是它这种基于层次型的目录树的数据结构,并对树中的节点进行有效管理,从而能够设计出多种多样的分布式的数据管理模型,而不只仅局限于上面提到的几个经常使用应用场景。

参考博客:

1.http://www.cnblogs.com/ggjucheng/p/3370359.html

相关文章
相关标签/搜索