实现配置信息的集中式管理和数据的动态更新java
实现配置中心有两种模式:push 、pull。mysql
长轮训git
zookeeper采用的是推拉相结合的方式。 客户端向服务器端注册本身须要关注的节点。一旦节点数据发生变化,那么服务器端就会向客户端github
发送watcher事件通知。客户端收到通知后,主动到服务器端获取更新后的数据redis
请求/数据分摊多个计算机单元上算法
一般实现分布式锁有几种方式sql
数据方式去实现apache
建立一个表, 经过索引惟一的方式 create table (id , methodname …) methodname增长惟一索引 insert 一条数据XXX delete 语句删除这条记录 mysql for update 行锁,杜占锁
排他锁api
利用路径惟一安全
共享锁(读锁)
locks当中是有序节点,控制使用权限,每个客户端写一个节点以后,获取到最小节点,获取数据,有写的操做,优先处理写的节点,利用节点特性
package zk.lock; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.List; import java.util.Random; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class DistributeLock { //根节点 private static final String ROOT_LOCKS = "/LOCKS"; private ZooKeeper zooKeeper; //节点的数据 private static final byte[] data = {1, 2}; //会话超时时间 private int sessionTimeOut; //记录锁节点id private String lockID; private CountDownLatch countDownLatch = new CountDownLatch(1); public DistributeLock() throws IOException, InterruptedException { this.zooKeeper = ZookeeperClient.getInstance(); this.sessionTimeOut = ZookeeperClient.getSESSIONTIMEOUT(); } /** * 获取锁的方法 * * @return */ public synchronized boolean lock() { try { //LOCKS/000001 lockID = zooKeeper.create(ROOT_LOCKS + "/", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(Thread.currentThread().getName() + "->" + "成功建立了lock节点[" + lockID + ",开始竞争锁]"); //获取根节点下的全部子节点 List<String> childrenNodes = zooKeeper.getChildren(ROOT_LOCKS, true); // 排序,从小到大 TreeSet<String> sortedSet = new TreeSet<>(); for (String children : childrenNodes) { sortedSet.add(ROOT_LOCKS + "/" + children); } //获取到最小的节点 String first = sortedSet.first(); if (lockID.equals(first)) { //表示当前就是最小的节点 System.out.println(Thread.currentThread().getName() + "-->成功得到锁,locak节点为:【" + lockID + "]"); return true; } SortedSet<String> lessThanLockId = sortedSet.headSet(lockID); if (!lessThanLockId.isEmpty()) { //获取到比当前LockId这个节点更小的上一个节点 String prevLockId = lessThanLockId.last(); //监控上一个节点 zooKeeper.exists(prevLockId, new LockWatcher(countDownLatch)); //若是会话超时或者节点被删除(释放)了 countDownLatch.await(sessionTimeOut, TimeUnit.MILLISECONDS); System.out.println(Thread.currentThread().getName() + "成功获取锁:【" + lockID + "】"); return true; } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } public synchronized boolean unLock() { System.out.println(Thread.currentThread().getName() + "-->开始释放锁"); try { zooKeeper.delete(lockID, -1); System.out.println("节点" + lockID + "被释放了"); return true; } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } return false; } public static void main(String[] args) { CountDownLatch countDownLatch = new CountDownLatch(10); Random random = new Random(); for (int i = 0; i < 10; i++) { new Thread(() -> { DistributeLock lock = null; try { lock = new DistributeLock(); countDownLatch.countDown(); countDownLatch.await(); lock.lock(); Thread.sleep(random.nextInt(500)); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (lock != null) { lock.unLock(); } } }).start(); } } }
package zk.lock; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import java.util.concurrent.CountDownLatch; public class LockWatcher implements Watcher { private CountDownLatch countDownLatch; public LockWatcher(CountDownLatch latch) { this.countDownLatch = latch; } @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeDataChanged) { countDownLatch.countDown(); } } }
7*24小时可用, 99.999%可用
master-slave模式
slave监听master节点,若是master节点挂掉,slave自动接替master,心跳机制维持,出现网络异常,slave认为master挂掉了,可能出现双主节点的状况,重复处理数据
使用zookeeper解决上述问题,往某一个节点注册master节点,只有一个可以注册成功,注册成功的为master,若是失去联系,节点会被删除,不会出现脑裂问题
per实现原理讲解
activeMQ、kafka、….
先进先出队列
Barrier模式
在一个节点下只能容许多少数据,只有子节点达到必定数量,才执行
curator 提供应用场景的封装
curator-reciples
master/leader选举
分布式锁(读锁、写锁)
分布式队列
LeaderLatch
写一个master
LeaderSelector
每个应用都写一个临时有序节点,根据最小的节点来得到优先权
curator 提供应用场景的封装
curator-reciples
master/leader选举
分布式锁(读锁、写锁)
分布式队列
…
LeaderLatch
写一个master
LeaderSelector
每个应用都写一个临时有序节点,根据最小的节点来得到优先权
leader是zookeeper集群的核心。
观察zookeeper集群中最新状态的变化并将这些状态同步到observer服务器上。
增长observer不影响集群中事务处理能力,同时还能提高集群的非事务处理能力
zookeeper通常是由 2n+1台服务器组成
选举算法:
leaderElection
AuthFastLeaderElection
FastLeaderElection
QuorumPeer startLeaderElection
源码地址:https://github.com/apache/zoo...
须要的条件: jdk 1.7以上 、ant 、idea
FastLeaderElection
serverid : 在配置server集群的时候,给定服务器的标识id(myid)
zxid : 服务器在运行时产生的数据ID, zxid的值越大,表示数据越新
Epoch: 选举的轮数
server的状态:Looking、 Following、Observering、Leading
第一次初始化启动的时候: LOOKING
a) 判断逻辑时钟(Epoch) ,若是Epoch大于本身当前的Epoch,说明本身保存的Epoch是过时。更新Epoch,同时clear其余服务器发送过来的选举数据。判断是否须要更新当前本身的选举状况
b) 若是Epoch小于目前的Epoch,说明对方的epoch过时了,也就意味着对方服务器的选举轮数是过时的。这个时候,只须要讲本身的信息发送给对方
c) 若是sid等于当前sid,根据规则来判断是否有资格得到leader
接受到来自其余服务器的投票后,针对每个投票,都须要将别人的投票和本身的投票进行对比,zxid,zxid最大的服务器优先
拜占庭问题
一组拜占庭将军分别各率领一支军队共同围困一座城市。为了简化问题,将各支军队的行动策略限定为进攻或撤离两种。由于部分军队进攻部分军队撤离可能会形成灾难性后果,所以各位将军必须经过投票来达成一致策略,即全部军队一块儿进攻或全部军队一块儿撤离。由于各位将军分处城市不一样方向,他们只能经过信使互相联系。在投票过程当中每位将军都将本身投票给进攻仍是撤退的信息经过信使分别通知其余全部将军,这样一来每位将军根据本身的投票和其余全部将军送来的信息就能够知道共同的投票结果而决定行动策略。系统的问题在于,将军中可能出现叛徒,他们不只可能向较为糟糕的策略投票,还可能选择性地发送投票信息。假设有9位将军投票,其中1名叛徒。8名忠诚的将军中出现了4人投进攻,4人投撤离的状况。这时候叛徒可能故意给4名投进攻的将领送信表示投票进攻,而给4名投撤离的将领送信表示投撤离。这样一来在4名投进攻的将领看来,投票结果是5人投进攻,从而发起进攻;而在4名投撤离的将军看来则是5人投撤离。这样各支军队的一致协同就遭到了破坏。
因为将军之间须要经过信使通信,叛变将军可能经过伪造信件来以其余将军的身份发送假投票。而即便在保证全部将军忠诚的状况下,也不能排除信使被敌人截杀,甚至被敌人间谍替换等状况。所以很难经过保证人员可靠性及通信可靠性来解决问题。
假始那些忠诚(或是没有出错)的将军仍然能经过多数决定来决定他们的战略,便称达到了拜占庭容错。在此,票都会有一个默认值,若消息(票)没有被收到,则使用此默认值来投票。
上述的故事映射到计算机系统里,将军便成了计算机,而信差就是通讯系统。虽然上述的问题涉及了电子化的决策支持与信息安全,却没办法单纯的用密码学与数字签名来解决。由于电路错误仍可能影响整个加密过程,这不是密码学与数字签名算法在解决的问题。所以计算机就有可能将错误的结果提交去,亦可能致使错误的决策。
paxos协议主要就是如何保证在分布式环网络环境下,各个服务器如何达成一致最终保证数据的一致性问题
ZAB协议,基于paxos协议的一个改进。
zab协议为分布式协调服务zookeeper专门设计的一种支持崩溃恢复的原子广播协议
zookeeper并无彻底采用paxos算法, 而是采用zab Zookeeper atomic broadcast
leader节点主要负责把客户端的事务请求转化成一个事务提议(proposal),并分发给集群中的全部follower节点
再等待全部follower节点的反馈。一旦超过半数服务器进行了正确的反馈,那么leader就会commit这条消息
崩溃恢复
原子广播
什么状况下zab协议会进入崩溃恢复模式(没有接受到的数据进行同步)
zab协议进入崩溃恢复模式会作什么
假设一个事务在leader服务器被提交了,而且已经有过半的follower返回了ack。 在leader节点把commit消息发送给follower机器以前leader服务器挂了怎么办
zab协议,必定须要保证已经被leader提交的事务也可以被全部follower提交
zab协议须要保证,在崩溃恢复过程当中跳过哪些已经被丢弃的事务
zookeeper数据模型
临时节点(有序)、 持久化节点(有序)
临时节点其余节点也能看到
zookeeper是一个开源的分布式协调框架; 数据发布订阅、负载均衡、集群、master选举。。。
原子性: 要么同时成功、要么同时失败 (分布式事务)
单一视图: 不管客户端链接到哪一个服务器,所看到的模型都是同样
可靠性:一旦服务器端提交了一个事务而且得到了服务器端返回成功的标识,那么这个事务所引发的服务器端的变动会一直保留
实时性: 近实时
zookeeper并非用来存储数据的,经过监控数据状态的变化,达到基于数据的集群管理。分布式
server.id=ip:port:port 第一个Port 数据同步通讯、 第二个port :leader选举(3181)
id=myid (myid 参与leader选举、 在整个集群中表示惟一服务器的标识)
NOT_CONNECTED - > CONNECTING ->CONNECTED ->ClOSE
数据模型是一个树形结构,最小的数据单元是ZNODE
临时节点和持久化节点
临时有序节点
持久化有序节点
Stat
cZxid = 0xb0000000f
ctime = Sun Aug 13 20:24:03 CST 2017
mZxid = 0xb0000000f
mtime = Sun Aug 13 20:24:03 CST 2017
pZxid = 0xb0000000f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x15dda30f72f0000
dataLength = 2
numChildren = 0
zab协议 : 若是客户端发了一个事务请求给到leader, 而leader发送给各个follower之后,而且收到了ack,leader已经commit。 在准备ack给各个follower节点comit的时候,leader挂了,怎么处理的。
EventyType
None 客户端与服务器端成功创建会话
NodeCreated 节点建立
NodeDeleted 节点删除
NodeDataChanged 数据变动:数据内容
NodeChildrenChanged 子节点发生变动: 子节点删除、新增的时候,才会触发
watcher的特性
一次性触发: 事件被处理一次后,会被移除,若是须要永久监听,则须要反复注册
zkClient ( 永久监听的封装)
curator
java api的话, zk.exists , zk.getData 建立一个watcher监听
zookeeper序列化使用的是Jute
保证存储在zookeeper上的数据安全性问题
schema(ip/Digest/world/super)
受权对象(192.168.1.1/11 , root:root / world:anyone/ super)
内存数据和磁盘数据
zookeeper会定时把数据存储在磁盘上。
DataDir = 存储的是数据的快照
快照: 存储某一个时刻全量的内存数据内容
DataLogDir 存储事务日志
zxid :服务器运行时产生的日志id
log.zxid
查看事务日志的命令
java -cp :/mic/data/program/zookeeper-3.4.10/lib/slf4j-api-1.6.1.jar:/mic/data/program/zookeeper-3.4.10/zookeeper-3.4.10.jar org.apache.zookeeper.server.LogFormatter log.200000001
zookeeper 有三种日志
zookeeper.out //运行日志
快照 存储某一时刻的全量数据
事务日志 事务操做的日志记录