zookeeper的开发应用

1. zookeeper

1.1. 基础

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它包含一个简单的原语集,分布式应用程序能够基于它实现同步服务,配置维护和命名服务等。不少中间件,好比kafka、hadoop、hbase,都用到了 Zookeeper来构建集群。Zookeeper是hadoop的一个子项目,其发展历程无需赘述。在分布式应用中,因为工程师不能很好地使用锁机制,以及基于消息的协调机制不适合在某些应用中使用,所以须要有一种可靠的、可扩展的、分布式的、可配置的协调机制来统一系统的状态。Zookeeper的目的就在于此。java

数据结构

理解ZooKeeper的一种方法是将其看作一个具备高可用性的文件系统。但这个文件系统中没有文件和目录,而是统一使用节点(node)的概念,称为znode。znode既能够保存数据(如同文件),也能够保存其余znode(如同目录)。全部的znode构成一个层次化的数据结构,。node

  • Persistent Nodes: 永久有效地节点,除非client显式的删除,不然一直存在
  • Ephemeral Nodes: 临时节点,仅在建立该节点client保持链接期间有效,一旦链接丢失,zookeeper会自动删除该节点
  • Sequence Nodes: 顺序节点,client申请建立该节点时,zk会自动在节点路径末尾添加递增序号,这种类型是实现分布式锁,分布式queue等特殊功能的关键
集群

生产环境的zookeeper服务,通常都是以集群的方式搭建环境。在zookeeper集群中,不一样节点通常有下面几种角色:算法

  1. Leader:事务请求的惟一调度者和处理者。保证集群事务处理的顺序性。集群内部各服务器的调度者。
  2. Follower:处理客户端非事务请求,转发事务请求给Leader。参与事务请求Proposal的投票。参与Leader选举投票。
  3. Observer:处理非事务请求,将事务请求交给Leader处理。

这里面提到了“事务请求”和“非事务请求”,这里说明一下。 事务请求能够理解成数据库中包含commit操做的请求,例如:新增、修改和删除。而非事务请求则对应那些查询的请求。spring

可见在zookeeper集群中,真正决策的只有一个Leader节点,全部的事务请求到达其余节点后,都仍是会被转发到Leader节点来处理。这种模式,保障了zookeeper在命令决策端的原子性。docker

Leader选举算法采用了Paxos协议,当多数Server写成功,则任务数据写成功若是有3个Server,则两个写成功便可;若是有4或5个Server,则三个写成功便可。Server数目通常为奇数(三、五、7)若是有3个Server,则最多容许1个Server挂掉;若是有4个Server,则一样最多容许1个Server挂掉由此,咱们看出3台服务器和4台服务器的的容灾能力是同样的,因此为了节省服务器资源,通常咱们采用奇数个数,做为服务器部署个数。shell

原子广播

zookeeper的核心是原子广播,这个机制保证了各个server之间的同步。实现这个机制的协议叫作Zab协议。Zab协议有两种模式,它们分别是恢复模式广播模式数据库

当服务启动或者在领导者崩溃后,Zab就进入了恢复模式,当领导者被选举出来,且大多数server的完成了和leader的状态同步之后,恢复模式就结束了。状态同步保证了leader和server具备相同的系统状态。一旦leader已经和多数的follower进行了状态同步后,他就能够开始广播消息了,即进入广播状态。apache

数据一致性与paxos算法

在一个分布式数据库系统中,若是各节点的初始状态一致,每一个节点都执行相同的操做序列,那么他们最后能获得一个一致的状态。编程

Paxos算法经过投票来对写操做进行全局编号,同一时刻,只有一个写操做被批准,同时并发的写操做要去争取选票,只有得到过半数选票的写操做才会被批准(因此永远只会有一个写操做获得批准),其余的写操做竞争失败只好再发起一轮投票,就这样,在日复一日年复一年的投票中,全部写操做都被严格编号排序。bash

