Zookeeper 原理与实践

一、Zookeeper 的由来

在Hadoop生态系统中,许多项目的Logo都采用了动物,好比 Hadoop 和 Hive 采用了大象的形象,HBase 采用了海豚的形象,而从字面上来看 ZooKeeper 表示动物园管理员,因此你们能够理解为 ZooKeeper就是对这些动物(项目组件)进行一些管理工做的。java

对于单机环境多线程的竞态资源协调方法,咱们通常经过线程锁来协调对共享数据的访问以保证状态的一致性。 可是分布式环境如何进行协调呢?因而,Google创造了Chubby,而ZooKeeper则是对于Chubby的一个开源实现。 
ZooKeeper是一种为分布式应用所设计的高可用、高性能且一致的开源协调服务,它提供了一项基本服务:分布式锁服务。因为ZooKeeper的开源特性,后来咱们的开发者在分布式锁的基础上,摸索了出了其余的使用方法:配置维护、组服务、分布式消息队列、分布式通知/协调等。它被设计为易于编程,使用文件系统目录树做为数据模型。node

二、ZooKeeper集群模式典型架构

2.1 角色

Zookeeper服务自身组成一个集群(2n+1个服务容许n>=1个失效)。Zookeeper集群是一个基于主从复制的高可用集群,每一个服务器承担以下三种角色中的一种python

  • Leader 一个Zookeeper集群同一时间只会有一个实际工做的Leader,它会发起并维护与各Follwer及Observer间的心跳。全部的写操做必需要经过Leader完成再由Leader将写操做广播给其它服务器。
  • Follower 一个Zookeeper集群可能同时存在多个Follower,它会响应Leader的心跳。Follower可直接处理并返回客户端的读请求,同时会将写请求转发给Leader处理,而且负责在Leader处理写请求时对请求进行投票。
  • Observer 角色与Follower相似,可是无投票权。

Zookeeper Architecture

2.2 数据特性

  • 顺序一致性:按照客户端发送请求的顺序更新数据。mysql

  • 原子性:更新要么成功,要么失败,不会出现部分更新。sql

  • 单一性 :不管客户端链接哪一个server,都会看到同一个视图。apache

  • 可靠性:一旦数据更新成功,将一直保持,直到新的更新。编程

  • 及时性:客户端会在一个肯定的时间内获得最新的数据。安全

2.3 数据模型 Znode 

Zookeeper表现为一个分层的文件系统目录树结构(不一样于文件系统的是,节点能够有本身的数据,而文件系统中的目录节点只有子节点)。服务器

  • znode是被它所在的路径惟一标识
  • znode能够有子节点目录,而且每一个znode能够存储数据
  • 每一个znode中存储的数据能够有多个版本
  • znode能够被监控
  • 临时+编号
  • 每次对Zookeeper的状态的改变都会产生一个zxid,全局有序

三、核心原理

3.1 原子广播(ZAB协议)

为了保证写操做的一致性与可用性,Zookeeper专门设计了一种名为原子广播(ZAB)的支持崩溃恢复的一致性协议。基于该协议,Zookeeper实现了一种主从模式的系统架构来保持集群中各个副本之间的数据一致性。根据ZAB协议,全部的写操做都必须经过Leader完成,leader收到写请求,会将请求转为proposal并广播给全部其它节点,其余节点根据协议进行批准或经过。broadcast阶段事实上就是一个两阶段提交的简化版。其全部过程都跟两阶段提交一致,惟一不一致的是不能作事务的回滚。若是实现完整的两阶段提交,那就解决了一致性问题,不必发明新协议了,因此zab实际上抛弃了两阶段提交的事务回滚,因而一台follower只能回复ACK或者干脆就不回复了,leader只要收到过半的机器回复即经过proposal。网络

一旦Leader节点没法工做,ZAB协议可以自动从Follower节点中从新选出一个合适的替代者,即新的Leader,该过程即为领导选举。该领导选举过程,是ZAB协议中最为重要和复杂的过程。

3.2 Watch机制

  • (一次性触发)One-time trigger 
  • (发送至客户端)Sent to the client 
  • (被设置 watch 的数据)The data for which the watch was set

3.3 出现脑裂怎么办(split-brain)

  • GC、网络假死
  • Fencing

