JDK 版本 : OpenJDK 11.0.1java
IDE : idea 2018.3node
Zookeeper Server 版本 : 3.5.4-betaapache
Zookeeper Client 版本 : 3.5.4-betasession
Curator 版本 : 4.2.0异步
Zookeeper Client 是 Zookeeper 的经典原生客户端。使用以前须要在 Maven 中导入依赖:分布式
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.4-beta</version> </dependency>
代码:ide
import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.concurrent.TimeUnit; public class ClientTest { public static void main(String[] args) { /** * 建立一个 Zookeeper 的实例 * 此处为一个集群,Zookeeper 的 ip 之间用逗号隔开 * * 参数解释: * param 1 - Zookeeper 的实例 ip ,此处是一个集群,因此配置了多个 ip,用逗号隔开 * param 2 - session 过时时间,单位秒 (1000) * param 3 - 监视者,用于获取监控事件 (MyWatch) */ ZooKeeper zooKeeper = null; try { Watcher createZkWatch = new MyWatch(); zooKeeper = new ZooKeeper("localhost:2101,localhost:2102,localhost:2103", 1000,createZkWatch); } catch (IOException e) { e.printStackTrace(); } /** * 值得注意的是,Zookeeper 对象去链接中间件实例是异步的 * 因此此处须要作一个死循环等待它链接完毕 * 更加优雅的作法是使用 CownDownLatch 去作,可是 while 比较简单 */ while(zooKeeper.getState() == ZooKeeper.States.CONNECTING){ //返回 zookeeper 的状态 System.out.println(zooKeeper.getState()); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } } //若是链接不出错的话此处状态应该为 CONNECTED if(zooKeeper.getState() != ZooKeeper.States.CONNECTED) return; /** * 建立 ZooKeeper 节点 * 参数解释: * param 1 - znode 名称 (/zoo) * param 2 - 节点数据 (my first data) * param 3 - 设置权限 (OPEN_ACL_UNSAFE) * param 4 - znode 类型 (PERSISTENT) * * * znode 类型有四种: * PERSISTENT - 持久化目录节点,客户端与zookeeper断开链接后,该节点依旧存在 * PERSISTENT_SEQUENTIAL - 持久化,并带有序列号 * EPHEMERAL - 临时目录节点,客户端与zookeeper断开链接后,该节点被删除 * EPHEMERAL_SEQUENTIAL - 临时,并带有序列号 */ try { String s = zooKeeper.create("/zoo", "my first data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("建立节点:" + s); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } /** * 建立一个二级节点,参数同上 * 须要注意的是,必需要有一级节点才能有二级节点,否则会报错 */ try { String s = zooKeeper.create("/zoo/zoo_1", "my first data_1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("建立二级节点:" + s); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } /** * 查询 ZooKeeper 节点的数据 * 参数解释: * param 1 - znode 名称 (/zoo) * param 2 - 监视者,用于获取监控事件 (MyWatch) * param 3 - Zookeeper 实例信息和数据信息 (stat) * * 注意若是后续须要修改该节点的值,能够在此处记录节点版本 version (非必要操做) */ Integer zooVersion = null; try { MyWatch getDataWatch = new MyWatch(); Stat stat = new Stat(); byte[] data = zooKeeper.getData("/zoo",getDataWatch,stat); System.out.println("查询节点数据:" + new String(data)); //从 stat 中能够获取不少 Zookeeper 实例的信息 System.out.println("查询节点数据 czxid:" + stat.getCzxid()); //zxid zooVersion = stat.getVersion(); //此处获取 /zoo 节点的版本号 } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } /** * 修改 ZooKeeper 节点的数据 * 参数解释: * param 1 - znode 名称 (/zoo) * param 2 - 节点新数据 (my first data change) * param 3 - 该节点的版本 * * 在成功修改了节点的数据以后,版本号会自动加一 * 若是此时不知道节点的版本,也能够输入 -1,会默认取最新的节点版本去修改 */ try { Stat stat = zooKeeper.setData("/zoo", "my first data change".getBytes(), zooVersion); // zooVersion = -1 System.out.println("修改节点数据 czxid:" + stat.getCzxid()); System.out.println("修改节点数据 version:" + stat.getVersion()); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } /** * 查看 ZooKeeper 节点是否存在 * 参数解释: * param 1 - znode 名称 (/zoo) * param 2 - 监视者,用于获取监控事件 (MyWatch) * * 若是不存在,返回的 stat 为 null */ try { Stat stat = zooKeeper.exists("/zoo_not_exist", new MyWatch()); System.out.println("查看节点是否存在 stat:" + stat); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } /** * 删除 ZooKeeper 节点 * 参数解释: * param 1 - znode 名称 (/zoo) * param 2 - 该节点的版本 * * 版本号若是不清楚的话能够填入 -1,和上述同理 * 值得注意的是,若是一个节点下属存在子节点,那么它不能被删除 */ try { zooKeeper.delete("/zoo", -1); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } private static class MyWatch implements Watcher{ public void process(WatchedEvent watchedEvent) { System.out.println(watchedEvent); } } }
Curator 是 Netfix 开发的 Zookeeper Client,使用起来更方便,功能更增强大,目前应用更加普遍。使用以前须要在 Maven 中导入依赖:测试
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.2.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.2.0</version> </dependency>
代码:ui
import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import java.util.List; public class CuratorTest { public static void main(String[] args) { /** * 建立客户端 * * RetryPolicy 接口是重试策略 */ /** * 指定客户端的重连策略 * * RetryOneTime(int ms) * 休眠必定毫秒数以后从新链接一次 * * RetryForever(int ms) * 和第一种策略的差异是会不断尝试重连 * * RetryNTimes(int times,int ms) * 和第一种策略的差异是,第一个参数指定重连次数,第二个参数指定休眠间隔 * * RetryUntilElapsed(int max_sum_ms,int ms) * 第一个参数指定最大休眠时间,第二个参数指定休眠间隔,若是休眠时间超出了就不会继续重连 * * ExponentialBackoffRetry(int ms,int,int max_ms) * 第一个参数表明最初的重连休眠时间,第二个参数表明最大重连次数,第三个参数表明最大重连休眠时间 * 该策略下重连的休眠时间会随着重连次数的增长而增长,从最初休眠时间一直增长到最大休眠时间 * 最大重连次数必须小于等于 29,超过的状况下会被自动修改为 29 * * [其它策略不一一列举] */ RetryPolicy retryPolicy = new ExponentialBackoffRetry(100,3,1000); /** * 采用 buider 模式建立客户端 */ CuratorFramework client = CuratorFrameworkFactory.builder() //Zookeeper 的地址 .connectString("localhost:2101,localhost:2102,localhost:2103") //session 的过时时间(毫秒) .sessionTimeoutMs(5000) //链接的超时时间(毫秒) .connectionTimeoutMs(5000) //拒绝策略 .retryPolicy(retryPolicy) //设置该客户端可以操做的目录权限,不设置的话默承认以操做所有 //好比此处设置为 zoo,即为该客户端对象操做的节点前面默认会添加 /zoo .namespace("zoo") //完成建立 .build(); //启动客户端 client.start(); /** * 建立节点 */ try { String createReturn = client.create() //节点类型 //PERSISTENT - 持久化目录节点,客户端与zookeeper断开链接后,该节点依旧存在 //PERSISTENT_SEQUENTIAL - 持久化,并带有序列号 //EPHEMERAL - 临时目录节点,客户端与zookeeper断开链接后,该节点被删除 //EPHEMERAL_SEQUENTIAL - 临时,并带有序列号 .withMode(CreateMode.PERSISTENT) //因为 namespace 设置为 zoo,因此此处至关于建立 /zoo/zoo_1 节点 .forPath("/zoo_1", "my first data zoo_1".getBytes()); System.out.println("建立节点:" + createReturn); } catch (Exception e) { e.printStackTrace(); } /** * 查询节点 */ try { Stat stat = client.checkExists() //查询 /zoo/zoo_1 节点 .forPath("/zoo_1"); //若是不存在,stat 为 null System.out.println("查询节点:" + stat); } catch (Exception e) { e.printStackTrace(); } /** * 删除节点 */ try { client.delete() //若是该节点下有子节点,会抛出异常且删除失败 .forPath("/zoo_1"); } catch (Exception e) { e.printStackTrace(); } /** * 查询节点的值 */ try { Stat stat = new Stat(); byte[] value = client.getData() //获取节点的 stat .storingStatIn(stat) //查询 /zoo/zoo_1 节点 .forPath("/zoo_1"); System.out.println("查询节点的值:" + new String(value)); } catch (Exception e) { e.printStackTrace(); } /** * 更新节点的值 */ try { Stat stat = client.setData() //设置版本值,此选项非必填 .withVersion(10086) .forPath("/zoo_1", "zoo_1 new data".getBytes()); } catch (Exception e) { e.printStackTrace(); } /** * 获取节点的子节点 */ try { //获取全部子节点的节点名称 List<String> nodes = client.getChildren() .forPath("/zoo_1"); } catch (Exception e) { e.printStackTrace(); } } }
Zookeeper 中的分布式锁实现原理很简单,就是多个线程一块儿去建立同一个节点,谁建立成功锁就归谁;使用完以后删除该节点,其它节点再进行一次争抢。Curator 中有一个写好的重入锁 InterProcessMutex,简单封装便可使用:this
import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import java.util.Objects; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * Zookeeper 分布式锁实现 */ public class ZkLock implements Lock{ private InterProcessMutex lock; /** * 让使用者方便运用的构造方法 */ public ZkLock(String zkAddrs){ this(zkAddrs, "/lock_node", "lock_base", 2000, new ExponentialBackoffRetry(1000, 10)); } /** * 核心构造方法,根据传入的参数去构造 lock 对象 * @param zkAddrs Zookeeper 的服务地址 * @param lockNode 各个线程要去争抢建立的 Znode,也就是客户端有使用权限的 namespace * @param baseNode lockNode 的上级 Znode * @param sessionOutTimeMs 过时时间 * @param policy 重连策略 */ public ZkLock(String zkAddrs,String lockNode,String baseNode,int sessionOutTimeMs,RetryPolicy policy){ //有效性验证 if(Objects.isNull(zkAddrs) || zkAddrs.trim().equals("") || Objects.isNull(lockNode) || lockNode.trim().equals("") || Objects.isNull(policy)) throw new RuntimeException(); //经过工厂建立链接 CuratorFrameworkFactory.Builder cfBuilder = CuratorFrameworkFactory.builder() .connectString(zkAddrs) .sessionTimeoutMs(sessionOutTimeMs) .retryPolicy(policy); if(baseNode != null && !baseNode.trim().equals("")) cfBuilder.namespace(baseNode); CuratorFramework cf = cfBuilder.build(); //开启链接 cf.start(); //InterProcessMutex 是 Crator 里自带的一个已经实现好的重入锁 //只要对其进行简单封装便可使用 lock = new InterProcessMutex(cf,lockNode); } /** * 上锁方法,死循环调用 tryLock() 去上锁 */ @Override public void lock() { while (!tryLock()) Thread.yield(); } /** * 尝试获取锁,若是没能获取到会超时后报错 */ @Override public boolean tryLock() { try { lock.acquire(); } catch (Exception e) { return Boolean.FALSE; } return Boolean.TRUE; } /** * 尝试获取锁,若是指定时间内获取不到就返回 false */ @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { try { return lock.acquire(time,unit); } catch (Exception e) { return Boolean.FALSE; } } /** * 释放锁,若是报错就会递归去释放 */ @Override public void unlock() { try { lock.release(); } catch (Exception e) { unlock(); } } //忽略 @Override public Condition newCondition() { throw new RuntimeException(); } //忽略 @Override public void lockInterruptibly() throws InterruptedException { lock(); } //测试 public static void main(String[] args) throws Exception { //建立一个要被操做的对象 AtomicInteger count = new AtomicInteger(30); //建立一个线程池 Executor executor = Executors.newFixedThreadPool(10); //建立所对象 Lock lock = new ZkLock("localhost:2101,localhost:2102,localhost:2103"); //for 循环,把任务丢进线程池里 for(int i = 0; i < 30; i++){ executor.execute(()->{ try { //加锁 lock.lock(); //此处开启业务逻辑 //demo 中简单模拟,将 count 对象减一 int a = count.decrementAndGet(); System.out.println(a); } catch (Exception e) { e.printStackTrace(); } finally { try { //释放锁 lock.unlock(); } catch (Exception e) { e.printStackTrace(); } } }); } } }