编号严格递增,当一个节点接受了一个编号为100的写操做,以后又接受到编号为99的写操做(由于网络延迟等不少不可预见缘由),它立刻能意识到本身数据不一致了,自动中止对外服务并重启同步过程。任何一个节点挂掉都不会影响整个集群的数据一致性(总2n+1台,除非挂掉大于n台)。

新手不要混淆,在这里的投票选举是针对多个客户端有并发写操做时,争夺该惟一写操做权的选举。和前面说的zookeeper集群中,投票选举master是不一样的概念。虽然它们的选举协议,都是遵循paxos算法。

脑裂和过半选举

脑裂:对于一个集群,想要提升这个集群的可用性,一般会采用多机房部署,好比如今有一个由6台节点所组成的一个集群,部署在了两个机房。正常状况下,此集群只会有一个Leader,那么若是机房之间的网络断了以后,每一个机房都选举本身的Leader,这就至关于本来一个集群,被分红了两个集群,出现了两个“大脑”,这就是脑裂。若是过了一会,断了的网络忽然联通了,那么此时就会出现问题了,两个集群刚刚都对外提供服务了,数据该怎么合并,数据冲突怎么解决等等问题。
过半选举:在领导者选举的过程当中,若是某台zkServer得到了超过半数的选票,则此zkServer就能够成为Leader了。

由于zookeeper的过半选举,所以zookeeper不存在“脑裂”的状况。例如仍是6个节点分布在两个机房,只有当某个节点得到4个节点以上的选票,才能升级为Leader,所以不会出现两个Leader的状况。

1.2. 安装使用

docker 安装

这里直接使用zookeeper官方镜像来安装。

docker run -d \
 --name zookeeper \
 --restart=on-failure:3 \
 -p 2181:2181 \
 -v /Volumes/zookeeper/data/:/data/ \
 zookeeper
启动

执行 docker exec -it 编号 /bin/bash ,进入容器内部。

而后在 /bin 目录,执行 ./zkCli.sh ,运行启动脚本。

zooInspector客户端

zooInspector是zookeeper图形化的客户端工具,可用来查看内部数据状况。

可下载 ZooInspector.zip,解压后在build目录下获取 zookeeper-dev-ZooInspector.jar。经过 java -jar zookeeper-dev-ZooInspector.jar,便可启动 ZooInspector 图形化客户端。

2. CAP对比

2.1. CAP理论

CAP理论告诉咱们,一个分布式系统不可能同时知足如下三种:

  • 一致性(C:Consistency)
  • 可用性(A:Available)
  • 分区容错性(P:Partition Tolerance)

这三个基本需求,最多只能同时知足其中的两项,由于P是必须的,所以每每选择就在CP或者AP中。

一致性(C:Consistency)

在分布式环境中,一致性是指数据在多个副本之间是否可以保持数据一致的特性。在一致性的需求下,当一个系统在数据一致的状态下执行更新操做后,应该保证系统的数据仍然处于一致的状态。例如一个将数据副本分布在不一样分布式节点上的系统来讲,若是对第一个节点的数据进行了更新操做而且更新成功后,其余节点上的数据也应该获得更新,而且全部用户均可以读取到其最新的值,那么这样的系统就被认为具备强一致性(或严格的一致性,最终一致性)。

可用性(A:Available)

可用性是指系统提供的服务必须一直处于可用的状态,对于用户的每个操做请求老是可以在有限的时间内返回结果。“有效的时间内”是指,对于用户的一个操做请求,系统必须可以在指定的时间(即响应时间)内返回对应的处理结果,若是超过了这个时间范围,那么系统就被认为是不可用的。

分区容错性(P:Partition Tolerance)

分区容错性约束了一个分布式系统须要具备以下特性:分布式系统在遇到任何网络分区故障的时候,仍然须要可以保证对外提供知足一致性和可用性的服务,除非是整个网络环境都发生了故障。

