推荐阅读java
SpringCloud源码阅读0-SpringCloud必备知识spring
SpringCloud源码阅读1-EurekaServer源码的秘密缓存
配置类的做用通常就是配置框架运行的基本组件,因此看懂配置类,也就入了框架的门。app
当咱们在启动类上加入@EnableDiscoveryClient
或者@EnableEurekaClient
时,就能使Eureka客户端生效。框架
这两个注解最终都会使,Eureka客户端对应的配置类EurekaClientAutoConfiguration
生效。这里直接讲配置类,具体注解如何使他生效,不在此处赘述。dom
EurekaClientAutoConfiguration 做为EurekaClient的自动配置类,配了EurekaClient运行所须要的组件。ide
(1.注解上的Bean工具
@Configuration
@EnableConfigurationProperties//启动属性映射
@ConditionalOnClass(EurekaClientConfig.class)//须要EurekaClientConfig类存在
@Import(DiscoveryClientOptionalArgsConfiguration.class)//加载可选参数配置类到容器,
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)//须要开关存在
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)//须要eureka.client.enabled属性
//在这三个自动注入类以前解析
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
//在这三个自动注入类以后解析
@AutoConfigureAfter(name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
"org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"})
public class EurekaClientAutoConfiguration {
}
复制代码
能够看出,注解大部分都是在作EurekaClientAutoConfiguration 配置类生效条件的判断。oop
其中@AutoConfigureAfter对应的三个配置类须要讲下:
RefreshAutoConfiguration:与刷新做用域有关的配置类,
EurekaDiscoveryClientConfiguration:
* 向容器中注入开关EurekaDiscoveryClientConfiguration.Marker
* 建立RefreshScopeRefreshedEvent事件监听器,
* 当eureka.client.healthcheck.enabled=true时,注入EurekaHealthCheckHandler用于健康检查
AutoServiceRegistrationAutoConfiguration:关于服务自动注册的相关配置。
复制代码
注解上没有引入重要组件。post
(2.类内部的Bean
(3. 内部类的Bean EurekaClientAutoConfiguration 内部有两个关于EurekaClient的配置类,
不论是哪一种EurekaClient都会注册三个组件:
(4. 分类总结:
EurekaClient 能够看作是客户端的上下文。他的初始化,卸载方法包括了客户端的整个生命周期
上文讲到,此时EurekaClient注册的是CloudEurekaClient。
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
//留下扩展点,能够经过参数配置各类处理器
if (args != null) {
this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
this.eventListeners.addAll(args.getEventListeners());
this.preRegistrationHandler = args.preRegistrationHandler;
} else {
this.healthCheckCallbackProvider = null;
this.healthCheckHandlerProvider = null;
this.preRegistrationHandler = null;
}
//赋值applicationInfoManager属性
this.applicationInfoManager = applicationInfoManager;
//applicationInfoManager在初始化时,
//new InstanceInfoFactory().create(config);建立了当前实例信息
InstanceInfo myInfo = applicationInfoManager.getInfo();
clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if (myInfo != null) {
appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
} else {
logger.warn("Setting instanceInfo to a passed in null value");
}
//备用注册处理器
this.backupRegistryProvider = backupRegistryProvider;
this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
//本地区域应用缓存初始化,Applications存储Application 列表
localRegionApps.set(new Applications());
fetchRegistryGeneration = new AtomicLong(0);
remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
//开始初始化默认区域
logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
// 若是既不要向eureka server注册,又不要获取服务列表,就什么都不用初始化
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()){
....
return;
}
try {
//默认建立2个线程的调度池,用于TimedSupervisorTask任务。
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
//心跳线程池,2个线程
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
//缓存刷新线程池,2个线程
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
//初始化与EurekaServer真正通讯的通讯器。
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
//配置区域映射器。可配置DNS映射。
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
//建立实例区域检查器。
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
// 若是须要从eureka server获取服务列表,而且尝试fetchRegistry(false)失败,
//调用BackupRegistry
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
//回调扩展点处理器。此处理器,在注册前处理。
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// 初始化全部定时任务
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
//将client,clientConfig放到DiscoveryManager 统一管理,以便其余地方能够DI依赖注入。
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
}
复制代码
Eureka初始化仍是比较复杂,咱们找重点说说。
在DiscoveryClient初始化时,初始化EurekaTransport。
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
复制代码
EurekaTransport 是客户端与服务端底层通讯器。 有5个重要的属性:
scheduleServerEndpointTask方法完成EurekaTransport5个属性的初始化
transportClientFactory 属于低层次的http工厂。 EurekaHttpClientFactory 属于高层次的http工厂。 经过具备不一样功能的EurekaHttpClientFactory 工厂 对transportClientFactory 进行层层装饰。生产的http工具也具备不一样层次的功能。 列如: 最外层的SessionedEurekaHttpClient 具备会话功能的EurekaHttpClient 第二次RetryableEurekaHttpClient 具备重试功能的EurekaHttpClient
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
//获取本地缓存
Applications applications = getApplications();
//若是增量拉取被禁用或是第一次拉取,全量拉取server端已经注册的服务实例信息
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
//获取所有实例
getAndStoreFullRegistry();
} else {
//增量拉取服务实例
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// 刷新本地缓存
onCacheRefreshed();
// 基于缓存中的实例数据更新远程实例状态, (发布StatusChangeEvent)
updateInstanceRemoteStatus();
// 注册表拉取成功后返回true
return true;
}
复制代码
全量获取最终调用
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
复制代码
增量获取
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
复制代码
能够看出,客户端与服务端通讯底层是EurekaTransport在提供支持。
在此以前有必要说说:TimedSupervisorTask。 TimedSupervisorTask 是自动调节间隔的周期性任务,当不超时,将以初始化的间隔执行。当任务超时时,将下一个周期的间隔调大。每次超时都会增大相应倍数,直到外部设置的最大参数。一旦新任务再也不超时,间隔自动恢复默认值。
也就是说,这是一个具备自适应的周期性任务。(很是棒的设计啊)
private void initScheduledTasks() {
//1.若是获取服务列表,则建立周期性缓存更新(即获取服务列表任务)任务
if (clientConfig.shouldFetchRegistry()) {
//初始间隔时间(默认30秒)
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
//最大倍数 默认10倍
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
//执行TimedSupervisorTask ,监督CacheRefreshThread任务的执行。
//具体执行线程池cacheRefreshExecutor,具体任务CacheRefreshThread
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()//缓存刷新,调用fetchRegistry()获取服务列表
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
//2. 如何注册,就建立周期性续租任务,维持心跳。
if (clientConfig.shouldRegisterWithEureka()) {
//心跳间隔,默认30秒。
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
//最大倍数 默认10倍
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
//执行TimedSupervisorTask ,监督HeartbeatThread任务的执行。
//具体执行线程池heartbeatExecutor,具体任务HeartbeatThread
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
//3.建立应用实例信息复制器。
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
//4.建立状态改变监听器,监听StatusChangeEvent
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
//状态有变化,使用信息复制器,执行一个任务,更新状态变化到注册中心
instanceInfoReplicator.onDemandUpdate();
}
};
//是否关注状态变化,将监听器添加到applicationInfoManager
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 启动InstanceInfo复制器
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
复制代码
总结下initScheduledTasks()的工做: 如何配置获取服务
cacheRefresh
,监督服务获取CacheRefreshThread(DiscoveryClient-CacheRefreshExecutor-%d)
线程的执行,默认每30秒执行一次。获取服务列表更新到本地缓存。任务内容refreshRegistry
,本地缓存localRegionApps
如何配置注册,
"heartbeat",
监督续约任务HeartbeatThread(DiscoveryClient-HeartbeatExecutor-%d)
的执行,默认每30秒执行一次。任务内容为renew()
onDemandUpdate()
方法,更新变化到远程server(DiscoveryClient-InstanceInfoReplicator-%d)
,定时(默认40秒)检测当前实例的DataCenterInfo、LeaseInfo、InstanceStatus,若是有变动,执行InstanceInfoReplicator.this.run()
方法将变动信息同步到server下面咱们看看这几个重要的任务内容:
eurekaTransport.registrationClient.sendHeartBeat
向server发送当前实例信息public void run() {
try {
//1.刷新DataCenterInfo
//2.刷新LeaseInfo 租约信息
//3.从HealthCheckHandler中获取InstanceStatus
discoveryClient.refreshInstanceInfo();
//若是isInstanceInfoDirty=true代表须要更新,返回dirtyTimestamp。
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();//注册。
instanceInfo.unsetIsDirty(dirtyTimestamp);//注册完成,设置没有更新。
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
复制代码
能够看出实例注册就在discoveryClient.register()
。
那么第一次注册发生在何时呢?
initScheduledTasks方法中,执行instanceInfoReplicator.start时,会首先调用instanceInfo.setIsDirty(),初始化是否更新标志位为ture ,开启线程,40秒后发起第一次注册。(固然若是在这40秒内,若是有状态变化,会当即发起注册。)
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
//发起注册
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
复制代码
能够看出,register方法本质也是经过eurekaTransport 来发起与server的通讯的。
注解@PreDestroy修饰的shutdown()会在Servlet被完全卸载以前执行。
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
//移除监听
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
//取消任务
cancelScheduledTasks();
// If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
//下线
unregister();
}
//通讯中断
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
logger.info("Completed shut down of DiscoveryClient");
}
}
复制代码
EurekaAutoServiceRegistration实现了SmartLifecycle,会在spring启动完毕后,调用其start()方法。
@Override
public void start() {
// 设置端口
if (this.port.get() != 0) {
if (this.registration.getNonSecurePort() == 0) {
this.registration.setNonSecurePort(this.port.get());
}
if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
this.registration.setSecurePort(this.port.get());
}
}
if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
//注册
this.serviceRegistry.register(this.registration);
//发布注册成功事件
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
this.running.set(true);//
}
}
复制代码
this.serviceRegistry.register(this.registration);
@Override
public void register(EurekaRegistration reg) {
maybeInitializeClient(reg);
// 更改状态,会触发监听器的执行
reg.getApplicationInfoManager()
.setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
reg.getHealthCheckHandler().ifAvailable(healthCheckHandler ->
reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
}
复制代码
1.注册流程:
EurekaAutoServiceRegistration.start()
-->EurekaServiceRegistry.register(EurekaRegistration)
-->ApplicationInfoManager.setInstanceStatus 状态改变,
-->StatusChangeListener 监听器监听到状态改变
-->InstanceInfoReplicator.onDemandUpdate() 更新状态到server
-->InstanceInfoReplicator.run()
-->DiscoveryClient.register()
-->eurekaTransport.registrationClient.register(instanceInfo);
-->jerseyClient
2.客户端初始化的几个定时任务:
3.客户端的几个主要操做:
因为篇幅缘由,不少细节不能一一展现。本文志在说说一些原理,具体细节能够研读源码,会发现框架真优秀啊。