这里不介绍关于curator的用法及优劣,旨在探究curator对于延迟队列的使用原理java
<!--dependency--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.1</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.1</version> </dependency>
public class Processor { private final static CuratorFramework client; private final static DistributedDelayQueue<String> queue; static{ ZookeeperConfig config = ZookeeperConfig.getConfig(); // create client client = CuratorFrameworkFactory.newClient(config.getRegistryAddress(), new ExponentialBackoffRetry(3000, 2)); // build queue queue = QueueBuilder.builder(client, new AutoSubmitConsumer(), new AutoSubmitQueueSerializer(), DelayQueueEnum.AUTO_SUBMIT.getPath()) .buildDelayQueue(); // 开启执行计划 enable(); } /** * 生产数据 * * @param id * @param endTime * @throws Exception */ public void producer(String id, Date endTime) throws Exception { queue.put(id, endTime.getTime()); } private static void enable(){ try { client.start(); queue.start(); } catch (Exception e) { logger.error("enable queue fail, exception:{}", e); } } } // Serializer class AutoSubmitQueueSerializer implements QueueSerializer<String> { @Override public byte[] serialize(String s) { return s.getBytes("utf-8"); } @Override public String deserialize(byte[] bytes) { return new String(bytes); } } // consumer AutoSubmitConsumer implements QueueConsumer<String> { @Override public void consumeMessage(String id) { logger.info("consumeMessage, :{}", id); // service processor. logger.info("consumeMessage# auto submit end, result:{}, id:{}", result, id); } @Override public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) { } }
是临时节点仍是持久化节点,若是基于内存的话客户端或者服务端挂了之后就会存在数据丢失的问题? 是否会从新排序,zk是按照请求的时间前后顺序写入的,那么curator是怎么监听到期时间的呢?apache
针对第一点,咱们关闭zookeeper
服务端和客户端后从新启动后以前的节点还存在因此是持久化节点安全
经过客户端工具链接zookeeper
发现并不会每次请求的时候都会从新排序,也就是说可能在client端进行处理的分布式
如下是在客户端工具上截取的一部分信息,key是由三部分组成的,第一部分固定的queue- , 第二部分暂不肯定,第三部分是节点的序号
ide
// org.apache.curator.framework.recipes.queue.DistributedQueue#start // 部分片断 client.create().creatingParentContainersIfNeeded().forPath(queuePath); if ( !isProducerOnly ) { service.submit ( new Callable<Object>() { @Override public Object call() { runLoop(); // step1 return null; } } ); } // org.apache.curator.framework.recipes.queue.DistributedQueue#runLoop // step1中的代码片断 while ( state.get() == State.STARTED ) { try { ChildrenCache.Data data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion); currentVersion = data.version; // 诸如: //queue-|2E1D86A3BB6|0000000019 //queue-|1712F752AA0|0000000036 //queue-|1712F76FF60|0000000035 // 拿到全部的子节点 List<String> children = Lists.newArrayList(data.children); // 根据过时时间排序 // step6 sortChildren(children); // 排序后 //queue-|1712F752AA0|0000000036 //queue-|1712F76FF60|0000000035 //queue-|2E1D86A3BB6|0000000019 if ( children.size() > 0 ) { //获取到期时间 maxWaitMs = getDelay(children.get(0)); if ( maxWaitMs > 0 ) continue; } else continue; // 死循环不断轮询是否有知足条件的节点; // 只要有知足条件的节点就将整个排序后的集合往下传递 processChildren(children, currentVersion); // step2 } } // org.apache.curator.framework.recipes.queue.DistributedQueue#processChildren // step2对应的代码片断: private void processChildren(List<String> children, long currentVersion) { final Semaphore processedLatch = new Semaphore(0); final boolean isUsingLockSafety = (lockPath != null); int min = minItemsBeforeRefresh; for ( final String itemNode : children ) { if ( Thread.currentThread().isInterrupted() ) { processedLatch.release(children.size()); break; } if ( !itemNode.startsWith(QUEUE_ITEM_NAME) ) { processedLatch.release(); continue; } if ( min-- <= 0 ) { if ( refreshOnWatch && (currentVersion != childrenCache.getData().version) ) { processedLatch.release(children.size()); break; } } // step3 if ( getDelay(itemNode) > 0 ) { processedLatch.release(); continue; } //这里使用了线程池,为了保证每个节点都执行完毕后才返回方法因此使用了信号灯 executor.execute ( new Runnable() { @Override public void run() { try { //是否采用了分布式锁,由于咱们初始化的时候并未使用因此没有用到这里的安全锁,其实是进入到了else中 if ( isUsingLockSafety ) { processWithLockSafety(itemNode, ProcessType.NORMAL); } else { // 看这里 step4 processNormally(itemNode, ProcessType.NORMAL); } }finally { processedLatch.release(); } } } ); } processedLatch.acquire(children.size()); } // org.apache.curator.framework.recipes.queue.DistributedQueue#getDelay(java.lang.String) // 对应step3处的代码片断 protected long getDelay(String itemNode) { return getDelay(itemNode, System.currentTimeMillis()); } private long getDelay(String itemNode, long sortTime) { // 会从key上获取时间戳 // step5 long epoch = getEpoch(itemNode); return epoch - sortTime; // 计算过时时间 } // 对应step5处的代码 private static long getEpoch(String itemNode) { // itemNode -> queue-|时间戳|序号 int index2 = itemNode.lastIndexOf(SEPARATOR); int index1 = (index2 > 0) ? itemNode.lastIndexOf(SEPARATOR, index2 - 1) : -1; if ( (index1 > 0) && (index2 > (index1 + 1)) ) { try { String epochStr = itemNode.substring(index1 + 1, index2); return Long.parseLong(epochStr, 16); // 从这里能够知道queue-|这里是16进制的时间戳了|序号| 多是出于key长度的考量吧(更节省内存),用10进制的时间戳会长不少 } } return 0; } // org.apache.curator.framework.recipes.queue.DistributedQueue#sortChildren // 会根据延时时间排序 // step6处的代码片断 protected void sortChildren(List<String> children) { final long sortTime = System.currentTimeMillis(); Collections.sort ( children, new Comparator<String>() { @Override public int compare(String o1, String o2) { long diff = getDelay(o1, sortTime) - getDelay(o2, sortTime); return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0); } } ); } // 对应step4处的代码片断 private boolean processNormally(String itemNode, ProcessType type) throws Exception { try { String itemPath = ZKPaths.makePath(queuePath, itemNode); Stat stat = new Stat(); byte[] bytes = null; if ( type == ProcessType.NORMAL ) { // 获取key对应的value bytes = client.getData().storingStatIn(stat).forPath(itemPath); } if ( client.getState() == CuratorFrameworkState.STARTED ) { // 移除节点 client.delete().withVersion(stat.getVersion()).forPath(itemPath); } if ( type == ProcessType.NORMAL ) { //step7 processMessageBytes(itemNode, bytes); } return true; } return false; } //对应step7处代码,会回调咱们的业务代码 private ProcessMessageBytesCode processMessageBytes(String itemNode, byte[] bytes) throws Exception { ProcessMessageBytesCode resultCode = ProcessMessageBytesCode.NORMAL; MultiItem<T> items; try { // 根据咱们定义的序列化器序列化 items = ItemSerializer.deserialize(bytes, serializer); } for(;;) { // 省略一部分代码 try { consumer.consumeMessage(item); // 这里就会回调到咱们的业务代码 } } return resultCode; }