网络分区是指在分布式系统中,不一样的节点分布在不一样的子网络(机房或异地网络等)中,因为一些特殊的缘由致使这些子网络之间出现网络不连通的情况,但各个子网络的内部网络是正常的,从而致使整个系统的网络环境被切分红了若干个孤立的区域。须要注意的是,组成一个分布式系统的每一个节点的加入与退出均可以看做是一个特殊的网络分区。

2.2. zookeeper和eureka对比

eureka保证ap

eureka优先保证可用性。在Eureka平台中,若是某台服务器宕机,Eureka不会有相似于ZooKeeper的选举leader的过程;客户端请求会自动切换 到新的Eureka节点;当宕机的服务器从新恢复后,Eureka会再次将其归入到服务器集群管理之中;而对于它来讲,全部要作的无非是同步一些新的服务 注册信息而已。

Eureka各个节点都是平等的,几个节点挂掉不会影响正常节点的工做,剩余的节点依然能够提供注册和查询服务。而Eureka的客户端在向某个Eureka注册或时若是发现链接失败,则会自动切换至其它节点,只要有一台Eureka还在,就能保证注册服务可用(保证可用性),只不过查到的信息可能不是最新的(不保证强一致性)。

zookeeper保证cp

做为一个分布式协同服务,zookeeper是优先保证一致性的。

进行leader选举时集群都是不可用。在使用ZooKeeper获取服务列表时,当master节点由于网络故障与其余节点失去联系时,剩余节点会从新进行leader选举。问题在于,选举leader的时间太长,30 ~ 120s, 且选举期间整个zk集群都是不可用的,这就致使在选举期间注册服务瘫痪,虽然服务可以最终恢复,可是漫长的选举时间致使的注册长期不可用是不能容忍的。因此说,ZooKeeper不能保证服务可用性。

3. 应用场景

3.1. 数据发布与订阅(配置中心)

发布与订阅模型,即所谓的配置中心,顾名思义就是发布者将数据发布到ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。例如全局的配置信息,服务式服务框架的服务地址列表等就很是适合使用。

  • 应用中用到的一些配置信息放到ZK上进行集中管理。这类场景一般是这样:应用在启动的时候会主动来获取一次配置,同时,在节点上注册一个Watcher,这样一来,之后每次配置有更新的时候,都会实时通知到订阅的客户端,历来达到获取最新配置信息的目的。
  • 分布式搜索服务中,索引的元信息和服务器集群机器的节点状态存放在ZK的一些指定节点,供各个客户端订阅使用。
  • 分布式日志收集系统。这个系统的核心工做是收集分布在不一样机器的日志。收集器一般是按照应用来分配收集任务单元,所以须要在ZK上建立一个以应用名做为path的节点P,并将这个应用的全部机器ip,以子节点的形式注册到节点P上,这样一来就可以实现机器变更的时候,可以实时通知到收集器调整任务分配。
  • 系统中有些信息须要动态获取,而且还会存在人工手动去修改这个信息的发问。一般是暴露出接口,例如JMX接口,来获取一些运行时的信息。引入ZK以后,就不用本身实现一套方案了,只要将这些信息存放到指定的ZK节点上便可。

注意:在上面提到的应用场景中,有个默认前提是:数据量很小,可是数据更新可能会比较快的场景。

3.2. 负载均衡

这里说的负载均衡是指软负载均衡。在分布式环境中,为了保证高可用性,一般同一个应用或同一个服务的提供方都会部署多份,达到对等服务。而消费者就需要在这些对等的服务器中选择一个来执行相关的业务逻辑,其中比较典型的是消息中间件中的生产者,消费者负载均衡。
消息中间件中发布者和订阅者的负载均衡,linkedin开源的KafkaMQ和阿里开源的metaq都是经过zookeeper来作到生产者、消费者的负载均衡。这里以metaq为例如讲下:

生产者负载均衡

