Zookeeper是一个分布式开源框架,提供了协调分布式应用的基本服务,它向外部应用暴露一组通用服务——分布式同步(Distributed Synchronization)、命名服务(Naming Service)、集群维护(Group Maintenance)等,简化分布式应用协调及其管理的难度,提供高性能的分布式服务。ZooKeeper自己能够以单机模式安装运行,不过它的长处在于经过分布式ZooKeeper集群(一个Leader,多个Follower),基于必定的策略来保证ZooKeeper集群的稳定性和可用性,从而实现分布式应用的可靠性。node
一、Zookeeper是为别的分布式程序服务的
二、Zookeeper自己就是一个分布式程序(只要有半数以上节点存活,zk就能正常服务)
三、Zookeeper所提供的服务涵盖:主从协调、服务器节点动态上下线、统一配置管理、分布式共享锁、统> 一名称服务等
四、虽说能够提供各类服务,可是zookeeper在底层其实只提供了两个功能:
4.1:管理(存储,读取)用户程序提交的数据(相似namenode中存放的metadata)
4.2:为用户程序提供数据节点监听服务apache
Zookeeper集群的角色: Leader 和 follower
只要集群中有半数以上节点存活,集群就能提供服务bash
一、Zookeeper:一个leader,多个follower组成的集群 二、全局数据一致:每一个server保存一份相同的数据副本,client不管链接到哪一个server,数据都是一致的 三、分布式读写,更新请求转发,由leader实施 四、更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行 五、数据更新原子性,一次数据更新要么成功,要么失败 六、实时性,在必定时间范围内,client能读到最新数据
数据存储形式服务器
zookeeper中对用户的数据采用kv形式存储app
只是zk有点特别:框架
key:是以路径的形式表示的,那就觉得着,各key之间有父子关系,好比dom
/ 是顶层keyeclipse
用户建的key只能在/ 下做为子节点,好比建一个key: /aa 这个key能够带value数据ssh
也能够建一个key: /bbsocket
也能够建key: /aa/xx
zookeeper中,对每个数据key,称做一个znode
综上所述,zk中的数据存储形式以下:
znode类型
zookeeper中的znode有多种类型:
- PERSISTENT 持久的:建立者就算跟集群断开联系,该类节点也会持久存在与zk集群中
- EPHEMERAL 短暂的:建立者一旦跟集群断开联系,zk就会将这个节点删除
- SEQUENTIAL 带序号的:这类节点,zk会自动拼接上一个序号,并且序号是递增的
组合类型:
PERSISTENT :持久不带序号
EPHEMERAL :短暂不带序号
PERSISTENT 且 SEQUENTIAL :持久且带序号
EPHEMERAL 且 SEQUENTIAL :短暂且带序号
集群选举示意图
解压Zokeeper安装包到apps目录下
tar -zxvf zookeeper-3.4.6.tar.gz -C appscd /root/apps/zookeeper-3.4.6/conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
修改dataDir=/root/zkdata
在后面加上集群的机器:2888是leader和follower通信端口,3888是投票的
server.1=hdp-01:2888:3888
server.2=hdp-02:2888:3888
server.3=hdp-03:2888:3888对3台节点,都建立目录 mkdir /root/zkdata
对3台节点,在工做目录中生成myid文件,但内容要分别为各自的id: 1,2,3
echo 1 > /root/zkdata/myid
echo 2 > /root/zkdata/myid
echo 3 > /root/zkdata/myid从hdp20-01上scp安装目录到其余两个节点
cd apps
scp -r zookeeper-3.4.6/ hdp-02:$PWD
scp -r zookeeper-3.4.6/ hdp-03:$PWD启动zookeeper集群
zookeeper没有提供自动批量启动脚本,须要手动一台一台地起zookeeper进程
在每一台节点上,运行命令:
cd /root/apps/zookeeper-3.4.6
bin/zkServer.sh start
启动后,用jps应该能看到一个进程:QuorumPeerMain可是,光有进程不表明zk已经正常服务,须要用命令检查状态:
bin/zkServer.sh status
能看到角色模式:为leader或follower,即正常了。本身写个脚本,一键启动
vi zkmanage.sh#!/bin/bash
for host in hdp-01 hdp-02 hdp-03
do
echo "${host}:$1ing....."
ssh $host "/root/apps/zookeeper-3.4.6/bin/zkServer.sh $1"
done中止命令:sh zjmanage.sh stop
加个可执行权限:chmod +zkmanage.sh
启动命令:./zkmanage.sh start
可是出现没有Java环境变量问题,修改配置文件
vi zkmanage.sh
修改配置以下#!/bin/bash
for host in hdp-01 hdp-02 hdp-03
do
echo "${host}:$1ing....."
ssh $host "source /etc/profile;/root/apps/zookeeper-3.4.6/bin/zkServer.sh $1"
donesleep 2
for host in hdp-01 hdp-02 hdp-03
do
ssh $host "source /etc/profile;/root/apps/zookeeper-3.4.6/bin/zkServer.sh status"
done
启动集群结果
hdp-01:starting.....
JMX enabled by default
Using config: /root/apps/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
hdp-02:starting.....
JMX enabled by default
Using config: /root/apps/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
hdp-03:starting.....
JMX enabled by default
Using config: /root/apps/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
JMX enabled by default
Using config: /root/apps/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: follower
JMX enabled by default
Using config: /root/apps/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: leader
JMX enabled by default
Using config: /root/apps/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: follower
Zookeeper的Java客户端操做代码
public class ZookeeperCliDemo { ZooKeeper zk =null; @Before public void init() throws Exception { zk=new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, null); } @Test public void testCreate() throws Exception { //参数1:要建立的节点路径;参数2:数据;参数3:访问权限;参数4:节点类型 String create = zk.create("/eclipse", "hello eclipse".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(create); zk.close(); } @Test public void testUpdate() throws Exception { //参数1:节点路径;参数2:数据;参数3:所要修改的版本,-1表示任意版本 zk.setData("/eclipse","我喜欢青青".getBytes(),-1); zk.close(); } @Test public void testGet() throws Exception { //参数1:节点路径;参数2:事件监听;参数3:所要修改的版本,null表示最新版本 byte[] data = zk.getData("/eclipse", false, null); System.out.println(new String(data,"UTF-8")); zk.close(); } @Test public void testListChildren() throws KeeperException, InterruptedException { //参数1:节点路径;参数2:是否要监听 //注意:返回的结果只有子节点的名字,不带全路径 List<String> children = zk.getChildren("/cc", false); for(String child:children){ System.out.println(child); } zk.close(); } @Test public void testRm() throws KeeperException, InterruptedException { zk.delete("/eclipse",-1); zk.close(); } }
Zookeeper监听功能代码
public class ZookeeperWatchDemo { ZooKeeper zk =null; @Before public void init() throws Exception { zk=new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, new Watcher() { public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected && watchedEvent.getType() == Event.EventType.NodeDataChanged) { System.out.println("收到事件所发生节点的路径" + watchedEvent.getPath()); System.out.println("收到事件所发生节点的状态" + watchedEvent.getState()); System.out.println("收到事件所发生节点的类型" + watchedEvent.getType()); System.out.println("watch事件通知。。换照片"); try { zk.getData("/mygirls", true, null); } catch (Exception e) { e.printStackTrace(); } }else if(watchedEvent.getState()==Event.KeeperState.SyncConnected && watchedEvent.getType()==Event.EventType.NodeChildrenChanged){ System.out.println("收到事件所发生节点的路径" + watchedEvent.getPath()); System.out.println("收到事件所发生节点的状态" + watchedEvent.getState()); System.out.println("收到事件所发生节点的类型" + watchedEvent.getType()); } } }); } @Test public void testGetWatch() throws Exception { byte[] data = zk.getData("/mygirls",true, null); List<String> children = zk.getChildren("/mygirls", true); System.out.println(new String(data,"UTF-8")); Thread.sleep(Long.MAX_VALUE); } }
Zookeeper开发分布式系统案例代码,动态上下线感知
服务代码
public class TimeQueryServer { ZooKeeper zk=null; public void connectZk()throws Exception{ zk=new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, null); } public void registerServerInfo(String hostname,String port)throws Exception{ /** * 先判断注册节点的父节点是否存在,若是不存在,则建立持久节点 */ Stat exists = zk.exists("/servers", false); if(exists==null){ zk.create("/servers",null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } /** * 注册服务器数据到zk的约定注册节点下 */ String create = zk.create("/servers/server", (hostname + ":" + port).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname+" 服务器向zk 注册成功,注册节点为:/servers"+create); } public static void main(String[] args) throws Exception { //1.构造zk链接 TimeQueryServer timeQueryServer = new TimeQueryServer(); timeQueryServer.connectZk(); //2.注册服务器信息 timeQueryServer.registerServerInfo("192.168.150.3","44772"); //3.启动业务线程开始处理业务 new TimeQueryService(44772).start(); } }
public class TimeQueryService extends Thread { int port=0; public TimeQueryService(int port){ this.port=port; } @Override public void run() { try { ServerSocket ss = new ServerSocket(port); System.out.println("业务线程已经绑定端口"+port+"开始接受客户端请求.."); while (true){ Socket sc = ss.accept(); InputStream inputStream = sc.getInputStream(); OutputStream outputStream = sc.getOutputStream(); outputStream.write(new Date().toString().getBytes()); } } catch (Exception e) { e.printStackTrace(); } } }
消费者代码
public class Consumer { //定义一个list用于存放在线的服务器列表 private volatile ArrayList<String>onlineServers=new ArrayList<String>(); ZooKeeper zk=null; public void connectZk()throws Exception{ zk=new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, new Watcher() { public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState()==Event.KeeperState.SyncConnected && watchedEvent.getType()==Event.EventType.NodeChildrenChanged){ try{ //事件回调逻辑中,再次查询zk上在线服务器节点便可,查询逻辑中又再次注册子节点变化事件监听 getOnlineServers(); }catch (Exception e){ e.printStackTrace(); } } } }); } //查询在线服务器列表 public void getOnlineServers()throws Exception{ List<String> children = zk.getChildren("/servers", true); ArrayList<String> servers = new ArrayList<String>(); for (String child:children){ byte[] data = zk.getData("/servers/" + child, false, null); String serverInfo=new String(data); servers.add(serverInfo); } onlineServers=servers; System.out.println("查询了一次zk,当前在线的服务器有:"+servers); } public void setRequest() throws Exception { Random random = new Random(); while (true){ try { int nextInt=random.nextInt(onlineServers.size()); String server=onlineServers.get(nextInt); String hostname=server.split(":")[0]; int port=Integer.parseInt(server.split(":")[1]); System.out.println("本次请求挑选的服务器为:"+server); Socket socket = new Socket(hostname, port); OutputStream out = socket.getOutputStream(); InputStream in = socket.getInputStream(); out.write("hahaha".getBytes()); out.flush(); byte[] buf = new byte[256]; int read=in.read(buf); String s = new String(buf, 0, read); System.out.println("服务器响应时间为:"+s); out.close(); in.close(); socket.close(); Thread.sleep(2000); }catch (Exception e){ } } } public static void main(String[] args) throws Exception { //构造zk链接对象 Consumer consumer = new Consumer(); consumer.connectZk(); //查询在线服务器列表 consumer.getOnlineServers(); //处理业务 consumer.setRequest(); } }
pom
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.10</version> </dependency> </dependencies>
启动多个服务
控制台输出
192.168.150.3 服务器向zk 注册成功,注册节点为:/servers/servers/server0000000018
业务线程已经绑定端口44772开始接受客户端请求..
192.168.150.3 服务器向zk 注册成功,注册节点为:/servers/servers/server0000000019
业务线程已经绑定端口44773开始接受客户端请求..
192.168.150.3 服务器向zk 注册成功,注册节点为:/servers/servers/server0000000020
业务线程已经绑定端口44774开始接受客户端请求..
消费者启动
控制台输出
查询了一次zk,当前在线的服务器有:[192.168.150.3:44773, 192.168.150.3:44772, 192.168.150.3:44774]
本次请求挑选的服务器为:192.168.150.3:44772
服务器响应时间为:Mon Jun 03 20:03:21 CST 2019
本次请求挑选的服务器为:192.168.150.3:44773
服务器响应时间为:Mon Jun 03 20:03:23 CST 2019
本次请求挑选的服务器为:192.168.150.3:44773
服务器响应时间为:Mon Jun 03 20:03:25 CST 2019
本次请求挑选的服务器为:192.168.150.3:44772
服务器响应时间为:Mon Jun 03 20:03:27 CST 2019
下线一个服务后,控制台输出
查询了一次zk,当前在线的服务器有:[192.168.150.3:44773, 192.168.150.3:44772]
本次请求挑选的服务器为:192.168.150.3:44773
服务器响应时间为:Mon Jun 03 20:04:19 CST 2019
本次请求挑选的服务器为:192.168.150.3:44773
服务器响应时间为:Mon Jun 03 20:04:21 CST 2019
本次请求挑选的服务器为:192.168.150.3:44773
服务器响应时间为:Mon Jun 03 20:04:23 CST 2019
本次请求挑选的服务器为:192.168.150.3:44773