ZooKeeper Java Api 使用样例

package com.pa.zookeeper.test1;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

 
/** 
 * ZooKeeper Java Api 使用样例<br> 
 * ZK Api Version: 3.4.3  
 */ 
public class ZookeeperTest2 implements Watcher { 
 
    private static final int SESSION_TIMEOUT = 10000; 
    private static final String CONNECTION_STRING = "192.168.56.103:2181"; 
//    private static final String CONNECTION_STRING = "192.168.56.103:3000,192.168.56.103:3001,192.168.56.103:3002"; 
    private static final String ZK_PATH = "/nileader"; 
    private ZooKeeper zk = null; 
    private static int i = 0;
     
    private CountDownLatch connectedSemaphore = new CountDownLatch( 1 ); 
 
    static void p(String msg){
    	System.out.println(++i + ",msg=" + msg);
    }
    static ZookeeperTest2 sample = new ZookeeperTest2(); 
    public static void main( String[] args ) { 
        p("main-1");
        sample.createConnection( CONNECTION_STRING, SESSION_TIMEOUT ); 
        if ( sample.createPath( ZK_PATH, "我是节点初始内容" ) ) {
//            System.out.println(); 
            System.out.println( "数据内容: " + sample.readData( ZK_PATH ) + "\n" ); 
            sample.writeData( ZK_PATH, "我是更新后的数据" ); 
            System.out.println( "更新后的数据内容: " + sample.readData( ZK_PATH ) + "\n" ); 
            sample.deleteNode( ZK_PATH ); 
        } 
        sample.releaseConnection(); 
    } 
    
    /** 
     * 建立ZK链接 
     * @param connectString  ZK服务器地址列表 
     * @param sessionTimeout   Session超时时间 
     */ 
    public void createConnection( String connectString, int sessionTimeout ) { 
        this.releaseConnection(); 
        try { 
        	p("开始建立链接");
            zk = new ZooKeeper( connectString, sessionTimeout, this ); 
            connectedSemaphore.await(); 
            p("链接建立完毕");
        } catch ( InterruptedException e ) { 
            System.out.println( "链接建立失败,发生 InterruptedException" ); 
            e.printStackTrace(); 
        } catch ( IOException e ) { 
            System.out.println( "链接建立失败,发生 IOException" ); 
            e.printStackTrace(); 
        } 
    } 
 
    /** 
     * 关闭ZK链接 
     */ 
    public void releaseConnection() { 
        if ( this.zk != null ) { 
            try { 
            	p("关闭链接");
                this.zk.close(); 
            } catch ( InterruptedException e ) { 
                // ignore 
                e.printStackTrace(); 
            } 
        } 
    } 
    
    /** 
     * 收到来自Server的Watcher通知后的处理。 
     */ 
    @Override 
    public void process( WatchedEvent event ) {
       p("process收到事件通知:[state=" + event.getState() + ",type=" + event.getType() + ",path=" + event.getPath() + "]"); 
        if ( KeeperState.SyncConnected == event.getState() ) { 
            connectedSemaphore.countDown(); 
        } 
        if ( KeeperState.SyncConnected == event.getState() && !event.getPath().equals("") ) { 
        	System.out.println( "process------中获取数据内容: "); 
        	System.out.println( "process------" + sample.readData( ZK_PATH ) + "\n" ); 
        } 
    } 
 
    /** 
     *  建立节点 
     * @param path 节点path 
     * @param data 初始数据内容 
     * @return 
     */ 
    public boolean createPath( String path, String data ) { 
        try { 
            p( "开始建立节点, Path: " 
                    + this.zk.create( path, // 
                                              data.getBytes(), // 
                                              Ids.OPEN_ACL_UNSAFE, // 
                                              CreateMode.EPHEMERAL ) 
                    + ", content: " + data );
            p("节点建立完毕");
        } catch ( KeeperException e ) { 
            System.out.println( "节点建立失败,发生KeeperException" ); 
            e.printStackTrace(); 
        } catch ( InterruptedException e ) { 
            System.out.println( "节点建立失败,发生 InterruptedException" ); 
            e.printStackTrace(); 
        } 
        return true; 
    } 
 