3.4 并发性能问题

  • 读写分离+ZAB

四、应用场景

4.1 统一命名服务

  • 全局惟一的ID

  • 集群管理:动态的服务注册和发现

def test_create():
    nodename = '/zk_test/service'
    index = 1
    while index < 10:
        zk.create(nodename,
                  b'192.168.187.215_{index}'.
                  format(index=index),
                        ephemeral=True,
                        sequence=True)
        time.sleep(3)
        index += 1



@zk.ChildrenWatch('/zk_test')
def my_func(children):
    print children
children = zk.get_children('/zk_test',watch=my_func)

while True:
    time.sleep(3)

4.2 分布式锁

my_id = uuid.uuid4()
lock = zk.Lock("/lockpath", str(my_id))
print "I am {}".format(str(my_id))
def work():
    time.sleep(3)
    print "{} is working! ".format(str(my_id))
while True:
    with lock:
        work()
zk.stop()

4.3 配置管理(数据发布与订阅)

  • 推拉结合
  • 分布式协调/通知
    • 只通知一次

@zk.DataWatch('/zk_test/service')
def my_func(data, stat):
    print("Data is %s, Version is %s" %(data, stat.version))

while True:
    time.sleep(2)
    print '------'

4.4 集群管理之 Master/Leader Election

  • 传统的方案
    • 缺点
  • 分布式方案
    • 争抢

my_id = uuid.uuid4()
def leader_func():
    print "I am the leader {}".format(str(my_id))
    while True:
        print "{} is working! ".format(str(my_id))
        time.sleep(3)

election = zk.Election("/electionpath")

# blocks until the election is won, then calls
# leader_func()
election.run(leader_func)

4.5 队列管理

  • 同步分布式队列
    • 当一个队列的成员都聚齐时,这个队列才可用,不然一直等待全部成员到达。
  • 分布式FIFO队列
    • 入队列有编号,出队列时经过 getChildren( ) 返回全部,消费最小

4.6 心跳检测/故障检测(Failure Detection)

#coding=utf-8
from kazoo.client import KazooClient
import time
import logging

logging.basicConfig()
zk = KazooClient(hosts='bjdhj-187-215.58os.org:2181')
zk.start()
 
# Determine if a node exists
while True:
    if zk.exists("/zk_test/service0000000054"):
        print "the worker is alive!"
    else:
        print "the worker is dead!"
        break
    time.sleep(3)
 
zk.stop()

4.7 其它场景

  • 数据存储(进度汇报)
    • offset → Zookeeper 
    • Zookeeper Write 影响 kafka 集群吞吐量
    • __consumer_offsets Topic( 0.8.2.2  → 0.10.1.1)
      • Group,Topic,Partition 组合 Key
      • acking 级别设置为了 -1
      • 内存三元组
  • 负载均衡
    • 服务端
    • 客户端

五、ZooKeeper在大型分布式系统中的应用

5.1 ZooKeeper在Hadoop中的应用

  • 主备切换

建立锁节点
在ZooKeeper上会有一个/yarn-leader-election/appcluster-yarn的锁节点,全部的ResourceManager在启动的时候,都会去竞争写一个Lock子节点:/yarn-leader-election/appcluster-yarn/ActiveBreadCrumb,该节点是临时节点。ZooKeepr可以为咱们保证最终只有一个ResourceManager可以建立成功建立成功的那个ResourceManager就切换为Active状态没有成功的那些ResourceManager则切换为Standby状态

  • 注册Watcher监听

全部Standby状态的ResourceManager都会向/yarn-leader-election/appcluster-yarn/ActiveBreadCrumb节点注册一个节点变动的Watcher监听,利用临时节点的特性,可以快速感知到Active状态的ResourceManager的运行状况。

  • 主备切换

当Active状态的ResourceManager出现诸如宕机或重启的异常状况时,其在ZooKeeper上链接的客户端会话就会失效,所以/yarn-leader-election/appcluster-yarn/ActiveBreadCrumb节点就会被删除。此时其他各个Standby状态的ResourceManager就都会接收到来自ZooKeeper服务端的Watcher事件通知,而后会重复进行步骤1的操做

