转载来源:https://www.cnblogs.com/sunddenly/p/4064992.html
1、配置服务
配置服务是分布式应用所须要的基本服务之一,它使集群中的机器能够共享配置信息中那些公共的部分。简单地说,ZooKeeper能够做为一个具备高可用性的配置存储器,容许分布式应用的参与者检索和更新配置文件。使用ZooKeeper中的观察机制,能够创建一个活跃的配置服务,使那些感兴趣的客户端可以得到配置信息修改的通知。html
下面来编写一个这样的服务。咱们经过两个假设来简化所需实现的服务(稍加修改就能够取消这两个假设)。java
第一,咱们惟一须要存储的配置数据是字符串,关键字是znode的路径,所以咱们在每一个znode上存储了一个键/值对。
第二,在任什么时候候只有一个客户端会执行更新操做。node
除此以外,这个模型看起来就像是有一个主人(相似于HDFS中的namenode)在更新信息,而他的工人则须要遵循这些信息。程序员
在名为ActiveKeyValueStore的类中编写了以下代码:算法

package org.zk; import java.nio.charset.Charset; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; public class ActiveKeyValueStore extends ConnectionWatcher { private static final Charset CHARSET=Charset.forName("UTF-8"); public void write(String path,String value) throws KeeperException, InterruptedException { Stat stat = zk.exists(path, false); if(stat==null){ zk.create(path, value.getBytes(CHARSET),Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); }else{ zk.setData(path, value.getBytes(CHARSET),-1); } } public String read(String path,Watcher watch) throws KeeperException, InterruptedException{ byte[] data = zk.getData(path, watch, null); return new String(data,CHARSET); } }
write()方法的任务是将一个关键字及其值写到ZooKeeper。它隐藏了建立一个新的znode和用一个新值更新现有znode之间的区别,而是使用exists操做来检测znode是否存在,而后再执行相应的操做。其余值得一提的细节是须要将字符串值转换为字节数组,由于咱们只用了UTF-8编码的getBytes()方法。☆☆☆apache
read()方法的任务是读取一个节点的配置属性。ZooKeeper的getData()方法有三个参数:数组
(1)路径
(2)一个观察对象
(3)一个Stat对象安全
Stat对象由getData()方法返回的值填充,用来将信息回传给调用者。经过这个方法,调用者能够得到一个znode的数据和元数据,但在这个例子中,因为咱们对元数据不感兴趣,所以将Stat参数设为null。服务器
为了说明ActiveKeyValueStore的用法,咱们编写了一个用来更新配置属性值的类ConfigUpdater,如代码1.1所示。网络
代码1.1 用于随机更新ZooKeeper中的属性

package org.zk; import java.io.IOException; import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.zookeeper.KeeperException; public class ConfigUpdater { public static final String PATH="/config"; private ActiveKeyValueStore store; private Random random=new Random(); public ConfigUpdater(String hosts) throws IOException, InterruptedException { store = new ActiveKeyValueStore(); store.connect(hosts); } public void run() throws InterruptedException, KeeperException{ while(true){ String value=random.nextInt(100)+""; store.write(PATH, value); System.out.printf("Set %s to %s\n",PATH,value); TimeUnit.SECONDS.sleep(random.nextInt(100)); } } public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ConfigUpdater configUpdater = new ConfigUpdater(args[0]); configUpdater.run(); } }
这个程序很简单,ConfigUpdater中定义了一个ActiveKeyValueStore,它在ConfigUpdater的构造函数中链接到ZooKeeper。run()方法永远在循环,在随机时间以随机值更新/config znode。
做为配置服务的用户,ConfigWatcher建立了一个ActiveKeyValueStore对象store,而且在启动以后经过displayConfig()调用了store的read()方法,显示它所读到的配置信息的初始值,并将自身做为观察传递给store。当节点状态发生变化时,再次经过displayConfig()显示配置信息,并再次将自身做为观察传递给store,参见代码1.2:
例1.2 该用应观察ZooKeeper中属性的更新状况,并将其打印到控制台

