提到Zookeeper,不得不先聊聊分布式协调技术html
1、什么是分布式协调技术java
分布式协调技术 主要用来解决分布式环境当中多个进程之间的同步控制,让他们有序的去访问某种临界资源,防止形成"脏数据"的后果。node
那么怎么对这些进程进行调度呢?算法
这时候咱们就须要一个协调器,来让他们有序的来访问这个资源。这个协调器就是咱们常常提到的那个锁。经过这个锁机制,咱们就能保证了分布式系统中多个进程可以有序的访问该临界资源。那么咱们把这个分布式环境下的这个锁叫做分布式锁。可是由于其运行所在的环境存在网络延迟等不可靠因素的,致使对数据的处理存在许多困难。目前处理分布式协调技术比较好的有Chubby(Google产品,收费)和Zookeeper(Apache产品,免费)。数据库
2、什么是Zookeeperapache
ZooKeeper是一种为分布式应用所设计的高可用、高性能且一致的开源协调服务,它提供了一项基本服务:分布式锁服务。因为ZooKeeper的开源特性,后来咱们的开发者在分布式锁的基础上,摸索了出了其余的使用方法:配置维护、组服务、分布式消息队列、分布式通知/协调等。编程
ZooKeeper性能上的特色决定了它可以用在大型的、分布式的系统当中。从可靠性方面来讲,它并不会由于一个节点的错误而崩溃。除此以外,它严格的序列访问控制意味着复杂的控制原语能够应用在客户端上。ZooKeeper在一致性、可用性、容错性的保证,也是ZooKeeper的成功之处。vim
3、Zookeeper特性缓存
一、全局数据一致:每一个server保存一份相同的数据副本,client不管链接到哪一个server,展现的数据都是一致性的,当客户端操做一个节点的文件时,其余两个节点会随之更新,这样保证了全局数据的一致性。安全
二、可靠性:若是消息被其中一台服务接受,那么将被全部的服务器接受。
三、顺序性:包括全局有序和偏序两种:全局有序是指若是在一台服务器上消息a在消息b前发布,则在全部Server上消息a都将在消息b前被发布;偏序是指若是一个消息b在消息a后被同一个发送者发布,a必将排在b前面。
四、数据更新原子性:一次数据更新要么成功,要么失败。
4、Zookeeper部署
一、下载相对应的jar包
wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz
二、安装到指定目录
tail -zxvf zookeeper-3.4.12.tar.gz
三、重命名配置文件
[oracle@bogon java]$ cd zookeeper-3.4.12/ [oracle@bogon zookeeper-3.4.12]$ cd conf [oracle@bogon conf]$ ll 总用量 16 -rw-rw-r--. 1 oracle oracle 535 3月 27 12:32 configuration.xsl -rw-rw-r--. 1 oracle oracle 2161 3月 27 12:32 log4j.properties -rw-rw-r--. 1 oracle oracle 922 3月 27 12:32 zoo_sample.cfg [oracle@bogon conf]$cp zoo_sample.cfg zoo.cfg
zookeeper默认读取zoo.cfg配置文件
其实到这单机版的zookeeper就算安装完毕了,若是为了方便管理,能够为其配置环境变量,这里不作演示
四、启动
[root@bogon bin]# ./zkServer.sh start #启动服务端 [root@bogon bin]# ./zkServer.sh status #查看状态 [root@bogon bin]# ./zkCli.sh #启动客户端
5、关于zookeeper数据模型
在zookeeper下,其文件存储相似于树同样具备层次结构,每一个文件节点被称之为Znode
一、每一个Znode具备原子性
二、每一个Znode存储数据大小具备限制
三、Znode经过路径引用,可是其路径必须是绝对的,所以他们必须由斜杠字符来开头
四、节点类型包含临时节点和永久节点。临时节点会话结束就自动删除,临时节点不容许拥有子节点。永久节点的生命周期不依赖与会话。
五、Znode具备序列化特性。随着其建立,其附带一个序列号。此序列号对于此及节点的父节点是惟一的,这样便记录了每一个子节点建立的前后顺序。
6、关于Zookeeper相关命令
若是直接输入zkCil.sh 他会自动匹配本机的zookeeper客户端,Zookeeper本质就是一个小型的文件存储系统。
启动通常语法:./zkCli.sh -timeout 0 -r -server ip:port
如:./zkCli.sh -timeout 3000 -server h1:2181,表示链接到主机h1 超时时间为3秒
一、查询
ls 语法:ls path [watch] 列出指定节点
[zk: localhost:2181(CONNECTED) 4] ls / #遍历根目录 [zookeeper, QQQs, test] [zk: localhost:2181(CONNECTED) 6] ls /test #遍历根目录下子节点test [test1]
stat 语法:stat path [watch]
列出指定节点的状态信息,或者说是元数据
[zk: localhost:2181(CONNECTED) 8] stat / cZxid = 0x0 #节点被建立时的事务ID ctime = Thu Jan 01 08:00:00 CST 1970 #节点被建立的时间 mZxid = 0x0 #最近一次更新的时的事务ID mtime = Thu Jan 01 08:00:00 CST 1970 #最近一次更新的时间 pZxid = 0x1a5 #该节点的子节点列表最近一次被修改的事务ID cversion = 99 #子节点的版本号 dataVersion = 0 #数据版本 aclVersion = 0 #ACL版本号 ephemeralOwner = 0x0 #建立临时节点的事务ID,若是是持久节点,则该节点为0x0 dataLength = 0 #当前节点的数据长度 numChildren = 3 #当前节点的子节点数目
get 语法:get path [watch]
列出指定节点的数据
[zk: localhost:2181(CONNECTED) 10] get /test
demo
ls2 语法:ls2 path [watch]
是ls的升级版,列出子节点的同时列出节点的状态信息
[zk: localhost:2181(CONNECTED) 11] ls2 /test [test1] cZxid = 0x1a5 ctime = Thu Jun 07 08:26:10 CST 2018 mZxid = 0x1a5 mtime = Thu Jun 07 08:26:10 CST 2018 pZxid = 0x1a6 cversion = 1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 4 numChildren = 1
history 查看历史
History命令能够查看先前执行过的全部的命令,它通常与redo配合使用
[zk: localhost:2181(CONNECTED) 5] history 0 - ls -l 1 - ls / 2 - delquota -n /testquota 3 - ls / 4 - listquota /testquota 5 - history
redo 从新执行先前命令,根据行数执行
[zk: localhost:2181(CONNECTED) 6] redo 1 #redo从新执行先前命令,根据行数执行 [testquota, zookeeper, hellozk]
二、建立
create 语法:create [-s] [-e] path data acl
其中:括号中时可选的,s表示建立永久节点,e表示建立临时节点,acl表示访问控制列表
[zk: localhost:2181(CONNECTED) 2] create /test demo Created /test [zk: localhost:2181(CONNECTED) 5] create /test/test1 demo1 Created /test/test1
三、修改
set 语法:set path data [version]
[zk: localhost:2181(CONNECTED) 12] ls / [zookeeper, QQQs, test] [zk: localhost:2181(CONNECTED) 13] set /test demo2 cZxid = 0x1a5 ctime = Thu Jun 07 08:26:10 CST 2018 mZxid = 0x1a7 mtime = Thu Jun 07 08:45:54 CST 2018 #修改时间与修改时间不一致 pZxid = 0x1a6 cversion = 1 dataVersion = 1 #数据版本号为1,表示被更改一次 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 5 numChildren = 1
四、删除
delete 语法:delete path [version]
只能删除不含子节点的节点
[zk: localhost:2181(CONNECTED) 12] ls / [zookeeper, QQQs, test] [zk: localhost:2181(CONNECTED) 14] delete /QQQs [zk: localhost:2181(CONNECTED) 15] ls / [zookeeper, test]
rmr 语法:rmr path
能递归删除节点
[zk: localhost:2181(CONNECTED) 15] ls / [zookeeper, test] [zk: localhost:2181(CONNECTED) 16] rmr /test [zk: localhost:2181(CONNECTED) 17] ls / [zookeeper]
五、增长限制
setquota 语法:setquota -n | -b val path
其中:n表示子节点的最大个数;b表示数据值的最大长度;val表示子节点最大个数或者数据值的最大长度;path表示节点路径
[zk: localhost:2181(CONNECTED) 30] create /testquota 123 #建立具备限制属 性节点 Created /testquota [zk: localhost:2181(CONNECTED) 35] setquota -n 3 /testquota #设置最大子节点为3 数为3 Comment: the parts are option -n val 3 path /testquota [zk: localhost:2181(CONNECTED) 36] listquota /testquota absolute path is /zookeeper/quota/testquota/zookeeper_limits Output quota for /testquota count=3,bytes=-1 #count:子节点最大数 byte:数据长度(-1表示没有限制) Output stat for /testquota count=1,bytes=3 #count:当前子节点数(包含本身,1表示当前他没有子节点)
注意:若是建立的节点数超过了限制数,建立过程当中不会报警告,可是日志中会显示超额
2018-05-05 17:03:56,025 [myid:3] - WARN [CommitProcessor:3:DataTree@302] - Quota exceeded: /testquota count=5 limit=3
Delquota
删除节点限制
[zk: localhost:2181(CONNECTED) 2] delquota -n /testquota #删除限制 [zk: localhost:2181(CONNECTED) 4] listquota /testquota absolute path is /zookeeper/quota/testquota/zookeeper_limits Output quota for /testquota count=-1,bytes=-1 #此时显示-1表示没有限制条件 Output stat for /testquota count=5,bytes=7
7、关于Zookeeper集群
Zookeeper集群搭建指的是Zookeeper分布式模拟安装。一般由2n+1台服务器组成。这是由于为了保证leader选举(基于Paxos算法的实现)可以获得多数的支持,因此Zookeeper集群的数量通常为奇数。
一、编辑相对应的配置文件
[root@bogon conf]# pwd /usr/java/zookeeper-3.4.12/conf [root@bogon conf]# vim zoo.cfg
二、配置相关数据
1) 指定数据存储目录
dataDir=/usr/data/zkdata #指定数据存储目录
2) 添加配置Zookeeper服务器的地址即编号
##(心跳端口、选举端口) server.1=jiaxianseng.host:2888:3888 server.2=jiaxianseng.host1:2888:3888 server.3=jiaxianseng.host2:2888:3888
三、建立数据存储目录,在虚拟机中建立相对应的编号
[root@localhost conf]# mkdir -p /usr/data/zkdata [root@localhost conf]#cd /usr/data/zkdata [root@localhost zkdata]#touch myid [root@localhost zkdata]# echo 1 >myid #此表示选举当前第一台主机为leader
四、分发到另外两台机器
$scp -r zookeeper-3.4.12/ oracle@jiaxianseng.host1:/usr/java/
$scp -r zookeeper-3.4.12/ oracle@jiaxianseng.host2:/usr/java/
五、分别开启zookeeper,此时第一台服务器当选为leader,其建立的节点另外两台能够收到
扩展:若是在本机上玩伪集群,须要注意:
1) 在当前文件夹下须要进行分包配置,设定为三台zookeeper机器
[oracle@localhost zookeeper-3.4.12]$ ll 总用量 12 drwxr-xr-x 10 root root 4096 5月 5 14:40 zk1 drwxr-xr-x 10 root root 4096 5月 5 14:42 zk2 drwxr-xr-x 10 root root 4096 5月 5 12:46 zk3
2) 配置文件设置
由于是在本机上玩,三台端口号应该不一样
clientPort=2181
8、Zookeeper监听机制
Zookeeper提供了分布式数据发布/订阅功能。它容许客户端向服务端注册一个Watcher监听。当服务端一些时间触发了这个Watcher,那么就会向指定客户端发送一个时间通知实现分布式的通知功能,其中节点的增删改均可以触发事件。
值得注意的是:Zookeeper监听机制严格按照先注册再监听顺序,且触发事件监听是一次性的。触发一次发送回调事件状况后,下次触发不会进行回调,须要从新注册监听。
具体实现以下:
1) 玩监听:前提是先得支持监听机制;其次是要注册监听,使用help命令进行查看
[zk: localhost:2181(CONNECTED) 7] help #使用help命令查看哪行命令支 持监听 ZooKeeper -server host:port cmd args connect host:port get path [watch] ls path [watch] stat path [watch]
2) 先注册监听
[zk: localhost:2181(CONNECTED) 8] create /watchtest 123 #先注册监听 Created /watchtest
3) 查看是否被监听
[zk: localhost:2181(CONNECTED) 10] get /watchtest watch #此节点被监听
4) 利用第二台机器改变节点数据
[zk: localhost:2181(CONNECTED) 0] set /watchtest 456789
5) 查看第一台机器返回的信息
WATCHER:: WatchedEvent state:SyncConnected type:NodeDataChanged path:/watchtest
此时第一台会收到节点被改变的提示,可是当节点再次被其余机器改变时
第一台机器不会收到任何信息,说明监听只会被触发一次。若是想再次收到监听信息,只能从新注册监听
9、关于Zookeeper Java API
1) 建立maven工程,并导入约束
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.12</version> </dependency>
2) 编写测试类,链接zookeeper,并建立节点
public class TestZKClient { public static void main(String[] args) throws Exception{ //main快捷:psvm //构造JAVA zookeeper客户端 //参数:1.链接ip+端口(可配置多个,用分号隔开) 2.time链接超时时间:默认值为30000 ZooKeeper zk=new ZooKeeper("192.168.174.133:2181,192.168.174.133:2182",30000,new Watcher(){ //这里就是事件通知的回调方法 public void process(WatchedEvent event) { System.out.println("事件通知类型"+event.getState()); System.out.println("事件发生类型"+event.getType()); System.out.println("事件发生路径"+event.getPath()); } } ); /** * 参数1:节点名 ;参数2:数据 ; 参数3:acl权限控制,这里采用默认值 ;参数4:建立节点类型:这里是持久序列化节点 */ zk.create("/myCirls","性感的".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); zk.close(); } }
10、关于Zookeper分布式锁应用
分布式锁,这个主要得益于Zookeeper保证了数据的强一致性。锁服务能够分为两类,一个保持独占,另外一个是控制时序。
应用:在分布式环境高并发场景下,生产有必定业务含义的惟一的订单编号
一、编写服务类:
/** * 订单编号服务 * @author Administrator * */ public class OrderCodeGenerator { //自增加序列 private static int i=0; //按照“年-月-日-小时-分钟-秒-自增加序列”的规则生成订单编号 public String getOrderCode(){ Date date=new Date(); SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-"); return sdf.format(date)+ ++i; } public static void main(String[] args) { OrderCodeGenerator ong=new OrderCodeGenerator(); for(int i=0;i<10;i++){ System.out.println(ong.getOrderCode()); } } }
二、编写接口
/** * 定义订单服务接口 * @author Administrator * */ public interface OrderService { void createOrder(); }
三、定义实现类
public class OrderServiceImpl implements OrderService{ //定义为静态变量,保证订单号惟一 private static OrderCodeGenerator ocg=new OrderCodeGenerator(); //建立订单接口 @Override public void createOrder() { // TODO Auto-generated method stub String orderCode=null; //获取订单号 orderCode=ocg.getOrderCode(); System.out.println(Thread.currentThread().getName()+"========"+orderCode); } }
四、定义线程模拟多线程建立订单
public class DistrbutDemo { public static void main(String[] args) { //模拟多个并发建立订单 //并发数 int currs=10; //方式一:定义一个循环屏障:用于祖塞当前线程的,设置的参数为参与线程数,能保证设置每一个线程统一完成每段步骤, //如线程一完成后进入等待,接着线程二完成后进入,等待其他线程完成,才能进入下一步 final CyclicBarrier cb=new CyclicBarrier(currs); /*方式二:定义一个倒计数储存器:用于等待n个条件到达,每一个线程完成数值减一 CountDownLatch cdl=new CountDownLatch(currs); cdl.countDown(); cdl.await();*/ for(int i=0;i<currs;i++){ new Thread(new Runnable() { @Override public void run() { //模拟建立订单 OrderService os=new OrderServiceImpl(); //穿插线程阻塞 try { cb.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } os.createOrder(); } }).start(); } } }
此时运行代码,可能会出现没有10这个订单号(建立了相同的订单,说明多线程编程下出现了不安全的状况,此时应该加锁)
打印结果以下:
Thread-5========2018-06-06-10-30-25-1 Thread-4========2018-06-06-10-30-25-6 Thread-3========2018-06-06-10-30-25-2 Thread-6========2018-06-06-10-30-25-8 Thread-0========2018-06-06-10-30-25-5 Thread-7========2018-06-06-10-30-25-4 Thread-8========2018-06-06-10-30-25-3 Thread-9========2018-06-06-10-30-25-9 Thread-1========2018-06-06-10-30-25-2 Thread-2========2018-06-06-10-30-25-7
五、方式一,加同步锁
public class OrderServiceImpl implements OrderService{ //定义为静态变量,保证订单号惟一 private static OrderCodeGenerator ocg=new OrderCodeGenerator(); //建立订单接口 @Override public void createOrder() { // TODO Auto-generated method stub String orderCode=null; //加同步锁:保证线程安全 synchronized (ocg) { //获取订单号 orderCode=ocg.getOrderCode(); } System.out.println(Thread.currentThread().getName()+"========"+orderCode); } }
注意:这里同步锁内参数不能使用this,由于在DistrbutDemo类中每次调用线程都从新new了一次,锁的不是同一个对象,其不带加锁的目的(获取的不是同一个对象的锁),而用ocg是由于它是静态变量,获取的是同一个对象的锁
方式二:加锁lock
public class OrderServiceImpl implements OrderService{ //定义为静态变量,保证订单号惟一 private static OrderCodeGenerator ocg=new OrderCodeGenerator(); //使用lock必定设置为静态变量,保证每一个线程竞争的是同一把锁 private static Lock lock=new ReentrantLock(); //建立订单接口 @Override public void createOrder() { // TODO Auto-generated method stub String orderCode=null; try{//防止出现异常后锁不能释放,因此加try/catch lock.lock(); orderCode=ocg.getOrderCode(); }catch(Exception e){ e.printStackTrace(); }finally{ //记得释放锁 lock.unlock(); } System.out.println(Thread.currentThread().getName()+"========"+orderCode); } }
此时运行代码,不会出现重复的订单,是个完整的建立了10个订单
分析:以上代码确实可以保证线程安全,可是此只能在单机下玩,若是将服务放在多台机器上调用,这里须要引入分布式锁。
六、分布式锁实现的技术
基于数据库实现分布式锁:
性能较差,容易出现单点故障
锁没有失效时间,容易死锁
非阻塞式的
不可重入
基于缓存实现分布式锁
锁没有失效时间,容易死锁
非阻塞式的
不可重入
基于Zookeeper实现分布式锁
实现相对简单
可靠性高
性能较差
具体实现以下:
七、引入zk客户端依赖
<!--对zookeeper客户端进行了封装 -->
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
八、编写序列化类
public class MyZkSerializer implements ZkSerializer{ String charset="UTF-8"; //反序列化 @Override public Object deserialize(byte[] bytes) throws ZkMarshallingError { // TODO Auto-generated method stub try { return new String(bytes,charset); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block throw new ZkMarshallingError(e); } } //序列化 @Override public byte[] serialize(Object obj) throws ZkMarshallingError { // TODO Auto-generated method stub try { return String.valueOf(obj).getBytes(charset); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block throw new ZkMarshallingError(e); } } }
九、编写zk链接监听程序
public class ZkWatcherDemo { public static void main(String[] args) { ZkClient client=new ZkClient("192.168.174.128:2181"); client.setZkSerializer(new MyZkSerializer()); client.subscribeDataChanges("/testWatch", new IZkDataListener() { @Override public void handleDataDeleted(String dataPath) throws Exception { System.out.println("----监听到节点被删除.."); } @Override public void handleDataChange(String dataPath, Object data) throws Exception { // TODO Auto-generated method stub System.out.println("----监听到数据变为:"+data); } }); try { //为了方便查看,等待2分钟 Thread.sleep(2 * 60 * 1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
此时若是更改testWatch值,控制台会打印监听的值
----监听到数据变为:55
十、Zookeeper分布式锁实现:
1) 定义分布式锁:
//定义分布式锁 public class ZKDistributeLock implements Lock{ /** * 利用zookeeper的同父子节点不可重名的特色来实现分布式锁 * 加锁的规则:去建立指定名称的节点,若是能建立成功,则得到锁(加锁成功),若是节点已存在,就标识锁被别人获取了 * 你就得阻塞,等待 * 锁释放规则:删除指定名称的节点 */ private String LockPath; private ZkClient client; public ZKDistributeLock(String lockPath) { super(); LockPath = lockPath; client=new ZkClient("192.168.174.128:2181"); client.setZkSerializer(new MyZkSerializer()); } @Override public boolean tryLock() { try{ this.client.createPersistent(LockPath);//建立永久节点 }catch(ZkNodeExistsException e){ return false; } return true; } @Override public void lock() { if(!tryLock()){ //阻塞等待 waitForLock(); //再次尝试加锁 lock(); } } private void waitForLock() { //怎么让本身阻塞 final CountDownLatch cdl=new CountDownLatch(1); //注册watch,好通知本身何时被唤醒 IZkDataListener listener=new IZkDataListener() { @Override public void handleDataDeleted(String dataPath) throws Exception { System.out.println("----监听到节点被删除.."); cdl.countDown(); } @Override public void handleDataChange(String dataPath, Object data) throws Exception { // TODO Auto-generated method stub System.out.println("----监听到数据变为:"+data); } }; //注册该事件 client.subscribeDataChanges(LockPath, listener); //这里得判断节点是否存在,不然会永久阻塞 if(this.client.exists(LockPath)){ try { cdl.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } try{ cdl.await(); }catch(InterruptedException e){ e.printStackTrace(); } client.unsubscribeDataChanges(LockPath, listener); } @Override public void unlock() { //删除节点 //this.client.deleteRecursive(arg0)表示递归删除 this.client.delete(this.LockPath); } @Override public void lockInterruptibly() throws InterruptedException { // TODO Auto-generated method stub } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { // TODO Auto-generated method stub return false; } @Override public Condition newCondition() { // TODO Auto-generated method stub return null; } }
2) 调用分布式锁
public class OrderServiceImpl implements OrderService{ //定义为静态变量,保证订单号惟一 private static OrderCodeGenerator ocg=new OrderCodeGenerator(); //使用lock必定设置为静态变量,保证每一个线程竞争的是同一把锁 private static Lock lock=new ZKDistributeLock("/testW"); //建立订单接口 @Override public void createOrder() { // TODO Auto-generated method stub String orderCode=null; try{//防止出现异常后锁不能释放,因此加try/catch lock.lock(); orderCode=ocg.getOrderCode(); }catch(Exception e){ e.printStackTrace(); }finally{ //记得释放锁 lock.unlock(); } System.out.println(Thread.currentThread().getName()+"========"+orderCode); } }
打印结果以下:
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection). log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkEventThread). log4j:WARN Please initialize the log4j system properly. log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Thread-4========2018-06-07-10-52-29-1 ----监听到节点被删除.. ----监听到数据变为:null Thread-7========2018-06-07-10-52-29-2 ----监听到节点被删除.. ----监听到数据变为:null Thread-9========2018-06-07-10-52-29-3 ----监听到节点被删除.. ----监听到数据变为:null Thread-1========2018-06-07-10-52-29-4 ----监听到节点被删除.. ----监听到数据变为:null Thread-3========2018-06-07-10-52-29-5 ----监听到节点被删除.. ----监听到数据变为:null Thread-8========2018-06-07-10-52-29-6 ----监听到节点被删除.. ----监听到节点被删除.. ----监听到数据变为:null Thread-0========2018-06-07-10-52-29-7 ----监听到节点被删除.. ----监听到数据变为:null Thread-2========2018-06-07-10-52-29-8 ----监听到节点被删除.. ----监听到数据变为:null Thread-6========2018-06-07-10-52-29-9 ----监听到节点被删除.. ----监听到数据变为:null Thread-5========2018-06-07-10-52-29-10 ----监听到节点被删除.. ----监听到节点被删除.. ----监听到节点被删除.. ----监听到节点被删除.. ----监听到节点被删除.. ----监听到节点被删除.. ----监听到节点被删除.. ----监听到节点被删除.. ----监听到节点被删除.. ----监听到节点被删除.. ----监听到节点被删除.. ----监听到节点被删除.. ----监听到节点被删除.. ----监听到节点被删除.. ----监听到节点被删除.. ----监听到节点被删除.. ----监听到节点被删除.. ----监听到节点被删除..
弊端问题:惊群效应
1、每次唤醒操做将全部线程唤醒了,其中一个抢到锁后,其余没抢到的又将进入阻塞等待状态
二、客户端无端接受了不少与本身无关的事件通知
三、利用持久节点建立的锁存在死锁的可能性!(当加锁后忽然业务重启,释放锁未执行)
在集群规模较大的环境中带来的危害:
一、巨大的服务器性能损耗 二、网络冲击 三、可能形成宕机
如打出不少----监听到节点被删除..这句话
解决:采用建立顺序节点
改进建立的是临时顺序节点,每次都是最小的节点得到锁,其下一个节点(更小)注册watcher
重写分布式锁(升级版)
//定义分布式锁 public class ZKDistributeImproveLock implements Lock { /** * 利用zookeeper的同父子节点不可重名的特色来实现分布式锁 * 加锁的规则:去建立指定名称的节点,若是能建立成功,则得到锁(加锁成功),若是节点已存在,就标识锁被别人获取了 你就得阻塞,等待 * 锁释放规则:删除指定名称的节点 */ private String LockPath; private ZkClient client; // 须要将路径设置为ThreadLocal类型,不然不能被线程并发去使用 private ThreadLocal<String> currentPath = new ThreadLocal<String>(); private ThreadLocal<String> beforePath = new ThreadLocal<String>(); public ZKDistributeImproveLock(String lockPath) { super(); LockPath = lockPath; client = new ZkClient("192.168.174.128:2181"); client.setZkSerializer(new MyZkSerializer()); if (!this.client.exists(LockPath)) { this.client.createPersistent(LockPath);// 先建立永久节点 } } @Override public boolean tryLock() { if (this.currentPath.get() == null) { // 注意:加锁时建立的是临时顺序节点 currentPath.set(this.client.createEphemeralSequential(LockPath + "/", "aaa")); } // 得到全部的子节点 List<String> children = this.client.getChildren(LockPath); // 进行下排序 Collections.sort(children); // 判断当前节点是不是最小的 if (currentPath.get().equals(LockPath + "/" + children.get(0))) { return true; } else { // 获取前一个节点 // 获得字节的索引号 int curIndex = children.indexOf(currentPath.get().substring( LockPath.length() + 1)); beforePath.set(LockPath + "/" + children.get(curIndex - 1)); } return false; } @Override public void lock() { if (!tryLock()) { // 阻塞等待 waitForLock(); // 再次尝试加锁 lock(); } } private void waitForLock() { // 怎么让本身阻塞 final CountDownLatch cdl = new CountDownLatch(1); // 注册watch,好通知本身何时被唤醒 IZkDataListener listener = new IZkDataListener() { @Override public void handleDataDeleted(String dataPath) throws Exception { System.out.println("----监听到节点被删除.."); cdl.countDown(); } @Override public void handleDataChange(String dataPath, Object data) throws Exception { // TODO Auto-generated method stub System.out.println("----监听到数据变为:" + data); } }; // 注册该事件 client.subscribeDataChanges(this.beforePath.get(), listener); // 这里得判断节点是否存在,不然会永久阻塞 if (this.client.exists(this.beforePath.get())) { try { cdl.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } try { cdl.await(); } catch (InterruptedException e) { e.printStackTrace(); } client.unsubscribeDataChanges(this.beforePath.get(), listener); } @Override public void unlock() { // 删除节点 // this.client.deleteRecursive(arg0)表示递归删除 this.client.delete(this.currentPath.get()); } @Override public void lockInterruptibly() throws InterruptedException { // TODO Auto-generated method stub } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { // TODO Auto-generated method stub return false; } @Override public Condition newCondition() { // TODO Auto-generated method stub return null; } }
调用:
public class OrderServiceImpl implements OrderService{ //定义为静态变量,保证订单号惟一 private static OrderCodeGenerator ocg=new OrderCodeGenerator(); //使用lock必定设置为静态变量,保证每一个线程竞争的是同一把锁 private static Lock lock=new ZKDistributeImproveLock("/QQQs"); //建立订单接口 @Override public void createOrder() { // TODO Auto-generated method stub String orderCode=null; try{//防止出现异常后锁不能释放,因此加try/catch lock.lock(); orderCode=ocg.getOrderCode(); }catch(Exception e){ e.printStackTrace(); }finally{ //记得释放锁 lock.unlock(); } System.out.println(Thread.currentThread().getName()+"========"+orderCode); } }
打印结果以下:
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection). log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkEventThread). log4j:WARN Please initialize the log4j system properly. log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Thread-4========2018-06-07-10-53-36-1 ----监听到节点被删除.. Thread-9========2018-06-07-10-53-36-2 ----监听到节点被删除.. Thread-0========2018-06-07-10-53-36-3 ----监听到节点被删除.. Thread-1========2018-06-07-10-53-36-4 ----监听到节点被删除.. Thread-6========2018-06-07-10-53-36-5 ----监听到节点被删除.. Thread-7========2018-06-07-10-53-36-6 ----监听到节点被删除.. Thread-5========2018-06-07-10-53-36-7 ----监听到节点被删除.. Thread-2========2018-06-07-10-53-36-8 ----监听到节点被删除.. Thread-3========2018-06-07-10-53-36-9 ----监听到节点被删除.. Thread-8========2018-06-07-10-53-37-10