以上就是利用ZooKeeper来实现ResourceManager的主备切换的过程,实现了ResourceManager的HA。

HDFS中NameNode的HA的实现原理跟YARN中ResourceManager的HA的实现原理相同。其锁节点为/hadoop-ha/mycluster/ActiveBreadCrumb

  • RM状态存储

在 ResourceManager 中,RMStateStore 可以存储一些 RM 的内部状态信息,包括 Application 以及它们的 Attempts 信息、Delegation Token 及 Version Information 等。须要注意的是,RMStateStore 中的绝大多数状态信息都是不须要持久化存储的,由于很容易从上下文信息中将其重构出来,如资源的使用状况。在存储的设计方案中,提供了三种可能的实现,分别以下。

  • 基于内存实现,通常是用于平常开发测试。
  • 基于文件系统的实现,如HDFS。
  • 基于ZooKeeper实现。

因为这些状态信息的数据量都不是很大,所以Hadoop官方建议基于ZooKeeper来实现状态信息的存储。在ZooKeepr上,ResourceManager 的状态信息都被存储在/rmstore这个根节点下面。

RMAppRoot 节点下存储的是与各个 Application 相关的信息,RMDTSecretManagerRoot 存储的是与安全相关的 Token 等信息。每一个 Active 状态的 ResourceManager 在初始化阶段都会从 ZooKeeper 上读取到这些状态信息,并根据这些状态信息继续进行相应的处理。

小结:

ZooKeepr在Hadoop中的应用主要有:

  1. HDFS中NameNode的HA和YARN中ResourceManager的HA。
  2. 存储RMStateStore状态信息

5.2 ZooKeeper在HBase中的应用

HBase主要用ZooKeeper来实现HMaster选举与主备切换、系统容错、RootRegion管理、Region状态管理和分布式SplitWAL任务管理等。

  • HMaster选举与主备切换

HMaster选举与主备切换的原理和HDFS中NameNode及YARN中ResourceManager的HA原理相同。

  • 系统容错

当HBase启动时,每一个RegionServer都会到ZooKeeper的/hbase/rs节点下建立一个信息节点(下文中,咱们称该节点为”rs状态节点”),例如/hbase/rs/[Hostname],同时,HMaster会对这个节点注册监听。当某个 RegionServer 挂掉的时候,ZooKeeper会由于在一段时间内没法接受其心跳(即 Session 失效),而删除掉该 RegionServer 服务器对应的 rs 状态节点。与此同时,HMaster 则会接收到 ZooKeeper 的 NodeDelete 通知,从而感知到某个节点断开,并当即开始容错工做。

HBase为何不直接让HMaster来负责RegionServer的监控呢?若是HMaster直接经过心跳机制等来管理RegionServer的状态,随着集群愈来愈大,HMaster的管理负担会愈来愈重,另外它自身也有挂掉的可能,所以数据还须要持久化。在这种状况下,ZooKeeper就成了理想的选择。

  • RootRegion管理

对应HBase集群来讲,数据存储的位置信息是记录在元数据region,也就是RootRegion上的。每次客户端发起新的请求,须要知道数据的位置,就会去查询RootRegion,而RootRegion自身位置则是记录在ZooKeeper上的(默认状况下,是记录在ZooKeeper的/hbase/meta-region-server节点中)。当RootRegion发生变化,好比Region的手工移动、从新负载均衡或RootRegion所在服务器发生了故障等是,就可以经过ZooKeeper来感知到这一变化并作出一系列相应的容灾措施,从而保证客户端老是可以拿到正确的RootRegion信息。

  • Region管理

HBase里的Region会常常发生变动,这些变动的缘由来自于系统故障、负载均衡、配置修改、Region分裂与合并等。一旦Region发生移动,它就会经历下线(offline)和从新上线(online)的过程。

下线期间数据是不能被访问的,而且Region的这个状态变化必须让全局知晓,不然可能会出现事务性的异常。对于大的HBase集群来讲,Region的数量可能会多达十万级别,甚至更多,这样规模的Region状态管理交给ZooKeeper来作也是一个很好的选择。

  • 分布式SplitWAL任务管理

