在不少时候,咱们均可以在各类框架应用中看到ZooKeeper的身影,好比Kafka中间件,Dubbo框架,Hadoop等等。为何处处都看到ZooKeeper?html
ZooKeeper是一个分布式服务协调框架,提供了分布式数据一致性的解决方案,基于ZooKeeper的数据结构,Watcher,选举机制等特色,能够实现数据的发布/订阅,软负载均衡,命名服务,统一配置管理,分布式锁,集群管理等等。java
ZooKeeper能保证:node
ZooKeeper的数据结构和Unix文件系统很相似,整体上能够看作是一棵树,每个节点称之为一个ZNode,每个ZNode默认能存储1M的数据。每个ZNode可经过惟一的路径标识。以下图所示:spring
建立ZNode时,能够指定如下四种类型,包括:apache
Watcher是基于观察者模式实现的一种机制。若是咱们须要实现当某个ZNode节点发生变化时收到通知,就可使用Watcher监听器。服务器
客户端经过设置监视点(watcher)向 ZooKeeper 注册须要接收通知的 znode,在 znode 发生变化时 ZooKeeper 就会向客户端发送消息。session
这种通知机制是一次性的。一旦watcher被触发,ZooKeeper就会从相应的存储中删除。若是须要不断监听ZNode的变化,能够在收到通知后再设置新的watcher注册到ZooKeeper。数据结构
监视点的类型有不少,如监控ZNode数据变化、监控ZNode子节点变化、监控ZNode 建立或删除。负载均衡
ZooKeeper是一个高可用的应用框架,由于ZooKeeper是支持集群的。ZooKeeper在集群状态下,配置文件是不会指定Master和Slave,而是在ZooKeeper服务器初始化时就在内部进行选举,产生一台作为Leader,多台作为Follower,而且遵照半数可用原则。框架
因为遵照半数可用原则,因此5台服务器和6台服务器,实际上最大容许宕机数量都是3台,因此为了节约成本,集群的服务器数量通常设置为奇数。
若是在运行时,若是长时间没法和Leader保持链接的话,则会再次进行选举,产生新的Leader,以保证服务的可用。
首先在官网下载ZooKeeper,我这里用的是3.3.6版本。
而后解压,复制一下/conf目录下的zoo_sample.cfg文件,重命名为zoo.cfg。
修改zoo.cfg中dataDir的值,并建立对应的目录:
最后到/bin目录下启动,我用的是window系统,因此启动zkServer.cmd,双击便可:
启动成功的话就能够看到这个对话框:
可视化界面的话,我推荐使用ZooInspector
,操做比较简便:
首先引入Maven依赖:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
复制代码
接着咱们写一个Main方法,进行操做:
//链接地址及端口号
private static final String SERVER_HOST = "127.0.0.1:2181";
//会话超时时间
private static final int SESSION_TIME_OUT = 2000;
public static void main(String[] args) throws Exception {
//参数一:服务端地址及端口号
//参数二:超时时间
//参数三:监听器
ZooKeeper zooKeeper = new ZooKeeper(SERVER_HOST, SESSION_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//获取事件的状态
Event.KeeperState state = watchedEvent.getState();
//判断是不是链接事件
if (Event.KeeperState.SyncConnected == state) {
Event.EventType type = watchedEvent.getType();
if (Event.EventType.None == type) {
System.out.println("zk客户端已链接...");
}
}
}
});
zooKeeper.create("/java", "Hello World".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("新增ZNode成功");
zooKeeper.close();
}
复制代码
建立一个持久性ZNode,路径是/java,值为"Hello World":
public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode) 复制代码
参数解释:
ACL权限控制,有三个是ZooKeeper定义的经常使用权限,在ZooDefs.Ids类中:
/** * This is a completely open ACL. * 彻底开放的ACL,任何链接的客户端均可以操做该属性znode */
public final ArrayList<ACL> OPEN_ACL_UNSAFE = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.ALL, ANYONE_ID_UNSAFE)));
/** * This ACL gives the creators authentication id's all permissions. * 只有建立者才有ACL权限 */
public final ArrayList<ACL> CREATOR_ALL_ACL = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.ALL, AUTH_IDS)));
/** * This ACL gives the world the ability to read. * 只能读取ACL */
public final ArrayList<ACL> READ_ACL_UNSAFE = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.READ, ANYONE_ID_UNSAFE)));
复制代码
createMode就是前面讲过的四种ZNode类型:
public enum CreateMode {
/** * 持久性ZNode */
PERSISTENT (0, false, false),
/** * 持久性自动增长顺序号ZNode */
PERSISTENT_SEQUENTIAL (2, false, true),
/** * 临时性ZNode */
EPHEMERAL (1, true, false),
/** * 临时性自动增长顺序号ZNode */
EPHEMERAL_SEQUENTIAL (3, true, true);
}
复制代码
//同步获取节点数据
public byte[] getData(String path, boolean watch, Stat stat){
...
}
//异步获取节点数据
public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx){
...
}
复制代码
同步getData()方法中的stat参数是用于接收返回的节点描述信息:
public byte[] getData(final String path, Watcher watcher, Stat stat){
//省略...
GetDataResponse response = new GetDataResponse();
//发送请求到ZooKeeper服务器,获取到response
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (stat != null) {
//把response的Stat赋值到传入的stat中
DataTree.copyStat(response.getStat(), stat);
}
}
复制代码
使用同步getData()获取数据:
//数据的描述信息,包括版本号,ACL权限,子节点信息等等
Stat stat = new Stat();
//返回结果是byte[]数据,getData()方法底层会把描述信息复制到stat对象中
byte[] bytes = zooKeeper.getData("/java", false, stat);
//打印结果
System.out.println("ZNode的数据data:" + new String(bytes));//Hello World
System.out.println("获取到dataVersion版本号:" + stat.getVersion());//默认数据版本号是0
复制代码
public Stat setData(final String path, byte data[], int version){
...
}
复制代码
值得注意的是第三个参数version,使用CAS机制,这是为了防止多个客户端同时更新节点数据,因此须要在更新时传入版本号,每次更新都会使版本号+1,若是服务端接收到版本号,对比发现不一致的话,则会抛出异常。
因此,在更新前须要先查询获取到版本号,不然你不知道当前版本号是多少,就无法更新:
//获取节点描述信息
Stat stat = new Stat();
zooKeeper.getData("/java", false, stat);
System.out.println("更新ZNode数据...");
//更新操做,传入路径,更新值,版本号三个参数,返回结果是新的描述信息
Stat setData = zooKeeper.setData("/java", "fly!!!".getBytes(), stat.getVersion());
System.out.println("更新后的版本号为:" + setData.getVersion());//更新后的版本号为:1
复制代码
更新后,版本号增长了:
若是传入的版本参数是"-1",就是告诉zookeeper服务器,客户端须要基于数据的最新版本进行更新操做。可是-1并非一个合法的版本号,而是一个标识符。
public void delete(final String path, int version){
...
}
复制代码
这里也须要传入版本号,调用getData()方法便可获取到版本号,很简单:
Stat stat = new Stat();
zooKeeper.getData("/java", false, stat);
//删除ZNode
zooKeeper.delete("/java", stat.getVersion());
复制代码
在上面第三点提到,ZooKeeper是可使用通知监听机制,当ZNode发生变化会收到通知消息,进行处理。基于watcher机制,ZooKeeper能玩出不少花样。怎么使用?
ZooKeeper的通知监听机制,总的来讲能够分为三个过程:
①客户端注册 Watcher ②服务器处理 Watcher ③客户端回调 Watcher客户端。
注册 watcher 有 4 种方法,new ZooKeeper()、getData()、exists()、getChildren()。下面演示一下使用exists()方法注册watcher:
首先须要实现Watcher接口,新建一个监听器:
public class MyWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
//获取事件类型
Event.EventType eventType = event.getType();
//通知状态
Event.KeeperState eventState = event.getState();
//节点路径
String eventPath = event.getPath();
System.out.println("监听到的事件类型:" + eventType.name());
System.out.println("监听到的通知状态:" + eventState.name());
System.out.println("监听到的ZNode路径:" + eventPath);
}
}
复制代码
而后调用exists()方法,注册监听器:
zooKeeper.exists("/java", new MyWatcher());
//对ZNode进行更新数据的操做,触发监听器
zooKeeper.setData("/java", "fly".getBytes(), -1);
复制代码
而后在控制台就能够看到打印的信息:
这里咱们能够看到Event.EventType对象就是事件类型,咱们能够对事件类型进行判断,再配合Event.KeeperState通知状态,作相关的业务处理,事件类型有哪些?
打开EventType、KeeperState的源码查看:
//事件类型是一个枚举
public enum EventType {
None (-1),//无
NodeCreated (1),//Watcher监听的数据节点被建立
NodeDeleted (2),//Watcher监听的数据节点被删除
NodeDataChanged (3),//Watcher监听的数据节点内容发生变动
NodeChildrenChanged (4);//Watcher监听的数据节点的子节点列表发生变动
}
//通知状态也是一个枚举
public enum KeeperState {
Unknown (-1),//属性过时
Disconnected (0),//客户端与服务端断开链接
NoSyncConnected (1),//属性过时
SyncConnected (3),//客户端与服务端正常链接
AuthFailed (4),//身份认证失败
ConnectedReadOnly (5),//返回这个状态给客户端,客户端只能处理读请求
SaslAuthenticated(6),//服务器采用SASL作校验时
Expired (-112);//会话session失效
}
复制代码
zooKeeper.exists("/java", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("我是exists()方法的监听器");
}
});
//对ZNode进行更新数据的操做,触发监听器
zooKeeper.setData("/java", "fly".getBytes(), -1);
//企图第二次触发监听器
zooKeeper.setData("/java", "spring".getBytes(), -1);
复制代码
zooKeeper.exists("/java", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("我是exists()方法的监听器");
}
});
Stat stat = new Stat();
zooKeeper.getData("/java", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("我是getData()方法的监听器");
}
}, stat);
//对ZNode进行更新数据的操做,触发监听器
zooKeeper.setData("/java", "fly".getBytes(), stat.getVersion());
复制代码
打印结果,说明先调用exists()方法的监听器,再调用getData()方法的监听器。由于exists()方法先注册了。
我记得B站的UP主李永乐说过,只有你让更多的人生活变得更美好时,本身的生活才能变得更美好。
这句话也是我今年开始写技术分享的一个动力源泉,但愿这篇文章对你有用~
著名的飞行家马老师说过:回城是收费的,而点赞是免费的~