一、链接的建立是异步的,须要开发人员自行编码实现等待java
二、链接没有自动的超时重连机制apache
三、ZK自己不提供序列化机制,须要开发人员自行指定,从而实现数据的序列化和反序列化服务器
四、Watcher注册一次只会生效一次,须要不断的重复注册session
五、Watcher的使用方式不符合java自己的术语,若是采用监听方式,更容易理解dom
六、不支持递归建立树形节点异步
Apache基金会得顶级项目之一分布式
1 、解决session会话超时重连源码分析
二、watcher反复注册ui
三、简化开发APIthis
四、遵循Fluent风格Api规范
五、NodeExistsException异常处理
六、共享锁服务,master选举 , 分布式计数器
一、使用CuratorFrameworkFactory工厂的两个静态方法建立客户端
public class CuratorClientDemo { private CuratorFramework client = null; /** * * public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy) { * return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy); * } * * public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) { * return builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build(); * } * * connectStrng 逗号分开的ip:port * retryPolicy 重试策略,默认四种: Exponential BackoffRetry, RetryNtime, RetryOneTime, RetryUntilElapsed * sessionTimeoutMs 会哈超时时间,单位为毫秒,默认60000ms * connectionTimeoutMs 链接建立超时时间,单位为毫秒,默认是15000ms * */ public CuratorClientDemo() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); client = CuratorFrameworkFactory.builder().connectString("localhost:2181") .sessionTimeoutMs(1).retryPolicy(retryPolicy).build(); client.start(); } }
一、实现接口RetryPolicy能够自定义重试策略
public static void main(String[] args) { /** * retryCount : 已经重试次数,若是第一次重试,此值为0 * elapsedTimeMs : 重试花费的时间,单位为毫秒 * sleeper : 相似于Thread.sleep,用于sleep指定时间 * 返回值 : 若是还会继续重试,则返回true */ // public interface RetryPolicy { // boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper); // } }
// baseSleepTimeMs : 初始sleep时间 // maxRetries : 最大重试次数 // maxSleepMs : 最大重试时间 // 返回值 : 若是还会继续重试,则返回true ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries) ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) 当前应该sleep的时间 : baseSleepTimeMs * Math.max(1,random.nextInt(1 << (retryConut +1 )))
// RetryNTimes // RetryNTimes(int n, int sleepMsBetweenRetries // 当前应该sleep // 参数名 n : 最大重试数 // 参数名 sleepMsBetweenRetries
// RetryOneTime // 只重试一次 // RetryOneTime(int sleepMsBetweenRetry) // 参数名 sleepMsBetweenRetries
// RetryUntilElapsed // RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries) // 重试的时间超过最大时间后,就不在重试 // 参数名 maxElapsedTimeMs : 最大重试时间 // 参数名 sleepMsBetweenRetries
定义 : 一种面向对象的开发方式,目的是提升代码的可读性
实现方式 : 经过方法的级联或者方法链的方式实现
举例:
public CuratorClientTest() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); client = CuratorFrameworkFactory.builder() .connectString("localhost:2181,localhost:2182") .sessionTimeoutMs(10000).retryPolicy(retryPolicy) .namespace("base").build(); client.start(); }
public void createNode(String path, byte[] data) throws Exception { client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath(path, data); }
一、构建操做包装类(Builder):CreateBuilder create()--- CuratorFramework
二、CreateBuilder
一、creatingParentsIfNeeded // 递归建立父目录
二、withMode(CreateMode mode) // 设置节点属性 好比:CreateMode.PERSISTENT ,若是是递归建立,建立模式为临时节点,则只有叶子节点是临时界定啊,非叶子节点都为持久化节点
三、withACL(List aclList) // 设置ACL
四、forPath(String path) // 指定路径
构建操做包装类(Builder):DeleteBuilder delete() -----CuratorFramework
DeleteBuilder
一、withVersion (int version) // 特定版本号
二、guaranteed() // 确保节点被删除
三、forPath(String path) // 指定路径
四、deletingChildrenIfNeeded() // 递归删除全部子节点
关于 guaranteed:
Solves edge cases where an operation may succeed on the server but connection failure occurs before a response can be successfully returned to the client
意思是: 解决当某个删除操做在服务器端可能成功,可是此时客户端与服务器端的链接中断,而删除的响 应没有成功返回到客户端 底层的本质:重试
public Void forPath(String path) throws Exception { final String unfixedPath = path; path = this.client.fixForNamespace(path); if (this.backgrounding.inBackground()) { ErrorCallback<String> errorCallback = null; // if (this.guaranteed) { errorCallback = new ErrorCallback<String>() { public void retriesExhausted(OperationAndData<String> operationAndData) { // 删除失败的集合 DeleteBuilderImpl.this.client.getFailedDeleteManager().addFailedDelete(unfixedPath); } }; } this.client.processBackgroundOperation(new OperationAndData(this, path, this.backgrounding.getCallback(), errorCallback, this.backgrounding.getContext()), (CuratorEvent)null); } else { this.pathInForeground(path, unfixedPath); } return null; }
二次执行删除
void addFailedDelete(String path) { if (this.debugListener != null) { this.debugListener.pathAddedForDelete(path); } // 客户端状态属于启动状态 if (this.client.getState() == CuratorFrameworkState.STARTED) { this.log.debug("Path being added to guaranteed delete set: " + path); try { // 再次执行删除 ((ErrorListenerPathable)this.client.delete().guaranteed().inBackground()).forPath(path); } catch (Exception var3) { ThreadUtils.checkInterrupted(var3); this.addFailedDelete(path); } } }
从参数看跟zk的原生异步API相同,多了一个线程池,用于执行回调
public interface Backgroundable<T> { T inBackground(); T inBackground(Object var1); T inBackground(BackgroundCallback var1); T inBackground(BackgroundCallback var1, Object var2); T inBackground(BackgroundCallback var1, Executor var2); T inBackground(BackgroundCallback var1, Object var2, Executor var3); }