## 建立注册中心apache
服务注册操做对于 Dubbo 来讲不是必需的,经过服务直连的方式就能够绕过注册中心。但一般咱们不会这么作,直连方式不利于服务治理,仅推荐在测试服务时使用。对于 Dubbo 来讲,注册中心虽不是必需,但倒是必要的。app
在dubbo-config.xml配置注册中心,指定zookeeper做为注册中心实现方式ide
```测试
<!-- zookeeper注册中心--> <dubbo:registry protocol="zookeeper" address="${dubbo.registry.address}" check="false" port="${dubbo.protocol.port}"> <dubbo:parameter key="qos.enable" value="false"/> </dubbo:registry>
```ui
在RegistryProtocal中注册过程在export方法this
```url
public <T> Exporter<T> export(Invoker<T> originInvoker) throws RpcException { RegistryProtocol.ExporterChangeableWrapper<T> exporter = this.doLocalExport(originInvoker); URL registryUrl = this.getRegistryUrl(originInvoker); Registry registry = this.getRegistry(originInvoker); URL registedProviderUrl = this.getRegistedProviderUrl(originInvoker); boolean register = registedProviderUrl.getParameter("register", true); ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl); //服务注册到注册中心 if (register) { this.register(registryUrl, registedProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } URL overrideSubscribeUrl = this.getSubscribedOverrideUrl(registedProviderUrl); RegistryProtocol.OverrideListener overrideSubscribeListener = new RegistryProtocol.OverrideListener(overrideSubscribeUrl, originInvoker); this.overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); return new RegistryProtocol.DestroyableExporter(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl); }
```spa
register方法以下code
```router
public void register(URL registryUrl, URL registedProviderUrl) { Registry registry = this.registryFactory.getRegistry(registryUrl); registry.register(registedProviderUrl); }
```
ResistryFactory的有多种实现
@SPI("dubbo") public interface RegistryFactory { @Adaptive({"protocol"}) Registry getRegistry(URL var1); }
RegistryFactory接口是@SPI注解,若是未指定,默认使用dubbo做为注册中心
下面看下Zookeeper的实现
ZookeeperRegistry继承AbstractRegistry类
```
public abstract class AbstractRegistryFactory implements RegistryFactory { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRegistryFactory.class); private static final ReentrantLock LOCK = new ReentrantLock(); private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap(); public AbstractRegistryFactory() { } public static Collection<Registry> getRegistries() { return Collections.unmodifiableCollection(REGISTRIES.values()); } //使用重入锁销毁注册中心 public static void destroyAll() { if (LOGGER.isInfoEnabled()) { LOGGER.info("Close all registries " + getRegistries()); } LOCK.lock(); try { Iterator i$ = getRegistries().iterator(); while(i$.hasNext()) { Registry registry = (Registry)i$.next(); try { registry.destroy(); } catch (Throwable var6) { LOGGER.error(var6.getMessage(), var6); } } REGISTRIES.clear(); } finally { LOCK.unlock(); } } //获取注册中心实例 public Registry getRegistry(URL url) { url = url.setPath(RegistryService.class.getName()).addParameter("interface", RegistryService.class.getName()).removeParameters(new String[]{"export", "refer"}); String key = url.toServiceString(); LOCK.lock(); Registry var4; try { Registry registry = (Registry)REGISTRIES.get(key); if (registry == null) { registry = this.createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } REGISTRIES.put(key, registry); var4 = registry; return var4; } var4 = registry; } finally { LOCK.unlock(); } return var4; } protected abstract Registry createRegistry(URL var1); }
```
createRegistry方法须要实现类重写
public class ZookeeperRegistry extends FailbackRegistry { private static final Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class); private static final int DEFAULT_ZOOKEEPER_PORT = 2181; private static final String DEFAULT_ROOT = "dubbo"; private final String root; private final Set<String> anyServices = new ConcurrentHashSet(); private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap(); private final ZookeeperClient zkClient; public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } else { //设置group 默认dubbo String group = url.getParameter("group", "dubbo"); if (!group.startsWith("/")) { group = "/" + group; } this.root = group; // 建立 Zookeeper 客户端 this.zkClient = zookeeperTransporter.connect(url); // 添加状态监听器 this.zkClient.addStateListener(new StateListener() { public void stateChanged(int state) { if (state == 2) { try { ZookeeperRegistry.this.recover(); } catch (Exception var3) { ZookeeperRegistry.logger.error(var3.getMessage(), var3); } } } }); } } static String appendDefaultPort(String address) { if (address != null && address.length() > 0) { int i = address.indexOf(58); if (i < 0) { return address + ":" + 2181; } if (Integer.parseInt(address.substring(i + 1)) == 0) { return address.substring(0, i + 1) + 2181; } } return address; } public boolean isAvailable() { return this.zkClient.isConnected(); } public void destroy() { super.destroy(); try { this.zkClient.close(); } catch (Exception var2) { logger.warn("Failed to close zookeeper client " + this.getUrl() + ", cause: " + var2.getMessage(), var2); } } protected void doRegister(URL url) { try { this.zkClient.create(this.toUrlPath(url), url.getParameter("dynamic", true)); } catch (Throwable var3) { throw new RpcException("Failed to register " + url + " to zookeeper " + this.getUrl() + ", cause: " + var3.getMessage(), var3); } } protected void doUnregister(URL url) { try { this.zkClient.delete(this.toUrlPath(url)); } catch (Throwable var3) { throw new RpcException("Failed to unregister " + url + " to zookeeper " + this.getUrl() + ", cause: " + var3.getMessage(), var3); } } protected void doSubscribe(final URL url, final NotifyListener listener) { try { if ("*".equals(url.getServiceInterface())) { String root = this.toRootPath(); ConcurrentMap<NotifyListener, ChildListener> listeners = (ConcurrentMap)this.zkListeners.get(url); if (listeners == null) { this.zkListeners.putIfAbsent(url, new ConcurrentHashMap()); listeners = (ConcurrentMap)this.zkListeners.get(url); } ChildListener zkListener = (ChildListener)listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { public void childChanged(String parentPath, List<String> currentChilds) { Iterator i$ = currentChilds.iterator(); while(i$.hasNext()) { String child = (String)i$.next(); child = URL.decode(child); if (!ZookeeperRegistry.this.anyServices.contains(child)) { ZookeeperRegistry.this.anyServices.add(child); ZookeeperRegistry.this.subscribe(url.setPath(child).addParameters(new String[]{"interface", child, "check", String.valueOf(false)}), listener); } } } }); zkListener = (ChildListener)listeners.get(listener); } this.zkClient.create(root, false); List<String> services = this.zkClient.addChildListener(root, zkListener); if (services != null && services.size() > 0) { Iterator i$ = services.iterator(); while(i$.hasNext()) { String service = (String)i$.next(); service = URL.decode(service); this.anyServices.add(service); this.subscribe(url.setPath(service).addParameters(new String[]{"interface", service, "check", String.valueOf(false)}), listener); } } } else { List<URL> urls = new ArrayList(); String[] arr$ = this.toCategoriesPath(url); int len$ = arr$.length; for(int i$ = 0; i$ < len$; ++i$) { String path = arr$[i$]; ConcurrentMap<NotifyListener, ChildListener> listeners = (ConcurrentMap)this.zkListeners.get(url); if (listeners == null) { this.zkListeners.putIfAbsent(url, new ConcurrentHashMap()); listeners = (ConcurrentMap)this.zkListeners.get(url); } ChildListener zkListener = (ChildListener)listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { public void childChanged(String parentPath, List<String> currentChilds) { ZookeeperRegistry.this.notify(url, listener, ZookeeperRegistry.this.toUrlsWithEmpty(url, parentPath, currentChilds)); } }); zkListener = (ChildListener)listeners.get(listener); } this.zkClient.create(path, false); List<String> children = this.zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(this.toUrlsWithEmpty(url, path, children)); } } this.notify(url, listener, urls); } } catch (Throwable var11) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + this.getUrl() + ", cause: " + var11.getMessage(), var11); } } protected void doUnsubscribe(URL url, NotifyListener listener) { ConcurrentMap<NotifyListener, ChildListener> listeners = (ConcurrentMap)this.zkListeners.get(url); if (listeners != null) { ChildListener zkListener = (ChildListener)listeners.get(listener); if (zkListener != null) { this.zkClient.removeChildListener(this.toUrlPath(url), zkListener); } } } public List<URL> lookup(URL url) { if (url == null) { throw new IllegalArgumentException("lookup url == null"); } else { try { List<String> providers = new ArrayList(); String[] arr$ = this.toCategoriesPath(url); int len$ = arr$.length; for(int i$ = 0; i$ < len$; ++i$) { String path = arr$[i$]; List<String> children = this.zkClient.getChildren(path); if (children != null) { providers.addAll(children); } } return this.toUrlsWithoutEmpty(url, providers); } catch (Throwable var8) { throw new RpcException("Failed to lookup " + url + " from zookeeper " + this.getUrl() + ", cause: " + var8.getMessage(), var8); } } } private String toRootDir() { return this.root.equals("/") ? this.root : this.root + "/"; } private String toRootPath() { return this.root; } private String toServicePath(URL url) { String name = url.getServiceInterface(); return "*".equals(name) ? this.toRootPath() : this.toRootDir() + URL.encode(name); } private String[] toCategoriesPath(URL url) { String[] categroies; if ("*".equals(url.getParameter("category"))) { categroies = new String[]{"providers", "consumers", "routers", "configurators"}; } else { categroies = url.getParameter("category", new String[]{"providers"}); } String[] paths = new String[categroies.length]; for(int i = 0; i < categroies.length; ++i) { paths[i] = this.toServicePath(url) + "/" + categroies[i]; } return paths; } private String toCategoryPath(URL url) { return this.toServicePath(url) + "/" + url.getParameter("category", "providers"); } private String toUrlPath(URL url) { return this.toCategoryPath(url) + "/" + URL.encode(url.toFullString()); } private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) { List<URL> urls = new ArrayList(); if (providers != null && providers.size() > 0) { Iterator i$ = providers.iterator(); while(i$.hasNext()) { String provider = (String)i$.next(); provider = URL.decode(provider); if (provider.contains("://")) { URL url = URL.valueOf(provider); if (UrlUtils.isMatch(consumer, url)) { urls.add(url); } } } } return urls; } private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) { List<URL> urls = this.toUrlsWithoutEmpty(consumer, providers); if (urls == null || urls.isEmpty()) { int i = path.lastIndexOf(47); String category = i < 0 ? path : path.substring(i + 1); URL empty = consumer.setProtocol("empty").addParameter("category", category); urls.add(empty); } return urls; } }
下面分析CuratorZookeeperClient实现
public CuratorZookeeperClient(URL url) { super(url); try { // 建立 CuratorFramework 构造器 Builder builder = CuratorFrameworkFactory.builder().connectString(url.getBackupAddress()).retryPolicy(new RetryNTimes(1, 1000)).connectionTimeoutMs(5000); String authority = url.getAuthority(); if (authority != null && authority.length() > 0) { builder = builder.authorization("digest", authority.getBytes()); } // 构建 CuratorFramework 实例 this.client = builder.build(); this.client.getConnectionStateListenable().addListener(new ConnectionStateListener() { public void stateChanged(CuratorFramework client, ConnectionState state) { if (state == ConnectionState.LOST) { CuratorZookeeperClient.this.stateChanged(0); } else if (state == ConnectionState.CONNECTED) { CuratorZookeeperClient.this.stateChanged(1); } else if (state == ConnectionState.RECONNECTED) { CuratorZookeeperClient.this.stateChanged(2); } } }); this.client.start(); } catch (Exception var4) { throw new IllegalStateException(var4.getMessage(), var4); } }
以 Zookeeper 为例,所谓的服务注册,本质上是将服务配置数据写入到 Zookeeper 的某个路径的节点下
## 服务注册
用法
<dubbo:service retries="0" interface="com.rbsn.tms.operatebill.service.WechatPayService" ref="wechatPayService"/>
服务注册在FailbackRegistry类中
```
public void register(URL url) { if (!this.destroyed.get()) { super.register(url); this.failedRegistered.remove(url); this.failedUnregistered.remove(url); try { //服务注册 this.doRegister(url); } catch (Exception var6) { Throwable t = var6; // 获取 check 参数,若 check = true 将会直接抛出异常 boolean check = this.getUrl().getParameter("check", true) && url.getParameter("check", true) && !"consumer".equals(url.getProtocol()); boolean skipFailback = var6 instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = var6.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + this.getUrl().getAddress() + ", cause: " + ((Throwable)t).getMessage(), (Throwable)t); } this.logger.error("Failed to register " + url + ", waiting for retry, cause: " + var6.getMessage(), var6); this.failedRegistered.add(url); } } }
```
doRegistry方法在ZookeeperResistry中, 经过 Zookeeper 客户端建立节点,节点路径由 toUrlPath 方法生成,
路径格式以下: // /${group}/${serviceInterface}/providers/${url} // 好比 // /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
```
protected void doRegister(URL url) { try { this.zkClient.create(this.toUrlPath(url), url.getParameter("dynamic", true)); } catch (Throwable var3) { throw new RpcException("Failed to register " + url + " to zookeeper " + this.getUrl() + ", cause: " + var3.getMessage(), var3); } }
```
create方法在AbstractZookeeperClient类中,ephemeral参数表示是否建立临时节点 默认true
```
public void create(String path, boolean ephemeral) { int i = path.lastIndexOf(47); if (i > 0) { String parentPath = path.substring(0, i); // 若是要建立的节点类型非临时节点,那么这里要检测节点是否存在 if (!this.checkExists(parentPath)) { this.create(parentPath, false); } } if (ephemeral) { //建立临时节点 this.createEphemeral(path); } else { //建立持久化节点 this.createPersistent(path); } }
```
经过递归建立当前节点的上一级路径,而后再根据 ephemeral 的值决定建立临时仍是持久节点,createEphemeral方法实现以下
public void createEphemeral(String path) { try { ((ACLBackgroundPathAndBytesable)this.client.create().withMode(CreateMode.EPHEMERAL)).forPath(path); } catch (NodeExistsException var3) { ; } catch (Exception var4) { throw new IllegalStateException(var4.getMessage(), var4); } }
整个过程可简单总结为:先建立注册中心实例,以后再经过注册中心实例注册服务