考虑一个需要7×24小时对外提供服务的系统：它不能有单点故障，因此我们使用集群，采用Master+Slave架构。集群中有一台主机和多台备机，由主机对外提供服务，备机监听主机状态；一旦主机宕机，备机必须迅速接管主机，继续对外提供服务。在这个过程中，从备机中选出一台机器作为主机的过程，就是Master选举。
左边是ZooKeeper集群，右边是3台工作服务器。工作服务器启动时，会在ZooKeeper的Servers节点下创建临时节点，并把自身的基本信息写入该临时节点，这个过程叫做服务注册；系统中的其他服务可以通过获取Servers节点的子节点列表，来了解当前系统中哪些服务器可用，这个过程叫做服务发现。接着，这些服务器会尝试创建Master临时节点，谁创建成功谁就是Master，其余两台则作为Slave。所有的Work Server都必须关注Master节点的删除事件：通过监听该事件来了解Master服务器是否宕机（创建临时节点的服务器一旦宕机，它所创建的临时节点就会被自动删除）。一旦Master服务器宕机，就必须开始新一轮的Master选举。
WorkServer对应架构图中的WorkServer，是主要的工作类；
RunningData用来描述WorkServer的基本信息;
LeaderSelectorZkClient作为调度器，用来启动和停止WorkServer；
/**
 * Basic information a work server publishes about itself: a numeric id and a display name.
 *
 * <p>The scheduler stores instances in ZooKeeper through Java serialization
 * (a SerializableSerializer is used). Keep the field names and serialVersionUID
 * stable so previously written data stays readable.
 */
public class RunningData implements Serializable {

    private static final long serialVersionUID = 4260577459043203630L;

    /** Numeric id of the server. */
    private Long cid;

    /** Human-readable server name; work servers compare it to tell whether they are the Master. */
    private String name;

    /** @return the display name, or {@code null} if it was never set */
    public String getName() {
        return this.name;
    }

    /** @param newName the display name to record */
    public void setName(String newName) {
        name = newName;
    }

    /** @return the numeric id, or {@code null} if it was never set */
    public Long getCid() {
        return this.cid;
    }