metaq发送消息的时候,生产者在发送消息的时候必须选择一台broker上的一个分区来发送消息,所以metaq在运行过程当中,会把全部broker和对应的分区信息所有注册到ZK指定节点上,默认的策略是一个依次轮询的过程,生产者在经过ZK获取分区列表以后,会按照brokerId和partition的顺序排列组织成一个有序的分区列表,发送的时候按照从头至尾循环往复的方式选择一个分区来发送消息。

消费负载均衡

在消费过程当中,一个消费者会消费一个或多个分区中的消息,可是一个分区只会由一个消费者来消费。MetaQ的消费策略是:

  • 每一个分区针对同一个group只挂载一个消费者。
  • 若是同一个group的消费者数目大于分区数目,则多出来的消费者将不参与消费。
  • 若是同一个group的消费者数目小于分区数目,则有部分消费者须要额外承担消费任务。

在某个消费者故障或者重启等状况下,其余消费者会感知到这一变化(经过 zookeeper watch消费者列表),而后从新进行负载均衡,保证全部的分区都有消费者进行消费。

3.3. 命名服务

zookeeper的命名服务有两个应用方向,一个是提供相似JNDI的功能,利用zookeepeer的树型分层结构,能够把系统中各类服务的名称、地址以及目录信息存放在zookeeper,须要的时候去zookeeper中读取。

另外一个,是利用zookeeper顺序节点的特性,制做分布式的ID生成器,写过数据库应用的朋友都知道,咱们在往数据库表中插入记录时,一般须要为该记录建立惟一的ID,在单机环境中咱们能够利用数据库的主键自增功能。但在分布式环境则没法使用,有一种方式可使用UUID,可是它的缺陷是没有规律,很难理解。利用zookeeper顺序节点的特性,咱们能够生成有顺序的,容易理解的,同时支持分布式环境的序列号。

3.4. 分布式通知/协调

ZooKeeper中特有watcher注册与异步通知机制,可以很好的实现分布式环境下不一样系统之间的通知与协调,实现对数据变动的实时处理。使用方法一般是不一样系统都对ZK上同一个znode进行注册,监听znode的变化(包括znode自己内容及子节点的),其中一个系统update了znode,那么另外一个系统可以收到通知,并做出相应处理

  • 另外一种心跳检测机制:检测系统和被检测系统之间并不直接关联起来,而是经过zk上某个节点关联,大大减小系统耦合。
  • 另外一种系统调度模式:某系统有控制台和推送系统两部分组成,控制台的职责是控制推送系统进行相应的推送工做。管理人员在控制台做的一些操做,其实是修改了ZK上某些节点的状态,而ZK就把这些变化通知给他们注册Watcher的客户端,即推送系统,因而,做出相应的推送任务。
  • 另外一种工做汇报模式:一些相似于任务分发系统,子任务启动后,到zk来注册一个临时节点,而且定时将本身的进度进行汇报(将进度写回这个临时节点),这样任务管理者就可以实时知道任务进度。

总之,使用zookeeper来进行分布式通知和协调可以大大下降系统之间的耦合。

3.5. 集群管理与Master选举

集群机器监控

这一般用于那种对集群中机器状态,机器在线率有较高要求的场景,可以快速对集群中机器变化做出响应。这样的场景中,每每有一个监控系统,实时检测集群机器是否存活。过去的作法一般是:监控系统经过某种手段(好比ping)定时检测每一个机器,或者每一个机器本身定时向监控系统汇报“我还活着”。 这种作法可行,可是存在两个比较明显的问题:

  1. 集群中机器有变更的时候,牵连修改的东西比较多。
  2. 有必定的延时。

利用ZooKeeper有两个特性,就能够实时另外一种集群机器存活性监控系统:

  1. 客户端在节点 x 上注册一个Watcher,那么若是 x?的子节点变化了,会通知该客户端。
  2. 建立EPHEMERAL类型的节点,一旦客户端和服务器的会话结束或过时,那么该节点就会消失。

