【学习】026 Zookeeper

什么Zookeeper

Zookeeper是一个分布式开源框架,提供了协调分布式应用的基本服务,它向外部应用暴露一组通用服务——分布式同步(Distributed Synchronization)、命名服务(Naming Service)、集群维护(Group Maintenance)等,简化分布式应用协调及其管理的难度,提供高性能的分布式服务。ZooKeeper自己能够以单机模式安装运行,不过它的长处在于经过分布式ZooKeeper集群(一个Leader,多个Follower),基于必定的策略来保证ZooKeeper集群的稳定性和可用性,从而实现分布式应用的可靠性。html

一、zookeeper是为别的分布式程序服务的java

二、Zookeeper自己就是一个分布式程序(只要有半数以上节点存活,zk就能正常服务)node

三、Zookeeper所提供的服务涵盖:主从协调、服务器节点动态上下线、统一配置管理、分布式共享锁、统> 一名称服务等linux

四、虽说能够提供各类服务,可是zookeeper在底层其实只提供了两个功能:程序员

管理(存储,读取)用户程序提交的数据(相似namenode中存放的metadata); 
并为用户程序提供数据节点监听服务;redis

Zookeeper集群机制

Zookeeper集群的角色: Leader 和 follower 
只要集群中有半数以上节点存活,集群就能提供服务算法

Zookeeper特性

一、Zookeeper:一个leader,多个follower组成的集群shell

二、全局数据一致:每一个server保存一份相同的数据副本,client不管链接到哪一个server,数据都是一致的数据库

三、分布式读写,更新请求转发,由leader实施apache

四、更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行

五、数据更新原子性,一次数据更新要么成功,要么失败

六、实时性,在必定时间范围内,client能读到最新数据

Zookeeper数据结构

一、层次化的目录结构,命名符合常规文件系统规范(相似文件系统) 

二、每一个节点在zookeeper中叫作znode,而且其有一个惟一的路径标识 

三、节点Znode能够包含数据和子节点(可是EPHEMERAL类型的节点不能有子节点)

节点类型 
a、Znode有两种类型:

短暂(ephemeral)(create -e /app1/test1 “test1” 客户端断开链接zk删除ephemeral类型节点) 
持久(persistent) (create -s /app1/test2 “test2” 客户端断开链接zk不删除persistent类型节点)

b、Znode有四种形式的目录节点(默认是persistent )

PERSISTENT 
PERSISTENT_SEQUENTIAL(持久序列/test0000000019 ) 
EPHEMERAL 
EPHEMERAL_SEQUENTIAL

c、建立znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护 

d、在分布式系统中,顺序号能够被用于为全部的事件进行全局排序,这样客户端能够经过顺序号推断事件的顺序

Zookeeper应用场景

数据发布与订阅(配置中心)

发布与订阅模型,即所谓的配置中心,顾名思义就是发布者将数据发布到ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。例如全局的配置信息,服务式服务框架的服务地址列表等就很是适合使用。

负载均衡

这里说的负载均衡是指软负载均衡。在分布式环境中,为了保证高可用性,一般同一个应用或同一个服务的提供方都会部署多份,达到对等服务。而消费者就需要在这些对等的服务器中选择一个来执行相关的业务逻辑,其中比较典型的是消息中间件中的生产者,消费者负载均衡。

消息中间件中发布者和订阅者的负载均衡,linkedin开源的KafkaMQ和阿里开源的 metaq都是经过zookeeper来作到生产者、消费者的负载均衡。这里以metaq为例如讲下:

生产者负载均衡:metaq发送消息的时候,生产者在发送消息的时候必须选择一台broker上的一个分区来发送消息,所以metaq在运行过程当中,会把全部broker和对应的分区信息所有注册到ZK指定节点上,默认的策略是一个依次轮询的过程,生产者在经过ZK获取分区列表以后,会按照brokerId和partition的顺序排列组织成一个有序的分区列表,发送的时候按照从头至尾循环往复的方式选择一个分区来发送消息。

消费负载均衡: 在消费过程当中,一个消费者会消费一个或多个分区中的消息,可是一个分区只会由一个消费者来消费。MetaQ的消费策略是:

1. 每一个分区针对同一个group只挂载一个消费者。

2. 若是同一个group的消费者数目大于分区数目,则多出来的消费者将不参与消费。

3. 若是同一个group的消费者数目小于分区数目,则有部分消费者须要额外承担消费任务。

在某个消费者故障或者重启等状况下,其余消费者会感知到这一变化(经过 zookeeper watch消费者列表),而后从新进行负载均衡,保证全部的分区都有消费者进行消费。

命名服务(Naming Service)

命名服务也是分布式系统中比较常见的一类场景。在分布式系统中,经过使用命名服务,客户端应用可以根据指定名字来获取资源或服务的地址,提供者等信息。被命名的实体一般能够是集群中的机器,提供的服务地址,远程对象等等——这些咱们均可以统称他们为名字(Name)。其中较为常见的就是一些分布式服务框架中的服务地址列表。经过调用ZK提供的建立节点的API,可以很容易建立一个全局惟一的path,这个path就能够做为一个名称。

阿里巴巴集团开源的分布式服务框架Dubbo中使用ZooKeeper来做为其命名服务,维护全局的服务地址列表, 点击这里查看Dubbo开源项目。在Dubbo实现中:

服务提供者在启动的时候,向ZK上的指定节点/dubbo/${serviceName}/providers目录下写入本身的URL地址,这个操做就完成了服务的发布。

服务消费者启动的时候,订阅/dubbo/${serviceName}/providers目录下的提供者URL地址, 并向/dubbo/${serviceName} /consumers目录下写入本身的URL地址。

