zookeeper2

数据发布订阅/ 配置中心

实现配置信息的集中式管理和数据的动态更新java

实现配置中心有两种模式:push 、pull。mysql

长轮训git

zookeeper采用的是推拉相结合的方式。 客户端向服务器端注册本身须要关注的节点。一旦节点数据发生变化,那么服务器端就会向客户端github

发送watcher事件通知。客户端收到通知后,主动到服务器端获取更新后的数据redis

  1. 数据量比较小
  2. 数据内容在运行时会发生动态变动
  3. 集群中的各个机器共享配置

image-20190304092212934

负载均衡

请求/数据分摊多个计算机单元上算法

image-20190304092448656

分布式锁

一般实现分布式锁有几种方式sql

  1. redis。 setNX 存在则会返回0, 不存在
  2. 数据方式去实现apache

    建立一个表, 经过索引惟一的方式
    
    create table (id , methodname …)   methodname增长惟一索引
    
    insert 一条数据XXX   delete 语句删除这条记录
    
    mysql  for update 行锁,杜占锁
  3. zookeeper实现

    排他锁api

    image-20190307230710919

    利用路径惟一安全

共享锁(读锁)

​ locks当中是有序节点,控制使用权限,每个客户端写一个节点以后,获取到最小节点,获取数据,有写的操做,优先处理写的节点,利用节点特性

实现共享锁,使用java api的方式

package zk.lock;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;


public class DistributeLock {
    //根节点
    private static final String ROOT_LOCKS = "/LOCKS";

    private ZooKeeper zooKeeper;

    //节点的数据
    private static final byte[] data = {1, 2};

    //会话超时时间
    private int sessionTimeOut;

    //记录锁节点id
    private String lockID;

    private CountDownLatch countDownLatch = new CountDownLatch(1);

    public DistributeLock() throws IOException, InterruptedException {
        this.zooKeeper = ZookeeperClient.getInstance();
        this.sessionTimeOut = ZookeeperClient.getSESSIONTIMEOUT();
    }

    /**
     * 获取锁的方法
     *
     * @return
     */
    public synchronized boolean lock() {
        try {
            //LOCKS/000001
            lockID = zooKeeper.create(ROOT_LOCKS + "/", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(Thread.currentThread().getName() + "->" + "成功建立了lock节点[" + lockID + ",开始竞争锁]");

            //获取根节点下的全部子节点
            List<String> childrenNodes = zooKeeper.getChildren(ROOT_LOCKS, true);

            //    排序,从小到大
            TreeSet<String> sortedSet = new TreeSet<>();
            for (String children : childrenNodes) {
                sortedSet.add(ROOT_LOCKS + "/" + children);
            }

            //获取到最小的节点
            String first = sortedSet.first();
            if (lockID.equals(first)) {
                //表示当前就是最小的节点
                System.out.println(Thread.currentThread().getName() + "-->成功得到锁,locak节点为:【" + lockID + "]");
                return true;
            }

            SortedSet<String> lessThanLockId = sortedSet.headSet(lockID);
            if (!lessThanLockId.isEmpty()) {
                //获取到比当前LockId这个节点更小的上一个节点
                String prevLockId = lessThanLockId.last();
                //监控上一个节点
                zooKeeper.exists(prevLockId, new LockWatcher(countDownLatch));
                //若是会话超时或者节点被删除(释放)了
                countDownLatch.await(sessionTimeOut, TimeUnit.MILLISECONDS);
                System.out.println(Thread.currentThread().getName() + "成功获取锁:【" + lockID + "】");
                return true;
            }

        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    public synchronized boolean unLock() {
        System.out.println(Thread.currentThread().getName() + "-->开始释放锁");
        try {
            zooKeeper.delete(lockID, -1);
            System.out.println("节点" + lockID + "被释放了");
            return true;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        return false;
    }


    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                DistributeLock lock = null;
                try {
                    lock = new DistributeLock();
                    countDownLatch.countDown();
                    countDownLatch.await();
                    lock.lock();
                    Thread.sleep(random.nextInt(500));
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    if (lock != null) {
                        lock.unLock();
                    }
                }
            }).start();
        }

    }
}
package zk.lock;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import java.util.concurrent.CountDownLatch;