package org.zk; import java.io.IOException; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; public class ConfigWatcher implements Watcher{ private ActiveKeyValueStore store; @Override public void process(WatchedEvent event) { if(event.getType()==EventType.NodeDataChanged){ try{ dispalyConfig(); }catch(InterruptedException e){ System.err.println("Interrupted. exiting. "); Thread.currentThread().interrupt(); }catch(KeeperException e){ System.out.printf("KeeperException锛?s. Exiting.\n", e); } } } public ConfigWatcher(String hosts) throws IOException, InterruptedException { store=new ActiveKeyValueStore(); store.connect(hosts); } public void dispalyConfig() throws KeeperException, InterruptedException{ String value=store.read(ConfigUpdater.PATH, this); System.out.printf("Read %s as %s\n",ConfigUpdater.PATH,value); } public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ConfigWatcher configWatcher = new ConfigWatcher(args[0]); configWatcher.dispalyConfig(); //stay alive until process is killed or Thread is interrupted Thread.sleep(Long.MAX_VALUE); } }
当ConfigUpdater更新znode时,ZooKeeper产生一个类型为EventType.NodeDataChanged的事件,从而触发观察。ConfigWatcher在它的process()方法中对这个事件作出反应,读取并显示配置的最新版本。因为观察仅发送单次信号,所以每次咱们调用ActiveKeyValueStore的read()方法时,都将一个新的观察告知ZooKeeper来确保咱们能够看到未来的更新。可是,咱们仍是不能保证接收到每个更新,由于在收到观察事件通知与下一次读之间,znode可能已经被更新过,并且多是不少次,因为客户端在这段时间没有注册任何观察,所以不会收到通知。对于示例中的配置服务,这不是问题,由于客户端只关心属性的最新值,最新值优先于以前的值。可是,通常状况下,这个潜在的问题是不容忽视的。
让咱们看看如何使用这个程序。在一个终端窗口中运行ConfigUpdater,而后在另外一个客户端运行ConfigWatcher,咱们能够预先分别在两个客户端输入命令,先不按回车,等两个客户端的命令输入好后,先在运行ConfigUpdater的客户端按回车,再在另外一个客户端按回车,运行结果以下:
2、可恢复的ZooKeeper应用
关于分布式计算的第一个误区是“网络是可靠的”。按照他们的观点,程序老是有一个可靠的网络,所以当程序运行在真正的网络中时,每每会出现各类备样的故障。让咱们看看各类可能的故障模式,以及可以解决故障的措施,使咱们的程序在面对故障时可以及时复原。
2.1 ZooKeeper异常
在Java API中的每个ZooKeeper操做都在其throws子句中声明了两种类型的异常,分别是InterruptedException和KeeperException。
(一)InterruptedException异常
若是操做被中断,则会有一个InterruptedException异常被抛出。在Java语言中有一个取消阻塞方法的标准机制,即针对存在阻塞方法的线程调用interrupt()。一个成功的取消操做将产生一个InterruptedException异常。
ZooKeeper也遵循这一机制,所以你可使用这种方法来取消一个ZooKeeper操做。使用了ZooKeeper的类或库一般会传播InterruptedException异常,使客户端可以取消它们的操做。InterruptedException异常并不意味着有故障,而是代表相应的操做已经被取消,因此在配置服务的示例中,能够经过传播异常来停止应用程序的运行。
(二)KeeperException异常
(1) 若是ZooKeeper服务器发出一个错误信号或与服务器存在通讯问题,抛出的则是KeeperException异常。
①针对不一样的错误状况,KeeperException异常存在不一样的子类。
例如: KeeperException.NoNodeException是KeeperException的一个子类,若是你试图针对一个不存在的znode执行操做,抛出的则是该异常。
②每个KeeperException异常的子类都对应一个关于错误类型信息的代码。
例如: KeeperException.NoNodeException异常的代码是KeeperException.Code.NONODE
(2) 有两种方法被用来处理KeeperException异常:
①捕捉KeeperException异常,而且经过检测它的代码来决定采起何种补救措施;
②另外一种是捕捉等价的KeeperException子类,而且在每段捕捉代码中执行相应的操做。
(3) KeeperException异常分为三大类
① 状态异常
当一个操做因不能被应用于znode树而致使失败时,就会出现状态异常。状态异常产生的缘由一般是在同一时间有另一个进程正在修改znode。例如,若是一个znode先被另一个进程更新了,根据版本号执行setData操做的进程就会失败,并收到一个KeeperException.BadVersionException异常,这是由于版本号不匹配。程序员一般都知道这种冲突老是存在的,也都会编写代码来进行处理。
一些状态异常会指出程序中的错误,例如KeeperException.NoChildrenForEphemeralsException异常,试图在短暂znode下建立子节点时就会抛出该异常。
② 可恢复异常
可恢复的异常是指那些应用程序可以在同一个ZooKeeper会话中恢复的异常。一个可恢复的异常是经过KeeperException.ConnectionLossException来表示的,它意味着已经丢失了与ZooKeeper的链接。ZooKeeper会尝试从新链接,而且在大多数状况下从新链接会成功,并确保会话是完整的。
可是ZooKeeper不能判断与KeeperException.ConnectionLossException异常相关的操做是否成功执行。这种状况就是部分失败的一个例子。这时程序员有责任来解决这种不肯定性,而且根据应用的状况来采起适当的操做。在这一点上,就须要对“幂等”(idempotent)操做和“非幂等”(Nonidempotent)操做进行区分。幂等操做是指那些一次或屡次执行都会产生相同结果的操做,例如读请求或无条件执行的setData操做。对于幂等操做,只须要简单地进行重试便可。对于非幂等操做,就不能盲目地进行重试,由于它们屡次执行的结果与一次执行是彻底不一样的。程序能够经过在znode的路径和它的数据中编码信息来检测是否非幂等操怍的更新已经完成。
③不可恢复的异常
在某些状况下,ZooKeeper会话会失效——也许由于超时或由于会话被关闭,两种状况下都会收到KeeperException.SessionExpiredException异常,或由于身份验证失败,KeeperException.AuthFailedException异常。不管上述哪一种状况,全部与会话相关联的短暂znode都将丢失,所以应用程序须要在从新链接到ZooKeeper以前重建它的状态。
2.2 可靠地服务配置
首先咱们先回顾一下ActivityKeyValueStore的write()的方法,他由一个exists操做紧跟着一个create操做或setData操做组成:

public class ActiveKeyValueStore extends ConnectionWatcher { private static final Charset CHARSET=Charset.forName("UTF-8"); public void write(String path,String value) throws KeeperException, InterruptedException { Stat stat = zk.exists(path, false); if(stat==null){ zk.create(path, value.getBytes(CHARSET),Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); }else{ zk.setData(path, value.getBytes(CHARSET),-1); } } public String read(String path,Watcher watch) throws KeeperException, InterruptedException{ byte[] data = zk.getData(path, watch, null); return new String(data,CHARSET); } }
做为一个总体,write()方法是一个“幂等”操做,因此咱们能够对他进行无条件重试。咱们新建一个类ChangedActiveKeyValueStore,代码以下:

package org.zk; import java.nio.charset.Charset; import java.util.concurrent.TimeUnit; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; public class ChangedActiveKeyValueStore extends ConnectionWatcher{ private static final Charset CHARSET=Charset.forName("UTF-8"); private static final int MAX_RETRIES = 5; private static final long RETRY_PERIOD_SECONDS = 5; public void write(String path,String value) throws InterruptedException, KeeperException{ int retries=0; while(true){ try { Stat stat = zk.exists(path, false); if(stat==null){ zk.create(path, value.getBytes(CHARSET),Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); }else{ zk.setData(path, value.getBytes(CHARSET),stat.getVersion()); } } catch (KeeperException.SessionExpiredException e) { throw e; } catch (KeeperException e) { if(retries++==MAX_RETRIES){ throw e; } //sleep then retry TimeUnit.SECONDS.sleep(RETRY_PERIOD_SECONDS); } } } public String read(String path,Watcher watch) throws KeeperException, InterruptedException{ byte[] data = zk.getData(path, watch, null); return new String(data,CHARSET); } }
在该类中,对前面的write()进行了修改,该版本的wirte()可以循环执行重试。其中设置了重试的最大次数MAX_RETRIES和两次重试之间的间隔RETRY_PERIOD_SECONDS.
咱们再新建一个类ResilientConfigUpdater,该类对前面的ConfigUpdater进行了修改,代码以下:

package org.zk; import java.io.IOException; import java.nio.charset.Charset; import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.SessionExpiredException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; public class ResilientConfigUpdater extends ConnectionWatcher{ public static final String PATH="/config"; private ChangedActiveKeyValueStore store; private Random random=new Random(); public ResilientConfigUpdater(String hosts) throws IOException, InterruptedException { store=new ChangedActiveKeyValueStore(); store.connect(hosts); } public void run() throws InterruptedException, KeeperException{ while(true){ String value=random.nextInt(100)+""; store.write(PATH,value); System.out.printf("Set %s to %s\n",PATH,value); TimeUnit.SECONDS.sleep(random.nextInt(10)); } } public static void main(String[] args) throws Exception { while(true){ try { ResilientConfigUpdater configUpdater = new ResilientConfigUpdater(args[0]); configUpdater.run(); }catch (KeeperException.SessionExpiredException e) { // start a new session }catch (KeeperException e) { // already retried ,so exit e.printStackTrace(); break; } } } }
在这段代码中没有对KeepException.SeeionExpiredException异常进行重试,由于一个会话过时时,ZooKeeper对象会进入CLOSED状态,此状态下它不能进行重试链接。咱们只能将这个异常简单抛出并让拥有着建立一个新实例,以重试整个write()方法。一个简单的建立新实例的方法是建立一个新的ResilientConfigUpdater用于恢复过时会话。
处理会话过时的另外一种方法是在观察中(在这个例子中应该是ConnectionWatcher)寻找类型为ExpiredKeepState,而后再找到的时候建立一个新链接。即便咱们收到KeeperException.SessionExpiredEception异常,这种方法仍是可让咱们在write()方法内不断重试,由于链接最终是可以从新创建的。无论咱们采用何种机制从过时会话中恢复,重要的是,这种不一样于链接丢失的故障类型,须要进行不一样的处理。
注意:实际上,这里忽略了另外一种故障模式。当ZooKeeper对象被建立时,他会尝试链接另外一个ZooKeeper服务器。若是链接失败或超时,那么他会尝试链接集合体中的另外一台服务器。若是在尝试集合体中的全部服务器以后仍然没法创建链接,它会抛出一个IOException异常。因为全部的ZooKeeper服务器都不可用的可能性很小,因此某些应用程序选择循环重试操做,直到ZooKeeper服务为止。
这仅仅是一种重试处理策略,还有许多其余处理策略,例如使用“指数返回”,每次将重试的间隔乘以一个常数。Hadoop内核中org.apache.hadoop.io.retry包是一组工具,用于能够重用的方式将重试逻辑加入代码,所以他对于构建ZooKeeper应用很是有用。
3、锁服务
3.1分布式锁概述
分布式锁在一组进程之间提供了一种互斥机制。在任什么时候刻,在任什么时候刻只有一个进程能够持有锁。分布式锁能够在大型分布式系统中实现领导者选举,在任什么时候间点,持有锁的那个进程就是系统的领导者。
注意:不要将ZooKeeper本身的领导者选举和使用了ZooKeeper基本操做实现的通常领导者选混为一谈。ZooKeeper本身的领导者选举机制是对外不公开的,咱们这里所描述的通常领导者选举服务则不一样,他是对那些须要与主进程保持一致的分布式系统所设计的。
(1) 为了使用ZooKeeper来实现分布式锁服务,咱们使用顺序znode来为那些竞争锁的进程强制排序。
思路很简单:
① 首先指定一个做为锁的znode,一般用它来描述被锁定的实体,称为/leader;
② 而后但愿得到锁的客户端建立一些短暂顺序znode,做为锁znode的子节点。
③ 在任什么时候间点,顺序号最小的客户端将持有锁。
例如,有两个客户端差很少同时建立znode,分别为/leader/lock-1和/leader/lock-2,那么建立/leader/lock-1的客户端将会持有锁,由于它的znode顺序号最小。ZooKeeper服务是顺序的仲裁者,由于它负责分配顺序号。
④ 经过删除znode /leader/lock-l便可简单地将锁释放;
⑤ 另外,若是客户端进程死亡,对应的短暂znode也会被删除。
⑥ 接下来,建立/leader/lock-2的客户端将持有锁,由于它顺序号紧跟前一个。
⑦ 经过建立一个关于znode删除的观察,可使客户端在得到锁时获得通知。
(2) 以下是申请获取锁的伪代码。
①在锁znode下建立一个名为lock-的短暂顺序znode,而且记住它的实际路径名(create操做的返回值)。
②查询锁znode的子节点而且设置一个观察。
③若是步骤l中所建立的znode在步骤2中所返回的全部子节点中具备最小的顺序号,则获取到锁。退出。
④等待步骤2中所设观察的通知而且转到步骤2。
3.2 当前问题与方案
3.2.1 羊群效应
(1) 问题
虽然这个算法是正确的,但仍是存在一些问题。第一个问题是这种实现会受到“羊群效应”(herd effect)的影响。考虑有成百上千客户端的状况,全部的客户端都在尝试得到锁,每一个客户端都会在锁znode上设置一个观察,用于捕捉子节点的变化。每次锁被释放或另一个进程开始申请获取锁的时候,观察都会被触发而且每一个客户端都会收到一个通知。 “羊群效应“就是指大量客户端收到同一事件的通知,但实际上只有不多一部分须要处理这一事件。在这种状况下,只有一个客户端会成功地获取锁,可是维护过程及向全部客户端发送观察事件会产生峰值流量,这会对ZooKeeper服务器形成压力。
(2) 方案解决方案
为了不出现羊群效应,咱们须要优化通知的条件。关键在于只有在前一个顺序号的子节点消失时才须要通知下一个客户端,而不是删除(或建立)任何子节点时都需要通知。在咱们的例子中,若是客户端建立了znode /leader/lock-一、/leader/lock-2和/leader/lock-3,那么只有当/leader/lock-2消失时才须要通知/leader/lock-3对照的客户端;/leader/lock-1消失或有新的znode /leader/lock-4加入时,不须要通知该客户端。
3.2.2 可恢复的异常
(1) 问题
这个申请锁的算法目前还存在另外一个问题,就是不能处理因链接丢失而致使的create操做失败。如前所述,在这种状况下,咱们不知道操做是成功仍是失败。因为建立一个顺序znode是非幂等操做,因此咱们不能简单地重试,由于若是第一次建立已经成功,重试会使咱们多出一个永远删不掉的孤儿zriode(至少到客户端会话结束前)。不幸的结果是将会出现死锁。
(2) 解决方案
问题在于,在从新链接以后客户端不可以判断它是否已经建立过子节点。解决方案是在znode的名称中嵌入一个ID,若是客户端出现链接丢失的状况,从新链接以后它即可以对锁节点的全部于节点进行检查,看看是否有子节点的名称中包含其ID。若是有一个子节点的名称包含其ID,它便知道建立操做已经成功,不须要再建立子节点。若是没有子节点的名称中包含其ID,则客户端能够安全地建立一个新的顺序子节点。
客户端会话的ID是一个长整数,而且在ZooKeeper服务中是惟一的,所以很是适合在链接丢失后用于识别客户端。能够经过调用Java ZooKeeper类的getSessionld()方法来得到会话的ID。
在建立短暂顺序znode时应当采用lock-<sessionld>-这样的命名方式,ZooKeeper在其尾部添加顺序号以后,znode的名称会形如lock-<sessionld>-<sequenceNumber>。因为顺序号对于父节点来讲是惟一的,但对于子节点名并不惟一,所以采用这样的命名方式能够诖子节点在保持建立顺序的同时可以肯定本身的建立者。
3.2.3 不可恢复的异常
若是一个客户端的ZooKeeper会话过时,那么它所建立的短暂znode将会被删除,已持有的锁会被释放,或是放弃了申请锁的位置。使用锁的应用程序应当意识到它已经再也不持有锁,应当清理它的状态,而后经过建立并尝试申请一个新的锁对象来从新启动。注意,这个过程是由应用程序控制的,而不是锁,由于锁是不能预知应用程序须要如何清理本身的状态。
4、ZooKeeper实现共享锁
实现正确地实现一个分布式锁是一件棘手的事,由于很难对全部类型的故障都进行正确的解释处理。ZooKeeper带有一个JavaWriteLock,客户端能够很方便地使用它。更多分布式数据结构和协议例如“屏障”(bafrier)、队列和两阶段提交协议。有趣的是它们都是同步协议,即便咱们使用异步ZooKeeper基本操做(如通知)来实现它们。使用ZooKeeper能够实现不少不一样的分布式数据结构和协议,ZooKeeper网站(http://hadoop.apache.org/zookeeper/)提供了一些用于实现分布式数据结构和协议的伪代码。ZooKeeper自己也带有一些棕准方法的实现,放在安装位置下的recipes目录中。
4.1 场景描述
你们也许都很熟悉了多个线程或者多个进程间的共享锁的实现方式了,可是在分布式场景中咱们会面临多个Server之间的锁的问题。
假设有这样一个场景:两台server :serverA,serverB须要在C机器上的/usr/local/a.txt文件上进行写操做,若是两台机器同时写该文件,那么该文件的最终结果可能会产生乱序等问题。最早能想到的是serverA在写文件前告诉ServerB “我要开始写文件了,你先别写”,等待收到ServerB的确认回复后ServerA开始写文件,写完文件后再通知ServerB“我已经写完了”。假设在咱们场景中有100台机器呢,中间任意一台机器通讯中断了又该如何处理?容错和性能问题呢?要能健壮,稳定,高可用并保持高性能,系统实现的复杂度比较高,从头开发这样的系统代价也很大。幸运的是,咱们有了基于googlechubby原理开发的开源的ZooKeeper系统。接下来本文将介绍两种ZooKeeper实现分布式共享锁的方法。
4.2 利用节点名称的惟一性来实现共享锁
ZooKeeper表面上的节点结构是一个和unix文件系统相似的小型的树状的目录结构,ZooKeeper机制规定:同一个目录下只能有一个惟一的文件名。
例如:咱们在Zookeeper目录/test目录下建立,两个客户端建立一个名为lock节点,只有一个可以成功。
(1) 算法思路:利用名称惟一性,加锁操做时,只须要全部客户端一块儿建立/Leader/lock节点,只有一个建立成功,成功者得到锁。解锁时,只需删除/test/Lock节点,其他客户端再次进入竞争建立节点,直到全部客户端都得到锁。
基于以上机制,利用节点名称惟一性机制的共享锁算法流程如图所示:
4.3 利用顺序节点实现共享锁
首先介绍一下,Zookeeper中有一种节点叫作顺序节点,故名思议,假如咱们在/lock/目录下建立节3个点,ZooKeeper集群会按照提起建立的顺序来建立节点,节点分别为/lock/0000000001、/lock/0000000002、/lock/0000000003。
ZooKeeper中还有一种名为临时节点的节点,临时节点由某个客户端建立,当客户端与ZooKeeper集群断开链接,。则该节点自动被删除。
算法思路:对于加锁操做,可让全部客户端都去/lock目录下建立临时、顺序节点,若是建立的客户端发现自身建立节点序列号是/lock/目录下最小的节点,则得到锁。不然,监视比本身建立节点的序列号小的节点(当前序列在本身前面一个的节点),进入等待。解锁操做,只须要将自身建立的节点删除便可。具体算法流程以下图所示:
4.4 ZooKeeper提供的一个写锁实现
按照ZooKeeper提供的分布式锁的伪代码,实现了一个分布式锁的简单测试代码以下:
(1)分布式锁,实现了Lock接口 DistributedLock.java

package com.concurrent; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** DistributedLock lock = null; try { lock = new DistributedLock("127.0.0.1:2182","test"); lock.lock(); //do something... } catch (Exception e) { e.printStackTrace(); } finally { if(lock != null) lock.unlock(); } * @author xueliang * */ public class DistributedLock implements Lock, Watcher{ private ZooKeeper zk; private String root = "/locks";//根 private String lockName;//竞争资源的标志 private String waitNode;//等待前一个锁 private String myZnode;//当前锁 private CountDownLatch latch;//计数器 private int sessionTimeout = 30000; private List<Exception> exception = new ArrayList<Exception>(); /** * 建立分布式锁,使用前请确认config配置的zookeeper服务可用 * @param config 127.0.0.1:2181 * @param lockName 竞争资源标志,lockName中不能包含单词lock */ public DistributedLock(String config, String lockName){ this.lockName = lockName; // 建立一个与服务器的链接 try { zk = new ZooKeeper(config, sessionTimeout, this); Stat stat = zk.exists(root, false); if(stat == null){ // 建立根节点 zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } catch (IOException e) { exception.add(e); } catch (KeeperException e) { exception.add(e); } catch (InterruptedException e) { exception.add(e); } } /** * zookeeper节点的监视器 */ public void process(WatchedEvent event) { if(this.latch != null) { this.latch.countDown(); } } public void lock() { if(exception.size() > 0){ throw new LockException(exception.get(0)); } try { if(this.tryLock()){ System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true"); return; } else{ waitForLock(waitNode, sessionTimeout);//等待锁 } } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } } public boolean tryLock() { try { String splitStr = "_lock_"; if(lockName.contains(splitStr)) throw new LockException("lockName can not contains \\u000B"); //建立临时子节点 myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(myZnode + " is created "); //取出全部子节点 List<String> subNodes = zk.getChildren(root, false); //取出全部lockName的锁 List<String> lockObjNodes = new ArrayList<String>(); for (String node : subNodes) { String _node = node.split(splitStr)[0]; if(_node.equals(lockName)){ lockObjNodes.add(node); } } Collections.sort(lockObjNodes); System.out.println(myZnode + "==" + lockObjNodes.get(0)); if(myZnode.equals(root+"/"+lockObjNodes.get(0))){ //若是是最小的节点,则表示取得锁 return true; } //若是不是最小的节点,找到比本身小1的节点 String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1); waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1); } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } return false; } public boolean tryLock(long time, TimeUnit unit) { try { if(this.tryLock()){ return true; } return waitForLock(waitNode,time); } catch (Exception e) { e.printStackTrace(); } return false; } private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException { Stat stat = zk.exists(root + "/" + lower,true); //判断比本身小一个数的节点是否存在,若是不存在则无需等待锁,同时注册监听 if(stat != null){ System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower); this.latch = new CountDownLatch(1); this.latch.await(waitTime, TimeUnit.MILLISECONDS); this.latch = null; } return true; } public void unlock() { try { System.out.println("unlock " + myZnode); zk.delete(myZnode,-1); myZnode = null; zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } public void lockInterruptibly() throws InterruptedException { this.lock(); } public Condition newCondition() { return null; } public class LockException extends RuntimeException { private static final long serialVersionUID = 1L; public LockException(String e){ super(e); } public LockException(Exception e){ super(e); } } }
(2)并发测试工具 ConcurrentTest.java

package com.concurrent; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; /** ConcurrentTask[] task = new ConcurrentTask[5]; for(int i=0;i<task.length;i++){ task[i] = new ConcurrentTask(){ public void run() { System.out.println("=============="); }}; } new ConcurrentTest(task); * @author xueliang * */ public class ConcurrentTest { private CountDownLatch startSignal = new CountDownLatch(1);//开始阀门 private CountDownLatch doneSignal = null;//结束阀门 private CopyOnWriteArrayList<Long> list = new CopyOnWriteArrayList<Long>(); private AtomicInteger err = new AtomicInteger();//原子递增 private ConcurrentTask[] task = null; public ConcurrentTest(ConcurrentTask... task){ this.task = task; if(task == null){ System.out.println("task can not null"); System.exit(1); } doneSignal = new CountDownLatch(task.length); start(); } /** * @param args * @throws ClassNotFoundException */ private void start(){ //建立线程,并将全部线程等待在阀门处 createThread(); //打开阀门 startSignal.countDown();//递减锁存器的计数,若是计数到达零,则释放全部等待的线程 try { doneSignal.await();//等待全部线程都执行完毕 } catch (InterruptedException e) { e.printStackTrace(); } //计算执行时间 getExeTime(); } /** * 初始化全部线程,并在阀门处等待 */ private void createThread() { long len = doneSignal.getCount(); for (int i = 0; i < len; i++) { final int j = i; new Thread(new Runnable(){ public void run() { try { startSignal.await();//使当前线程在锁存器倒计数至零以前一直等待 long start = System.currentTimeMillis(); task[j].run(); long end = (System.currentTimeMillis() - start); list.add(end); } catch (Exception e) { err.getAndIncrement();//至关于err++ } doneSignal.countDown(); } }).start(); } } /** * 计算平均响应时间 */ private void getExeTime() { int size = list.size(); List<Long> _list = new ArrayList<Long>(size); _list.addAll(list); Collections.sort(_list); long min = _list.get(0); long max = _list.get(size-1); long sum = 0L; for (Long t : _list) { sum += t; } long avg = sum/size; System.out.println("min: " + min); System.out.println("max: " + max); System.out.println("avg: " + avg); System.out.println("err: " + err.get()); } public interface ConcurrentTask { void run(); } }
(3)测试 ZkTest.java

package com.concurrent; import com.concurrent.ConcurrentTest.ConcurrentTask; public class ZkTest { public static void main(String[] args) { Runnable task1 = new Runnable(){ public void run() { DistributedLock lock = null; try { lock = new DistributedLock("127.0.0.1:2182","test1"); //lock = new DistributedLock("127.0.0.1:2182","test2"); lock.lock(); Thread.sleep(3000); System.out.println("===Thread " + Thread.currentThread().getId() + " running"); } catch (Exception e) { e.printStackTrace(); } finally { if(lock != null) lock.unlock(); } } }; new Thread(task1).start(); try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } ConcurrentTask[] tasks = new ConcurrentTask[60]; for(int i=0;i<tasks.length;i++){ ConcurrentTask task3 = new ConcurrentTask(){ public void run() { DistributedLock lock = null; try { lock = new DistributedLock("127.0.0.1:2183","test2"); lock.lock(); System.out.println("Thread " + Thread.currentThread().getId() + " running"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }; tasks[i] = task3; } new ConcurrentTest(tasks); } }
4.5 更多分布式数据结构和协议
5、BooKeeper
5.1 BooKeeper概述
BooKeeper具备副本功能,目的是提供可靠的日志记录。在BooKeeper中,服务器被称为帐本(Bookies),在帐本之中有不一样的帐户(Ledgers),每个帐户由一条条记录(Entry)组成。若是使用普通的磁盘存储日志数据,那么日志数据可能遭到破坏,当磁盘发生故障的时候,日志也可能被丢失。BooKeeper为每一份日志提供了分布式的存储,并采用了大多数(quorum,相对于全体)的概念。也就是说,只要集群中的大多数机器可用,那么该日志一直有效。
BooKeeper经过客户端进行操做,客户端能够对BooKeeper进行添加帐户、打开帐户、添加帐户记录、读取帐户记录等操做。另外,BooKeeper的服务依赖于ZooKeeper,能够说BooKeeper依赖于ZooKeeper的一致性及其分布式特色,在其之上提供另一种可靠性服务。BooKeeper的架构以下图所示:☆
5.2 BooKeeper角色
从上图中能够看出,BooKeeper中总共包含四类角色:
① 帐本:Bookies
② 帐户:Ledger
③ 客户端:Client
④ 元数据及存储服务:Metadata Storage Service
下面简单介绍这四类角色的功能:
(1) 帐本 BooKies
帐本是BooKeeper的存储服务器,他存储的是一个个的帐本,能够将帐本理解为一个个节点。在一个BooKeeper系统中存在多个帐本(节点),每一个帐户被不一样的帐本所存储。若要写一条记录到指定的帐户中,该记录将被写到维护该帐户全部账本节点中。为了提升系统的性能,这条记录并非真正的被写入到全部的节点中,而是选择集群的一个大多数集进行存储。该系统独有的特性,使得BooKeeper系统有良好的扩展性。即,咱们能够经过简单的添加机器节点的方法提升系统容量。☆☆
(2) 帐户 Ledger
帐户中存储的是一系列记录,每一条记录包含必定的字段。记录经过写操做一次性写入,只能进行附加操做不能进行修改。每条记录包含以下字段:
当知足下列两个条件时,某条记录才被认为是存储成功:
① 以前所记录的数据被帐本节点的大多数集所存储。
② 该记录被帐本节点的大多数集所存储。
(3) 客户端 BooKeeper Client
客户端一般与BooKeeper应用程序进行交互,它容许应用程序在系统上进行操做,包括建立帐户,写帐户等。
(4) 元数据存储服务 Metadata Storage Service
元数据信息存储在ZooKeeper集群当中,它存储关于帐户和帐本的信息。例如,帐本由集群中的哪些节点进行维护,帐户由哪一个帐本进行维护。应用程序在使用帐本的时候,首先要建立一个帐户。在建立帐户时,系统首先将该帐本的Metadata信息写入到ZooKeeper中。每个帐户在某一时刻只能有一个写实例(分布式锁)。在其余实例进行读操做以前首先须要将写实例关闭。若是写操做实例因为故障未能正常关闭,那么下一个尝试打开帐户的实例将须要首先对其进行恢复,并正确关闭写操做。在进行写操做的同时须要将最后一次的写记录存储到ZooKeeper中,所以恢复程序仅须要在ZooKeeper中查看该帐户所对应的最后一条写记录,而后将其正确的写入到帐户中,再在正确关闭写操做。在BooKeeper中该恢复程序有系统自动执行不须要用户参与。