注意,全部向ZK上注册的地址都是临时节点,这样就可以保证服务提供者和消费者可以自动感应资源的变化。 另外,Dubbo还有针对服务粒度的监控,方法是订阅/dubbo/${serviceName}目录下全部提供者和消费者的信息。

分布式通知/协调

ZooKeeper中特有watcher注册与异步通知机制,可以很好的实现分布式环境下不一样系统之间的通知与协调,实现对数据变动的实时处理。使用方法一般是不一样系统都对ZK上同一个znode进行注册,监听znode的变化(包括znode自己内容及子节点的),其中一个系统update了znode,那么另外一个系统可以收到通知,并做出相应处理

1. 另外一种心跳检测机制:检测系统和被检测系统之间并不直接关联起来,而是经过zk上某个节点关联,大大减小系统耦合。

2. 另外一种系统调度模式:某系统有控制台和推送系统两部分组成,控制台的职责是控制推送系统进行相应的推送工做。管理人员在控制台做的一些操做,其实是修改了ZK上某些节点的状态,而ZK就把这些变化通知给他们注册Watcher的客户端,即推送系统,因而,做出相应的推送任务。

3. 另外一种工做汇报模式:一些相似于任务分发系统,子任务启动后,到zk来注册一个临时节点,而且定时将本身的进度进行汇报(将进度写回这个临时节点),这样任务管理者就可以实时知道任务进度。

总之,使用zookeeper来进行分布式通知和协调可以大大下降系统之间的耦合

集群管理与Master选举

1. 集群机器监控:这一般用于那种对集群中机器状态,机器在线率有较高要求的场景,可以快速对集群中机器变化做出响应。这样的场景中,每每有一个监控系统,实时检测集群机器是否存活。过去的作法一般是:监控系统经过某种手段(好比ping)定时检测每一个机器,或者每一个机器本身定时向监控系统汇报“我还活着”。 这种作法可行,可是存在两个比较明显的问题:

1. 集群中机器有变更的时候,牵连修改的东西比较多。

2. 有必定的延时。

利用ZooKeeper有两个特性,就能够实现另外一种集群机器存活性监控系统:

1. 客户端在节点 x 上注册一个Watcher,那么若是 x?的子节点变化了,会通知该客户端。

2. 建立EPHEMERAL类型的节点,一旦客户端和服务器的会话结束或过时,那么该节点就会消失。

例如,监控系统在 /clusterServers 节点上注册一个Watcher,之后每动态加机器,那么就往 /clusterServers 下建立一个 EPHEMERAL类型的节点:/clusterServers/{hostname}. 这样,监控系统就可以实时知道机器的增减状况,至于后续处理就是监控系统的业务了。

2. Master选举则是zookeeper中最为经典的应用场景了。

在分布式环境中,相同的业务应用分布在不一样的机器上,有些业务逻辑(例如一些耗时的计算,网络I/O处理),每每只须要让整个集群中的某一台机器进行执行,其他机器能够共享这个结果,这样能够大大减小重复劳动,提升性能,因而这个master选举即是这种场景下的碰到的主要问题。

利用ZooKeeper的强一致性,可以保证在分布式高并发状况下节点建立的全局惟一性,即:同时有多个客户端请求建立 /currentMaster 节点,最终必定只有一个客户端请求可以建立成功。利用这个特性,就能很轻易的在分布式环境中进行集群选取了。

另外,这种场景演化一下,就是动态Master选举。这就要用到EPHEMERAL_SEQUENTIAL类型节点的特性了。

上文中提到,全部客户端建立请求,最终只有一个可以建立成功。在这里稍微变化下,就是容许全部请求都可以建立成功,可是得有个建立顺序,因而全部的请求最终在ZK上建立结果的一种可能状况是这样: /currentMaster/{sessionId}-1 ,/currentMaster/{sessionId}-2,/currentMaster/{sessionId}-3 ….. 每次选取序列号最小的那个机器做为Master,若是这个机器挂了,因为他建立的节点会立刻小时,那么以后最小的那个机器就是Master了。

1. 在搜索系统中,若是集群中每一个机器都生成一份全量索引,不只耗时,并且不能保证彼此之间索引数据一致。所以让集群中的Master来进行全量索引的生成,而后同步到集群中其它机器。另外,Master选举的容灾措施是,能够随时进行手动指定master,就是说应用在zk在没法获取master信息时,能够经过好比http方式,向一个地方获取master。

2. 在Hbase中,也是使用ZooKeeper来实现动态HMaster的选举。在Hbase实现中,会在ZK上存储一些ROOT表的地址和HMaster的地址,HRegionServer也会把本身以临时节点(Ephemeral)的方式注册到Zookeeper中,使得HMaster能够随时感知到各个HRegionServer的存活状态,同时,一旦HMaster出现问题,会从新选举出一个HMaster来运行,从而避免了HMaster的单点问题

分布式锁

分布式锁,这个主要得益于 ZooKeeper 为咱们保证了数据的强一致性。锁服务能够分为两类,一个是 保持独占,另外一个是 控制时序。

1. 所谓保持独占,就是全部试图来获取这个锁的客户端,最终只有一个能够成功得到这把锁。一般的作法是把 zk 上的一个 znode 看做是一把锁,经过 create znode 的方式来实现。全部客户端都去建立 /distribute_lock 节点,最终成功建立的那个客户端也即拥有了这把锁。

