Springboot2(29)集成zookeeper的增删改查、节点监听、分布式读写锁、分布式计数器

源码地址

springboot2教程系列

实现zookeeper节点的增删改查、节点监听、分布式读写锁、分布式计数器

添加依赖

<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<java.version>1.8</java.version>
		<zookeeper.version>3.4.8</zookeeper.version>
		<curator.version>2.11.1</curator.version>
	</properties>
	
	<dependencies>

		<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>${zookeeper.version}</version>
			<exclusions>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-log4j12</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-api</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-recipes</artifactId>
			<version>${curator.version}</version>
		</dependency>

    </dependencies>

ZkClient(curator)

这里使用的是curator,curator是对zookeeper的简单封装,提供了一些集成的方法,或者是提供了更优雅的api

/** * zookeeper客户端 */
@Data
@Slf4j
public class ZkClient {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private CuratorFramework client;
    public TreeCache cache;
    private ZookeeperProperties zookeeperProperties;

    public ZkClient(ZookeeperProperties zookeeperProperties){
        this.zookeeperProperties = zookeeperProperties;
    }

    /** * 初始化zookeeper客户端 */
    public void init() {
    	try{
    		RetryPolicy retryPolicy = new ExponentialBackoffRetry(zookeeperProperties.getBaseSleepTimeMs(),
                    zookeeperProperties.getMaxRetries());
            Builder builder   = CuratorFrameworkFactory.builder()
                    .connectString(zookeeperProperties.getServer()).retryPolicy(retryPolicy)
                    .sessionTimeoutMs( zookeeperProperties.getSessionTimeoutMs())
                    .connectionTimeoutMs( zookeeperProperties.getConnectionTimeoutMs())
                    .namespace( zookeeperProperties.getNamespace());
            if(StringUtils.isNotEmpty( zookeeperProperties.getDigest())){
            	builder.authorization("digest", zookeeperProperties.getDigest().getBytes("UTF-8"));
                builder.aclProvider(new ACLProvider() {
                    @Override
                    public List<ACL> getDefaultAcl() {
                        return ZooDefs.Ids.CREATOR_ALL_ACL;
                    }

                    @Override
                    public List<ACL> getAclForPath(final String path) {
                        return ZooDefs.Ids.CREATOR_ALL_ACL;
                    }
                });
            }
            client = builder.build();
            client.start();

            initLocalCache("/test");
         // addConnectionStateListener();


	        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
				public void stateChanged(CuratorFramework client, ConnectionState state) {
					if (state == ConnectionState.LOST) {
						//连接丢失
						logger.info("lost session with zookeeper");
					} else if (state == ConnectionState.CONNECTED) {
						//连接新建
						logger.info("connected with zookeeper");
					} else if (state == ConnectionState.RECONNECTED) {
						logger.info("reconnected with zookeeper");
					}
				}
	        });
    	}catch(Exception e){
    		e.printStackTrace();
    	}
    }

    /** * 初始化本地缓存 * @param watchRootPath * @throws Exception */
    private void initLocalCache(String watchRootPath) throws Exception {
        cache = new TreeCache(client, watchRootPath);
        TreeCacheListener listener = (client1, event) ->{
            log.info("event:" + event.getType() +
                    " |path:" + (null != event.getData() ? event.getData().getPath() : null));

            if(event.getData()!=null && event.getData().getData()!=null){
                log.info("发生变化的节点内容为:" + new String(event.getData().getData()));
            }

           // client1.getData().
        };
        cache.getListenable().addListener(listener);
        cache.start();
    }


    public void stop() {
        client.close();
    }

    public CuratorFramework getClient() {
        return client;
    }


    /** * 创建节点 * @param mode 节点类型 * 1、PERSISTENT 持久化目录节点,存储的数据不会丢失。 * 2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失 * 3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除 *4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。 * @param path 节点名称 * @param nodeData 节点数据 */
    public void createNode(CreateMode mode, String path , String nodeData) {
        try {
            //使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点
            client.create().creatingParentsIfNeeded().withMode(mode).forPath(path,nodeData.getBytes("UTF-8"));
        } catch (Exception e) {
            logger.error("注册出错", e);
        }
    }

    /** * 创建节点 * @param mode 节点类型 * 1、PERSISTENT 持久化目录节点,存储的数据不会丢失。 * 2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失 * 3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除 * 4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。 * @param path 节点名称 */
    public void createNode(CreateMode mode,String path ) {
        try {
            //使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点
            client.create().creatingParentsIfNeeded().withMode(mode).forPath(path);
        } catch (Exception e) {
            logger.error("注册出错", e);
        }
    }

    /** * 删除节点数据 * * @param path */
    public void deleteNode(final String path) {
        try {
            deleteNode(path,true);
        } catch (Exception ex) {
            log.error("{}",ex);
        }
    }


    /** * 删除节点数据 * @param path * @param deleteChildre 是否删除子节点 */
    public void deleteNode(final String path,Boolean deleteChildre){
        try {
            if(deleteChildre){
                //guaranteed()删除一个节点,强制保证删除,
                // 只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功
                client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
            }else{
                client.delete().guaranteed().forPath(path);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /** * 设置指定节点的数据 * @param path * @param datas */
    public void setNodeData(String path, byte[] datas){
        try {
            client.setData().forPath(path, datas);
        }catch (Exception ex) {
            log.error("{}",ex);
        }
    }

    /** * 获取指定节点的数据 * @param path * @return */
    public byte[] getNodeData(String path){
        Byte[] bytes = null;
        try {
            if(cache != null){
                ChildData data = cache.getCurrentData(path);
                if(data != null){
                    return data.getData();
                }
            }
            client.getData().forPath(path);
            return client.getData().forPath(path);
        }catch (Exception ex) {
            log.error("{}",ex);
        }
        return null;
    }

    /** * 获取数据时先同步 * @param path * @return */
    public byte[] synNodeData(String path){
        client.sync();
        return getNodeData( path);
    }

    /** * 判断路径是否存在 * * @param path * @return */
    public boolean isExistNode(final String path) {
        client.sync();
        try {
            return null != client.checkExists().forPath(path);
        } catch (Exception ex) {
            return false;
        }
    }


    /** * 获取节点的子节点 * @param path * @return */
    public List<String> getChildren(String path) {
        List<String> childrenList = new ArrayList<>();
        try {
            childrenList = client.getChildren().forPath(path);
        } catch (Exception e) {
            logger.error("获取子节点出错", e);
        }
        return childrenList;
    }

    /** * 随机读取一个path子路径, "/"为根节点对应该namespace * 先从cache中读取,如果没有,再从zookeeper中查询 * @param path * @return * @throws Exception */
	public String getRandomData(String path)  {
		try{
			Map<String,ChildData> cacheMap = cache.getCurrentChildren(path);
			if(cacheMap != null && cacheMap.size() > 0) {
				logger.debug("get random value from cache,path="+path);
				Collection<ChildData> values = cacheMap.values();
				List<ChildData> list = new ArrayList<>(values);
				Random rand = new Random();
				byte[] b = list.get(rand.nextInt(list.size())).getData();
				return new String(b,"utf-8");
			}
			if(isExistNode(path)) {
				logger.debug("path [{}] is not exists,return null",path);
				return null;
			} else {
				logger.debug("read random from zookeeper,path="+path);
				List<String> list = client.getChildren().forPath(path);
				if(list == null || list.size() == 0) {
					logger.debug("path [{}] has no children return null",path);
					return null;
				}
				Random rand = new Random();
				String child = list.get(rand.nextInt(list.size()));
				path = path + "/" + child;
				byte[] b = client.getData().forPath(path);
				String value = new String(b,"utf-8");
				return value;
			}
		}catch(Exception e){
			log.error("{}",e);
		}
		return null;

	}

	/** * 可重入共享锁 -- Shared Reentrant Lock * @param lockPath * @param time * @param dealWork 获取 * @return */
	public Object getSRLock(String lockPath,long time, SRLockDealCallback<?> dealWork){
		InterProcessMutex lock = new InterProcessMutex(client, lockPath);
		try {
			if (!lock.acquire(time, TimeUnit.SECONDS)) {
	            log.error("get lock fail:{}", " could not acquire the lock");
	            return null;
	        }
            log.debug("{} get the lock",lockPath);
            Object b = dealWork.deal();
            return b;
        }catch(Exception e){
        	log.error("{}", e);
        }finally{
        	try {
				lock.release();
			} catch (Exception e) {
				//log.error("{}",e);
			}
        }
		return null;
	}

    /** * 获取读写锁 * @param path * @return */
	public InterProcessReadWriteLock getReadWriteLock(String path){
        InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, path);
        return readWriteLock;
    }

    /** * 在注册监听器的时候,如果传入此参数,当事件触发时,逻辑由线程池处理 */
    ExecutorService pool = Executors.newFixedThreadPool(2);

    /** * 监听数据节点的变化情况 * @param watchPath * @param listener */
    public void watchPath(String watchPath,TreeCacheListener listener){
     // NodeCache nodeCache = new NodeCache(client, watchPath, false);
        TreeCache cache = new TreeCache(client, watchPath);
        cache.getListenable().addListener(listener,pool);
        try {
            cache.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

配置文件

zookeeper.enabled: true
#zookeeper.server: 47.106.106.53:9036,47.106.106.53:9037,47.106.106.53:9038
zookeeper.server: 10.10.2.137:2181,10.10.2.138:2181,10.10.2.139:2181
zookeeper.namespace: demo
zookeeper.digest: rt:rt                     #zkCli.sh acl 命令 addauth digest mpush
zookeeper.sessionTimeoutMs: 1000            #会话超时时间,单位为毫秒,默认60000ms,连接断开后,其它客户端还能请到临时节点的时间
zookeeper.connectionTimeoutMs: 6000         #连接创建超时时间,单位为毫秒
zookeeper.maxRetries: 3                     #最大重试次数
zookeeper.baseSleepTimeMs: 1000             #初始sleep时间 ,毫秒

程序会创建节点demo为namespace,之后所有增删改查的操作都这节点下完成

Controller层方法

@Api(tags="zookeeper基本操作")
@RequestMapping("/zk")
@RestController
@Slf4j
public class ZookeeperController {

    @Autowired
    private ZkClient zkClient;

    @Autowired
    private ZkClient zkClientTest;

    /** * 创建节点 * @param type * @param znode * @return */
    @ApiOperation(value = "创建节点",notes = "在命名空间下创建节点")
    @ApiImplicitParams({
            @ApiImplicitParam(name ="type",value = "节点类型:<br> 0 持久化节点<br> 1 临时节点<br> 2 持久顺序节点<br> 3 临时顺序节点",
                    allowableValues = "0,1,2,3",defaultValue="3",paramType = "path",required = true,dataType = "Long"),
            @ApiImplicitParam(name ="znode",value = "节点名称",paramType = "path",required = true,dataType = "String"),
            @ApiImplicitParam(name ="nodeData",value = "节点数据",paramType = "body",dataType = "String")
    })
    @RequestMapping(value = "/create/{type}/{znode}",method=RequestMethod.POST)
    private String create(@PathVariable Integer type,@PathVariable String znode,@RequestBody String nodeData){
        znode = "/" + znode;
        try {
            zkClient.createNode(CreateMode.fromFlag(type),znode,nodeData);
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        return znode;
    }

    /** * 设置节点数据 * @param znode * @return */
    @ApiOperation(value = "设置节点数据",notes = "设置节点数据")
    @ApiImplicitParams({
            @ApiImplicitParam(name ="znode",value = "节点名称",paramType = "body",required = true,dataType = "String"),
            @ApiImplicitParam(name ="nodeData",value = "节点数据",paramType = "query",required = true,dataType = "String")
    })
    @RequestMapping(value = "/update",method=RequestMethod.POST)
    public String update(@RequestBody String znode,@RequestParam String nodeData){
        znode = "/" + znode;
        zkClient.setNodeData(znode,nodeData.getBytes());
        return "sucess";
    }

    @ApiOperation(value = "删除节点",notes = "删除节点")
    @ApiImplicitParams({
            @ApiImplicitParam(name ="znode",value = "节点名称",paramType = "query",required = true,dataType = "String")
    })
    @RequestMapping(value = "/delete",method=RequestMethod.GET)
    public String delete(@RequestParam String znode){
        znode = "/" + znode;
        zkClient.deleteNode(znode);
        return "success";
    }

    @ApiOperation(value = "查找节点的内容",notes = "查找节点的内容")
    @ApiImplicitParams({
            @ApiImplicitParam(name ="znode",value = "节点名称",paramType = "body",required = true,dataType = "String")
    })
    @RequestMapping(value = "/find",method=RequestMethod.POST)
    public String find(@RequestBody String znode){
        znode = "/" + znode;
        byte[] b =  zkClient.getNodeData(znode);
        return new String(b);
    }

    /** * 给节点添加读写锁 * @param znode * @return */
    @ApiOperation(value = "添加读写锁",notes = "写锁跟读锁互斥,读锁跟读锁共享")
    @ApiImplicitParams({
            @ApiImplicitParam(name ="lockType",value = "锁类型:<br> 0 写锁<br> 1 读锁",
                    allowableValues = "0,1",defaultValue="0",paramType = "query",required = true,dataType = "Long"),
            @ApiImplicitParam(name ="znode",value = "节点名称",paramType = "query",required = true,dataType = "String")
    })
    @RequestMapping(value = "/writeLock",method=RequestMethod.GET)
    public String readLock(@RequestParam Integer lockType,@RequestParam String znode){
        znode = "/" + znode;
        InterProcessReadWriteLock readWriteLock = zkClient.getReadWriteLock(znode);
        InterProcessMutex writeLock = readWriteLock.writeLock();
        InterProcessMutex readLock = readWriteLock.readLock();
        Runnable writeRunnable = ()->{
            try {
                System.out.println("------write lock-----------");
                writeLock.acquire();
                System.out.println("write acquire");
                Thread.sleep(10_000);
                System.out.println("write release");
                writeLock.release();

            } catch (Exception e) {
                e.printStackTrace();
            }
        };
        Runnable readRunnable = ()->{
            try {
                System.out.println("-------read lock----------");
                readLock.acquire();
                System.out.println("read acquire");
                Thread.sleep(20_000);
                System.out.println("read release");
                readLock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        };

        if(lockType == 0 ){
            new Thread(writeRunnable).start();
        }else if(lockType == 1){
            new Thread(readRunnable).start();
        }
        return "success";
    }

    /** * 监听节点 * @param znode * @return */
    @ApiOperation(value = "监听节点",notes = "监控整个树上的所有节点")
    @ApiImplicitParams(
            @ApiImplicitParam(name ="znode",value = "节点名称",paramType = "body",required = true,dataType = "String")
    )
    @RequestMapping(value="/watchPath",method=RequestMethod.POST)
    public String watchPath(@RequestBody  String znode){
        znode = "/" + znode;
        zkClient.watchPath(znode,(client1, event) ->{
            log.info("event:" + event.getType() +
                    " |path:" + (null != event.getData() ? event.getData().getPath() : null));

            if(event.getData()!=null && event.getData().getData()!=null){
                log.info("发生变化的节点内容为:" + new String(event.getData().getData()));
            }
        });
        return "success";
    }

    /** * 测试计算器 * 并发越高耗时越长 * 要自己实现获取锁失败重试 * @return */
    @ApiOperation(value = "模拟分布式计数器",notes = "模拟分布式计数器")
    @RequestMapping(value="/counter",method=RequestMethod.POST)
    public String counter(@RequestBody  String znode){
        SharedCount baseCount = new SharedCount(zkClientTest.getClient(), znode, 0);
        try {
            baseCount.start();
            //生成线程池
            ExecutorService executor = Executors.newCachedThreadPool();
            Consumer<SharedCount> consumer = (SharedCount count) -> {
                try {
                    List<Callable<Boolean>> callList = new ArrayList<>();
                    Callable<Boolean> call = () -> {
                        boolean result = false;
                        try {
                            Long time = System.currentTimeMillis();
                            while(!result){
                                VersionedValue<Integer> oldVersion = baseCount.getVersionedValue();
                                int newCnt = oldVersion.getValue() + 1;
                                result = baseCount.trySetCount(oldVersion, newCnt);
                                if(System.currentTimeMillis()-time>10_000||result){
                                    break;
                                }
                                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100)+1);
                            }
                        } catch (Exception e) {
                        }
                        return result;
                    };
                    //5个线程
                    for (int i = 0; i < 100; i++) {
                        callList.add(call);
                    }
                    List<Future<Boolean>> futures = executor.invokeAll(callList);
                } catch (Exception e) {

                }
            };
            //测试分布式int类型的计数器
            consumer.accept(baseCount);
            System.out.println("final cnt : " + baseCount.getCount());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "success:"+baseCount.getCount();
    }

    /** * DistributedAtomicLong计数器可以自己设置重试的次数与间隔 * 并发越高耗时越长 * 要自己实现获取锁失败重试 */
    @ApiOperation(value = "模拟分布式计数器2",notes = "模拟分布式计数器2")
    @RequestMapping(value="/counter2",method=RequestMethod.POST)
    public String distributedCount(@RequestBody  String znode) throws Exception {
        DistributedAtomicLong distributedAtomicLong = new DistributedAtomicLong(
                zkClientTest.getClient(), znode, new RetryNTimes(10, 30));
        //生成线程池
        ExecutorService executor = Executors.newCachedThreadPool();
        Consumer<DistributedAtomicLong> consumer = (DistributedAtomicLong count) -> {
            try {
                List<Callable<Boolean>> callList = new ArrayList<>();
                Callable<Boolean> call = () -> {
                    boolean result = false;
                    try {
                        AtomicValue<Long> val = count.increment();
                        System.out.println("old cnt: "+val.preValue()+" new cnt : "+ val.postValue()+" result:"+val.succeeded());
                        result = val.succeeded();
                    } catch (Exception e) {
                    } finally {
                    }
                    return result;
                };
                //5个线程
                for (int i = 0; i < 500; i++) {
                    callList.add(call);
                }
                List<Future<Boolean>> futures = executor.invokeAll(callList);
            } catch (Exception e) {

            }
        };
        consumer.accept(distributedAtomicLong);
        return "success:"+distributedAtomicLong.get().postValue();
    }
    
     /** * * @return * @throws KeeperException */
    @ApiOperation(value = "模拟服务注册和随机获取服务",notes = "模拟服务注册和随机获取服务")
    @RequestMapping(value="/serviceRegistry",method=RequestMethod.POST)
    public String serviceRegistry() throws KeeperException {
        //服务注册
        zkClient.createNode(CreateMode.fromFlag(1),"/test/hello_service1","http://1270.0.1:8001/");
        zkClient.createNode(CreateMode.fromFlag(1),"/test/hello_service2","http://1270.0.1:8002/");
        zkClient.createNode(CreateMode.fromFlag(1),"/test/hello_service3","http://1270.0.1:8003/");
        return zkClient.getRandomData("/test");
    }

}

测试

测试地址:http://127.0.0.1:8080/swagger-ui.html

读写锁测试

在界面发送两次写锁

在这里插入图片描述

后面打印

2018-12-29 11:45:27.214  INFO 53332 --- [      Thread-24]   : ------write lock-----------
2018-12-29 11:45:27.242  INFO 53332 --- [      Thread-24]   : write acquire
2018-12-29 11:45:30.870  INFO 53332 --- [      Thread-25]   : ------write lock-----------
2018-12-29 11:45:37.243  INFO 53332 --- [      Thread-24]   : write release
2018-12-29 11:45:37.276  INFO 53332 --- [      Thread-25]   : write acquire
2018-12-29 11:45:47.276  INFO 53332 --- [      Thread-25]   : write release

可以看出写锁是互斥的,另外写锁跟读锁也是互斥的,读锁跟读锁之间是共享的(自行测试)

计数器测试

修改方法中的线程个数,会发现并发越大,计数器执行时间越长,而且很大可能数据不准确。所以不适用于高并发的场景。
在这里插入图片描述