    /** 
     * 读取指定节点数据内容 
     * @param path 节点path 
     * @return 
     */ 
    public String readData( String path ) { 
        try { 
            p("-------------开始获取数据,path:" + path ); 
            String data = new String( this.zk.getData( path, true, null ) ); 
            p("-------------获取数据完毕:" + data); 
            return data;
        } catch ( KeeperException e ) { 
            System.out.println( "读取数据失败,发生KeeperException,path: " + path  ); 
            e.printStackTrace(); 
            return ""; 
        } catch ( InterruptedException e ) { 
            System.out.println( "读取数据失败,发生 InterruptedException,path: " + path  ); 
            e.printStackTrace(); 
            return ""; 
        } 
    } 
 
    /** 
     * 更新指定节点数据内容 
     * @param path 节点path 
     * @param data  数据内容 
     * @return 
     */ 
    public boolean writeData( String path, String data ) { 
        try { 
            p( "++++++++++开始更新数据");
    		p("path:" + path + ", stat: " + 
                                                this.zk.setData( path, data.getBytes(), -1 ) );
            p("+++++++++++数据更新完毕");
        } catch ( KeeperException e ) { 
            System.out.println( "更新数据失败,发生KeeperException,path: " + path  ); 
            e.printStackTrace(); 
        } catch ( InterruptedException e ) { 
            System.out.println( "更新数据失败,发生 InterruptedException,path: " + path  ); 
            e.printStackTrace(); 
        } 
        return false; 
    } 
 
    /** 
     * 删除指定节点 
     * @param path 节点path 
     */ 
    public void deleteNode( String path ) { 
        try { 
        	p("开始删除节点");
            this.zk.delete( path, -1 ); 
            System.out.println( "删除节点成功,path:" + path ); 
        } catch ( KeeperException e ) { 
            System.out.println( "删除节点失败,发生KeeperException,path: " + path  ); 
            e.printStackTrace(); 
        } catch ( InterruptedException e ) { 
            System.out.println( "删除节点失败,发生 InterruptedException,path: " + path  ); 
            e.printStackTrace(); 
        } 
    } 
 
}

执行以后的日志以下:java

1,msg=main-1
2,msg=开始建立链接
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
3,msg=process收到事件通知:[state=SyncConnected,type=None,path=null]
4,msg=链接建立完毕
5,msg=开始建立节点, Path: /nileader, content: 我是节点初始内容
6,msg=节点建立完毕
7,msg=-------------开始获取数据,path:/nileader
8,msg=-------------获取数据完毕:我是节点初始内容
数据内容: 我是节点初始内容apache

9,msg=++++++++++开始更新数据
10,msg=process收到事件通知:[state=SyncConnected,type=NodeDataChanged,path=/nileader]
process------中获取数据内容: 
11,msg=-------------开始获取数据,path:/nileader
12,msg=path:/nileader, stat: 520,521,1484807555448,1484807555462,1,0,0,97307556030054430,24,0,520服务器

13,msg=+++++++++++数据更新完毕
14,msg=-------------开始获取数据,path:/nileader
15,msg=-------------获取数据完毕:我是更新后的数据
process------我是更新后的数据session

16,msg=-------------获取数据完毕:我是更新后的数据
更新后的数据内容: 我是更新后的数据app

17,msg=开始删除节点
18,msg=process收到事件通知:[state=SyncConnected,type=NodeDeleted,path=/nileader]
process------中获取数据内容: 
19,msg=-------------开始获取数据,path:/nileader
删除节点成功,path:/nileader
20,msg=关闭链接
 ide

经过屡次运行,上述代码出现过NoNodeException,应该是执行代码的顺序没有保证,偶尔出现执行的前后顺序不一样致使异常,后面再细看。。。this

例子是从其余地方修改的,出处已忘记了。。。日志

相关文章
相关标签/搜索