public class LockWatcher implements Watcher {
    private CountDownLatch countDownLatch;

    public LockWatcher(CountDownLatch latch) {
        this.countDownLatch = latch;
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDataChanged) {
            countDownLatch.countDown();
        }
    }
}

命名服务

master选举

​ 7*24小时可用, 99.999%可用

master-slave模式

​ slave监听master节点,若是master节点挂掉,slave自动接替master,心跳机制维持,出现网络异常,slave认为master挂掉了,可能出现双主节点的状况,重复处理数据

使用zookeeper解决上述问题,往某一个节点注册master节点,只有一个可以注册成功,注册成功的为master,若是失去联系,节点会被删除,不会出现脑裂问题

image-20190304114318988

per实现原理讲解

分布式队列

  1. master选举改为多线程(多进程)模型(master-slave) 建立三个工程,while去抢
  2. 分布式队列

    activeMQ、kafka、….

先进先出队列

  1. 经过getChildren获取指定根节点下的全部子节点,子节点就是任务
  2. 肯定本身节点在子节点中的顺序
  3. 若是本身不是最小的子节点,那么监控比本身小的上一个子节点,不然处于等待
  4. 接收watcher通知,重复流程

Barrier模式

​ 在一个节点下只能容许多少数据,只有子节点达到必定数量,才执行

curator 提供应用场景的封装

​ curator-reciples

master/leader选举

分布式锁(读锁、写锁)

分布式队列

LeaderLatch

写一个master

LeaderSelector

每个应用都写一个临时有序节点,根据最小的节点来得到优先权

curator 提供应用场景的封装

​ curator-reciples

master/leader选举

分布式锁(读锁、写锁)

分布式队列

LeaderLatch

​ 写一个master

LeaderSelector

​ 每个应用都写一个临时有序节点,根据最小的节点来得到优先权

zookeeper集群角色

leader

​ leader是zookeeper集群的核心。

  1. 事务请求的惟一调度者和处理者,保证集群事务处理的顺序性
  2. 集群内部各个服务器的调度者

follower

  1. 处理客户端非事务请求,以及转发事务请求给leader服务器
  2. 参与事务请求提议(proposal)的投票(客户端的一个事务请求,须要半数服务器投票经过之后才能通知leader commit; leader会发起一个提案,要求follower投票)
  3. 参与leader选举的投票

observer

​ 观察zookeeper集群中最新状态的变化并将这些状态同步到observer服务器上。

​ 增长observer不影响集群中事务处理能力,同时还能提高集群的非事务处理能力

zookeeper的集群组成

​ zookeeper通常是由 2n+1台服务器组成

leader选举

选举算法:

​ leaderElection

​ AuthFastLeaderElection

​ FastLeaderElection

QuorumPeer startLeaderElection

源码地址:https://github.com/apache/zoo...

须要的条件: jdk 1.7以上 、ant 、idea

FastLeaderElection

serverid : 在配置server集群的时候,给定服务器的标识id(myid)

zxid : 服务器在运行时产生的数据ID, zxid的值越大,表示数据越新

Epoch: 选举的轮数

server的状态:Looking、 Following、Observering、Leading

第一次初始化启动的时候: LOOKING

  1. 全部在集群中的server都会推荐本身为leader,而后把(myid、zxid、epoch)做为广播信息,广播给集群中的其余server, 而后等待其余服务器返回
  2. 每一个服务器都会接收来自集群中的其余服务器的投票。集群中的每一个服务器在接受到投票后,开始判断投票的有效性

    a) 判断逻辑时钟(Epoch) ,若是Epoch大于本身当前的Epoch,说明本身保存的Epoch是过时。更新Epoch,同时clear其余服务器发送过来的选举数据。判断是否须要更新当前本身的选举状况

    b) 若是Epoch小于目前的Epoch,说明对方的epoch过时了,也就意味着对方服务器的选举轮数是过时的。这个时候,只须要讲本身的信息发送给对方

    c) 若是sid等于当前sid,根据规则来判断是否有资格得到leader

    ​ 接受到来自其余服务器的投票后,针对每个投票,都须要将别人的投票和本身的投票进行对比,zxid,zxid最大的服务器优先

  3. 统计投票