    /** @param newCid the numeric id to record */
    public void setCid(Long newCid) {
        cid = newCid;
    }
}
/** * 工做服务器 */ public class WorkServer { // 记录服务器状态 private volatile boolean running = false; private ZkClient zkClient; // Master节点对应zookeeper中的节点路径 private static final String MASTER_PATH = "/master"; // 监听Master节点删除事件 private IZkDataListener dataListener; // 记录当前节点的基本信息 private RunningData serverData; // 记录集群中Master节点的基本信息 private RunningData masterData; private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1); private int delayTime = 5; public WorkServer(RunningData rd) { this.serverData = rd; // 记录服务器基本信息 this.dataListener = new IZkDataListener() { public void handleDataDeleted(String dataPath) throws Exception { //takeMaster(); if (masterData != null && masterData.getName().equals(serverData.getName())){ // 本身就是上一轮的Master服务器,则直接抢 takeMaster(); } else { // 不然,延迟5秒后再抢。主要是应对网络抖动,给上一轮的Master服务器优先抢占master的权利,避免没必要要的数据迁移开销 delayExector.schedule(new Runnable(){ public void run(){ takeMaster(); } }, delayTime, TimeUnit.SECONDS); } } public void handleDataChange(String dataPath, Object data) throws Exception { } }; } public ZkClient getZkClient() { return zkClient; } public void setZkClient(ZkClient zkClient) { this.zkClient = zkClient; } // 启动服务器 public void start() throws Exception { if (running) { throw new Exception("server has startup..."); } running = true; // 订阅Master节点删除事件 zkClient.subscribeDataChanges(MASTER_PATH, dataListener); // 争抢Master权利 takeMaster(); } // 中止服务器 public void stop() throws Exception { if (!running) { throw new Exception("server has stoped"); } running = false; delayExector.shutdown(); // 取消Master节点事件订阅 zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener); // 释放Master权利 releaseMaster(); } // 争抢Master private void takeMaster() { if (!running) return; try { // 尝试建立Master临时节点 zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL); masterData = serverData; System.out.println(serverData.getName()+" is master"); // 做为演示,咱们让服务器每隔5秒释放一次Master权利 delayExector.schedule(new Runnable() { public void run() { // 
TODO Auto-generated method stub if (checkMaster()){ releaseMaster(); } } }, 5, TimeUnit.SECONDS); } catch (ZkNodeExistsException e) { // 已被其余服务器建立了 // 读取Master节点信息 RunningData runningData = zkClient.readData(MASTER_PATH, true); if (runningData == null) { takeMaster(); // 没读到,读取瞬间Master节点宕机了,有机会再次争抢 } else { masterData = runningData; } } catch (Exception e) { // ignore; } } // 释放Master权利 private void releaseMaster() { if (checkMaster()) { zkClient.delete(MASTER_PATH); } } // 检测本身是否为Master private boolean checkMaster() { try { RunningData eventData = zkClient.readData(MASTER_PATH); masterData = eventData; if (masterData.getName().equals(serverData.getName())) { return true; } return false; } catch (ZkNoNodeException e) { return false; // 节点不存在,本身确定不是Master了 } catch (ZkInterruptedException e) { return checkMaster(); } catch (ZkException e) { return false; } } }
/** * 调度器 */ public class LeaderSelectorZkClient { //启动的服务个数 private static final int CLIENT_QTY = 10; //zookeeper服务器的地址 private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181"; public static void main(String[] args) throws Exception { //保存全部zkClient的列表 List<ZkClient> clients = new ArrayList<ZkClient>(); //保存全部服务的列表 List<WorkServer> workServers = new ArrayList<WorkServer>(); try { for ( int i = 0; i < CLIENT_QTY; ++i ) { // 模拟建立10个服务器并启动 //建立zkClient ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new SerializableSerializer()); clients.add(client); //建立serverData RunningData runningData = new RunningData(); runningData.setCid(Long.valueOf(i)); runningData.setName("Client #" + i); //建立服务 WorkServer workServer = new WorkServer(runningData); workServer.setZkClient(client); workServers.add(workServer); workServer.start(); } System.out.println("敲回车键退出!\n"); new BufferedReader(new InputStreamReader(System.in)).readLine(); } finally { System.out.println("Shutting down..."); for ( WorkServer workServer : workServers ) { try { workServer.stop(); } catch (Exception e) { e.printStackTrace(); } } for ( ZkClient client : clients ) { try { client.close(); } catch (Exception e) { e.printStackTrace(); } } } } }
/**
 * Basic information a work server publishes about itself: a numeric id and a display name.
 *
 * <p>The scheduler stores instances in ZooKeeper through Java serialization
 * (a SerializableSerializer is used). Keep the field names and serialVersionUID
 * stable so previously written data stays readable.
 */
public class RunningData implements Serializable {

    private static final long serialVersionUID = 4260577459043203630L;

    /** Numeric id of the server. */
    private Long cid;

    /** Human-readable server name; work servers compare it to tell whether they are the Master. */
    private String name;

    /** @return the display name, or {@code null} if it was never set */
    public String getName() {
        return this.name;
    }

    /** @param newName the display name to record */
    public void setName(String newName) {
        name = newName;
    }

    /** @return the numeric id, or {@code null} if it was never set */
    public Long getCid() {
        return this.cid;
    }

    /** @param newCid the numeric id to record */
    public void setCid(Long newCid) {
        cid = newCid;
    }
}
/** * 工做服务器 */ public class WorkServer { // 记录服务器状态 private volatile boolean running = false; private ZkClient zkClient; // Master节点对应zookeeper中的节点路径 private static final String MASTER_PATH = "/master"; // 监听Master节点删除事件 private IZkDataListener dataListener; // 记录当前节点的基本信息 private RunningData serverData; // 记录集群中Master节点的基本信息 private RunningData masterData; private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1); private int delayTime = 5; public WorkServer(RunningData rd) { this.serverData = rd; // 记录服务器基本信息 this.dataListener = new IZkDataListener() { public void handleDataDeleted(String dataPath) throws Exception { //takeMaster(); if (masterData != null && masterData.getName().equals(serverData.getName())){ // 本身就是上一轮的Master服务器,则直接抢 takeMaster(); } else { // 不然,延迟5秒后再抢。主要是应对网络抖动,给上一轮的Master服务器优先抢占master的权利,避免没必要要的数据迁移开销 delayExector.schedule(new Runnable(){ public void run(){ takeMaster(); } }, delayTime, TimeUnit.SECONDS); } } public void handleDataChange(String dataPath, Object data) throws Exception { } }; } public ZkClient getZkClient() { return zkClient; } public void setZkClient(ZkClient zkClient) { this.zkClient = zkClient; } // 启动服务器 public void start() throws Exception { if (running) { throw new Exception("server has startup..."); } running = true; // 订阅Master节点删除事件 zkClient.subscribeDataChanges(MASTER_PATH, dataListener); // 争抢Master权利 takeMaster(); } // 中止服务器 public void stop() throws Exception { if (!running) { throw new Exception("server has stoped"); } running = false; delayExector.shutdown(); // 取消Master节点事件订阅 zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener); // 释放Master权利 releaseMaster(); } // 争抢Master private void takeMaster() { if (!running) return; try { // 尝试建立Master临时节点 zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL); masterData = serverData; System.out.println(serverData.getName()+" is master"); // 做为演示,咱们让服务器每隔5秒释放一次Master权利 delayExector.schedule(new Runnable() { public void run() { // 
TODO Auto-generated method stub if (checkMaster()){ releaseMaster(); } } }, 5, TimeUnit.SECONDS); } catch (ZkNodeExistsException e) { // 已被其余服务器建立了 // 读取Master节点信息 RunningData runningData = zkClient.readData(MASTER_PATH, true); if (runningData == null) { takeMaster(); // 没读到,读取瞬间Master节点宕机了,有机会再次争抢 } else { masterData = runningData; } } catch (Exception e) { // ignore; } } // 释放Master权利 private void releaseMaster() { if (checkMaster()) { zkClient.delete(MASTER_PATH); } } // 检测本身是否为Master private boolean checkMaster() { try { RunningData eventData = zkClient.readData(MASTER_PATH); masterData = eventData; if (masterData.getName().equals(serverData.getName())) { return true; } return false; } catch (ZkNoNodeException e) { return false; // 节点不存在,本身确定不是Master了 } catch (ZkInterruptedException e) { return checkMaster(); } catch (ZkException e) { return false; } } }
/** * 调度器 */ public class LeaderSelectorZkClient { //启动的服务个数 private static final int CLIENT_QTY = 10; //zookeeper服务器的地址 private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181"; public static void main(String[] args) throws Exception { //保存全部zkClient的列表 List<ZkClient> clients = new ArrayList<ZkClient>(); //保存全部服务的列表 List<WorkServer> workServers = new ArrayList<WorkServer>(); try { for ( int i = 0; i < CLIENT_QTY; ++i ) { // 模拟建立10个服务器并启动 //建立zkClient ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new SerializableSerializer()); clients.add(client); //建立serverData RunningData runningData = new RunningData(); runningData.setCid(Long.valueOf(i)); runningData.setName("Client #" + i); //建立服务 WorkServer workServer = new WorkServer(runningData); workServer.setZkClient(client); workServers.add(workServer); workServer.start(); } System.out.println("敲回车键退出!\n"); new BufferedReader(new InputStreamReader(System.in)).readLine(); } finally { System.out.println("Shutting down..."); for ( WorkServer workServer : workServers ) { try { workServer.stop(); } catch (Exception e) { e.printStackTrace(); } } for ( ZkClient client : clients ) { try { client.close(); } catch (Exception e) { e.printStackTrace(); } } } } }
zookeeper系列(一)zookeeper必知
zookeeper系列(二)实战master选举
zookeeper系列(三)实战数据发布订阅
zookeeper系列(四)实战负载均衡
zookeeper系列(五)实战分布式锁
zookeeper系列(六)实战分布式队列
zookeeper系列(七)实战分布式命名服务
zookeeper系列(八)zookeeper运维架构
考虑一个需要7×24小时对外提供服务的系统：它不能有单点故障，因此我们使用集群，采用Master+Slave架构。集群中有一台主机和多台备机，由主机对外提供服务，备机监听主机状态；一旦主机宕机，备机必须迅速接管主机，继续对外提供服务。在这个过程中，从备机中选出一台机器作为主机的过程，就是Master选举。
左边是ZooKeeper集群，右边是3台工作服务器。工作服务器启动时，会在ZooKeeper的Servers节点下创建临时节点，并把自身的基本信息写入该临时节点，这个过程叫做服务注册；系统中的其他服务可以通过获取Servers节点的子节点列表，来了解当前系统中哪些服务器可用，这个过程叫做服务发现。接着，这些服务器会尝试创建Master临时节点，谁创建成功谁就是Master，其余两台则作为Slave。所有的Work Server都必须关注Master节点的删除事件：通过监听该事件来了解Master服务器是否宕机（创建临时节点的服务器一旦宕机，它所创建的临时节点就会被自动删除）。一旦Master服务器宕机，就必须开始新一轮的Master选举。
WorkServer对应架构图中的WorkServer，是主要的工作类；
RunningData用来描述WorkServer的基本信息;
LeaderSelectorZkClient作为调度器，用来启动和停止WorkServer；
/**
 * Basic information a work server publishes about itself: a numeric id and a display name.
 *
 * <p>The scheduler stores instances in ZooKeeper through Java serialization
 * (a SerializableSerializer is used). Keep the field names and serialVersionUID
 * stable so previously written data stays readable.
 */
public class RunningData implements Serializable {

    private static final long serialVersionUID = 4260577459043203630L;

    /** Numeric id of the server. */
    private Long cid;

    /** Human-readable server name; work servers compare it to tell whether they are the Master. */
    private String name;

    /** @return the display name, or {@code null} if it was never set */
    public String getName() {
        return this.name;
    }

    /** @param newName the display name to record */
    public void setName(String newName) {
        name = newName;
    }

    /** @return the numeric id, or {@code null} if it was never set */
    public Long getCid() {
        return this.cid;
    }

    /** @param newCid the numeric id to record */
    public void setCid(Long newCid) {
        cid = newCid;
    }
}
/** * 工做服务器 */ public class WorkServer { // 记录服务器状态 private volatile boolean running = false; private ZkClient zkClient; // Master节点对应zookeeper中的节点路径 private static final String MASTER_PATH = "/master"; // 监听Master节点删除事件 private IZkDataListener dataListener; // 记录当前节点的基本信息 private RunningData serverData; // 记录集群中Master节点的基本信息 private RunningData masterData; private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1); private int delayTime = 5; public WorkServer(RunningData rd) { this.serverData = rd; // 记录服务器基本信息 this.dataListener = new IZkDataListener() { public void handleDataDeleted(String dataPath) throws Exception { //takeMaster(); if (masterData != null && masterData.getName().equals(serverData.getName())){ // 本身就是上一轮的Master服务器,则直接抢 takeMaster(); } else { // 不然,延迟5秒后再抢。主要是应对网络抖动,给上一轮的Master服务器优先抢占master的权利,避免没必要要的数据迁移开销 delayExector.schedule(new Runnable(){ public void run(){ takeMaster(); } }, delayTime, TimeUnit.SECONDS); } } public void handleDataChange(String dataPath, Object data) throws Exception { } }; } public ZkClient getZkClient() { return zkClient; } public void setZkClient(ZkClient zkClient) { this.zkClient = zkClient; } // 启动服务器 public void start() throws Exception { if (running) { throw new Exception("server has startup..."); } running = true; // 订阅Master节点删除事件 zkClient.subscribeDataChanges(MASTER_PATH, dataListener); // 争抢Master权利 takeMaster(); } // 中止服务器 public void stop() throws Exception { if (!running) { throw new Exception("server has stoped"); } running = false; delayExector.shutdown(); // 取消Master节点事件订阅 zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener); // 释放Master权利 releaseMaster(); } // 争抢Master private void takeMaster() { if (!running) return; try { // 尝试建立Master临时节点 zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL); masterData = serverData; System.out.println(serverData.getName()+" is master"); // 做为演示,咱们让服务器每隔5秒释放一次Master权利 delayExector.schedule(new Runnable() { public void run() { // 
TODO Auto-generated method stub if (checkMaster()){ releaseMaster(); } } }, 5, TimeUnit.SECONDS); } catch (ZkNodeExistsException e) { // 已被其余服务器建立了 // 读取Master节点信息 RunningData runningData = zkClient.readData(MASTER_PATH, true); if (runningData == null) { takeMaster(); // 没读到,读取瞬间Master节点宕机了,有机会再次争抢 } else { masterData = runningData; } } catch (Exception e) { // ignore; } } // 释放Master权利 private void releaseMaster() { if (checkMaster()) { zkClient.delete(MASTER_PATH); } } // 检测本身是否为Master private boolean checkMaster() { try { RunningData eventData = zkClient.readData(MASTER_PATH); masterData = eventData; if (masterData.getName().equals(serverData.getName())) { return true; } return false; } catch (ZkNoNodeException e) { return false; // 节点不存在,本身确定不是Master了 } catch (ZkInterruptedException e) { return checkMaster(); } catch (ZkException e) { return false; } } }
/** * 调度器 */ public class LeaderSelectorZkClient { //启动的服务个数 private static final int CLIENT_QTY = 10; //zookeeper服务器的地址 private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181"; public static void main(String[] args) throws Exception { //保存全部zkClient的列表 List<ZkClient> clients = new ArrayList<ZkClient>(); //保存全部服务的列表 List<WorkServer> workServers = new ArrayList<WorkServer>(); try { for ( int i = 0; i < CLIENT_QTY; ++i ) { // 模拟建立10个服务器并启动 //建立zkClient ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new SerializableSerializer()); clients.add(client); //建立serverData RunningData runningData = new RunningData(); runningData.setCid(Long.valueOf(i)); runningData.setName("Client #" + i); //建立服务 WorkServer workServer = new WorkServer(runningData); workServer.setZkClient(client); workServers.add(workServer); workServer.start(); } System.out.println("敲回车键退出!\n"); new BufferedReader(new InputStreamReader(System.in)).readLine(); } finally { System.out.println("Shutting down..."); for ( WorkServer workServer : workServers ) { try { workServer.stop(); } catch (Exception e) { e.printStackTrace(); } } for ( ZkClient client : clients ) { try { client.close(); } catch (Exception e) { e.printStackTrace(); } } } } }