ZooKeeper 使用 Java API

zkCli 工具适用于调试,不推荐使用 zkCli 工具来搭建系统。java

实际开发时通常也不直接使用 ZooKeeper 的 Java API,而是使用更高层次的封装库 Curator,不过对 Java API 的学习仍然有不少益处。算法

本篇文章介绍经过 ZooKeeper 的 Java API 来实现建立会话、实现监视点等功能,演示主从模式。apache

添加依赖

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.9</version>
</dependency>
复制代码

创建会话

启动 ZooKeeper 服务端,经过 Java API 创建会话。编程

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
复制代码

其中 connectString 包含主机名及端口号,sessionTimeout 为会话超时时间,watcher 对象用于接收会话事件。bootstrap

Watcher 为一个接口,实现 Watcher 须要重写 void process(WatchedEvent event) 方法。网络

当遇到网络故障时,若是链接断开,ZooKeeper 客户端会自动从新链接。session

获取管理权

下面经过 ZooKeeper Java API 来实现简单的群首选举算法,确保同一时间只有一个主节点进程处于活动状态。dom

package com.ulyssesss.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Random;

public class Master implements Watcher {
    
    private ZooKeeper zk;
    private String serviceId = Integer.toString(new Random().nextInt());
    private boolean isLeader = false;

    private void startZk() throws IOException {
        zk = new ZooKeeper("localhost:2181", 5000, this);
    }

    private void stopZk() throws InterruptedException {
        zk.close();
    }

    public void process(WatchedEvent watchedEvent) {
        System.out.println("event: " + watchedEvent);
    }

    public static void main(String[] args) throws Exception {
		Master master = new Master();
		master.startZk();

		master.runForMaster();

        System.out.println("serviceId: " + master.serviceId);

		if (master.isLeader) {
            System.out.println("master");
            Thread.sleep(10000);
        } else {
            System.out.println("not master");
        }

		master.stopZk();
    }

    private boolean checkMaster() throws InterruptedException {
        while (true) {
            try {
                Stat stat = new Stat();
                byte data[] = zk.getData("/master", false, stat);
                isLeader = new String(data).equals(serviceId);
                return true;
            } catch (KeeperException.NoNodeException e) {
                return false;
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    }

    private void runForMaster() throws InterruptedException {
        while (true) {
            try {
                zk.create("/master", serviceId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                isLeader = true;
                break;
            } catch (KeeperException.NodeExistsException e) {
                isLeader = false;
                break;
            } catch (KeeperException e) {
                e.printStackTrace();
            }
            if (checkMaster()) {
                break;
            }
        }
    }
}
复制代码

主函数执行建立一个演示实例,实例会分配一个随机整数做为 id,创建 ZooKeeper 链接后尝试建立主节点 master。异步

若是 master 主节点建立成功,则该实例为群首 leader;若是节点已经存在则其余实例为 leader;发生断开链接等异常时,响应信息丢失,没法肯定当前进程是否为主节点,须要经过 checkMaster() 方法从新检查主节点状态。ide

屡次执行主函数,其中第一次执行时会打印 master,在 master 断开链接前的 10 秒钟内,再次执行会打印 not master,当第一次执行的 master 断开链接后,再次执行主函数,打印 master。

异步建立须要的目录

在 ZooKeeper 中全部的同步操做都有对应的异步操做,异步调用不会阻塞应用程序,还能简化应用的实现方式。

主从模型的设计须要用到 /tasks、/assign 和 /workers 3 个目录,能够经过某些系统配置来建立这些目录。下面的代码示例会经过异步的方式建立出所须要的目录。

private void bootstrap() {
    createParent("/workers", new byte[0]);
    createParent("/assign", new byte[0]);
    createParent("/tasks", new byte[0]);
}

private void createParent(String path, byte[] data) {
    zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createParentCallback, data);
}

AsyncCallback.StringCallback createParentCallback = new AsyncCallback.StringCallback() {
    public void processResult(int rc, String path, Object ctx, String name) {
        switch (KeeperException.Code.get(rc)) {
            case OK:
                System.out.println("parent " + path + " created");
                break;
            case NODEEXISTS:
                System.out.println("parent " + path + " already registered");
                break;
            case CONNECTIONLOSS:
                createParent(path, (byte[]) ctx);
                break;
            default:
                System.out.println("create " + path + " error");
        }
    }
};
复制代码

注册从节点

前面的部分已经有了主节点,为了使主节点能够发号施令,如今要配置从节点,在 /workers 下建立临时节点。

package com.ulyssesss.zookeeper;

import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.Random;

public class Worker implements Watcher {

    private ZooKeeper zk;
    private String serviceId = Integer.toString(new Random().nextInt());

    private void startZk() throws IOException {
        zk = new ZooKeeper("localhost:2181", 5000, this);
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("event: " + event);
    }

    private void register() {
        zk.create("/workers/worker-" + serviceId, "Idle".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE
                , CreateMode.EPHEMERAL, createWorkerCallback, null);
    }

    private AsyncCallback.StringCallback createWorkerCallback = new AsyncCallback.StringCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            switch (KeeperException.Code.get(rc)) {
                case OK:
                    System.out.println("registered successfully: " + serviceId);
                    break;
                case NODEEXISTS:
                    System.out.println("already registered: " + serviceId);
                    break;
                case CONNECTIONLOSS:
                    register();
                    break;
                default:
                    System.out.println("error");
            }
        }
    };

