我们的注册中心是用 ZooKeeper 实现的,因此要监控 Dubbo 服务目录的状态,本质上还是对 ZooKeeper 的操作。能够获取 Dubbo 的服务目录,就意味着可以对其中的服务做进一步的管理。这里介绍的做法是在 Java 中利用 Curator 结合 Spring 对 Dubbo 目录进行监听。
在 pom.xml 中引入如下依赖:
<!-- Curator recipes: supplies the org.apache.curator.framework.recipes.cache classes used by the listener -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
</dependency>
<!-- NOTE(review): these are the older com.netflix coordinates of curator-framework; the Java code
     in this article imports only org.apache.curator packages - confirm this entry is still needed. -->
<dependency>
    <groupId>com.netflix.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>1.1.10</version>
</dependency>
我们还需要添加一段 properties 配置,内容如下:
## Dubbo
# ZooKeeper connect string (host:port) of the Dubbo registry
dubbo.registry.address=192.168.43.33:2181
# Timeouts and retry settings handed to the Curator client (values in milliseconds where the key ends in Ms)
zookeeper.sessionTimeoutMs=6000
zookeeper.connectionTimeoutMs=6000
zookeeper.maxRetries=3
zookeeper.baseSleepTimeMs=1000
# Credentials passed to Curator's "digest" authorization
zookeeper.authstr=srp:srp
接下来实现基于 Curator 的 ZooKeeper 客户端代码:
package com.ws.zklisent;

import com.ws.zklisent.ZkClientListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that reads the ZooKeeper connection settings from the
 * property source and exposes a {@link ZkClientListener} bean built from them.
 */
@Configuration
public class ZkConfiguration {

    /** ZooKeeper connect string, taken from the Dubbo registry address property. */
    @Value("${dubbo.registry.address}")
    private String zookeeperServer;

    @Value("${zookeeper.sessionTimeoutMs}")
    private int sessionTimeoutMs;

    @Value("${zookeeper.connectionTimeoutMs}")
    private int connectionTimeoutMs;

    @Value("${zookeeper.maxRetries}")
    private int maxRetries;

    @Value("${zookeeper.baseSleepTimeMs}")
    private int baseSleepTimeMs;

    @Value("${zookeeper.authstr}")
    private String authstr;

    /**
     * Creates the listener bean and copies every injected setting onto it.
     * Spring invokes {@code init} after construction and {@code stop} on
     * context shutdown, as declared on the {@code @Bean} annotation.
     *
     * @return the configured, not yet initialised listener
     */
    @Bean(initMethod = "init", destroyMethod = "stop")
    public ZkClientListener zkClient() {
        ZkClientListener listener = new ZkClientListener();

        // Connection target and credentials.
        listener.setZookeeperServer(zookeeperServer);
        listener.setAuthstr(authstr);

        // Timeouts.
        listener.setSessionTimeoutMs(sessionTimeoutMs);
        listener.setConnectionTimeoutMs(connectionTimeoutMs);

        // Retry policy inputs.
        listener.setBaseSleepTimeMs(baseSleepTimeMs);
        listener.setMaxRetries(maxRetries);

        return listener;
    }
}
package com.ws.zklisent;