2. 控制时序,就是全部视图来获取这个锁的客户端,最终都是会被安排执行,只是有个全局时序了。作法和上面基本相似,只是这里 /distributelock 已经预先存在,客户端在它下面建立临时有序节点(这个能够经过节点的属性控制:CreateMode.EPHEMERALSEQUENTIAL 来指定)。Zk 的父节点(/distribute_lock)维持一份 sequence, 保证子节点建立的时序性,从而也造成了每一个客户端的全局时序。

Zookeeper windows环境安装

环境要求:必需要有jdk环境,本次讲课使用jdk1.8

1.安装jdk

2.安装Zookeeper. 在官网http://zookeeper.apache.org/下载zookeeper.我下载的是zookeeper-3.4.6版本。

解压zookeeper-3.4.6至D:\machine\zookeeper-3.4.6.

在D:\machine 新建data及log目录。

3.ZooKeeper的安装模式分为三种,分别为:单机模式(stand-alone)、集群模式和集群伪分布模式。ZooKeeper 单机模式的安装相对比较简单,若是第一次接触ZooKeeper的话,建议安装ZooKeeper单机模式或者集群伪分布模式。

安装单击模式。 至D:\machine\zookeeper-3.4.6\conf 复制 zoo_sample.cfg 并粘贴到当前目录下,命名zoo.cfg.

Zookeeper集群环境搭建(linux)

环境要求:必需要有jdk环境,本次讲课使用jdk1.8

结构

一共三个节点
(zk服务器集群规模不小于3个节点),要求服务器之间系统时间保持一致。

上传zk而且解压
进行解压: tar -zxvf zookeeper-3.4.6.tar.gz
重命名: mv zookeeper-3.4.6 zookeeper

修改zookeeper环境变量

vi /etc/profile

export JAVA_HOME=/opt/jdk1.8.0_71

export ZOOKEEPER_HOME=/usr/local/zookeeper

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export PATH=$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$PATH

source /etc/profile

修改zoo_sample.cfg文件

cd /usr/local/zookeeper/conf
mv zoo_sample.cfg zoo.cfg

修改conf: vi zoo.cfg 修改两处
(1)注意同时在zookeeper建立data目录

dataDir=/usr/local/zookeeper/data

(2)最后面添加

server.0=bhz:2888:3888
server.1=hadoop1:2888:3888
server.2=hadoop2:2888:3888

建立服务器标识
服务器标识配置:
建立文件夹: mkdir data
建立文件myid并填写内容为0: vi
myid (内容为服务器标识 : 0)

复制zookeeper

进行复制zookeeper目录到hadoop01和hadoop02
还有/etc/profile文件
把hadoop0一、 hadoop02中的myid文件里的值修改成1和2
路径(vi /usr/local/zookeeper/data/myid)

启动zookeeper
启动zookeeper:
路径: /usr/local/zookeeper/bin
执行: zkServer.sh start
(注意这里3台机器都要进行启动)
状态: zkServer.sh
status(在三个节点上检验zk的mode,一个leader和俩个follower)

经常使用命令

zkServer.sh status 查询状态

Zookeeper配置文件介绍

# The number of milliseconds of each tick 
tickTime=2000 
# The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/home/myuser/zooA/data # the port at which the clients will connect clientPort=2181 # ZooKeeper server and its port no. # ZooKeeper ensemble should know about every other machine in the ensemble # specify server id by creating 'myid' file in the dataDir # use hostname instead of IP address for convenient maintenance server.1=127.0.0.1:2888:3888 server.2=127.0.0.1:2988:3988 server.3=127.0.0.1:2088:3088 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir # autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature <br> #autopurge.purgeInterval=1 dataLogDir=/home/myuser/zooA/log

tickTime:心跳时间,为了确保链接存在的,以毫秒为单位,最小超时时间为两个心跳时间

initLimit:多少个心跳时间内,容许其余server链接并初始化数据,若是ZooKeeper管理的数据较大,则应相应增大这个值

clientPort:服务的监听端口

dataDir:用于存放内存数据库快照的文件夹,同时用于集群的myid文件也存在这个文件夹里(注意:一个配置文件只能包含一个dataDir字样,即便它被注释掉了。)

dataLogDir:用于单独设置transaction log的目录,transaction log分离能够避免和普通log还有快照的竞争

syncLimit:多少个tickTime内,容许follower同步,若是follower落后太多,则会被丢弃。

server.A=B:C:D:
A是一个数字,表示这个是第几号服务器,B是这个服务器的ip地址
C第一个端口用来集群成员的信息交换,表示的是这个服务器与集群中的Leader服务器交换信息的端口
D是在leader挂掉时专门用来进行选举leader所用

Zookeeper客户端

ZooKeeper命令行工具相似于Linux的shell环境,不过功能确定不及shell啦,可是使用它咱们能够简单的对ZooKeeper进行访问,数据建立,数据修改等操做.  使用 zkCli.sh -server 127.0.0.1:2181 链接到 ZooKeeper 服务,链接成功后,系统会输出 ZooKeeper 的相关环境以及配置信息。

命令行工具的一些简单操做以下:

1. 显示根目录下、文件: ls / 使用 ls 命令来查看当前 ZooKeeper 中所包含的内容

2. 显示根目录下、文件: ls2 / 查看当前节点数据并能看到更新次数等数据

3. 建立文件,并设置初始内容: create /zk "test" 建立一个新的 znode节点“ zk ”以及与它关联的字符串

4. 获取文件内容: get /zk 确认 znode 是否包含咱们所建立的字符串

5. 修改文件内容: set /zk "zkbak" 对 zk 所关联的字符串进行设置

6. 删除文件: delete /zk 将刚才建立的 znode 删除

7. 退出客户端: quit

8. 帮助命令: help

Java操做Zookeeper

 Zookeeper说明

