Zookeeper客户端Curator使用详解
简介
Curator是Netflix公司开源的一套zookeeper客户端框架,解决了不少Zookeeper客户端很是底层的细节开发工做,包括链接重连、反复注册Watcher和NodeExistsException异常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper”给Curator予高度评价。
引子和趣闻:
Zookeeper名字的由来是比较有趣的,下面的片断摘抄自《从PAXOS到ZOOKEEPER分布式一致性原理与实践》一书:
Zookeeper最先起源于雅虎的研究院的一个研究小组。在当时,研究人员发现,在雅虎内部不少大型的系统须要依赖一个相似的系统进行分布式协调,可是这些系统每每存在分布式单点问题。因此雅虎的开发人员就试图开发一个通用的无单点问题的分布式协调框架。在立项初期,考虑到不少项目都是用动物的名字来命名的(例如著名的Pig项目),雅虎的工程师但愿给这个项目也取一个动物的名字。时任研究院的首席科学家Raghu Ramakrishnan开玩笑说:再这样下去,咱们这儿就变成动物园了。此话一出,你们纷纷表示就叫动物园管理员吧——由于各个以动物命名的分布式组件放在一块儿,雅虎的整个分布式系统看上去就像一个大型的动物园了,而Zookeeper正好用来进行分布式环境的协调——因而,Zookeeper的名字由此诞生了。java
Curator无疑是Zookeeper客户端中的瑞士军刀,它译做"馆长"或者''管理者'',不知道是否是开发小组有意而为之,笔者猜想有可能这样命名的缘由是说明Curator就是Zookeeper的馆长(脑洞有点大:Curator就是动物园的园长)。
Curator包含了几个包:
curator-framework:对zookeeper的底层api的一些封装
curator-client:提供一些客户端的操做,例如重试策略等
curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等
Maven依赖(使用curator的版本:2.12.0,对应Zookeeper的版本为:3.4.x,若是跨版本会有兼容性问题,颇有可能致使节点操做失败):node
Curator的基本Api
建立会话
1.使用静态工程方法建立客户端
一个例子以下:git
newClient静态工厂方法包含四个主要参数:github
参数名
说明
connectionString
服务器列表,格式host1:port1,host2:port2,...
retryPolicy
重试策略,内建有四种重试策略,也能够自行实现RetryPolicy接口
sessionTimeoutMs
会话超时时间,单位毫秒,默认60000ms
connectionTimeoutMs
链接建立超时时间,单位毫秒,默认60000ms
2.使用Fluent风格的Api建立会话
核心参数变为流式设置,一个列子以下:算法
3.建立包含隔离命名空间的会话
为了实现不一样的Zookeeper业务之间的隔离,须要为每一个业务分配一个独立的命名空间(NameSpace),即指定一个Zookeeper的根路径(官方术语:为Zookeeper添加“Chroot”特性)。例如(下面的例子)当客户端指定了独立命名空间为“/base”,那么该客户端对Zookeeper上的数据节点的操做都是基于该目录进行的。经过设置Chroot能够将客户端应用与Zookeeper服务端的一课子树相对应,在多个应用共用一个Zookeeper集群的场景下,这对于实现不一样应用之间的相互隔离十分有意义。数据库
启动客户端
当建立会话成功,获得client的实例而后能够直接调用其start( )方法:apache
数据节点操做
建立数据节点
Zookeeper的节点建立模式:api
- PERSISTENT:持久化
- PERSISTENT_SEQUENTIAL:持久化而且带序列号
- EPHEMERAL:临时
- EPHEMERAL_SEQUENTIAL:临时而且带序列号
建立一个节点,初始内容为空 缓存
注意:若是没有设置节点属性,节点建立模式默认为持久化节点,内容默认为空服务器
建立一个节点,附带初始化内容
建立一个节点,指定建立模式(临时节点),内容为空
建立一个节点,指定建立模式(临时节点),附带初始化内容
建立一个节点,指定建立模式(临时节点),附带初始化内容,而且自动递归建立父节点
这个creatingParentContainersIfNeeded()接口很是有用,由于通常状况开发人员在建立一个子节点必须判断它的父节点是否存在,若是不存在直接建立会抛出NoNodeException,使用creatingParentContainersIfNeeded()以后Curator可以自动递归建立全部所需的父节点。
删除数据节点
删除一个节点
注意,此方法只能删除叶子节点,不然会抛出异常。
删除一个节点,而且递归删除其全部的子节点
删除一个节点,强制指定版本进行删除
删除一个节点,强制保证删除
guaranteed()接口是一个保障措施,只要客户端会话有效,那么Curator会在后台持续进行删除操做,直到删除节点成功。
注意:上面的多个流式接口是能够自由组合的,例如:
读取数据节点数据
读取一个节点的数据内容
注意,此方法返的返回值是byte[ ];
读取一个节点的数据内容,同时获取到该节点的stat
更新数据节点数据
更新一个节点的数据内容
注意:该接口会返回一个Stat实例
更新一个节点的数据内容,强制指定版本进行更新
检查节点是否存在
注意:该方法返回一个Stat实例,用于检查ZNode是否存在的操做. 能够调用额外的方法(监控或者后台处理)并在最后调用forPath( )指定要操做的ZNode
获取某个节点的全部子节点路径
注意:该方法的返回值为List<String>,得到ZNode的子节点Path列表。 能够调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操做的父ZNode
事务
CuratorFramework的实例包含inTransaction( )接口方法,调用此方法开启一个ZooKeeper事务. 能够复合create, setData, check, and/or delete 等操做而后调用commit()做为一个原子操做提交。一个例子以下:
异步接口
上面提到的建立、删除、更新、读取等方法都是同步的,Curator提供异步接口,引入了BackgroundCallback接口用于处理异步接口调用以后服务端返回的结果信息。BackgroundCallback接口中一个重要的回调值为CuratorEvent,里面包含事件类型、响应吗和节点的详细信息。
CuratorEventType
事件类型
对应CuratorFramework实例的方法
CREATE
#create()
DELETE
#delete()
EXISTS
#checkExists()
GET_DATA
#getData()
SET_DATA
#setData()
CHILDREN
#getChildren()
SYNC
#sync(String,Object)
GET_ACL
#getACL()
SET_ACL
#setACL()
WATCHED
#Watcher(Watcher)
CLOSING
#close()
响应码(#getResultCode())
响应码
意义
0
OK,即调用成功
-4
ConnectionLoss,即客户端与服务端断开链接
-110
NodeExists,即节点已经存在
-112
SessionExpired,即会话过时
一个异步建立节点的例子以下:
注意:若是#inBackground()方法不指定executor,那么会默认使用Curator的EventThread去进行异步处理。
Curator食谱(高级特性)
提醒:首先你必须添加curator-recipes依赖,下文仅仅对recipes一些特性的使用进行解释和举例,不打算进行源码级别的探讨
重要提醒:强烈推荐使用ConnectionStateListener监控链接的状态,当链接状态为LOST,curator-recipes下的全部Api将会失效或者过时,尽管后面全部的例子都没有使用到ConnectionStateListener。
缓存
Zookeeper原生支持经过注册Watcher来进行事件监听,可是开发者须要反复注册(Watcher只能单次注册单次使用)。Cache是Curator中对事件监听的包装,能够看做是对事件监听的本地缓存视图,可以自动为开发者处理反复注册监听。Curator提供了三种Watcher(Cache)来监听结点的变化。
Path Cache
Path Cache用来监控一个ZNode的子节点. 当一个子节点增长, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态,而状态的更变将经过PathChildrenCacheListener通知。
实际使用时会涉及到四个类:
- PathChildrenCache
- PathChildrenCacheEvent
- PathChildrenCacheListener
- ChildData
经过下面的构造函数建立Path Cache:
想使用cache,必须调用它的方法,使用完后调用方法。 能够设置StartMode来实现启动的模式,
StartMode有下面几种:
- NORMAL:正常初始化。
- BUILD_INITIAL_CACHE:在调用以前会调用。
- POST_INITIALIZED_EVENT: 当Cache初始化数据后发送一个PathChildrenCacheEvent.Type#INITIALIZED事件
能够增长listener监听缓存的变化。
方法返回一个对象,能够遍历全部的子节点。
设置/更新、移除实际上是使用client (CuratorFramework)来操做, 不经过PathChildrenCache操做:
注意:若是new PathChildrenCache(client, PATH, true)中的参数cacheData值设置为false,则示例中的event.getData().getData()、data.getData()将返回null,cache将不会缓存节点数据。
注意:示例中的Thread.sleep(10)能够注释掉,可是注释后事件监听的触发次数会不全,这可能与PathCache的实现原理有关,不能太过频繁的触发事件!
Node Cache
Node Cache与Path Cache相似,Node Cache只是监听某一个特定的节点。它涉及到下面的三个类:
- - Node Cache实现类
- - 节点监听器
- - 节点数据
注意:使用cache,依然要调用它的方法,使用完后调用方法。
getCurrentData()将获得节点当前的状态,经过它的状态能够获得当前的值。
注意:示例中的Thread.sleep(10)能够注释,可是注释后事件监听的触发次数会不全,这可能与NodeCache的实现原理有关,不能太过频繁的触发事件!
注意:NodeCache只能监听一个节点的状态变化。
Tree Cache
Tree Cache能够监控整个树上的全部节点,相似于PathCache和NodeCache的组合,主要涉及到下面四个类:
- TreeCache - Tree Cache实现类
- TreeCacheListener - 监听器类
- TreeCacheEvent - 触发的事件类
- ChildData - 节点数据
注意:在此示例中没有使用Thread.sleep(10),可是事件触发次数也是正常的。
注意:TreeCache在初始化(调用方法)的时候会回调实例一个事TreeCacheEvent,而回调的TreeCacheEvent对象的Type为INITIALIZED,ChildData为null,此时颇有可能致使空指针异常,这里应该主动处理并避免这种状况。
Leader选举
在分布式计算中, leader elections是很重要的一个功能, 这个选举过程是这样子的: 指派一个进程做为组织者,将任务分发给各节点。 在任务开始前, 哪一个节点都不知道谁是leader(领导者)或者coordinator(协调者). 当选举算法开始执行后, 每一个节点最终会获得一个惟一的节点做为任务leader. 除此以外, 选举还常常会发生在leader意外宕机的状况下,新的leader要被选举出来。
在zookeeper集群中,leader负责写操做,而后经过Zab协议实现follower的同步,leader或者follower均可以处理读操做。
Curator 有两种leader选举的recipe,分别是LeaderSelector和LeaderLatch。
前者是全部存活的客户端不间断的轮流作Leader,大同社会。后者是一旦选举出Leader,除非有客户端挂掉从新触发选举,不然不会交出领导权。某党?
LeaderLatch
LeaderLatch有两个构造函数:
LeaderLatch的启动:
leaderLatch.start( );
一旦启动,LeaderLatch会和其它使用相同latch path的其它LeaderLatch交涉,而后其中一个最终会被选举为leader,能够经过方法查看LeaderLatch实例是否leader:
leaderLatch.hasLeadership( ); //返回true说明当前实例是leader
相似JDK的CountDownLatch, LeaderLatch在请求成为leadership会block(阻塞),一旦不使用LeaderLatch了,必须调用方法。 若是它是leader,会释放leadership, 其它的参与者将会选举一个leader。
异常处理: LeaderLatch实例能够增长ConnectionStateListener来监听网络链接问题。 当 SUSPENDED 或 LOST 时, leader再也不认为本身仍是leader。当LOST后链接重连后RECONNECTED,LeaderLatch会删除先前的ZNode而后从新建立一个。LeaderLatch用户必须考虑致使leadership丢失的链接问题。 强烈推荐你使用ConnectionStateListener。
一个LeaderLatch的使用例子:
能够添加test module的依赖方便进行测试,不须要启动真实的zookeeper服务端:
首先咱们建立了10个LeaderLatch,启动后它们中的一个会被选举为leader。 由于选举会花费一些时间,start后并不能立刻就获得leader。
经过查看本身是不是leader, 若是是的话返回true。
能够经过能够获得当前的leader的ID。
只能经过释放当前的领导权。
是一个阻塞方法, 尝试获取leader地位,可是未必能上位。
LeaderSelector
LeaderSelector使用的时候主要涉及下面几个类:
- LeaderSelector
- LeaderSelectorListener
- LeaderSelectorListenerAdapter
- CancelLeadershipException
核心类是LeaderSelector,它的构造函数以下:
相似LeaderLatch,LeaderSelector必须: 一旦启动,当实例取得领导权时你的listener的方法被调用。而takeLeadership()方法只有领导权被释放时才返回。 当你再也不使用LeaderSelector实例时,应该调用它的close方法。
异常处理 LeaderSelectorListener类继承ConnectionStateListener。LeaderSelector必须当心链接状态的改变。若是实例成为leader, 它应该响应SUSPENDED 或 LOST。 当 SUSPENDED 状态出现时, 实例必须假定在从新链接成功以前它可能再也不是leader了。 若是LOST状态出现, 实例再也不是leader, takeLeadership方法返回。
重要: 推荐处理方式是当收到SUSPENDED 或 LOST时抛出CancelLeadershipException异常.。这会致使LeaderSelector实例中断并取消执行takeLeadership方法的异常.。这很是重要, 你必须考虑扩展LeaderSelectorListenerAdapter. LeaderSelectorListenerAdapter提供了推荐的处理逻辑。
下面的一个例子摘抄自官方:
你能够在takeLeadership进行任务的分配等等,而且不要返回,若是你想要要此实例一直是leader的话能够加一个死循环。调用 保证在此实例释放领导权以后还可能得到领导权。 在这里咱们使用AtomicInteger来记录此client得到领导权的次数, 它是”fair”, 每一个client有平等的机会得到领导权。
对比可知,LeaderLatch必须调用方法才会释放领导权,而对于LeaderSelector,经过能够对领导权进行控制, 在适当的时候释放领导权,这样每一个节点都有可能得到领导权。从而,LeaderSelector具备更好的灵活性和可控性,建议有LeaderElection应用场景下优先使用LeaderSelector。
分布式锁
提醒:
1.推荐使用ConnectionStateListener监控链接的状态,由于当链接LOST时你再也不拥有锁
2.分布式的锁全局同步, 这意味着任何一个时间点不会有两个客户端都拥有相同的锁。
可重入共享锁—Shared Reentrant Lock
Shared意味着锁是全局可见的, 客户端均可以请求锁。 Reentrant和JDK的ReentrantLock相似,便可重入, 意味着同一个客户端在拥有锁的同时,能够屡次获取,不会被阻塞。 它是由类来实现。 它的构造函数为:
经过得到锁,并提供超时机制:
经过方法释放锁。 InterProcessMutex 实例能够重用。
Revoking ZooKeeper recipes wiki定义了可协商的撤销机制。 为了撤销mutex, 调用下面的方法:
若是你请求撤销当前的锁, 调用方法,注意锁释放时将会回调。
二次提醒:错误处理 仍是强烈推荐你使用处理链接状态的改变。 当链接LOST时你再也不拥有锁。
首先让咱们建立一个模拟的共享资源, 这个资源指望只能单线程的访问,不然会有并发问题。
而后建立一个类, 它负责请求锁, 使用资源,释放锁这样一个完整的访问过程。
代码也很简单,生成10个client, 每一个client重复执行10次 请求锁–访问资源–释放锁的过程。每一个client都在独立的线程中。 结果能够看到,锁是随机的被每一个实例排他性的使用。
既然是可重用的,你能够在一个线程中屡次调用,在线程拥有锁时它老是返回true。
你不该该在多个线程中用同一个, 你能够在每一个线程中都生成一个新的InterProcessMutex实例,它们的path都同样,这样它们能够共享同一个锁。
不可重入共享锁—Shared Lock
这个锁和上面的相比,就是少了Reentrant的功能,也就意味着它不能在同一个线程中重入。这个类是,使用方法和相似
运行后发现,有且只有一个client成功获取第一个锁(第一个方法返回true),而后它本身阻塞在第二个方法,获取第二个锁超时;其余全部的客户端都阻塞在第一个方法超时而且抛出异常。
这样也就验证了实现的锁是不可重入的。
可重入读写锁—Shared Reentrant Read Write Lock
相似JDK的ReentrantReadWriteLock。一个读写锁管理一对相关的锁。一个负责读操做,另一个负责写操做。读操做在写锁没被使用时可同时由多个进程使用,而写锁在使用时不容许读(阻塞)。
此锁是可重入的。一个拥有写锁的线程可重入读锁,可是读锁却不能进入写锁。这也意味着写锁能够降级成读锁, 好比请求写锁 --->请求读锁--->释放读锁 ---->释放写锁。从读锁升级成写锁是不行的。
可重入读写锁主要由两个类实现:、。使用时首先建立一个实例,而后再根据你的需求获得读锁或者写锁,读写锁的类型是。
信号量—Shared Semaphore
一个计数的信号量相似JDK的Semaphore。 JDK中Semaphore维护的一组许可(permits),而Curator中称之为租约(Lease)。 有两种方式能够决定semaphore的最大租约数。第一种方式是用户给定path而且指定最大LeaseSize。第二种方式用户给定path而且使用类。若是不使用SharedCountReader, 必须保证全部实例在多进程中使用相同的(最大)租约数量,不然有可能出现A进程中的实例持有最大租约数量为10,可是在B进程中持有的最大租约数量为20,此时租约的意义就失效了。
此次调用会返回一个租约对象。 客户端必须在finally中close这些租约对象,不然这些租约会丢失掉。 可是, 可是,若是客户端session因为某种缘由好比crash丢掉, 那么这些客户端持有的租约会自动close, 这样其它客户端能够继续使用这些租约。 租约还能够经过下面的方式返还:
注意你能够一次性请求多个租约,若是Semaphore当前的租约不够,则请求线程会被阻塞。 同时还提供了超时的重载方法。
Shared Semaphore使用的主要类包括下面几个:
-
首先咱们先得到了5个租约, 最后咱们把它还给了semaphore。 接着请求了一个租约,由于semaphore还有5个租约,因此请求能够知足,返回一个租约,还剩4个租约。 而后再请求一个租约,由于租约不够,阻塞到超时,仍是没能知足,返回结果为null(租约不足会阻塞到超时,而后返回null,不会主动抛出异常;若是不设置超时时间,会一致阻塞)。
上面说讲的锁都是公平锁(fair)。 总ZooKeeper的角度看, 每一个客户端都按照请求的顺序得到锁,不存在非公平的抢占的状况。
多共享锁对象 —Multi Shared Lock
Multi Shared Lock是一个锁的容器。 当调用, 全部的锁都会被,若是请求失败,全部的锁都会被release。 一样调用release时全部的锁都被release(失败被忽略)。 基本上,它就是组锁的表明,在它上面的请求释放操做都会传递给它包含的全部的锁。
主要涉及两个类:
它的构造函数须要包含的锁的集合,或者一组ZooKeeper的path。
用法和Shared Lock相同。
新建一个, 包含一个重入锁和一个非重入锁。 调用后能够看到线程同时拥有了这两个锁。 调用看到这两个锁都被释放了。
最后再重申一次, 强烈推荐使用ConnectionStateListener监控链接的状态,当链接状态为LOST,锁将会丢失。
分布式计数器
顾名思义,计数器是用来计数的, 利用ZooKeeper能够实现一个集群共享的计数器。 只要使用相同的path就能够获得最新的计数器值, 这是由ZooKeeper的一致性保证的。Curator有两个计数器, 一个是用int来计数(),一个用long来计数()。
分布式int计数器—SharedCount
这个类使用int类型来计数。 主要涉及三个类。
- SharedCount
- SharedCountReader
- SharedCountListener
表明计数器, 能够为它增长一个,当计数器改变时此Listener能够监听到改变的事件,而能够读取到最新的值, 包括字面值和带版本信息的值VersionedValue。
在这个例子中,咱们使用来监听计数值(方法来添加SharedCountListener )。 任意的SharedCount, 只要使用相同的path,均可以获得这个计数值。 而后咱们使用5个线程为计数值增长一个10之内的随机数。相同的path的SharedCount对计数值进行更改,将会回调给的SharedCountListener。
这里咱们使用去设置计数器。 第一个参数提供当前的VersionedValue,若是期间其它client更新了此计数值, 你的更新可能不成功, 可是这时你的client更新了最新的值,因此失败了你能够尝试再更新一次。 而是强制更新计数器的值。
注意计数器必须,使用完以后必须调用关闭它。
强烈推荐使用。 在本例中扩展。
分布式long计数器—DistributedAtomicLong
再看一个Long类型的计数器。 除了计数的范围比大了以外, 它首先尝试使用乐观锁的方式设置计数器, 若是不成功(好比期间计数器已经被其它client更新了), 它使用方式来更新计数值。
能够从它的内部实现中看出:
此计数器有一系列的操做:
- get(): 获取当前值
- increment(): 加一
- decrement(): 减一
- add(): 增长特定的值
- subtract(): 减去特定的值
- trySet(): 尝试设置计数值
- forceSet(): 强制设置计数值
你必须检查返回结果的, 它表明此操做是否成功。 若是操做成功, 表明操做前的值, 表明操做后的值。
分布式队列
使用Curator也能够简化Ephemeral Node (临时节点)的操做。Curator也提供ZK Recipe的分布式队列实现。 利用ZK的 PERSISTENTS_EQUENTIAL节点, 能够保证放入到队列中的项目是按照顺序排队的。 若是单一的消费者从队列中取数据, 那么它是先入先出的,这也是队列的特色。 若是你严格要求顺序,你就的使用单一的消费者,可使用Leader选举只让Leader做为惟一的消费者。
可是, 根据Netflix的Curator做者所说, ZooKeeper真心不适合作Queue,或者说ZK没有实现一个好的Queue,详细内容能够看 Tech Note 4, 缘由有五:
- ZK有1MB 的传输限制。 实践中ZNode必须相对较小,而队列包含成千上万的消息,很是的大。
- 若是有不少节点,ZK启动时至关的慢。 而使用queue会致使好多ZNode. 你须要显著增大 initLimit 和 syncLimit.
- ZNode很大的时候很难清理。Netflix不得不建立了一个专门的程序作这事。
- 当很大量的包含成千上万的子节点的ZNode时, ZK的性能变得很差
- ZK的数据库彻底放在内存中。 大量的Queue意味着会占用不少的内存空间。
尽管如此, Curator仍是建立了各类Queue的实现。 若是Queue的数据量不太多,数据量不太大的状况下,酌情考虑,仍是可使用的。
分布式队列—DistributedQueue
DistributedQueue是最普通的一种队列。 它设计如下四个类:
- QueueBuilder - 建立队列使用QueueBuilder,它也是其它队列的建立类
- QueueConsumer - 队列中的消息消费者接口
- QueueSerializer - 队列消息序列化和反序列化接口,提供了对队列中的对象的序列化和反序列化
- DistributedQueue - 队列实现类
QueueConsumer是消费者,它能够接收队列的数据。处理队列中的数据的代码逻辑能够放在QueueConsumer.consumeMessage()中。
正常状况下先将消息从队列中移除,再交给消费者消费。但这是两个步骤,不是原子的。能够调用Builder的lockPath()消费者加锁,当消费者消费数据时持有锁,这样其它消费者不能消费此消息。若是消费失败或者进程死掉,消息能够交给其它进程。这会带来一点性能的损失。最好仍是单消费者模式使用队列。
例子中定义了两个分布式队列和两个消费者,由于PATH是相同的,会存在消费者抢占消费消息的状况。
带Id的分布式队列—DistributedIdQueue
DistributedIdQueue和上面的队列相似,可是能够为队列中的每个元素设置一个ID。 能够经过ID把队列中任意的元素移除。 它涉及几个类:
- QueueBuilder
- QueueConsumer
- QueueSerializer
- DistributedQueue
经过下面方法建立:
放入元素时:
移除元素时:
在这个例子中, 有些元素尚未被消费者消费前就移除了,这样消费者不会收到删除的消息。
优先级分布式队列—DistributedPriorityQueue
优先级队列对队列中的元素按照优先级进行排序。 Priority越小, 元素越靠前, 越先被消费掉。 它涉及下面几个类:
- QueueBuilder
- QueueConsumer
- QueueSerializer
- DistributedPriorityQueue
经过builder.buildPriorityQueue(minItemsBeforeRefresh)方法建立。 当优先级队列获得元素增删消息时,它会暂停处理当前的元素队列,而后刷新队列。minItemsBeforeRefresh指定刷新前当前活动的队列的最小数量。 主要设置你的程序能够容忍的不排序的最小值。
放入队列时须要指定优先级:
例子:
有时候你可能会有错觉,优先级设置并无起效。那是由于优先级是对于队列积压的元素而言,若是消费速度过快有可能出如今后一个元素入队操做以前前一个元素已经被消费,这种状况下DistributedPriorityQueue会退化为DistributedQueue。
分布式延迟队列—DistributedDelayQueue
JDK中也有DelayQueue,不知道你是否熟悉。 DistributedDelayQueue也提供了相似的功能, 元素有个delay值, 消费者隔一段时间才能收到元素。 涉及到下面四个类。
- QueueBuilder
- QueueConsumer
- QueueSerializer
- DistributedDelayQueue
经过下面的语句建立:
放入元素时能够指定:
注意不是离如今的一个时间间隔, 好比20毫秒,而是将来的一个时间戳,如 System.currentTimeMillis() + 10秒。 若是delayUntilEpoch的时间已通过去,消息会马上被消费者接收。
SimpleDistributedQueue
前面虽然实现了各类队列,可是你注意到没有,这些队列并无实现相似JDK同样的接口。 提供了和JDK基本一致的接口(可是没有实现Queue接口)。 建立很简单:
增长元素:
删除元素:
另外还提供了其它方法:
没有方法, 多了方法。
方法在成功返回以前会被阻塞。 而方法在队列为空时直接返回null。
可是实际上发送了100条消息,消费完第一条以后,后面的消息没法消费,目前没找到缘由。查看一下官方文档推荐的demo使用下面几个Api:
可是实际使用发现仍是存在消费阻塞问题。
分布式屏障—Barrier
分布式Barrier是这样一个类: 它会阻塞全部节点上的等待进程,直到某一个被知足, 而后全部的节点继续进行。
好比赛马比赛中, 等赛马陆续来到起跑线前。 一声令下,全部的赛马都飞奔而出。
DistributedBarrier
类实现了栅栏的功能。 它的构造函数以下:
首先你须要设置栅栏,它将阻塞在它上面等待的线程:
而后须要阻塞的线程调用方法等待放行条件:
当条件知足时,移除栅栏,全部等待的线程将继续执行:
异常处理 DistributedBarrier 会监控链接状态,当链接断掉时方法会抛出异常。
这个例子建立了来设置栅栏和移除栅栏。 咱们建立了5个线程,在此Barrier上等待。 最后移除栅栏后全部的线程才继续执行。
若是你开始不设置栅栏,全部的线程就不会阻塞住。
双栅栏—DistributedDoubleBarrier
双栅栏容许客户端在计算的开始和结束时同步。当足够的进程加入到双栅栏时,进程开始计算, 当计算完成时,离开栅栏。 双栅栏类是。 构造函数为:
是成员数量,当方法被调用时,成员被阻塞,直到全部的成员都调用了。 当方法被调用时,它也阻塞调用线程,直到全部的成员都调用了。 就像百米赛跑比赛, 发令枪响, 全部的运动员开始跑,等全部的运动员跑过终点线,比赛才结束。
DistributedDoubleBarrier会监控链接状态,当链接断掉时和方法会抛出异常。
参考资料:
《从PAXOS到ZOOKEEPER分布式一致性原理与实践》
《 跟着实例学习ZooKeeper的用法》博客系列
项目仓库:
https://github.com/zjcscut/curator-seed
(其实本文的MD是带导航目录[toc]的,比较方便导航到每一个章节,只是简书不支持,本文的MD原文放在项目的/resources/md目录下,有爱自取,文章用Typora编写,建议用Typora打开)
End on 2017-5-13 13:10.Help yourselves!我是throwable,在广州奋斗,白天上班,晚上和双休不定时加班,晚上有空坚持写下博客。但愿个人文章可以给你带来收获,共勉。
做者:zhrowable连接:http://www.jianshu.com/p/70151fc0ef5d來源:简书著做权归做者全部。商业转载请联系做者得到受权,非商业转载请注明出处。<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
connectionInfo,
5000,
3000,
retryPolicy);RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.namespace("base")
.build();client.start();client.create().forPath("path");client.create().forPath("path","init".getBytes());client.create().withMode(CreateMode.EPHEMERAL).forPath("path");client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());client.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("path","init".getBytes());client.delete().forPath("path");client.delete().deletingChildrenIfNeeded().forPath("path");client.delete().withVersion(10086).forPath("path");client.delete().guaranteed().forPath("path");client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");client.getData().forPath("path");Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");client.setData().forPath("path","data".getBytes());client.setData().withVersion(10086).forPath("path","data".getBytes());client.checkExists().forPath("path");client.getChildren().forPath("path");client.inTransaction().check().forPath("path")
.and()
.create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
.and()
.setData().withVersion(10086).forPath("path","data2".getBytes())
.and()
.commit();Executor executor = Executors.newFixedThreadPool(2);
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground((curatorFramework, curatorEvent) -> { System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
},executor)
.forPath("path");<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)startclosestart()rebuild()public void addListener(PathChildrenCacheListener listener)getCurrentData()List<ChildData>public class PathCacheDemo {
private static final String PATH = "/example/pathCache";
public static void main(String[] args) throws Exception {
TestingServer server = new TestingServer();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
cache.start();
PathChildrenCacheListener cacheListener = (client1, event) -> {
System.out.println("事件类型:" + event.getType());
if (null != event.getData()) {
System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
}
};
cache.getListenable().addListener(cacheListener);
client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
Thread.sleep(10);
client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
Thread.sleep(10);
client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
Thread.sleep(10);
for (ChildData data : cache.getCurrentData()) {
System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
}
client.delete().forPath("/example/pathCache/test01");
Thread.sleep(10);
client.delete().forPath("/example/pathCache/test02");
Thread.sleep(1000 * 5);
cache.close();
client.close();
System.out.println("OK!");
}
}NodeCacheNodeCacheListenerChildDatastart()close()public class NodeCacheDemo {
private static final String PATH = "/example/cache";
public static void main(String[] args) throws Exception {
TestingServer server = new TestingServer();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
client.create().creatingParentsIfNeeded().forPath(PATH);
final NodeCache cache = new NodeCache(client, PATH);
NodeCacheListener listener = () -> {
ChildData data = cache.getCurrentData();
if (null != data) {
System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
} else {
System.out.println("节点被删除!");
}
};
cache.getListenable().addListener(listener);
cache.start();
client.setData().forPath(PATH, "01".getBytes());
Thread.sleep(100);
client.setData().forPath(PATH, "02".getBytes());
Thread.sleep(100);
client.delete().deletingChildrenIfNeeded().forPath(PATH);
Thread.sleep(1000 * 2);
cache.close();
client.close();
System.out.println("OK!");
}
}public class TreeCacheDemo {
private static final String PATH = "/example/cache";
public static void main(String[] args) throws Exception {
TestingServer server = new TestingServer();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
client.create().creatingParentsIfNeeded().forPath(PATH);
TreeCache cache = new TreeCache(client, PATH);
TreeCacheListener listener = (client1, event) ->
System.out.println("事件类型:" + event.getType() +
" | 路径:" + (null != event.getData() ? event.getData().getPath() : null));
cache.getListenable().addListener(listener);
cache.start();
client.setData().forPath(PATH, "01".getBytes());
Thread.sleep(100);
client.setData().forPath(PATH, "02".getBytes());
Thread.sleep(100);
client.delete().deletingChildrenIfNeeded().forPath(PATH);
Thread.sleep(1000 * 2);
cache.close();
client.close();
System.out.println("OK!");
}
}start()TreeCacheListenerevent.getData().getPath()public LeaderLatch(CuratorFramework client, String latchPath) public LeaderLatch(CuratorFramework client, String latchPath, String id)hasLeadershipclosepublic void await() throws InterruptedException,EOFException
/*Causes the current thread to wait until this instance acquires leadership unless the thread is interrupted or closed.*/
public boolean await(long timeout,TimeUnit unit)throws InterruptedExceptionpublic class LeaderLatchDemo extends BaseConnectionInfo {
protected static String PATH = "/francis/leader";
private static final int CLIENT_QTY = 10;
public static void main(String[] args) throws Exception {
List<CuratorFramework> clients = Lists.newArrayList();
List<LeaderLatch> examples = Lists.newArrayList();
TestingServer server=new TestingServer();
try {
for (int i = 0; i < CLIENT_QTY; i++) {
CuratorFramework client
= CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
clients.add(client);
LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
latch.addListener(new LeaderLatchListener() {
@Override
public void isLeader() {
// TODO Auto-generated method stub
System.out.println("I am Leader");
}
@Override
public void notLeader() {
// TODO Auto-generated method stub
System.out.println("I am not Leader");
}
});
examples.add(latch);
client.start();
latch.start();
}
Thread.sleep(10000);
LeaderLatch currentLeader = null;
for (LeaderLatch latch : examples) {
if (latch.hasLeadership()) {
currentLeader = latch;
}
}
System.out.println("current leader is " + currentLeader.getId());
System.out.println("release the leader " + currentLeader.getId());
currentLeader.close();
Thread.sleep(5000);
for (LeaderLatch latch : examples) {
if (latch.hasLeadership()) {
currentLeader = latch;
}
}
System.out.println("current leader is " + currentLeader.getId());
System.out.println("release the leader " + currentLeader.getId());
} finally {
for (LeaderLatch latch : examples) {
if (null != latch.getState())
CloseableUtils.closeQuietly(latch);
}
for (CuratorFramework client : clients) {
CloseableUtils.closeQuietly(client);
}
}
}
}<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>2.12.0</version>
</dependency>hasLeadership.getLeader().getId()closeawaitpublic LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener) public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)startleaderSelector.start();takeLeadership()public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
private final String name;
private final LeaderSelector leaderSelector;
private final AtomicInteger leaderCount = new AtomicInteger();
public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
this.name = name;
leaderSelector = new LeaderSelector(client, path, this);
leaderSelector.autoRequeue();
}
public void start() throws IOException {
leaderSelector.start();
}
@Override
public void close() throws IOException {
leaderSelector.close();
}
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
final int waitSeconds = (int) (5 * Math.random()) + 1;
System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
} catch (InterruptedException e) {
System.err.println(name + " was interrupted.");
Thread.currentThread().interrupt();
} finally {
System.out.println(name + " relinquishing leadership.\n");
}
}
}leaderSelector.autoRequeue();public class LeaderSelectorDemo {
protected static String PATH = "/francis/leader";
private static final int CLIENT_QTY = 10;
public static void main(String[] args) throws Exception {
List<CuratorFramework> clients = Lists.newArrayList();
List<LeaderSelectorAdapter> examples = Lists.newArrayList();
TestingServer server = new TestingServer();
try {
for (int i = 0; i < CLIENT_QTY; i++) {
CuratorFramework client
= CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
clients.add(client);
LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
examples.add(selectorAdapter);
client.start();
selectorAdapter.start();
}
System.out.println("Press enter/return to quit\n");
new BufferedReader(new InputStreamReader(System.in)).readLine();
} finally {
System.out.println("Shutting down...");
for (LeaderSelectorAdapter exampleClient : examples) {
CloseableUtils.closeQuietly(exampleClient);
}
for (CuratorFramework client : clients) {
CloseableUtils.closeQuietly(client);
}
CloseableUtils.closeQuietly(server);
}
}
}close()LeaderSelectorListenerInterProcessMutexpublic InterProcessMutex(CuratorFramework client, String path)acquire()public void acquire()
Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
re-entrantly. Each call to acquire must be balanced by a call to release()
public boolean acquire(long time,TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()
Parameters:
time - time to wait
unit - time unit
Returns:
true if the mutex was acquired, false if notrelease()public void makeRevocable(RevocationListener<T> listener)
将锁设为可撤销的. 当别的进程或线程想让你释放锁时Listener会被调用。
Parameters:
listener - the listenerattemptRevoke()RevocationListenerpublic static void attemptRevoke(CuratorFramework client,String path) throws Exception Utility to mark a lock for revocation. Assuming that the lock has been registered with a RevocationListener, it will get called and the lock should be released. Note, however, that revocation is cooperative. Parameters: client - the client path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()ConnectionStateListenerpublic class FakeLimitedResource {
private final AtomicBoolean inUse = new AtomicBoolean(false);
public void use() throws InterruptedException {
// 真实环境中咱们会在这里访问/维护一个共享的资源
//这个例子在使用锁的状况下不会非法并发异常IllegalStateException
//可是在无锁的状况因为sleep了一段时间,很容易抛出异常
if (!inUse.compareAndSet(false, true)) {
throw new IllegalStateException("Needs to be used by one client at a time");
}
try {
Thread.sleep((long) (3 * Math.random()));
} finally {
inUse.set(false);
}
}
}InterProcessMutexDemopublic class InterProcessMutexDemo {
private InterProcessMutex lock;
private final FakeLimitedResource resource;
private final String clientName;
public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
this.resource = resource;
this.clientName = clientName;
this.lock = new InterProcessMutex(client, lockPath);
}
public void doWork(long time, TimeUnit unit) throws Exception {
if (!lock.acquire(time, unit)) {
throw new IllegalStateException(clientName + " could not acquire the lock");
}
try {
System.out.println(clientName + " get the lock");
resource.use(); //access resource exclusively
} finally {
System.out.println(clientName + " releasing the lock");
lock.release(); // always release the lock in a finally block
}
}
private static final int QTY = 5;
private static final int REPETITIONS = QTY * 10;
private static final String PATH = "/examples/locks";
public static void main(String[] args) throws Exception {
final FakeLimitedResource resource = new FakeLimitedResource();
ExecutorService service = Executors.newFixedThreadPool(QTY);
final TestingServer server = new TestingServer();
try {
for (int i = 0; i < QTY; ++i) {
final int index = i;
Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
try {
client.start();
final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
for (int j = 0; j < REPETITIONS; ++j) {
example.doWork(10, TimeUnit.SECONDS);
}
} catch (Throwable e) {
e.printStackTrace();
} finally {
CloseableUtils.closeQuietly(client);
}
return null;
}
};
service.submit(task);
}
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
} finally {
CloseableUtils.closeQuietly(server);
}
}
}acquire()InterProcessMutexInterProcessMutexInterProcessSemaphoreMutexInterProcessMutexpublic class InterProcessSemaphoreMutexDemo {
private InterProcessSemaphoreMutex lock;
private final FakeLimitedResource resource;
private final String clientName;
public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
this.resource = resource;
this.clientName = clientName;
this.lock = new InterProcessSemaphoreMutex(client, lockPath);
}
public void doWork(long time, TimeUnit unit) throws Exception {
if (!lock.acquire(time, unit))
{
throw new IllegalStateException(clientName + " 不能获得互斥锁");
}
System.out.println(clientName + " 已获取到互斥锁");
if (!lock.acquire(time, unit))
{
throw new IllegalStateException(clientName + " 不能获得互斥锁");
}
System.out.println(clientName + " 再次获取到互斥锁");
try {
System.out.println(clientName + " get the lock");
resource.use(); //access resource exclusively
} finally {
System.out.println(clientName + " releasing the lock");
lock.release(); // always release the lock in a finally block
lock.release(); // 获取锁几回 释放锁也要几回
}
}
private static final int QTY = 5;
private static final int REPETITIONS = QTY * 10;
private static final String PATH = "/examples/locks";
public static void main(String[] args) throws Exception {
final FakeLimitedResource resource = new FakeLimitedResource();
ExecutorService service = Executors.newFixedThreadPool(QTY);
final TestingServer server = new TestingServer();
try {
for (int i = 0; i < QTY; ++i) {
final int index = i;
Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
try {
client.start();
final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
for (int j = 0; j < REPETITIONS; ++j) {
example.doWork(10, TimeUnit.SECONDS);
}
} catch (Throwable e) {
e.printStackTrace();
} finally {
CloseableUtils.closeQuietly(client);
}
return null;
}
};
service.submit(task);
}
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
} finally {
CloseableUtils.closeQuietly(server);
}
Thread.sleep(Integer.MAX_VALUE);
}
}acquire()acquire()acquire()InterProcessSemaphoreMutexInterProcessReadWriteLockInterProcessMutexInterProcessReadWriteLockInterProcessMutexpublic class ReentrantReadWriteLockDemo {
private final InterProcessReadWriteLock lock;
private final InterProcessMutex readLock;
private final InterProcessMutex writeLock;
private final FakeLimitedResource resource;
private final String clientName;
public ReentrantReadWriteLockDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
this.resource = resource;
this.clientName = clientName;
lock = new InterProcessReadWriteLock(client, lockPath);
readLock = lock.readLock();
writeLock = lock.writeLock();
}
public void doWork(long time, TimeUnit unit) throws Exception {
// 注意只能先获得写锁再获得读锁,不能反过来!!!
if (!writeLock.acquire(time, unit)) {
throw new IllegalStateException(clientName + " 不能获得写锁");
}
System.out.println(clientName + " 已获得写锁");
if (!readLock.acquire(time, unit)) {
throw new IllegalStateException(clientName + " 不能获得读锁");
}
System.out.println(clientName + " 已获得读锁");
try {
resource.use(); // 使用资源
Thread.sleep(1000);
} finally {
System.out.println(clientName + " 释放读写锁");
readLock.release();
writeLock.release();
}
}
private static final int QTY = 5;
private static final int REPETITIONS = QTY ;
private static final String PATH = "/examples/locks";
public static void main(String[] args) throws Exception {
final FakeLimitedResource resource = new FakeLimitedResource();
ExecutorService service = Executors.newFixedThreadPool(QTY);
final TestingServer server = new TestingServer();
try {
for (int i = 0; i < QTY; ++i) {
final int index = i;
Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
try {
client.start();
final ReentrantReadWriteLockDemo example = new ReentrantReadWriteLockDemo(client, PATH, resource, "Client " + index);
for (int j = 0; j < REPETITIONS; ++j) {
example.doWork(10, TimeUnit.SECONDS);
}
} catch (Throwable e) {
e.printStackTrace();
} finally {
CloseableUtils.closeQuietly(client);
}
return null;
}
};
service.submit(task);
}
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
} finally {
CloseableUtils.closeQuietly(server);
}
}
}SharedCountReaderacquire()public void returnAll(Collection<Lease> leases) public void returnLease(Lease lease)public Lease acquire() public Collection<Lease> acquire(int qty) public Lease acquire(long time, TimeUnit unit) public Collection<Lease> acquire(int qty, long time, TimeUnit unit)InterProcessSemaphoreV2LeaseSharedCountReaderpublic class InterProcessSemaphoreDemo {
private static final int MAX_LEASE = 10;
private static final String PATH = "/examples/locks";
public static void main(String[] args) throws Exception {
FakeLimitedResource resource = new FakeLimitedResource();
try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
Collection<Lease> leases = semaphore.acquire(5);
System.out.println("get " + leases.size() + " leases");
Lease lease = semaphore.acquire();
System.out.println("get another lease");
resource.use();
Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
System.out.println("Should timeout and acquire return " + leases2);
System.out.println("return one lease");
semaphore.returnLease(lease);
System.out.println("return another 5 leases");
semaphore.returnAll(leases);
}
}
}acquire()acquire()InterProcessMultiLockInterProcessLockpublic InterProcessMultiLock(List<InterProcessLock> locks) public InterProcessMultiLock(CuratorFramework client, List<String> paths)public class MultiSharedLockDemo {
private static final String PATH1 = "/examples/locks1";
private static final String PATH2 = "/examples/locks2";
public static void main(String[] args) throws Exception {
FakeLimitedResource resource = new FakeLimitedResource();
try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);
InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));
if (!lock.acquire(10, TimeUnit.SECONDS)) {
throw new IllegalStateException("could not acquire the lock");
}
System.out.println("has got all lock");
System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
try {
resource.use(); //access resource exclusively
} finally {
System.out.println("releasing the lock");
lock.release(); // always release the lock in a finally block
}
System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
}
}
}InterProcessMultiLockacquire()release()SharedCountDistributedAtomicLongSharedCountSharedCountListenerSharedCountReaderpublic class SharedCounterDemo implements SharedCountListener {
private static final int QTY = 5;
private static final String PATH = "/examples/counter";
public static void main(String[] args) throws IOException, Exception {
final Random rand = new Random();
SharedCounterDemo example = new SharedCounterDemo();
try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
SharedCount baseCount = new SharedCount(client, PATH, 0);
baseCount.addListener(example);
baseCount.start();
List<SharedCount> examples = Lists.newArrayList();
ExecutorService service = Executors.newFixedThreadPool(QTY);
for (int i = 0; i < QTY; ++i) {
final SharedCount count = new SharedCount(client, PATH, 0);
examples.add(count);
Callable<Void> task = () -> {
count.start();
Thread.sleep(rand.nextInt(10000));
System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
return null;
};
service.submit(task);
}
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
for (int i = 0; i < QTY; ++i) {
examples.get(i).close();
}
baseCount.close();
}
Thread.sleep(Integer.MAX_VALUE);
}
@Override
public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
System.out.println("State changed: " + arg1.toString());
}
@Override
public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
System.out.println("Counter's value is changed to " + newCount);
}
}baseCountaddListenerbaseCountcount.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))trySetCountsetCountstartcloseConnectionStateListenerSharedCountListenerConnectionStateListenerSharedCountInterProcessMutexDistributedAtomicValue.trySet()AtomicValue<byte[]> trySet(MakeValue makeValue) throws Exception
{
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
tryOptimistic(result, makeValue);
if ( !result.succeeded() && (mutex != null) )
{
tryWithMutex(result, makeValue);
}
return result;
}succeeded()preValue()postValue()public class DistributedAtomicLongDemo {
private static final int QTY = 5;
private static final String PATH = "/examples/counter";
public static void main(String[] args) throws IOException, Exception {
List<DistributedAtomicLong> examples = Lists.newArrayList();
try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
ExecutorService service = Executors.newFixedThreadPool(QTY);
for (int i = 0; i < QTY; ++i) {
final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));
examples.add(count);
Callable<Void> task = () -> {
try {
AtomicValue<Long> value = count.increment();
System.out.println("succeed: " + value.succeeded());
if (value.succeeded())
System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
} catch (Exception e) {
e.printStackTrace();
}
return null;
};
service.submit(task);
}
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
Thread.sleep(Integer.MAX_VALUE);
}
}
}public class DistributedQueueDemo {
private static final String PATH = "/example/queue";
public static void main(String[] args) throws Exception {
TestingServer server = new TestingServer();
CuratorFramework clientA = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
clientA.start();
CuratorFramework clientB = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
clientB.start();
DistributedQueue<String> queueA;
QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);
queueA = builderA.buildQueue();
queueA.start();
DistributedQueue<String> queueB;
QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);
queueB = builderB.buildQueue();
queueB.start();
for (int i = 0; i < 100; i++) {
queueA.put(" test-A-" + i);
Thread.sleep(10);
queueB.put(" test-B-" + i);
}
Thread.sleep(1000 * 10);// 等待消息消费完成
queueB.close();
queueA.close();
clientB.close();
clientA.close();
System.out.println("OK!");
}
/** * 队列消息序列化实现类 */
private static QueueSerializer<String> createQueueSerializer() {
return new QueueSerializer<String>() {
@Override
public byte[] serialize(String item) {
return item.getBytes();
}
@Override
public String deserialize(byte[] bytes) {
return new String(bytes);
}
};
}
/** * 定义队列消费者 */
private static QueueConsumer<String> createQueueConsumer(final String name) {
return new QueueConsumer<String>() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
System.out.println("链接状态改变: " + newState.name());
}
@Override
public void consumeMessage(String message) throws Exception {
System.out.println("消费消息(" + name + "): " + message);
}
};
}
}builder.buildIdQueue()queue.put(aMessage, messageId);int numberRemoved = queue.remove(messageId);public class DistributedIdQueueDemo {
private static final String PATH = "/example/queue";
public static void main(String[] args) throws Exception {
TestingServer server = new TestingServer();
CuratorFramework client = null;
DistributedIdQueue<String> queue = null;
try {
client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
client.start();
QueueConsumer<String> consumer = createQueueConsumer();
QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
queue = builder.buildIdQueue();
queue.start();
for (int i = 0; i < 10; i++) {
queue.put(" test-" + i, "Id" + i);
Thread.sleep((long) (15 * Math.random()));
queue.remove("Id" + i);
}
Thread.sleep(20000);
} catch (Exception ex) {
} finally {
CloseableUtils.closeQuietly(queue);
CloseableUtils.closeQuietly(client);
CloseableUtils.closeQuietly(server);
}
}
private static QueueSerializer<String> createQueueSerializer() {
return new QueueSerializer<String>() {
@Override
public byte[] serialize(String item) {
return item.getBytes();
}
@Override
public String deserialize(byte[] bytes) {
return new String(bytes);
}
};
}
private static QueueConsumer<String> createQueueConsumer() {
return new QueueConsumer<String>() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
System.out.println("connection new state: " + newState.name());
}
@Override
public void consumeMessage(String message) throws Exception {
System.out.println("consume one message: " + message);
}
};
}
}queue.put(aMessage, priority);public class DistributedPriorityQueueDemo {
private static final String PATH = "/example/queue";
public static void main(String[] args) throws Exception {
TestingServer server = new TestingServer();
CuratorFramework client = null;
DistributedPriorityQueue<String> queue = null;
try {
client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
client.start();
QueueConsumer<String> consumer = createQueueConsumer();
QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
queue = builder.buildPriorityQueue(0);
queue.start();
for (int i = 0; i < 10; i++) {
int priority = (int) (Math.random() * 100);
System.out.println("test-" + i + " priority:" + priority);
queue.put("test-" + i, priority);
Thread.sleep((long) (50 * Math.random()));
}
Thread.sleep(20000);
} catch (Exception ex) {
} finally {
CloseableUtils.closeQuietly(queue);
CloseableUtils.closeQuietly(client);
CloseableUtils.closeQuietly(server);
}
}
private static QueueSerializer<String> createQueueSerializer() {
return new QueueSerializer<String>() {
@Override
public byte[] serialize(String item) {
return item.getBytes();
}
@Override
public String deserialize(byte[] bytes) {
return new String(bytes);
}
};
}
private static QueueConsumer<String> createQueueConsumer() {
return new QueueConsumer<String>() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
System.out.println("connection new state: " + newState.name());
}
@Override
public void consumeMessage(String message) throws Exception {
Thread.sleep(1000);
System.out.println("consume one message: " + message);
}
};
}
}QueueBuilder<MessageType> builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();delayUntilEpochqueue.put(aMessage, delayUntilEpoch);delayUntilEpochpublic class DistributedDelayQueueDemo {
private static final String PATH = "/example/queue";
public static void main(String[] args) throws Exception {
TestingServer server = new TestingServer();
CuratorFramework client = null;
DistributedDelayQueue<String> queue = null;
try {
client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
client.start();
QueueConsumer<String> consumer = createQueueConsumer();
QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
queue = builder.buildDelayQueue();
queue.start();
for (int i = 0; i < 10; i++) {
queue.put("test-" + i, System.currentTimeMillis() + 10000);
}
System.out.println(new Date().getTime() + ": already put all items");
Thread.sleep(20000);
} catch (Exception ex) {
} finally {
CloseableUtils.closeQuietly(queue);
CloseableUtils.closeQuietly(client);
CloseableUtils.closeQuietly(server);
}
}
private static QueueSerializer<String> createQueueSerializer() {
return new QueueSerializer<String>() {
@Override
public byte[] serialize(String item) {
return item.getBytes();
}
@Override
public String deserialize(byte[] bytes) {
return new String(bytes);
}
};
}
private static QueueConsumer<String> createQueueConsumer() {
return new QueueConsumer<String>() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
System.out.println("connection new state: " + newState.name());
}
@Override
public void consumeMessage(String message) throws Exception {
System.out.println(new Date().getTime() + ": consume one message: " + message);
}
};
}
}SimpleDistributedQueuepublic SimpleDistributedQueue(CuratorFramework client,String path)public boolean offer(byte[] data) throws Exceptionpublic byte[] take() throws Exceptionpublic byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exceptionaddtaketakepollpublic class SimpleDistributedQueueDemo {
private static final String PATH = "/example/queue";
public static void main(String[] args) throws Exception {
TestingServer server = new TestingServer();
CuratorFramework client = null;
SimpleDistributedQueue queue;
try {
client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
client.start();
queue = new SimpleDistributedQueue(client, PATH);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer, "producer").start();
new Thread(consumer, "consumer").start();
Thread.sleep(20000);
} catch (Exception ex) {
} finally {
CloseableUtils.closeQuietly(client);
CloseableUtils.closeQuietly(server);
}
}
public static class Producer implements Runnable {
private SimpleDistributedQueue queue;
public Producer(SimpleDistributedQueue queue) {
this.queue = queue;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
boolean flag = queue.offer(("zjc-" + i).getBytes());
if (flag) {
System.out.println("发送一条消息成功:" + "zjc-" + i);
} else {
System.out.println("发送一条消息失败:" + "zjc-" + i);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
public static class Consumer implements Runnable {
private SimpleDistributedQueue queue;
public Consumer(SimpleDistributedQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
byte[] datas = queue.take();
System.out.println("消费一条消息成功:" + new String(datas, "UTF-8"));
} catch (Exception e) {
e.printStackTrace();
}
}
}
}Creating a SimpleDistributedQueue
public SimpleDistributedQueue(CuratorFramework client, String path)
Parameters:
client - the client
path - path to store queue nodes
Add to the queue
public boolean offer(byte[] data)
throws Exception
Inserts data into queue.
Parameters:
data - the data
Returns:
true if data was successfully added
Take from the queue
public byte[] take()
throws Exception
Removes the head of the queue and returns it, blocks until it succeeds.
Returns:
The former head of the queue
NOTE: see the Javadoc for additional methodsDistributedBarrierpublic DistributedBarrier(CuratorFramework client, String barrierPath)
Parameters:
client - client
barrierPath - path to use as the barriersetBarrier();public void waitOnBarrier()removeBarrier();waitOnBarrier()public class DistributedBarrierDemo {
private static final int QTY = 5;
private static final String PATH = "/examples/barrier";
public static void main(String[] args) throws Exception {
try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
ExecutorService service = Executors.newFixedThreadPool(QTY);
DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
controlBarrier.setBarrier();
for (int i = 0; i < QTY; ++i) {
final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
final int index = i;
Callable<Void> task = () -> {
Thread.sleep((long) (3 * Math.random()));
System.out.println("Client #" + index + " waits on Barrier");
barrier.waitOnBarrier();
System.out.println("Client #" + index + " begins");
return null;
};
service.submit(task);
}
Thread.sleep(10000);
System.out.println("all Barrier instances should wait the condition");
controlBarrier.removeBarrier();
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
Thread.sleep(20000);
}
}
}controlBarrierDistributedDoubleBarrierpublic DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty)
Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.
Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barriermemberQtyenter()enter()leave()leave()enter()leave()public class DistributedDoubleBarrierDemo {
private static final int QTY = 5;
private static final String PATH = "/examples/barrier";
public static void main(String[] args) throws Exception {
try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
ExecutorService service = Executors.newFixedThreadPool(QTY);
for (int i = 0; i < QTY; ++i) {
final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
final int index = i;
Callable<Void> task = () -> {
Thread.sleep((long) (3 * Math.random()));
System.out.println("Client #" + index + " enters");
barrier.enter();
System.out.println("Client #" + index + " begins");
Thread.sleep((long) (3000 * Math.random()));
barrier.leave();
System.out.println("Client #" + index + " left");
return null;
};
service.submit(task);
}
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
Thread.sleep(Integer.MAX_VALUE);
}
}
}
pasting