ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它包含一个简单的原语集,分布式应用程序能够基于它实现同步服务,配置维护和命名服务等。不少中间件,好比kafka、hadoop、hbase,都用到了 Zookeeper来构建集群。Zookeeper是hadoop的一个子项目,其发展历程无需赘述。在分布式应用中,因为工程师不能很好地使用锁机制,以及基于消息的协调机制不适合在某些应用中使用,所以须要有一种可靠的、可扩展的、分布式的、可配置的协调机制来统一系统的状态。Zookeeper的目的就在于此。java
数据结构
理解ZooKeeper的一种方法是将其看作一个具备高可用性的文件系统。但这个文件系统中没有文件和目录,而是统一使用节点(node)的概念,称为znode。znode既能够保存数据(如同文件),也能够保存其余znode(如同目录)。全部的znode构成一个层次化的数据结构,。node
集群
生产环境的zookeeper服务,通常都是以集群的方式搭建环境。在zookeeper集群中,不一样节点通常有下面几种角色:算法
这里面提到了“事务请求”和“非事务请求”,这里说明一下。 事务请求能够理解成数据库中包含commit操做的请求,例如:新增、修改和删除。而非事务请求则对应那些查询的请求。spring
可见在zookeeper集群中,真正决策的只有一个Leader节点,全部的事务请求到达其余节点后,都仍是会被转发到Leader节点来处理。这种模式,保障了zookeeper在命令决策端的原子性。docker
Leader选举算法采用了Paxos协议,当多数Server写成功,则任务数据写成功若是有3个Server,则两个写成功便可;若是有4或5个Server,则三个写成功便可。Server数目通常为奇数(三、五、7)若是有3个Server,则最多容许1个Server挂掉;若是有4个Server,则一样最多容许1个Server挂掉由此,咱们看出3台服务器和4台服务器的的容灾能力是同样的,因此为了节省服务器资源,通常咱们采用奇数个数,做为服务器部署个数。shell
原子广播
zookeeper的核心是原子广播,这个机制保证了各个server之间的同步。实现这个机制的协议叫作Zab协议。Zab协议有两种模式,它们分别是恢复模式
和广播模式
。数据库
当服务启动或者在领导者崩溃后,Zab就进入了恢复模式,当领导者被选举出来,且大多数server的完成了和leader的状态同步之后,恢复模式就结束了。状态同步保证了leader和server具备相同的系统状态。一旦leader已经和多数的follower进行了状态同步后,他就能够开始广播消息了,即进入广播状态。apache
数据一致性与paxos算法
在一个分布式数据库系统中,若是各节点的初始状态一致,每一个节点都执行相同的操做序列,那么他们最后能获得一个一致的状态。编程
Paxos算法经过投票来对写操做进行全局编号,同一时刻,只有一个写操做被批准,同时并发的写操做要去争取选票,只有得到过半数选票的写操做才会被批准(因此永远只会有一个写操做获得批准),其余的写操做竞争失败只好再发起一轮投票,就这样,在日复一日年复一年的投票中,全部写操做都被严格编号排序。bash
编号严格递增,当一个节点接受了一个编号为100的写操做,以后又接受到编号为99的写操做(由于网络延迟等不少不可预见缘由),它立刻能意识到本身数据不一致了,自动中止对外服务并重启同步过程。任何一个节点挂掉都不会影响整个集群的数据一致性(总2n+1台,除非挂掉大于n台)。
新手不要混淆,在这里的投票选举是针对多个客户端有并发写操做时,争夺该惟一写操做权的选举。和前面说的zookeeper集群中,投票选举master是不一样的概念。虽然它们的选举协议,都是遵循paxos算法。
脑裂和过半选举
脑裂:对于一个集群,想要提升这个集群的可用性,一般会采用多机房部署,好比如今有一个由6台节点所组成的一个集群,部署在了两个机房。正常状况下,此集群只会有一个Leader,那么若是机房之间的网络断了以后,每一个机房都选举本身的Leader,这就至关于本来一个集群,被分红了两个集群,出现了两个“大脑”,这就是脑裂。若是过了一会,断了的网络忽然联通了,那么此时就会出现问题了,两个集群刚刚都对外提供服务了,数据该怎么合并,数据冲突怎么解决等等问题。
过半选举:在领导者选举的过程当中,若是某台zkServer得到了超过半数的选票,则此zkServer就能够成为Leader了。
由于zookeeper的过半选举,所以zookeeper不存在“脑裂”的状况。例如仍是6个节点分布在两个机房,只有当某个节点得到4个节点以上的选票,才能升级为Leader,所以不会出现两个Leader的状况。
docker 安装
这里直接使用zookeeper官方镜像来安装。
docker run -d \ --name zookeeper \ --restart=on-failure:3 \ -p 2181:2181 \ -v /Volumes/zookeeper/data/:/data/ \ zookeeper
启动
执行 docker exec -it 编号 /bin/bash
,进入容器内部。
而后在 /bin
目录,执行 ./zkCli.sh
,运行启动脚本。
zooInspector客户端
zooInspector是zookeeper图形化的客户端工具,可用来查看内部数据状况。
可下载 ZooInspector.zip,解压后在build目录下获取 zookeeper-dev-ZooInspector.jar。经过 java -jar zookeeper-dev-ZooInspector.jar
,便可启动 ZooInspector 图形化客户端。
CAP理论告诉咱们,一个分布式系统不可能同时知足如下三种:
这三个基本需求,最多只能同时知足其中的两项,由于P是必须的,所以每每选择就在CP
或者AP
中。
一致性(C:Consistency)
在分布式环境中,一致性是指数据在多个副本之间是否可以保持数据一致的特性。在一致性的需求下,当一个系统在数据一致的状态下执行更新操做后,应该保证系统的数据仍然处于一致的状态。例如一个将数据副本分布在不一样分布式节点上的系统来讲,若是对第一个节点的数据进行了更新操做而且更新成功后,其余节点上的数据也应该获得更新,而且全部用户均可以读取到其最新的值,那么这样的系统就被认为具备强一致性(或严格的一致性,最终一致性)。
可用性(A:Available)
可用性是指系统提供的服务必须一直处于可用的状态,对于用户的每个操做请求老是可以在有限的时间内返回结果。“有效的时间内”是指,对于用户的一个操做请求,系统必须可以在指定的时间(即响应时间)内返回对应的处理结果,若是超过了这个时间范围,那么系统就被认为是不可用的。
分区容错性(P:Partition Tolerance)
分区容错性约束了一个分布式系统须要具备以下特性:分布式系统在遇到任何网络分区故障的时候,仍然须要可以保证对外提供知足一致性和可用性的服务,除非是整个网络环境都发生了故障。
网络分区是指在分布式系统中,不一样的节点分布在不一样的子网络(机房或异地网络等)中,因为一些特殊的缘由致使这些子网络之间出现网络不连通的情况,但各个子网络的内部网络是正常的,从而致使整个系统的网络环境被切分红了若干个孤立的区域。须要注意的是,组成一个分布式系统的每一个节点的加入与退出均可以看做是一个特殊的网络分区。
eureka保证ap
eureka优先保证可用性。在Eureka平台中,若是某台服务器宕机,Eureka不会有相似于ZooKeeper的选举leader的过程;客户端请求会自动切换 到新的Eureka节点;当宕机的服务器从新恢复后,Eureka会再次将其归入到服务器集群管理之中;而对于它来讲,全部要作的无非是同步一些新的服务 注册信息而已。
Eureka各个节点都是平等的,几个节点挂掉不会影响正常节点的工做,剩余的节点依然能够提供注册和查询服务。而Eureka的客户端在向某个Eureka注册或时若是发现链接失败,则会自动切换至其它节点,只要有一台Eureka还在,就能保证注册服务可用(保证可用性),只不过查到的信息可能不是最新的(不保证强一致性)。
zookeeper保证cp
做为一个分布式协同服务,zookeeper是优先保证一致性的。
进行leader选举时集群都是不可用。在使用ZooKeeper获取服务列表时,当master节点由于网络故障与其余节点失去联系时,剩余节点会从新进行leader选举。问题在于,选举leader的时间太长,30 ~ 120s, 且选举期间整个zk集群都是不可用的,这就致使在选举期间注册服务瘫痪,虽然服务可以最终恢复,可是漫长的选举时间致使的注册长期不可用是不能容忍的。因此说,ZooKeeper不能保证服务可用性。
发布与订阅模型,即所谓的配置中心,顾名思义就是发布者将数据发布到ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。例如全局的配置信息,服务式服务框架的服务地址列表等就很是适合使用。
注意:在上面提到的应用场景中,有个默认前提是:数据量很小,可是数据更新可能会比较快的场景。
这里说的负载均衡是指软负载均衡。在分布式环境中,为了保证高可用性,一般同一个应用或同一个服务的提供方都会部署多份,达到对等服务。而消费者就需要在这些对等的服务器中选择一个来执行相关的业务逻辑,其中比较典型的是消息中间件中的生产者,消费者负载均衡。
消息中间件中发布者和订阅者的负载均衡,linkedin开源的KafkaMQ和阿里开源的metaq都是经过zookeeper来作到生产者、消费者的负载均衡。这里以metaq为例如讲下:
生产者负载均衡
metaq发送消息的时候,生产者在发送消息的时候必须选择一台broker上的一个分区来发送消息,所以metaq在运行过程当中,会把全部broker和对应的分区信息所有注册到ZK指定节点上,默认的策略是一个依次轮询的过程,生产者在经过ZK获取分区列表以后,会按照brokerId和partition的顺序排列组织成一个有序的分区列表,发送的时候按照从头至尾循环往复的方式选择一个分区来发送消息。
消费负载均衡
在消费过程当中,一个消费者会消费一个或多个分区中的消息,可是一个分区只会由一个消费者来消费。MetaQ的消费策略是:
在某个消费者故障或者重启等状况下,其余消费者会感知到这一变化(经过 zookeeper watch消费者列表),而后从新进行负载均衡,保证全部的分区都有消费者进行消费。
zookeeper的命名服务有两个应用方向,一个是提供相似JNDI的功能,利用zookeepeer的树型分层结构,能够把系统中各类服务的名称、地址以及目录信息存放在zookeeper,须要的时候去zookeeper中读取。
另外一个,是利用zookeeper顺序节点的特性,制做分布式的ID生成器,写过数据库应用的朋友都知道,咱们在往数据库表中插入记录时,一般须要为该记录建立惟一的ID,在单机环境中咱们能够利用数据库的主键自增功能。但在分布式环境则没法使用,有一种方式可使用UUID,可是它的缺陷是没有规律,很难理解。利用zookeeper顺序节点的特性,咱们能够生成有顺序的,容易理解的,同时支持分布式环境的序列号。
ZooKeeper中特有watcher注册与异步通知机制,可以很好的实现分布式环境下不一样系统之间的通知与协调,实现对数据变动的实时处理。使用方法一般是不一样系统都对ZK上同一个znode进行注册,监听znode的变化(包括znode自己内容及子节点的),其中一个系统update了znode,那么另外一个系统可以收到通知,并做出相应处理
总之,使用zookeeper来进行分布式通知和协调可以大大下降系统之间的耦合。
集群机器监控
这一般用于那种对集群中机器状态,机器在线率有较高要求的场景,可以快速对集群中机器变化做出响应。这样的场景中,每每有一个监控系统,实时检测集群机器是否存活。过去的作法一般是:监控系统经过某种手段(好比ping)定时检测每一个机器,或者每一个机器本身定时向监控系统汇报“我还活着”。 这种作法可行,可是存在两个比较明显的问题:
利用ZooKeeper有两个特性,就能够实时另外一种集群机器存活性监控系统:
例如,监控系统在 /clusterServers 节点上注册一个Watcher,之后每动态加机器,那么就往 /clusterServers 下建立一个 EPHEMERAL类型的节点:/clusterServers/{hostname}. 这样,监控系统就可以实时知道机器的增减状况,至于后续处理就是监控系统的业务了。
Master选举
Master选举则是zookeeper中最为经典的应用场景了。
在分布式环境中,相同的业务应用分布在不一样的机器上,有些业务逻辑(例如一些耗时的计算,网络I/O处理),每每只须要让整个集群中的某一台机器进行执行,其他机器能够共享这个结果,这样能够大大减小重复劳动,提升性能,因而这个master选举即是这种场景下的碰到的主要问题。
利用ZooKeeper的强一致性,可以保证在分布式高并发状况下节点建立的全局惟一性,即:同时有多个客户端请求建立 /currentMaster 节点,最终必定只有一个客户端请求可以建立成功。利用这个特性,就能很轻易的在分布式环境中进行集群选取了。
另外,这种场景演化一下,就是动态Master选举。这就要用到?EPHEMERAL_SEQUENTIAL类型节点的特性了。
上文中提到,全部客户端建立请求,最终只有一个可以建立成功。在这里稍微变化下,就是容许全部请求都可以建立成功,可是得有个建立顺序,因而全部的请求最终在ZK上建立结果的一种可能状况是这样: /currentMaster/{sessionId}-1 ,?/currentMaster/{sessionId}-2 ,?/currentMaster/{sessionId}-3 ….. 每次选取序列号最小的那个机器做为Master,若是这个机器挂了,因为他建立的节点会立刻小时,那么以后最小的那个机器就是Master了。
保持独占
所谓保持独占,就是全部试图来获取这个锁的客户端,最终只有一个能够成功得到这把锁。一般的作法是把zk上的一个znode看做是一把锁,经过create znode的方式来实现。全部客户端都去建立 /distribute_lock 节点,最终成功建立的那个客户端也即拥有了这把锁。
控制时序
首先,Zookeeper的每个节点,都是一个自然的顺序发号器。在每个节点下面建立子节点时,只要选择的建立类型是有序(EPHEMERAL_SEQUENTIAL 临时有序或者PERSISTENT_SEQUENTIAL 永久有序)类型,那么,新的子节点后面,会加上一个次序编号。这个次序编号,是上一个生成的次序编号加一
好比,建立一个用于发号的节点“/test/lock”,而后以他为父亲节点,能够在这个父节点下面建立相同前缀的子节点,假定相同的前缀为“/test/lock/seq-”,在建立子节点时,同时指明是有序类型。若是是第一个建立的子节点,那么生成的子节点为/test/lock/seq-0000000000,下一个节点则为/test/lock/seq-0000000001,依次类推,等等。
其次,Zookeeper节点的递增性,能够规定节点编号最小的那个得到锁。
一个zookeeper分布式锁,首先须要建立一个父节点,尽可能是持久节点(PERSISTENT类型),而后每一个要得到锁的线程都会在这个节点下建立个临时顺序节点,因为序号的递增性,能够规定排号最小的那个得到锁。因此,每一个线程在尝试占用锁以前,首先判断本身是排号是否是当前最小,若是是,则获取锁。
第三,Zookeeper的节点监听机制,能够保障占有锁的方式有序并且高效。
每一个线程抢占锁以前,先抢号建立本身的ZNode。一样,释放锁的时候,就须要删除抢号的Znode。抢号成功后,若是不是排号最小的节点,就处于等待通知的状态。等谁的通知呢?不须要其余人,只须要等前一个Znode 的通知就能够了。当前一个Znode 删除的时候,就是轮到了本身占有锁的时候。第一个通知第二个、第二个通知第三个,击鼓传花似的依次向后。
Zookeeper这种首尾相接,后面监听前面的方式,能够避免羊群效应。所谓羊群效应就是每一个节点挂掉,全部节点都去监听,而后作出反映,这样会给服务器带来巨大压力,因此有了临时顺序节点,当一个节点挂掉,只有它后面的那一个节点才作出反映。
队列方面,简单地讲有两种。
一种是常规的先进先出队列,另外一种是要等到队列成员聚齐以后的才统一按序执行。对于第一种先进先出队列,和分布式锁服务中的控制时序场景基本原理一致,这里再也不赘述。
第二种队列实际上是在FIFO队列的基础上做了一个加强。一般能够在 /queue 这个znode下预先创建一个/queue/num 节点,而且赋值为n(或者直接给/queue赋值n),表示队列大小,以后每次有队列成员加入后,就判断下是否已经到达队列大小,决定是否能够开始执行了。这种用法的典型场景是,分布式环境中,一个大任务Task A,须要在不少子任务完成(或条件就绪)状况下才能进行。这个时候,凡是其中一个子任务完成(就绪),那么就去 /taskList 下创建本身的临时时序节点(CreateMode.EPHEMERAL_SEQUENTIAL),当 /taskList 发现本身下面的子节点知足指定个数,就能够进行下一步按序进行处理了。
Curator是Netflix公司开源的一套Zookeeper客户端框架。了解过Zookeeper原生API都会清楚其复杂度。Curator帮助咱们在其基础上进行封装、实现一些开发细节,包括接连重连、反复注册Watcher和NodeExistsException等。目前已经做为Apache的顶级项目出现,是最流行的Zookeeper客户端之一。从编码风格上来说,它提供了基于Fluent的编程风格支持。
除此以外,Curator还提供了Zookeeper的各类应用场景:Recipe、共享锁服务、Master选举机制和分布式计数器等。
如今先让咱们看看Curator的几种锁方案:
接下来看下面一个项目的示例,项目中分别体现了 选举
和分布式锁
的例子。
pom.xml
<!--curator-framework--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.2.0</version> </dependency> <!--curator-recipes--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.2.0</version> </dependency>
CuratorConfig.java
@ConfigurationProperties(prefix = "custom.zookeeper") @Data @Configuration public class CuratorConfig { private String connectString; private int baseSleepTimeMs; private int maxRetries; private int sessionTimeoutMs; private int connectionTimeoutMs; /** * 注册 CuratorFramework * @return */ @Bean public CuratorFramework curatorFramework() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries); return CuratorFrameworkFactory .builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .retryPolicy(retryPolicy) .build(); } }
CuratorStart.java
@Component public class CuratorStart implements ApplicationRunner { private final CuratorFramework curatorFramework; public CuratorStart(CuratorFramework curatorFramework) { this.curatorFramework = curatorFramework; } @Override public void run(ApplicationArguments applicationArguments){ curatorFramework.start(); } }
TimerService.java
@Service @Slf4j public class TimerService { private static final String ZK_NODE_TASK_TIMER="/task-timer"; DateTimeFormatter dateTimeFormatter= DateTimeFormatter.ofPattern("HH:mm:ss"); @Value("${server.port}") private String serverPort; private final CuratorFramework curatorFramework; public TimerService(CuratorFramework curatorFramework){ this.curatorFramework=curatorFramework; } @Scheduled(cron = "*/5 * * * * *") public void task(){ LeaderLatch leaderLatch=new LeaderLatch(curatorFramework,ZK_NODE_TASK_TIMER); try { leaderLatch.start(); Thread.sleep(2000); if (leaderLatch.hasLeadership()){ log.warn("* <主服务> 是 "+serverPort+",当前时间为"+ LocalDateTime.now().format(dateTimeFormatter)); }else { log.warn("副服务是 "+serverPort+",当前时间为"+ LocalDateTime.now().format(dateTimeFormatter)); } }catch (Exception e){ e.printStackTrace(); }finally { try { leaderLatch.close(); }catch (Exception e){ e.printStackTrace(); } } } }
这里经过 @Scheduled
实现了定时任务,每5秒钟执行一次。这是咱们常常头疼的问题,spring的定时是基于jvm进程内的现场池执行的,若是扩展节点,多个spring进程同时执行的话,就会重复执行定时任务。
那么这里咱们分别指定port为 800一、800二、8003,分别运行3个程序。当它们每隔5秒同时触发方法执行时,就会在zookeeper中模拟一个选举,最终只有一个程序做为<主服务>
被执行。
EmployeeController.java
@Slf4j @RestController public class EmployeeController { DateTimeFormatter dateTimeFormatter= DateTimeFormatter.ofPattern("HH:mm:ss"); private final CuratorFramework curatorFramework; public EmployeeController(CuratorFramework curatorFramework) { this.curatorFramework = curatorFramework; } /** * InterProcessMutex经过在zookeeper的某路径节点下建立临时序列节点来实现分布式锁,即每一个线程(跨进程的线程)获取同一把锁前,都须要在一样的路径下建立一个节点,节点名字由uuid + 递增序列组成。而经过对比自身的序列数是否在全部子节点的第一位,来判断是否成功获取到了锁。当获取锁失败时,它会添加watcher来监听前一个节点的变更状况,而后进行等待状态。直到watcher的事件生效将本身唤醒,或者超时时间异常返回。 * * @param key * @return */ @GetMapping("/demo/{key}") public String save(@PathVariable("key") String key) { // 获取锁 InterProcessSemaphoreMutex balanceLock = new InterProcessSemaphoreMutex(curatorFramework, "/zktest" + key); try { // 执行加锁操做 balanceLock.acquire(); log.warn("lock《, key=" + key+",当前时间为"+ LocalDateTime.now().format(dateTimeFormatter)); Thread.sleep(10000); } catch (Exception e) { e.printStackTrace(); } finally { try { // 释放锁资源 balanceLock.release(); log.warn("unlock》, key=" + key+",当前时间为"+ LocalDateTime.now().format(dateTimeFormatter)); } catch (Exception e) { e.printStackTrace(); } } return key; } }
当咱们模拟不一样的请求竞争同一个key时,每次加锁,线程10秒钟后,再解锁,最后其余的请求才能够再加锁,重复以前的操做。在同一个程序内,我能够经过jdk的线程锁来实现相似的功能,但若是咱们想要实如今不一样的程序中均可以如此加锁,就只能经过分布式锁来实现。