建立节点(znode) 方法:
create:
提供了两套建立节点的方法,同步和异步建立节点方式。
同步方式:
参数1,节点路径《名称)
 : InodeName (不容许递归建立节点,也就是说在父节点不存在
的状况下,不容许建立子节点)
参数2,节点内容:
 要求类型是字节数组(也就是说,不支持序列化方式,若是须要实现序
列化,可以使用java相关序列化框架,如HessianKryo框架)
参數3,节点权限:
 使用Ids.OPEN_ACL_UNSAFE开放权限便可。(这个参数通常在权展
没有过高要求的场景下,不必关注)
参数4,节点类型:
 建立节点的类型: CreateMode,提供四种首点象型

PERSISTENT                    #持久化节点
PERSISTENT_SEQUENTIAL        #顺序自动编号持久化节点,这种节点会根据当前已存在的节点数自动加 1
EPHEMERAL                     #临时节点, 客户端session超时这类节点就会被自动删除
EPHEMERAL_SEQUENTIAL         #临时自动编号节点

Maven依赖信息

     <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
        </dependency>

Zookeeper客户端链接

package com.hongmoshui.test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class Test001
{

    // 链接地址
    private static final String ADDRES = "127.0.0.1:2181";
    // session 会话
    private static final int SESSION_OUTTIME = 2000;
    // 信号量,阻塞程序执行,用户等待zookeeper链接成功,发送成功信号,
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException
    {
        ZooKeeper zk = new ZooKeeper(ADDRES, SESSION_OUTTIME, new Watcher()
        {

            public void process(WatchedEvent event)
            {
                // 获取事件状态
                KeeperState keeperState = event.getState();
                // 获取事件类型
                EventType eventType = event.getType();
                if (KeeperState.SyncConnected == keeperState)
                {
                    if (EventType.None == eventType)
                    {
                        countDownLatch.countDown();
                        System.out.println("zk 启动链接...");
                    }
                }
            }
        });
        // 进行阻塞
        countDownLatch.await();
        String result = zk.create("/hongmoshui_Lasting", "Lasting".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(result);
        zk.close();
    }
}

建立Zookeeper节点信息

        // 1. 建立持久节点,而且容许任何服务器能够操做
        String result = zk.create("/hongmoshui_Lasting", "Lasting".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("result:" + result);
        // 2. 建立临时节点
        String result = zk.create("/hongmoshui_temp", "temp".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("result:" + result);

Watcher

ZooKeeper中,接口类Watcher用于表示一个标准的事件处理器,其定义了事件通知相关的逻辑,包含KeeperStateEventType两个枚举类,分别表明了通知状态和事件类型,同时定义了事件的回调方法:processWatchedEvent event)。

什么是Watcher接口

同一个事件类型在不一样的通知状态中表明的含义有所不一样,表7-3列举了常见的通知状态和事件类型。

 

7-3 Watcher通知状态与事件类型一览

KeeperState

EventType

触发条件

说明

 

None
-1

客户端与服务端成功创建链接

 

SyncConnected
0

NodeCreated
1

Watcher监听的对应数据节点被建立

 

 

NodeDeleted
2

Watcher监听的对应数据节点被删除

此时客户端和服务器处于链接状态

 

NodeDataChanged
3

Watcher监听的对应数据节点的数据内容发生变动

 

 

NodeChildChanged
4

Wather监听的对应数据节点的子节点列表发生变动

 

Disconnected
0

None
-1

客户端与ZooKeeper服务器断开链接

此时客户端和服务器处于断开链接状态

Expired
-112

Node
-1

会话超时

此时客户端会话失效,一般同时也会受到SessionExpiredException异常

AuthFailed
4

None
-1

一般有两种状况,1:使用错误的schema进行权限检查 2SASL权限检查失败

一般同时也会收到AuthFailedException异常

 7-3中列举了ZooKeeper中最多见的几个通知状态和事件类型。

回调方法process()

process方法是Watcher接口中的一个回调方法,当ZooKeeper向客户端发送一个Watcher事件通知时,客户端就会对相应的process方法进行回调,从而实现对事件的处理。process方法的定义以下:

abstract public void process(WatchedEvent event);

这个回调方法的定义很是简单,咱们重点看下方法的参数定义:WatchedEvent

WatchedEvent包含了每个事件的三个基本属性:通知状态(keeperState),事件类型(EventType)和节点路径(path),其数据结构如图7-5所示。ZooKeeper使用WatchedEvent对象来封装服务端事件并传递给Watcher,从而方便回调方法process对服务端事件进行处理。

提到WatchedEvent,不得不讲下WatcherEvent实体。笼统地讲,二者表示的是同一个事物,都是对一个服务端事件的封装。不一样的是,WatchedEvent是一个逻辑事件,用于服务端和客户端程序执行过程当中所需的逻辑对象,而WatcherEvent由于实现了序列化接口,所以能够用于网络传输。

服务端在生成WatchedEvent事件以后,会调用getWrapper方法将本身包装成一个可序列化的WatcherEvent事件,以便经过网络传输到客户端。客户端在接收到服务端的这个事件对象后,首先会将WatcherEvent还原成一个WatchedEvent事件,并传递给process方法处理,回调方法process根据入参就可以解析出完整的服务端事件了。

须要注意的一点是,不管是WatchedEvent仍是WatcherEvent,其对ZooKeeper服务端事件的封装都是机及其简单的。举个例子来讲,当/zk-book这个节点的数据发生变动时,服务端会发送给客户端一个“ZNode数据内容变动事件,客户端只可以接收到以下信

Watcher代码

package com.hongmoshui;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ZkClientWatcher implements Watcher
{
    // 集群链接地址
    private static final String CONNECT_ADDRES = "192.168.110.159:2181,192.168.110.160:2181,192.168.110.162:2181";
    // 会话超时时间
    private static final int SESSIONTIME = 2000;
    // 信号量,让zk在链接以前等待,链接成功后才能往下走.
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);

    private static String LOG_MAIN = "【main】 ";

    private ZooKeeper zk;

    public void createConnection(String connectAddres, int sessionTimeOut)
    {
        try
        {
            zk = new ZooKeeper(connectAddres, sessionTimeOut, this);
            System.out.println(LOG_MAIN + "zk 开始启动链接服务器....");
            countDownLatch.await();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

    public boolean createPath(String path, String data)
    {
        try
        {
            this.exists(path, true);
            this.zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println(LOG_MAIN + "节点建立成功, Path:" + path + ",data:" + data);
        }
        catch (Exception e)
        {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * 判断指定节点是否存在
     * 
     * @param path 节点路径
     */
    public Stat exists(String path, boolean needWatch)
    {
        try
        {
            return this.zk.exists(path, needWatch);
        }
        catch (Exception e)
        {
            e.printStackTrace();
            return null;
        }
    }

    public boolean updateNode(String path, String data) throws KeeperException, InterruptedException
    {
        exists(path, true);
        this.zk.setData(path, data.getBytes(), -1);
        return false;
    }

    public void process(WatchedEvent watchedEvent)
    {

        // 获取事件状态
        KeeperState keeperState = watchedEvent.getState();
        // 获取事件类型
        EventType eventType = watchedEvent.getType();
        // zk 路径
        String path = watchedEvent.getPath();
        System.out.println("进入到 process() keeperState:" + keeperState + ", eventType:" + eventType + ", path:" + path);
        // 判断是否创建链接
        if (KeeperState.SyncConnected == keeperState)
        {
            if (EventType.None == eventType)
            {
                // 若是创建创建成功,让后程序往下走
                System.out.println(LOG_MAIN + "zk 创建链接成功!");
                countDownLatch.countDown();
            }
            else if (EventType.NodeCreated == eventType)
            {
                System.out.println(LOG_MAIN + "事件通知,新增node节点" + path);
            }
            else if (EventType.NodeDataChanged == eventType)
            {
                System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被修改....");
            }
            else if (EventType.NodeDeleted == eventType)
            {
                System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被删除....");
            }

        }
        System.out.println("--------------------------------------------------------");
    }

    public static void main(String[] args) throws KeeperException, InterruptedException
    {
        ZkClientWatcher zkClientWatcher = new ZkClientWatcher();
        zkClientWatcher.createConnection(CONNECT_ADDRES, SESSIONTIME);
//        boolean createResult = zkClientWatcher.createPath("/p15", "pa-644064");
        zkClientWatcher.updateNode("/pa2", "7894561");
    }

}

什么是多线程

多线程为了可以提升应用程序的运行效率,在一个进程中有多条不一样的执行路径,同时并行执行,互不影响。

什么是线程安全

当多个线程同时共享,同一个全局变量或静态变量,作写的操做时,可能会发生数据冲突问题,也就是线程安全问题。可是作读操做是不会发生数据冲突问题。

解决办法

 使用同步代码块或者Lock锁机制,保证在多个线程共享同一个变量只能有一个线程进行操做

什么是Java内存模型

 

共享内存模型指的就是Java内存模型(简称JMM)JMM决定一个线程对共享变量的写入时,能对另外一个线程可见。从抽象的角度来看,JMM定义了线程和主内存之间的抽象关系:线程之间的共享变量存储在主内存(main memory)中,每一个线程都有一个私有的本地内存(local memory),本地内存中存储了该线程以读/写共享变量的副本。本地内存是JMM的一个抽象概念,并不真实存在。它涵盖了缓存,写缓冲区,寄存器以及其余的硬件和编译器优化。

从上图来看,线程A与线程B之间如要通讯的话,必需要经历下面2个步骤:

1. 首先,线程A把本地内存A中更新过的共享变量刷新到主内存中去。

2. 而后,线程B到主内存中去读取线程A以前已更新过的共享变量。

下面经过示意图来讲明这两个步骤:

如上图所示,本地内存A和B有主内存中共享变量x的副本。假设初始时,这三个内存中的x值都为0。线程A在执行时,把更新后的x值(假设值为1)临时存放在本身的本地内存A中。当线程A和线程B须要通讯时,线程A首先会把本身本地内存中修改后的x值刷新到主内存中,此时主内存中的x值变为了1。随后,线程B到主内存中去读取线程A更新后的x值,此时线程B的本地内存的x值也变为了1。

从总体来看,这两个步骤实质上是线程A在向线程B发送消息,并且这个通讯过程必需要通过主内存。JMM经过控制主内存与每一个线程的本地内存之间的交互,来为java程序员提供内存可见性保证。

总结:什么是Java内存模型:java内存模型简称jmm,定义了一个线程对另外一个线程可见。共享变量存放在主内存中,每一个线程都有本身的本地内存,当多个线程同时访问一个数据的时候,可能本地内存没有及时刷新到主内存,因此就会发生线程安全问题。

分布式锁解决办法

传统方式生成订单号ID

业务场景

在分布式状况,生成全局订单号ID

生成订单号方案

  1. 使用时间戳
  2. 使用UUID
  3. 推特 (Twitter) 的 Snowflake 算法——用于生成惟一 ID

生成订单类

package com.hongmoshui.distributed;
import java.text.SimpleDateFormat;
import java.util.Date;

//生成订单类
public class OrderNumGenerator
{
    // 全局订单id
    public static int count = 0;

    public String getNumber()
    {
        try
        {
            Thread.sleep(200);
        }
        catch (Exception e)
        {
        }
        SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
        return simpt.format(new Date()) + "-" + ++count;
    }
}

使用多线程状况模拟生成订单号

package com.hongmoshui.distributed;

//使用多线程模拟生成订单号
public class OrderService implements Runnable
{
    private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();

    public void run()
    {
        getNumber();
    }

    public void getNumber()
    {
        String number = orderNumGenerator.getNumber();
        System.out.println(Thread.currentThread().getName() + ",生成订单ID:" + number);
    }

    public static void main(String[] args)
    {
        System.out.println("####生成惟一订单号###");
        for (int i = 0; i < 100; i++)
        {
            new Thread(new OrderService()).start();
        }

    }
}

多线程生成订单号,线程安全问题解决

使用synchronized或者loca锁

Synchronized同步代码块方式

package com.hongmoshui.distributed;

//使用多线程模拟生成订单号
public class OrderSynchronizedService implements Runnable
{
    private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();

    public void run()
    {
        getNumber();
    }

    public void getNumber()
    {
        synchronized (this)
        {
            String number = orderNumGenerator.getNumber();
            System.out.println(Thread.currentThread().getName() + ",生成订单ID:" + number);
        }
    }

    public static void main(String[] args)
    {
        System.out.println("####生成惟一订单号###");
        OrderService orderService = new OrderService();
        for (int i = 0; i < 100; i++)
        {
            new Thread(orderService).start();
        }

    }
}

Lock锁方式

package com.hongmoshui.distributed;

import java.util.concurrent.locks.ReentrantLock;

public class OrderLockService implements Runnable
{
    private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();

    // 使用lock锁
    private java.util.concurrent.locks.Lock lock = new ReentrantLock();

    public void run()
    {
        getNumber();
    }

    public void getNumber()
    {
        try
        {
            // synchronized (this) {
            lock.lock();
            String number = orderNumGenerator.getNumber();
            System.out.println(Thread.currentThread().getName() + ",生成订单ID:" + number);
            // }

        }
        catch (Exception e)
        {

        }
        finally
        {
            lock.unlock();
        }
    }

    public static void main(String[] args)
    {
        System.out.println("####生成惟一订单号###");
        OrderService orderService = new OrderService();
        for (int i = 0; i < 100; i++)
        {
            new Thread(orderService).start();
        }

    }
}

分布式场景下生成订单ID

业务场景

在分布式状况,生成全局订单号ID

产生问题

在分布式(集群)环境下,每台JVM不能实现同步,在分布式场景下使用时间戳生成订单号可能会重复

分布式状况下,怎么解决订单号生成不重复

  1. 使用分布式锁
  2. 提早生成好,订单号,存放在redis取。获取订单号,直接从redis中取。

使用分布式锁生成订单号技术

1.使用数据库实现分布式锁

缺点:性能差、线程出现异常时,容易出现死锁

2.使用redis实现分布式锁

缺点:锁的失效时间难控制、容易产生死锁、非阻塞式、不可重入

3.使用zookeeper实现分布式锁

实现相对简单、可靠性强、使用临时节点,失效时间容易控制

什么是分布式锁

分布式锁通常用在分布式系统或者多个应用中,用来控制同一任务是否执行或者任务的执行顺序。在项目中,部署了多个tomcat应用,在执行定时任务时就会遇到同一任务可能执行屡次的状况,咱们能够借助分布式锁,保证在同一时间只有一个tomcat应用执行了定时任务

使用Zookeeper实现分布式锁

Zookeeper实现分布式锁原理

使用zookeeper建立临时序列节点来实现分布式锁,适用于顺序执行的程序,大致思路就是建立临时序列节点,找出最小的序列节点,获取分布式锁,程序执行完成以后此序列节点消失,经过watch来监控节点的变化,从剩下的节点的找到最小的序列节点,获取分布式锁,执行相应处理,依次类推……

Maven依赖

        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>

建立Lock接口

package com.hongmoshui.distributed;

public interface Lock
{
    // 获取到锁的资源
    public void getLock();

    // 释放锁
    public void unLock();
}

建立ZookeeperAbstractLock抽象类

package com.hongmoshui.distributed;
import org.I0Itec.zkclient.ZkClient;

//将重复代码写入子类中..
public abstract class ZookeeperAbstractLock implements Lock
{
    // zk链接地址
    private static final String CONNECTSTRING = "127.0.0.1:2181";
    // 建立zk链接
    protected ZkClient zkClient = new ZkClient(CONNECTSTRING);
    protected static final String PATH = "/lock";

    public void getLock()
    {
        if (tryLock())
        {
            System.out.println("##获取lock锁的资源####");
        }
        else
        {
            // 等待
            waitLock();
            // 从新获取锁资源
            getLock();
        }
    }

    // 获取锁资源
    abstract boolean tryLock();
    // 等待
    abstract void waitLock();
    public void unLock()
    {
        if (zkClient != null)
        {
            zkClient.close();
            System.out.println("释放锁资源...");
        }
    }

}

ZookeeperDistrbuteLock类

package com.hongmoshui.distributed;

import java.util.concurrent.CountDownLatch;

import org.I0Itec.zkclient.IZkDataListener;

public class ZookeeperDistrbuteLock extends ZookeeperAbstractLock
{
    private CountDownLatch countDownLatch = null;

    @Override
    boolean tryLock()
    {
        try
        {
            zkClient.createEphemeral(PATH);
            return true;
        }
        catch (Exception e)
        {
//            e.printStackTrace();
            return false;
        }

    }

    @Override
    void waitLock()
    {
        IZkDataListener izkDataListener = new IZkDataListener()
        {

            public void handleDataDeleted(String path) throws Exception
            {
                // 唤醒被等待的线程
                if (countDownLatch != null)
                {
                    countDownLatch.countDown();
                }
            }

            public void handleDataChange(String path, Object data) throws Exception
            {

            }
        };
        // 注册事件
        zkClient.subscribeDataChanges(PATH, izkDataListener);
        if (zkClient.exists(PATH))
        {
            countDownLatch = new CountDownLatch(1);
            try
            {
                countDownLatch.await();
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }
        // 删除监听
        zkClient.unsubscribeDataChanges(PATH, izkDataListener);
    }

}

使用Zookeeper锁运行效果

package com.hongmoshui.distributed;
import com.hongmoshui.OrderNumGenerator;

public class OrderService implements Runnable
{
    private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();

    // 使用lock锁
    // private
    // java.util.concurrent.locks.Lock
    // lock = new ReentrantLock();
    private Lock lock = new ZookeeperDistrbuteLock();

    public void run()
    {
        getNumber();
    }

    public void getNumber()
    {
        try
        {
            lock.getLock();
            String number = orderNumGenerator.getNumber();
            System.out.println(Thread.currentThread().getName() + ",生成订单ID:" + number);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            lock.unLock();
        }
    }

    public static void main(String[] args)
    {
        System.out.println("####生成惟一订单号###");
//        OrderService orderService = new OrderService();
        for (int i = 0; i < 100; i++)
        {
            new Thread(new OrderService()).start();
        }
    }
}

使用Zookeeper实现负载均衡原理

思路

使用Zookeeper实现负载均衡原理,服务器端将启动的服务注册到,zk注册中心上,采用临时节点。客户端从zk节点上获取最新服务节点信息,本地使用负载均衡算法,随机分配服务器。

建立项目工程

Maven依赖

        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.8</version>
        </dependency>

建立Server服务端

ZkServerScoekt服务

ServerHandler:

package com.hongmoshui.LoadBalance;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

//ServerHandler
public class ServerHandler implements Runnable
{
    private Socket socket;

    public ServerHandler(Socket socket)
    {
        this.socket = socket;
    }

    public void run()
    {
        BufferedReader in = null;
        PrintWriter out = null;
        try
        {
            in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            out = new PrintWriter(this.socket.getOutputStream(), true);
            String body = null;
            while (true)
            {
                body = in.readLine();
                if (body == null)
                    break;
                System.out.println("Receive : " + body);
                out.println("Hello, " + body);
            }

        }
        catch (Exception e)
        {
            if (in != null)
            {
                try
                {
                    in.close();
                }
                catch (IOException e1)
                {
                    e1.printStackTrace();
                }
            }
            if (out != null)
            {
                out.close();
            }
            if (this.socket != null)
            {
                try
                {
                    this.socket.close();
                }
                catch (IOException e1)
                {
                    e1.printStackTrace();
                }
                this.socket = null;
            }
        }
    }
}

ZkServerScoekt:

package com.hongmoshui.LoadBalance;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

//##ServerScoekt服务端
public class ZkServerScoekt implements Runnable
{
    private int port = 18080;

    public static void main(String[] args) throws IOException
    {
        int port = 18080;
        ZkServerScoekt server = new ZkServerScoekt(port);
        Thread thread = new Thread(server);
        thread.start();
    }

    public ZkServerScoekt(int port)
    {
        this.port = port;
    }

    public void run()
    {
        ServerSocket serverSocket = null;
        try
        {
            serverSocket = new ServerSocket(port);
            System.out.println("Server start port:" + port);
            Socket socket = null;
            while (true)
            {
                socket = serverSocket.accept();
                new Thread(new ServerHandler(socket)).start();
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            try
            {
                if (serverSocket != null)
                {
                    serverSocket.close();
                }
            }
            catch (Exception e2)
            {

            }
        }
    }

}

ZkServerClient

package com.hongmoshui.LoadBalance;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

public class ZkServerClient
{
    public static List<String> listServer = new ArrayList<String>();

    public static void main(String[] args)
    {
        initServer();
        ZkServerClient client = new ZkServerClient();
        BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
        while (true)
        {
            String name;
            try
            {
                name = console.readLine();
                if ("exit".equals(name))
                {
                    System.exit(0);
                }
                client.send(name);
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
        }
    }

    // 注册全部server
    public static void initServer()
    {
        listServer.clear();
        listServer.add("127.0.0.1:18080");
    }

    // 获取当前server信息
    public static String getServer()
    {
        return listServer.get(0);
    }

    public void send(String name)
    {

        String server = ZkServerClient.getServer();
        String[] cfg = server.split(":");

        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;
        try
        {
            socket = new Socket(cfg[0], Integer.parseInt(cfg[1]));
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(), true);

            out.println(name);
            while (true)
            {
                String resp = in.readLine();
                if (resp == null)
                    break;
                else if (resp.length() > 0)
                {
                    System.out.println("Receive : " + resp);
                    break;
                }
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            if (out != null)
            {
                out.close();
            }
            if (in != null)
            {
                try
                {
                    in.close();
                }
                catch (IOException e)
                {
                    e.printStackTrace();
                }
            }
            if (socket != null)
            {
                try
                {
                    socket.close();
                }
                catch (IOException e)
                {
                    e.printStackTrace();
                }
            }
        }
    }
}

改造ZkServerScoekt

package com.hongmoshui.LoadBalance;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import org.I0Itec.zkclient.ZkClient;

public class ZkServerScoekt2 implements Runnable
{
    private static int port = 18081;

    public static void main(String[] args) throws IOException
    {
        ZkServerScoekt server = new ZkServerScoekt(port);
        Thread thread = new Thread(server);
        thread.start();
    }

    public ZkServerScoekt2(int port)
    {
        this.port = port;
    }

    public void regServer()
    {
        // 向ZooKeeper注册当前服务器
        ZkClient client = new ZkClient("127.0.0.1:2181", 60000, 1000);
        String path = "/test/server" + port;
        if (client.exists(path))
            client.delete(path);
        client.createEphemeral(path, "127.0.0.1:" + port);
    }

    public void run()
    {
        ServerSocket serverSocket = null;
        try
        {
            serverSocket = new ServerSocket(port);
            regServer();
            System.out.println("Server start port:" + port);
            Socket socket = null;
            while (true)
            {
                socket = serverSocket.accept();
                new Thread(new ServerHandler(socket)).start();
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            try
            {
                if (serverSocket != null)
                {
                    serverSocket.close();
                }
            }
            catch (Exception e2)
            {

            }
        }
    }

}

改造ZkServerClient

package com.hongmoshui.LoadBalance;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;

public class ZkServerClient2
{
    public static List<String> listServer = new ArrayList<String>();

    public static void main(String[] args)
    {
        initServer();
        ZkServerClient client = new ZkServerClient();
        BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
        while (true)
        {
            String name;
            try
            {
                name = console.readLine();
                if ("exit".equals(name))
                {
                    System.exit(0);
                }
                client.send(name);
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
        }
    }

    // 注册全部server
    public static void initServer()
    {
        final String path = "/test";
        final ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000);
        List<String> children = zkClient.getChildren(path);
        listServer.clear();
        for (String p : children)
        {
            listServer.add((String) zkClient.readData(path + "/" + p));
        }
        // 订阅节点变化事件
        zkClient.subscribeChildChanges("/test", new IZkChildListener()
        {

            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception
            {
                listServer.clear();
                for (String p : currentChilds)
                {
                    listServer.add((String) zkClient.readData(path + "/" + p));
                }
                System.out.println("####handleChildChange()####listServer:" + listServer.toString());
            }
        });
    }

    // 请求次数
    private static int count = 1;

    // 服务数量
    private static int serverCount = 2;

    // 获取当前server信息
    public static String getServer()
    {
        String serverName = listServer.get(count % serverCount);
        ++count;
        return serverName;
    }

    public void send(String name)
    {

        String server = ZkServerClient.getServer();
        String[] cfg = server.split(":");

        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;
        try
        {
            socket = new Socket(cfg[0], Integer.parseInt(cfg[1]));
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(), true);

            out.println(name);
            while (true)
            {
                String resp = in.readLine();
                if (resp == null)
                    break;
                else if (resp.length() > 0)
                {
                    System.out.println("Receive : " + resp);
                    break;
                }
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            if (out != null)
            {
                out.close();
            }
            if (in != null)
            {
                try
                {
                    in.close();
                }
                catch (IOException e)
                {
                    e.printStackTrace();
                }
            }
            if (socket != null)
            {
                try
                {
                    socket.close();
                }
                catch (IOException e)
                {
                    e.printStackTrace();
                }
            }
        }
    }
}

使用Zookeeper实现选举策略

场景

  有一个向外提供的服务,服务必须7*24小时提供服务,不能有单点故障。因此采用集群的方式,采用master、slave的结构。一台主机多台备机。主机向外提供服务,备机负责监听主机的状态,一旦主机宕机,备机要迅速接代主机继续向外提供服务。从备机选择一台做为主机,就是master选举。

原理分析

 右边三台主机会尝试建立master节点,谁建立成功了,就是master,向外提供。其余两台就是slave。

全部slave必须关注master的删除事件(临时节点,若是服务器宕机了,Zookeeper会自动把master节点删除)。若是master宕机了,会进行新一轮的master选举。本次咱们主要关注master选举,服务注册、发现先不讨论。

使用Zookeeper原理

» 领导者(leader),负责进行投票的发起和决议,更新系统状态
  » 学习者(learner),包括跟随者(follower)和观察者(observer),follower用于接受客户端请求并想客户端返回结果,在选主过程当中参与投票
  » Observer能够接受客户端链接,将写请求转发给leader,但observer不参加投票过程,只同步leader的状态,observer的目的是为了扩展系统,提升读取速度
  » 客户端(client),请求发起方

  • Zookeeper的核心是原子广播,这个机制保证了各个Server之间的同步。实现这个机制的协议叫作Zab协
     议。Zab协议有两种模式,它们分别是恢复模式(选主)和广播模式(同步)。当服务启动或者在领导者
   崩溃后,Zab就进入了恢复模式,当领导者被选举出来,且大多数Server完成了和leader的状态同步之后
    ,恢复模式就结束了。状态同步保证了leader和Server具备相同的系统状态。

  • 为了保证事务的顺序一致性,zookeeper采用了递增的事务id号(zxid)来标识事务。全部的提议(    proposal)都在被提出的时候加上了zxid。实现中zxid是一个64位的数字,它高32位是epoch用来标识      leader关系是否改变,每次一个leader被选出来,它都会有一个新的epoch,标识当前属于那个leader的    统治时期。低32位用于递增计数。   • 每一个Server在工做过程当中有三种状态:     LOOKING:当前Server不知道leader是谁,正在搜寻     LEADING:当前Server即为选举出来的leader     FOLLOWING:leader已经选举出来,当前Server与之同步

相关文章
相关标签/搜索