窃觉得,对于zookeeper这种东西,仅仅知道怎么安装是远远不够的(废话么这不是,,,),至少要对其几个典型的应用场景进行了解,才能比较全面的知道zk究竟能干啥,怎么玩儿,之后的日子里才能知道这货如何能为我所用。因而,有了以下的学习:html
咱们知道zookeeper能够用于搭建高可用服务框架,主要先看如下几个应用场景:
一、 master的选举基本思路和编码实现
二、 数据的发布和订阅
三、 软负载均衡
四、 分布式队列
五、 分布式锁
六、 命名服务java
目前zookeeper经常使用的开发包有zkclient跟curator,后者更为方便,平常开发使用较多。程序员
----------------正文分割线-----------------------------------------------------------算法
master选举数据库
一、使用场景及结构apache
如今不少时候咱们的服务须要7*24小时工做,假如一台机器挂了,咱们但愿能有其它机器顶替它继续工做。此类问题如今多采用master-salve模式,也就是常说的主从模式,正常状况下主机提供服务,备机负责监听主机状态,当主机异常时,能够自动切换到备机继续提供服务(这里有点儿相似于数据库主库跟备库,备机正常状况下只监听,不工做),这个切换过程当中选出下一个主机的过程就是master选举。服务器
对于以上提到的场景,传统的解决方式是采用一个备用节点,这个备用节点按期给当前主节点发送ping包,主节点收到ping包后会向备用节点发送应答ack,当备用节点收到应答,就认为主节点还活着,让它继续提供服务,不然就认为主节点挂掉了,本身将开始行使主节点职责。如图1所示:网络
图1 负载均衡
但这种方式会存在一个隐患,就是网络故障问题。看一下图2:框架
图2
也就是说,咱们的主节点并无挂掉,只是在备用节点ping主节点,请求应答的时候发生网络故障,这样咱们的备用节点一样收不到应答,就会认为主节点挂掉,而后备机会启动本身的master实例。这样就会致使系统中有两个主节点,也就是双master。出现双master之后,咱们的从节点会将它作的事情一部分汇报给主节点,一部分汇报给备用节点,这样服务就乱套了。为了防止这种状况出现,咱们能够考虑采用zookeeper,虽然它不能阻止网络故障的出现,但它能保证同一时刻系统中只存在一个主节点。咱们来看zookeeper是怎么实现的:
在此处,抢主程序是包含在服务程序中,须要程序员来手动写抢主逻辑的,好比当当开源框架elastic-job中,就有关于选主的部分,参见:elastic-job-core/main/java/com/dangdang/ddframe/job/internal/election文件夹下的选主代码。
一点额外的话:zookeeper本身在集群环境下的抢主算法有三种,能够经过配置文件来设定,默认采用FastLeaderElection,不做赘述;此处主要讨论集群环境中,应用程序利用master的特色,本身选主的过程。程序本身选主,每一个人都有本身的一套算法,有采用“最小编号”的,有采用相似“多数投票”的,各有优劣,本文的算法仅做演示理解使用:
结构图:
结构图解释:左侧树状结构为zookeeper集群,右侧为程序服务器。全部的服务器在启动的时候,都会订阅zookeeper中master节点的删除事件,以便在主服务器挂掉的时候进行抢主操做;全部服务器同时会在servers节点下注册一个临时节点(保存本身的基本信息),以便于应用程序读取当前可用的服务器列表。
选主原理介绍:zookeeper的节点有两种类型,持久节点跟临时节点。临时节点有个特性,就是若是注册这个节点的机器失去链接(一般是宕机),那么这个节点会被zookeeper删除。选主过程就是利用这个特性,在服务器启动的时候,去zookeeper特定的一个目录下注册一个临时节点(这个节点做为master,谁注册了这个节点谁就是master),注册的时候,若是发现该节点已经存在,则说明已经有别的服务器注册了(也就是有别的服务器已经抢主成功),那么当前服务器只能放弃抢主,做为从机存在。同时,抢主失败的当前服务器须要订阅该临时节点的删除事件,以便该节点删除时(也就是注册该节点的服务器宕机了或者网络断了之类的)进行再次抢主操做。从机具体须要去哪里注册服务器列表的临时节点,节点保存什么信息,根据具体的业务不一样自行约定。选主的过程,其实就是简单的争抢在zookeeper注册临时节点的操做,谁注册了约定的临时节点,谁就是master。
ps:本文的例子中,并未用到结构图server节点下的数据。但换一种算法或者业务场景就会用到,算法好比提到的最小编号,主要逻辑是主节点挂掉后,从节点里边编号最小的成为主节点,此时会用到该节点内容。换一种业务场景:集群环境中,有不少任务要处理, 主节点负责接收任务,并根据必定算法将任务分配到不一样的机器上执行;这种状况下,主节点跟从节点的职责也是不一样的,主节点挂掉也会涉及到从节点进行master选举的问题。这种状况下,很显然,做为主节点须要知道当前有多少个从节点还活着,那么此时也会须要用到servers节点下的数据了。
二、编码实现
主要有两个类,WorkServer为主服务类,RunningData用于记录运行数据。由于是简单的demo,咱们只作抢master节点的编码,对于从节点应该去哪里注册服务列表信息,不做编码。
采用zkClient实现,代码以下:
WorkServer类:
1 package mastersalve; 2 3 import org.I0Itec.zkclient.IZkDataListener; 4 import org.I0Itec.zkclient.ZkClient; 5 import org.I0Itec.zkclient.exception.ZkInterruptedException; 6 import org.I0Itec.zkclient.exception.ZkNoNodeException; 7 import org.I0Itec.zkclient.exception.ZkNodeExistsException; 8 import org.apache.zookeeper.CreateMode; 9 10 import java.util.concurrent.Executors; 11 import java.util.concurrent.ScheduledExecutorService; 12 import java.util.concurrent.TimeUnit; 13 14 /** 15 * Created by nevermore on 16/6/22. 16 */ 17 public class WorkServer { 18 19 //客户端状态 20 private volatile boolean running = false; 21 22 private ZkClient zkClient; 23 24 //zk主节点路径 25 public static final String MASTER_PATH = "/master"; 26 27 //监听(用于监听主节点删除事件) 28 private IZkDataListener dataListener; 29 30 //服务器基本信息 31 private RunningData serverData; 32 //主节点基本信息 33 private RunningData masterData; 34 35 //调度器 36 private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1); 37 //延迟时间5s 38 private int delayTime = 5; 39 40 41 42 public WorkServer(RunningData runningData){ 43 this.serverData = runningData; 44 this.dataListener = new IZkDataListener() { 45 @Override 46 public void handleDataChange(String s, Object o) throws Exception { 47 48 } 49 50 @Override 51 public void handleDataDeleted(String s) throws Exception { 52 //takeMaster(); 53 54 if(masterData != null && masterData.getName().equals(serverData.getName())){//若以前master为本机,则当即抢主,不然延迟5秒抢主(防止小故障引发的抢主可能致使的网络数据风暴) 55 takeMaster(); 56 }else{ 57 delayExector.schedule(new Runnable() { 58 @Override 59 public void run() { 60 takeMaster(); 61 } 62 },delayTime, TimeUnit.SECONDS); 63 } 64 65 } 66 }; 67 } 68 69 //启动 70 public void start() throws Exception{ 71 if(running){ 72 throw new Exception("server has startup...."); 73 } 74 running = true; 75 zkClient.subscribeDataChanges(MASTER_PATH,dataListener); 76 takeMaster(); 77 } 78 79 //中止 80 public void stop() throws Exception{ 81 if(!running){ 82 throw new Exception("server has stopped....."); 83 } 84 running = false; 85 delayExector.shutdown(); 86 zkClient.unsubscribeDataChanges(MASTER_PATH,dataListener); 87 releaseMaster(); 88 } 89 90 //抢注主节点 91 private void takeMaster(){ 92 if(!running) return ; 93 94 try { 95 zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL); 96 masterData = serverData; 97 System.out.println(serverData.getName()+" is master"); 98 99 delayExector.schedule(new Runnable() {//测试抢主用,每5s释放一次主节点 100 @Override 101 public void run() { 102 if(checkMaster()){ 103 releaseMaster(); 104 } 105 } 106 },5,TimeUnit.SECONDS); 107 108 109 }catch (ZkNodeExistsException e){//节点已存在 110 RunningData runningData = zkClient.readData(MASTER_PATH,true); 111 if(runningData == null){//读取主节点时,主节点被释放 112 takeMaster(); 113 }else{ 114 masterData = runningData; 115 } 116 } catch (Exception e) { 117 // ignore; 118 } 119 120 } 121 //释放主节点 122 private void releaseMaster(){ 123 if(checkMaster()){ 124 zkClient.delete(MASTER_PATH); 125 } 126 } 127 //检验本身是不是主节点 128 private boolean checkMaster(){ 129 try { 130 RunningData runningData = zkClient.readData(MASTER_PATH); 131 masterData = runningData; 132 if (masterData.getName().equals(serverData.getName())) { 133 return true; 134 } 135 return false; 136 137 }catch (ZkNoNodeException e){//节点不存在 138 return false; 139 }catch (ZkInterruptedException e){//网络中断 140 return checkMaster(); 141 }catch (Exception e){//其它 142 return false; 143 } 144 } 145 146 public void setZkClient(ZkClient zkClient) { 147 this.zkClient = zkClient; 148 } 149 150 public ZkClient getZkClient() { 151 return zkClient; 152 } 153 }
RunningData类:
1 package mastersalve; 2 3 import java.io.Serializable; 4 5 /** 6 * Created by nevermore on 16/6/22. 7 */ 8 public class RunningData implements Serializable { 9 10 private static final long serialVersionUID = 4260577459043203630L; 11 12 13 //服务器id 14 private long cid; 15 //服务器名称 16 private String name; 17 18 19 public long getCid() { 20 return cid; 21 } 22 23 public void setCid(long cid) { 24 this.cid = cid; 25 } 26 27 public String getName() { 28 return name; 29 } 30 31 public void setName(String name) { 32 this.name = name; 33 } 34 }
说明:在实际生产环境中,可能会因为插拔网线等致使网络短时的不稳定,也就是网络抖动。因为正式生产环境中可能server在zk上注册的信息是比较多的,并且server的数量也是比较多的,那么每一次切换主机,每台server要同步的数据量(好比要获取谁是master,当前有哪些salve等信息,具体视业务不一样而定)也是比较大的。那么咱们但愿,这种短期的网络抖动最好不要影响咱们的系统稳定,也就是最好选出来的master仍是原来的机器,那么就能够避免发现master更换后,各个salve由于要同步数据等致使的zk数据网络风暴。因此在WorkServer中,54-63行,咱们抢主的时候,若是以前主机是本机,则当即抢主,不然延迟5s抢主。这样就给原来主机预留出必定时间让其在新一轮选主中占据优点,从而利于环境稳定。
测试代码:
1 package mastersalve; 2 3 import org.I0Itec.zkclient.ZkClient; 4 import org.I0Itec.zkclient.serialize.SerializableSerializer; 5 6 import java.io.BufferedReader; 7 import java.io.InputStreamReader; 8 import java.util.ArrayList; 9 import java.util.List; 10 11 /** 12 * Created by nevermore on 16/6/23. 13 */ 14 public class LeaderSelectorZkClient { 15 16 //启动的服务个数 17 private static final int CLIENT_QTY = 10; 18 //zookeeper服务器的地址 19 private static final String ZOOKEEPER_SERVER = "localhost:2181"; 20 21 22 public static void main(String[] args) throws Exception{ 23 //保存全部zkClient的列表 24 List<ZkClient> clients = new ArrayList<ZkClient>(); 25 //保存全部服务的列表 26 List<WorkServer> workServers = new ArrayList<WorkServer>(); 27 28 try{ 29 for ( int i = 0; i < CLIENT_QTY; ++i ){ 30 //建立zkClient 31 ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new SerializableSerializer()); 32 clients.add(client); 33 //建立serverData 34 RunningData runningData = new RunningData(); 35 runningData.setCid(Long.valueOf(i)); 36 runningData.setName("Client #" + i); 37 //建立服务 38 WorkServer workServer = new WorkServer(runningData); 39 workServer.setZkClient(client); 40 41 workServers.add(workServer); 42 workServer.start(); 43 } 44 45 System.out.println("敲回车键退出!\n"); 46 new BufferedReader(new InputStreamReader(System.in)).readLine(); 47 }finally{ 48 System.out.println("Shutting down..."); 49 50 for ( WorkServer workServer : workServers ){ 51 try { 52 workServer.stop(); 53 } catch (Exception e) { 54 e.printStackTrace(); 55 } 56 } 57 for ( ZkClient client : clients ){ 58 try { 59 client.close(); 60 } catch (Exception e) { 61 e.printStackTrace(); 62 } 63 } 64 } 65 } 66 }
两次测试,本地模拟10台server,分别不启用防止网络抖动跟启动防抖动两次测试结果以下:
未启动防抖动:
启用防抖动:
能够看到,未启用的时候,断线后从新选出的主机是随机的,没规律;启用防抖动后,每次选出的master都是id为0的机器。
-----------------------------------------------------------------------------------------------------------------------------
至此,咱们已经经过编码实现了简单的master选举。可是,不知你有没有发现,,,,这个选主过程的代码还真是麻烦啊!
咱们只是作一个demo,其中并未考虑复杂的业务场景,但其中的 监听,异常 等代码的处理仍是让我以为有些头大,怎么办?Curator应运而生!
为了熟悉Apache Curator,接下来,将用curator来实现master选举的demo。