例如,监控系统在 /clusterServers 节点上注册一个Watcher,之后每动态加机器,那么就往 /clusterServers 下建立一个 EPHEMERAL类型的节点:/clusterServers/{hostname}. 这样,监控系统就可以实时知道机器的增减状况,至于后续处理就是监控系统的业务了。

Master选举

Master选举则是zookeeper中最为经典的应用场景了。
在分布式环境中,相同的业务应用分布在不一样的机器上,有些业务逻辑(例如一些耗时的计算,网络I/O处理),每每只须要让整个集群中的某一台机器进行执行,其他机器能够共享这个结果,这样能够大大减小重复劳动,提升性能,因而这个master选举即是这种场景下的碰到的主要问题。

利用ZooKeeper的强一致性,可以保证在分布式高并发状况下节点建立的全局惟一性,即:同时有多个客户端请求建立 /currentMaster 节点,最终必定只有一个客户端请求可以建立成功。利用这个特性,就能很轻易的在分布式环境中进行集群选取了。

另外,这种场景演化一下,就是动态Master选举。这就要用到?EPHEMERAL_SEQUENTIAL类型节点的特性了。

上文中提到,全部客户端建立请求,最终只有一个可以建立成功。在这里稍微变化下,就是容许全部请求都可以建立成功,可是得有个建立顺序,因而全部的请求最终在ZK上建立结果的一种可能状况是这样: /currentMaster/{sessionId}-1 ,?/currentMaster/{sessionId}-2 ,?/currentMaster/{sessionId}-3 ….. 每次选取序列号最小的那个机器做为Master,若是这个机器挂了,因为他建立的节点会立刻小时,那么以后最小的那个机器就是Master了。

  • 在搜索系统中,若是集群中每一个机器都生成一份全量索引,不只耗时,并且不能保证彼此之间索引数据一致。所以让集群中的Master来进行全量索引的生成,而后同步到集群中其它机器。另外,Master选举的容灾措施是,能够随时进行手动指定master,就是说应用在zk在没法获取master信息时,能够经过好比http方式,向一个地方获取master。
  • 在Hbase中,也是使用ZooKeeper来实现动态HMaster的选举。在Hbase实现中,会在ZK上存储一些ROOT表的地址和HMaster的地址,HRegionServer也会把本身以临时节点(Ephemeral)的方式注册到Zookeeper中,使得HMaster能够随时感知到各个HRegionServer的存活状态,同时,一旦HMaster出现问题,会从新选举出一个HMaster来运行,从而避免了HMaster的单点问题

3.6. 分布式锁

保持独占

所谓保持独占,就是全部试图来获取这个锁的客户端,最终只有一个能够成功得到这把锁。一般的作法是把zk上的一个znode看做是一把锁,经过create znode的方式来实现。全部客户端都去建立 /distribute_lock 节点,最终成功建立的那个客户端也即拥有了这把锁。

控制时序

首先,Zookeeper的每个节点,都是一个自然的顺序发号器。在每个节点下面建立子节点时,只要选择的建立类型是有序(EPHEMERAL_SEQUENTIAL 临时有序或者PERSISTENT_SEQUENTIAL 永久有序)类型,那么,新的子节点后面,会加上一个次序编号。这个次序编号,是上一个生成的次序编号加一

好比,建立一个用于发号的节点“/test/lock”,而后以他为父亲节点,能够在这个父节点下面建立相同前缀的子节点,假定相同的前缀为“/test/lock/seq-”,在建立子节点时,同时指明是有序类型。若是是第一个建立的子节点,那么生成的子节点为/test/lock/seq-0000000000,下一个节点则为/test/lock/seq-0000000001,依次类推,等等。

其次,Zookeeper节点的递增性,能够规定节点编号最小的那个得到锁。