当某台RegionServer服务器挂掉时,因为总有一部分新写入的数据尚未持久化到HFile中,所以在迁移该RegionServer的服务时,一个重要的工做就是从WAL中恢复这部分还在内存中的数据,而这部分工做最关键的一步就是SplitWAL,即HMaster须要遍历该RegionServer服务器的WAL,并按Region切分红小块移动到新的地址下,并进行日志的回放(replay)

因为单个RegionServer的日志量相对庞大(可能有上千个Region,上GB的日志),而用户又每每但愿系统可以快速完成日志的恢复工做。所以一个可行的方案是将这个处理WAL的任务分给多台RegionServer服务器来共同处理,而这就又须要一个持久化组件来辅助HMaster完成任务的分配。当前的作法是,HMaster会在ZooKeeper上建立一个SplitWAL节点(默认状况下,是/hbase/SplitWAL节点),将“哪一个RegionServer处理哪一个Region”这样的信息以列表的形式存放到该节点上,而后由各个RegionServer服务器自行到该节点上去领取任务并在任务执行成功或失败后再更新该节点的信息,以通知HMaster继续进行后面的步骤。ZooKeeper在这里担负起了分布式集群中相互通知和信息持久化的角色。

小结:

以上就是一些HBase中依赖ZooKeeper完成分布式协调功能的典型场景。但事实上,HBase对ZooKeepr的依赖还不止这些,好比HMaster还依赖ZooKeeper来完成Table的enable/disable状态记录,以及HBase中几乎全部的元数据存储都是放在ZooKeeper上的。

因为ZooKeeper出色的分布式协调能力及良好的通知机制,HBase在各版本的演进过程当中愈来愈多地增长了ZooKeeper的应用场景,从趋势上来看二者的交集愈来愈多。HBase中全部对ZooKeeper的操做都封装在了org.apache.hadoop.hbase.zookeeper这个包中,感兴趣的同窗能够自行研究。

5.3 ZooKeeper在 canal 中的应用

  • Canal Client
    • 保证get/ack/rollback有序
  • canal server
    • 减小mysql dump的请求
  • 原理
    • zk 分布式锁

六、无锁实现

CAS

  • 原理
  • 缺陷
    • ABA
    • 冲突

AtomicInteger atomicInteger = new AtomicInteger();
for(int b = 0; b < numThreads; b++) {
  new Thread(() -> {
    for(int a = 0; a < iteration; a++) {
      atomicInteger.incrementAndGet();
    }
  }).start();
}

 public final int getAndIncrement() {
       for( ; ; ) {
           int current = get();
           int next = current + 1;
           if ( compareAndSet(current, next) )
                return current;
       }
}
private final Map<String, Long> urlCounter = new ConcurrentHashMap<>();
//接口调用次数+1
public long increase(String url) {
    Long oldValue = urlCounter.get(url);
    Long newValue = (oldValue == null) ? 1L : oldValue + 1;
    urlCounter.put(url, newValue);
    return newValue;
}
//获取调用次数
public Long getCount(String url){
    return urlCounter.get(url);
}
//模拟并发状况下的接口调用统计
for(int i=0;i<callTime;i++){
    executor.execute(new Runnable() {
        @Override
        public void run() {
            counterDemo.increase(url);
            countDownLatch.countDown();
        }
    });
}



// CAS:
while (true) {
    private final Map<String, Long> urlCounter = new ConcurrentHashMap<>();
    oldValue = urlCounter.get(url);
    if (oldValue == null) {
        newValue = 1l;
        //初始化成功,退出循环
        if (urlCounter.putIfAbsent(url, 1l) == null) break;
        //若是初始化失败,说明其余线程已经初始化过了
    } else {
        newValue = oldValue + 1;
        //+1成功,退出循环,原子操做
        if (urlCounter.replace(url, oldValue, newValue)) break;
        //若是+1失败,说明其余线程已经修改过了旧值
    }
}
return newValue;

Refer:

[1] ZooKeeper 学习笔记之扫盲

https://my.oschina.net/leejun2005/blog/67250

[2] ZooKeeper 原理及其在 Hadoop 和 HBase 中的应用

http://blog.jobbole.com/110388/

[3] Java 并发实践 — ConcurrentHashMap 与 CAS

https://my.oschina.net/leejun2005/blog/81835

相关文章
相关标签/搜索