zookeeper系列(一)zookeeper必知
zookeeper系列(二)实战master选举
zookeeper系列(三)实战数据发布订阅
zookeeper系列(四)实战负载均衡
zookeeper系列(五)实战分布式锁
zookeeper系列(六)实战分布式队列
zookeeper系列(七)实战分布式命名服务
zookeeper系列(八)zookeeper运维java
在传统的单进程编程中,咱们使用队列来存储一些数据结构,用来在多线程之间共享或传递数据。node
分布式环境下,咱们一样须要一个相似单进程队列的组件,用来实现跨进程、跨主机、跨网络的数据共享和数据传递,这就是咱们的分布式队列。算法
zookeeper能够经过顺序节点实现分布式队列。编程
图中左侧表明zookeeper集群,右侧表明消费者和生产者。
生产者经过在queue节点下建立顺序节点来存放数据,消费者经过读取顺序节点来消费数据。segmentfault
offer核心算法流程网络
poll核心算法流程数据结构
/** * 简单分布式队列 */ public class DistributedSimpleQueue<T> { protected final ZkClient zkClient; // queue节点 protected final String root; // 顺序节点前缀 protected static final String Node_NAME = "n_"; public DistributedSimpleQueue(ZkClient zkClient, String root) { this.zkClient = zkClient; this.root = root; } // 判断队列大小 public int size() { return zkClient.getChildren(root).size(); } // 判断队列是否为空 public boolean isEmpty() { return zkClient.getChildren(root).size() == 0; } // 向队列提供数据 public boolean offer(T element) throws Exception{ // 建立顺序节点 String nodeFullPath = root .concat( "/" ).concat( Node_NAME ); try { zkClient.createPersistentSequential(nodeFullPath , element); }catch (ZkNoNodeException e) { zkClient.createPersistent(root); offer(element); } catch (Exception e) { throw ExceptionUtil.convertToRuntimeException(e); } return true; } // 从队列取数据 public T poll() throws Exception { try { // 获取全部顺序节点 List<String> list = zkClient.getChildren(root); if (list.size() == 0) { return null; } // 排序 Collections.sort(list, new Comparator<String>() { public int compare(String lhs, String rhs) { return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME)); } }); // 循环每一个顺序节点名 for ( String nodeName : list ){ // 构造出顺序节点的完整路径 String nodeFullPath = root.concat("/").concat(nodeName); try { // 读取顺序节点的内容 T node = (T) zkClient.readData(nodeFullPath); // 删除顺序节点 zkClient.delete(nodeFullPath); return node; } catch (ZkNoNodeException e) { // ignore 由其余客户端把这个顺序节点消费掉了 } } return null; } catch (Exception e) { throw ExceptionUtil.convertToRuntimeException(e); } } private String getNodeNumber(String str, String nodeName) { int index = str.lastIndexOf(nodeName); if (index >= 0) { index += Node_NAME.length(); return index <= str.length() ? str.substring(index) : ""; } return str; } }
public class User implements Serializable { String name; String id; public String getName() { return name; } public void setName(String name) { this.name = name; } public String getId() { return id; } public void setId(String id) { this.id = id; } }
public class TestDistributedSimpleQueue { public static void main(String[] args) { ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer()); DistributedSimpleQueue<User> queue = new DistributedSimpleQueue<User>(zkClient,"/Queue"); User user1 = new User(); user1.setId("1"); user1.setName("xiao wang"); User user2 = new User(); user2.setId("2"); user2.setName("xiao wang"); try { queue.offer(user1); queue.offer(user2); User u1 = (User) queue.poll(); User u2 = (User) queue.poll(); if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())){ System.out.println("Success!"); } } catch (Exception e) { e.printStackTrace(); } } }
上面实现了一个简单分布式队列,在此基础上,咱们再扩展一个阻塞分布式队列。代码以下:多线程
/** * 阻塞分布式队列 * 扩展自简单分布式队列,在拿不到队列数据时,进行阻塞直到拿到数据 */ public class DistributedBlockingQueue<T> extends DistributedSimpleQueue<T>{ public DistributedBlockingQueue(ZkClient zkClient, String root) { super(zkClient, root); } @Override public T poll() throws Exception { while (true){ // 结束在latch上的等待后,再来一次 final CountDownLatch latch = new CountDownLatch(1); final IZkChildListener childListener = new IZkChildListener() { public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { latch.countDown(); // 队列有变化,结束latch上的等待 } }; zkClient.subscribeChildChanges(root, childListener); try{ T node = super.poll(); // 获取队列数据 if ( node != null ){ return node; } else { latch.await(); // 拿不到队列数据,则在latch上await } } finally { zkClient.unsubscribeChildChanges(root, childListener); } } } }
public class TestDistributedBlockingQueue { public static void main(String[] args) { ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1); int delayTime = 5; ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer()); final DistributedBlockingQueue<User> queue = new DistributedBlockingQueue<User>(zkClient,"/Queue"); final User user1 = new User(); user1.setId("1"); user1.setName("xiao wang"); final User user2 = new User(); user2.setId("2"); user2.setName("xiao wang"); try { delayExector.schedule(new Runnable() { public void run() { try { queue.offer(user1); queue.offer(user2); } catch (Exception e) { e.printStackTrace(); } } }, delayTime , TimeUnit.SECONDS); System.out.println("ready poll!"); User u1 = (User) queue.poll(); User u2 = (User) queue.poll(); if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())){ System.out.println("Success!"); } } catch (Exception e) { e.printStackTrace(); } finally{ delayExector.shutdown(); try { delayExector.awaitTermination(2, TimeUnit.SECONDS); } catch (InterruptedException e) { } } } }
zookeeper系列(一)zookeeper必知
zookeeper系列(二)实战master选举
zookeeper系列(三)实战数据发布订阅
zookeeper系列(四)实战负载均衡
zookeeper系列(五)实战分布式锁
zookeeper系列(六)实战分布式队列
zookeeper系列(七)实战分布式命名服务
zookeeper系列(八)zookeeper运维架构