实现zookeeper节点的增删改查、节点监听、分布式读写锁、分布式计数器
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <java.version>1.8</java.version> <zookeeper.version>3.4.8</zookeeper.version> <curator.version>2.11.1</curator.version> </properties> <dependencies> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>${zookeeper.version}</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>${curator.version}</version> </dependency> </dependencies>
这里使用的是curator,curator是对zookeeper的简单封装,提供了一些集成的方法,或者是提供了更优雅的api
/** * zookeeper客户端 */ @Data @Slf4j public class ZkClient { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private CuratorFramework client; public TreeCache cache; private ZookeeperProperties zookeeperProperties; public ZkClient(ZookeeperProperties zookeeperProperties){ this.zookeeperProperties = zookeeperProperties; } /** * 初始化zookeeper客户端 */ public void init() { try{ RetryPolicy retryPolicy = new ExponentialBackoffRetry(zookeeperProperties.getBaseSleepTimeMs(), zookeeperProperties.getMaxRetries()); Builder builder = CuratorFrameworkFactory.builder() .connectString(zookeeperProperties.getServer()).retryPolicy(retryPolicy) .sessionTimeoutMs( zookeeperProperties.getSessionTimeoutMs()) .connectionTimeoutMs( zookeeperProperties.getConnectionTimeoutMs()) .namespace( zookeeperProperties.getNamespace()); if(StringUtils.isNotEmpty( zookeeperProperties.getDigest())){ builder.authorization("digest", zookeeperProperties.getDigest().getBytes("UTF-8")); builder.aclProvider(new ACLProvider() { @Override public List<ACL> getDefaultAcl() { return ZooDefs.Ids.CREATOR_ALL_ACL; } @Override public List<ACL> getAclForPath(final String path) { return ZooDefs.Ids.CREATOR_ALL_ACL; } }); } client = builder.build(); client.start(); initLocalCache("/test"); // addConnectionStateListener(); client.getConnectionStateListenable().addListener(new ConnectionStateListener() { public void stateChanged(CuratorFramework client, ConnectionState state) { if (state == ConnectionState.LOST) { //连接丢失 logger.info("lost session with zookeeper"); } else if (state == ConnectionState.CONNECTED) { //连接新建 logger.info("connected with zookeeper"); } else if (state == ConnectionState.RECONNECTED) { logger.info("reconnected with zookeeper"); } } }); }catch(Exception e){ e.printStackTrace(); } } /** * 初始化本地缓存 * @param watchRootPath * @throws Exception */ private void initLocalCache(String watchRootPath) throws Exception { cache = new TreeCache(client, watchRootPath); TreeCacheListener listener = (client1, event) ->{ log.info("event:" + event.getType() + " |path:" + (null != event.getData() ? event.getData().getPath() : null)); if(event.getData()!=null && event.getData().getData()!=null){ log.info("发生变化的节点内容为:" + new String(event.getData().getData())); } // client1.getData(). }; cache.getListenable().addListener(listener); cache.start(); } public void stop() { client.close(); } public CuratorFramework getClient() { return client; } /** * 创建节点 * @param mode 节点类型 * 1、PERSISTENT 持久化目录节点,存储的数据不会丢失。 * 2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失 * 3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除 *4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。 * @param path 节点名称 * @param nodeData 节点数据 */ public void createNode(CreateMode mode, String path , String nodeData) { try { //使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点 client.create().creatingParentsIfNeeded().withMode(mode).forPath(path,nodeData.getBytes("UTF-8")); } catch (Exception e) { logger.error("注册出错", e); } } /** * 创建节点 * @param mode 节点类型 * 1、PERSISTENT 持久化目录节点,存储的数据不会丢失。 * 2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失 * 3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除 * 4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。 * @param path 节点名称 */ public void createNode(CreateMode mode,String path ) { try { //使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点 client.create().creatingParentsIfNeeded().withMode(mode).forPath(path); } catch (Exception e) { logger.error("注册出错", e); } } /** * 删除节点数据 * * @param path */ public void deleteNode(final String path) { try { deleteNode(path,true); } catch (Exception ex) { log.error("{}",ex); } } /** * 删除节点数据 * @param path * @param deleteChildre 是否删除子节点 */ public void deleteNode(final String path,Boolean deleteChildre){ try { if(deleteChildre){ //guaranteed()删除一个节点,强制保证删除, // 只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功 client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path); }else{ client.delete().guaranteed().forPath(path); } } catch (Exception e) { e.printStackTrace(); } } /** * 设置指定节点的数据 * @param path * @param datas */ public void setNodeData(String path, byte[] datas){ try { client.setData().forPath(path, datas); }catch (Exception ex) { log.error("{}",ex); } } /** * 获取指定节点的数据 * @param path * @return */ public byte[] getNodeData(String path){ Byte[] bytes = null; try { if(cache != null){ ChildData data = cache.getCurrentData(path); if(data != null){ return data.getData(); } } client.getData().forPath(path); return client.getData().forPath(path); }catch (Exception ex) { log.error("{}",ex); } return null; } /** * 获取数据时先同步 * @param path * @return */ public byte[] synNodeData(String path){ client.sync(); return getNodeData( path); } /** * 判断路径是否存在 * * @param path * @return */ public boolean isExistNode(final String path) { client.sync(); try { return null != client.checkExists().forPath(path); } catch (Exception ex) { return false; } } /** * 获取节点的子节点 * @param path * @return */ public List<String> getChildren(String path) { List<String> childrenList = new ArrayList<>(); try { childrenList = client.getChildren().forPath(path); } catch (Exception e) { logger.error("获取子节点出错", e); } return childrenList; } /** * 随机读取一个path子路径, "/"为根节点对应该namespace * 先从cache中读取,如果没有,再从zookeeper中查询 * @param path * @return * @throws Exception */ public String getRandomData(String path) { try{ Map<String,ChildData> cacheMap = cache.getCurrentChildren(path); if(cacheMap != null && cacheMap.size() > 0) { logger.debug("get random value from cache,path="+path); Collection<ChildData> values = cacheMap.values(); List<ChildData> list = new ArrayList<>(values); Random rand = new Random(); byte[] b = list.get(rand.nextInt(list.size())).getData(); return new String(b,"utf-8"); } if(isExistNode(path)) { logger.debug("path [{}] is not exists,return null",path); return null; } else { logger.debug("read random from zookeeper,path="+path); List<String> list = client.getChildren().forPath(path); if(list == null || list.size() == 0) { logger.debug("path [{}] has no children return null",path); return null; } Random rand = new Random(); String child = list.get(rand.nextInt(list.size())); path = path + "/" + child; byte[] b = client.getData().forPath(path); String value = new String(b,"utf-8"); return value; } }catch(Exception e){ log.error("{}",e); } return null; } /** * 可重入共享锁 -- Shared Reentrant Lock * @param lockPath * @param time * @param dealWork 获取 * @return */ public Object getSRLock(String lockPath,long time, SRLockDealCallback<?> dealWork){ InterProcessMutex lock = new InterProcessMutex(client, lockPath); try { if (!lock.acquire(time, TimeUnit.SECONDS)) { log.error("get lock fail:{}", " could not acquire the lock"); return null; } log.debug("{} get the lock",lockPath); Object b = dealWork.deal(); return b; }catch(Exception e){ log.error("{}", e); }finally{ try { lock.release(); } catch (Exception e) { //log.error("{}",e); } } return null; } /** * 获取读写锁 * @param path * @return */ public InterProcessReadWriteLock getReadWriteLock(String path){ InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, path); return readWriteLock; } /** * 在注册监听器的时候,如果传入此参数,当事件触发时,逻辑由线程池处理 */ ExecutorService pool = Executors.newFixedThreadPool(2); /** * 监听数据节点的变化情况 * @param watchPath * @param listener */ public void watchPath(String watchPath,TreeCacheListener listener){ // NodeCache nodeCache = new NodeCache(client, watchPath, false); TreeCache cache = new TreeCache(client, watchPath); cache.getListenable().addListener(listener,pool); try { cache.start(); } catch (Exception e) { e.printStackTrace(); } } }
zookeeper.enabled: true #zookeeper.server: 47.106.106.53:9036,47.106.106.53:9037,47.106.106.53:9038 zookeeper.server: 10.10.2.137:2181,10.10.2.138:2181,10.10.2.139:2181 zookeeper.namespace: demo zookeeper.digest: rt:rt #zkCli.sh acl 命令 addauth digest mpush zookeeper.sessionTimeoutMs: 1000 #会话超时时间,单位为毫秒,默认60000ms,连接断开后,其它客户端还能请到临时节点的时间 zookeeper.connectionTimeoutMs: 6000 #连接创建超时时间,单位为毫秒 zookeeper.maxRetries: 3 #最大重试次数 zookeeper.baseSleepTimeMs: 1000 #初始sleep时间 ,毫秒
程序会创建节点demo为namespace,之后所有增删改查的操作都这节点下完成
@Api(tags="zookeeper基本操作") @RequestMapping("/zk") @RestController @Slf4j public class ZookeeperController { @Autowired private ZkClient zkClient; @Autowired private ZkClient zkClientTest; /** * 创建节点 * @param type * @param znode * @return */ @ApiOperation(value = "创建节点",notes = "在命名空间下创建节点") @ApiImplicitParams({ @ApiImplicitParam(name ="type",value = "节点类型:<br> 0 持久化节点<br> 1 临时节点<br> 2 持久顺序节点<br> 3 临时顺序节点", allowableValues = "0,1,2,3",defaultValue="3",paramType = "path",required = true,dataType = "Long"), @ApiImplicitParam(name ="znode",value = "节点名称",paramType = "path",required = true,dataType = "String"), @ApiImplicitParam(name ="nodeData",value = "节点数据",paramType = "body",dataType = "String") }) @RequestMapping(value = "/create/{type}/{znode}",method=RequestMethod.POST) private String create(@PathVariable Integer type,@PathVariable String znode,@RequestBody String nodeData){ znode = "/" + znode; try { zkClient.createNode(CreateMode.fromFlag(type),znode,nodeData); } catch (KeeperException e) { e.printStackTrace(); } return znode; } /** * 设置节点数据 * @param znode * @return */ @ApiOperation(value = "设置节点数据",notes = "设置节点数据") @ApiImplicitParams({ @ApiImplicitParam(name ="znode",value = "节点名称",paramType = "body",required = true,dataType = "String"), @ApiImplicitParam(name ="nodeData",value = "节点数据",paramType = "query",required = true,dataType = "String") }) @RequestMapping(value = "/update",method=RequestMethod.POST) public String update(@RequestBody String znode,@RequestParam String nodeData){ znode = "/" + znode; zkClient.setNodeData(znode,nodeData.getBytes()); return "sucess"; } @ApiOperation(value = "删除节点",notes = "删除节点") @ApiImplicitParams({ @ApiImplicitParam(name ="znode",value = "节点名称",paramType = "query",required = true,dataType = "String") }) @RequestMapping(value = "/delete",method=RequestMethod.GET) public String delete(@RequestParam String znode){ znode = "/" + znode; zkClient.deleteNode(znode); return "success"; } @ApiOperation(value = "查找节点的内容",notes = "查找节点的内容") @ApiImplicitParams({ @ApiImplicitParam(name ="znode",value = "节点名称",paramType = "body",required = true,dataType = "String") }) @RequestMapping(value = "/find",method=RequestMethod.POST) public String find(@RequestBody String znode){ znode = "/" + znode; byte[] b = zkClient.getNodeData(znode); return new String(b); } /** * 给节点添加读写锁 * @param znode * @return */ @ApiOperation(value = "添加读写锁",notes = "写锁跟读锁互斥,读锁跟读锁共享") @ApiImplicitParams({ @ApiImplicitParam(name ="lockType",value = "锁类型:<br> 0 写锁<br> 1 读锁", allowableValues = "0,1",defaultValue="0",paramType = "query",required = true,dataType = "Long"), @ApiImplicitParam(name ="znode",value = "节点名称",paramType = "query",required = true,dataType = "String") }) @RequestMapping(value = "/writeLock",method=RequestMethod.GET) public String readLock(@RequestParam Integer lockType,@RequestParam String znode){ znode = "/" + znode; InterProcessReadWriteLock readWriteLock = zkClient.getReadWriteLock(znode); InterProcessMutex writeLock = readWriteLock.writeLock(); InterProcessMutex readLock = readWriteLock.readLock(); Runnable writeRunnable = ()->{ try { System.out.println("------write lock-----------"); writeLock.acquire(); System.out.println("write acquire"); Thread.sleep(10_000); System.out.println("write release"); writeLock.release(); } catch (Exception e) { e.printStackTrace(); } }; Runnable readRunnable = ()->{ try { System.out.println("-------read lock----------"); readLock.acquire(); System.out.println("read acquire"); Thread.sleep(20_000); System.out.println("read release"); readLock.release(); } catch (Exception e) { e.printStackTrace(); } }; if(lockType == 0 ){ new Thread(writeRunnable).start(); }else if(lockType == 1){ new Thread(readRunnable).start(); } return "success"; } /** * 监听节点 * @param znode * @return */ @ApiOperation(value = "监听节点",notes = "监控整个树上的所有节点") @ApiImplicitParams( @ApiImplicitParam(name ="znode",value = "节点名称",paramType = "body",required = true,dataType = "String") ) @RequestMapping(value="/watchPath",method=RequestMethod.POST) public String watchPath(@RequestBody String znode){ znode = "/" + znode; zkClient.watchPath(znode,(client1, event) ->{ log.info("event:" + event.getType() + " |path:" + (null != event.getData() ? event.getData().getPath() : null)); if(event.getData()!=null && event.getData().getData()!=null){ log.info("发生变化的节点内容为:" + new String(event.getData().getData())); } }); return "success"; } /** * 测试计算器 * 并发越高耗时越长 * 要自己实现获取锁失败重试 * @return */ @ApiOperation(value = "模拟分布式计数器",notes = "模拟分布式计数器") @RequestMapping(value="/counter",method=RequestMethod.POST) public String counter(@RequestBody String znode){ SharedCount baseCount = new SharedCount(zkClientTest.getClient(), znode, 0); try { baseCount.start(); //生成线程池 ExecutorService executor = Executors.newCachedThreadPool(); Consumer<SharedCount> consumer = (SharedCount count) -> { try { List<Callable<Boolean>> callList = new ArrayList<>(); Callable<Boolean> call = () -> { boolean result = false; try { Long time = System.currentTimeMillis(); while(!result){ VersionedValue<Integer> oldVersion = baseCount.getVersionedValue(); int newCnt = oldVersion.getValue() + 1; result = baseCount.trySetCount(oldVersion, newCnt); if(System.currentTimeMillis()-time>10_000||result){ break; } TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100)+1); } } catch (Exception e) { } return result; }; //5个线程 for (int i = 0; i < 100; i++) { callList.add(call); } List<Future<Boolean>> futures = executor.invokeAll(callList); } catch (Exception e) { } }; //测试分布式int类型的计数器 consumer.accept(baseCount); System.out.println("final cnt : " + baseCount.getCount()); } catch (Exception e) { e.printStackTrace(); } return "success:"+baseCount.getCount(); } /** * DistributedAtomicLong计数器可以自己设置重试的次数与间隔 * 并发越高耗时越长 * 要自己实现获取锁失败重试 */ @ApiOperation(value = "模拟分布式计数器2",notes = "模拟分布式计数器2") @RequestMapping(value="/counter2",method=RequestMethod.POST) public String distributedCount(@RequestBody String znode) throws Exception { DistributedAtomicLong distributedAtomicLong = new DistributedAtomicLong( zkClientTest.getClient(), znode, new RetryNTimes(10, 30)); //生成线程池 ExecutorService executor = Executors.newCachedThreadPool(); Consumer<DistributedAtomicLong> consumer = (DistributedAtomicLong count) -> { try { List<Callable<Boolean>> callList = new ArrayList<>(); Callable<Boolean> call = () -> { boolean result = false; try { AtomicValue<Long> val = count.increment(); System.out.println("old cnt: "+val.preValue()+" new cnt : "+ val.postValue()+" result:"+val.succeeded()); result = val.succeeded(); } catch (Exception e) { } finally { } return result; }; //5个线程 for (int i = 0; i < 500; i++) { callList.add(call); } List<Future<Boolean>> futures = executor.invokeAll(callList); } catch (Exception e) { } }; consumer.accept(distributedAtomicLong); return "success:"+distributedAtomicLong.get().postValue(); } /** * * @return * @throws KeeperException */ @ApiOperation(value = "模拟服务注册和随机获取服务",notes = "模拟服务注册和随机获取服务") @RequestMapping(value="/serviceRegistry",method=RequestMethod.POST) public String serviceRegistry() throws KeeperException { //服务注册 zkClient.createNode(CreateMode.fromFlag(1),"/test/hello_service1","http://1270.0.1:8001/"); zkClient.createNode(CreateMode.fromFlag(1),"/test/hello_service2","http://1270.0.1:8002/"); zkClient.createNode(CreateMode.fromFlag(1),"/test/hello_service3","http://1270.0.1:8003/"); return zkClient.getRandomData("/test"); } }
测试地址:http://127.0.0.1:8080/swagger-ui.html
在界面发送两次写锁
后面打印
2018-12-29 11:45:27.214 INFO 53332 --- [ Thread-24] : ------write lock----------- 2018-12-29 11:45:27.242 INFO 53332 --- [ Thread-24] : write acquire 2018-12-29 11:45:30.870 INFO 53332 --- [ Thread-25] : ------write lock----------- 2018-12-29 11:45:37.243 INFO 53332 --- [ Thread-24] : write release 2018-12-29 11:45:37.276 INFO 53332 --- [ Thread-25] : write acquire 2018-12-29 11:45:47.276 INFO 53332 --- [ Thread-25] : write release
可以看出写锁是互斥的,另外写锁跟读锁也是互斥的,读锁跟读锁之间是共享的(自行测试)
修改方法中的线程个数,会发现并发越大,计数器执行时间越长,而且很大可能数据不准确。所以不适用于高并发的场景。