zookeeper -- 第八章 zk开源客户端 Curator介绍 (上)

一、原生API的不足

一、链接的建立是异步的,须要开发人员自行编码实现等待java

二、链接没有自动的超时重连机制apache

三、ZK自己不提供序列化机制,须要开发人员自行指定,从而实现数据的序列化和反序列化服务器

四、Watcher注册一次只会生效一次,须要不断的重复注册session

五、Watcher的使用方式不符合java自己的术语,若是采用监听方式,更容易理解dom

六、不支持递归建立树形节点异步

二、开源客户端---Curator介绍

Apache基金会得顶级项目之一分布式

1 、解决session会话超时重连源码分析

二、watcher反复注册ui

三、简化开发APIthis

四、遵循Fluent风格Api规范

五、NodeExistsException异常处理

六、共享锁服务,master选举 , 分布式计数器

七、http://curator.apache.org/

三、建立会话

一、使用CuratorFrameworkFactory工厂的两个静态方法建立客户端

public class CuratorClientDemo {

    private CuratorFramework client = null;

    /**
     *
     * public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy) {
     *      return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);
     * }
     *
     * public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) {
     *      return builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();
     * }
     *
     * connectStrng  逗号分开的ip:port
     * retryPolicy   重试策略,默认四种: Exponential BackoffRetry, RetryNtime, RetryOneTime, RetryUntilElapsed
     * sessionTimeoutMs 会哈超时时间,单位为毫秒,默认60000ms
     * connectionTimeoutMs 链接建立超时时间,单位为毫秒,默认是15000ms
     *
     */
    public CuratorClientDemo() {

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder().connectString("localhost:2181")
                .sessionTimeoutMs(1).retryPolicy(retryPolicy).build();

        client.start();
    }
}

四、重试策略

一、实现接口RetryPolicy能够自定义重试策略

public static void main(String[] args) {
        /**
         *  retryCount : 已经重试次数,若是第一次重试,此值为0
         * elapsedTimeMs : 重试花费的时间,单位为毫秒
         * sleeper : 相似于Thread.sleep,用于sleep指定时间
         * 返回值 : 若是还会继续重试,则返回true
         */
//        public interface RetryPolicy {
//            boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);
//        }
    }

一、ExponentialBackoffRetry

  • ExponentialBackoffRetry
// baseSleepTimeMs : 初始sleep时间
// maxRetries : 最大重试次数
// maxSleepMs : 最大重试时间
// 返回值 : 若是还会继续重试,则返回true

ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries) 

ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

当前应该sleep的时间 : baseSleepTimeMs * Math.max(1,random.nextInt(1 << (retryConut +1 )))

二、RetryNTimes

// RetryNTimes
        // RetryNTimes(int n, int sleepMsBetweenRetries
        // 当前应该sleep
        // 参数名 n                        :    最大重试数
        // 参数名 sleepMsBetweenRetries

三、RetryOneTime

// RetryOneTime
        // 只重试一次
        // RetryOneTime(int sleepMsBetweenRetry)
        // 参数名 sleepMsBetweenRetries

四、RetryUntilElapsed

// RetryUntilElapsed
        // RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)
        // 重试的时间超过最大时间后,就不在重试
        // 参数名 maxElapsedTimeMs         :    最大重试时间
        // 参数名 sleepMsBetweenRetries

五、Fluent风格的API

  • 定义 : 一种面向对象的开发方式,目的是提升代码的可读性

  • 实现方式 : 经过方法的级联或者方法链的方式实现

  • 举例:

public CuratorClientTest() {
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
		client = CuratorFrameworkFactory.builder()
				.connectString("localhost:2181,localhost:2182")
				.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
				.namespace("base").build();
		client.start();
	}

六、建立节点

一、代码举例

public void createNode(String path, byte[] data) throws Exception {
        client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath(path, data);
    }

二、参数说明

  • 一、构建操做包装类(Builder):CreateBuilder create()--- CuratorFramework

  • 二、CreateBuilder

一、creatingParentsIfNeeded // 递归建立父目录

二、withMode(CreateMode mode) // 设置节点属性 好比:CreateMode.PERSISTENT ,若是是递归建立,建立模式为临时节点,则只有叶子节点是临时界定啊,非叶子节点都为持久化节点

三、withACL(List aclList) // 设置ACL

四、forPath(String path) // 指定路径

七、删除节点

一、名词解释

  • 构建操做包装类(Builder):DeleteBuilder delete() -----CuratorFramework

  • DeleteBuilder

一、withVersion (int version) // 特定版本号

二、guaranteed() // 确保节点被删除

三、forPath(String path) // 指定路径

四、deletingChildrenIfNeeded() // 递归删除全部子节点

关于 guaranteed:

Solves edge cases where an operation may succeed on the server but connection failure occurs before a response can be successfully returned to the client

意思是: 解决当某个删除操做在服务器端可能成功,可是此时客户端与服务器端的链接中断,而删除的响 应没有成功返回到客户端 底层的本质:重试

二、源码分析

public Void forPath(String path) throws Exception {
        final String unfixedPath = path;
        path = this.client.fixForNamespace(path);
        if (this.backgrounding.inBackground()) {
            ErrorCallback<String> errorCallback = null;
            // 
            if (this.guaranteed) {
                errorCallback = new ErrorCallback<String>() {
                    public void retriesExhausted(OperationAndData<String> operationAndData) {
                        // 删除失败的集合
                        DeleteBuilderImpl.this.client.getFailedDeleteManager().addFailedDelete(unfixedPath);
                    }
                };
            }

            this.client.processBackgroundOperation(new OperationAndData(this, path, this.backgrounding.getCallback(), errorCallback, this.backgrounding.getContext()), (CuratorEvent)null);
        } else {
            this.pathInForeground(path, unfixedPath);
        }

        return null;
    }

二次执行删除

void addFailedDelete(String path) {
        if (this.debugListener != null) {
            this.debugListener.pathAddedForDelete(path);
        }
        
        // 客户端状态属于启动状态
        if (this.client.getState() == CuratorFrameworkState.STARTED) {
            this.log.debug("Path being added to guaranteed delete set: " + path);

            try {
                // 再次执行删除
                ((ErrorListenerPathable)this.client.delete().guaranteed().inBackground()).forPath(path);
            } catch (Exception var3) {
                ThreadUtils.checkInterrupted(var3);
                this.addFailedDelete(path);
            }
        }

    }

三、关于异步操做 inBackground

从参数看跟zk的原生异步API相同,多了一个线程池,用于执行回调

public interface Backgroundable<T> {
    T inBackground();

    T inBackground(Object var1);

    T inBackground(BackgroundCallback var1);

    T inBackground(BackgroundCallback var1, Object var2);

    T inBackground(BackgroundCallback var1, Executor var2);

    T inBackground(BackgroundCallback var1, Object var2, Executor var3);
}
相关文章
相关标签/搜索