一个zookeeper分布式锁,首先须要建立一个父节点,尽可能是持久节点(PERSISTENT类型),而后每一个要得到锁的线程都会在这个节点下建立个临时顺序节点,因为序号的递增性,能够规定排号最小的那个得到锁。因此,每一个线程在尝试占用锁以前,首先判断本身是排号是否是当前最小,若是是,则获取锁。

第三,Zookeeper的节点监听机制,能够保障占有锁的方式有序并且高效。

每一个线程抢占锁以前,先抢号建立本身的ZNode。一样,释放锁的时候,就须要删除抢号的Znode。抢号成功后,若是不是排号最小的节点,就处于等待通知的状态。等谁的通知呢?不须要其余人,只须要等前一个Znode 的通知就能够了。当前一个Znode 删除的时候,就是轮到了本身占有锁的时候。第一个通知第二个、第二个通知第三个,击鼓传花似的依次向后。

Zookeeper这种首尾相接,后面监听前面的方式,能够避免羊群效应。所谓羊群效应就是每一个节点挂掉,全部节点都去监听,而后作出反映,这样会给服务器带来巨大压力,因此有了临时顺序节点,当一个节点挂掉,只有它后面的那一个节点才作出反映。

3.7. 分布式队列

队列方面,简单地讲有两种。

一种是常规的先进先出队列,另外一种是要等到队列成员聚齐以后的才统一按序执行。对于第一种先进先出队列,和分布式锁服务中的控制时序场景基本原理一致,这里再也不赘述。

第二种队列实际上是在FIFO队列的基础上做了一个加强。一般能够在 /queue 这个znode下预先创建一个/queue/num 节点,而且赋值为n(或者直接给/queue赋值n),表示队列大小,以后每次有队列成员加入后,就判断下是否已经到达队列大小,决定是否能够开始执行了。这种用法的典型场景是,分布式环境中,一个大任务Task A,须要在不少子任务完成(或条件就绪)状况下才能进行。这个时候,凡是其中一个子任务完成(就绪),那么就去 /taskList 下创建本身的临时时序节点(CreateMode.EPHEMERAL_SEQUENTIAL),当 /taskList 发现本身下面的子节点知足指定个数,就能够进行下一步按序进行处理了。

4. 示例代码

4.1. curator

Curator是Netflix公司开源的一套Zookeeper客户端框架。了解过Zookeeper原生API都会清楚其复杂度。Curator帮助咱们在其基础上进行封装、实现一些开发细节,包括接连重连、反复注册Watcher和NodeExistsException等。目前已经做为Apache的顶级项目出现,是最流行的Zookeeper客户端之一。从编码风格上来说,它提供了基于Fluent的编程风格支持。

除此以外,Curator还提供了Zookeeper的各类应用场景:Recipe、共享锁服务、Master选举机制和分布式计数器等。

如今先让咱们看看Curator的几种锁方案:

  • InterProcessMutex:分布式可重入排它锁
  • InterProcessSemaphoreMutex:分布式排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁做为单个实体管理的容器

接下来看下面一个项目的示例,项目中分别体现了 选举分布式锁 的例子。

pom.xml

<!--curator-framework-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.2.0</version>
        </dependency>
        <!--curator-recipes-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.2.0</version>
        </dependency>

CuratorConfig.java

@ConfigurationProperties(prefix = "custom.zookeeper")
@Data
@Configuration
public class CuratorConfig {
    private String connectString;
    private int baseSleepTimeMs;
    private int maxRetries;
    private int sessionTimeoutMs;
    private int connectionTimeoutMs;
    
    /**
     * 注册 CuratorFramework
     * @return
     */
    @Bean
    public CuratorFramework curatorFramework() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
        return CuratorFrameworkFactory
                .builder()
                .connectString(connectString)
                .sessionTimeoutMs(sessionTimeoutMs)
                .connectionTimeoutMs(connectionTimeoutMs)
                .retryPolicy(retryPolicy)
                .build();
    }
}

CuratorStart.java

@Component
public class CuratorStart implements ApplicationRunner {
    private final CuratorFramework curatorFramework;