import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Curator-based ZooKeeper client that watches the Dubbo registry tree and
 * prints every node change to standard output.
 *
 * <p>Lifecycle: populate the bean properties, call {@link #init()} to connect
 * and start watching {@code /dubbo}, and call {@link #stop()} to release the
 * caches and the client. The Curator client and the list of open caches are
 * held in static fields because {@link #watcherPath(String)} is a static entry
 * point; consequently all instances of this class share one client.
 */
public class ZkClientListener {

    /** Root znode under which Dubbo registers its services. */
    private static final String DUBBO_ROOT = "/dubbo";

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /** Shared Curator client; {@code null} until {@link #init()} ran or after {@link #stop()}. */
    private static CuratorFramework client = null;

    /** Every cache created by {@link #watcherPath(String)}, kept so that {@link #stop()} can close it. */
    private static final List<Closeable> OPEN_CACHES = new CopyOnWriteArrayList<Closeable>();

    private String zookeeperServer;
    private int sessionTimeoutMs;
    private int connectionTimeoutMs;
    private int baseSleepTimeMs;
    private int maxRetries;
    private String authstr;

    public String getAuthstr() {
        return authstr;
    }

    public void setAuthstr(String authstr) {
        this.authstr = authstr;
    }

    public void setZookeeperServer(String zookeeperServer) {
        this.zookeeperServer = zookeeperServer;
    }

    public String getZookeeperServer() {
        return zookeeperServer;
    }

    public void setSessionTimeoutMs(int sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    public int getSessionTimeoutMs() {
        return sessionTimeoutMs;
    }

    public void setConnectionTimeoutMs(int connectionTimeoutMs) {
        this.connectionTimeoutMs = connectionTimeoutMs;
    }

    public int getConnectionTimeoutMs() {
        return connectionTimeoutMs;
    }

    public void setBaseSleepTimeMs(int baseSleepTimeMs) {
        this.baseSleepTimeMs = baseSleepTimeMs;
    }

    public int getBaseSleepTimeMs() {
        return baseSleepTimeMs;
    }

    public void setMaxRetries(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    public int getMaxRetries() {
        return maxRetries;
    }

    /**
     * Builds and starts the Curator client from the configured properties and
     * begins watching the Dubbo root node.
     *
     * <p>Digest authorization is only applied when an auth string is
     * configured. A failure to set up the watchers is logged and does not
     * propagate, so the client itself stays started.
     */
    public void init() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                .connectString(zookeeperServer)
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(sessionTimeoutMs)
                .connectionTimeoutMs(connectionTimeoutMs);
        if (authstr != null && !authstr.isEmpty()) {
            // Explicit charset: the credential bytes must not depend on the platform default.
            builder.authorization("digest", authstr.getBytes(StandardCharsets.UTF_8));
        }
        client = builder.build();
        client.start();
        try {
            watcherPath(DUBBO_ROOT);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            logger.error("Failed to start watching {}", DUBBO_ROOT, e);
        }
    }

    /**
     * Closes every cache opened through {@link #watcherPath(String)} and then
     * the client. Caches go first because they depend on the client.
     */
    public void stop() {
        for (Closeable cache : OPEN_CACHES) {
            CloseableUtils.closeQuietly(cache);
        }
        OPEN_CACHES.clear();
        if (client != null) {
            CloseableUtils.closeQuietly(client);
            client = null;
        }
    }

    /**
     * Starts watching {@code path}: a {@link PathChildrenCache} reports changes
     * to its direct children and a {@link TreeCache} reports changes anywhere
     * in the subtree.
     *
     * @param path znode path to watch
     * @throws IllegalStateException if the client has not been initialised
     * @throws Exception if one of the caches fails to start
     */
    public static void watcherPath(String path) throws Exception {
        CuratorFramework current = client;
        if (current == null) {
            throw new IllegalStateException("ZooKeeper client is not initialised; call init() first");
        }

        // Watch the direct children of path. The listener is attached before
        // start() so the initial events are delivered, and the cache is
        // tracked before start() so stop() closes it even if start() fails.
        PathChildrenCache childrenCache = new PathChildrenCache(current, path, false);
        childrenCache.getListenable().addListener(plis);
        OPEN_CACHES.add(childrenCache);
        childrenCache.start();

        // Watch every descendant of path.
        TreeCache subtreeCache = new TreeCache(current, path);
        subtreeCache.getListenable().addListener(treeCacheListener);
        OPEN_CACHES.add(subtreeCache);
        subtreeCache.start();
    }

    /** Decodes a znode payload as UTF-8; a node without data yields an empty string. */
    private static String payloadOf(ChildData node) {
        byte[] raw = node.getData();
        return raw == null ? "" : new String(raw, StandardCharsets.UTF_8);
    }

    /**
     * Listener for direct child nodes.
     */
    static PathChildrenCacheListener plis = new PathChildrenCacheListener() {
        @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            switch (event.getType()) {
                case CHILD_ADDED: {
                    System.out.println("Node added: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
                    break;
                }
                case CHILD_UPDATED: {
                    System.out.println("Node changed: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
                    break;
                }
                case CHILD_REMOVED: {
                    System.out.println("Node removed: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
                    break;
                }
                default:
                    // Other event types are not reported.
                    break;
            }
        }
    };

    /**
     * Listener for all descendant nodes.
     */
    static TreeCacheListener treeCacheListener = new TreeCacheListener() {
        @Override
        public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
            switch (event.getType()) {
                case NODE_ADDED:
                    System.out.println("TreeNode added: " + event.getData().getPath()
                            + " , data: " + payloadOf(event.getData()));
                    break;
                case NODE_UPDATED:
                    System.out.println("TreeNode updated: " + event.getData().getPath()
                            + " , data: " + payloadOf(event.getData()));
                    break;
                case NODE_REMOVED:
                    System.out.println("TreeNode removed: " + event.getData().getPath());
                    break;
                default:
                    break;
            }
        }
    };
}
项目运行起来后,控制台会打印相应的日志: