Zookeeper是一个分布式服务协调组件,是Hadoop、Hbase、Kafka重要的依赖组件,它是一个为分布式应用提供一致性服务的组件。java
Zookeeper的目标就是封装好复杂易出错的服务,为使用者提供高效、稳定的服务。node
Zookeeper的使用场景:算法
1.Hadoop、Hbase、Kafka的依赖组件。数据库
2.做为注册中心,用于维护服务列表。apache
3.做为项目的配置中心,将一些重要的能够动态配置的信息放入zk中,利用zk的通知机制,当状态发生改变时通知客户端,将其变量放入静态变量中。缓存
4.做为本地缓存的更新策略,当信息更新时更新节点的值,利用zk的通知机制,各个应用获取到节点的值,再从数据库中查询,而后更新到本地缓存。服务器
5.中间件的高可用性。 session
Zookeeper维护了一个相似文件系统的数据结构,有根目录 (/) 和若干个子目录 (树形结构 , 与Linux相似 )数据结构
*每一个目录都称为一个znode,每一个znode都包含自身节点的数据且每一个znode下能够包含多个znode。框架
*在建立znode时必须指定znode的数据,能够为null。
*znode下的数据有版本号,当进行更新操做时版本号会+1。
*当使用JAVA进行更新和删除操做时,须要传递版本号,其内部进行CAS判断,当且仅当传递的版本号与当前节点的版本号相同时,才进行操做(并不是每一个znode下包含多个版本的数据,传递版本号只是用来作CAS,默认值为-1,表示不进行CAS判断)
*删除znode时,若该znode包含子znode,那么必须先删除全部的子znode,不然没法删除。
持久化节点:不管客户端的链接是否断开,该节点依然存在。
持久化节点并顺序编号:在持久化节点的基础上,Zookeeper动态的为znode进行自动编号,即建立/p节点,那么Zookeeper将把其节点命名为/p1,当再次建立/p节点,那么Zookeeper将把该节点命名为/p2。
临时节点:当客户端的链接断开,临时节点及其节点中的数据将会被删除。
临时节点并顺序编号:在临时节点的基础上,Zookeeper动态的为znode进行自动编号,与持久化节点并顺序编号的区别是,临时节点并顺序编号会在客户端断开链接以后会自动删除节点以及节点中的数据。
客户端能够监听它关心的节点,当目标节点发生变化时 (数据版本号改变、被删除、子目录节点增长和删除),Zookeeper会通知客户端,客户端再做出相应的处理。
*zk并非根据节点的值改变而触发通知的,而是根据节点中数据的版本号。
*当节点中的值重复时,但因为数据的版本号发生改变,所以仍然能够经过通知机制通知客户端。
Zookeeper通常是经过集群的方式进行使用,即多台Zookeeper服务构成一个有关系的组。
当搭建了一个Zookeeper集群,Zookeeper会根据选举算法,从多个Zookeeper节点中选取一个做为Leader,剩余的节点做为Follower,Leader会与各个Follower创建一个有效的长链接,保证各个节点的通讯正常 (每台服务器都有可能被选取为Leader)
当Zookeeper集群搭建完成后,就能够启动不少个客户端与zk节点进行链接 (长链接方式,保证客户端与服务器能有效持久的链接)
当某个节点收到修改的操做时,首先会把请求转发给Leader,Leader内有处理机制,它会操做修改而且同步操做给全部的Follower节点。
*一旦选取的Leader节点宕机,则会从新组织Zookeeper集群,选取新的Leader,从新与各个节点创建链接 (从新选取的时间很短,大概200ms)
*因为Zookeeper是由java语言编写的,所以在安装Zookeeper前须要安装好JDK,而且配置环境变量JAVA_HOME
从Zookeeper官网下载zk进行解压安装:
zkEnv.sh:用于配置zk服务启动时的环境变量 (包括加载配置文件的路径等)
zkServer.sh:用于启动zk服务,默认监听2181端口。
zkCli.sh:用于启动zk客户端。
zookeeper.out:用于存放zk运行时的日志。
log4j.properties文件:zk运行时的日志配置文件,默认日志信息都将打印到bin目录下的zookeeper.out文件 (当使用Zookeeper遇到异常时应该查看此文件下的内容)
zoo_sample.conf文件:zk服务的配置文件,由Zookeeper官方提供 (默认zk服务启动时将加载conf目录下的zoo.cfg配置文件)
Zookeeper启动时默认加载conf目录下的zoo.cfg配置文件,所以将conf目录下的zoo_sample.conf配置文件改名为zoo.cfg。
配置文件
#基础配置
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/Zookeeper/Zookeeper-3.4.6/zkdata
dataLogDir=/usr/Zookeeper/Zookeeper-3.4.6/zklog
clientPort=2181
autopurge.purgeInterval=1
tickTime:initLimit、syncLimit属性的时间单位,值是毫秒。
initLimit:Zookeeper集群搭建前所容许的初始化时间。
syncLimit:Leader发送心跳给Follower,Follower向Leader回复心跳这一过程所容许的最大时长 (rtt,往返时间),一旦超过了这个时间,则Leader认为该Follower宕机。
dataDir:Zookeeper快照日志的存放目录。
dataLogDir:Zookeeper事务日志的存放目录。
clientPort:Zookeeper服务监听的端口,默认为2181。
*当其中一台zk服务启动后,剩余的zk服务必须在initLimit规定的时间内全都启动,不然zk进行集群的搭建时会认为未启动的zk服务已经失效。
*若是不配置dataLogDir,那么Zookeeper的事务日志将写入到dataDir目录下 (会严重影响zk的性能)
使用zkServer.sh命令启动Zookeeper服务。
使用jps命令查询zk进程是否启动成功,当出现QuorumPeerMain表示zk启动成功。
#基础配置
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/usr/Zookeeper/Zookeeper-3.4.6/zkdata dataLogDir=/usr/Zookeeper/Zookeeper-3.4.6/zklog clientPort=2181 autopurge.purgeInterval=1
#集群配置 server.1=192.168.1.119:2888:3888 server.2=192.168.1.122:2888:3888 server.3=192.168.1.125:2888:3888
在conf文件下使用server.标识属性配置zk集群,使多个zk服务构成一个组 (标识必须为整数)
server.本机zk标识 = zk服务地址:leader和follower之间的通讯端口:leader选举端口
server.其余zk标识 = zk服务地址:leader和follower之间的通讯端口:leader选举端口
*标识与zk服务进行绑定,所以同一个集群下的zk服务的标识不能相同。
*leader和follower之间的通讯端口默认是2888,leader选举端口默认是3888。
#将1输入到myid文件中
echo "1" > myid
*须要修改要构成集群的其余zk节点的配置文件以及设置其myid文件。
*在 initLimit * tickTime的时间内启动集群中的全部zk节点。
*搭建Zookeeper集群时务必遵循2n+1个节点,由于根据Zookeeper的工做原理,只要有大于一半的节点存活,则Zookeeper集群就可以对外提供服务。
*搭建zk集群时需关闭每台zk服务器上的防火墙或者开放对应的端口,不然集群中的zk间没法进行通信。
*zk集群在高负荷的工做时会产生大量的事务日志,若是日志长期不进行清理容易将分区中的空间占满最终致使zk服务没法运行,所以须要按期清理zk产生的事务日志 (能够配合Linux的crontab命令设置天天定时去执行清除日志文件的脚本)
能够经过Apache提供的Zookeeper API对zk进行操做 (提供基本功能),也可使用Apache提供的Curator框架操做zk (提供更全面的功能)
*Curator框架除了基本的节点添加、删除、修改、查询,监听节点功能外,还提供了session超时重连、主从选举、分布式计算器、分布式锁等等适用于各类复杂的zk场景的API封装。
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.13</version>
<type>pom</type>
</dependency>
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException
connectString:zookeeper server列表,多个以逗号隔开。
sessionTimeout:指定链接Zookeeper的超时时间。
watcher:事件回调接口。
*ZooKeeper实例将从服务列表中选择一个server创建链接,若链接失败则选择另一个server从新进行链接。
*ZooKeeper实例是经过异步的方式创建链接,当链接创建完毕后回调指定Watcher的process方法,所以程序为了保证同步创建链接,可使用JAVA提供的CountDownLatch同步辅助类进行控制。
//建立节点,指定节点路径、数据、节点的类型 public String create(final String path, byte data[], List<ACL> acl,CreateMode createMode) //获取子节点 public List<String> getChildren(final String path, boolean watch) //判断节点是否存在 public Stat exists(String path, boolean watch) //获取节点中的数据 public byte[] getData(String path, boolean watch, Stat stat) //设置或更新节点中的数据 public Stat setData(final String path, byte data[], int version) //删除节点 public void delete(final String path, int version)
*建立节点时须要指定节点的类型,Apache Zookeeper API中提供了CreateMode枚举类,用于指定节点的类型。
CreateMode.PERSISTENT:持久化节点
CreateMode.PERSISTENT_SEQUENTIAL:持久化节点并顺序编号
CreateMode.EPHEMERAL:临时节点
CreateMode.EPHEMERAL_SEQUENTIAL:临时节点并顺序编号
*当进行更新和删除操做时,须要传递版本号,其内部进行CAS判断,当且仅当传递的版本号与当前节点的版本号相同时,才进行操做(并不是每一个znode下包含多个版本的数据,传递版本号只是用来作CAS,默认值-1,表示不进行CAS判断)
*Apache Zookeeper API中有不少方法都支持Watcher类型参数,Watcher可用于监听事件的发生以及链接状态的改变,包括节点的建立、删除、节点中数据的改变、节点的子节点发生改变等事件,失去链接、异步链接、认证失败、只读链接、链接过时等链接状态。
/** * @Auther: ZHUANGHAOTANG * @Date: 2018/11/12 14:55 * @Description: */ public class ZKUtils { /** * 日志输出 */ private static Logger logger = LoggerFactory.getLogger(ZKUtils.class); /** * ZK服务列表 */ private static final String URLS = "192.168.1.80:2181,192.168.1.81:2181,192.168.1.83:2181"; /** * 链接Zookeeper的超时时长(单位:毫秒) */ private static final int SESSION_TIMEOUT = 3000; /** * Zookeeper链接对象 */ private static ZooKeeper zk = null; static { try { CountDownLatch countDownLatch = new CountDownLatch(1);//锁存器(同步辅助类) zk = new ZooKeeper(URLS, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { if (Event.KeeperState.SyncConnected == event.getState()) { countDownLatch.countDown();//倒数器-1 } } }); countDownLatch.await(); } catch (Exception e) { logger.info("Zookeeper获取链接失败,{}", e); } } /** * 建立节点 * * @param path * @param data * @param createMode * @throws Exception */ public static void createPath(String path, String data, CreateMode createMode) throws Exception { if (StringUtils.isBlank(path)) { throw new Exception("path is null"); } if (!exists(path)) { zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode); } } /** * 获取子节点 * * @param path * @return * @throws Exception */ public static List<String> getSubNode(String path) throws Exception { if (StringUtils.isBlank(path)) { throw new Exception("path is null"); } return zk.getChildren(path, false); } /** * 判断节点是否存在 * * @param path * @return * @throws Exception */ public static boolean exists(String path) throws Exception { if (StringUtils.isBlank(path)) { throw new Exception("path is null"); } if (zk.exists(path, false) != null) { return true; } return false; } /** * 获取节点中的数据 * * @param path * @return * @throws Exception */ public static String getData(String path) throws Exception { if (StringUtils.isBlank(path)) { throw new Exception("path is null"); } return new String(zk.getData(path, false, null)); } /** * 更新节点中的数据 * * @param path * @param data * @throws Exception */ public static void setData(String path, String data) throws Exception { if (StringUtils.isBlank(path)) { throw new Exception("path is null"); } zk.setData(path, data.getBytes(), -1); } /** * 删除节点 * * @param path * @throws Exception */ public static void deletePath(String path) throws Exception { if (StringUtils.isBlank(path)) { throw new Exception("path is null"); } zk.delete(path, -1); } }