img

ZAB协议

拜占庭问题

一组拜占庭将军分别各率领一支军队共同围困一座城市。为了简化问题,将各支军队的行动策略限定为进攻或撤离两种。由于部分军队进攻部分军队撤离可能会形成灾难性后果,所以各位将军必须经过投票来达成一致策略,即全部军队一块儿进攻或全部军队一块儿撤离。由于各位将军分处城市不一样方向,他们只能经过信使互相联系。在投票过程当中每位将军都将本身投票给进攻仍是撤退的信息经过信使分别通知其余全部将军,这样一来每位将军根据本身的投票和其余全部将军送来的信息就能够知道共同的投票结果而决定行动策略。

系统的问题在于,将军中可能出现叛徒,他们不只可能向较为糟糕的策略投票,还可能选择性地发送投票信息。假设有9位将军投票,其中1名叛徒。8名忠诚的将军中出现了4人投进攻,4人投撤离的状况。这时候叛徒可能故意给4名投进攻的将领送信表示投票进攻,而给4名投撤离的将领送信表示投撤离。这样一来在4名投进攻的将领看来,投票结果是5人投进攻,从而发起进攻;而在4名投撤离的将军看来则是5人投撤离。这样各支军队的一致协同就遭到了破坏。

因为将军之间须要经过信使通信,叛变将军可能经过伪造信件来以其余将军的身份发送假投票。而即便在保证全部将军忠诚的状况下,也不能排除信使被敌人截杀,甚至被敌人间谍替换等状况。所以很难经过保证人员可靠性及通信可靠性来解决问题。

假始那些忠诚(或是没有出错)的将军仍然能经过多数决定来决定他们的战略,便称达到了拜占庭容错。在此,票都会有一个默认值,若消息(票)没有被收到,则使用此默认值来投票。

上述的故事映射到计算机系统里,将军便成了计算机,而信差就是通讯系统。虽然上述的问题涉及了电子化的决策支持与信息安全,却没办法单纯的用密码学数字签名来解决。由于电路错误仍可能影响整个加密过程,这不是密码学与数字签名算法在解决的问题。所以计算机就有可能将错误的结果提交去,亦可能致使错误的决策。

paxos协议主要就是如何保证在分布式环网络环境下,各个服务器如何达成一致最终保证数据的一致性问题

ZAB协议,基于paxos协议的一个改进。

zab协议为分布式协调服务zookeeper专门设计的一种支持崩溃恢复的原子广播协议

zookeeper并无彻底采用paxos算法, 而是采用zab Zookeeper atomic broadcast

zab协议的原理

  1. 在zookeeper 的主备模式下,经过zab协议来保证集群中各个副本数据的一致性
  2. zookeeper使用的是单一的主进程来接收并处理全部的事务请求,并采用zab协议,把数据的状态变动以事务请求的形式广播到其余的节点
  3. zab协议在主备模型架构中,保证了同一时刻只能有一个主进程来广播服务器的状态变动
  4. 全部的事务请求必须由全局惟一的服务器来协调处理,这个的服务器叫leader,其余的叫follower

    leader节点主要负责把客户端的事务请求转化成一个事务提议(proposal),并分发给集群中的全部follower节点

    再等待全部follower节点的反馈。一旦超过半数服务器进行了正确的反馈,那么leader就会commit这条消息

崩溃恢复

原子广播

zab协议的工做原理

  1. 什么状况下zab协议会进入崩溃恢复模式(没有接受到的数据进行同步)

    1. 当服务器启动时
    2. 当leader服务器出现网络中断、崩溃或者重启的状况
    3. 集群中已经不存在过半的服务器与该leader保持正常通讯
  2. zab协议进入崩溃恢复模式会作什么

    1. 当leader出现问题,zab协议进入崩溃恢复模式,而且选举出新的leader。当新的leader选举出来之后,若是集群中已经有过半机器完成了leader服务器的状态同(数据同步),退出崩溃恢复,进入消息广播模式
    2. 当新的机器加入到集群中的时候,若是已经存在leader服务器,那么新加入的服务器就会自觉进入数据恢复模式,找到leader进行数据同步

