nacos-discovery中spring.factories:java
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration,\ com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\ com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\ com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientAutoConfiguration,\ com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration org.springframework.cloud.bootstrap.BootstrapConfiguration=\ com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
这里的核心类是NacosDiscoveryAutoConfiguration,它主要注册了NacosAutoServiceRegistrationspring
@Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosAutoServiceRegistration nacosAutoServiceRegistration( NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) { return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration); }
NacosAutoServiceRegistrationbootstrap
继承了AbstractAutoServiceRegistration,它实现了ApplicationListener<WebServerInitializedEvent>,那么容器启动的最后阶段会去执行这里实现的onApplicationEvent方法。缓存
@Override @SuppressWarnings("deprecation") public void onApplicationEvent(WebServerInitializedEvent event) { bind(event); }
bind:服务器
@Deprecated public void bind(WebServerInitializedEvent event) { ApplicationContext context = event.getApplicationContext(); if (context instanceof ConfigurableWebServerApplicationContext) { if ("management".equals(((ConfigurableWebServerApplicationContext) context) .getServerNamespace())) { return; } } this.port.compareAndSet(0, event.getWebServer().getPort()); // CAS修改端口 this.start(); // 调用start方法作一些事情 } public void start() { if (!isEnabled()) { if (logger.isDebugEnabled()) { logger.debug("Discovery Lifecycle disabled. Not starting"); } return; } // only initialize if nonSecurePort is greater than 0 and it isn't already running // because of containerPortInitializer below if (!this.running.get()) { this.context.publishEvent( new InstancePreRegisteredEvent(this, getRegistration())); register(); if (shouldRegisterManagement()) { registerManagement(); } this.context.publishEvent( new InstanceRegisteredEvent<>(this, getConfiguration())); this.running.compareAndSet(false, true); } }
能够看到start中会进行初始化的判断,而后调用register():数据结构
@Override public void register(Registration registration) { String serviceId = registration.getServiceId(); // 得到服务id Instance instance = getNacosInstanceFromRegistration(registration); // 建立instance对象 try { namingService.registerInstance(serviceId, instance); // 服务注册 log.info("nacos registry, {} {}:{} register finished", serviceId, instance.getIp(), instance.getPort()); } catch (Exception e) { log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e); } }
registerInstance:并发
@Override public void registerInstance(String serviceName, Instance instance) throws NacosException { registerInstance(serviceName, Constants.DEFAULT_GROUP, instance); } @Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { if (instance.isEphemeral()) { // 是不是临时实例,默认true // 临时实例则构建心跳定时任务 BeatInfo beatInfo = new BeatInfo(); beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName)); beatInfo.setIp(instance.getIp()); beatInfo.setPort(instance.getPort()); beatInfo.setCluster(instance.getClusterName()); beatInfo.setWeight(instance.getWeight()); beatInfo.setMetadata(instance.getMetadata()); beatInfo.setScheduled(false); long instanceInterval = instance.getInstanceHeartBeatInterval(); // DEFAULT_HEART_BEAT_INTERVAL 默认5s beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval); // BeatReactor里面维护了一个ScheduledExecutorService,经过它添加定时任务 beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo); } // 注册服务的逻辑 serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance); }
首先看下添加心跳机制的addBeatInfo:app
public void addBeatInfo(String serviceName, BeatInfo beatInfo) { NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); dom2Beat.put(buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo); // 直接添加一个定时任务BeatTask executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); } class BeatTask implements Runnable { BeatInfo beatInfo; public BeatTask(BeatInfo beatInfo) { this.beatInfo = beatInfo; } @Override public void run() { if (beatInfo.isStopped()) { return; } // 向远程服务发送心跳请求,是一个PUT请求 long result = serverProxy.sendBeat(beatInfo); // 得到下次执行时间并添加到线程池中 long nextTime = result > 0 ? result : beatInfo.getPeriod(); executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS); } }
serverProxy.registerService:dom
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName, instance); final Map<String, String> params = new HashMap<String, String>(9); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put(CommonParams.GROUP_NAME, groupName); params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JSON.toJSONString(instance.getMetadata())); reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST); }
能够看到也是向远程服务发送请求,只不过是POST类型。异步
至此客户端大概流程已分析完毕,接下来看下服务端逻辑:
InstanceController
能够看出是Restful风格的,一个实体一个方法,使用不一样的请求方式区分增删改查
服务注册:com.alibaba.nacos.naming.controllers.InstanceController#register
@CanDistro @PostMapping @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String register(HttpServletRequest request) throws Exception { final String namespaceId = WebUtils .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); // 将request转为Instance对象 final Instance instance = parseInstance(request); // 服务注册 serviceManager.registerInstance(namespaceId, serviceName, instance); return "ok"; }
registerInstance:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { // 根据命名空间和服务名构建service createEmptyService(namespaceId, serviceName, instance.isEphemeral()); // 得到service Service service = getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } // 添加实例主要逻辑 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); }
createEmptyService:
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException { createServiceIfAbsent(namespaceId, serviceName, local, null); } public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException { // 根据namespaceId得到serviceMap,它是一个ConcurrentHashMap // 根据serviceName从中取出服务 Service service = getService(namespaceId, serviceName); if (service == null) { // 服务为空则去构建 Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName); service = new Service(); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(NamingUtils.getGroupName(serviceName)); // now validate the service. if failed, exception will be thrown service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); if (cluster != null) { cluster.setService(service); service.getClusterMap().put(cluster.getName(), cluster); } service.validate(); // 放入服务,初始化定时任务,入队 putServiceAndInit(service); if (!local) { addOrReplaceService(service); } } }
putServiceAndInit:
private void putServiceAndInit(Service service) throws NacosException { // 向serviceMap放入service putService(service); // 构建心跳检查任务 service.init(); // 向队列中添加一个Service,有异步线程处理 consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service); consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service); Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson()); }
心跳检查任务是一个ClientBeatCheckTask,主要逻辑:
// 得到全部实例 List<Instance> instances = service.allIPs(true); // 遍历实例集合 // 当前时间 - 上次心跳时间 > 超时时间(默认15s) // 超时则将健康状态设置为false for (Instance instance : instances) { if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) { if (!instance.isMarked()) { if (instance.isHealthy()) { instance.setHealthy(false); Loggers.EVT_LOG .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}", instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(), UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat()); getPushService().serviceChanged(service); ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance)); } } } } // 经过deleteIp方法调用本身本机的过时接口 for (Instance instance : instances) { if (instance.isMarked()) { continue; } if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) { // delete instance Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JacksonUtils.toJson(instance)); deleteIp(instance); } }
至此registerInstance方法中的createEmptyService逻辑已经分析完成,继续看addInstance的逻辑:
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { // 构建key,若是ephemeral为true则key中存在"ephemeral." String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); Service service = getService(namespaceId, serviceName); synchronized (service) { // 服务注册,使用CopyOnWrite思想 List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); // 返回的instanceList是修改后的列表 Instances instances = new Instances(); instances.setInstanceList(instanceList); // key中存在"ephemeral."则consistencyService就是DistroConsistencyServiceImpl consistencyService.put(key, instances); } }
com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put:
public void put(String key, Record value) throws NacosException { // 实例放入队列 onPut(key, value); // 若是是集群环境,将节点信息放入ConcurrentHashMap中 distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2); }
onPut:
notifier.addTask(key, DataOperation.CHANGE); public void addTask(String datumKey, DataOperation action) { if (services.containsKey(datumKey) && action == DataOperation.CHANGE) { return; } if (action == DataOperation.CHANGE) { services.put(datumKey, StringUtils.EMPTY); } tasks.offer(Pair.with(datumKey, action)); }
这里的tasks是
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
这里我有个没想通的点,为何要使用ArrayBlockingQueue?LinkedBlockingQueue性能更好才对
此处run方法会循环从队列中拿数据,拿到则去注册;由于是阻塞队列,队列为空时会睡眠,不存在CPU浪费的问题。
public void run() { Loggers.DISTRO.info("distro notifier started"); for (; ; ) { try { Pair<String, DataOperation> pair = tasks.take(); handle(pair); } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); } } }
distroProtocol.sync:集群环境所用,将修改的信息放入Map中,异步线程会拿到它去调用远程接口同步数据
public void sync(DistroKey distroKey, DataOperation action, long delay) { // 得到全部节点遍历 for (Member each : memberManager.allMembersWithoutSelf()) { // 节点信息封装为DistroKey DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), each.getAddress()); DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay); // 节点信息放入ConcurrentHashMap中 distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask); if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress()); } } }
com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder:
@Component public class DistroTaskEngineHolder { private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine(); }
成员变量DistroDelayTaskExecuteEngine,它的父构造方法:
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) { super(logger); tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity); processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name)); // 向定时线程池添加ProcessRunnable processingExecutor .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS); }
ProcessRunnable逻辑:
private class ProcessRunnable implements Runnable { @Override public void run() { try { processTasks(); } catch (Throwable e) { getEngineLog().error(e.toString(), e); } } }
最终它会执行com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskProcessor#process,从上面的Map中取出信息放到一个队列中
com.alibaba.nacos.common.task.engine.TaskExecuteWorker.InnerWorker:
private class InnerWorker extends Thread { InnerWorker(String name) { setDaemon(false); setName(name); } @Override public void run() { while (!closed.get()) { try { Runnable task = queue.take(); long begin = System.currentTimeMillis(); task.run(); long duration = System.currentTimeMillis() - begin; if (duration > 1000L) { log.warn("distro task {} takes {}ms", task, duration); } } catch (Throwable e) { log.error("[DISTRO-FAILED] " + e.toString(), e); } } } }
从队列中拿出任务直接执行run方法,这里执行的是DistroSyncChangeTask中的逻辑:
public void run() { Loggers.DISTRO.info("[DISTRO-START] {}", toString()); try { String type = getDistroKey().getResourceType(); DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey()); distroData.setType(DataOperation.CHANGE); boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer()); if (!result) { handleFailedTask(); } Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result); } catch (Exception e) { Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e); handleFailedTask(); } }
这里syncData调用远程接口/distro/datum同步数据。
DistroProtocol
构造方法
public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder, DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig) { this.memberManager = memberManager; this.distroComponentHolder = distroComponentHolder; this.distroTaskEngineHolder = distroTaskEngineHolder; this.distroConfig = distroConfig; startDistroTask(); }
会调用startDistroTask():
private void startLoadTask() { DistroCallback loadCallback = new DistroCallback() { @Override public void onSuccess() { isInitialized = true; } @Override public void onFailed(Throwable throwable) { isInitialized = false; } }; GlobalExecutor.submitLoadDataTask( new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback)); }
向线程池添加定时任务DistroLoadDataTask:
public void run() { try { load(); if (!checkCompleted()) { GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis()); } else { loadCallback.onSuccess(); Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success"); } } catch (Exception e) { loadCallback.onFailed(e); Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e); } } private void load() throws Exception { for (String each : distroComponentHolder.getDataStorageTypes()) { if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) { // 经过loadAllDataSnapshotFromRemote从远程拉取数据 loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each)); } } } private boolean loadAllDataSnapshotFromRemote(String resourceType) { DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType); DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType); for (Member each : memberManager.allMembersWithoutSelf()) { try { DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress()); // 拉取数据的逻辑 boolean result = dataProcessor.processSnapshot(distroData); // 若是拉取成功则直接返回,保证只从一台服务器拉取数据 if (result) { return true; } } catch (Exception e) { Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e); } } return false; }
拉取数据会调用远程的com.alibaba.nacos.naming.controllers.DistroController#onSyncDatum:
最终调用dataProcessor.processData:
public boolean processData(DistroData distroData) { DistroHttpData distroHttpData = (DistroHttpData) distroData; Datum<Instances> datum = (Datum<Instances>) distroHttpData.getDeserializedContent(); onPut(datum.key, datum.value); return true; }
会调用onPut去注册实例。
服务发现
客户端:
com.alibaba.nacos.client.naming.NacosNamingService#getAllInstances(java.lang.String):
核心逻辑在getServiceInfo方法中的getServiceInfo0方法中:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); String key = ServiceInfo.getKey(serviceName, clusters); if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } // 从Map中获取服务 ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); if (null == serviceObj) { serviceObj = new ServiceInfo(serviceName, clusters); serviceInfoMap.put(serviceObj.getKey(), serviceObj); updatingMap.put(serviceName, new Object()); // 若是为空则调用updateServiceNow,得到远程服务列表放入到Map中 updateServiceNow(serviceName, clusters); updatingMap.remove(serviceName); } else if (updatingMap.containsKey(serviceName)) { if (UPDATE_HOLD_INTERVAL > 0) { // hold a moment waiting for update finish synchronized (serviceObj) { try { serviceObj.wait(UPDATE_HOLD_INTERVAL); } catch (InterruptedException e) { NAMING_LOGGER .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e); } } } } // 发布定时任务,定时拉取列表信息去更新Map scheduleUpdateIfAbsent(serviceName, clusters); return serviceInfoMap.get(serviceObj.getKey()); }
服务端:
服务端收到请求,进入com.alibaba.nacos.naming.controllers.InstanceController#list:
会调用doSrvIpxt,核心逻辑就是调用srvIPs方法:
public List<Instance> srvIPs(List<String> clusters) { if (CollectionUtils.isEmpty(clusters)) { clusters = new ArrayList<>(); clusters.addAll(clusterMap.keySet()); } // 得到实例集合并返回 return allIPs(clusters); } public List<Instance> allIPs(List<String> clusters) { List<Instance> result = new ArrayList<>(); for (String cluster : clusters) { // 这里的信息是注册时候放入的 Cluster clusterObj = clusterMap.get(cluster); if (clusterObj == null) { continue; } result.addAll(clusterObj.allIPs()); } return result; }
和Eureka对比
首先给阿里的编码风格点赞,看着就是舒服,Eureka代码一大片逻辑都写在一块儿了,相比下Nacos看起来简洁清爽了许多
Eureka注册部分代码:
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { // 加读锁 read.lock(); // 得到微服务组 Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); // 根据传入的id得到服务实例 Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); // 若是存在则赋值给registrant if (existingLease != null && (existingLease.getHolder() != null)) { Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { registrant = existingLease.getHolder(); } } else { // 不存在,记录数量 synchronized (lock) { if (this.expectedNumberOfRenewsPerMin > 0) { this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2; this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); } } } // 使用registrant建立Lease // 会记录registrationTimestamp(服务注册时间) // lastUpdateTimestamp(最后操做时间) duration(失效时间数) Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); if (existingLease != null) { // 设置恢复正常时的状态 lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } // 放入微服务组中 gMap.put(registrant.getId(), lease); } finally { read.unlock(); }
Eureka是同步去注册的,注册时加读锁,Nacos注册时直接入队列,异步线程去进行注册,注册时使用CopyOnWrite空间换时间,提高注册并发
服务发现对比
Nacos获取服务实例直接取自ephemeralInstances,Eureka服务注册和发现时会加锁,为了下降锁竞争,有三级缓存
// 无过时时间,保存服务信息的对外输出数据结构,定时从二级缓存拉取注册信息 private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>(); // 为了下降注册表registry读写锁竞争,下降读取频率,本质上是 guava 的缓存,包含定时失效机制 private final LoadingCache<Key, Value> readWriteCacheMap; private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
读取顺序:只读缓存->读写缓存->真实数据
只读缓存的数据只会来源于读写缓存,并且没有提供主动更新API。
读写缓存是使用Guava实现的,自己设置了 guava 的失效机制,隔一段时间后本身自动失效。
定时更新一级缓存的时候,会读取二级缓存,若是二级缓存没有数据,也会触发load,拉取registry的注册数据。