java架构之路-(分布式zookeeper)zookeeper真实使用场景

  上几回博客,我说了一下Zookeeper的简单使用和API的使用,咱们接下来看一下他的真实场景。node

1、分布式集群管理✨web

  咱们如今有这样一个需求,请先抛开Zookeeper是集群仍是单机的概念,下面提到的都是以Zookeeper集群来讲的。redis

    1. 主动查看线上服务节点小程序

    2. 查看服务节点资源使用状况缓存

    3. 服务离线通知服务器

    4. 服务资源(CPU、内存、硬盘)超出阀值通知网络

   

咱们先来看一下代码实现流程吧。主要分为两个部分的,一个部分是写入Zookeeper集群,另外一部分是获取Zookeeper集群内部的数据。多线程

写入Zookeeper集群部分: 并发

  写入的信息包括该服务器的内存使用状况,CPU使用状况等信息。app

public void init() {
    zkClient = new ZkClient(server, 5000, 10000);
    System.out.println("zk链接成功" + server);
    // 建立根节点
    buildRoot();
    // 建立临时节点
    createServerNode();
    // 启动更新的线程
    stateThread = new Thread(() -> {
        while (true) {
            updateServerNode();//数据写到 当前的临时节点中去
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "zk_stateThread");
    stateThread.setDaemon(true);
    stateThread.start();
}
//建立根节点,若是根节点已经存在,则再也不重复建立
public void buildRoot() {
    if (!zkClient.exists(rootPath)) {
        zkClient.createPersistent(rootPath);
    }
}
// 生成服务节点
public void createServerNode() {
    nodePath = zkClient.createEphemeralSequential(servicePath, getOsInfo());
    System.out.println("建立节点:" + nodePath);
}

每个服务都有本身的惟一的临时序号节点。

// 获取服务节点状态
public String getOsInfo() {
    OsBean bean = new OsBean();
    bean.lastUpdateTime = System.currentTimeMillis();
    bean.ip = getLocalIp();
    bean.cpu = CPUMonitorCalc.getInstance().getProcessCpu();
    MemoryUsage memoryUsag = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
    bean.usedMemorySize = memoryUsag.getUsed() / 1024 / 1024;
    bean.usableMemorySize = memoryUsag.getMax() / 1024 / 1024;
    bean.pid = ManagementFactory.getRuntimeMXBean().getName();
    ObjectMapper mapper = new ObjectMapper();
    try {
        return mapper.writeValueAsString(bean);
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
    }
}
// 数据写到 当前的临时节点中去
public void updateServerNode() {
    zkClient.writeData(nodePath, getOsInfo());
}

当每一个服务开启的时候,咱们就应该向咱们的Zookeeper发送咱们的信息,也就是优先在根节点下建立一个临时序号节点,而且写入服务器的相关信息。就这样咱们的Zookeeper集群中就有了咱们的服务器相关的信息。

读取Zookeeper集群信息部分:

咱们对于咱们的节点递归监听就能够了。监听过程能够写入咱们的阈值限制,从而达到报警的目的。

// 初始化订阅事件
public void initSubscribeListener() {
    zkClient.unsubscribeAll();
    // 获取全部子节点
    zkClient.getChildren(rootPath)
            .stream()
            .map(p -> rootPath + "/" + p)// 得出子节点完整路径
            .forEach(p -> {
        zkClient.subscribeDataChanges(p, new DataChanges());// 数据变动的监听
    });
    //  监听子节点,的变动 增长,删除
    zkClient.subscribeChildChanges(rootPath, (parentPath, currentChilds) -> initSubscribeListener());
}

咱们也能够将咱们获取到的信息写入到咱们的web页面中去。做为咱们Zookeeper集群对于服务器健康信息管理的小程序。(我服务器到期了,要不就给大家一套完整的代码演示了,过几天补全)

总结:就是每一个服务器往咱们的Zookeeper写入数据,在写入以前建立根节点,而后建立咱们的临时序号节点再来写入咱们的数据,也是利用了临时序号节点的特性,不会重复,并且断开链接会清理掉。也能够将server服务器和咱们的Zookeeper部署在同一个服务器也是不会影响的。(自行考虑内存,CPU,网络等问题)

2、分布式注册中心

   不少分布式项目,并非使用Spring Clould的Eureka的,自我以为Eureka和Zookeeper势均力敌吧。咱们来看一下咱们的需求。

现有一个积分系统,因为使用人数巨大,咱们须要同时部署四台服务器才能承载住咱们的并发压力。那么咱们的请求来了,由谁来控制请求哪台服务器呢?这时就有了咱们的Zookeeper注册中心(需结合dubbo)。

我来大体用图解的形式说一下原理,一会再说细节。

分布式注册中心原理:

  说到分布式注册中心,咱们须要知道几个名词。

  注册中心:注册中心是指咱们的Zookeeper集群,主要是用来存储咱们的接口信息和监听咱们服务提供者是否正常运行的。而且还保存了咱们服务消费者的相关信息。

  服务提供者:谁提供了这些接口,谁就是提供者。

  服务消费者:谁想调用这些接口,谁就是消费者。

工做流程:

  1.服务启动,也就是咱们的接口启动了,优先去咱们的注册中心去注册咱们的接口信息,也就是用临时序号节点来存咱们的接口信息。

  2.之后咱们的服务提供者会持续的发送消息到注册中心去,持续的告诉咱们的注册中心,咱们仍是可用的,仍是活着的。

  3.服务消费者来调用咱们的接口了,第一次,须要到咱们的注册中心去找一个合适接口。(具体如何分配,并非由Zookeeper来控制的),并将咱们的注册中心的提供服务IP列表缓存到本身的服务器上。

  4.只要服务提供方节点没有变更,咱们的消费者之后的调用,只许读取本身本地的缓存便可,不在须要去注册中心读取咱们的服务提供者IP列表。

 

 

 这里有一个最直观的好处就是,原来咱们写接口须要指定去哪一个IP调用,若是接口服务器IP变了,咱们还须要调整咱们的程序,这里咱们只须要调用Zookeeper便可,再也不须要调整程序了。

 

 

 注意:保存消费者,我暂时理解的是为了方便直观的看到当前都有哪些在调用咱们的接口。

3、分布式JOB

  分布式JOB,我第一次遇到这个名字的时候是懵的,我还记得我当时作项目要弄一个自动发送邮件的这样一个需求,可是咱们是横向部署的,三台服务器都有这段代码,每到半夜11.30的时候都会发三份彻底一致的邮件,有人会提出,咱们只写一个自动任务,一台服务器部署不就能够了吗?请你弄死他,咱们要的高可用,你一台服务器怎么保证高可用,这样的程序是明显不合理的。说到这咱们就有了咱们的分布式Job,分布式Job就是要解决这样相似的问题的。

  仍是先看一下实现原理和思路。

这样咱们就能保证只有master服务器能执行咱们的自动任务,若是master宕机了,咱们会有候补队员保证咱们的高可用。 

4、分布式锁

  咱们单机的程序,来使用synchronized关键字是能够实现多线程争抢的问题,分布式锁不少是redis集群来实现的,咱们来使用Zookeeper也是能够的实现的。

  程序内的锁通常分为共享读锁和排它写锁,也就是咱们加了共享读锁的时候,其它线程能够来读,可是不能改,而咱们的排它写锁,其它线程是不能进行任何操做的。

咱们能够这样来设计。

 

 

 来一个线程就往咱们的lock节点内添加一个临时序号节点,值设置为readLock或者是writeLock,标记咱们得到是什么类型的锁,当咱们再来线城时,优先监听咱们的Lock节点的数据,来判断咱们是否能够获得锁的资源,感受还不错,能够实现。但这样的实现并非很合理的,咱们图中画了三个等待的线程还好,若是等待的线程是100个1000个的话,lock节点数据变化了,也就是上一个锁释放掉了,咱们那1000个线程会疯抢咱们的锁(羊群效应),能够想象1000个大妈在超市抢鸡蛋的样子,可怕....

  咱们换一个实现的思路再来试试。

 

  咱们此次改成只监听比其小的节点数据便可,以图为例来讲,咱们的Tread3想得到写锁,一定等待Tread1和Tread2的读锁所有释放,咱们才能够给Tread3添加写锁,咱们持续监听Tread2线程,当Tread2线程锁释放掉,咱们的Tread3会继续监听到Tread1的使用状况,直到没有比他小的在使用锁资源,咱们才得到咱们的写锁资源。

  感受这个和咱们的分布式JOB差很少,最小的序号得到锁。只不过有一个共享读锁和排它写锁的区别而已。

  等我服务器续费的,上代码,下次博客继续来讲说Zookeeper的源码

 

 

 

最进弄了一个公众号,小菜技术,欢迎你们的加入

相关文章
相关标签/搜索