img

问题

​ 假设一个事务在leader服务器被提交了,而且已经有过半的follower返回了ack。 在leader节点把commit消息发送给follower机器以前leader服务器挂了怎么办

zab协议,必定须要保证已经被leader提交的事务也可以被全部follower提交

zab协议须要保证,在崩溃恢复过程当中跳过哪些已经被丢弃的事务

回顾

zookeeper数据模型

​ 临时节点(有序)、 持久化节点(有序)

​ 临时节点其余节点也能看到

zookeeper是一个开源的分布式协调框架; 数据发布订阅、负载均衡、集群、master选举。。。

原子性: 要么同时成功、要么同时失败 (分布式事务)

单一视图: 不管客户端链接到哪一个服务器,所看到的模型都是同样

可靠性:一旦服务器端提交了一个事务而且得到了服务器端返回成功的标识,那么这个事务所引发的服务器端的变动会一直保留

实时性: 近实时

zookeeper并非用来存储数据的,经过监控数据状态的变化,达到基于数据的集群管理。分布式

集群配置

  1. 修改zoo.cfg

    server.id=ip:port:port 第一个Port 数据同步通讯、 第二个port :leader选举(3181)

    id=myid (myid 参与leader选举、 在整个集群中表示惟一服务器的标识)

  2. dataDir目录下 建立一个myid的文件 , 内容: server.id对应当前服务器的id号
  3. 若是增长observer 须要在第一步中, server.id=ip:port:port:observer ; peerType=observer

会话

NOT_CONNECTED - > CONNECTING ->CONNECTED ->ClOSE

数据模型

数据模型是一个树形结构,最小的数据单元是ZNODE

临时节点和持久化节点

临时有序节点

持久化有序节点

状态信息

Stat

cZxid = 0xb0000000f

ctime = Sun Aug 13 20:24:03 CST 2017

mZxid = 0xb0000000f

mtime = Sun Aug 13 20:24:03 CST 2017

pZxid = 0xb0000000f

cversion = 0

dataVersion = 0

aclVersion = 0

ephemeralOwner = 0x15dda30f72f0000

dataLength = 2

numChildren = 0

zab协议 : 若是客户端发了一个事务请求给到leader, 而leader发送给各个follower之后,而且收到了ack,leader已经commit。 在准备ack给各个follower节点comit的时候,leader挂了,怎么处理的。

  1. 选举新的leader(zxid的最大值)
  2. 同步给其余的folower

watcher

EventyType

None 客户端与服务器端成功创建会话

NodeCreated 节点建立

NodeDeleted 节点删除

NodeDataChanged 数据变动:数据内容

NodeChildrenChanged 子节点发生变动: 子节点删除、新增的时候,才会触发

watcher的特性
一次性触发: 事件被处理一次后,会被移除,若是须要永久监听,则须要反复注册

zkClient ( 永久监听的封装)

curator

java api的话, zk.exists , zk.getData 建立一个watcher监听

zookeeper序列化使用的是Jute

Acl权限的操做

保证存储在zookeeper上的数据安全性问题

schema(ip/Digest/world/super)
受权对象(192.168.1.1/11 , root:root / world:anyone/ super)

数据存储

内存数据和磁盘数据

zookeeper会定时把数据存储在磁盘上。

DataDir = 存储的是数据的快照

快照: 存储某一个时刻全量的内存数据内容

DataLogDir 存储事务日志

image-20190307230812881

zxid :服务器运行时产生的日志id

log.zxid

查看事务日志的命令

java -cp :/mic/data/program/zookeeper-3.4.10/lib/slf4j-api-1.6.1.jar:/mic/data/program/zookeeper-3.4.10/zookeeper-3.4.10.jar org.apache.zookeeper.server.LogFormatter log.200000001

zookeeper 有三种日志

zookeeper.out //运行日志

快照 存储某一时刻的全量数据

事务日志 事务操做的日志记录

相关文章
相关标签/搜索