以前在公司因为业务须要,对zookeeper进行了一些知识点的梳理进行分享,对一些刚刚接触zookeeper的小伙伴来讲,或许能够借鉴一下
java
简介
Zookeeper致力于提供一个高性能、高可用,且具有严格的顺序访问控制能力的分布式协调服务。node
设计目标apache
应用场景服务器
会话(session):客户端与服务端的一次会话链接,本质是TCP长链接,经过会话能够进行心跳检测和数据传输;session
数据节点(znode)数据结构
对于持久节点和临时节点,同一个znode下,节点的名称是惟一的:[center red 20px]
Watcher 事件监听器:客户端能够在节点上注册监听器,当特定的事件发生后,zk会通知到感兴趣的客户端。
EventType: NodeCreated、NodeDeleted、NodeDataChanged、NodeChildrenChange负载均衡
ACL:Zk采用ACL(access control lists)策略来控制权限
权限类型:create,read,write,delete,admin分布式
ACL:ide
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.12.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.12.0</version> </dependency>
public class App { public static void main(String[] args) throws Exception { String connectString = "211.159.174.226:2181"; RetryPolicy retryPolicy = getRetryPolicy(); CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, 5000, 5000, retryPolicy); client.start(); //增删改查 client.create().withMode(CreateMode.PERSISTENT).forPath("/test-Curator-PERSISTENT-nodata"); client.create().withMode(CreateMode.PERSISTENT).forPath("/test-Curator-PERSISTENT-data", "test-Curator-PERSISTENT-data".getBytes()); client.create().withMode(CreateMode.EPHEMERAL).forPath("/test-Curator-EPHEMERAL-nodata"); client.create().withMode(CreateMode.EPHEMERAL).forPath("/test-Curator-EPHEMERAL-data", "/test-Curator-EPHEMERAL-data".getBytes()); for (int i = 0; i < 5; i++) { client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test-Curator-PERSISTENT_SEQUENTIAL-nodata"); } byte[] bytes = client.getData().forPath("/test-Curator-PERSISTENT-data"); System.out.println("----------zk节点数据:" + new String(bytes) + "------------"); client.create().withMode(CreateMode.PERSISTENT).forPath("/test-listener", "test-listener".getBytes()); final NodeCache nodeCache = new NodeCache(client, "/test-listener"); nodeCache.start(); NodeCacheListener listener = new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("node changed : " + nodeCache.getCurrentData()); } }; nodeCache.getListenable().addListener(listener); client.setData().forPath("/test-listener", "/test-listener-change".getBytes()); } /** * RetryOneTime: 只重连一次. * RetryNTime: 指定重连的次数N. * RetryUtilElapsed: 指定最大重连超时时间和重连时间间隔,间歇性重连直到超时或者连接成功. * ExponentialBackoffRetry: 基于"backoff"方式重连,和RetryUtilElapsed的区别是重连的时间间隔是动态的 * BoundedExponentialBackoffRetry: 同ExponentialBackoffRetry,增长了最大重试次数的控制. */ public static RetryPolicy getRetryPolicy() { return new ExponentialBackoffRetry(1000, 3); } }
public class ZookeeperLock { private final String lockPath = "/distributed-lock"; private String connectString; private RetryPolicy retry; private CuratorFramework client; private InterProcessLock interProcessMutex; public void init() throws Exception { connectString = "211.159.174.226:2181"; retry = new ExponentialBackoffRetry(1000, 3); client = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry); client.start(); //共享可重入锁 interProcessMutex = new InterProcessMutex(client,lockPath); } public void lock(){ try { interProcessMutex.acquire(); } catch (Exception e) { System.out.println("锁失败了,真惨"); } } public void unlock(){ try { interProcessMutex.release(); } catch (Exception e) { System.out.println("释放失败了,更惨"); } } public static void main(String[] args) throws Exception { final ZookeeperLock zookeeperLock = new ZookeeperLock(); zookeeperLock.init(); Executor executor = Executors.newFixedThreadPool(5); for (int i = 0;i<50;i++) { executor.execute(new Runnable() { @Override public void run() { zookeeperLock.lock(); Long time = System.nanoTime(); System.out.println(time); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(time); zookeeperLock.unlock(); } }); } while (true){ } } }
ZAB协议所定义的三种节点状态性能
崩溃恢复
消息广播(相似2P提交):
更多文章关注博客:https://www.zplxjj.com和公众号