    public CuratorStart(CuratorFramework curatorFramework) {
        this.curatorFramework = curatorFramework;
    }

    @Override
    public void run(ApplicationArguments applicationArguments){
        curatorFramework.start();
    }
}

4.2. 示例(选举)

TimerService.java

@Service
@Slf4j
public class TimerService {
    private static final String ZK_NODE_TASK_TIMER="/task-timer";
    DateTimeFormatter dateTimeFormatter= DateTimeFormatter.ofPattern("HH:mm:ss");

    @Value("${server.port}")
    private String serverPort;

    private final CuratorFramework curatorFramework;


    public TimerService(CuratorFramework curatorFramework){
        this.curatorFramework=curatorFramework;
    }

    @Scheduled(cron = "*/5 * * * * *")
    public void task(){
        LeaderLatch leaderLatch=new LeaderLatch(curatorFramework,ZK_NODE_TASK_TIMER);
        try {
            leaderLatch.start();
            Thread.sleep(2000);
            if (leaderLatch.hasLeadership()){
                log.warn("* <主服务> 是 "+serverPort+",当前时间为"+ LocalDateTime.now().format(dateTimeFormatter));
            }else {
                log.warn("副服务是 "+serverPort+",当前时间为"+ LocalDateTime.now().format(dateTimeFormatter));
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            try {
                leaderLatch.close();
            }catch (Exception e){
                e.printStackTrace();
            }

        }
    }
}

这里经过 @Scheduled 实现了定时任务,每5秒钟执行一次。这是咱们常常头疼的问题,spring的定时是基于jvm进程内的现场池执行的,若是扩展节点,多个spring进程同时执行的话,就会重复执行定时任务。

那么这里咱们分别指定port为 800一、800二、8003,分别运行3个程序。当它们每隔5秒同时触发方法执行时,就会在zookeeper中模拟一个选举,最终只有一个程序做为<主服务>被执行。

4.3. 示例(分布式锁)

EmployeeController.java

@Slf4j
@RestController
public class EmployeeController {
    DateTimeFormatter dateTimeFormatter= DateTimeFormatter.ofPattern("HH:mm:ss");

    private final CuratorFramework curatorFramework;

    public EmployeeController(CuratorFramework curatorFramework) {
        this.curatorFramework = curatorFramework;
    }

    /**
     * InterProcessMutex经过在zookeeper的某路径节点下建立临时序列节点来实现分布式锁,即每一个线程(跨进程的线程)获取同一把锁前,都须要在一样的路径下建立一个节点,节点名字由uuid + 递增序列组成。而经过对比自身的序列数是否在全部子节点的第一位,来判断是否成功获取到了锁。当获取锁失败时,它会添加watcher来监听前一个节点的变更状况,而后进行等待状态。直到watcher的事件生效将本身唤醒,或者超时时间异常返回。
     *
     * @param key
     * @return
     */
    @GetMapping("/demo/{key}")
    public String save(@PathVariable("key") String key) {
        // 获取锁
        InterProcessSemaphoreMutex balanceLock = new InterProcessSemaphoreMutex(curatorFramework, "/zktest" + key);
        try {
            // 执行加锁操做
            balanceLock.acquire();
            log.warn("lock《, key=" + key+",当前时间为"+ LocalDateTime.now().format(dateTimeFormatter));

            Thread.sleep(10000);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                // 释放锁资源
                balanceLock.release();
                log.warn("unlock》, key=" + key+",当前时间为"+ LocalDateTime.now().format(dateTimeFormatter));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return key;
    }
}

当咱们模拟不一样的请求竞争同一个key时,每次加锁,线程10秒钟后,再解锁,最后其余的请求才能够再加锁,重复以前的操做。在同一个程序内,我能够经过jdk的线程锁来实现相似的功能,但若是咱们想要实如今不一样的程序中均可以如此加锁,就只能经过分布式锁来实现。