    private String status;

    public void setStatus(String status) {
        this.status = status;
        updateStatus(status);
    }

    synchronized private void updateStatus(String status) {
        if (status.equals(this.status)) {
            zk.setData("/workers/worker-" + serviceId, status.getBytes(), -1, statusUpdateCallback, status);
        }
    }

    AsyncCallback.StatCallback statusUpdateCallback = new AsyncCallback.StatCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, Stat stat) {
            switch (KeeperException.Code.get(rc)) {
                case CONNECTIONLOSS:
                    updateStatus((String) ctx);
                    break;
                default:
            }
        }
    };
    
    public static void main(String[] args) throws Exception {
		Worker worker = new Worker();
		worker.startZk();
		worker.register();
		Thread.sleep(30000);
    }
}
复制代码

主函数建立 worker 实例,开启会话,执行注册逻辑,建立节点时如发生链接丢失则再次执行注册逻辑,注册所建立的节点为临时节点。

从节点开始处理某些任务时,须要经过 setStatus() 方法更新节点状态。

任务队列

系统中 client 组件用于添加任务,以便从节点执行任务。如下为 client 代码:

package com.ulyssesss.zookeeper;

import org.apache.zookeeper.*;
import java.io.IOException;

public class Client implements Watcher {

    private ZooKeeper zk;

    private void startZk() throws IOException {
        zk = new ZooKeeper("localhost:2181", 5000, this);
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("event: " + event);
    }

    private String queueCommand(String command) {
        while (true) {
            try {
                String name = zk.create("/tasks/task-", command.getBytes()
                        , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
                return name;
            } catch (Exception e) {
                System.out.println("error");
            }
        }
    }


    public static void main(String[] args) throws Exception {
		Client client = new Client();
		client.startZk();
		String name = client.queueCommand("command-1");
        System.out.println("created " + name);
    }
}
复制代码

Client 使用有序节点 task- 标示任务,task- 后面会跟随一个递增整数,在执行 create 时如发生链接丢失,则重试 create 操做,适用于【至少执行一次】策略的应用。如要采用【至多执行一次】策略,能够将任务的惟一标识添加到节点名中。

管理客户端

管理客户端 AdminClient 用于展现系统运行状态,代码以下:

package com.ulyssesss.zookeeper;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Date;

public class AdminClient implements Watcher {

    private ZooKeeper zk;

    private void startZk() throws IOException {
        zk = new ZooKeeper("localhost:2181", 5000, this);
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("event: " + event);
    }

    private void listState() throws KeeperException, InterruptedException {
        try {
            Stat stat = new Stat();
            byte[] masterData = zk.getData("/master", false, stat);
            Date startDate = new Date(stat.getCtime());
            System.out.println("master: " + new String(masterData) + " since " + startDate);
        } catch (KeeperException.NoNodeException e) {
            System.out.println("no master");
        }

        System.out.println("workers: \n");
        for (String worker : zk.getChildren("/workers", false)) {
            byte[] data = zk.getData("/workers/" + worker, false, null);
            String state = new String(data);
            System.out.println("worker: " + state);
        }

        // ...
    }

    public static void main(String[] args) throws Exception {
		AdminClient adminClient = new AdminClient();
		adminClient.startZk();
		adminClient.listState();
    }
}
复制代码

以上代码会简单的列出各个节点的信息。

经过 Java API 编程与 zkCli 命令很是接近,不一样的是 zkCli 经常使用于调试,通常会在一个相对稳定的环境下使用。经过 Java API 编写的程序,须要考虑到异常状况,尤为是 ConnectionLossException 异常,须要检查状态并合理恢复。

原文地址

相关文章
相关标签/搜索